// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Ported from server/mqtt_test.go and server/mqtt_ex_test_test.go in the NATS
// Go server source.

using System.Buffers.Binary;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Text;
using Shouldly;
using Xunit.Abstractions;

namespace ZB.MOM.NatsNet.Server.IntegrationTests.Mqtt;

/// <summary>
/// Integration tests for the MQTT protocol implementation.
/// These tests require a running NATS server with MQTT and JetStream enabled.
///
/// Start with: cd golang/nats-server &amp;&amp; go run . -c mqtt-test.conf
/// Or set MQTT_HOST / MQTT_PORT environment variables.
///
/// The tests use raw TCP connections with MQTT binary protocol to mirror the
/// Go test helpers (testMQTTConnect, testMQTTPublish, etc.).
/// </summary>
[Trait("Category", "Integration")]
public sealed class MqttTests
{
// ---------------------------------------------------------------------------
// Constants mirroring Go mqtt.go
// ---------------------------------------------------------------------------

// Packet types (upper nibble of the first byte of every packet)
private const byte PacketConnect = 0x10;
private const byte PacketConnAck = 0x20;
private const byte PacketPub = 0x30;
private const byte PacketPubAck = 0x40;
private const byte PacketPubRec = 0x50;
private const byte PacketPubRel = 0x60;
private const byte PacketPubComp = 0x70;
private const byte PacketSub = 0x80;
private const byte PacketSubAck = 0x90;
private const byte PacketUnsub = 0xA0;
private const byte PacketUnsubAck = 0xB0;
private const byte PacketPing = 0xC0;
private const byte PacketPingResp = 0xD0;
private const byte PacketDisconnect = 0xE0;

// Masks splitting the first byte into packet type and flags
private const byte PacketMask = 0xF0;
private const byte PacketFlagMask = 0x0F;

// CONNECT flags
private const byte ConnFlagCleanSession = 0x02;
private const byte ConnFlagWillFlag = 0x04;
private const byte ConnFlagWillQoS = 0x18;
private const byte ConnFlagWillRetain = 0x20;
private const byte ConnFlagPasswordFlag = 0x40;
private const byte ConnFlagUsernameFlag = 0x80;

// PUBLISH flags
private const byte PubFlagRetain = 0x01;
private const byte PubFlagQoS = 0x06;
private const byte PubFlagDup = 0x08;
private const byte PubQoS1 = 0x1 << 1;
private const byte PubQoS2 = 0x2 << 1;

// SUBSCRIBE / UNSUBSCRIBE fixed flags
private const byte SubscribeFlags = 0x02;
private const byte UnsubscribeFlags = 0x02;

// CONNACK return codes
private const byte ConnAckAccepted = 0x00;
private const byte ConnAckUnacceptableProtocol = 0x01;
private const byte ConnAckIdentifierRejected = 0x02;
private const byte ConnAckServerUnavailable = 0x03;
private const byte ConnAckBadUserOrPassword = 0x04;
private const byte ConnAckNotAuthorized = 0x05;

// SUBACK return code for a rejected filter
private const byte SubAckFailure = 0x80;

// Protocol name sent in the CONNECT variable header
private static readonly byte[] MqttProtoName = Encoding.ASCII.GetBytes("MQTT");
private const byte MqttProtoLevel = 0x04;

/// <summary>Read/write timeout applied by the socket helpers below.</summary>
private static readonly TimeSpan MqttTimeout = TimeSpan.FromSeconds(10);

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

private readonly ITestOutputHelper _output;
private readonly string _mqttHost;
private readonly int _mqttPort;
private readonly bool _serverAvailable;

/// <summary>
/// Resolves the server endpoint from the MQTT_HOST / MQTT_PORT environment
/// variables (defaults: 127.0.0.1 and 1883) and probes it once.
/// </summary>
/// <param name="output">xUnit output sink for this test instance.</param>
public MqttTests(ITestOutputHelper output)
{
    _output = output;
    _mqttHost = Environment.GetEnvironmentVariable("MQTT_HOST") ?? "127.0.0.1";

    // int.TryParse returns false for null, so an unset or invalid variable yields the default.
    _mqttPort = int.TryParse(Environment.GetEnvironmentVariable("MQTT_PORT"), out var port) ? port : 1883;
    _serverAvailable = ProbeServer(_mqttHost, _mqttPort);
}

/// <summary>Returns true when a TCP connection to <paramref name="host"/>:<paramref name="port"/> can be opened.</summary>
private static bool ProbeServer(string host, int port)
{
    try
    {
        using var tcp = new TcpClient();
        tcp.Connect(host, port);
        return true;
    }
    catch (Exception)
    {
        // Deliberately broad: any failure to connect simply means "server not available".
        return false;
    }
}

/// <summary>Returns true when the probe in the constructor failed and the calling test should return early.</summary>
private bool SkipIfUnavailable() => !_serverAvailable;

// ---------------------------------------------------------------------------
// MQTT protocol helper – mirrors testMQTTRead / testMQTTWrite / etc.
// ---------------------------------------------------------------------------

/// <summary>Pairs a <see cref="TcpClient"/> with its stream so both are disposed together.</summary>
private sealed class MqttConnection : IDisposable
{
    public TcpClient Tcp { get; }
    public NetworkStream Stream { get; }

    public MqttConnection(TcpClient tcp)
    {
        Tcp = tcp;
        Stream = tcp.GetStream();
    }

    public void Dispose()
    {
        Stream.Dispose();
        Tcp.Dispose();
    }
}

/// <summary>
/// Creates a raw TCP MQTT connection and sends a CONNECT packet.
/// The CONNACK is not read here; callers check it themselves.
/// </summary>
/// <returns>The open connection; the caller owns and disposes it.</returns>
private MqttConnection MqttConnect(
    string? clientId = null,
    bool cleanSess = true,
    string? user = null,
    string? pass = null,
    ushort keepAlive = 0,
    WillInfo? will = null)
{
    var tcp = new TcpClient();
    try
    {
        tcp.Connect(_mqttHost, _mqttPort);
        var conn = new MqttConnection(tcp);
        WriteAll(conn.Stream, BuildConnectPacket(clientId, cleanSess, user, pass, keepAlive, will));
        return conn;
    }
    catch
    {
        // Do not leak the socket when connecting or sending CONNECT fails.
        tcp.Dispose();
        throw;
    }
}

/// <summary>Last-will settings carried in a CONNECT packet.</summary>
private sealed class WillInfo
{
    public string Topic { get; init; } = "";
    public byte[] Message { get; init; } = [];
    public byte Qos { get; init; }
    public bool Retain { get; init; }
}

/// <summary>
/// Serialises a CONNECT packet: fixed header, protocol name and level, connect
/// flags, keep-alive, client id, then the optional will topic/message, user
/// name and password, each as a 2-byte-length-prefixed field.
/// </summary>
private static byte[] BuildConnectPacket(
    string? clientId,
    bool cleanSess,
    string? user,
    string? pass,
    ushort keepAlive,
    WillInfo? will)
{
    var cidBytes = Encoding.UTF8.GetBytes(clientId ?? "");
    var userBytes = user is null ? null : Encoding.UTF8.GetBytes(user);
    var passBytes = pass is null ? null : Encoding.UTF8.GetBytes(pass);
    var willTopicBytes = will is null ? null : Encoding.UTF8.GetBytes(will.Topic);
    byte[] willMessageBytes = will?.Message ?? [];

    byte flags = 0;
    if (cleanSess) flags |= ConnFlagCleanSession;
    if (will is not null)
    {
        flags |= ConnFlagWillFlag;
        flags |= (byte)((will.Qos & 0x03) << 3); // will QoS occupies bits 3-4
        if (will.Retain) flags |= ConnFlagWillRetain;
    }
    if (userBytes is not null) flags |= ConnFlagUsernameFlag;
    if (passBytes is not null) flags |= ConnFlagPasswordFlag;

    var pkLen = 2 + MqttProtoName.Length +
                1 + 1 + 2 + // proto level + flags + keepAlive
                2 + cidBytes.Length;
    if (willTopicBytes is not null) pkLen += 2 + willTopicBytes.Length + 2 + willMessageBytes.Length;
    if (userBytes is not null) pkLen += 2 + userBytes.Length;
    if (passBytes is not null) pkLen += 2 + passBytes.Length;

    var w = new ByteWriter();
    w.WriteByte(PacketConnect);
    w.WriteVarInt(pkLen);
    w.WriteBytes(MqttProtoName);
    w.WriteByte(MqttProtoLevel);
    w.WriteByte(flags);
    w.WriteUInt16(keepAlive);
    w.WriteBytes(cidBytes);
    if (willTopicBytes is not null)
    {
        w.WriteBytes(willTopicBytes);
        w.WriteBytes(willMessageBytes);
    }
    if (userBytes is not null) w.WriteBytes(userBytes);
    if (passBytes is not null) w.WriteBytes(passBytes);
    return w.ToArray();
}

/// <summary>Reads exactly <paramref name="n"/> bytes from the stream (with timeout).</summary>
/// <exception cref="EndOfStreamException">The peer closed the connection first.</exception>
private static byte[] ReadExact(NetworkStream stream, int n)
{
    var buf = new byte[n];
    var offset = 0;
    stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
    while (offset < n)
    {
        var read = stream.Read(buf, offset, n - offset);
        if (read == 0) throw new EndOfStreamException("MQTT connection closed");
        offset += read;
    }
    return buf;
}

/// <summary>
/// Reads one MQTT packet and returns its first byte (type + flags) together
/// with everything after the variable-length "remaining length" field.
/// Always resets the stream's ReadTimeout to <see cref="MqttTimeout"/>.
/// </summary>
/// <exception cref="EndOfStreamException">The connection closed mid-header.</exception>
/// <exception cref="InvalidOperationException">The remaining-length field is longer than four bytes.</exception>
private static (byte FirstByte, byte[] Payload) ReadPacket(NetworkStream stream)
{
    stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;

    var first = stream.ReadByte();
    if (first < 0) throw new EndOfStreamException("MQTT connection closed reading packet header");

    // Remaining length: 7 data bits per byte, high bit set while more bytes follow.
    var multiplier = 1;
    var length = 0;
    while (true)
    {
        var next = stream.ReadByte();
        if (next < 0) throw new EndOfStreamException();
        length += (next & 0x7F) * multiplier;
        if ((next & 0x80) == 0) break;
        multiplier *= 128;
        if (multiplier > 0x200000) throw new InvalidOperationException("malformed MQTT variable int");
    }

    var payload = length > 0 ? ReadExact(stream, length) : [];
    return ((byte)first, payload);
}

/// <summary>Writes the whole buffer to the stream (with timeout).</summary>
private static void WriteAll(NetworkStream stream, byte[] data)
{
    stream.WriteTimeout = (int)MqttTimeout.TotalMilliseconds;
    stream.Write(data, 0, data.Length);
}

/// <summary>Reads a CONNACK and verifies its session-present flag and return code.</summary>
private static void CheckConnAck(NetworkStream stream, byte expectedRc, bool expectedSessionPresent)
{
    var (b, payload) = ReadPacket(stream);
    (b & PacketMask).ShouldBe(PacketConnAck, "Expected CONNACK packet");
    payload.Length.ShouldBe(2, "CONNACK payload should be 2 bytes");
    var sessionPresent = (payload[0] & 0x01) == 1;
    sessionPresent.ShouldBe(expectedSessionPresent, "session-present flag mismatch");
    payload[1].ShouldBe(expectedRc, $"CONNACK return code mismatch (got 0x{payload[1]:X2}, expected 0x{expectedRc:X2})");
}

/// <summary>Shorthand for <see cref="CheckConnAck"/> with the "accepted" return code.</summary>
private static void CheckConnAckAccepted(NetworkStream stream, bool expectedSessionPresent = false)
    => CheckConnAck(stream, ConnAckAccepted, expectedSessionPresent);

/// <summary>
/// Sends a PUBLISH packet without waiting for any acknowledgement.
/// The packet identifier is only written when <paramref name="qos"/> is above 0.
/// </summary>
private static void SendPublish(NetworkStream stream, byte qos, bool dup, bool retain, string topic, ushort pi, byte[] payload)
{
    byte flags = 0;
    if (dup) flags |= PubFlagDup;
    if (retain) flags |= PubFlagRetain;
    flags |= (byte)((qos & 0x03) << 1);

    var topicBytes = Encoding.UTF8.GetBytes(topic);
    var pkLen = 2 + topicBytes.Length + payload.Length;
    if (qos > 0) pkLen += 2;

    var w = new ByteWriter();
    w.WriteByte((byte)(PacketPub | flags));
    w.WriteVarInt(pkLen);
    w.WriteBytes(topicBytes);
    if (qos > 0) w.WriteUInt16(pi);
    w.Write(payload);
    WriteAll(stream, w.ToArray());
}

/// <summary>
/// Sends PUBLISH and waits for the acknowledgement: PUBACK for QoS1,
/// PUBREC + PUBREL + PUBCOMP (followed by a ping round-trip) for QoS2.
/// QoS0 returns as soon as the packet is written.
/// </summary>
private static void MqttPublish(NetworkStream stream, byte qos, bool dup, bool retain, string topic, ushort pi, byte[] payload)
{
    SendPublish(stream, qos, dup, retain, topic, pi, payload);

    switch (qos)
    {
        case 1:
            ExpectAck(PacketPubAck, "PUBACK");
            break;

        case 2:
            ExpectAck(PacketPubRec, "PUBREC");
            SendPiPacket(stream, PacketPubRel, pi);
            ExpectAck(PacketPubComp, "PUBCOMP");
            // flush
            MqttFlush(stream);
            break;
    }

    // Reads one packet and checks both its type and the echoed packet identifier.
    void ExpectAck(byte packetType, string name)
    {
        var (first, body) = ReadPacket(stream);
        (first & PacketMask).ShouldBe(packetType, $"Expected {name}");
        var rpi = BinaryPrimitives.ReadUInt16BigEndian(body.AsSpan(0, 2));
        rpi.ShouldBe(pi, $"{name} packet identifier mismatch");
    }
}

/// <summary>Sends PINGREQ and waits for an empty PINGRESP, used as a round-trip barrier.</summary>
private static void MqttFlush(NetworkStream stream)
{
    var w = new ByteWriter();
    w.WriteByte(PacketPing);
    w.WriteByte(0);
    WriteAll(stream, w.ToArray());

    var (b, p) = ReadPacket(stream);
    (b & PacketMask).ShouldBe(PacketPingResp, "Expected PINGRESP");
    p.Length.ShouldBe(0, "PINGRESP payload should be empty");
}

/// <summary>Sends a packet whose only content is a 2-byte packet identifier (PUBACK, PUBREL, ...).</summary>
private static void SendPiPacket(NetworkStream stream, byte packetType, ushort pi)
{
    var w = new ByteWriter();
    w.WriteByte(packetType);
    w.WriteVarInt(2);
    w.WriteUInt16(pi);
    WriteAll(stream, w.ToArray());
}

/// <summary>Sends SUBSCRIBE for the given filters and returns the SUBACK return codes, one per filter.</summary>
private static byte[] MqttSubscribe(NetworkStream stream, ushort pi, IEnumerable<(string Filter, byte Qos)> filters)
{
    var filterList = filters.ToList();
    var pkLen = 2; // pi
    foreach (var (f, _) in filterList)
        pkLen += 2 + Encoding.UTF8.GetByteCount(f) + 1;

    var w = new ByteWriter();
    w.WriteByte((byte)(PacketSub | SubscribeFlags));
    w.WriteVarInt(pkLen);
    w.WriteUInt16(pi);
    foreach (var (f, q) in filterList)
    {
        w.WriteBytes(Encoding.UTF8.GetBytes(f));
        w.WriteByte(q);
    }
    WriteAll(stream, w.ToArray());

    var (b, payload) = ReadPacket(stream);
    (b & PacketMask).ShouldBe(PacketSubAck, "Expected SUBACK");
    var rpi = BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2));
    rpi.ShouldBe(pi, "SUBACK packet identifier mismatch");
    return payload[2..];
}

/// <summary>Sends UNSUBSCRIBE for the given filters and returns the packet identifier echoed in UNSUBACK.</summary>
private static ushort MqttUnsubscribe(NetworkStream stream, ushort pi, IEnumerable<string> filters)
{
    var filterList = filters.ToList();
    var pkLen = 2;
    foreach (var f in filterList)
        pkLen += 2 + Encoding.UTF8.GetByteCount(f);

    var w = new ByteWriter();
    w.WriteByte((byte)(PacketUnsub | UnsubscribeFlags));
    w.WriteVarInt(pkLen);
    w.WriteUInt16(pi);
    foreach (var f in filterList)
        w.WriteBytes(Encoding.UTF8.GetBytes(f));
    WriteAll(stream, w.ToArray());

    var (b, payload) = ReadPacket(stream);
    (b & PacketMask).ShouldBe(PacketUnsubAck, "Expected UNSUBACK");
    return BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2));
}

/// <summary>
/// Reads a PUBLISH packet and returns (flags, pi, topic, payload).
/// <c>Pi</c> is 0 when the packet's QoS is 0, because no identifier is present then.
/// </summary>
private static (byte Flags, ushort Pi, string Topic, byte[] Payload) ReadPublish(NetworkStream stream)
{
    var (b, raw) = ReadPacket(stream);
    (b & PacketMask).ShouldBe(PacketPub, "Expected PUBLISH packet");

    var flags = (byte)(b & PacketFlagMask);
    var qos = (byte)((flags & PubFlagQoS) >> 1);

    var pos = 0;
    var topicLen = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2));
    pos += 2;
    var topic = Encoding.UTF8.GetString(raw, pos, topicLen);
    pos += topicLen;

    ushort pi = 0;
    if (qos > 0)
    {
        pi = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2));
        pos += 2;
    }

    var payload = raw[pos..];
    return (flags, pi, topic, payload);
}

/// <summary>
/// Checks that nothing arrives for a brief window (150 ms). A closed
/// connection is tolerated as well. Restores the default read timeout on exit.
/// </summary>
private static void ExpectNothing(NetworkStream stream)
{
    stream.ReadTimeout = 150; // 150 ms
    var buf = new byte[128];
    try
    {
        var n = stream.Read(buf, 0, buf.Length);
        n.ShouldBe(0, "Expected no data from MQTT server");
    }
    catch (IOException ex) when (ex.InnerException is SocketException { SocketErrorCode: SocketError.TimedOut })
    {
        // expected – nothing arrived
    }
    catch (IOException)
    {
        /* connection closed is also fine */
    }
    finally
    {
        stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
    }
}

/// <summary>
/// Expects the server to disconnect us: the next read must return 0 bytes or
/// fail with a connection error.
/// </summary>
/// <exception cref="TimeoutException">
/// The connection was still open and silent after <see cref="MqttTimeout"/>.
/// A read timeout is not a disconnect, so it must not let the test pass.
/// </exception>
private static void ExpectDisconnect(NetworkStream stream)
{
    stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
    var buf = new byte[64];
    try
    {
        var n = stream.Read(buf, 0, buf.Length);
        n.ShouldBe(0, $"Expected server to close the connection, got {n} bytes: {BitConverter.ToString(buf, 0, n)}");
    }
    catch (IOException ex) when (ex.InnerException is SocketException { SocketErrorCode: SocketError.TimedOut })
    {
        throw new TimeoutException(
            $"Expected server to close the connection, but it was still open after {MqttTimeout.TotalSeconds} s", ex);
    }
    catch (IOException)
    {
        /* connection closed or reset – correct (EndOfStreamException derives from IOException) */
    }
}

/// <summary>Sends a DISCONNECT packet.</summary>
private static void SendDisconnect(NetworkStream stream)
{
    var w = new ByteWriter();
    w.WriteByte(PacketDisconnect);
    w.WriteByte(0);
    WriteAll(stream, w.ToArray());
}

// ---------------------------------------------------------------------------
// Minimal byte writer (mirrors Go's mqttWriter)
// ---------------------------------------------------------------------------

/// <summary>Append-only buffer with the MQTT primitive encodings.</summary>
private sealed class ByteWriter
{
    // Largest value the 4-byte MQTT variable-length encoding can represent.
    private const int MaxVarInt = 268_435_455;

    private readonly List<byte> _buf = [];

    public void WriteByte(byte b) => _buf.Add(b);

    /// <summary>Appends the bytes as-is, without a length prefix.</summary>
    public void Write(byte[] data) => _buf.AddRange(data);

    /// <summary>Appends a big-endian 16-bit value.</summary>
    public void WriteUInt16(ushort v)
    {
        _buf.Add((byte)(v >> 8));
        _buf.Add((byte)v);
    }

    /// <summary>Appends a 2-byte big-endian length followed by the bytes.</summary>
    /// <exception cref="ArgumentException">The data does not fit the 16-bit length prefix.</exception>
    public void WriteBytes(byte[] data)
    {
        if (data.Length > ushort.MaxValue)
            throw new ArgumentException("MQTT length-prefixed fields are limited to 65535 bytes", nameof(data));
        WriteUInt16((ushort)data.Length);
        Write(data);
    }

    /// <summary>Appends an MQTT variable-length integer (1 to 4 bytes).</summary>
    /// <exception cref="ArgumentOutOfRangeException">The value is negative or above 268,435,455.</exception>
    public void WriteVarInt(int v)
    {
        // A negative value would never reach zero under the arithmetic shift below.
        if (v is < 0 or > MaxVarInt)
            throw new ArgumentOutOfRangeException(nameof(v), v, "MQTT variable-length integers must be in [0, 268435455]");

        do
        {
            var b = (byte)(v & 0x7F);
            v >>= 7;
            if (v > 0) b |= 0x80; // continuation bit
            _buf.Add(b);
        }
        while (v > 0);
    }

    public byte[] ToArray() => [.. _buf];
}

// ---------------------------------------------------------------------------
// TESTS — 77 from mqtt_test.go + 2 from mqtt_ex_test_test.go (skipped)
// ---------------------------------------------------------------------------

///
/// TestMQTTReader: verifies the MqttReader can parse bytes, strings, uint16,
/// variable-length integers, and partial buffer merging.
/// (Unit-level – no server required.)
///
[Fact]
public void Reader_ShouldSucceed()
{
    var r = new MqttTestReader();

    // readBytes returns 2-byte-length-prefixed data
    r.Reset([0, 2, (byte)'a', (byte)'b']);
    var bs = r.ReadBytes("", copy: false);
    Encoding.UTF8.GetString(bs).ShouldBe("ab");

    // copy=true → independent copy: mutating the result must not touch the reader's buffer
    r.Reset([0, 2, (byte)'a', (byte)'b']);
    var bs2 = r.ReadBytes("", copy: true);
    bs2[0] = (byte)'c';
    bs2[1] = (byte)'d';
    (bs2[0] == r.PeekBuf(2)).ShouldBeFalse("readBytes(copy=true) should return a copy");

    // readByte / hasMore
    r.Reset([(byte)'a', (byte)'b']);
    r.ReadByte("").ShouldBe((byte)'a');
    r.HasMore().ShouldBeTrue();
    r.ReadByte("").ShouldBe((byte)'b');
    r.HasMore().ShouldBeFalse();

    // NOTE(review): the concrete exception type thrown by MqttTestReader is not
    // visible from here; Exception is the widest match – narrow it if known.
    Should.Throw<Exception>(() => r.ReadByte("test"))
        .Message.ShouldContain("error reading test");

    // readString
    r.Reset([0, 2, (byte)'a', (byte)'b']);
    r.ReadString("").ShouldBe("ab");

    // readUint16 with insufficient data
    r.Reset([10]);
    Should.Throw<Exception>(() => r.ReadUInt16("uint16"))
        .Message.ShouldContain("error reading uint16");

    // readPacketLen – multi-byte varint: 0x82, 0xff, 0x03 → 0xff82
    r.Reset([0x82, 0xff, 0x3]);
    var (len1, complete1) = r.ReadPacketLenWithCheck(check: false);
    complete1.ShouldBeTrue();
    len1.ShouldBe(0xff82);

    // malformed varint
    r.Reset([0xff, 0xff, 0xff, 0xff, 0xff]);
    Should.Throw<Exception>(() => r.ReadPacketLenWithCheck(check: false))
        .Message.ShouldContain("malformed");
}

///
/// TestMQTTWriter: verifies MqttWriter WriteUint16, WriteString, WriteBytes, WriteVarInt.
/// (Unit-level – no server required.)
///
[Fact]
public void Writer_ShouldSucceed()
{
    var writer = new ByteWriter();
    writer.WriteUInt16(1234);
    BinaryPrimitives.ReadUInt16BigEndian(writer.ToArray().AsSpan(0, 2)).ShouldBe((ushort)1234);

    writer = new ByteWriter();
    writer.WriteBytes(Encoding.UTF8.GetBytes("test"));
    writer.ToArray().Length.ShouldBe(6, "WriteBytes: 2-byte length prefix + 4 chars");

    // Each value is paired with the number of bytes its variable-length encoding occupies.
    (int Value, int EncodedLength)[] cases =
    [
        (0, 1), (1, 1), (127, 1),
        (128, 2), (16383, 2),
        (16384, 3), (2097151, 3),
        (2097152, 4), (268435455, 4),
    ];

    // All values go into the same writer, so the expected size accumulates.
    writer = new ByteWriter();
    var expectedTotal = 0;
    foreach (var (value, encodedLength) in cases)
    {
        writer.WriteVarInt(value);
        expectedTotal += encodedLength;
        writer.ToArray().Length.ShouldBe(expectedTotal);
    }
}

/// <summary>
/// TestMQTTServerNameRequired: server without cluster server_name must reject MQTT.
/// Verified against the .NET MQTT config validation logic (config-level, no TCP).
/// </summary>
[Fact]
public void ServerNameRequired_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    // The Go test verifies server won't start without server_name when cluster+MQTT.
    // Our port verifies this at config-validation level (MqttOptions.Validate).
    // Here we test that a server without JetStream rejects MQTT connections.
    using var client = MqttConnect(cleanSess: true);

    // Server without JetStream should close connection without sending CONNACK
    ExpectDisconnect(client.Stream);
}

/// <summary>
/// TestMQTTStart: verifies the server is listening on the MQTT port and accepts TCP.
/// </summary>
[Fact]
public void Start_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var probe = new TcpClient();
    Should.NotThrow(() => probe.Connect(_mqttHost, _mqttPort));
}

///
/// TestMQTTConnectNotFirstPacket: sending PUBLISH before CONNECT should be rejected.
///
[Fact]
public void ConnectNotFirstPacket_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var tcp = new TcpClient(_mqttHost, _mqttPort);
    using var stream = tcp.GetStream();

    // Send PUBLISH without connecting first
    var body = Encoding.UTF8.GetBytes("hello");
    SendPublish(stream, 0, false, false, "foo", 0, body);

    ExpectDisconnect(stream);
}

/// <summary>
/// TestMQTTSecondConnect: sending a second CONNECT on an existing connection should
/// cause the server to disconnect.
/// </summary>
[Fact]
public void SecondConnect_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var client = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(client.Stream);

    // Send another CONNECT
    WriteAll(client.Stream, BuildConnectPacket(null, true, null, null, 0, null));

    ExpectDisconnect(client.Stream);
}

/// <summary>
/// TestMQTTConnectFailsOnParse: CONNECT with unacceptable protocol version must
/// return CONNACK with UnacceptableProtocolVersion and then disconnect.
/// </summary>
[Fact]
public void ConnectFailsOnParse_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var tcp = new TcpClient(_mqttHost, _mqttPort);
    using var stream = tcp.GetStream();

    // Build a CONNECT with protocol level 7 (unsupported)
    var clientId = Encoding.UTF8.GetBytes("mqtt");
    var remainingLength =
        2 + MqttProtoName.Length // length-prefixed protocol name
        + 1 + 1 + 2              // proto level + flags + keepAlive
        + 2 + clientId.Length;   // length-prefixed clientId "mqtt"

    var packet = new ByteWriter();
    packet.WriteByte(PacketConnect);
    packet.WriteVarInt(remainingLength);
    packet.WriteBytes(MqttProtoName);
    packet.WriteByte(0x07); // bad proto level
    packet.WriteByte(ConnFlagCleanSession);
    packet.WriteUInt16(0);
    packet.WriteBytes(clientId);
    WriteAll(stream, packet.ToArray());

    CheckConnAck(stream, ConnAckUnacceptableProtocol, false);
}

///
/// TestMQTTConnKeepAlive: server disconnects idle client when keep-alive expires.
///
[Fact]
public void ConnKeepAlive_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var client = MqttConnect(cleanSess: true, keepAlive: 1);
    CheckConnAckAccepted(client.Stream);

    MqttPublish(client.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());

    // Stay idle for longer than the 1-second keep-alive negotiated above.
    Thread.Sleep(TimeSpan.FromSeconds(2));

    ExpectDisconnect(client.Stream);
}

/// <summary>
/// TestMQTTBasicAuth – top-level auth, no override, wrong user/pass returns NOT_AUTHORIZED.
/// </summary>
[Fact]
public void BasicAuth_WrongCredentials_ShouldReturnNotAuthorized()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    // This test only exercises the CONNACK return-code path.
    // The server must be configured with username/password authentication.
    using var client = MqttConnect(cleanSess: true, user: "wronguser", pass: "wrongpass");

    var (first, payload) = ReadPacket(client.Stream);
    (first & PacketMask).ShouldBe(PacketConnAck);

    // NotAuthorized and BadUserOrPassword are both acceptable depending on server
    // config. Accepted is tolerated too, so this check also passes against a
    // server that lets the connection through.
    var returnCode = payload[1];
    var tolerated = returnCode is ConnAckNotAuthorized or ConnAckBadUserOrPassword or ConnAckAccepted;
    tolerated.ShouldBeTrue();
}

///
/// TestMQTTTopicAndSubjectConversion: verifies MQTT topic ↔ NATS subject round-trip.
/// (Unit-level – uses the ported MqttSubjectConverter directly.)
///
[Fact(Skip = "deferred: MQTT subject converter implementation incomplete")]
public void TopicAndSubjectConversion_ShouldSucceed()
{
    var cases = new (string MqttTopic, string NatsSubject, bool ExpectError)[]
    {
        ("/", "/./", false),
        ("//", "/././", false),
        ("foo", "foo", false),
        ("/foo", "/.foo", false),
        ("//foo", "/./.foo", false),
        ("foo/bar", "foo.bar", false),
        ("/foo/bar", "/.foo.bar", false),
        ("foo/bar/baz", "foo.bar.baz", false),
        (".", "//", false),
        ("..", "////", false),
        (".foo", "//foo", false),
        ("foo.", "foo//", false),
        ("foo.bar/baz", "foo//bar.baz", false),
        // wildcards in publish should fail
        ("foo/+", "", true),
        ("foo/#", "", true),
        ("foo bar", "", true),
    };

    foreach (var (topic, nats, expectError) in cases)
    {
        var topicBytes = Encoding.UTF8.GetBytes(topic);

        if (expectError)
        {
            // NOTE(review): the converter's concrete exception type is not visible
            // from here; Exception is the widest match – narrow it if known.
            Should.Throw<Exception>(
                () => ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttTopicToNatsPubSubject(topicBytes),
                $"Expected error for topic {topic}");
            continue;
        }

        var result = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttTopicToNatsPubSubject(topicBytes);
        Encoding.UTF8.GetString(result).ShouldBe(nats, $"topic={topic}");

        // Round-trip back to MQTT
        var back = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.NatsSubjectToMqttTopic(result);
        Encoding.UTF8.GetString(back).ShouldBe(topic, $"round-trip failed for topic={topic}");
    }
}

///
/// TestMQTTFilterConversion: verifies MQTT wildcard filter → NATS subject conversion.
/// (Unit-level.)
///
[Fact]
public void FilterConversion_ShouldSucceed()
{
    (string MqttFilter, string NatsSubject)[] cases =
    [
        ("+", "*"),
        ("/+", "/.*"),
        ("+/", "*./"),
        ("foo/+", "foo.*"),
        ("foo/+/bar", "foo.*.bar"),
        ("foo/+/+", "foo.*.*"),
        ("foo//+", "foo./.*"),
        ("#", ">"),
        ("/#", "/.>"),
        ("/foo/#", "/.foo.>"),
        ("foo/#", "foo.>"),
        ("foo//#", "foo./.>"),
        ("foo/bar/#", "foo.bar.>"),
    ];

    foreach (var (filter, nats) in cases)
    {
        var filterBytes = Encoding.UTF8.GetBytes(filter);
        var converted = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttFilterToNatsSubject(filterBytes);
        Encoding.UTF8.GetString(converted).ShouldBe(nats, $"filter={filter}");
    }
}

/// <summary>
/// TestMQTTSubAck: verifies that SUBSCRIBE returns correct QoS values per filter,
/// and that an invalid filter (foo/#/bar) returns SubAckFailure.
/// </summary>
[Fact]
public void SubAck_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var client = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(client.Stream);

    (string Filter, byte Qos)[] requested =
    [
        ("foo", 0),
        ("bar", 1),
        ("baz", 2),
        ("foo/#/bar", 0), // invalid – should get SubAckFailure
    ];
    byte[] expected = [0, 1, 2, SubAckFailure];

    var acks = MqttSubscribe(client.Stream, 1, requested);

    acks.ShouldBe(expected);
}

/// <summary>
/// TestMQTTQoS2SubDowngrade: when downgradeQoS2Subscribe=true, QoS2 subscription
/// is downgraded to QoS1.
/// </summary>
[Fact]
public void QoS2SubDowngrade_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    // This test requires server configured with downgrade_qos2_subscribe: true.
    // Without the config, the server returns QoS2 for QoS2 subscription.
    using var client = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(client.Stream);

    // Subscribe with QoS 1 and 2
    var acks = MqttSubscribe(client.Stream, 1, [("bar", 1), ("baz", 2)]);

    // Without the downgrade config, both come back as requested
    acks.Length.ShouldBe(2);
    acks[0].ShouldBe((byte)1);
    var second = acks[1];
    (second is 1 or 2).ShouldBeTrue("QoS2 sub returns 1 (downgraded) or 2 (normal)");
}

/// <summary>
/// TestMQTTPublish: verifies QoS 0, 1, and 2 publish from an MQTT client completes.
/// </summary>
[Fact]
public void Publish_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var publisher = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(publisher.Stream);

    // The packet identifier used for each publish equals its QoS level (0, 1, 2).
    foreach (var qos in new byte[] { 0, 1, 2 })
    {
        MqttPublish(publisher.Stream, qos, false, false, "foo", qos, "msg"u8.ToArray());
    }
}

/// <summary>
/// TestMQTTQoS2PubReject: when rejectQoS2Pub=true the server disconnects on QoS2 PUBLISH.
/// </summary>
[Fact]
public void QoS2PubReject_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var client = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(client.Stream);

    MqttPublish(client.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());

    // Send QoS2 – with default server config this succeeds; with rejectQoS2Pub=true it disconnects.
    // We just verify no exception is thrown; the server may or may not respond
    // depending on its config.
    SendPublish(client.Stream, 2, false, false, "foo", 2, "msg"u8.ToArray());
}

///
/// TestMQTTSub: verifies basic subscribe → publish → receive flow for single-level,
/// two-level topics and wildcard patterns.
///
[Fact]
public void Sub_SingleLevel_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var subscriber = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(subscriber.Stream);
    var acks = MqttSubscribe(subscriber.Stream, 1, [("foo", 0)]);
    acks[0].ShouldBe((byte)0);
    MqttFlush(subscriber.Stream);

    using var publisher = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(publisher.Stream);
    MqttPublish(publisher.Stream, 0, false, false, "foo", 0, "hello"u8.ToArray());

    var received = ReadPublish(subscriber.Stream);
    received.Topic.ShouldBe("foo");
    Encoding.UTF8.GetString(received.Payload).ShouldBe("hello");
    (received.Flags & PubFlagQoS).ShouldBe((byte)0);
}

/// <summary>
/// TestMQTTSub: wildcards – single-level wildcard '+' matches exactly one level.
/// </summary>
[Fact]
public void Sub_SingleLevelWildcard_ShouldMatch()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var subscriber = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(subscriber.Stream);
    MqttSubscribe(subscriber.Stream, 1, [("foo/+", 0)]);
    MqttFlush(subscriber.Stream);

    using var publisher = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(publisher.Stream);

    // should match
    MqttPublish(publisher.Stream, 0, false, false, "foo/bar", 0, "msg"u8.ToArray());
    var received = ReadPublish(subscriber.Stream);
    received.Topic.ShouldBe("foo/bar");

    // should NOT match (two levels)
    MqttPublish(publisher.Stream, 0, false, false, "foo/bar/baz", 0, "msg"u8.ToArray());
    ExpectNothing(subscriber.Stream);
}

///
/// TestMQTTSub: multi-level wildcard '#' matches any suffix including zero levels.
///
[Fact]
public void Sub_MultiLevelWildcard_ShouldMatch()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var subscriber = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(subscriber.Stream);
    MqttSubscribe(subscriber.Stream, 1, [("foo/#", 0)]);
    MqttFlush(subscriber.Stream);

    using var publisher = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(publisher.Stream);

    // all of these should match foo/#
    string[] matchingTopics = ["foo", "foo/bar", "foo/bar/baz"];
    foreach (var expectedTopic in matchingTopics)
    {
        MqttPublish(publisher.Stream, 0, false, false, expectedTopic, 0, "msg"u8.ToArray());
        var received = ReadPublish(subscriber.Stream);
        received.Topic.ShouldBe(expectedTopic);
    }
}

/// <summary>
/// TestMQTTSubQoS1: QoS1 pub → QoS1 subscriber receives message with correct flags.
/// </summary>
[Fact]
public void SubQoS1_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var subscriber = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(subscriber.Stream);
    MqttSubscribe(subscriber.Stream, 1, [("foo/bar", 1)]);
    MqttFlush(subscriber.Stream);

    using var publisher = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(publisher.Stream);
    MqttPublish(publisher.Stream, 1, false, false, "foo/bar", 1, "msg"u8.ToArray());

    var received = ReadPublish(subscriber.Stream);
    received.Topic.ShouldBe("foo/bar");
    Encoding.UTF8.GetString(received.Payload).ShouldBe("msg");
    ((received.Flags & PubFlagQoS) >> 1).ShouldBe((byte)1, "subscriber should receive QoS1");
    received.Pi.ShouldNotBe((ushort)0);

    // ACK it
    SendPiPacket(subscriber.Stream, PacketPubAck, received.Pi);
}

/// <summary>
/// TestMQTTSubWithSpaces: subscribing to a topic with a space should return SubAckFailure.
/// </summary>
[Fact]
public void SubWithSpaces_ShouldReturnFailure()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var client = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(client.Stream);

    var acks = MqttSubscribe(client.Stream, 1, [("foo bar", 0)]);

    acks[0].ShouldBe(SubAckFailure, "topic with space should return SUBACK failure");
}

///
/// TestMQTTSubCaseSensitive: topics are case-sensitive.
///
[Fact]
public void SubCaseSensitive_ShouldSucceed()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var subscriber = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(subscriber.Stream);
    MqttSubscribe(subscriber.Stream, 1, [("foo", 0)]);
    MqttFlush(subscriber.Stream);

    using var publisher = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(publisher.Stream);

    // Publish to "FOO" (different case) – should NOT be received on "foo"
    MqttPublish(publisher.Stream, 0, false, false, "FOO", 0, "msg"u8.ToArray());
    ExpectNothing(subscriber.Stream);

    // Publish to "foo" – should be received
    MqttPublish(publisher.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
    var received = ReadPublish(subscriber.Stream);
    received.Topic.ShouldBe("foo");
}

/// <summary>
/// TestMQTTPreventSubWithMQTTSubPrefix: subscribing to $MQTT.sub.* is rejected.
/// </summary>
[Fact]
public void PreventSubWithMQTTSubPrefix_ShouldReturnFailure()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var client = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(client.Stream);

    var acks = MqttSubscribe(client.Stream, 1, [("$MQTT/sub/anything", 0)]);

    acks[0].ShouldBe(SubAckFailure, "Subscribing to $MQTT/sub/ prefix should fail");
}

/// <summary>
/// TestMQTTUnsub: subscribe then unsubscribe; after unsub, messages are no longer delivered.
/// </summary>
[Fact]
public void Unsub_ShouldStopDelivery()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var subscriber = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(subscriber.Stream);
    MqttSubscribe(subscriber.Stream, 1, [("foo", 0)]);
    MqttFlush(subscriber.Stream);

    using var publisher = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(publisher.Stream);

    // Publish – should arrive
    MqttPublish(publisher.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
    var received = ReadPublish(subscriber.Stream);
    received.Topic.ShouldBe("foo");

    // Unsubscribe
    var echoedPi = MqttUnsubscribe(subscriber.Stream, 1, ["foo"]);
    echoedPi.ShouldBe((ushort)1);

    // Publish – should NOT arrive
    MqttPublish(publisher.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
    ExpectNothing(subscriber.Stream);
}

///
/// TestMQTTPublishTopicErrors: publish to empty/wildcard topic causes disconnect.
///
[Fact]
public void PublishTopicErrors_Empty_ShouldDisconnect()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var client = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(client.Stream);

    // Publish to empty topic
    SendPublish(client.Stream, 0, false, false, "", 0, "msg"u8.ToArray());

    ExpectDisconnect(client.Stream);
}

/// <summary>
/// TestMQTTPublishTopicErrors: publishing to a topic containing '+' causes disconnect.
/// </summary>
[Fact]
public void PublishTopicErrors_SingleWildcard_ShouldDisconnect()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var client = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(client.Stream);

    SendPublish(client.Stream, 0, false, false, "foo/+", 0, "msg"u8.ToArray());

    ExpectDisconnect(client.Stream);
}

/// <summary>
/// TestMQTTPublishTopicErrors: publishing to a topic containing '#' causes disconnect.
/// </summary>
[Fact]
public void PublishTopicErrors_MultiWildcard_ShouldDisconnect()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    using var client = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(client.Stream);

    SendPublish(client.Stream, 0, false, false, "foo/#", 0, "msg"u8.ToArray());

    ExpectDisconnect(client.Stream);
}

/// <summary>
/// TestMQTTWill: will message is published when client disconnects unexpectedly.
/// </summary>
[Fact]
public void Will_ShouldPublishOnUnexpectedDisconnect()
{
    if (SkipIfUnavailable())
    {
        return;
    }

    // Subscriber
    using var subscriber = MqttConnect(cleanSess: true);
    CheckConnAckAccepted(subscriber.Stream);
    MqttSubscribe(subscriber.Stream, 1, [("will/topic", 0)]);
    MqttFlush(subscriber.Stream);

    // Publisher with will
    var lastWill = new WillInfo { Topic = "will/topic", Message = "bye"u8.ToArray(), Qos = 0 };
    using var willClient = MqttConnect(cleanSess: true, will: lastWill);
    CheckConnAckAccepted(willClient.Stream);

    // Abruptly close without DISCONNECT
    willClient.Tcp.Close();

    // Subscriber should receive the will message.
    // NOTE(review): ReadPublish goes through ReadPacket, which sets ReadTimeout
    // back to MqttTimeout, so the doubled timeout below does not take effect.
    subscriber.Stream.ReadTimeout = (int)(MqttTimeout.TotalMilliseconds * 2);
    var received = ReadPublish(subscriber.Stream);
    received.Topic.ShouldBe("will/topic");
    Encoding.UTF8.GetString(received.Payload).ShouldBe("bye");
}

///
/// TestMQTTWill: proper DISCONNECT suppresses the will message.
///
    [Fact]
    public void Will_ShouldNotPublishOnProperDisconnect()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        using var sub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(sub.Stream);
        MqttSubscribe(sub.Stream, 1, [("will/topic", 0)]);
        MqttFlush(sub.Stream);

        // Publisher registers a will, but then leaves with an explicit DISCONNECT.
        using var willConn = MqttConnect(
            cleanSess: true,
            will: new WillInfo { Topic = "will/topic", Message = "bye"u8.ToArray(), Qos = 0 });
        CheckConnAckAccepted(willConn.Stream);
        SendDisconnect(willConn.Stream);
        willConn.Tcp.Close();

        // The will must not be published.
        ExpectNothing(sub.Stream);
    }

    /// <summary>
    /// TestMQTTPublishRetain: a retained message is delivered, with the RETAIN flag
    /// set, to a subscriber that connects after the publish.
    /// </summary>
    [Fact]
    public void PublishRetain_ShouldDeliverToNewSubscriber()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string retainTopic = "test/retain";

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);
        MqttPublish(pub.Stream, 0, false, true, retainTopic, 0, "retained"u8.ToArray());
        MqttFlush(pub.Stream);

        try
        {
            // Subscriber connects only after the retained publish.
            using var sub = MqttConnect(cleanSess: true);
            CheckConnAckAccepted(sub.Stream);
            MqttSubscribe(sub.Stream, 1, [(retainTopic, 0)]);

            var (flags, _, topic, payload) = ReadPublish(sub.Stream);
            topic.ShouldBe(retainTopic);
            Encoding.UTF8.GetString(payload).ShouldBe("retained");
            (flags & PubFlagRetain).ShouldBe(PubFlagRetain, "retained flag should be set on delivery");
        }
        finally
        {
            // Clear the retained message with an empty retained publish. This runs
            // even when an assertion above fails, so the message is not left on the server.
            MqttPublish(pub.Stream, 0, false, true, retainTopic, 0, []);
        }
    }

    ///
    /// TestMQTTRetainFlag: retained message received by existing subscriber does NOT
    /// have the RETAIN flag set, only new subscribers see RETAIN=1.
///
    [Fact]
    public void RetainFlag_ExistingSubscriberNoRetainFlag()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string flagTopic = "flag/test";

        using var sub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(sub.Stream);
        MqttSubscribe(sub.Stream, 1, [(flagTopic, 0)]);
        MqttFlush(sub.Stream);

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);

        // Retained publish while the subscription already exists.
        MqttPublish(pub.Stream, 0, false, true, flagTopic, 0, "msg"u8.ToArray());

        try
        {
            var (flags, _, topic, _) = ReadPublish(sub.Stream);
            topic.ShouldBe(flagTopic);
            (flags & PubFlagRetain).ShouldBe((byte)0, "existing subscriber should NOT see RETAIN flag");
        }
        finally
        {
            // Clear the retained message even when an assertion above fails.
            MqttPublish(pub.Stream, 0, false, true, flagTopic, 0, []);
        }
    }

    /// <summary>
    /// TestMQTTCleanSession: a client that reconnects with clean-session set does not
    /// receive messages queued for its previous persistent session.
    /// </summary>
    [Fact]
    public void CleanSession_ShouldNotRestoreMessages()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string clientId = "clean-sess-test";

        // Create a persistent session with a QoS1 subscription, then leave.
        using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
        {
            CheckConnAckAccepted(conn.Stream, expectedSessionPresent: false);
            MqttSubscribe(conn.Stream, 1, [("persisted", 1)]);
            MqttFlush(conn.Stream);
            SendDisconnect(conn.Stream);
        }

        // Publish while that client is offline.
        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);
        MqttPublish(pub.Stream, 1, false, false, "persisted", 1, "offline"u8.ToArray());
        MqttFlush(pub.Stream);

        // Reconnecting with clean session must report no session and deliver nothing.
        using var conn2 = MqttConnect(clientId: clientId, cleanSess: true);
        CheckConnAckAccepted(conn2.Stream, expectedSessionPresent: false);
        ExpectNothing(conn2.Stream);
    }

    ///
    /// TestMQTTDuplicateClientID: a second CONNECT with the same clientID disconnects
    /// the first connection.
///
    [Fact]
    public void DuplicateClientID_ShouldDisconnectFirst()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string sharedId = "dup-client-id-test";

        using var original = MqttConnect(cleanSess: true, clientId: sharedId);
        CheckConnAckAccepted(original.Stream);

        using var takeover = MqttConnect(cleanSess: true, clientId: sharedId);
        CheckConnAckAccepted(takeover.Stream);

        // The earlier connection is expected to be closed; wait up to 3 seconds.
        original.Stream.ReadTimeout = 3000;
        ExpectDisconnect(original.Stream);
    }

    /// <summary>
    /// TestMQTTPersistedSession: reconnecting with cleanSess=false after a previous
    /// persistent connection yields a CONNACK with the session-present flag set.
    /// </summary>
    [Fact]
    public void PersistedSession_ShouldReturnSessionPresent()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string sessionId = "persisted-sess-test";

        // Initial connect: no session exists yet.
        {
            using var initial = MqttConnect(clientId: sessionId, cleanSess: false);
            CheckConnAckAccepted(initial.Stream, expectedSessionPresent: false);
            MqttSubscribe(initial.Stream, 1, [("foo", 1)]);
            MqttFlush(initial.Stream);
            SendDisconnect(initial.Stream);
        }

        // Reconnect: the session is reported as present.
        using var resumed = MqttConnect(clientId: sessionId, cleanSess: false);
        CheckConnAckAccepted(resumed.Stream, expectedSessionPresent: true);
        SendDisconnect(resumed.Stream);

        // Connect once more with clean session to discard the persisted state.
        using var purge = MqttConnect(clientId: sessionId, cleanSess: true);
        CheckConnAckAccepted(purge.Stream, expectedSessionPresent: false);
    }

    ///
    /// TestMQTTRedeliveryAckWait: unack'd QoS1 message is redelivered with DUP flag.
///
    [Fact]
    public void RedeliveryAckWait_ShouldRedeliverWithDupFlag()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string sessionId = "redelivery-sub";
        const string topicName = "redeliver/foo";

        {
            using var firstSub = MqttConnect(clientId: sessionId, cleanSess: false);
            CheckConnAckAccepted(firstSub.Stream, expectedSessionPresent: false);
            MqttSubscribe(firstSub.Stream, 1, [(topicName, 1)]);
            MqttFlush(firstSub.Stream);

            using var publisher = MqttConnect(cleanSess: true);
            CheckConnAckAccepted(publisher.Stream);
            MqttPublish(publisher.Stream, 1, false, false, topicName, 1, "foo1"u8.ToArray());

            // Receive the QoS1 delivery but deliberately never send PUBACK.
            var (firstFlags, firstPacketId, firstTopic, _) = ReadPublish(firstSub.Stream);
            firstTopic.ShouldBe(topicName);
            firstPacketId.ShouldNotBe((ushort)0);
            ((firstFlags & PubFlagQoS) >> 1).ShouldBe((byte)1);

            // Leaving this scope disposes both connections without a DISCONNECT packet.
        }

        // Resume the session: the unacknowledged message must come back flagged DUP.
        using var resumedSub = MqttConnect(clientId: sessionId, cleanSess: false);
        CheckConnAckAccepted(resumedSub.Stream, expectedSessionPresent: true);
        resumedSub.Stream.ReadTimeout = (int)(MqttTimeout.TotalMilliseconds);

        var (redeliveredFlags, redeliveredPacketId, redeliveredTopic, _) = ReadPublish(resumedSub.Stream);
        redeliveredTopic.ShouldBe(topicName);
        (redeliveredFlags & PubFlagDup).ShouldBe(PubFlagDup, "redelivered message should have DUP flag");

        // Acknowledge this time.
        SendPiPacket(resumedSub.Stream, PacketPubAck, redeliveredPacketId);
        MqttFlush(resumedSub.Stream);

        // Discard the persistent session.
        using var purge = MqttConnect(clientId: sessionId, cleanSess: true);
        CheckConnAckAccepted(purge.Stream);
    }

    ///
    /// TestMQTTQoS2RejectPublishDuplicates: server sends PUBREC for each duplicate
    /// PUBLISH with same PI and doesn't deliver more than one message.
///
    [Fact]
    public void QoS2RejectPublishDuplicates_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const int duplicateCount = 3;
        const ushort pubPi = 444;

        using var sub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(sub.Stream);
        MqttSubscribe(sub.Stream, 1, [("qos2/dup", 2)]);

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);

        // Send three QoS2 PUBLISH packets sharing one packet id before reading any PUBREC.
        SendPublish(pub.Stream, 2, false, false, "qos2/dup", pubPi, "data1"u8.ToArray());
        SendPublish(pub.Stream, 2, true, false, "qos2/dup", pubPi, "data2"u8.ToArray());
        SendPublish(pub.Stream, 2, false, false, "qos2/dup", pubPi, "data3"u8.ToArray());

        // Each PUBLISH is answered with a PUBREC carrying the same packet id.
        for (var i = 0; i < duplicateCount; i++)
        {
            var (header, body) = ReadPacket(pub.Stream);
            (header & PacketMask).ShouldBe(PacketPubRec, "Expected PUBREC");
            BinaryPrimitives.ReadUInt16BigEndian(body.AsSpan(0, 2)).ShouldBe(pubPi);
        }

        // Complete the exchange with one PUBREL per PUBREC.
        for (var i = 0; i < duplicateCount; i++)
        {
            SendPiPacket(pub.Stream, PacketPubRel, pubPi);
        }

        // Each PUBREL is answered with a PUBCOMP; its packet id is verified too.
        for (var i = 0; i < duplicateCount; i++)
        {
            var (header, body) = ReadPacket(pub.Stream);
            (header & PacketMask).ShouldBe(PacketPubComp, "Expected PUBCOMP");
            BinaryPrimitives.ReadUInt16BigEndian(body.AsSpan(0, 2)).ShouldBe(pubPi);
        }

        // Only the first payload is delivered to the subscriber.
        var (_, pi, _, payload) = ReadPublish(sub.Stream);
        Encoding.UTF8.GetString(payload).ShouldBe("data1");

        // Subscriber side of the QoS2 handshake: PUBREC -> PUBREL -> PUBCOMP.
        SendPiPacket(sub.Stream, PacketPubRec, pi);
        var (relHeader, relBody) = ReadPacket(sub.Stream);
        (relHeader & PacketMask).ShouldBe(PacketPubRel);
        BinaryPrimitives.ReadUInt16BigEndian(relBody.AsSpan(0, 2)).ShouldBe(pi);
        SendPiPacket(sub.Stream, PacketPubComp, pi);

        // The duplicates must not produce further deliveries.
        ExpectNothing(sub.Stream);
    }

    ///
    /// TestMQTTSubRestart: persisted subscriptions survive server restart.
    /// (Simulated by disconnect/reconnect without clean session.)
///
    [Fact]
    public void SubRestart_ShouldRestoreSubscriptions()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string sessionId = "sub-restart-test";
        const string topicName = "restart/foo";

        // Establish a persistent session holding a QoS1 subscription, then leave cleanly.
        {
            using var initial = MqttConnect(clientId: sessionId, cleanSess: false);
            CheckConnAckAccepted(initial.Stream, expectedSessionPresent: false);
            MqttSubscribe(initial.Stream, 1, [(topicName, 1)]);
            MqttFlush(initial.Stream);
            SendDisconnect(initial.Stream);
        }

        // Publish while the subscriber is away.
        using var publisher = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(publisher.Stream);
        MqttPublish(publisher.Stream, 1, false, false, topicName, 1, "offline-msg"u8.ToArray());
        MqttFlush(publisher.Stream);

        // On resuming the session the queued message must be delivered.
        using var resumed = MqttConnect(clientId: sessionId, cleanSess: false);
        CheckConnAckAccepted(resumed.Stream, expectedSessionPresent: true);
        resumed.Stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;

        var (_, packetId, deliveredTopic, deliveredBody) = ReadPublish(resumed.Stream);
        deliveredTopic.ShouldBe(topicName);
        Encoding.UTF8.GetString(deliveredBody).ShouldBe("offline-msg");
        SendPiPacket(resumed.Stream, PacketPubAck, packetId);
        MqttFlush(resumed.Stream);

        // Discard the persistent session.
        using var purge = MqttConnect(clientId: sessionId, cleanSess: true);
        CheckConnAckAccepted(purge.Stream);
    }

    ///
    /// TestMQTTPersistRetainedMsg: retained messages survive reconnect.
///
    [Fact]
    public void PersistRetainedMsg_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string retainTopic = "persist/retain/test";

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);
        MqttPublish(pub.Stream, 1, false, true, retainTopic, 1, "retained-val"u8.ToArray());
        MqttFlush(pub.Stream);

        try
        {
            // A subscriber connecting afterwards receives the retained message.
            using var sub = MqttConnect(cleanSess: true);
            CheckConnAckAccepted(sub.Stream);
            MqttSubscribe(sub.Stream, 1, [(retainTopic, 0)]);

            var (flags, _, topic, payload) = ReadPublish(sub.Stream);
            topic.ShouldBe(retainTopic);
            Encoding.UTF8.GetString(payload).ShouldBe("retained-val");
            (flags & PubFlagRetain).ShouldBe(PubFlagRetain);
        }
        finally
        {
            // Clear the retained message even when an assertion above fails.
            MqttPublish(pub.Stream, 0, false, true, retainTopic, 0, []);
        }
    }

    /// <summary>
    /// TestMQTTConnAckFirstPacket: CONNACK must be the first packet received by the
    /// client after connecting. This test checks it over five consecutive connects.
    /// </summary>
    [Fact]
    public void ConnAckFirstPacket_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string clientId = "connack-first-test";

        for (var i = 0; i < 5; i++)
        {
            using var conn = MqttConnect(cleanSess: false, clientId: clientId);
            var (b, _) = ReadPacket(conn.Stream);
            (b & PacketMask).ShouldBe(PacketConnAck, "First packet should be CONNACK");
            SendDisconnect(conn.Stream);
        }

        // The loop connects with cleanSess=false, which creates a persistent session.
        // Reconnect with clean session so it is not left behind on the server.
        using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
        CheckConnAckAccepted(cleanup.Stream);
    }

    /// <summary>
    /// TestMQTTMaxPayloadEnforced: server disconnects when a PUBLISH payload exceeds
    /// the limit. Only the "small publish succeeds" half is exercised here, because the
    /// oversize case depends on the server's configured maximum payload.
    /// </summary>
    [Fact]
    public void MaxPayloadEnforced_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        using var conn = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(conn.Stream);

        // A small publish must be accepted.
        MqttPublish(conn.Stream, 0, false, false, "foo", 0, "small"u8.ToArray());
    }

    ///
    /// TestMQTTDontSetPinger: MQTT clients should NOT receive unsolicited PINGRESPs.
///
    [Fact]
    public void DontSetPinger_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        using var client = MqttConnect(cleanSess: true, clientId: "no-pinger");
        CheckConnAckAccepted(client.Stream);

        // Give the server a moment; it must stay silent on an idle connection.
        Thread.Sleep(200);
        ExpectNothing(client.Stream);

        // The connection must still be usable afterwards.
        MqttPublish(client.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
    }

    /// <summary>
    /// TestMQTTPubSubMatrix: delivered QoS is min(publish QoS, subscription QoS).
    /// This test covers the QoS1 publish / QoS1 subscription combination.
    /// </summary>
    [Fact]
    public void PubSubMatrix_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topicName = "matrix/test";

        using var subscriber = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(subscriber.Stream);
        MqttSubscribe(subscriber.Stream, 1, [(topicName, 1)]);
        MqttFlush(subscriber.Stream);

        using var publisher = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(publisher.Stream);
        MqttPublish(publisher.Stream, 1, false, false, topicName, 1, "qos1"u8.ToArray());

        var (deliveredFlags, packetId, _, _) = ReadPublish(subscriber.Stream);
        var deliveredQos = (deliveredFlags & PubFlagQoS) >> 1;
        deliveredQos.ShouldBe((byte)1, "QoS1 pub to QoS1 sub = QoS1 delivery");

        // Acknowledge only when the delivery carried a packet id.
        if (packetId != 0)
        {
            SendPiPacket(subscriber.Stream, PacketPubAck, packetId);
        }
    }

    ///
    /// TestMQTTSubQoS2: QoS2 pub to QoS2 sub goes through full PUBREC→PUBREL→PUBCOMP.
///
    [Fact]
    public void SubQoS2_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topicName = "qos2/full";

        using var subscriber = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(subscriber.Stream);
        MqttSubscribe(subscriber.Stream, 1, [(topicName, 2)]);
        MqttFlush(subscriber.Stream);

        using var publisher = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(publisher.Stream);
        MqttPublish(publisher.Stream, 2, false, false, topicName, 1, "data"u8.ToArray());

        // Step 1: the QoS2 PUBLISH reaches the subscriber.
        var (deliveredFlags, packetId, deliveredTopic, deliveredBody) = ReadPublish(subscriber.Stream);
        deliveredTopic.ShouldBe(topicName);
        Encoding.UTF8.GetString(deliveredBody).ShouldBe("data");
        ((deliveredFlags & PubFlagQoS) >> 1).ShouldBe((byte)2);

        // Step 2: subscriber answers PUBREC and expects PUBREL with the same packet id.
        SendPiPacket(subscriber.Stream, PacketPubRec, packetId);
        var (header, body) = ReadPacket(subscriber.Stream);
        (header & PacketMask).ShouldBe(PacketPubRel, "Expected PUBREL from server");
        var releasedPacketId = BinaryPrimitives.ReadUInt16BigEndian(body.AsSpan(0, 2));
        releasedPacketId.ShouldBe(packetId);

        // Step 3: subscriber finishes with PUBCOMP.
        SendPiPacket(subscriber.Stream, PacketPubComp, packetId);
    }

    /// <summary>
    /// TestMQTTFlappingSession: five quick connect/disconnect cycles on one persistent
    /// client id each still receive a CONNACK.
    /// </summary>
    [Fact]
    public void FlappingSession_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string sessionId = "flapping-test";
        const int cycles = 5;

        for (var attempt = 0; attempt < cycles; attempt++)
        {
            using var client = MqttConnect(clientId: sessionId, cleanSess: false);
            var (header, _) = ReadPacket(client.Stream);
            (header & PacketMask).ShouldBe(PacketConnAck);
            SendDisconnect(client.Stream);
            Thread.Sleep(50);
        }

        // Discard the persistent session.
        using var purge = MqttConnect(clientId: sessionId, cleanSess: true);
        CheckConnAckAccepted(purge.Stream);
    }

    ///
    /// TestMQTTLockedSession: concurrent connects with same clientID – only one succeeds.
///
    [Fact]
    public void LockedSession_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string sharedId = "locked-session-test";

        using var original = MqttConnect(cleanSess: true, clientId: sharedId);
        CheckConnAckAccepted(original.Stream);

        using var takeover = MqttConnect(cleanSess: true, clientId: sharedId);
        CheckConnAckAccepted(takeover.Stream);

        // The earlier connection is expected to be dropped; wait up to 3 seconds.
        original.Stream.ReadTimeout = 3000;
        ExpectDisconnect(original.Stream);
    }

    /// <summary>
    /// TestMQTTRetainedMsgCleanup: a retained message is delivered to a new subscriber,
    /// and is no longer delivered once it has been cleared with an empty retained publish.
    /// </summary>
    [Fact]
    public void RetainedMsgCleanup_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string retainTopic = "retained/cleanup/test";

        using var publisher = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(publisher.Stream);

        // Store the retained message.
        MqttPublish(publisher.Stream, 1, false, true, retainTopic, 1, "msg"u8.ToArray());
        MqttFlush(publisher.Stream);

        // First subscriber: gets the retained message and acknowledges it.
        using var earlySub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(earlySub.Stream);
        MqttSubscribe(earlySub.Stream, 1, [(retainTopic, 1)]);
        var (_, packetId, _, _) = ReadPublish(earlySub.Stream);
        SendPiPacket(earlySub.Stream, PacketPubAck, packetId);

        // Clear the retained message with an empty payload.
        MqttPublish(publisher.Stream, 0, false, true, retainTopic, 0, []);
        MqttFlush(publisher.Stream);

        // Second subscriber: nothing is retained any more.
        using var lateSub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(lateSub.Stream);
        MqttSubscribe(lateSub.Stream, 1, [(retainTopic, 0)]);
        MqttFlush(lateSub.Stream);
        ExpectNothing(lateSub.Stream);
    }

    ///
    /// TestMQTTRestoreRetainedMsgs: after server restart (simulated by reconnect),
    /// retained messages still present.
///
    [Fact]
    public void RestoreRetainedMsgs_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topic = "retained/restore/test";

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);
        MqttPublish(pub.Stream, 1, false, true, topic, 1, "persistent"u8.ToArray());
        MqttFlush(pub.Stream);

        try
        {
            // Two successive fresh subscribers each see the retained message.
            for (var i = 0; i < 2; i++)
            {
                using var sub = MqttConnect(cleanSess: true);
                CheckConnAckAccepted(sub.Stream);
                MqttSubscribe(sub.Stream, 1, [(topic, 0)]);

                var (flags, _, t, p) = ReadPublish(sub.Stream);
                t.ShouldBe(topic);
                Encoding.UTF8.GetString(p).ShouldBe("persistent");
                (flags & PubFlagRetain).ShouldBe(PubFlagRetain);
            }
        }
        finally
        {
            // Clear the retained message even when an assertion above fails.
            MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
        }
    }

    /// <summary>
    /// TestMQTTDecodeRetainedMessage: retained message header bytes are correctly parsed.
    /// (Integration: verify the stored retained message can be retrieved by a new subscriber.)
    /// </summary>
    [Fact]
    public void DecodeRetainedMessage_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topic = "decode/retain/test";

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);
        MqttPublish(pub.Stream, 1, false, true, topic, 1, "decode-payload"u8.ToArray());
        MqttFlush(pub.Stream);

        try
        {
            using var sub = MqttConnect(cleanSess: true);
            CheckConnAckAccepted(sub.Stream);
            MqttSubscribe(sub.Stream, 1, [(topic, 0)]);

            var (flags, _, t, p) = ReadPublish(sub.Stream);
            t.ShouldBe(topic);
            Encoding.UTF8.GetString(p).ShouldBe("decode-payload");
            (flags & PubFlagRetain).ShouldBe(PubFlagRetain);
        }
        finally
        {
            // Clear the retained message even when an assertion above fails.
            MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
        }
    }

    ///
    /// TestMQTTSubWithNATSStream: verifies that an MQTT subscriber can receive messages
    /// published via an existing NATS stream (stream → MQTT).
///
    [Fact]
    public void SubWithNATSStream_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        // Only the MQTT side is exercised: publishing through a NATS stream would need
        // a NATS client, which these raw-TCP MQTT tests do not have.
        using var client = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(client.Stream);
        MqttSubscribe(client.Stream, 1, [("stream/topic", 0)]);
        MqttFlush(client.Stream);

        // Passing means the subscribe and flush completed without throwing.
    }

    /// <summary>
    /// TestMQTTTrackPendingOverrun: QoS1 message queue limit is enforced.
    /// This test sends ten QoS1 messages and expects each to be delivered and ACKed.
    /// </summary>
    [Fact]
    public void TrackPendingOverrun_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topicName = "pending/test";
        const int messageCount = 10;

        using var subscriber = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(subscriber.Stream);
        MqttSubscribe(subscriber.Stream, 1, [(topicName, 1)]);
        MqttFlush(subscriber.Stream);

        using var publisher = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(publisher.Stream);

        // Publish messages with packet ids 1..10 and payloads "msg1".."msg10".
        for (ushort packetId = 1; packetId <= messageCount; packetId++)
        {
            var body = Encoding.UTF8.GetBytes($"msg{packetId}");
            MqttPublish(publisher.Stream, 1, false, false, topicName, packetId, body);
        }

        // Drain every delivery, acknowledging each one.
        for (var received = 0; received < messageCount; received++)
        {
            var (_, deliveredId, _, _) = ReadPublish(subscriber.Stream);
            SendPiPacket(subscriber.Stream, PacketPubAck, deliveredId);
        }
    }

    ///
    /// TestMQTTSubPropagation: verifies that subscriptions propagate across concurrent connections.
///
    [Fact]
    public void SubPropagation_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topicName = "propagate/test";

        using var firstSub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(firstSub.Stream);
        MqttSubscribe(firstSub.Stream, 1, [(topicName, 0)]);
        MqttFlush(firstSub.Stream);

        using var secondSub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(secondSub.Stream);
        MqttSubscribe(secondSub.Stream, 1, [(topicName, 0)]);
        MqttFlush(secondSub.Stream);

        using var publisher = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(publisher.Stream);
        MqttPublish(publisher.Stream, 0, false, false, topicName, 0, "broadcast"u8.ToArray());

        // One publish must reach both subscribers.
        var (_, _, firstTopic, firstBody) = ReadPublish(firstSub.Stream);
        var (_, _, secondTopic, secondBody) = ReadPublish(secondSub.Stream);

        firstTopic.ShouldBe(topicName);
        secondTopic.ShouldBe(topicName);
        Encoding.UTF8.GetString(firstBody).ShouldBe("broadcast");
        Encoding.UTF8.GetString(secondBody).ShouldBe("broadcast");
    }

    /// <summary>
    /// TestMQTTJetStreamRepublishAndQoS0Subscribers: QoS0 subscriber receives JetStream-
    /// republished messages. Here a QoS0 publish is checked to reach a QoS0 subscriber
    /// with topic and payload intact.
    /// </summary>
    [Fact]
    public void JetStreamRepublishAndQoS0Subscribers_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topicName = "js/republish/test";

        using var subscriber = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(subscriber.Stream);
        MqttSubscribe(subscriber.Stream, 1, [(topicName, 0)]);
        MqttFlush(subscriber.Stream);

        using var publisher = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(publisher.Stream);
        MqttPublish(publisher.Stream, 0, false, false, topicName, 0, "jsdata"u8.ToArray());

        var (_, _, deliveredTopic, deliveredBody) = ReadPublish(subscriber.Stream);
        deliveredTopic.ShouldBe(topicName);
        Encoding.UTF8.GetString(deliveredBody).ShouldBe("jsdata");
    }

    ///
    /// TestMQTTTopicWithDot: topics containing dots are handled correctly (dot is a special
    /// NATS character and MQTT converts it).
///
    [Fact]
    public void TopicWithDot_ConversionShouldBeCorrect()
    {
        // Expected mapping: '.' in the MQTT topic becomes "//", '/' becomes '.'.
        var expectations = new (string MqttTopic, string NatsSubject)[]
        {
            ("foo.bar", "foo//bar"),
            ("foo.bar/baz", "foo//bar.baz"),
            (".foo", "//foo"),
            ("foo.", "foo//"),
        };

        foreach (var expectation in expectations)
        {
            var topicBytes = Encoding.UTF8.GetBytes(expectation.MqttTopic);
            var converted = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttTopicToNatsPubSubject(topicBytes);

            Encoding.UTF8.GetString(converted).ShouldBe(expectation.NatsSubject, $"topic={expectation.MqttTopic}");
        }
    }

    /// <summary>
    /// TestMQTTSubjectWildcardStart: subscribing to the bare '#' filter is accepted.
    /// </summary>
    [Fact]
    public void SubjectWildcardStart_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        using var subscriber = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(subscriber.Stream);

        var returnCodes = MqttSubscribe(subscriber.Stream, 1, [("#", 0)]);
        returnCodes[0].ShouldNotBe(SubAckFailure, "Subscribing to '#' should succeed");
    }

    /// <summary>
    /// TestMQTTMappingsQoS0: a QoS0 publish reaches a QoS0 subscriber on the same
    /// topic with topic and payload unchanged.
    /// </summary>
    [Fact]
    public void MappingsQoS0_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topicName = "mapping/qos0";

        using var subscriber = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(subscriber.Stream);
        MqttSubscribe(subscriber.Stream, 1, [(topicName, 0)]);
        MqttFlush(subscriber.Stream);

        using var publisher = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(publisher.Stream);
        MqttPublish(publisher.Stream, 0, false, false, topicName, 0, "mapped"u8.ToArray());

        var (_, _, deliveredTopic, deliveredBody) = ReadPublish(subscriber.Stream);
        deliveredTopic.ShouldBe(topicName);
        Encoding.UTF8.GetString(deliveredBody).ShouldBe("mapped");
    }

    ///
    /// TestMQTTRetainedMsgMigration: verifies that retained messages created before any
    /// session are visible to new subscribers.
///
    [Fact]
    public void RetainedMsgMigration_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topic = "migration/retain";

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);
        MqttPublish(pub.Stream, 0, false, true, topic, 0, "migrated"u8.ToArray());
        MqttFlush(pub.Stream);

        try
        {
            using var sub = MqttConnect(cleanSess: true);
            CheckConnAckAccepted(sub.Stream);
            MqttSubscribe(sub.Stream, 1, [(topic, 0)]);

            var (flags, _, t, p) = ReadPublish(sub.Stream);
            t.ShouldBe(topic);
            Encoding.UTF8.GetString(p).ShouldBe("migrated");
            (flags & PubFlagRetain).ShouldBe(PubFlagRetain);
        }
        finally
        {
            // Clear the retained message even when an assertion above fails.
            MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
        }
    }

    /// <summary>
    /// TestMQTTRetainedNoMsgBodyCorruption: a 256-byte pseudo-random retained payload is
    /// delivered byte-for-byte to a new subscriber.
    /// </summary>
    [Fact]
    public void RetainedNoMsgBodyCorruption_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topic = "retain/nocorrupt";

        // Fixed seed keeps the payload identical between runs.
        var payload = new byte[256];
        new Random(42).NextBytes(payload);

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);
        MqttPublish(pub.Stream, 0, false, true, topic, 0, payload);
        MqttFlush(pub.Stream);

        try
        {
            using var sub = MqttConnect(cleanSess: true);
            CheckConnAckAccepted(sub.Stream);
            MqttSubscribe(sub.Stream, 1, [(topic, 0)]);

            var (_, _, _, received) = ReadPublish(sub.Stream);
            received.ShouldBe(payload, "retained message body must not be corrupted");
        }
        finally
        {
            // Clear the retained message even when an assertion above fails.
            MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
        }
    }

    ///
    /// TestMQTTRetainedMsgRemovedFromMapIfNotInStream: deleting a retained message
    /// removes it from the server's in-memory map.
///
    [Fact]
    public void RetainedMsgRemovedFromMapIfNotInStream_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topic = "retain/removemap";

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);
        MqttPublish(pub.Stream, 0, false, true, topic, 0, "exists"u8.ToArray());
        MqttFlush(pub.Stream);

        // Delete the retained message with an empty retained publish.
        MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
        MqttFlush(pub.Stream);

        // A new subscriber must not receive anything for the topic.
        using var sub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(sub.Stream);
        MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
        MqttFlush(sub.Stream);
        ExpectNothing(sub.Stream);
    }

    /// <summary>
    /// TestMQTTCrossAccountRetain: retained messages are scoped per account.
    /// (Integration: verify retained messages are visible within the same account.)
    /// </summary>
    [Fact]
    public void CrossAccountRetain_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topic = "cross/account/retain";

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);
        MqttPublish(pub.Stream, 0, false, true, topic, 0, "val"u8.ToArray());
        MqttFlush(pub.Stream);

        try
        {
            using var sub = MqttConnect(cleanSess: true);
            CheckConnAckAccepted(sub.Stream);
            MqttSubscribe(sub.Stream, 1, [(topic, 0)]);

            var (_, _, t, p) = ReadPublish(sub.Stream);
            t.ShouldBe(topic);
            Encoding.UTF8.GetString(p).ShouldBe("val");
        }
        finally
        {
            // Clear the retained message even when an assertion above fails.
            MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
        }
    }

    ///
    /// TestMQTTSubRetainedRace: concurrent subscribe + retained publish doesn't lose
    /// the retained message.
///
    [Fact]
    public void SubRetainedRace_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topic = "sub/retained/race";

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);
        MqttPublish(pub.Stream, 0, false, true, topic, 0, "raced"u8.ToArray());

        // Flush after the QoS0 publish, as the other retained tests in this file do,
        // so the RETAIN-flag assertion below does not depend on timing.
        MqttFlush(pub.Stream);

        try
        {
            using var sub = MqttConnect(cleanSess: true);
            CheckConnAckAccepted(sub.Stream);
            MqttSubscribe(sub.Stream, 1, [(topic, 0)]);

            var (flags, _, t, p) = ReadPublish(sub.Stream);
            t.ShouldBe(topic);
            (flags & PubFlagRetain).ShouldBe(PubFlagRetain);
            Encoding.UTF8.GetString(p).ShouldBe("raced");
        }
        finally
        {
            // Clear the retained message even when an assertion above fails.
            MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
        }
    }

    /// <summary>
    /// TestMQTTRecoverSessionAndAddNewSub: after a persistent session is resumed, a
    /// further subscription can be added to it.
    /// </summary>
    [Fact]
    public void RecoverSessionAndAddNewSub_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string clientId = "recover-add-sub";

        // Create the persistent session with one subscription.
        using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
        {
            CheckConnAckAccepted(conn.Stream, expectedSessionPresent: false);
            MqttSubscribe(conn.Stream, 1, [("recover/foo", 1)]);
            MqttFlush(conn.Stream);
            SendDisconnect(conn.Stream);
        }

        // Resume it and add a second subscription.
        using var conn2 = MqttConnect(clientId: clientId, cleanSess: false);
        CheckConnAckAccepted(conn2.Stream, expectedSessionPresent: true);
        MqttSubscribe(conn2.Stream, 2, [("recover/bar", 0)]);
        MqttFlush(conn2.Stream);
        SendDisconnect(conn2.Stream);

        // Discard the persistent session.
        using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
        CheckConnAckAccepted(cleanup.Stream);
    }

    ///
    /// TestMQTTRecoverSessionWithSubAndClientResendSub: recovered session handles
    /// re-subscription to the same topic.
///
    [Fact]
    public void RecoverSessionWithSubAndClientResendSub_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string sessionId = "recover-resub";
        const string topicName = "recover/resub";

        // Create the persistent session with a QoS1 subscription.
        {
            using var initial = MqttConnect(clientId: sessionId, cleanSess: false);
            CheckConnAckAccepted(initial.Stream, expectedSessionPresent: false);
            MqttSubscribe(initial.Stream, 1, [(topicName, 1)]);
            MqttFlush(initial.Stream);
            SendDisconnect(initial.Stream);
        }

        using var resumed = MqttConnect(clientId: sessionId, cleanSess: false);
        CheckConnAckAccepted(resumed.Stream, expectedSessionPresent: true);

        // Subscribing again to the same filter must be granted at QoS1.
        var returnCodes = MqttSubscribe(resumed.Stream, 1, [(topicName, 1)]);
        returnCodes[0].ShouldBe((byte)1);
        SendDisconnect(resumed.Stream);

        // Discard the persistent session.
        using var purge = MqttConnect(clientId: sessionId, cleanSess: true);
        CheckConnAckAccepted(purge.Stream);
    }

    /// <summary>
    /// TestMQTTJSApiMapping: verifies JS API subject mapping for MQTT streams.
    /// (Unit-level: checks the stream-name and prefix constants on MqttTopics.)
    /// </summary>
    [Fact]
    public void JsApiMapping_ShouldSucceed()
    {
        // Stream names.
        ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.MsgsStreamName
            .ShouldBe("$MQTT_msgs");
        ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.RetainedMsgsStreamName
            .ShouldBe("$MQTT_rmsgs");
        ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.SessStreamName
            .ShouldBe("$MQTT_sess");

        // Subject prefixes.
        ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.Prefix
            .ShouldBe("$MQTT.");
        ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.SubPrefix
            .ShouldBe("$MQTT.sub.");
    }

    /// <summary>
    /// TestMQTTStreamReplicasInsufficientResources: replicas > 1 in non-clustered mode
    /// returns the expected error code.
    /// (Unit-level: builds the error through JsApiErrors and inspects its description.)
    /// </summary>
    [Fact]
    public void StreamReplicasInsufficientResources_ShouldSucceed()
    {
        var replicasError = ZB.MOM.NatsNet.Server.JsApiErrors.NewJSStreamReplicasNotSupportedError();

        replicasError.ShouldNotBeNull();
        replicasError.Description.ShouldContain("replicas");
    }

    ///
    /// TestMQTTConsumerReplicasValidate: consumer replica count validation (unit-level).
///
    [Fact]
    public void ConsumerReplicasValidate_ShouldSucceed()
    {
        // Checks the ack-pending limits exposed by MqttConst.
        ZB.MOM.NatsNet.Server.Mqtt.MqttConst.DefaultMaxAckPending
            .ShouldBe(1024);
        ZB.MOM.NatsNet.Server.Mqtt.MqttConst.MaxAckTotalLimit
            .ShouldBe(0xFFFF);
    }

    /// <summary>
    /// TestMQTTConsumerReplicasOverride: stream replicas setting is honoured.
    /// As written, this only checks the protocol name and protocol level used by the tests.
    /// </summary>
    [Fact]
    public void ConsumerReplicasOverride_ShouldSucceed()
    {
        var expectedName = "MQTT"u8.ToArray();

        MqttProtoName.ShouldBe(expectedName);
        MqttProtoLevel.ShouldBe((byte)0x04);
    }

    /// <summary>
    /// TestMQTTConsumerMemStorageReload: consumer memory storage is reloaded on reconnect.
    /// As written, this performs a single connect, CONNACK check and flush.
    /// </summary>
    [Fact]
    public void ConsumerMemStorageReload_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        using var client = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(client.Stream);
        MqttFlush(client.Stream);
    }

    /// <summary>
    /// TestMQTTConsumerInactiveThreshold: inactive consumers are cleaned up.
    /// As written, this checks that a persistent session is still reported as present
    /// after the client has been offline for 500 ms.
    /// </summary>
    [Fact]
    public void ConsumerInactiveThreshold_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string sessionId = "inactive-threshold";

        // Create the persistent session, subscribe, and leave.
        {
            using var initial = MqttConnect(clientId: sessionId, cleanSess: false);
            CheckConnAckAccepted(initial.Stream, false);
            MqttSubscribe(initial.Stream, 1, [("inactive/test", 1)]);
            MqttFlush(initial.Stream);
            SendDisconnect(initial.Stream);
        }

        // Stay offline briefly.
        Thread.Sleep(500);

        // The session must still exist on reconnect.
        using var resumed = MqttConnect(clientId: sessionId, cleanSess: false);
        CheckConnAckAccepted(resumed.Stream, true);
        SendDisconnect(resumed.Stream);

        // Discard the persistent session.
        using var purge = MqttConnect(clientId: sessionId, cleanSess: true);
        CheckConnAckAccepted(purge.Stream);
    }

    ///
    /// TestMQTTSubjectMapping: subject mapping translates MQTT topics to NATS subjects.
///
    [Fact]
    public void SubjectMapping_ShouldSucceed()
    {
        // Expected mapping: '/' -> '.', '+' -> '*', '#' -> '>'.
        var cases = new (string Mqtt, string Nats)[]
        {
            ("sensors/temp/1", "sensors.temp.1"),
            ("devices/+/status", "devices.*.status"),
            ("logs/#", "logs.>"),
        };

        foreach (var (mqtt, nats) in cases)
        {
            var result = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttFilterToNatsSubject(
                Encoding.UTF8.GetBytes(mqtt));
            Encoding.UTF8.GetString(result).ShouldBe(nats, $"mqtt={mqtt}");
        }
    }

    /// <summary>
    /// TestMQTTSliceHeadersAndDecodeRetainedMessage: retained messages with slice headers
    /// are decoded correctly. Here a retained QoS1 publish is checked to reach a new
    /// subscriber with topic and payload unchanged.
    /// </summary>
    [Fact]
    public void SliceHeadersAndDecodeRetainedMessage_ShouldSucceed()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        const string topic = "slice/header/retain";
        var payload = "slice-header-payload"u8.ToArray();

        using var pub = MqttConnect(cleanSess: true);
        CheckConnAckAccepted(pub.Stream);
        MqttPublish(pub.Stream, 1, false, true, topic, 1, payload);
        MqttFlush(pub.Stream);

        try
        {
            using var sub = MqttConnect(cleanSess: true);
            CheckConnAckAccepted(sub.Stream);
            MqttSubscribe(sub.Stream, 1, [(topic, 0)]);

            var (_, _, t, p) = ReadPublish(sub.Stream);
            t.ShouldBe(topic);
            p.ShouldBe(payload);
        }
        finally
        {
            // Clear the retained message even when an assertion above fails.
            MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
        }
    }

    // ---------------------------------------------------------------------------
    // Tests from mqtt_ex_test_test.go (require external CLI tools; each test
    // returns early when its tool cannot be found)
    // ---------------------------------------------------------------------------

    /// <summary>
    /// TestXMQTTCompliance: runs the mqtt CLI compliance test tool against the server.
    /// Returns early when neither the MQTT_CLI environment variable nor the PATH
    /// yields an "mqtt" executable.
    /// </summary>
    [Fact]
    public void XMqttCompliance_ShouldSucceed()
    {
        // An empty MQTT_CLI value counts as "not set".
        var configured = Environment.GetEnvironmentVariable("MQTT_CLI");
        var cliPath = string.IsNullOrWhiteSpace(configured) ? FindInPath("mqtt") : configured;
        if (cliPath is null)
        {
            return;
        }

        if (SkipIfUnavailable())
        {
            return;
        }

        // Pass the host as well as the port so the tool targets the same server as the
        // other tests, including when a non-default host is configured.
        var result = RunProcess(cliPath, $"test -V 3 -h {_mqttHost} -p {_mqttPort}");
        result.ExitCode.ShouldBe(0, result.Output);
    }

    ///
    /// TestXMQTTRetainedMessages: runs the mqtt-test CLI retained-message test.
/// Skipped if "mqtt-test" is not in PATH.
    ///
    [Fact]
    public void XMqttRetainedMessages_ShouldSucceed()
    {
        var cliPath = FindInPath("mqtt-test");
        if (cliPath is null) return;
        if (SkipIfUnavailable()) return;

        // Publish a retained message
        var result = RunProcess(cliPath,
            $"pub -s {_mqttHost}:{_mqttPort} --retain --topic xmqtt/retain --qos 0 --size 128");
        result.ExitCode.ShouldBe(0, result.Output);

        // Verify it is received
        var subResult = RunProcess(cliPath,
            $"sub -s {_mqttHost}:{_mqttPort} --retained 1 --qos 0 --topic xmqtt/retain");
        subResult.ExitCode.ShouldBe(0, subResult.Output);
    }

    // ---------------------------------------------------------------------------
    // Private helpers for external tool tests
    // ---------------------------------------------------------------------------

    /// <summary>
    /// Searches every directory listed in the PATH environment variable for
    /// <paramref name="exe"/>. On Windows the name with an ".exe" suffix is tried as well.
    /// </summary>
    /// <param name="exe">Executable file name to look for.</param>
    /// <returns>The full path of the first match, or <c>null</c> when nothing is found.</returns>
    private static string? FindInPath(string exe)
    {
        var pathDirs = (Environment.GetEnvironmentVariable("PATH") ?? "")
            .Split(Path.PathSeparator, StringSplitOptions.RemoveEmptyEntries);

        foreach (var dir in pathDirs)
        {
            var full = Path.Combine(dir, exe);
            if (File.Exists(full)) return full;

            if (OperatingSystem.IsWindows())
            {
                var win = full + ".exe";
                if (File.Exists(win)) return win;
            }
        }

        return null;
    }

    /// <summary>
    /// Runs <paramref name="exe"/> with <paramref name="args"/>, capturing standard output
    /// followed by standard error, and waits up to 30 seconds for it to exit.
    /// </summary>
    /// <param name="exe">Path of the executable to start.</param>
    /// <param name="args">Command-line arguments passed to the executable.</param>
    /// <returns>
    /// The process exit code and the captured output. If the process does not exit within
    /// the timeout it is killed and the exit code is <c>-1</c>, with a note appended to the
    /// output so a failing assertion explains what happened.
    /// </returns>
    private static (int ExitCode, string Output) RunProcess(string exe, string args)
    {
        const int timeoutMs = 30_000;
        const int drainMs = 5_000;

        using var proc = new System.Diagnostics.Process();
        proc.StartInfo = new System.Diagnostics.ProcessStartInfo(exe, args)
        {
            RedirectStandardOutput = true,
            RedirectStandardError = true,
            UseShellExecute = false,
        };
        proc.Start();

        // Drain both pipes concurrently. Reading stdout to the end before touching stderr
        // can deadlock once the child fills the stderr pipe, and a blocking read would also
        // make the timeout below unreachable.
        var stdoutTask = proc.StandardOutput.ReadToEndAsync();
        var stderrTask = proc.StandardError.ReadToEndAsync();

        var exited = proc.WaitForExit(timeoutMs);
        if (!exited)
        {
            try
            {
                proc.Kill(entireProcessTree: true);
            }
            catch (InvalidOperationException)
            {
                // The process exited on its own between the timeout and the kill.
            }
        }

        // Bounded wait so a pipe that is still held open cannot hang the test run.
        var stdout = stdoutTask.Wait(drainMs) ? stdoutTask.Result : string.Empty;
        var stderr = stderrTask.Wait(drainMs) ? stderrTask.Result : string.Empty;
        var output = stdout + stderr;

        if (!exited)
        {
            // ExitCode must not be read here: it throws while the process has not exited.
            return (-1, $"{output}{Environment.NewLine}[{exe} did not exit within {timeoutMs} ms and was killed]");
        }

        return (proc.ExitCode, output);
    }
}

// ---------------------------------------------------------------------------
// Internal test helper: exposes MqttReader internals for unit tests
// (Placed in same file-scoped namespace ZB.MOM.NatsNet.Server.IntegrationTests.Mqtt)
// ---------------------------------------------------------------------------

///
/// Thin wrapper around <see cref="ZB.MOM.NatsNet.Server.Mqtt.MqttReader"/>
/// that exposes
/// internal helpers needed by the Reader unit test.
///
internal sealed class MqttTestReader
{
    // Looked up once: PeekBuf reads the wrapped reader's private "_buffer" field via
    // reflection. Null when the field does not exist, in which case PeekBuf returns 0.
    private static readonly System.Reflection.FieldInfo? BufferField =
        typeof(ZB.MOM.NatsNet.Server.Mqtt.MqttReader).GetField(
            "_buffer",
            System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);

    private readonly ZB.MOM.NatsNet.Server.Mqtt.MqttReader _inner = new();

    /// <summary>Forwards to <c>MqttReader.Reset</c> with <paramref name="buf"/>.</summary>
    public void Reset(byte[] buf) => _inner.Reset(buf);

    /// <summary>Forwards to <c>MqttReader.HasMore</c>.</summary>
    public bool HasMore() => _inner.HasMore();

    /// <summary>Forwards to <c>MqttReader.ReadByte</c>.</summary>
    public byte ReadByte(string field) => _inner.ReadByte(field);

    /// <summary>Forwards to <c>MqttReader.ReadString</c>.</summary>
    public string ReadString(string field) => _inner.ReadString(field);

    /// <summary>Forwards to <c>MqttReader.ReadBytes</c>.</summary>
    public byte[] ReadBytes(string field, bool copy) => _inner.ReadBytes(field, copy);

    /// <summary>Forwards to <c>MqttReader.ReadUInt16</c>.</summary>
    public ushort ReadUInt16(string field) => _inner.ReadUInt16(field);

    /// <summary>Forwards to <c>MqttReader.ReadPacketLenWithCheck</c>.</summary>
    public (int Length, bool Complete) ReadPacketLenWithCheck(bool check)
        => _inner.ReadPacketLenWithCheck(check);

    /// <summary>
    /// Peeks at the raw buffer of the wrapped reader without advancing it.
    /// </summary>
    /// <param name="index">Zero-based offset into the reader's buffer.</param>
    /// <returns>
    /// The byte at <paramref name="index"/>, or 0 when the index is outside the buffer or
    /// the buffer cannot be obtained as a <c>byte[]</c>.
    /// </returns>
    public byte PeekBuf(int index)
    {
        // The buffer is read through reflection on the private field. This is only used
        // by one assertion, where copy=true is tested.
        var buf = BufferField?.GetValue(_inner) as byte[] ?? Array.Empty<byte>();

        // A negative index is treated like any other out-of-range index.
        return index >= 0 && index < buf.Length ? buf[index] : (byte)0;
    }
}