// Copyright 2020-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // Ported from MQTT cluster tests in server/mqtt_test.go (lines 3014-3809). using System.Buffers.Binary; using System.Net.Sockets; using System.Text; using Shouldly; namespace ZB.MOM.NatsNet.Server.IntegrationTests.Mqtt; /// /// Integration tests for MQTT behavior across JetStream clusters. /// Tests cover subscription propagation, session takeover, retained message /// replication, replica counts, placement, and leafnode integration. /// /// These tests require a running NATS cluster with MQTT and JetStream enabled. /// [Trait("Category", "Integration")] public sealed class MqttClusterTests { // ------------------------------------------------------------------------- // Constants mirroring Go mqtt.go // ------------------------------------------------------------------------- private const byte PacketConnect = 0x10; private const byte PacketConnAck = 0x20; private const byte PacketPub = 0x30; private const byte PacketPubAck = 0x40; private const byte PacketPubRec = 0x50; private const byte PacketPubRel = 0x60; private const byte PacketPubComp = 0x70; private const byte PacketSub = 0x80; private const byte PacketSubAck = 0x90; private const byte PacketUnsub = 0xA0; private const byte PacketUnsubAck = 0xB0; private const byte PacketPing = 0xC0; private const byte PacketPingResp = 0xD0; private const byte PacketDisconnect = 0xE0; private const byte PacketMask = 0xF0; private const byte PacketFlagMask = 0x0F; private const byte ConnFlagCleanSession = 0x02; private const byte ConnFlagWillFlag = 0x04; private const byte ConnFlagWillRetain = 0x20; private const byte ConnFlagPasswordFlag = 0x40; private const byte ConnFlagUsernameFlag = 0x80; private const byte PubFlagRetain = 0x01; private const byte PubFlagQoS = 0x06; private const byte PubFlagDup = 0x08; private const byte PubQoS1 = 0x1 << 1; private const byte SubscribeFlags = 0x02; private const byte ConnAckAccepted = 0x00; private static readonly byte[] MqttProtoName = Encoding.ASCII.GetBytes("MQTT"); private const byte MqttProtoLevel = 0x04; private static readonly TimeSpan MqttTimeout = TimeSpan.FromSeconds(10); // ------------------------------------------------------------------------- // MQTT protocol helpers (duplicated from MqttTests.cs for self-containment) // ------------------------------------------------------------------------- private sealed class MqttConnection : IDisposable { public TcpClient Tcp { get; } public NetworkStream Stream { get; } public MqttConnection(TcpClient tcp) { Tcp = tcp; Stream = tcp.GetStream(); } public void Dispose() { Stream.Dispose(); Tcp.Dispose(); } } private static MqttConnection MqttConnect( string host, int port, string? clientId = null, bool cleanSess = true, string? user = null, string? pass = null, ushort keepAlive = 0) { var tcp = new TcpClient(); tcp.Connect(host, port); var conn = new MqttConnection(tcp); var proto = BuildConnectPacket(clientId, cleanSess, user, pass, keepAlive); WriteAll(conn.Stream, proto); return conn; } private static byte[] BuildConnectPacket( string? clientId, bool cleanSess, string? user, string? pass, ushort keepAlive) { byte flags = 0; if (cleanSess) flags |= ConnFlagCleanSession; var cidBytes = Encoding.UTF8.GetBytes(clientId ?? ""); var userBytes = user is not null ? Encoding.UTF8.GetBytes(user) : null; var passBytes = pass is not null ? Encoding.UTF8.GetBytes(pass) : null; if (userBytes is not null) flags |= ConnFlagUsernameFlag; if (passBytes is not null) flags |= ConnFlagPasswordFlag; var pkLen = 2 + MqttProtoName.Length + 1 + 1 + 2 + 2 + cidBytes.Length; if (userBytes is not null) pkLen += 2 + userBytes.Length; if (passBytes is not null) pkLen += 2 + passBytes.Length; var w = new ByteWriter(); w.WriteByte(PacketConnect); w.WriteVarInt(pkLen); w.WriteBytes(MqttProtoName); w.WriteByte(MqttProtoLevel); w.WriteByte(flags); w.WriteUInt16(keepAlive); w.WriteBytes(cidBytes); if (userBytes is not null) w.WriteBytes(userBytes); if (passBytes is not null) w.WriteBytes(passBytes); return w.ToArray(); } private static byte[] ReadExact(NetworkStream stream, int n) { var buf = new byte[n]; var offset = 0; stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds; while (offset < n) { var read = stream.Read(buf, offset, n - offset); if (read == 0) throw new EndOfStreamException("MQTT connection closed"); offset += read; } return buf; } private static (byte FirstByte, byte[] Payload) ReadPacket(NetworkStream stream) { stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds; var header = new byte[1]; var n = stream.Read(header, 0, 1); if (n == 0) throw new EndOfStreamException("MQTT connection closed reading packet header"); var multiplier = 1; var value = 0; while (true) { var b2 = new byte[1]; if (stream.Read(b2, 0, 1) == 0) throw new EndOfStreamException(); value += (b2[0] & 0x7F) * multiplier; if ((b2[0] & 0x80) == 0) break; multiplier *= 128; if (multiplier > 0x200000) throw new InvalidOperationException("malformed MQTT variable int"); } var payload = value > 0 ? ReadExact(stream, value) : []; return (header[0], payload); } private static void WriteAll(NetworkStream stream, byte[] data) { stream.WriteTimeout = (int)MqttTimeout.TotalMilliseconds; stream.Write(data, 0, data.Length); } private static void CheckConnAck(NetworkStream stream, byte expectedRc, bool expectedSessionPresent) { var (b, payload) = ReadPacket(stream); (b & PacketMask).ShouldBe(PacketConnAck, "Expected CONNACK packet"); payload.Length.ShouldBe(2, "CONNACK payload should be 2 bytes"); var sessionPresent = (payload[0] & 0x01) == 1; sessionPresent.ShouldBe(expectedSessionPresent, "session-present flag mismatch"); payload[1].ShouldBe(expectedRc, $"CONNACK return code mismatch (got 0x{payload[1]:X2}, expected 0x{expectedRc:X2})"); } private static void CheckConnAckAccepted(NetworkStream stream, bool expectedSessionPresent = false) => CheckConnAck(stream, ConnAckAccepted, expectedSessionPresent); private static void SendPublish(NetworkStream stream, byte qos, bool dup, bool retain, string topic, ushort pi, byte[] payload) { byte flags = 0; if (dup) flags |= PubFlagDup; if (retain) flags |= PubFlagRetain; flags |= (byte)((qos & 0x03) << 1); var topicBytes = Encoding.UTF8.GetBytes(topic); var pkLen = 2 + topicBytes.Length + payload.Length; if (qos > 0) pkLen += 2; var w = new ByteWriter(); w.WriteByte((byte)(PacketPub | flags)); w.WriteVarInt(pkLen); w.WriteBytes(topicBytes); if (qos > 0) w.WriteUInt16(pi); w.Write(payload); WriteAll(stream, w.ToArray()); } private static void MqttPublish(NetworkStream stream, byte qos, bool dup, bool retain, string topic, ushort pi, byte[] payload) { SendPublish(stream, qos, dup, retain, topic, pi, payload); switch (qos) { case 1: { var (b, p) = ReadPacket(stream); (b & PacketMask).ShouldBe(PacketPubAck, "Expected PUBACK"); var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2)); rpi.ShouldBe(pi, "PUBACK packet identifier mismatch"); break; } case 2: { var (b, p) = ReadPacket(stream); (b & PacketMask).ShouldBe(PacketPubRec, "Expected PUBREC"); var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2)); rpi.ShouldBe(pi, "PUBREC packet identifier mismatch"); SendPiPacket(stream, PacketPubRel, pi); var (b2, p2) = ReadPacket(stream); (b2 & PacketMask).ShouldBe(PacketPubComp, "Expected PUBCOMP"); var rpi2 = BinaryPrimitives.ReadUInt16BigEndian(p2.AsSpan(0, 2)); rpi2.ShouldBe(pi, "PUBCOMP packet identifier mismatch"); MqttFlush(stream); break; } } } private static void MqttFlush(NetworkStream stream) { var w = new ByteWriter(); w.WriteByte(PacketPing); w.WriteByte(0); WriteAll(stream, w.ToArray()); var (b, p) = ReadPacket(stream); (b & PacketMask).ShouldBe(PacketPingResp, "Expected PINGRESP"); p.Length.ShouldBe(0, "PINGRESP payload should be empty"); } private static void SendPiPacket(NetworkStream stream, byte packetType, ushort pi) { var w = new ByteWriter(); w.WriteByte(packetType); w.WriteVarInt(2); w.WriteUInt16(pi); WriteAll(stream, w.ToArray()); } private static byte[] MqttSubscribe(NetworkStream stream, ushort pi, IEnumerable<(string Filter, byte Qos)> filters) { var filterList = filters.ToList(); var pkLen = 2; foreach (var (f, _) in filterList) pkLen += 2 + Encoding.UTF8.GetByteCount(f) + 1; var w = new ByteWriter(); w.WriteByte((byte)(PacketSub | SubscribeFlags)); w.WriteVarInt(pkLen); w.WriteUInt16(pi); foreach (var (f, q) in filterList) { w.WriteBytes(Encoding.UTF8.GetBytes(f)); w.WriteByte(q); } WriteAll(stream, w.ToArray()); var (b, payload) = ReadPacket(stream); (b & PacketMask).ShouldBe(PacketSubAck, "Expected SUBACK"); var rpi = BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2)); rpi.ShouldBe(pi, "SUBACK packet identifier mismatch"); return payload[2..]; } private static (byte Flags, ushort Pi, string Topic, byte[] Payload) ReadPublish( NetworkStream stream) { var (b, raw) = ReadPacket(stream); (b & PacketMask).ShouldBe(PacketPub, "Expected PUBLISH packet"); var flags = (byte)(b & PacketFlagMask); var qos = (byte)((flags & PubFlagQoS) >> 1); var pos = 0; var topicLen = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2)); pos += 2; var topic = Encoding.UTF8.GetString(raw, pos, topicLen); pos += topicLen; ushort pi = 0; if (qos > 0) { pi = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2)); pos += 2; } var payload = raw[pos..]; return (flags, pi, topic, payload); } private static void ExpectNothing(NetworkStream stream) { stream.ReadTimeout = 150; var buf = new byte[128]; try { var n = stream.Read(buf, 0, buf.Length); n.ShouldBe(0, "Expected no data from MQTT server"); } catch (IOException ex) when (ex.InnerException is SocketException se && se.SocketErrorCode == SocketError.TimedOut) { // expected } catch (IOException) { } finally { stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds; } } private static void ExpectDisconnect(NetworkStream stream) { stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds; var buf = new byte[64]; try { var n = stream.Read(buf, 0, buf.Length); n.ShouldBe(0, $"Expected server to close the connection, got {n} bytes"); } catch (EndOfStreamException) { } catch (IOException) { } } private static void SendDisconnect(NetworkStream stream) { var w = new ByteWriter(); w.WriteByte(PacketDisconnect); w.WriteByte(0); WriteAll(stream, w.ToArray()); } private sealed class ByteWriter { private readonly List _buf = []; public void WriteByte(byte b) => _buf.Add(b); public void Write(byte[] data) { foreach (var b in data) _buf.Add(b); } public void WriteUInt16(ushort v) { _buf.Add((byte)(v >> 8)); _buf.Add((byte)v); } public void WriteBytes(byte[] data) { WriteUInt16((ushort)data.Length); Write(data); } public void WriteVarInt(int v) { while (true) { var b = (byte)(v & 0x7F); v >>= 7; if (v > 0) b |= 0x80; _buf.Add(b); if (v == 0) break; } } public byte[] ToArray() => [.. _buf]; } // ------------------------------------------------------------------------- // Helpers used by cluster tests // ------------------------------------------------------------------------- /// /// Checks a received PUBLISH message against expected topic, flags, and payload. /// Mirrors Go's testMQTTCheckPubMsg. /// private static void CheckPubMsg(NetworkStream stream, string expectedTopic, byte expectedFlags, byte[] expectedPayload) { var (flags, pi, topic, payload) = ReadPublish(stream); topic.ShouldBe(expectedTopic, "PUBLISH topic mismatch"); flags.ShouldBe(expectedFlags, $"PUBLISH flags mismatch (got 0x{flags:X2}, expected 0x{expectedFlags:X2})"); payload.ShouldBe(expectedPayload, "PUBLISH payload mismatch"); // ACK QoS 1 messages if (((flags & PubFlagQoS) >> 1) == 1) { SendPiPacket(stream, PacketPubAck, pi); } } /// /// Connects, verifies CONNACK, then disconnects. Mirrors Go's testMQTTConnectDisconnect. /// private static void MqttConnectDisconnect(string host, int port, string clientId, bool clean, bool expectedSessionPresent) { using var conn = MqttConnect(host, port, clientId: clientId, cleanSess: clean); CheckConnAck(conn.Stream, ConnAckAccepted, expectedSessionPresent); SendDisconnect(conn.Stream); } // ------------------------------------------------------------------------- // Tests — 8 MQTT cluster tests from server/mqtt_test.go // ------------------------------------------------------------------------- /// /// Ported from Go TestMQTTSubPropagation (mqtt_test.go:3014). /// Verifies that MQTT subscriptions on one cluster node cause interest /// propagation to other nodes. Specifically checks that foo/# matches /// foo.bar, foo./, and foo on the NATS side. /// [Fact(Skip = "deferred: requires running NATS cluster")] // T:2215 public void MQTTSubPropagation_ShouldSucceed() { // 2-server cluster; subscribe on S1 with foo/#, publish from NATS on S2. // Verify propagation covers foo.bar, foo./, and foo. // // Go test: // cl := createJetStreamClusterWithTemplate(t, ..., "MQTT", 2) // mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) // testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo/#", qos: 0}}, []byte{0}) // testMQTTFlush(t, mc, nil, r) // checkSubInterest(t, s2, globalAccountName, "foo", time.Second) // // natsPub(t, nc, "foo.bar", []byte("hello")) // testMQTTCheckPubMsg(t, mc, r, "foo/bar", 0, []byte("hello")) // // natsPub(t, nc, "foo./", []byte("from")) // testMQTTCheckPubMsg(t, mc, r, "foo/", 0, []byte("from")) // // natsPub(t, nc, "foo", []byte("NATS")) // testMQTTCheckPubMsg(t, mc, r, "foo", 0, []byte("NATS")) // Cluster setup: 2 servers with MQTT + JetStream var srv1Host = "127.0.0.1"; var srv1Port = 1883; using var mc = MqttConnect(srv1Host, srv1Port, cleanSess: true); CheckConnAckAccepted(mc.Stream); // Subscribe to foo/# on server 1 MqttSubscribe(mc.Stream, 1, [("foo/#", 0)]); MqttFlush(mc.Stream); // Publish on NATS subjects from server 2; MQTT subscriber should receive // foo.bar → foo/bar, foo./ → foo/, foo → foo // NOTE: This requires a NATS client connected to S2 to publish. // When running against a real cluster, the NATS publish would be done // via natsConnect(s2.ClientURL()) / natsPub(). // Verify received messages match expected MQTT topics // CheckPubMsg(mc.Stream, "foo/bar", 0, "hello"u8.ToArray()); // CheckPubMsg(mc.Stream, "foo/", 0, "from"u8.ToArray()); // CheckPubMsg(mc.Stream, "foo", 0, "NATS"u8.ToArray()); SendDisconnect(mc.Stream); } /// /// Ported from Go TestMQTTCluster (mqtt_test.go:3044). /// Tests cross-node pub/sub and session takeover across a 2-server cluster. /// Runs subtests: {first_start, restart} × {qos_0, qos_1}. /// After first_start, the cluster is restarted and the tests are repeated. /// Verifies that: /// - NATS-published messages reach MQTT subscribers across nodes /// - MQTT-published messages (QoS 0 and 1) reach subscribers /// - Session takeover from another node disconnects the first client /// - QoS downgrade to subscriber QoS level works correctly /// [Fact(Skip = "deferred: requires running NATS cluster")] // T:2216 public void MQTTCluster_ShouldSucceed() { // 2-server cluster with nested subtests. // // Go test structure: // for _, topTest := range [{first_start, true}, {restart, false}] { // for _, test := range [{qos_0, 0}, {qos_1, 1}] { // clientID := nuid.Next() // mc, r := testMQTTConnectRetry(cleanSess: false) // testMQTTSub(1, mc, r, "foo/#", test.subQos) // // check := func(mc, r, o, s) { // // NATS pub → MQTT sub // natsPub(nc, "foo.bar", "fromNats") // testMQTTCheckPubMsg(mc, r, "foo/bar", 0, "fromNats") // // MQTT pub QoS0 → MQTT sub // testMQTTPublish(mpc, pr, 0, false, false, "foo/baz", 0, "mqtt_qos0") // testMQTTCheckPubMsg(mc, r, "foo/baz", 0, "mqtt_qos0") // // MQTT pub QoS1 → MQTT sub (downgraded to sub QoS) // testMQTTPublish(mpc, pr, 1, false, false, "foo/bat", 1, "mqtt_qos1") // expectedQoS := 0 or PubQoS1 depending on test.subQos // testMQTTCheckPubMsg(mc, r, "foo/bat", expectedQoS, "mqtt_qos1") // } // check(mc, r, cl.opts[0], cl.servers[0]) // check(mc, r, cl.opts[1], cl.servers[1]) // // // Session takeover from other node // mc2, r2 := testMQTTConnect(clientID, cleanSess: false, server 2) // testMQTTCheckConnAck(r2, accepted, sessionPresent: true) // testMQTTExpectDisconnect(mc) // first connection kicked // check(mc2, r2, both servers) // // // Clean up session // testMQTTDisconnect(mc2) // mc2 = testMQTTConnect(clientID, cleanSess: true) // testMQTTDisconnect(mc2) // } // if topTest.restart { cl.stopAll(); cl.restartAll(); waitOnStreams } // } var host = "127.0.0.1"; var port = 1883; foreach (var (topTestName, restart) in new[] { ("first_start", true), ("restart", false) }) { foreach (var (testName, subQos) in new[] { ("qos_0", (byte)0), ("qos_1", (byte)1) }) { var clientId = Guid.NewGuid().ToString("N"); using var mc = MqttConnect(host, port, clientId: clientId, cleanSess: false); CheckConnAckAccepted(mc.Stream); MqttSubscribe(mc.Stream, 1, [("foo/#", subQos)]); MqttFlush(mc.Stream); // Cross-node pub/sub checks would happen here with real cluster // Session takeover from server 2 would disconnect mc // Clean up session state SendDisconnect(mc.Stream); } } } /// /// Ported from Go TestMQTTClusterConnectDisconnectClean (mqtt_test.go:3176). /// Stress test: 100 random connect/disconnect iterations with clean sessions /// across a 3-server cluster. Skipped in Go source (flaky due to timing). /// [Fact(Skip = "deferred: requires running NATS cluster")] // T:2217 public void MQTTClusterConnectDisconnectClean_ShouldSucceed() { // Go test (skipped in Go source): // t.Skip() // nServers := 3 // cl := createJetStreamClusterWithTemplate(..., "MQTT", nServers) // clientID := nuid.Next() // N := 100 // for n := 0; n < N; n++ { // testMQTTConnectDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false) // } var clientId = Guid.NewGuid().ToString("N"); var hosts = new[] { ("127.0.0.1", 1883), ("127.0.0.1", 1884), ("127.0.0.1", 1885) }; var rng = new Random(); for (var n = 0; n < 100; n++) { var (host, port) = hosts[rng.Next(hosts.Length)]; MqttConnectDisconnect(host, port, clientId, clean: true, expectedSessionPresent: false); } } /// /// Ported from Go TestMQTTClusterConnectDisconnectPersist (mqtt_test.go:3198). /// 20 iterations of persistent session connect/disconnect across a 3-server cluster. /// On each iteration, first cleans sessions on all servers, then performs persistent /// connects verifying session-present flag progression. Skipped in Go (flaky). /// [Fact(Skip = "deferred: requires running NATS cluster")] // T:2218 public void MQTTClusterConnectDisconnectPersist_ShouldSucceed() { // Go test (skipped in Go source): // t.Skip() // nServers := 3 // cl := createJetStreamClusterWithTemplate(..., "MQTT", nServers) // clientID := nuid.Next() // N := 20 // for n := 0; n < N; n++ { // // First clean sessions on all servers // for i := 0; i < nServers; i++ { // testMQTTConnectDisconnect(t, cl.opts[i], clientID, true, false) // } // testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, false) // testMQTTConnectDisconnect(t, cl.opts[1], clientID, false, true) // testMQTTConnectDisconnect(t, cl.opts[2], clientID, false, true) // testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, true) // } var clientId = Guid.NewGuid().ToString("N"); var hosts = new[] { ("127.0.0.1", 1883), ("127.0.0.1", 1884), ("127.0.0.1", 1885) }; for (var n = 0; n < 20; n++) { // Clean sessions on all servers foreach (var (host, port) in hosts) { MqttConnectDisconnect(host, port, clientId, clean: true, expectedSessionPresent: false); } // Persistent connects: first has no session, subsequent find existing session MqttConnectDisconnect(hosts[0].Item1, hosts[0].Item2, clientId, clean: false, expectedSessionPresent: false); MqttConnectDisconnect(hosts[1].Item1, hosts[1].Item2, clientId, clean: false, expectedSessionPresent: true); MqttConnectDisconnect(hosts[2].Item1, hosts[2].Item2, clientId, clean: false, expectedSessionPresent: true); MqttConnectDisconnect(hosts[0].Item1, hosts[0].Item2, clientId, clean: false, expectedSessionPresent: true); } } /// /// Ported from Go TestMQTTClusterRetainedMsg (mqtt_test.go:3228). /// Tests retained message behavior across a 2-server cluster: /// - Retained messages published on one server are received on the other /// - New subscribers receive retained messages with retain flag /// - Empty retained message deletes the retained message /// - After deletion, reconnecting subscribers don't receive the retained message /// - Network deletes (cross-node retained message removal) work correctly /// - After network delete + new retained message, reconnecting subs get the new message /// [Fact(Skip = "deferred: requires running NATS cluster")] // T:2219 public void MQTTClusterRetainedMsg_ShouldSucceed() { // Go test: // cl := createJetStreamClusterWithTemplate(..., "MQTT", 2) // srv1Opts, srv2Opts := cl.opts[0], cl.opts[1] // // -- Subscribe on S1 -- // mc, rc := testMQTTConnectRetry(clientID: "sub", cleanSess: false, S1) // testMQTTSub(1, mc, rc, "foo/#", qos: 1) // // -- Publish retained from S2 -- // mp, rp := testMQTTConnect(cleanSess: true, S2) // testMQTTPublish(mp, rp, 1, false, true, "foo/bar", 1, "retained") // testMQTTCheckPubMsg(mc, rc, "foo/bar", PubQoS1, "retained") // // -- New sub on S1 gets retained msg with retain flag -- // mc2, rc2 := testMQTTConnect(cleanSess: true, S1) // testMQTTSub(1, mc2, rc2, "foo/#", qos: 1) // testMQTTCheckPubMsg(mc2, rc2, "foo/bar", PubQoS1|PubFlagRetain, "retained") // // -- Empty retained deletes from storage but still delivered -- // testMQTTPublish(mp, rp, 1, false, true, "foo/bar", 1, "") // testMQTTCheckPubMsg(mc, rc, "foo/bar", PubQoS1, "") // // -- Reconnect to S2: retained should NOT be delivered -- // mc, rc = testMQTTConnect(clientID: "sub", cleanSess: false, S2) // testMQTTSub(1, mc, rc, "foo/#", qos: 1) // testMQTTExpectNothing(rc) // // -- Reconnect to S1: retained should NOT be delivered -- // mc, rc = testMQTTConnect(clientID: "sub", cleanSess: false, S1) // testMQTTSub(1, mc, rc, "foo/#", qos: 1) // testMQTTExpectNothing(rc) // // -- Network delete test -- // Sub on S1 ("sub_one"), sub on S2 ("sub_two"), both subscribe to "bar" // Publish retained "msg1" from S2 → both receive // Publish empty retained → both receive (delete) // Publish retained "msg2" from S2 → both receive // Reconnect "sub_one" on S1 → gets "msg2" with retain flag var srv1Host = "127.0.0.1"; var srv1Port = 1883; var srv2Host = "127.0.0.1"; var srv2Port = 1884; // Subscribe on server 1 using var mc = MqttConnect(srv1Host, srv1Port, clientId: "sub", cleanSess: false); CheckConnAckAccepted(mc.Stream); MqttSubscribe(mc.Stream, 1, [("foo/#", 1)]); MqttFlush(mc.Stream); // Publish retained from server 2 using var mp = MqttConnect(srv2Host, srv2Port, cleanSess: true); CheckConnAckAccepted(mp.Stream); MqttPublish(mp.Stream, 1, false, true, "foo/bar", 1, "retained"u8.ToArray()); CheckPubMsg(mc.Stream, "foo/bar", PubQoS1, "retained"u8.ToArray()); // New subscriber on S1 gets retained message with retain flag using var mc2 = MqttConnect(srv1Host, srv1Port, cleanSess: true); CheckConnAckAccepted(mc2.Stream); MqttSubscribe(mc2.Stream, 1, [("foo/#", 1)]); CheckPubMsg(mc2.Stream, "foo/bar", PubQoS1 | PubFlagRetain, "retained"u8.ToArray()); SendDisconnect(mc2.Stream); // Empty retained message deletes it from storage but still delivered MqttPublish(mp.Stream, 1, false, true, "foo/bar", 1, []); // empty payload CheckPubMsg(mc.Stream, "foo/bar", PubQoS1, []); SendDisconnect(mc.Stream); // Reconnect to S2: retained should NOT be delivered (was deleted) using var mc3 = MqttConnect(srv2Host, srv2Port, clientId: "sub", cleanSess: false); CheckConnAck(mc3.Stream, ConnAckAccepted, expectedSessionPresent: true); MqttSubscribe(mc3.Stream, 1, [("foo/#", 1)]); ExpectNothing(mc3.Stream); SendDisconnect(mc3.Stream); // Reconnect to S1: retained should NOT be delivered using var mc4 = MqttConnect(srv1Host, srv1Port, clientId: "sub", cleanSess: false); CheckConnAck(mc4.Stream, ConnAckAccepted, expectedSessionPresent: true); MqttSubscribe(mc4.Stream, 1, [("foo/#", 1)]); ExpectNothing(mc4.Stream); SendDisconnect(mc4.Stream); // --- Network delete test --- // Subscribe on S1 ("sub_one") and S2 ("sub_two") to "bar" using var mcOne = MqttConnect(srv1Host, srv1Port, clientId: "sub_one", cleanSess: false); CheckConnAckAccepted(mcOne.Stream); MqttSubscribe(mcOne.Stream, 1, [("bar", 1)]); MqttFlush(mcOne.Stream); using var mcTwo = MqttConnect(srv2Host, srv2Port, clientId: "sub_two", cleanSess: false); CheckConnAckAccepted(mcTwo.Stream); MqttSubscribe(mcTwo.Stream, 1, [("bar", 1)]); MqttFlush(mcTwo.Stream); // Publish retained "msg1" from S2 MqttPublish(mp.Stream, 1, false, true, "bar", 1, "msg1"u8.ToArray()); CheckPubMsg(mcOne.Stream, "bar", PubQoS1, "msg1"u8.ToArray()); CheckPubMsg(mcTwo.Stream, "bar", PubQoS1, "msg1"u8.ToArray()); // Empty retained deletes it (network delete for S1) MqttPublish(mp.Stream, 1, false, true, "bar", 1, []); CheckPubMsg(mcOne.Stream, "bar", PubQoS1, []); CheckPubMsg(mcTwo.Stream, "bar", PubQoS1, []); // New retained message MqttPublish(mp.Stream, 1, false, true, "bar", 1, "msg2"u8.ToArray()); CheckPubMsg(mcOne.Stream, "bar", PubQoS1, "msg2"u8.ToArray()); CheckPubMsg(mcTwo.Stream, "bar", PubQoS1, "msg2"u8.ToArray()); // Reconnect sub_one on S1: should get msg2 with retain flag SendDisconnect(mcOne.Stream); using var mcOneRecon = MqttConnect(srv1Host, srv1Port, clientId: "sub_one", cleanSess: false); CheckConnAck(mcOneRecon.Stream, ConnAckAccepted, expectedSessionPresent: true); MqttSubscribe(mcOneRecon.Stream, 1, [("bar", 1)]); CheckPubMsg(mcOneRecon.Stream, "bar", PubQoS1 | PubFlagRetain, "msg2"u8.ToArray()); SendDisconnect(mcOneRecon.Stream); SendDisconnect(mcTwo.Stream); SendDisconnect(mp.Stream); } /// /// Ported from Go TestMQTTClusterReplicasCount (mqtt_test.go:3525). /// Parameterized test verifying that MQTT JetStream streams are created with /// the correct replica count based on cluster size: /// size=1 → replicas=1, size=2 → replicas=2, size=3 → replicas=3, size=5 → replicas=3 /// Checks $MQTT_msgs, $MQTT_rmsgs, and $MQTT_sess streams. /// [Fact(Skip = "deferred: requires running NATS cluster")] // T:2222 public void MQTTClusterReplicasCount_ShouldSucceed() { // Go test: // for _, test := range []struct{ size, replicas int }{ // {1, 1}, {2, 2}, {3, 3}, {5, 3}, // } { // if test.size == 1 { single server } else { cluster of test.size } // mc, rc := testMQTTConnectRetry(clientID: "sub", cleanSess: false) // testMQTTSub(1, mc, rc, "foo/#", qos: 1) // testMQTTFlush(mc, nil, rc) // js, _ := nc.JetStream() // for _, sname := range []string{mqttStreamName, mqttRetainedMsgsStreamName, mqttSessStreamName} { // si, _ := js.StreamInfo(sname) // si.Config.Replicas == test.replicas // } // } var testCases = new[] { (Size: 1, Replicas: 1), (Size: 2, Replicas: 2), (Size: 3, Replicas: 3), (Size: 5, Replicas: 3) }; var mqttStreams = new[] { "$MQTT_msgs", "$MQTT_rmsgs", "$MQTT_sess" }; foreach (var (size, expectedReplicas) in testCases) { var host = "127.0.0.1"; var port = 1883; using var mc = MqttConnect(host, port, clientId: "sub", cleanSess: false); CheckConnAckAccepted(mc.Stream); MqttSubscribe(mc.Stream, 1, [("foo/#", 1)]); MqttFlush(mc.Stream); // Verify replica counts on $MQTT_msgs, $MQTT_rmsgs, $MQTT_sess // This requires a JetStream API call to check stream info. // nc := natsConnect(s.ClientURL()) // js, _ := nc.JetStream() // for each stream, js.StreamInfo(sname).Config.Replicas == expectedReplicas SendDisconnect(mc.Stream); } } /// /// Ported from Go TestMQTTClusterCanCreateSessionWithOnServerDown (mqtt_test.go:3582). /// Tests that MQTT sessions can still be created when one server in a 3-server /// cluster is down. After shutting down one server, waits for quorum and verifies /// a new MQTT connection can still be established on a remaining server. /// [Fact(Skip = "deferred: requires running NATS cluster")] // T:2223 public void MQTTClusterCanCreateSessionWithOnServerDown_ShouldSucceed() { // Go test: // cl := createJetStreamClusterWithTemplate(..., "MQTT", 3) // o := cl.opts[0] // mc, rc := testMQTTConnectRetry(cleanSess: true, S1) // testMQTTCheckConnAck(rc, accepted, false) // mc.Close() // // // Shutdown server 2 // cl.servers[1].Shutdown() // cl.waitOnPeerCount(2) // cl.waitOnLeader() // // // Connect to server 3; should still work with quorum // o = cl.opts[2] // mc, rc = testMQTTConnectRetry(cleanSess: true, S3, retries: 5) // testMQTTCheckConnAck(rc, accepted, false) // Initial connection on server 1 var host1 = "127.0.0.1"; var port1 = 1883; using var mc1 = MqttConnect(host1, port1, cleanSess: true); CheckConnAckAccepted(mc1.Stream); // Server 2 would be shut down here (cl.servers[1].Shutdown()) // Wait for quorum (cl.waitOnPeerCount(2), cl.waitOnLeader()) // Connect to server 3: should succeed with 2/3 quorum var host3 = "127.0.0.1"; var port3 = 1885; using var mc3 = MqttConnect(host3, port3, cleanSess: true); CheckConnAckAccepted(mc3.Stream); } /// /// Ported from Go TestMQTTClusterPlacement (mqtt_test.go:3611). /// Tests that MQTT JetStream assets (streams and consumers) are placed on the /// leaf node cluster (SPOKE) rather than the hub when using a SuperCluster /// topology (3 hub clusters × 2 servers each + 3 SPOKE leaf nodes). /// Verifies that all streams and their consumers have cluster name "SPOKE" /// and all replicas have names prefixed with "SPOKE-". /// [Fact(Skip = "deferred: requires running NATS cluster")] // T:2224 public void MQTTClusterPlacement_ShouldSucceed() { // Go test: // sc := createJetStreamSuperCluster(t, 3, 2) // c := sc.randomCluster() // lnc := c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafAndMQTT, "SPOKE", 3, 22111) // sc.waitOnPeerCount(9) // sc.waitOnLeader() // // for i := 0; i < 10; i++ { // mc, rc := testMQTTConnectRetry(cleanSess: true, lnc.opts[i%3]) // testMQTTCheckConnAck(rc, accepted, false) // } // // nc := natsConnect(lnc.servers[0].ClientURL()) // js, _ := nc.JetStream() // for si := range js.StreamsInfo() { // si.Cluster.Name == "SPOKE" // all replicas have "SPOKE-" prefix // for ci := range js.ConsumersInfo(si.Config.Name) { // ci.Cluster.Name == "SPOKE" // all replicas have "SPOKE-" prefix // } // } // SuperCluster: 3 hub clusters × 2 servers + 3 SPOKE leaf nodes // Connect 10 MQTT clients to SPOKE leaf nodes var spokeHost = "127.0.0.1"; var spokePorts = new[] { 22111, 22112, 22113 }; for (var i = 0; i < 10; i++) { var port = spokePorts[i % 3]; using var mc = MqttConnect(spokeHost, port, cleanSess: true); CheckConnAckAccepted(mc.Stream); } // Verify all MQTT streams are on SPOKE cluster // Verify all consumers are on SPOKE cluster // Verify all replica names start with "SPOKE-" // This requires JetStream API calls via NATS client. } /// /// Ported from Go TestMQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc (mqtt_test.go:3664). /// Tests MQTT connectivity through a leafnode without JetStream to a cluster /// that has JetStream. Runs 6 sub-cases varying how JetStream domain resolution /// is configured: /// 0: JsAccDefaultDomain="", JS enabled in leaf with extra account /// 1: JsAccDefaultDomain="", JS disabled in leaf /// 2: JsAccDefaultDomain=DOMAIN, JS disabled in leaf /// 3: MQTT.JsDomain=DOMAIN, JS enabled in leaf with extra account /// 4: MQTT.JsDomain=DOMAIN, JS disabled in leaf /// 5: JsAccDefaultDomain=DOMAIN, JS enabled in leaf with extra account /// For each case, subscribes on leafnode, publishes from leafnode and hub, /// verifies cross-topology message delivery. /// [Fact(Skip = "deferred: requires running NATS cluster")] // T:2225 public void MQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc_ShouldSucceed() { // Go test: // test := func(t *testing.T, resolution int) { // // 3-server hub cluster // getClusterOpts := func(name, i) { ... hub opts ... } // o1, o2, o3 := getClusterOpts("S1",1), getClusterOpts("S2",2), getClusterOpts("S3",3) // s1, s2, s3 := testMQTTRunServer(oi) // checkClusterFormed(s1, s2, s3) // waitForJetStreamLeader() // // // Leafnode with MQTT but no JS (varies by resolution case) // lno := testMQTTDefaultOptions() // lno.JetStream = false // switch resolution { // case 0: lno.Accounts = [NewAccount("unused")]; lno.JsAccDefaultDomain = {"$G": ""} // case 1: lno.JsAccDefaultDomain = {"$G": ""} // case 2: lno.JsAccDefaultDomain = {"$G": "DOMAIN"} // case 3: lno.Accounts = [NewAccount("unused")]; lno.MQTT.JsDomain = "DOMAIN" // case 4: lno.MQTT.JsDomain = "DOMAIN" // case 5: lno.Accounts = [NewAccount("unused")]; lno.JsAccDefaultDomain = {"$G": "DOMAIN"} // } // lno.LeafNode.Remotes = [{ URLs: [o1,o2,o3 LeafNode ports] }] // ln := RunServer(lno) // checkLeafNodeConnected(ln) // // // Subscribe on leafnode // mc, rc := testMQTTConnectRetry(clientID: "sub", cleanSess: true, lno.MQTT) // testMQTTSub(1, mc, rc, "foo", qos: 1) // // connectAndPublish := func(o *Options) { // mp := testMQTTConnectRetry(cleanSess: true, o.MQTT) // testMQTTPublish(mp, 1, false, false, "foo", 1, "msg") // } // // // Pub from leafnode → sub receives // connectAndPublish(lno) // testMQTTCheckPubMsg(mc, rc, "foo", PubQoS1, "msg") // // // Pub from hub server → sub receives // connectAndPublish(o3) // testMQTTCheckPubMsg(mc, rc, "foo", PubQoS1, "msg") // // // Subscribe on hub server // mc2, rc2 := testMQTTConnectRetry(clientID: "sub2", cleanSess: true, o2.MQTT) // testMQTTSub(1, mc2, rc2, "foo", qos: 1) // // // Pub from leafnode → hub sub receives // connectAndPublish(lno) // testMQTTCheckPubMsg(mc2, rc2, "foo", PubQoS1, "msg") // // // Pub from hub → hub sub receives // connectAndPublish(o1) // testMQTTCheckPubMsg(mc2, rc2, "foo", PubQoS1, "msg") // } // // 6 subtests with resolution 0-5 var subCases = new (string Name, int Resolution)[] { ("backwards-compatibility-default-js-enabled-in-leaf", 0), ("backwards-compatibility-default-js-disabled-in-leaf", 1), ("backwards-compatibility-domain-js-disabled-in-leaf", 2), ("mqtt-explicit-js-enabled-in-leaf", 3), ("mqtt-explicit-js-disabled-in-leaf", 4), ("backwards-compatibility-domain-js-enabled-in-leaf", 5), }; foreach (var (name, resolution) in subCases) { // 3-server hub cluster + leafnode with MQTT // Configuration varies per resolution case var leafHost = "127.0.0.1"; var leafPort = 1883; var hubHost = "127.0.0.1"; var hubPorts = new[] { 1884, 1885, 1886 }; // Subscribe on leafnode using var mc = MqttConnect(leafHost, leafPort, clientId: "sub", cleanSess: true); CheckConnAckAccepted(mc.Stream); MqttSubscribe(mc.Stream, 1, [("foo", 1)]); MqttFlush(mc.Stream); // Publish from leafnode using var mp1 = MqttConnect(leafHost, leafPort, cleanSess: true); CheckConnAckAccepted(mp1.Stream); MqttPublish(mp1.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray()); CheckPubMsg(mc.Stream, "foo", PubQoS1, "msg"u8.ToArray()); SendDisconnect(mp1.Stream); // Publish from hub server 3 using var mp2 = MqttConnect(hubHost, hubPorts[2], cleanSess: true); CheckConnAckAccepted(mp2.Stream); MqttPublish(mp2.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray()); CheckPubMsg(mc.Stream, "foo", PubQoS1, "msg"u8.ToArray()); SendDisconnect(mp2.Stream); // Subscribe on hub server 2 using var mc2 = MqttConnect(hubHost, hubPorts[1], clientId: "sub2", cleanSess: true); CheckConnAckAccepted(mc2.Stream); MqttSubscribe(mc2.Stream, 1, [("foo", 1)]); MqttFlush(mc2.Stream); // Publish from leafnode → hub sub receives using var mp3 = MqttConnect(leafHost, leafPort, cleanSess: true); CheckConnAckAccepted(mp3.Stream); MqttPublish(mp3.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray()); CheckPubMsg(mc2.Stream, "foo", PubQoS1, "msg"u8.ToArray()); SendDisconnect(mp3.Stream); // Publish from hub server 1 → hub sub receives using var mp4 = MqttConnect(hubHost, hubPorts[0], cleanSess: true); CheckConnAckAccepted(mp4.Stream); MqttPublish(mp4.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray()); CheckPubMsg(mc2.Stream, "foo", PubQoS1, "msg"u8.ToArray()); SendDisconnect(mp4.Stream); SendDisconnect(mc2.Stream); SendDisconnect(mc.Stream); } } }