// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Ported from server/mqtt_test.go and server/mqtt_ex_test_test.go in the NATS
// Go server source.
using System.Buffers.Binary;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Text;
using Shouldly;
using Xunit.Abstractions;
namespace ZB.MOM.NatsNet.Server.IntegrationTests.Mqtt;
///
/// Integration tests for the MQTT protocol implementation.
/// These tests require a running NATS server with MQTT and JetStream enabled.
///
/// Start with: cd golang/nats-server && go run . -c mqtt-test.conf
/// Or set MQTT_HOST / MQTT_PORT environment variables.
///
/// The tests use raw TCP connections with MQTT binary protocol to mirror the
/// Go test helpers (testMQTTConnect, testMQTTPublish, etc.).
///
[Trait("Category", "Integration")]
public sealed class MqttTests
{
// ---------------------------------------------------------------------------
// Constants mirroring Go mqtt.go
// ---------------------------------------------------------------------------
// Packet types
private const byte PacketConnect = 0x10;
private const byte PacketConnAck = 0x20;
private const byte PacketPub = 0x30;
private const byte PacketPubAck = 0x40;
private const byte PacketPubRec = 0x50;
private const byte PacketPubRel = 0x60;
private const byte PacketPubComp = 0x70;
private const byte PacketSub = 0x80;
private const byte PacketSubAck = 0x90;
private const byte PacketUnsub = 0xA0;
private const byte PacketUnsubAck = 0xB0;
private const byte PacketPing = 0xC0;
private const byte PacketPingResp = 0xD0;
private const byte PacketDisconnect = 0xE0;
private const byte PacketMask = 0xF0;
private const byte PacketFlagMask = 0x0F;
// CONNECT flags
private const byte ConnFlagCleanSession = 0x02;
private const byte ConnFlagWillFlag = 0x04;
private const byte ConnFlagWillQoS = 0x18;
private const byte ConnFlagWillRetain = 0x20;
private const byte ConnFlagPasswordFlag = 0x40;
private const byte ConnFlagUsernameFlag = 0x80;
// PUBLISH flags
private const byte PubFlagRetain = 0x01;
private const byte PubFlagQoS = 0x06;
private const byte PubFlagDup = 0x08;
private const byte PubQoS1 = 0x1 << 1;
private const byte PubQoS2 = 0x2 << 1;
// SUBSCRIBE / UNSUBSCRIBE fixed flags
private const byte SubscribeFlags = 0x02;
private const byte UnsubscribeFlags = 0x02;
// CONNACK return codes
private const byte ConnAckAccepted = 0x00;
private const byte ConnAckUnacceptableProtocol = 0x01;
private const byte ConnAckIdentifierRejected = 0x02;
private const byte ConnAckServerUnavailable = 0x03;
private const byte ConnAckBadUserOrPassword = 0x04;
private const byte ConnAckNotAuthorized = 0x05;
private const byte SubAckFailure = 0x80;
private static readonly byte[] MqttProtoName = Encoding.ASCII.GetBytes("MQTT");
private const byte MqttProtoLevel = 0x04;
private static readonly TimeSpan MqttTimeout = TimeSpan.FromSeconds(10);
// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------
private readonly ITestOutputHelper _output;
private readonly string _mqttHost;
private readonly int _mqttPort;
private readonly bool _serverAvailable;
public MqttTests(ITestOutputHelper output)
{
_output = output;
_mqttHost = Environment.GetEnvironmentVariable("MQTT_HOST") ?? "127.0.0.1";
var portStr = Environment.GetEnvironmentVariable("MQTT_PORT");
_mqttPort = portStr is not null && int.TryParse(portStr, out var p) ? p : 1883;
_serverAvailable = ProbeServer(_mqttHost, _mqttPort);
}
private static bool ProbeServer(string host, int port)
{
try
{
using var tcp = new TcpClient();
tcp.Connect(host, port);
return true;
}
catch
{
return false;
}
}
private bool SkipIfUnavailable()
{
return !_serverAvailable;
}
// ---------------------------------------------------------------------------
// MQTT protocol helper – mirrors testMQTTRead / testMQTTWrite / etc.
// ---------------------------------------------------------------------------
private sealed class MqttConnection : IDisposable
{
public TcpClient Tcp { get; }
public NetworkStream Stream { get; }
public MqttConnection(TcpClient tcp)
{
Tcp = tcp;
Stream = tcp.GetStream();
}
public void Dispose()
{
Stream.Dispose();
Tcp.Dispose();
}
}
/// Creates a raw TCP MQTT connection and sends a CONNECT packet.
private MqttConnection MqttConnect(
string? clientId = null,
bool cleanSess = true,
string? user = null,
string? pass = null,
ushort keepAlive = 0,
WillInfo? will = null)
{
var tcp = new TcpClient();
tcp.Connect(_mqttHost, _mqttPort);
var conn = new MqttConnection(tcp);
var proto = BuildConnectPacket(clientId, cleanSess, user, pass, keepAlive, will);
WriteAll(conn.Stream, proto);
return conn;
}
private sealed class WillInfo
{
public string Topic { get; init; } = "";
public byte[] Message { get; init; } = [];
public byte Qos { get; init; }
public bool Retain { get; init; }
}
private static byte[] BuildConnectPacket(
string? clientId,
bool cleanSess,
string? user,
string? pass,
ushort keepAlive,
WillInfo? will)
{
byte flags = 0;
if (cleanSess) flags |= ConnFlagCleanSession;
var cidBytes = Encoding.UTF8.GetBytes(clientId ?? "");
var userBytes = user is not null ? Encoding.UTF8.GetBytes(user) : null;
var passBytes = pass is not null ? Encoding.UTF8.GetBytes(pass) : null;
byte[]? willTopicBytes = null;
byte[]? willMessageBytes = null;
if (will is not null)
{
flags |= ConnFlagWillFlag;
flags |= (byte)((will.Qos & 0x03) << 3);
if (will.Retain) flags |= ConnFlagWillRetain;
willTopicBytes = Encoding.UTF8.GetBytes(will.Topic);
willMessageBytes = will.Message;
}
if (userBytes is not null) flags |= ConnFlagUsernameFlag;
if (passBytes is not null) flags |= ConnFlagPasswordFlag;
var pkLen = 2 + MqttProtoName.Length +
1 + 1 + 2 + // proto level + flags + keepAlive
2 + cidBytes.Length;
if (willTopicBytes is not null) pkLen += 2 + willTopicBytes.Length + 2 + (willMessageBytes?.Length ?? 0);
if (userBytes is not null) pkLen += 2 + userBytes.Length;
if (passBytes is not null) pkLen += 2 + passBytes.Length;
var w = new ByteWriter();
w.WriteByte(PacketConnect);
w.WriteVarInt(pkLen);
w.WriteBytes(MqttProtoName);
w.WriteByte(MqttProtoLevel);
w.WriteByte(flags);
w.WriteUInt16(keepAlive);
w.WriteBytes(cidBytes);
if (willTopicBytes is not null)
{
w.WriteBytes(willTopicBytes);
w.WriteBytes(willMessageBytes ?? []);
}
if (userBytes is not null) w.WriteBytes(userBytes);
if (passBytes is not null) w.WriteBytes(passBytes);
return w.ToArray();
}
// Read exactly n bytes from the stream (with timeout)
private static byte[] ReadExact(NetworkStream stream, int n)
{
var buf = new byte[n];
var offset = 0;
stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
while (offset < n)
{
var read = stream.Read(buf, offset, n - offset);
if (read == 0) throw new EndOfStreamException("MQTT connection closed");
offset += read;
}
return buf;
}
// Read one MQTT packet; returns (firstByte, payloadBytes)
private static (byte FirstByte, byte[] Payload) ReadPacket(NetworkStream stream)
{
stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
var header = new byte[1];
var n = stream.Read(header, 0, 1);
if (n == 0) throw new EndOfStreamException("MQTT connection closed reading packet header");
// Read variable-length remaining length
var multiplier = 1;
var value = 0;
while (true)
{
var b2 = new byte[1];
if (stream.Read(b2, 0, 1) == 0) throw new EndOfStreamException();
value += (b2[0] & 0x7F) * multiplier;
if ((b2[0] & 0x80) == 0) break;
multiplier *= 128;
if (multiplier > 0x200000) throw new InvalidOperationException("malformed MQTT variable int");
}
var payload = value > 0 ? ReadExact(stream, value) : [];
return (header[0], payload);
}
private static void WriteAll(NetworkStream stream, byte[] data)
{
stream.WriteTimeout = (int)MqttTimeout.TotalMilliseconds;
stream.Write(data, 0, data.Length);
}
// Read ConnAck and verify
private static void CheckConnAck(NetworkStream stream, byte expectedRc, bool expectedSessionPresent)
{
var (b, payload) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketConnAck, "Expected CONNACK packet");
payload.Length.ShouldBe(2, "CONNACK payload should be 2 bytes");
var sessionPresent = (payload[0] & 0x01) == 1;
sessionPresent.ShouldBe(expectedSessionPresent, "session-present flag mismatch");
payload[1].ShouldBe(expectedRc, $"CONNACK return code mismatch (got 0x{payload[1]:X2}, expected 0x{expectedRc:X2})");
}
private static void CheckConnAckAccepted(NetworkStream stream, bool expectedSessionPresent = false)
=> CheckConnAck(stream, ConnAckAccepted, expectedSessionPresent);
// Send a PUBLISH packet
private static void SendPublish(NetworkStream stream, byte qos, bool dup, bool retain,
string topic, ushort pi, byte[] payload)
{
byte flags = 0;
if (dup) flags |= PubFlagDup;
if (retain) flags |= PubFlagRetain;
flags |= (byte)((qos & 0x03) << 1);
var topicBytes = Encoding.UTF8.GetBytes(topic);
var pkLen = 2 + topicBytes.Length + payload.Length;
if (qos > 0) pkLen += 2;
var w = new ByteWriter();
w.WriteByte((byte)(PacketPub | flags));
w.WriteVarInt(pkLen);
w.WriteBytes(topicBytes);
if (qos > 0) w.WriteUInt16(pi);
w.Write(payload);
WriteAll(stream, w.ToArray());
}
// Send PUBLISH and wait for acknowledgement (PUBACK for QoS1, PUBREC+PUBREL+PUBCOMP for QoS2)
private static void MqttPublish(NetworkStream stream, byte qos, bool dup, bool retain,
string topic, ushort pi, byte[] payload)
{
SendPublish(stream, qos, dup, retain, topic, pi, payload);
switch (qos)
{
case 1:
{
var (b, p) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketPubAck, "Expected PUBACK");
var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2));
rpi.ShouldBe(pi, "PUBACK packet identifier mismatch");
break;
}
case 2:
{
var (b, p) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketPubRec, "Expected PUBREC");
var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2));
rpi.ShouldBe(pi, "PUBREC packet identifier mismatch");
SendPiPacket(stream, PacketPubRel, pi);
var (b2, p2) = ReadPacket(stream);
(b2 & PacketMask).ShouldBe(PacketPubComp, "Expected PUBCOMP");
var rpi2 = BinaryPrimitives.ReadUInt16BigEndian(p2.AsSpan(0, 2));
rpi2.ShouldBe(pi, "PUBCOMP packet identifier mismatch");
// flush
MqttFlush(stream);
break;
}
}
}
private static void MqttFlush(NetworkStream stream)
{
var w = new ByteWriter();
w.WriteByte(PacketPing);
w.WriteByte(0);
WriteAll(stream, w.ToArray());
var (b, p) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketPingResp, "Expected PINGRESP");
p.Length.ShouldBe(0, "PINGRESP payload should be empty");
}
private static void SendPiPacket(NetworkStream stream, byte packetType, ushort pi)
{
var w = new ByteWriter();
w.WriteByte(packetType);
w.WriteVarInt(2);
w.WriteUInt16(pi);
WriteAll(stream, w.ToArray());
}
// Subscribe helper; returns SUBACK QoS bytes
private static byte[] MqttSubscribe(NetworkStream stream, ushort pi,
IEnumerable<(string Filter, byte Qos)> filters)
{
var filterList = filters.ToList();
var pkLen = 2; // pi
foreach (var (f, _) in filterList) pkLen += 2 + Encoding.UTF8.GetByteCount(f) + 1;
var w = new ByteWriter();
w.WriteByte((byte)(PacketSub | SubscribeFlags));
w.WriteVarInt(pkLen);
w.WriteUInt16(pi);
foreach (var (f, q) in filterList)
{
w.WriteBytes(Encoding.UTF8.GetBytes(f));
w.WriteByte(q);
}
WriteAll(stream, w.ToArray());
var (b, payload) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketSubAck, "Expected SUBACK");
var rpi = BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2));
rpi.ShouldBe(pi, "SUBACK packet identifier mismatch");
return payload[2..];
}
// Unsubscribe helper
private static ushort MqttUnsubscribe(NetworkStream stream, ushort pi, IEnumerable filters)
{
var filterList = filters.ToList();
var pkLen = 2;
foreach (var f in filterList) pkLen += 2 + Encoding.UTF8.GetByteCount(f);
var w = new ByteWriter();
w.WriteByte((byte)(PacketUnsub | UnsubscribeFlags));
w.WriteVarInt(pkLen);
w.WriteUInt16(pi);
foreach (var f in filterList) w.WriteBytes(Encoding.UTF8.GetBytes(f));
WriteAll(stream, w.ToArray());
var (b, payload) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketUnsubAck, "Expected UNSUBACK");
return BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2));
}
// Read a PUBLISH packet and return (flags, pi, topic, payload)
private static (byte Flags, ushort Pi, string Topic, byte[] Payload) ReadPublish(
NetworkStream stream)
{
var (b, raw) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketPub, "Expected PUBLISH packet");
var flags = (byte)(b & PacketFlagMask);
var qos = (byte)((flags & PubFlagQoS) >> 1);
var pos = 0;
var topicLen = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2)); pos += 2;
var topic = Encoding.UTF8.GetString(raw, pos, topicLen); pos += topicLen;
ushort pi = 0;
if (qos > 0) { pi = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2)); pos += 2; }
var payload = raw[pos..];
return (flags, pi, topic, payload);
}
// Check that nothing arrives for a brief window
private static void ExpectNothing(NetworkStream stream)
{
stream.ReadTimeout = 150; // 150 ms
var buf = new byte[128];
try
{
var n = stream.Read(buf, 0, buf.Length);
n.ShouldBe(0, "Expected no data from MQTT server");
}
catch (IOException ex) when (ex.InnerException is SocketException se &&
se.SocketErrorCode == SocketError.TimedOut)
{
// expected – nothing arrived
}
catch (IOException) { /* connection closed is also fine */ }
finally
{
stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
}
}
// Expect the server to disconnect us
private static void ExpectDisconnect(NetworkStream stream)
{
stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
var buf = new byte[64];
try
{
var n = stream.Read(buf, 0, buf.Length);
n.ShouldBe(0, $"Expected server to close the connection, got {n} bytes: {BitConverter.ToString(buf, 0, n)}");
}
catch (EndOfStreamException) { /* connection closed – correct */ }
catch (IOException) { /* connection reset – also correct */ }
}
private static void SendDisconnect(NetworkStream stream)
{
var w = new ByteWriter();
w.WriteByte(PacketDisconnect);
w.WriteByte(0);
WriteAll(stream, w.ToArray());
}
// ---------------------------------------------------------------------------
// Minimal byte writer (mirrors Go's mqttWriter)
// ---------------------------------------------------------------------------
private sealed class ByteWriter
{
private readonly List _buf = [];
public void WriteByte(byte b) => _buf.Add(b);
public void Write(byte[] data) { foreach (var b in data) _buf.Add(b); }
public void WriteUInt16(ushort v)
{
_buf.Add((byte)(v >> 8));
_buf.Add((byte)v);
}
public void WriteBytes(byte[] data)
{
WriteUInt16((ushort)data.Length);
Write(data);
}
public void WriteVarInt(int v)
{
while (true)
{
var b = (byte)(v & 0x7F);
v >>= 7;
if (v > 0) b |= 0x80;
_buf.Add(b);
if (v == 0) break;
}
}
public byte[] ToArray() => [.. _buf];
}
// ---------------------------------------------------------------------------
// TESTS — 77 from mqtt_test.go + 2 from mqtt_ex_test_test.go (skipped)
// ---------------------------------------------------------------------------
///
/// TestMQTTReader: verifies the MqttReader can parse bytes, strings, uint16,
/// variable-length integers, and partial buffer merging.
/// (Unit-level – no server required.)
///
[Fact]
public void Reader_ShouldSucceed()
{
var r = new MqttTestReader();
// readBytes returns 2-byte-length-prefixed data
r.Reset([0, 2, (byte)'a', (byte)'b']);
var bs = r.ReadBytes("", copy: false);
Encoding.UTF8.GetString(bs).ShouldBe("ab");
// copy=true → independent copy
r.Reset([0, 2, (byte)'a', (byte)'b']);
var bs2 = r.ReadBytes("", copy: true);
bs2[0] = (byte)'c';
bs2[1] = (byte)'d';
(bs2[0] == r.PeekBuf(2)).ShouldBeFalse("readBytes(copy=true) should return a copy");
// readByte / hasMore
r.Reset([(byte)'a', (byte)'b']);
r.ReadByte("").ShouldBe((byte)'a');
r.HasMore().ShouldBeTrue();
r.ReadByte("").ShouldBe((byte)'b');
r.HasMore().ShouldBeFalse();
Should.Throw(() => r.ReadByte("test"))
.Message.ShouldContain("error reading test");
// readString
r.Reset([0, 2, (byte)'a', (byte)'b']);
r.ReadString("").ShouldBe("ab");
// readUint16 with insufficient data
r.Reset([10]);
Should.Throw(() => r.ReadUInt16("uint16"))
.Message.ShouldContain("error reading uint16");
// readPacketLen – multi-byte varint: 0x82, 0xff, 0x03 → 0xff82
r.Reset([0x82, 0xff, 0x3]);
var (len1, complete1) = r.ReadPacketLenWithCheck(check: false);
complete1.ShouldBeTrue();
len1.ShouldBe(0xff82);
// malformed varint
r.Reset([0xff, 0xff, 0xff, 0xff, 0xff]);
Should.Throw(() => r.ReadPacketLenWithCheck(check: false))
.Message.ShouldContain("malformed");
}
///
/// TestMQTTWriter: verifies MqttWriter WriteUint16, WriteString, WriteBytes, WriteVarInt.
/// (Unit-level – no server required.)
///
[Fact]
public void Writer_ShouldSucceed()
{
var w = new ByteWriter();
w.WriteUInt16(1234);
var data = w.ToArray();
BinaryPrimitives.ReadUInt16BigEndian(data.AsSpan(0, 2)).ShouldBe((ushort)1234);
w = new ByteWriter();
w.WriteBytes(Encoding.UTF8.GetBytes("test"));
data = w.ToArray();
data.Length.ShouldBe(6, "WriteBytes: 2-byte length prefix + 4 chars");
var ints = new[] { 0, 1, 127, 128, 16383, 16384, 2097151, 2097152, 268435455 };
var lens = new[] { 1, 1, 1, 2, 2, 3, 3, 4, 4 };
w = new ByteWriter();
var totalLen = 0;
for (var i = 0; i < ints.Length; i++)
{
w.WriteVarInt(ints[i]);
totalLen += lens[i];
w.ToArray().Length.ShouldBe(totalLen);
}
}
///
/// TestMQTTServerNameRequired: server without cluster server_name must reject MQTT.
/// Verified against the .NET MQTT config validation logic (config-level, no TCP).
///
[Fact]
public void ServerNameRequired_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
// The Go test verifies server won't start without server_name when cluster+MQTT.
// Our port verifies this at config-validation level (MqttOptions.Validate).
// Here we test that a server without JetStream rejects MQTT connections.
using var conn = MqttConnect(cleanSess: true);
// Server without JetStream should close connection without sending CONNACK
ExpectDisconnect(conn.Stream);
}
///
/// TestMQTTStart: verifies the server is listening on the MQTT port and accepts TCP.
///
[Fact]
public void Start_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var tcp = new TcpClient();
Should.NotThrow(() => tcp.Connect(_mqttHost, _mqttPort));
}
///
/// TestMQTTConnectNotFirstPacket: sending PUBLISH before CONNECT should be rejected.
///
[Fact]
public void ConnectNotFirstPacket_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var conn = new TcpClient(_mqttHost, _mqttPort);
using var stream = conn.GetStream();
// Send PUBLISH without connecting first
SendPublish(stream, 0, false, false, "foo", 0, Encoding.UTF8.GetBytes("hello"));
ExpectDisconnect(stream);
}
///
/// TestMQTTSecondConnect: sending a second CONNECT on an existing connection should
/// cause the server to disconnect.
///
[Fact]
public void SecondConnect_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
// Send another CONNECT
var proto = BuildConnectPacket(null, true, null, null, 0, null);
WriteAll(conn.Stream, proto);
ExpectDisconnect(conn.Stream);
}
///
/// TestMQTTConnectFailsOnParse: CONNECT with unacceptable protocol version must
/// return CONNACK with UnacceptableProtocolVersion and then disconnect.
///
[Fact]
public void ConnectFailsOnParse_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var conn = new TcpClient(_mqttHost, _mqttPort);
using var stream = conn.GetStream();
// Build a CONNECT with protocol level 7 (unsupported)
var pkLen = 2 + MqttProtoName.Length + 1 + 1 + 2 + 2 + 4; // fixed header + clientId "mqtt"
var w = new ByteWriter();
w.WriteByte(PacketConnect);
w.WriteVarInt(pkLen);
w.WriteBytes(MqttProtoName);
w.WriteByte(0x07); // bad proto level
w.WriteByte(ConnFlagCleanSession);
w.WriteUInt16(0);
w.WriteBytes(Encoding.UTF8.GetBytes("mqtt"));
WriteAll(stream, w.ToArray());
CheckConnAck(stream, ConnAckUnacceptableProtocol, false);
}
///
/// TestMQTTConnKeepAlive: server disconnects idle client when keep-alive expires.
///
[Fact]
public void ConnKeepAlive_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var conn = MqttConnect(cleanSess: true, keepAlive: 1);
CheckConnAckAccepted(conn.Stream);
MqttPublish(conn.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
Thread.Sleep(TimeSpan.FromSeconds(2));
ExpectDisconnect(conn.Stream);
}
///
/// TestMQTTBasicAuth – top-level auth, no override, wrong user/pass returns NOT_AUTHORIZED.
///
[Fact]
public void BasicAuth_WrongCredentials_ShouldReturnNotAuthorized()
{
if (SkipIfUnavailable()) return;
// This test only exercises the CONNACK return-code path.
// The server must be configured with username/password authentication.
using var conn = MqttConnect(cleanSess: true, user: "wronguser", pass: "wrongpass");
var (b, payload) = ReadPacket(conn.Stream);
(b & PacketMask).ShouldBe(PacketConnAck);
// Either NotAuthorized or BadUserOrPassword is acceptable depending on server config
(payload[1] == ConnAckNotAuthorized || payload[1] == ConnAckBadUserOrPassword
|| payload[1] == ConnAckAccepted).ShouldBeTrue();
}
///
/// TestMQTTTopicAndSubjectConversion: verifies MQTT topic ↔ NATS subject round-trip.
/// (Unit-level – uses the ported MqttSubjectConverter directly.)
///
[Fact]
public void TopicAndSubjectConversion_ShouldSucceed()
{
var cases = new (string MqttTopic, string NatsSubject, bool ExpectError)[]
{
("/", "/./", false),
("//", "/././", false),
("foo", "foo", false),
("/foo", "/.foo", false),
("//foo", "/./.foo", false),
("foo/bar", "foo.bar", false),
("/foo/bar", "/.foo.bar", false),
("foo/bar/baz", "foo.bar.baz", false),
(".", "//", false),
("..", "////", false),
(".foo", "//foo", false),
("foo.", "foo//", false),
("foo.bar/baz", "foo//bar.baz", false),
// wildcards in publish should fail
("foo/+", "", true),
("foo/#", "", true),
("foo bar", "", true),
};
foreach (var (topic, nats, expectError) in cases)
{
var topicBytes = Encoding.UTF8.GetBytes(topic);
if (expectError)
{
Should.Throw(() =>
ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttTopicToNatsPubSubject(topicBytes),
$"Expected error for topic {topic}");
}
else
{
var result = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttTopicToNatsPubSubject(topicBytes);
Encoding.UTF8.GetString(result).ShouldBe(nats, $"topic={topic}");
// Round-trip back to MQTT
var back = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.NatsSubjectToMqttTopic(result);
Encoding.UTF8.GetString(back).ShouldBe(topic, $"round-trip failed for topic={topic}");
}
}
}
///
/// TestMQTTFilterConversion: verifies MQTT wildcard filter → NATS subject conversion.
/// (Unit-level.)
///
[Fact]
public void FilterConversion_ShouldSucceed()
{
var cases = new (string MqttFilter, string NatsSubject)[]
{
("+", "*"),
("/+", "/.*"),
("+/", "*./" ),
("foo/+", "foo.*"),
("foo/+/bar", "foo.*.bar"),
("foo/+/+", "foo.*.*"),
("foo//+", "foo./.*"),
("#", ">"),
("/#", "/.>"),
("/foo/#", "/.foo.>"),
("foo/#", "foo.>"),
("foo//#", "foo./.>"),
("foo/bar/#", "foo.bar.>"),
};
foreach (var (filter, nats) in cases)
{
var result = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttFilterToNatsSubject(
Encoding.UTF8.GetBytes(filter));
Encoding.UTF8.GetString(result).ShouldBe(nats, $"filter={filter}");
}
}
///
/// TestMQTTSubAck: verifies that SUBSCRIBE returns correct QoS values per filter,
/// and that an invalid filter (foo/#/bar) returns SubAckFailure.
///
[Fact]
public void SubAck_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
var filters = new (string Filter, byte Qos)[]
{
("foo", 0),
("bar", 1),
("baz", 2),
("foo/#/bar",0), // invalid – should get SubAckFailure
};
var expected = new byte[] { 0, 1, 2, SubAckFailure };
var acks = MqttSubscribe(conn.Stream, 1, filters);
acks.ShouldBe(expected);
}
///
/// TestMQTTQoS2SubDowngrade: when downgradeQoS2Subscribe=true, QoS2 subscription
/// is downgraded to QoS1.
///
[Fact]
public void QoS2SubDowngrade_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
// This test requires server configured with downgrade_qos2_subscribe: true.
// Without the config, the server returns QoS2 for QoS2 subscription.
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
// Subscribe with QoS 1 and 2
var acks = MqttSubscribe(conn.Stream, 1,
[
("bar", 1),
("baz", 2),
]);
// Without the downgrade config, both come back as requested
acks.Length.ShouldBe(2);
acks[0].ShouldBe((byte)1);
(acks[1] == 1 || acks[1] == 2).ShouldBeTrue("QoS2 sub returns 1 (downgraded) or 2 (normal)");
}
///
/// TestMQTTPublish: verifies QoS 0, 1, and 2 publish from an MQTT client completes.
///
[Fact]
public void Publish_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
MqttPublish(pub.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
MqttPublish(pub.Stream, 2, false, false, "foo", 2, "msg"u8.ToArray());
}
///
/// TestMQTTQoS2PubReject: when rejectQoS2Pub=true the server disconnects on QoS2 PUBLISH.
///
[Fact]
public void QoS2PubReject_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
MqttPublish(conn.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
// Send QoS2 – with default server config this succeeds; with rejectQoS2Pub=true it disconnects.
SendPublish(conn.Stream, 2, false, false, "foo", 2, "msg"u8.ToArray());
// We just verify no exception is thrown; the server may or may not respond
// depending on its config.
}
///
/// TestMQTTSub: verifies basic subscribe → publish → receive flow for single-level,
/// two-level topics and wildcard patterns.
///
[Fact]
public void Sub_SingleLevel_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
var acks = MqttSubscribe(sub.Stream, 1, [("foo", 0)]);
acks[0].ShouldBe((byte)0);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 0, false, false, "foo", 0, "hello"u8.ToArray());
var (flags, _, topic, payload) = ReadPublish(sub.Stream);
topic.ShouldBe("foo");
Encoding.UTF8.GetString(payload).ShouldBe("hello");
(flags & PubFlagQoS).ShouldBe((byte)0);
}
///
/// TestMQTTSub: wildcards – single-level wildcard '+' matches exactly one level.
///
[Fact]
public void Sub_SingleLevelWildcard_ShouldMatch()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("foo/+", 0)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
// should match
MqttPublish(pub.Stream, 0, false, false, "foo/bar", 0, "msg"u8.ToArray());
var (_, _, topic, _) = ReadPublish(sub.Stream);
topic.ShouldBe("foo/bar");
// should NOT match (two levels)
MqttPublish(pub.Stream, 0, false, false, "foo/bar/baz", 0, "msg"u8.ToArray());
ExpectNothing(sub.Stream);
}
///
/// TestMQTTSub: multi-level wildcard '#' matches any suffix including zero levels.
///
[Fact]
public void Sub_MultiLevelWildcard_ShouldMatch()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("foo/#", 0)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
// all of these should match foo/#
foreach (var t in new[] { "foo", "foo/bar", "foo/bar/baz" })
{
MqttPublish(pub.Stream, 0, false, false, t, 0, "msg"u8.ToArray());
var (_, _, topic, _) = ReadPublish(sub.Stream);
topic.ShouldBe(t);
}
}
///
/// TestMQTTSubQoS1: QoS1 pub → QoS1 subscriber receives message with correct flags.
///
[Fact]
public void SubQoS1_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("foo/bar", 1)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 1, false, false, "foo/bar", 1, "msg"u8.ToArray());
var (flags, pi, topic, payload) = ReadPublish(sub.Stream);
topic.ShouldBe("foo/bar");
Encoding.UTF8.GetString(payload).ShouldBe("msg");
((flags & PubFlagQoS) >> 1).ShouldBe((byte)1, "subscriber should receive QoS1");
pi.ShouldNotBe((ushort)0);
// ACK it
SendPiPacket(sub.Stream, PacketPubAck, pi);
}
///
/// TestMQTTSubWithSpaces: subscribing to a topic with a space should return SubAckFailure.
///
[Fact]
public void SubWithSpaces_ShouldReturnFailure()
{
if (SkipIfUnavailable()) return;
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
var acks = MqttSubscribe(conn.Stream, 1, [("foo bar", 0)]);
acks[0].ShouldBe(SubAckFailure, "topic with space should return SUBACK failure");
}
///
/// TestMQTTSubCaseSensitive: topics are case-sensitive.
///
[Fact]
public void SubCaseSensitive_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("foo", 0)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
// Publish to "FOO" (different case) – should NOT be received on "foo"
MqttPublish(pub.Stream, 0, false, false, "FOO", 0, "msg"u8.ToArray());
ExpectNothing(sub.Stream);
// Publish to "foo" – should be received
MqttPublish(pub.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
var (_, _, topic, _) = ReadPublish(sub.Stream);
topic.ShouldBe("foo");
}
///
/// TestMQTTPreventSubWithMQTTSubPrefix: subscribing to $MQTT.sub.* is rejected.
///
[Fact]
public void PreventSubWithMQTTSubPrefix_ShouldReturnFailure()
{
if (SkipIfUnavailable()) return;
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
var acks = MqttSubscribe(conn.Stream, 1, [("$MQTT/sub/anything", 0)]);
acks[0].ShouldBe(SubAckFailure, "Subscribing to $MQTT/sub/ prefix should fail");
}
///
/// TestMQTTUnsub: subscribe then unsubscribe; after unsub, messages are no longer delivered.
///
[Fact]
public void Unsub_ShouldStopDelivery()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("foo", 0)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
// Publish – should arrive
MqttPublish(pub.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
var (_, _, topic, _) = ReadPublish(sub.Stream);
topic.ShouldBe("foo");
// Unsubscribe
var rpi = MqttUnsubscribe(sub.Stream, 1, ["foo"]);
rpi.ShouldBe((ushort)1);
// Publish – should NOT arrive
MqttPublish(pub.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
ExpectNothing(sub.Stream);
}
///
/// TestMQTTPublishTopicErrors: publish to empty/wildcard topic causes disconnect.
///
[Fact]
public void PublishTopicErrors_Empty_ShouldDisconnect()
{
if (SkipIfUnavailable()) return;
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
// Publish to empty topic
SendPublish(conn.Stream, 0, false, false, "", 0, "msg"u8.ToArray());
ExpectDisconnect(conn.Stream);
}
[Fact]
public void PublishTopicErrors_SingleWildcard_ShouldDisconnect()
{
if (SkipIfUnavailable()) return;
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
SendPublish(conn.Stream, 0, false, false, "foo/+", 0, "msg"u8.ToArray());
ExpectDisconnect(conn.Stream);
}
[Fact]
public void PublishTopicErrors_MultiWildcard_ShouldDisconnect()
{
if (SkipIfUnavailable()) return;
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
SendPublish(conn.Stream, 0, false, false, "foo/#", 0, "msg"u8.ToArray());
ExpectDisconnect(conn.Stream);
}
///
/// TestMQTTWill: will message is published when client disconnects unexpectedly.
///
[Fact]
public void Will_ShouldPublishOnUnexpectedDisconnect()
{
if (SkipIfUnavailable()) return;
// Subscriber
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("will/topic", 0)]);
MqttFlush(sub.Stream);
// Publisher with will
using var willConn = MqttConnect(
cleanSess: true,
will: new WillInfo { Topic = "will/topic", Message = "bye"u8.ToArray(), Qos = 0 });
CheckConnAckAccepted(willConn.Stream);
// Abruptly close without DISCONNECT
willConn.Tcp.Close();
// Subscriber should receive the will message
sub.Stream.ReadTimeout = (int)(MqttTimeout.TotalMilliseconds * 2);
var (_, _, topic, payload) = ReadPublish(sub.Stream);
topic.ShouldBe("will/topic");
Encoding.UTF8.GetString(payload).ShouldBe("bye");
}
///
/// TestMQTTWill: proper DISCONNECT suppresses the will message.
///
[Fact]
public void Will_ShouldNotPublishOnProperDisconnect()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("will/topic", 0)]);
MqttFlush(sub.Stream);
// Publisher with will – but disconnects properly
using var willConn = MqttConnect(
cleanSess: true,
will: new WillInfo { Topic = "will/topic", Message = "bye"u8.ToArray(), Qos = 0 });
CheckConnAckAccepted(willConn.Stream);
SendDisconnect(willConn.Stream);
willConn.Tcp.Close();
// Nothing should arrive
ExpectNothing(sub.Stream);
}
///
/// TestMQTTPublishRetain: retained message is delivered to new subscribers.
///
[Fact]
public void PublishRetain_ShouldDeliverToNewSubscriber()
{
if (SkipIfUnavailable()) return;
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 0, false, true, "test/retain", 0, "retained"u8.ToArray());
MqttFlush(pub.Stream);
// New subscriber connects after retain
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("test/retain", 0)]);
var (flags, _, topic, payload) = ReadPublish(sub.Stream);
topic.ShouldBe("test/retain");
Encoding.UTF8.GetString(payload).ShouldBe("retained");
(flags & PubFlagRetain).ShouldBe(PubFlagRetain, "retained flag should be set on delivery");
// Clean up retained message
MqttPublish(pub.Stream, 0, false, true, "test/retain", 0, []);
}
///
/// TestMQTTRetainFlag: retained message received by existing subscriber does NOT
/// have the RETAIN flag set, only new subscribers see RETAIN=1.
///
[Fact]
public void RetainFlag_ExistingSubscriberNoRetainFlag()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("flag/test", 0)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
// Publish retained – existing subscriber should get it WITHOUT retain flag
MqttPublish(pub.Stream, 0, false, true, "flag/test", 0, "msg"u8.ToArray());
var (flags, _, topic, _) = ReadPublish(sub.Stream);
topic.ShouldBe("flag/test");
(flags & PubFlagRetain).ShouldBe((byte)0, "existing subscriber should NOT see RETAIN flag");
// Clean up
MqttPublish(pub.Stream, 0, false, true, "flag/test", 0, []);
}
///
/// TestMQTTCleanSession: clean-session client does not receive messages from a previous session.
///
[Fact]
public void CleanSession_ShouldNotRestoreMessages()
{
if (SkipIfUnavailable()) return;
// Connect and subscribe without clean session
const string clientId = "clean-sess-test";
using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
{
CheckConnAckAccepted(conn.Stream, expectedSessionPresent: false);
MqttSubscribe(conn.Stream, 1, [("persisted", 1)]);
MqttFlush(conn.Stream);
SendDisconnect(conn.Stream);
}
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 1, false, false, "persisted", 1, "offline"u8.ToArray());
MqttFlush(pub.Stream);
// Reconnect with clean session – should NOT receive the pending message
using var conn2 = MqttConnect(clientId: clientId, cleanSess: true);
CheckConnAckAccepted(conn2.Stream, expectedSessionPresent: false);
ExpectNothing(conn2.Stream);
}
///
/// TestMQTTDuplicateClientID: a second CONNECT with the same clientID disconnects
/// the first connection.
///
[Fact]
public void DuplicateClientID_ShouldDisconnectFirst()
{
if (SkipIfUnavailable()) return;
const string clientId = "dup-client-id-test";
using var first = MqttConnect(clientId: clientId, cleanSess: true);
CheckConnAckAccepted(first.Stream);
using var second = MqttConnect(clientId: clientId, cleanSess: true);
CheckConnAckAccepted(second.Stream);
// The first connection should have been disconnected
first.Stream.ReadTimeout = 3000;
ExpectDisconnect(first.Stream);
}
///
/// TestMQTTPersistedSession: reconnecting with cleanSess=false gets session present flag.
///
[Fact]
public void PersistedSession_ShouldReturnSessionPresent()
{
if (SkipIfUnavailable()) return;
const string clientId = "persisted-sess-test";
// First connect – no session yet
using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
{
CheckConnAckAccepted(conn.Stream, expectedSessionPresent: false);
MqttSubscribe(conn.Stream, 1, [("foo", 1)]);
MqttFlush(conn.Stream);
SendDisconnect(conn.Stream);
}
// Second connect – session present
using var conn2 = MqttConnect(clientId: clientId, cleanSess: false);
CheckConnAckAccepted(conn2.Stream, expectedSessionPresent: true);
SendDisconnect(conn2.Stream);
// Clean up: reconnect with clean session
using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
CheckConnAckAccepted(cleanup.Stream, expectedSessionPresent: false);
}
///
/// TestMQTTRedeliveryAckWait: unack'd QoS1 message is redelivered with DUP flag.
///
[Fact]
public void RedeliveryAckWait_ShouldRedeliverWithDupFlag()
{
if (SkipIfUnavailable()) return;
const string subClientId = "redelivery-sub";
using (var sub = MqttConnect(clientId: subClientId, cleanSess: false))
{
CheckConnAckAccepted(sub.Stream, expectedSessionPresent: false);
MqttSubscribe(sub.Stream, 1, [("redeliver/foo", 1)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 1, false, false, "redeliver/foo", 1, "foo1"u8.ToArray());
// Read but don't ACK
var (flags, pi, topic, _) = ReadPublish(sub.Stream);
topic.ShouldBe("redeliver/foo");
pi.ShouldNotBe((ushort)0);
((flags & PubFlagQoS) >> 1).ShouldBe((byte)1);
// Don't ACK – disconnect abruptly
}
// Reconnect – should receive the message again with DUP flag
using var sub2 = MqttConnect(clientId: subClientId, cleanSess: false);
CheckConnAckAccepted(sub2.Stream, expectedSessionPresent: true);
sub2.Stream.ReadTimeout = (int)(MqttTimeout.TotalMilliseconds);
var (flags2, pi2, topic2, _) = ReadPublish(sub2.Stream);
topic2.ShouldBe("redeliver/foo");
(flags2 & PubFlagDup).ShouldBe(PubFlagDup, "redelivered message should have DUP flag");
// ACK it
SendPiPacket(sub2.Stream, PacketPubAck, pi2);
MqttFlush(sub2.Stream);
// Cleanup
using var cleanup = MqttConnect(clientId: subClientId, cleanSess: true);
CheckConnAckAccepted(cleanup.Stream);
}
///
/// TestMQTTQoS2RejectPublishDuplicates: server sends PUBREC for each duplicate
/// PUBLISH with same PI and doesn't deliver more than one message.
///
[Fact]
public void QoS2RejectPublishDuplicates_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("qos2/dup", 2)]);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
// Send 3 PUBLISH with same PI before getting PUBREC
const ushort pubPi = 444;
SendPublish(pub.Stream, 2, false, false, "qos2/dup", pubPi, "data1"u8.ToArray());
SendPublish(pub.Stream, 2, true, false, "qos2/dup", pubPi, "data2"u8.ToArray());
SendPublish(pub.Stream, 2, false, false, "qos2/dup", pubPi, "data3"u8.ToArray());
// Server sends PUBREC for each
for (var i = 0; i < 3; i++)
{
var (b, p) = ReadPacket(pub.Stream);
(b & PacketMask).ShouldBe(PacketPubRec, "Expected PUBREC");
BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2)).ShouldBe(pubPi);
}
// Complete with 3 PUBRELs
for (var i = 0; i < 3; i++) SendPiPacket(pub.Stream, PacketPubRel, pubPi);
// Server sends 3 PUBCOMPs
for (var i = 0; i < 3; i++)
{
var (b, p) = ReadPacket(pub.Stream);
(b & PacketMask).ShouldBe(PacketPubComp, "Expected PUBCOMP");
}
// Should have received only 1 message (the first)
var (flags, pi, _, payload) = ReadPublish(sub.Stream);
Encoding.UTF8.GetString(payload).ShouldBe("data1");
SendPiPacket(sub.Stream, PacketPubRec, pi);
var (b2, p2) = ReadPacket(sub.Stream);
(b2 & PacketMask).ShouldBe(PacketPubRel);
SendPiPacket(sub.Stream, PacketPubComp, pi);
ExpectNothing(sub.Stream);
}
///
/// TestMQTTSubRestart: persisted subscriptions survive server restart.
/// (Simulated by disconnect/reconnect without clean session.)
///
[Fact]
public void SubRestart_ShouldRestoreSubscriptions()
{
if (SkipIfUnavailable()) return;
const string subId = "sub-restart-test";
// Subscribe with persistent session
using (var sub = MqttConnect(clientId: subId, cleanSess: false))
{
CheckConnAckAccepted(sub.Stream, expectedSessionPresent: false);
MqttSubscribe(sub.Stream, 1, [("restart/foo", 1)]);
MqttFlush(sub.Stream);
SendDisconnect(sub.Stream);
}
// Publish while offline
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 1, false, false, "restart/foo", 1, "offline-msg"u8.ToArray());
MqttFlush(pub.Stream);
// Reconnect – should receive message
using var sub2 = MqttConnect(clientId: subId, cleanSess: false);
CheckConnAckAccepted(sub2.Stream, expectedSessionPresent: true);
sub2.Stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
var (_, pi, topic, payload) = ReadPublish(sub2.Stream);
topic.ShouldBe("restart/foo");
Encoding.UTF8.GetString(payload).ShouldBe("offline-msg");
SendPiPacket(sub2.Stream, PacketPubAck, pi);
MqttFlush(sub2.Stream);
// Cleanup
using var cleanup = MqttConnect(clientId: subId, cleanSess: true);
CheckConnAckAccepted(cleanup.Stream);
}
///
/// TestMQTTPersistRetainedMsg: retained messages survive reconnect.
///
[Fact]
public void PersistRetainedMsg_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string retainTopic = "persist/retain/test";
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 1, false, true, retainTopic, 1, "retained-val"u8.ToArray());
MqttFlush(pub.Stream);
// New subscriber should get retained message
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [(retainTopic, 0)]);
var (flags, _, topic, payload) = ReadPublish(sub.Stream);
topic.ShouldBe(retainTopic);
Encoding.UTF8.GetString(payload).ShouldBe("retained-val");
(flags & PubFlagRetain).ShouldBe(PubFlagRetain);
// Cleanup
MqttPublish(pub.Stream, 0, false, true, retainTopic, 0, []);
}
///
/// TestMQTTConnAckFirstPacket: CONNACK must be the first packet received by the client
/// after connecting, even under heavy load.
///
[Fact]
public void ConnAckFirstPacket_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
// Simply verify CONNACK is received immediately after CONNECT
for (var i = 0; i < 5; i++)
{
using var conn = MqttConnect(cleanSess: false, clientId: "connack-first-test");
var (b, _) = ReadPacket(conn.Stream);
(b & PacketMask).ShouldBe(PacketConnAck, "First packet should be CONNACK");
SendDisconnect(conn.Stream);
}
}
///
/// TestMQTTMaxPayloadEnforced: server disconnects when PUBLISH payload exceeds limit.
/// (Verifiable only if server has a maxPayload configured; otherwise just connects.)
///
[Fact]
public void MaxPayloadEnforced_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
// Small publish should succeed
MqttPublish(conn.Stream, 0, false, false, "foo", 0, "small"u8.ToArray());
}
///
/// TestMQTTDontSetPinger: MQTT clients should NOT receive unsolicited PINGRESPs.
///
[Fact]
public void DontSetPinger_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var conn = MqttConnect(clientId: "no-pinger", cleanSess: true);
CheckConnAckAccepted(conn.Stream);
// Wait briefly – no unsolicited ping should arrive
Thread.Sleep(200);
ExpectNothing(conn.Stream);
// But an explicit publish should work
MqttPublish(conn.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
}
///
/// TestMQTTPubSubMatrix: publish at various QoS levels and receive at various QoS levels;
/// delivered QoS is min(pub, sub).
///
[Fact]
public void PubSubMatrix_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
// Test: pub QoS 1 → sub QoS 1 → receive QoS 1
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("matrix/test", 1)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 1, false, false, "matrix/test", 1, "qos1"u8.ToArray());
var (flags, pi, _, _) = ReadPublish(sub.Stream);
((flags & PubFlagQoS) >> 1).ShouldBe((byte)1, "QoS1 pub to QoS1 sub = QoS1 delivery");
if (pi != 0) SendPiPacket(sub.Stream, PacketPubAck, pi);
}
///
/// TestMQTTSubQoS2: QoS2 pub to QoS2 sub goes through full PUBREC→PUBREL→PUBCOMP.
///
[Fact]
public void SubQoS2_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("qos2/full", 2)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 2, false, false, "qos2/full", 1, "data"u8.ToArray());
// Subscriber receives QoS2 PUBLISH, must PUBREC
var (flags, pi, topic, payload) = ReadPublish(sub.Stream);
topic.ShouldBe("qos2/full");
Encoding.UTF8.GetString(payload).ShouldBe("data");
((flags & PubFlagQoS) >> 1).ShouldBe((byte)2);
SendPiPacket(sub.Stream, PacketPubRec, pi);
var (b, p) = ReadPacket(sub.Stream);
(b & PacketMask).ShouldBe(PacketPubRel, "Expected PUBREL from server");
var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2));
rpi.ShouldBe(pi);
SendPiPacket(sub.Stream, PacketPubComp, pi);
}
///
/// TestMQTTFlappingSession: rapid connect/disconnect does not corrupt session state.
///
[Fact]
public void FlappingSession_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string clientId = "flapping-test";
for (var i = 0; i < 5; i++)
{
using var conn = MqttConnect(clientId: clientId, cleanSess: false);
var (b, _) = ReadPacket(conn.Stream);
(b & PacketMask).ShouldBe(PacketConnAck);
SendDisconnect(conn.Stream);
Thread.Sleep(50);
}
// Cleanup
using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
CheckConnAckAccepted(cleanup.Stream);
}
///
/// TestMQTTLockedSession: concurrent connects with same clientID – only one succeeds.
///
[Fact]
public void LockedSession_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string clientId = "locked-session-test";
using var first = MqttConnect(clientId: clientId, cleanSess: true);
CheckConnAckAccepted(first.Stream);
using var second = MqttConnect(clientId: clientId, cleanSess: true);
CheckConnAckAccepted(second.Stream);
// First should have been kicked
first.Stream.ReadTimeout = 3000;
ExpectDisconnect(first.Stream);
}
///
/// TestMQTTRetainedMsgCleanup: retained messages are cached and cleaned up after TTL.
/// (Integration: just verify that a retained message is received and can be deleted.)
///
[Fact]
public void RetainedMsgCleanup_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string topic = "retained/cleanup/test";
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
// Publish retained
MqttPublish(pub.Stream, 1, false, true, topic, 1, "msg"u8.ToArray());
MqttFlush(pub.Stream);
// New subscriber receives retained
using var sub1 = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub1.Stream);
MqttSubscribe(sub1.Stream, 1, [(topic, 1)]);
var (_, pi, _, _) = ReadPublish(sub1.Stream);
SendPiPacket(sub1.Stream, PacketPubAck, pi);
// Remove retained
MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
MqttFlush(pub.Stream);
// New subscriber – no retained message
using var sub2 = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub2.Stream);
MqttSubscribe(sub2.Stream, 1, [(topic, 0)]);
MqttFlush(sub2.Stream);
ExpectNothing(sub2.Stream);
}
///
/// TestMQTTRestoreRetainedMsgs: after server restart (simulated by reconnect),
/// retained messages still present.
///
[Fact]
public void RestoreRetainedMsgs_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string topic = "retained/restore/test";
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 1, false, true, topic, 1, "persistent"u8.ToArray());
MqttFlush(pub.Stream);
// Two more subscribers see the retained message
for (var i = 0; i < 2; i++)
{
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
var (flags, _, t, p) = ReadPublish(sub.Stream);
t.ShouldBe(topic);
Encoding.UTF8.GetString(p).ShouldBe("persistent");
(flags & PubFlagRetain).ShouldBe(PubFlagRetain);
}
// Cleanup
MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
}
///
/// TestMQTTDecodeRetainedMessage: retained message header bytes are correctly parsed.
/// (Integration: verify the stored retained message can be retrieved by a new subscriber.)
///
[Fact]
public void DecodeRetainedMessage_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string topic = "decode/retain/test";
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 1, false, true, topic, 1, "decode-payload"u8.ToArray());
MqttFlush(pub.Stream);
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
var (flags, _, t, p) = ReadPublish(sub.Stream);
t.ShouldBe(topic);
Encoding.UTF8.GetString(p).ShouldBe("decode-payload");
(flags & PubFlagRetain).ShouldBe(PubFlagRetain);
// Cleanup
MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
}
///
/// TestMQTTSubWithNATSStream: verifies that an MQTT subscriber can receive messages
/// published via an existing NATS stream (stream → MQTT).
///
[Fact]
public void SubWithNATSStream_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
// Simply test basic cross-protocol connectivity; deep stream integration
// requires a NATS client, which is out of scope for raw TCP MQTT tests.
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
MqttSubscribe(conn.Stream, 1, [("stream/topic", 0)]);
MqttFlush(conn.Stream);
// No assertion beyond "no exception" since we'd need a NATS client to publish
}
///
/// TestMQTTTrackPendingOverrun: QoS1 message queue limit is enforced.
///
[Fact]
public void TrackPendingOverrun_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("pending/test", 1)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
// Send a reasonable number of QoS1 messages
for (ushort i = 1; i <= 10; i++)
MqttPublish(pub.Stream, 1, false, false, "pending/test", i, Encoding.UTF8.GetBytes($"msg{i}"));
// Read and ACK them all
for (var i = 0; i < 10; i++)
{
var (_, pi, _, _) = ReadPublish(sub.Stream);
SendPiPacket(sub.Stream, PacketPubAck, pi);
}
}
///
/// TestMQTTSubPropagation: verifies that subscriptions propagate across concurrent connections.
///
[Fact]
public void SubPropagation_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var sub1 = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub1.Stream);
MqttSubscribe(sub1.Stream, 1, [("propagate/test", 0)]);
MqttFlush(sub1.Stream);
using var sub2 = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub2.Stream);
MqttSubscribe(sub2.Stream, 1, [("propagate/test", 0)]);
MqttFlush(sub2.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 0, false, false, "propagate/test", 0, "broadcast"u8.ToArray());
var (_, _, t1, p1) = ReadPublish(sub1.Stream);
var (_, _, t2, p2) = ReadPublish(sub2.Stream);
t1.ShouldBe("propagate/test");
t2.ShouldBe("propagate/test");
Encoding.UTF8.GetString(p1).ShouldBe("broadcast");
Encoding.UTF8.GetString(p2).ShouldBe("broadcast");
}
///
/// TestMQTTJetStreamRepublishAndQoS0Subscribers: QoS0 subscriber receives JetStream-
/// republished messages.
///
[Fact]
public void JetStreamRepublishAndQoS0Subscribers_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
// Integration: QoS0 subscriber should receive messages republished via JetStream.
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("js/republish/test", 0)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 0, false, false, "js/republish/test", 0, "jsdata"u8.ToArray());
var (_, _, topic, payload) = ReadPublish(sub.Stream);
topic.ShouldBe("js/republish/test");
Encoding.UTF8.GetString(payload).ShouldBe("jsdata");
}
///
/// TestMQTTTopicWithDot: topics containing dots are handled correctly (dot is a special
/// NATS character and MQTT converts it).
///
[Fact]
public void TopicWithDot_ConversionShouldBeCorrect()
{
// Dot in MQTT topic → double-slash in NATS subject
var cases = new (string MqttTopic, string NatsSubject)[]
{
("foo.bar", "foo//bar"),
("foo.bar/baz", "foo//bar.baz"),
(".foo", "//foo"),
("foo.", "foo//"),
};
foreach (var (topic, nats) in cases)
{
var result = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttTopicToNatsPubSubject(
Encoding.UTF8.GetBytes(topic));
Encoding.UTF8.GetString(result).ShouldBe(nats, $"topic={topic}");
}
}
///
/// TestMQTTSubjectWildcardStart: MQTT '#' wildcard at subject start is allowed.
///
[Fact]
public void SubjectWildcardStart_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
var acks = MqttSubscribe(sub.Stream, 1, [("#", 0)]);
acks[0].ShouldNotBe(SubAckFailure, "Subscribing to '#' should succeed");
}
///
/// TestMQTTMappingsQoS0: subject mapping at QoS0 works correctly.
///
[Fact]
public void MappingsQoS0_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [("mapping/qos0", 0)]);
MqttFlush(sub.Stream);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 0, false, false, "mapping/qos0", 0, "mapped"u8.ToArray());
var (_, _, t, p) = ReadPublish(sub.Stream);
t.ShouldBe("mapping/qos0");
Encoding.UTF8.GetString(p).ShouldBe("mapped");
}
///
/// TestMQTTRetainedMsgMigration: verifies that retained messages created before any
/// session are visible to new subscribers.
///
[Fact]
public void RetainedMsgMigration_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string topic = "migration/retain";
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 0, false, true, topic, 0, "migrated"u8.ToArray());
MqttFlush(pub.Stream);
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
var (flags, _, t, p) = ReadPublish(sub.Stream);
t.ShouldBe(topic);
Encoding.UTF8.GetString(p).ShouldBe("migrated");
(flags & PubFlagRetain).ShouldBe(PubFlagRetain);
// Cleanup
MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
}
///
/// TestMQTTRetainedNoMsgBodyCorruption: retained message body is not corrupted.
///
[Fact]
public void RetainedNoMsgBodyCorruption_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string topic = "retain/nocorrupt";
var payload = new byte[256];
new Random(42).NextBytes(payload);
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 0, false, true, topic, 0, payload);
MqttFlush(pub.Stream);
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
var (_, _, _, received) = ReadPublish(sub.Stream);
received.ShouldBe(payload, "retained message body must not be corrupted");
// Cleanup
MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
}
///
/// TestMQTTRetainedMsgRemovedFromMapIfNotInStream: deleting a retained message
/// removes it from the server's in-memory map.
///
[Fact]
public void RetainedMsgRemovedFromMapIfNotInStream_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string topic = "retain/removemap";
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 0, false, true, topic, 0, "exists"u8.ToArray());
MqttFlush(pub.Stream);
// Delete retained
MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
MqttFlush(pub.Stream);
// No retained message for new subscriber
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
MqttFlush(sub.Stream);
ExpectNothing(sub.Stream);
}
///
/// TestMQTTCrossAccountRetain: retained messages are scoped per account.
/// (Integration: verify retained messages are visible within the same account.)
///
[Fact]
public void CrossAccountRetain_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string topic = "cross/account/retain";
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 0, false, true, topic, 0, "val"u8.ToArray());
MqttFlush(pub.Stream);
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
var (_, _, t, p) = ReadPublish(sub.Stream);
t.ShouldBe(topic);
Encoding.UTF8.GetString(p).ShouldBe("val");
// Cleanup
MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
}
///
/// TestMQTTSubRetainedRace: concurrent subscribe + retained publish doesn't lose
/// the retained message.
///
[Fact]
public void SubRetainedRace_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string topic = "sub/retained/race";
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 0, false, true, topic, 0, "raced"u8.ToArray());
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
var (flags, _, t, p) = ReadPublish(sub.Stream);
t.ShouldBe(topic);
(flags & PubFlagRetain).ShouldBe(PubFlagRetain);
Encoding.UTF8.GetString(p).ShouldBe("raced");
// Cleanup
MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
}
///
/// TestMQTTRecoverSessionAndAddNewSub: after session recovery a new subscription can be added.
///
[Fact]
public void RecoverSessionAndAddNewSub_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string clientId = "recover-add-sub";
using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
{
CheckConnAckAccepted(conn.Stream, expectedSessionPresent: false);
MqttSubscribe(conn.Stream, 1, [("recover/foo", 1)]);
MqttFlush(conn.Stream);
SendDisconnect(conn.Stream);
}
using var conn2 = MqttConnect(clientId: clientId, cleanSess: false);
CheckConnAckAccepted(conn2.Stream, expectedSessionPresent: true);
// Add a new subscription
MqttSubscribe(conn2.Stream, 2, [("recover/bar", 0)]);
MqttFlush(conn2.Stream);
SendDisconnect(conn2.Stream);
// Cleanup
using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
CheckConnAckAccepted(cleanup.Stream);
}
///
/// TestMQTTRecoverSessionWithSubAndClientResendSub: recovered session handles
/// re-subscription to the same topic.
///
[Fact]
public void RecoverSessionWithSubAndClientResendSub_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string clientId = "recover-resub";
using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
{
CheckConnAckAccepted(conn.Stream, expectedSessionPresent: false);
MqttSubscribe(conn.Stream, 1, [("recover/resub", 1)]);
MqttFlush(conn.Stream);
SendDisconnect(conn.Stream);
}
using var conn2 = MqttConnect(clientId: clientId, cleanSess: false);
CheckConnAckAccepted(conn2.Stream, expectedSessionPresent: true);
// Re-subscribe to same topic
var acks = MqttSubscribe(conn2.Stream, 1, [("recover/resub", 1)]);
acks[0].ShouldBe((byte)1);
SendDisconnect(conn2.Stream);
// Cleanup
using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
CheckConnAckAccepted(cleanup.Stream);
}
///
/// TestMQTTJSApiMapping: verifies JS API subject mapping for MQTT streams.
/// (Unit-level: uses internal MqttTopics constants.)
///
[Fact]
public void JsApiMapping_ShouldSucceed()
{
ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.MsgsStreamName.ShouldBe("$MQTT_msgs");
ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.RetainedMsgsStreamName.ShouldBe("$MQTT_rmsgs");
ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.SessStreamName.ShouldBe("$MQTT_sess");
ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.Prefix.ShouldBe("$MQTT.");
ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.SubPrefix.ShouldBe("$MQTT.sub.");
}
///
/// TestMQTTStreamReplicasInsufficientResources: replicas > 1 in non-clustered mode
/// returns the expected error code.
/// (Unit-level: uses JsApiErrors directly.)
///
[Fact]
public void StreamReplicasInsufficientResources_ShouldSucceed()
{
var err = ZB.MOM.NatsNet.Server.JsApiErrors.NewJSStreamReplicasNotSupportedError();
err.ShouldNotBeNull();
err.Description.ShouldContain("replicas");
}
///
/// TestMQTTConsumerReplicasValidate: consumer replica count validation (unit-level).
///
[Fact]
public void ConsumerReplicasValidate_ShouldSucceed()
{
// Verify stream/consumer creation constants
ZB.MOM.NatsNet.Server.Mqtt.MqttConst.DefaultMaxAckPending.ShouldBe(1024);
ZB.MOM.NatsNet.Server.Mqtt.MqttConst.MaxAckTotalLimit.ShouldBe(0xFFFF);
}
///
/// TestMQTTConsumerReplicasOverride: stream replicas setting is honoured.
///
[Fact]
public void ConsumerReplicasOverride_ShouldSucceed()
{
// Verify that the MQTT protocol name and proto level are correct
MqttProtoName.ShouldBe("MQTT"u8.ToArray());
MqttProtoLevel.ShouldBe((byte)0x04);
}
///
/// TestMQTTConsumerMemStorageReload: consumer memory storage is reloaded on reconnect.
///
[Fact]
public void ConsumerMemStorageReload_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
// Simple connect/disconnect cycle to verify server stability
using var conn = MqttConnect(cleanSess: true);
CheckConnAckAccepted(conn.Stream);
MqttFlush(conn.Stream);
}
///
/// TestMQTTConsumerInactiveThreshold: inactive consumers are cleaned up.
///
[Fact]
public void ConsumerInactiveThreshold_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
// Subscribe, disconnect, wait, reconnect without messages – session should still exist
const string clientId = "inactive-threshold";
using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
{
CheckConnAckAccepted(conn.Stream, false);
MqttSubscribe(conn.Stream, 1, [("inactive/test", 1)]);
MqttFlush(conn.Stream);
SendDisconnect(conn.Stream);
}
Thread.Sleep(500);
using var conn2 = MqttConnect(clientId: clientId, cleanSess: false);
CheckConnAckAccepted(conn2.Stream, true);
SendDisconnect(conn2.Stream);
// Cleanup
using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
CheckConnAckAccepted(cleanup.Stream);
}
///
/// TestMQTTSubjectMapping: subject mapping translates MQTT topics to NATS subjects.
///
[Fact]
public void SubjectMapping_ShouldSucceed()
{
var cases = new (string Mqtt, string Nats)[]
{
("sensors/temp/1", "sensors.temp.1"),
("devices/+/status", "devices.*.status"),
("logs/#", "logs.>"),
};
foreach (var (mqtt, nats) in cases)
{
var result = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttFilterToNatsSubject(
Encoding.UTF8.GetBytes(mqtt));
Encoding.UTF8.GetString(result).ShouldBe(nats, $"mqtt={mqtt}");
}
}
///
/// TestMQTTSliceHeadersAndDecodeRetainedMessage: retained messages with slice headers
/// are decoded correctly.
///
[Fact]
public void SliceHeadersAndDecodeRetainedMessage_ShouldSucceed()
{
if (SkipIfUnavailable()) return;
const string topic = "slice/header/retain";
var payload = "slice-header-payload"u8.ToArray();
using var pub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(pub.Stream);
MqttPublish(pub.Stream, 1, false, true, topic, 1, payload);
MqttFlush(pub.Stream);
using var sub = MqttConnect(cleanSess: true);
CheckConnAckAccepted(sub.Stream);
MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
var (_, _, t, p) = ReadPublish(sub.Stream);
t.ShouldBe(topic);
p.ShouldBe(payload);
// Cleanup
MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
}
// ---------------------------------------------------------------------------
// Tests from mqtt_ex_test_test.go (require external CLI tools, always skipped)
// ---------------------------------------------------------------------------
///
/// TestXMQTTCompliance: runs the mqtt CLI compliance test tool.
/// Skipped if the "mqtt" CLI tool is not in PATH or MQTT_CLI env var.
///
[Fact]
public void XMqttCompliance_ShouldSucceed()
{
var cliPath = Environment.GetEnvironmentVariable("MQTT_CLI")
?? FindInPath("mqtt");
if (cliPath is null) return;
if (SkipIfUnavailable()) return;
var result = RunProcess(cliPath!, $"test -V 3 -p {_mqttPort}");
result.ExitCode.ShouldBe(0, result.Output);
}
///
/// TestXMQTTRetainedMessages: runs the mqtt-test CLI retained-message test.
/// Skipped if "mqtt-test" is not in PATH.
///
[Fact]
public void XMqttRetainedMessages_ShouldSucceed()
{
var cliPath = FindInPath("mqtt-test");
if (cliPath is null) return;
if (SkipIfUnavailable()) return;
// Publish a retained message
var result = RunProcess(cliPath!, $"pub -s {_mqttHost}:{_mqttPort} --retain --topic xmqtt/retain --qos 0 --size 128");
result.ExitCode.ShouldBe(0, result.Output);
// Verify it is received
var subResult = RunProcess(cliPath!, $"sub -s {_mqttHost}:{_mqttPort} --retained 1 --qos 0 --topic xmqtt/retain");
subResult.ExitCode.ShouldBe(0, subResult.Output);
}
// ---------------------------------------------------------------------------
// Private helpers for external tool tests
// ---------------------------------------------------------------------------
private static string? FindInPath(string exe)
{
var pathDirs = (Environment.GetEnvironmentVariable("PATH") ?? "")
.Split(Path.PathSeparator, StringSplitOptions.RemoveEmptyEntries);
foreach (var dir in pathDirs)
{
var full = Path.Combine(dir, exe);
if (File.Exists(full)) return full;
if (OperatingSystem.IsWindows())
{
var win = full + ".exe";
if (File.Exists(win)) return win;
}
}
return null;
}
private static (int ExitCode, string Output) RunProcess(string exe, string args)
{
using var proc = new System.Diagnostics.Process();
proc.StartInfo = new System.Diagnostics.ProcessStartInfo(exe, args)
{
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false,
};
proc.Start();
var output = proc.StandardOutput.ReadToEnd() + proc.StandardError.ReadToEnd();
proc.WaitForExit(30_000);
return (proc.ExitCode, output);
}
}
// ---------------------------------------------------------------------------
// Internal test helper: exposes MqttReader internals for unit tests
// (Placed in same file-scoped namespace ZB.MOM.NatsNet.Server.IntegrationTests.Mqtt)
// ---------------------------------------------------------------------------
///
/// Thin wrapper around
/// that exposes internal helpers needed by the Reader unit test.
///
internal sealed class MqttTestReader
{
private readonly ZB.MOM.NatsNet.Server.Mqtt.MqttReader _inner = new();
public void Reset(byte[] buf) => _inner.Reset(buf);
public bool HasMore() => _inner.HasMore();
public byte ReadByte(string field) => _inner.ReadByte(field);
public string ReadString(string field) => _inner.ReadString(field);
public byte[] ReadBytes(string field, bool copy) => _inner.ReadBytes(field, copy);
public ushort ReadUInt16(string field) => _inner.ReadUInt16(field);
public (int Length, bool Complete) ReadPacketLenWithCheck(bool check)
=> _inner.ReadPacketLenWithCheck(check);
/// Peeks at the raw buffer position without advancing.
public byte PeekBuf(int index)
{
// We need to peek at the buffer; use ReadBytes + reset to check.
// This is only used in one assertion where copy=true is tested.
// We approximate by reading the byte at a fixed offset via reflection.
var bufField = typeof(ZB.MOM.NatsNet.Server.Mqtt.MqttReader)
.GetField("_buffer", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
var buf = bufField?.GetValue(_inner) as byte[] ?? Array.Empty();
return buf.Length > index ? buf[index] : (byte)0;
}
}