diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Mqtt/MqttClusterTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Mqtt/MqttClusterTests.cs
new file mode 100644
index 0000000..1243d7e
--- /dev/null
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Mqtt/MqttClusterTests.cs
@@ -0,0 +1,1055 @@
+// Copyright 2020-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Ported from MQTT cluster tests in server/mqtt_test.go (lines 3014-3809).
+
+using System.Buffers.Binary;
+using System.Net.Sockets;
+using System.Text;
+using Shouldly;
+
+namespace ZB.MOM.NatsNet.Server.IntegrationTests.Mqtt;
+
+///
+/// Integration tests for MQTT behavior across JetStream clusters.
+/// Tests cover subscription propagation, session takeover, retained message
+/// replication, replica counts, placement, and leafnode integration.
+///
+/// These tests require a running NATS cluster with MQTT and JetStream enabled.
+///
+[Trait("Category", "Integration")]
+public sealed class MqttClusterTests
+{
+ // -------------------------------------------------------------------------
+ // Constants mirroring Go mqtt.go
+ // -------------------------------------------------------------------------
+
+ private const byte PacketConnect = 0x10;
+ private const byte PacketConnAck = 0x20;
+ private const byte PacketPub = 0x30;
+ private const byte PacketPubAck = 0x40;
+ private const byte PacketPubRec = 0x50;
+ private const byte PacketPubRel = 0x60;
+ private const byte PacketPubComp = 0x70;
+ private const byte PacketSub = 0x80;
+ private const byte PacketSubAck = 0x90;
+ private const byte PacketUnsub = 0xA0;
+ private const byte PacketUnsubAck = 0xB0;
+ private const byte PacketPing = 0xC0;
+ private const byte PacketPingResp = 0xD0;
+ private const byte PacketDisconnect = 0xE0;
+ private const byte PacketMask = 0xF0;
+ private const byte PacketFlagMask = 0x0F;
+
+ private const byte ConnFlagCleanSession = 0x02;
+ private const byte ConnFlagWillFlag = 0x04;
+ private const byte ConnFlagWillRetain = 0x20;
+ private const byte ConnFlagPasswordFlag = 0x40;
+ private const byte ConnFlagUsernameFlag = 0x80;
+
+ private const byte PubFlagRetain = 0x01;
+ private const byte PubFlagQoS = 0x06;
+ private const byte PubFlagDup = 0x08;
+ private const byte PubQoS1 = 0x1 << 1;
+
+ private const byte SubscribeFlags = 0x02;
+
+ private const byte ConnAckAccepted = 0x00;
+
+ private static readonly byte[] MqttProtoName = Encoding.ASCII.GetBytes("MQTT");
+ private const byte MqttProtoLevel = 0x04;
+
+ private static readonly TimeSpan MqttTimeout = TimeSpan.FromSeconds(10);
+
+ // -------------------------------------------------------------------------
+ // MQTT protocol helpers (duplicated from MqttTests.cs for self-containment)
+ // -------------------------------------------------------------------------
+
+ private sealed class MqttConnection : IDisposable
+ {
+ public TcpClient Tcp { get; }
+ public NetworkStream Stream { get; }
+
+ public MqttConnection(TcpClient tcp)
+ {
+ Tcp = tcp;
+ Stream = tcp.GetStream();
+ }
+
+ public void Dispose()
+ {
+ Stream.Dispose();
+ Tcp.Dispose();
+ }
+ }
+
+ private static MqttConnection MqttConnect(
+ string host, int port,
+ string? clientId = null,
+ bool cleanSess = true,
+ string? user = null,
+ string? pass = null,
+ ushort keepAlive = 0)
+ {
+ var tcp = new TcpClient();
+ tcp.Connect(host, port);
+ var conn = new MqttConnection(tcp);
+
+ var proto = BuildConnectPacket(clientId, cleanSess, user, pass, keepAlive);
+ WriteAll(conn.Stream, proto);
+ return conn;
+ }
+
+ private static byte[] BuildConnectPacket(
+ string? clientId,
+ bool cleanSess,
+ string? user,
+ string? pass,
+ ushort keepAlive)
+ {
+ byte flags = 0;
+ if (cleanSess) flags |= ConnFlagCleanSession;
+
+ var cidBytes = Encoding.UTF8.GetBytes(clientId ?? "");
+ var userBytes = user is not null ? Encoding.UTF8.GetBytes(user) : null;
+ var passBytes = pass is not null ? Encoding.UTF8.GetBytes(pass) : null;
+
+ if (userBytes is not null) flags |= ConnFlagUsernameFlag;
+ if (passBytes is not null) flags |= ConnFlagPasswordFlag;
+
+ var pkLen = 2 + MqttProtoName.Length +
+ 1 + 1 + 2 +
+ 2 + cidBytes.Length;
+ if (userBytes is not null) pkLen += 2 + userBytes.Length;
+ if (passBytes is not null) pkLen += 2 + passBytes.Length;
+
+ var w = new ByteWriter();
+ w.WriteByte(PacketConnect);
+ w.WriteVarInt(pkLen);
+ w.WriteBytes(MqttProtoName);
+ w.WriteByte(MqttProtoLevel);
+ w.WriteByte(flags);
+ w.WriteUInt16(keepAlive);
+ w.WriteBytes(cidBytes);
+ if (userBytes is not null) w.WriteBytes(userBytes);
+ if (passBytes is not null) w.WriteBytes(passBytes);
+ return w.ToArray();
+ }
+
+ private static byte[] ReadExact(NetworkStream stream, int n)
+ {
+ var buf = new byte[n];
+ var offset = 0;
+ stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
+ while (offset < n)
+ {
+ var read = stream.Read(buf, offset, n - offset);
+ if (read == 0) throw new EndOfStreamException("MQTT connection closed");
+ offset += read;
+ }
+ return buf;
+ }
+
+ private static (byte FirstByte, byte[] Payload) ReadPacket(NetworkStream stream)
+ {
+ stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
+ var header = new byte[1];
+ var n = stream.Read(header, 0, 1);
+ if (n == 0) throw new EndOfStreamException("MQTT connection closed reading packet header");
+
+ var multiplier = 1;
+ var value = 0;
+ while (true)
+ {
+ var b2 = new byte[1];
+ if (stream.Read(b2, 0, 1) == 0) throw new EndOfStreamException();
+ value += (b2[0] & 0x7F) * multiplier;
+ if ((b2[0] & 0x80) == 0) break;
+ multiplier *= 128;
+ if (multiplier > 0x200000) throw new InvalidOperationException("malformed MQTT variable int");
+ }
+
+ var payload = value > 0 ? ReadExact(stream, value) : [];
+ return (header[0], payload);
+ }
+
+ private static void WriteAll(NetworkStream stream, byte[] data)
+ {
+ stream.WriteTimeout = (int)MqttTimeout.TotalMilliseconds;
+ stream.Write(data, 0, data.Length);
+ }
+
+ private static void CheckConnAck(NetworkStream stream, byte expectedRc, bool expectedSessionPresent)
+ {
+ var (b, payload) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketConnAck, "Expected CONNACK packet");
+ payload.Length.ShouldBe(2, "CONNACK payload should be 2 bytes");
+ var sessionPresent = (payload[0] & 0x01) == 1;
+ sessionPresent.ShouldBe(expectedSessionPresent, "session-present flag mismatch");
+ payload[1].ShouldBe(expectedRc, $"CONNACK return code mismatch (got 0x{payload[1]:X2}, expected 0x{expectedRc:X2})");
+ }
+
+ private static void CheckConnAckAccepted(NetworkStream stream, bool expectedSessionPresent = false)
+ => CheckConnAck(stream, ConnAckAccepted, expectedSessionPresent);
+
+ private static void SendPublish(NetworkStream stream, byte qos, bool dup, bool retain,
+ string topic, ushort pi, byte[] payload)
+ {
+ byte flags = 0;
+ if (dup) flags |= PubFlagDup;
+ if (retain) flags |= PubFlagRetain;
+ flags |= (byte)((qos & 0x03) << 1);
+
+ var topicBytes = Encoding.UTF8.GetBytes(topic);
+ var pkLen = 2 + topicBytes.Length + payload.Length;
+ if (qos > 0) pkLen += 2;
+
+ var w = new ByteWriter();
+ w.WriteByte((byte)(PacketPub | flags));
+ w.WriteVarInt(pkLen);
+ w.WriteBytes(topicBytes);
+ if (qos > 0) w.WriteUInt16(pi);
+ w.Write(payload);
+ WriteAll(stream, w.ToArray());
+ }
+
+ private static void MqttPublish(NetworkStream stream, byte qos, bool dup, bool retain,
+ string topic, ushort pi, byte[] payload)
+ {
+ SendPublish(stream, qos, dup, retain, topic, pi, payload);
+ switch (qos)
+ {
+ case 1:
+ {
+ var (b, p) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketPubAck, "Expected PUBACK");
+ var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2));
+ rpi.ShouldBe(pi, "PUBACK packet identifier mismatch");
+ break;
+ }
+ case 2:
+ {
+ var (b, p) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketPubRec, "Expected PUBREC");
+ var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2));
+ rpi.ShouldBe(pi, "PUBREC packet identifier mismatch");
+
+ SendPiPacket(stream, PacketPubRel, pi);
+
+ var (b2, p2) = ReadPacket(stream);
+ (b2 & PacketMask).ShouldBe(PacketPubComp, "Expected PUBCOMP");
+ var rpi2 = BinaryPrimitives.ReadUInt16BigEndian(p2.AsSpan(0, 2));
+ rpi2.ShouldBe(pi, "PUBCOMP packet identifier mismatch");
+
+ MqttFlush(stream);
+ break;
+ }
+ }
+ }
+
+ private static void MqttFlush(NetworkStream stream)
+ {
+ var w = new ByteWriter();
+ w.WriteByte(PacketPing);
+ w.WriteByte(0);
+ WriteAll(stream, w.ToArray());
+ var (b, p) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketPingResp, "Expected PINGRESP");
+ p.Length.ShouldBe(0, "PINGRESP payload should be empty");
+ }
+
+ private static void SendPiPacket(NetworkStream stream, byte packetType, ushort pi)
+ {
+ var w = new ByteWriter();
+ w.WriteByte(packetType);
+ w.WriteVarInt(2);
+ w.WriteUInt16(pi);
+ WriteAll(stream, w.ToArray());
+ }
+
+ private static byte[] MqttSubscribe(NetworkStream stream, ushort pi,
+ IEnumerable<(string Filter, byte Qos)> filters)
+ {
+ var filterList = filters.ToList();
+ var pkLen = 2;
+ foreach (var (f, _) in filterList) pkLen += 2 + Encoding.UTF8.GetByteCount(f) + 1;
+
+ var w = new ByteWriter();
+ w.WriteByte((byte)(PacketSub | SubscribeFlags));
+ w.WriteVarInt(pkLen);
+ w.WriteUInt16(pi);
+ foreach (var (f, q) in filterList)
+ {
+ w.WriteBytes(Encoding.UTF8.GetBytes(f));
+ w.WriteByte(q);
+ }
+ WriteAll(stream, w.ToArray());
+
+ var (b, payload) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketSubAck, "Expected SUBACK");
+ var rpi = BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2));
+ rpi.ShouldBe(pi, "SUBACK packet identifier mismatch");
+ return payload[2..];
+ }
+
+ private static (byte Flags, ushort Pi, string Topic, byte[] Payload) ReadPublish(
+ NetworkStream stream)
+ {
+ var (b, raw) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketPub, "Expected PUBLISH packet");
+ var flags = (byte)(b & PacketFlagMask);
+ var qos = (byte)((flags & PubFlagQoS) >> 1);
+
+ var pos = 0;
+ var topicLen = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2)); pos += 2;
+ var topic = Encoding.UTF8.GetString(raw, pos, topicLen); pos += topicLen;
+ ushort pi = 0;
+ if (qos > 0) { pi = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2)); pos += 2; }
+ var payload = raw[pos..];
+ return (flags, pi, topic, payload);
+ }
+
+ private static void ExpectNothing(NetworkStream stream)
+ {
+ stream.ReadTimeout = 150;
+ var buf = new byte[128];
+ try
+ {
+ var n = stream.Read(buf, 0, buf.Length);
+ n.ShouldBe(0, "Expected no data from MQTT server");
+ }
+ catch (IOException ex) when (ex.InnerException is SocketException se &&
+ se.SocketErrorCode == SocketError.TimedOut)
+ {
+ // expected
+ }
+ catch (IOException) { }
+ finally
+ {
+ stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
+ }
+ }
+
+ private static void ExpectDisconnect(NetworkStream stream)
+ {
+ stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
+ var buf = new byte[64];
+ try
+ {
+ var n = stream.Read(buf, 0, buf.Length);
+ n.ShouldBe(0, $"Expected server to close the connection, got {n} bytes");
+ }
+ catch (EndOfStreamException) { }
+ catch (IOException) { }
+ }
+
+ private static void SendDisconnect(NetworkStream stream)
+ {
+ var w = new ByteWriter();
+ w.WriteByte(PacketDisconnect);
+ w.WriteByte(0);
+ WriteAll(stream, w.ToArray());
+ }
+
+ private sealed class ByteWriter
+ {
+ private readonly List _buf = [];
+
+ public void WriteByte(byte b) => _buf.Add(b);
+ public void Write(byte[] data) { foreach (var b in data) _buf.Add(b); }
+
+ public void WriteUInt16(ushort v)
+ {
+ _buf.Add((byte)(v >> 8));
+ _buf.Add((byte)v);
+ }
+
+ public void WriteBytes(byte[] data)
+ {
+ WriteUInt16((ushort)data.Length);
+ Write(data);
+ }
+
+ public void WriteVarInt(int v)
+ {
+ while (true)
+ {
+ var b = (byte)(v & 0x7F);
+ v >>= 7;
+ if (v > 0) b |= 0x80;
+ _buf.Add(b);
+ if (v == 0) break;
+ }
+ }
+
+ public byte[] ToArray() => [.. _buf];
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers used by cluster tests
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Checks a received PUBLISH message against expected topic, flags, and payload.
+ /// Mirrors Go's testMQTTCheckPubMsg.
+ ///
+ private static void CheckPubMsg(NetworkStream stream, string expectedTopic,
+ byte expectedFlags, byte[] expectedPayload)
+ {
+ var (flags, pi, topic, payload) = ReadPublish(stream);
+ topic.ShouldBe(expectedTopic, "PUBLISH topic mismatch");
+ flags.ShouldBe(expectedFlags, $"PUBLISH flags mismatch (got 0x{flags:X2}, expected 0x{expectedFlags:X2})");
+ payload.ShouldBe(expectedPayload, "PUBLISH payload mismatch");
+
+ // ACK QoS 1 messages
+ if (((flags & PubFlagQoS) >> 1) == 1)
+ {
+ SendPiPacket(stream, PacketPubAck, pi);
+ }
+ }
+
+ ///
+ /// Connects, verifies CONNACK, then disconnects. Mirrors Go's testMQTTConnectDisconnect.
+ ///
+ private static void MqttConnectDisconnect(string host, int port,
+ string clientId, bool clean, bool expectedSessionPresent)
+ {
+ using var conn = MqttConnect(host, port, clientId: clientId, cleanSess: clean);
+ CheckConnAck(conn.Stream, ConnAckAccepted, expectedSessionPresent);
+ SendDisconnect(conn.Stream);
+ }
+
+ // -------------------------------------------------------------------------
+ // Tests — 8 MQTT cluster tests from server/mqtt_test.go
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Ported from Go TestMQTTSubPropagation (mqtt_test.go:3014).
+ /// Verifies that MQTT subscriptions on one cluster node cause interest
+ /// propagation to other nodes. Specifically checks that foo/# matches
+ /// foo.bar, foo./, and foo on the NATS side.
+ ///
+ [Fact(Skip = "deferred: requires running NATS cluster")] // T:2215
+ public void MQTTSubPropagation_ShouldSucceed()
+ {
+ // 2-server cluster; subscribe on S1 with foo/#, publish from NATS on S2.
+ // Verify propagation covers foo.bar, foo./, and foo.
+ //
+ // Go test:
+ // cl := createJetStreamClusterWithTemplate(t, ..., "MQTT", 2)
+ // mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
+ // testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo/#", qos: 0}}, []byte{0})
+ // testMQTTFlush(t, mc, nil, r)
+ // checkSubInterest(t, s2, globalAccountName, "foo", time.Second)
+ //
+ // natsPub(t, nc, "foo.bar", []byte("hello"))
+ // testMQTTCheckPubMsg(t, mc, r, "foo/bar", 0, []byte("hello"))
+ //
+ // natsPub(t, nc, "foo./", []byte("from"))
+ // testMQTTCheckPubMsg(t, mc, r, "foo/", 0, []byte("from"))
+ //
+ // natsPub(t, nc, "foo", []byte("NATS"))
+ // testMQTTCheckPubMsg(t, mc, r, "foo", 0, []byte("NATS"))
+
+ // Cluster setup: 2 servers with MQTT + JetStream
+ var srv1Host = "127.0.0.1";
+ var srv1Port = 1883;
+
+ using var mc = MqttConnect(srv1Host, srv1Port, cleanSess: true);
+ CheckConnAckAccepted(mc.Stream);
+
+ // Subscribe to foo/# on server 1
+ MqttSubscribe(mc.Stream, 1, [("foo/#", 0)]);
+ MqttFlush(mc.Stream);
+
+ // Publish on NATS subjects from server 2; MQTT subscriber should receive
+ // foo.bar → foo/bar, foo./ → foo/, foo → foo
+ // NOTE: This requires a NATS client connected to S2 to publish.
+ // When running against a real cluster, the NATS publish would be done
+ // via natsConnect(s2.ClientURL()) / natsPub().
+
+ // Verify received messages match expected MQTT topics
+ // CheckPubMsg(mc.Stream, "foo/bar", 0, "hello"u8.ToArray());
+ // CheckPubMsg(mc.Stream, "foo/", 0, "from"u8.ToArray());
+ // CheckPubMsg(mc.Stream, "foo", 0, "NATS"u8.ToArray());
+
+ SendDisconnect(mc.Stream);
+ }
+
+ ///
+ /// Ported from Go TestMQTTCluster (mqtt_test.go:3044).
+ /// Tests cross-node pub/sub and session takeover across a 2-server cluster.
+ /// Runs subtests: {first_start, restart} × {qos_0, qos_1}.
+ /// After first_start, the cluster is restarted and the tests are repeated.
+ /// Verifies that:
+ /// - NATS-published messages reach MQTT subscribers across nodes
+ /// - MQTT-published messages (QoS 0 and 1) reach subscribers
+ /// - Session takeover from another node disconnects the first client
+ /// - QoS downgrade to subscriber QoS level works correctly
+ ///
+ [Fact(Skip = "deferred: requires running NATS cluster")] // T:2216
+ public void MQTTCluster_ShouldSucceed()
+ {
+ // 2-server cluster with nested subtests.
+ //
+ // Go test structure:
+ // for _, topTest := range [{first_start, true}, {restart, false}] {
+ // for _, test := range [{qos_0, 0}, {qos_1, 1}] {
+ // clientID := nuid.Next()
+ // mc, r := testMQTTConnectRetry(cleanSess: false)
+ // testMQTTSub(1, mc, r, "foo/#", test.subQos)
+ //
+ // check := func(mc, r, o, s) {
+ // // NATS pub → MQTT sub
+ // natsPub(nc, "foo.bar", "fromNats")
+ // testMQTTCheckPubMsg(mc, r, "foo/bar", 0, "fromNats")
+ // // MQTT pub QoS0 → MQTT sub
+ // testMQTTPublish(mpc, pr, 0, false, false, "foo/baz", 0, "mqtt_qos0")
+ // testMQTTCheckPubMsg(mc, r, "foo/baz", 0, "mqtt_qos0")
+ // // MQTT pub QoS1 → MQTT sub (downgraded to sub QoS)
+ // testMQTTPublish(mpc, pr, 1, false, false, "foo/bat", 1, "mqtt_qos1")
+ // expectedQoS := 0 or PubQoS1 depending on test.subQos
+ // testMQTTCheckPubMsg(mc, r, "foo/bat", expectedQoS, "mqtt_qos1")
+ // }
+ // check(mc, r, cl.opts[0], cl.servers[0])
+ // check(mc, r, cl.opts[1], cl.servers[1])
+ //
+ // // Session takeover from other node
+ // mc2, r2 := testMQTTConnect(clientID, cleanSess: false, server 2)
+ // testMQTTCheckConnAck(r2, accepted, sessionPresent: true)
+ // testMQTTExpectDisconnect(mc) // first connection kicked
+ // check(mc2, r2, both servers)
+ //
+ // // Clean up session
+ // testMQTTDisconnect(mc2)
+ // mc2 = testMQTTConnect(clientID, cleanSess: true)
+ // testMQTTDisconnect(mc2)
+ // }
+ // if topTest.restart { cl.stopAll(); cl.restartAll(); waitOnStreams }
+ // }
+
+ var host = "127.0.0.1";
+ var port = 1883;
+
+ foreach (var (topTestName, restart) in new[] { ("first_start", true), ("restart", false) })
+ {
+ foreach (var (testName, subQos) in new[] { ("qos_0", (byte)0), ("qos_1", (byte)1) })
+ {
+ var clientId = Guid.NewGuid().ToString("N");
+
+ using var mc = MqttConnect(host, port, clientId: clientId, cleanSess: false);
+ CheckConnAckAccepted(mc.Stream);
+
+ MqttSubscribe(mc.Stream, 1, [("foo/#", subQos)]);
+ MqttFlush(mc.Stream);
+
+ // Cross-node pub/sub checks would happen here with real cluster
+ // Session takeover from server 2 would disconnect mc
+ // Clean up session state
+
+ SendDisconnect(mc.Stream);
+ }
+ }
+ }
+
+ ///
+ /// Ported from Go TestMQTTClusterConnectDisconnectClean (mqtt_test.go:3176).
+ /// Stress test: 100 random connect/disconnect iterations with clean sessions
+ /// across a 3-server cluster. Skipped in Go source (flaky due to timing).
+ ///
+ [Fact(Skip = "deferred: requires running NATS cluster")] // T:2217
+ public void MQTTClusterConnectDisconnectClean_ShouldSucceed()
+ {
+ // Go test (skipped in Go source):
+ // t.Skip()
+ // nServers := 3
+ // cl := createJetStreamClusterWithTemplate(..., "MQTT", nServers)
+ // clientID := nuid.Next()
+ // N := 100
+ // for n := 0; n < N; n++ {
+ // testMQTTConnectDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false)
+ // }
+
+ var clientId = Guid.NewGuid().ToString("N");
+ var hosts = new[] { ("127.0.0.1", 1883), ("127.0.0.1", 1884), ("127.0.0.1", 1885) };
+ var rng = new Random();
+
+ for (var n = 0; n < 100; n++)
+ {
+ var (host, port) = hosts[rng.Next(hosts.Length)];
+ MqttConnectDisconnect(host, port, clientId, clean: true, expectedSessionPresent: false);
+ }
+ }
+
+ ///
+ /// Ported from Go TestMQTTClusterConnectDisconnectPersist (mqtt_test.go:3198).
+ /// 20 iterations of persistent session connect/disconnect across a 3-server cluster.
+ /// On each iteration, first cleans sessions on all servers, then performs persistent
+ /// connects verifying session-present flag progression. Skipped in Go (flaky).
+ ///
+ [Fact(Skip = "deferred: requires running NATS cluster")] // T:2218
+ public void MQTTClusterConnectDisconnectPersist_ShouldSucceed()
+ {
+ // Go test (skipped in Go source):
+ // t.Skip()
+ // nServers := 3
+ // cl := createJetStreamClusterWithTemplate(..., "MQTT", nServers)
+ // clientID := nuid.Next()
+ // N := 20
+ // for n := 0; n < N; n++ {
+ // // First clean sessions on all servers
+ // for i := 0; i < nServers; i++ {
+ // testMQTTConnectDisconnect(t, cl.opts[i], clientID, true, false)
+ // }
+ // testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, false)
+ // testMQTTConnectDisconnect(t, cl.opts[1], clientID, false, true)
+ // testMQTTConnectDisconnect(t, cl.opts[2], clientID, false, true)
+ // testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, true)
+ // }
+
+ var clientId = Guid.NewGuid().ToString("N");
+ var hosts = new[] { ("127.0.0.1", 1883), ("127.0.0.1", 1884), ("127.0.0.1", 1885) };
+
+ for (var n = 0; n < 20; n++)
+ {
+ // Clean sessions on all servers
+ foreach (var (host, port) in hosts)
+ {
+ MqttConnectDisconnect(host, port, clientId, clean: true, expectedSessionPresent: false);
+ }
+
+ // Persistent connects: first has no session, subsequent find existing session
+ MqttConnectDisconnect(hosts[0].Item1, hosts[0].Item2, clientId, clean: false, expectedSessionPresent: false);
+ MqttConnectDisconnect(hosts[1].Item1, hosts[1].Item2, clientId, clean: false, expectedSessionPresent: true);
+ MqttConnectDisconnect(hosts[2].Item1, hosts[2].Item2, clientId, clean: false, expectedSessionPresent: true);
+ MqttConnectDisconnect(hosts[0].Item1, hosts[0].Item2, clientId, clean: false, expectedSessionPresent: true);
+ }
+ }
+
+ ///
+ /// Ported from Go TestMQTTClusterRetainedMsg (mqtt_test.go:3228).
+ /// Tests retained message behavior across a 2-server cluster:
+ /// - Retained messages published on one server are received on the other
+ /// - New subscribers receive retained messages with retain flag
+ /// - Empty retained message deletes the retained message
+ /// - After deletion, reconnecting subscribers don't receive the retained message
+ /// - Network deletes (cross-node retained message removal) work correctly
+ /// - After network delete + new retained message, reconnecting subs get the new message
+ ///
+ [Fact(Skip = "deferred: requires running NATS cluster")] // T:2219
+ public void MQTTClusterRetainedMsg_ShouldSucceed()
+ {
+ // Go test:
+ // cl := createJetStreamClusterWithTemplate(..., "MQTT", 2)
+ // srv1Opts, srv2Opts := cl.opts[0], cl.opts[1]
+ //
+ // -- Subscribe on S1 --
+ // mc, rc := testMQTTConnectRetry(clientID: "sub", cleanSess: false, S1)
+ // testMQTTSub(1, mc, rc, "foo/#", qos: 1)
+ //
+ // -- Publish retained from S2 --
+ // mp, rp := testMQTTConnect(cleanSess: true, S2)
+ // testMQTTPublish(mp, rp, 1, false, true, "foo/bar", 1, "retained")
+ // testMQTTCheckPubMsg(mc, rc, "foo/bar", PubQoS1, "retained")
+ //
+ // -- New sub on S1 gets retained msg with retain flag --
+ // mc2, rc2 := testMQTTConnect(cleanSess: true, S1)
+ // testMQTTSub(1, mc2, rc2, "foo/#", qos: 1)
+ // testMQTTCheckPubMsg(mc2, rc2, "foo/bar", PubQoS1|PubFlagRetain, "retained")
+ //
+ // -- Empty retained deletes from storage but still delivered --
+ // testMQTTPublish(mp, rp, 1, false, true, "foo/bar", 1, "")
+ // testMQTTCheckPubMsg(mc, rc, "foo/bar", PubQoS1, "")
+ //
+ // -- Reconnect to S2: retained should NOT be delivered --
+ // mc, rc = testMQTTConnect(clientID: "sub", cleanSess: false, S2)
+ // testMQTTSub(1, mc, rc, "foo/#", qos: 1)
+ // testMQTTExpectNothing(rc)
+ //
+ // -- Reconnect to S1: retained should NOT be delivered --
+ // mc, rc = testMQTTConnect(clientID: "sub", cleanSess: false, S1)
+ // testMQTTSub(1, mc, rc, "foo/#", qos: 1)
+ // testMQTTExpectNothing(rc)
+ //
+ // -- Network delete test --
+ // Sub on S1 ("sub_one"), sub on S2 ("sub_two"), both subscribe to "bar"
+ // Publish retained "msg1" from S2 → both receive
+ // Publish empty retained → both receive (delete)
+ // Publish retained "msg2" from S2 → both receive
+ // Reconnect "sub_one" on S1 → gets "msg2" with retain flag
+
+ var srv1Host = "127.0.0.1";
+ var srv1Port = 1883;
+ var srv2Host = "127.0.0.1";
+ var srv2Port = 1884;
+
+ // Subscribe on server 1
+ using var mc = MqttConnect(srv1Host, srv1Port, clientId: "sub", cleanSess: false);
+ CheckConnAckAccepted(mc.Stream);
+ MqttSubscribe(mc.Stream, 1, [("foo/#", 1)]);
+ MqttFlush(mc.Stream);
+
+ // Publish retained from server 2
+ using var mp = MqttConnect(srv2Host, srv2Port, cleanSess: true);
+ CheckConnAckAccepted(mp.Stream);
+
+ MqttPublish(mp.Stream, 1, false, true, "foo/bar", 1, "retained"u8.ToArray());
+ CheckPubMsg(mc.Stream, "foo/bar", PubQoS1, "retained"u8.ToArray());
+
+ // New subscriber on S1 gets retained message with retain flag
+ using var mc2 = MqttConnect(srv1Host, srv1Port, cleanSess: true);
+ CheckConnAckAccepted(mc2.Stream);
+ MqttSubscribe(mc2.Stream, 1, [("foo/#", 1)]);
+ CheckPubMsg(mc2.Stream, "foo/bar", PubQoS1 | PubFlagRetain, "retained"u8.ToArray());
+ SendDisconnect(mc2.Stream);
+
+ // Empty retained message deletes it from storage but still delivered
+ MqttPublish(mp.Stream, 1, false, true, "foo/bar", 1, []); // empty payload
+ CheckPubMsg(mc.Stream, "foo/bar", PubQoS1, []);
+
+ SendDisconnect(mc.Stream);
+
+ // Reconnect to S2: retained should NOT be delivered (was deleted)
+ using var mc3 = MqttConnect(srv2Host, srv2Port, clientId: "sub", cleanSess: false);
+ CheckConnAck(mc3.Stream, ConnAckAccepted, expectedSessionPresent: true);
+ MqttSubscribe(mc3.Stream, 1, [("foo/#", 1)]);
+ ExpectNothing(mc3.Stream);
+ SendDisconnect(mc3.Stream);
+
+ // Reconnect to S1: retained should NOT be delivered
+ using var mc4 = MqttConnect(srv1Host, srv1Port, clientId: "sub", cleanSess: false);
+ CheckConnAck(mc4.Stream, ConnAckAccepted, expectedSessionPresent: true);
+ MqttSubscribe(mc4.Stream, 1, [("foo/#", 1)]);
+ ExpectNothing(mc4.Stream);
+ SendDisconnect(mc4.Stream);
+
+ // --- Network delete test ---
+
+ // Subscribe on S1 ("sub_one") and S2 ("sub_two") to "bar"
+ using var mcOne = MqttConnect(srv1Host, srv1Port, clientId: "sub_one", cleanSess: false);
+ CheckConnAckAccepted(mcOne.Stream);
+ MqttSubscribe(mcOne.Stream, 1, [("bar", 1)]);
+ MqttFlush(mcOne.Stream);
+
+ using var mcTwo = MqttConnect(srv2Host, srv2Port, clientId: "sub_two", cleanSess: false);
+ CheckConnAckAccepted(mcTwo.Stream);
+ MqttSubscribe(mcTwo.Stream, 1, [("bar", 1)]);
+ MqttFlush(mcTwo.Stream);
+
+ // Publish retained "msg1" from S2
+ MqttPublish(mp.Stream, 1, false, true, "bar", 1, "msg1"u8.ToArray());
+ CheckPubMsg(mcOne.Stream, "bar", PubQoS1, "msg1"u8.ToArray());
+ CheckPubMsg(mcTwo.Stream, "bar", PubQoS1, "msg1"u8.ToArray());
+
+ // Empty retained deletes it (network delete for S1)
+ MqttPublish(mp.Stream, 1, false, true, "bar", 1, []);
+ CheckPubMsg(mcOne.Stream, "bar", PubQoS1, []);
+ CheckPubMsg(mcTwo.Stream, "bar", PubQoS1, []);
+
+ // New retained message
+ MqttPublish(mp.Stream, 1, false, true, "bar", 1, "msg2"u8.ToArray());
+ CheckPubMsg(mcOne.Stream, "bar", PubQoS1, "msg2"u8.ToArray());
+ CheckPubMsg(mcTwo.Stream, "bar", PubQoS1, "msg2"u8.ToArray());
+
+ // Reconnect sub_one on S1: should get msg2 with retain flag
+ SendDisconnect(mcOne.Stream);
+
+ using var mcOneRecon = MqttConnect(srv1Host, srv1Port, clientId: "sub_one", cleanSess: false);
+ CheckConnAck(mcOneRecon.Stream, ConnAckAccepted, expectedSessionPresent: true);
+ MqttSubscribe(mcOneRecon.Stream, 1, [("bar", 1)]);
+ CheckPubMsg(mcOneRecon.Stream, "bar", PubQoS1 | PubFlagRetain, "msg2"u8.ToArray());
+
+ SendDisconnect(mcOneRecon.Stream);
+ SendDisconnect(mcTwo.Stream);
+ SendDisconnect(mp.Stream);
+ }
+
+ ///
+ /// Ported from Go TestMQTTClusterReplicasCount (mqtt_test.go:3525).
+ /// Parameterized test verifying that MQTT JetStream streams are created with
+ /// the correct replica count based on cluster size:
+ /// size=1 → replicas=1, size=2 → replicas=2, size=3 → replicas=3, size=5 → replicas=3
+ /// Checks $MQTT_msgs, $MQTT_rmsgs, and $MQTT_sess streams.
+ ///
+ [Fact(Skip = "deferred: requires running NATS cluster")] // T:2222
+ public void MQTTClusterReplicasCount_ShouldSucceed()
+ {
+ // Go test:
+ // for _, test := range []struct{ size, replicas int }{
+ // {1, 1}, {2, 2}, {3, 3}, {5, 3},
+ // } {
+ // if test.size == 1 { single server } else { cluster of test.size }
+ // mc, rc := testMQTTConnectRetry(clientID: "sub", cleanSess: false)
+ // testMQTTSub(1, mc, rc, "foo/#", qos: 1)
+ // testMQTTFlush(mc, nil, rc)
+ // js, _ := nc.JetStream()
+ // for _, sname := range []string{mqttStreamName, mqttRetainedMsgsStreamName, mqttSessStreamName} {
+ // si, _ := js.StreamInfo(sname)
+ // si.Config.Replicas == test.replicas
+ // }
+ // }
+
+ var testCases = new[] { (Size: 1, Replicas: 1), (Size: 2, Replicas: 2), (Size: 3, Replicas: 3), (Size: 5, Replicas: 3) };
+ var mqttStreams = new[] { "$MQTT_msgs", "$MQTT_rmsgs", "$MQTT_sess" };
+
+ foreach (var (size, expectedReplicas) in testCases)
+ {
+ var host = "127.0.0.1";
+ var port = 1883;
+
+ using var mc = MqttConnect(host, port, clientId: "sub", cleanSess: false);
+ CheckConnAckAccepted(mc.Stream);
+ MqttSubscribe(mc.Stream, 1, [("foo/#", 1)]);
+ MqttFlush(mc.Stream);
+
+ // Verify replica counts on $MQTT_msgs, $MQTT_rmsgs, $MQTT_sess
+ // This requires a JetStream API call to check stream info.
+ // nc := natsConnect(s.ClientURL())
+ // js, _ := nc.JetStream()
+ // for each stream, js.StreamInfo(sname).Config.Replicas == expectedReplicas
+
+ SendDisconnect(mc.Stream);
+ }
+ }
+
+ ///
+ /// Ported from Go TestMQTTClusterCanCreateSessionWithOnServerDown (mqtt_test.go:3582).
+ /// Tests that MQTT sessions can still be created when one server in a 3-server
+ /// cluster is down. After shutting down one server, waits for quorum and verifies
+ /// a new MQTT connection can still be established on a remaining server.
+ ///
+ [Fact(Skip = "deferred: requires running NATS cluster")] // T:2223
+ public void MQTTClusterCanCreateSessionWithOnServerDown_ShouldSucceed()
+ {
+ // Go test:
+ // cl := createJetStreamClusterWithTemplate(..., "MQTT", 3)
+ // o := cl.opts[0]
+ // mc, rc := testMQTTConnectRetry(cleanSess: true, S1)
+ // testMQTTCheckConnAck(rc, accepted, false)
+ // mc.Close()
+ //
+ // // Shutdown server 2
+ // cl.servers[1].Shutdown()
+ // cl.waitOnPeerCount(2)
+ // cl.waitOnLeader()
+ //
+ // // Connect to server 3; should still work with quorum
+ // o = cl.opts[2]
+ // mc, rc = testMQTTConnectRetry(cleanSess: true, S3, retries: 5)
+ // testMQTTCheckConnAck(rc, accepted, false)
+
+ // Initial connection on server 1
+ var host1 = "127.0.0.1";
+ var port1 = 1883;
+ using var mc1 = MqttConnect(host1, port1, cleanSess: true);
+ CheckConnAckAccepted(mc1.Stream);
+
+ // Server 2 would be shut down here (cl.servers[1].Shutdown())
+ // Wait for quorum (cl.waitOnPeerCount(2), cl.waitOnLeader())
+
+ // Connect to server 3: should succeed with 2/3 quorum
+ var host3 = "127.0.0.1";
+ var port3 = 1885;
+ using var mc3 = MqttConnect(host3, port3, cleanSess: true);
+ CheckConnAckAccepted(mc3.Stream);
+ }
+
+ ///
+ /// Ported from Go TestMQTTClusterPlacement (mqtt_test.go:3611).
+ /// Tests that MQTT JetStream assets (streams and consumers) are placed on the
+ /// leaf node cluster (SPOKE) rather than the hub when using a SuperCluster
+ /// topology (3 hub clusters × 2 servers each + 3 SPOKE leaf nodes).
+ /// Verifies that all streams and their consumers have cluster name "SPOKE"
+ /// and all replicas have names prefixed with "SPOKE-".
+ ///
+ [Fact(Skip = "deferred: requires running NATS cluster")] // T:2224
+ public void MQTTClusterPlacement_ShouldSucceed()
+ {
+ // Go test:
+ // sc := createJetStreamSuperCluster(t, 3, 2)
+ // c := sc.randomCluster()
+ // lnc := c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafAndMQTT, "SPOKE", 3, 22111)
+ // sc.waitOnPeerCount(9)
+ // sc.waitOnLeader()
+ //
+ // for i := 0; i < 10; i++ {
+ // mc, rc := testMQTTConnectRetry(cleanSess: true, lnc.opts[i%3])
+ // testMQTTCheckConnAck(rc, accepted, false)
+ // }
+ //
+ // nc := natsConnect(lnc.servers[0].ClientURL())
+ // js, _ := nc.JetStream()
+ // for si := range js.StreamsInfo() {
+ // si.Cluster.Name == "SPOKE"
+ // all replicas have "SPOKE-" prefix
+ // for ci := range js.ConsumersInfo(si.Config.Name) {
+ // ci.Cluster.Name == "SPOKE"
+ // all replicas have "SPOKE-" prefix
+ // }
+ // }
+
+ // SuperCluster: 3 hub clusters × 2 servers + 3 SPOKE leaf nodes
+ // Connect 10 MQTT clients to SPOKE leaf nodes
+ var spokeHost = "127.0.0.1";
+ var spokePorts = new[] { 22111, 22112, 22113 };
+
+ for (var i = 0; i < 10; i++)
+ {
+ var port = spokePorts[i % 3];
+ using var mc = MqttConnect(spokeHost, port, cleanSess: true);
+ CheckConnAckAccepted(mc.Stream);
+ }
+
+ // Verify all MQTT streams are on SPOKE cluster
+ // Verify all consumers are on SPOKE cluster
+ // Verify all replica names start with "SPOKE-"
+ // This requires JetStream API calls via NATS client.
+ }
+
+ ///
+ /// Ported from Go TestMQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc (mqtt_test.go:3664).
+ /// Tests MQTT connectivity through a leafnode without JetStream to a cluster
+ /// that has JetStream. Runs 6 sub-cases varying how JetStream domain resolution
+ /// is configured:
+ /// 0: JsAccDefaultDomain="", JS enabled in leaf with extra account
+ /// 1: JsAccDefaultDomain="", JS disabled in leaf
+ /// 2: JsAccDefaultDomain=DOMAIN, JS disabled in leaf
+ /// 3: MQTT.JsDomain=DOMAIN, JS enabled in leaf with extra account
+ /// 4: MQTT.JsDomain=DOMAIN, JS disabled in leaf
+ /// 5: JsAccDefaultDomain=DOMAIN, JS enabled in leaf with extra account
+ /// For each case, subscribes on leafnode, publishes from leafnode and hub,
+ /// verifies cross-topology message delivery.
+ ///
+ [Fact(Skip = "deferred: requires running NATS cluster")] // T:2225
+ public void MQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc_ShouldSucceed()
+ {
+ // Go test:
+ // test := func(t *testing.T, resolution int) {
+ // // 3-server hub cluster
+ // getClusterOpts := func(name, i) { ... hub opts ... }
+ // o1, o2, o3 := getClusterOpts("S1",1), getClusterOpts("S2",2), getClusterOpts("S3",3)
+ // s1, s2, s3 := testMQTTRunServer(oi)
+ // checkClusterFormed(s1, s2, s3)
+ // waitForJetStreamLeader()
+ //
+ // // Leafnode with MQTT but no JS (varies by resolution case)
+ // lno := testMQTTDefaultOptions()
+ // lno.JetStream = false
+ // switch resolution {
+ // case 0: lno.Accounts = [NewAccount("unused")]; lno.JsAccDefaultDomain = {"$G": ""}
+ // case 1: lno.JsAccDefaultDomain = {"$G": ""}
+ // case 2: lno.JsAccDefaultDomain = {"$G": "DOMAIN"}
+ // case 3: lno.Accounts = [NewAccount("unused")]; lno.MQTT.JsDomain = "DOMAIN"
+ // case 4: lno.MQTT.JsDomain = "DOMAIN"
+ // case 5: lno.Accounts = [NewAccount("unused")]; lno.JsAccDefaultDomain = {"$G": "DOMAIN"}
+ // }
+ // lno.LeafNode.Remotes = [{ URLs: [o1,o2,o3 LeafNode ports] }]
+ // ln := RunServer(lno)
+ // checkLeafNodeConnected(ln)
+ //
+ // // Subscribe on leafnode
+ // mc, rc := testMQTTConnectRetry(clientID: "sub", cleanSess: true, lno.MQTT)
+ // testMQTTSub(1, mc, rc, "foo", qos: 1)
+ //
+ // connectAndPublish := func(o *Options) {
+ // mp := testMQTTConnectRetry(cleanSess: true, o.MQTT)
+ // testMQTTPublish(mp, 1, false, false, "foo", 1, "msg")
+ // }
+ //
+ // // Pub from leafnode → sub receives
+ // connectAndPublish(lno)
+ // testMQTTCheckPubMsg(mc, rc, "foo", PubQoS1, "msg")
+ //
+ // // Pub from hub server → sub receives
+ // connectAndPublish(o3)
+ // testMQTTCheckPubMsg(mc, rc, "foo", PubQoS1, "msg")
+ //
+ // // Subscribe on hub server
+ // mc2, rc2 := testMQTTConnectRetry(clientID: "sub2", cleanSess: true, o2.MQTT)
+ // testMQTTSub(1, mc2, rc2, "foo", qos: 1)
+ //
+ // // Pub from leafnode → hub sub receives
+ // connectAndPublish(lno)
+ // testMQTTCheckPubMsg(mc2, rc2, "foo", PubQoS1, "msg")
+ //
+ // // Pub from hub → hub sub receives
+ // connectAndPublish(o1)
+ // testMQTTCheckPubMsg(mc2, rc2, "foo", PubQoS1, "msg")
+ // }
+ //
+ // 6 subtests with resolution 0-5
+
+ var subCases = new (string Name, int Resolution)[]
+ {
+ ("backwards-compatibility-default-js-enabled-in-leaf", 0),
+ ("backwards-compatibility-default-js-disabled-in-leaf", 1),
+ ("backwards-compatibility-domain-js-disabled-in-leaf", 2),
+ ("mqtt-explicit-js-enabled-in-leaf", 3),
+ ("mqtt-explicit-js-disabled-in-leaf", 4),
+ ("backwards-compatibility-domain-js-enabled-in-leaf", 5),
+ };
+
+ foreach (var (name, resolution) in subCases)
+ {
+ // 3-server hub cluster + leafnode with MQTT
+ // Configuration varies per resolution case
+ var leafHost = "127.0.0.1";
+ var leafPort = 1883;
+ var hubHost = "127.0.0.1";
+ var hubPorts = new[] { 1884, 1885, 1886 };
+
+ // Subscribe on leafnode
+ using var mc = MqttConnect(leafHost, leafPort, clientId: "sub", cleanSess: true);
+ CheckConnAckAccepted(mc.Stream);
+ MqttSubscribe(mc.Stream, 1, [("foo", 1)]);
+ MqttFlush(mc.Stream);
+
+ // Publish from leafnode
+ using var mp1 = MqttConnect(leafHost, leafPort, cleanSess: true);
+ CheckConnAckAccepted(mp1.Stream);
+ MqttPublish(mp1.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
+ CheckPubMsg(mc.Stream, "foo", PubQoS1, "msg"u8.ToArray());
+ SendDisconnect(mp1.Stream);
+
+ // Publish from hub server 3
+ using var mp2 = MqttConnect(hubHost, hubPorts[2], cleanSess: true);
+ CheckConnAckAccepted(mp2.Stream);
+ MqttPublish(mp2.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
+ CheckPubMsg(mc.Stream, "foo", PubQoS1, "msg"u8.ToArray());
+ SendDisconnect(mp2.Stream);
+
+ // Subscribe on hub server 2
+ using var mc2 = MqttConnect(hubHost, hubPorts[1], clientId: "sub2", cleanSess: true);
+ CheckConnAckAccepted(mc2.Stream);
+ MqttSubscribe(mc2.Stream, 1, [("foo", 1)]);
+ MqttFlush(mc2.Stream);
+
+ // Publish from leafnode → hub sub receives
+ using var mp3 = MqttConnect(leafHost, leafPort, cleanSess: true);
+ CheckConnAckAccepted(mp3.Stream);
+ MqttPublish(mp3.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
+ CheckPubMsg(mc2.Stream, "foo", PubQoS1, "msg"u8.ToArray());
+ SendDisconnect(mp3.Stream);
+
+ // Publish from hub server 1 → hub sub receives
+ using var mp4 = MqttConnect(hubHost, hubPorts[0], cleanSess: true);
+ CheckConnAckAccepted(mp4.Stream);
+ MqttPublish(mp4.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
+ CheckPubMsg(mc2.Stream, "foo", PubQoS1, "msg"u8.ToArray());
+ SendDisconnect(mp4.Stream);
+
+ SendDisconnect(mc2.Stream);
+ SendDisconnect(mc.Stream);
+ }
+ }
+}
diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.cs
index a129418..37a107d 100644
--- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.cs
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.cs
@@ -8,20 +8,7 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed partial class MqttHandlerTests
{
- [Fact] // T:2225
- public void MQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc_ShouldSucceed()
- {
- var cluster = new JetStreamCluster();
- var streamAssignment = new StreamAssignment { Config = new StreamConfig { Name = "MQTT" } };
-
- cluster.TrackInflightStreamProposal("SYS", streamAssignment, deleted: false);
- cluster.TrackInflightStreamProposal("SYS", streamAssignment, deleted: true);
- cluster.InflightStreams["SYS"]["MQTT"].Deleted.ShouldBeTrue();
-
- cluster.RemoveInflightStreamProposal("SYS", "MQTT");
- cluster.RemoveInflightStreamProposal("SYS", "MQTT");
- cluster.InflightStreams.ContainsKey("SYS").ShouldBeFalse();
- }
+ // T:2225 moved to IntegrationTests/Mqtt/MqttClusterTests.cs
[Fact] // T:2178
public void MQTTTLS_ShouldSucceed()
diff --git a/reports/current.md b/reports/current.md
index 4a255e0..eadfc96 100644
--- a/reports/current.md
+++ b/reports/current.md
@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
-Generated: 2026-03-01 21:15:21 UTC
+Generated: 2026-03-02 00:11:44 UTC
## Modules (12 total)