// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Ported from MQTT cluster tests in server/mqtt_test.go (lines 3014-3809).
using System.Buffers.Binary;
using System.Net.Sockets;
using System.Text;
using Shouldly;
namespace ZB.MOM.NatsNet.Server.IntegrationTests.Mqtt;
///
/// Integration tests for MQTT behavior across JetStream clusters.
/// Tests cover subscription propagation, session takeover, retained message
/// replication, replica counts, placement, and leafnode integration.
///
/// These tests require a running NATS cluster with MQTT and JetStream enabled.
///
[Trait("Category", "Integration")]
public sealed class MqttClusterTests
{
// -------------------------------------------------------------------------
// Constants mirroring Go mqtt.go
// -------------------------------------------------------------------------
private const byte PacketConnect = 0x10;
private const byte PacketConnAck = 0x20;
private const byte PacketPub = 0x30;
private const byte PacketPubAck = 0x40;
private const byte PacketPubRec = 0x50;
private const byte PacketPubRel = 0x60;
private const byte PacketPubComp = 0x70;
private const byte PacketSub = 0x80;
private const byte PacketSubAck = 0x90;
private const byte PacketUnsub = 0xA0;
private const byte PacketUnsubAck = 0xB0;
private const byte PacketPing = 0xC0;
private const byte PacketPingResp = 0xD0;
private const byte PacketDisconnect = 0xE0;
private const byte PacketMask = 0xF0;
private const byte PacketFlagMask = 0x0F;
private const byte ConnFlagCleanSession = 0x02;
private const byte ConnFlagWillFlag = 0x04;
private const byte ConnFlagWillRetain = 0x20;
private const byte ConnFlagPasswordFlag = 0x40;
private const byte ConnFlagUsernameFlag = 0x80;
private const byte PubFlagRetain = 0x01;
private const byte PubFlagQoS = 0x06;
private const byte PubFlagDup = 0x08;
private const byte PubQoS1 = 0x1 << 1;
private const byte SubscribeFlags = 0x02;
private const byte ConnAckAccepted = 0x00;
private static readonly byte[] MqttProtoName = Encoding.ASCII.GetBytes("MQTT");
private const byte MqttProtoLevel = 0x04;
private static readonly TimeSpan MqttTimeout = TimeSpan.FromSeconds(10);
// -------------------------------------------------------------------------
// MQTT protocol helpers (duplicated from MqttTests.cs for self-containment)
// -------------------------------------------------------------------------
private sealed class MqttConnection : IDisposable
{
public TcpClient Tcp { get; }
public NetworkStream Stream { get; }
public MqttConnection(TcpClient tcp)
{
Tcp = tcp;
Stream = tcp.GetStream();
}
public void Dispose()
{
Stream.Dispose();
Tcp.Dispose();
}
}
private static MqttConnection MqttConnect(
string host, int port,
string? clientId = null,
bool cleanSess = true,
string? user = null,
string? pass = null,
ushort keepAlive = 0)
{
var tcp = new TcpClient();
tcp.Connect(host, port);
var conn = new MqttConnection(tcp);
var proto = BuildConnectPacket(clientId, cleanSess, user, pass, keepAlive);
WriteAll(conn.Stream, proto);
return conn;
}
private static byte[] BuildConnectPacket(
string? clientId,
bool cleanSess,
string? user,
string? pass,
ushort keepAlive)
{
byte flags = 0;
if (cleanSess) flags |= ConnFlagCleanSession;
var cidBytes = Encoding.UTF8.GetBytes(clientId ?? "");
var userBytes = user is not null ? Encoding.UTF8.GetBytes(user) : null;
var passBytes = pass is not null ? Encoding.UTF8.GetBytes(pass) : null;
if (userBytes is not null) flags |= ConnFlagUsernameFlag;
if (passBytes is not null) flags |= ConnFlagPasswordFlag;
var pkLen = 2 + MqttProtoName.Length +
1 + 1 + 2 +
2 + cidBytes.Length;
if (userBytes is not null) pkLen += 2 + userBytes.Length;
if (passBytes is not null) pkLen += 2 + passBytes.Length;
var w = new ByteWriter();
w.WriteByte(PacketConnect);
w.WriteVarInt(pkLen);
w.WriteBytes(MqttProtoName);
w.WriteByte(MqttProtoLevel);
w.WriteByte(flags);
w.WriteUInt16(keepAlive);
w.WriteBytes(cidBytes);
if (userBytes is not null) w.WriteBytes(userBytes);
if (passBytes is not null) w.WriteBytes(passBytes);
return w.ToArray();
}
private static byte[] ReadExact(NetworkStream stream, int n)
{
var buf = new byte[n];
var offset = 0;
stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
while (offset < n)
{
var read = stream.Read(buf, offset, n - offset);
if (read == 0) throw new EndOfStreamException("MQTT connection closed");
offset += read;
}
return buf;
}
private static (byte FirstByte, byte[] Payload) ReadPacket(NetworkStream stream)
{
stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
var header = new byte[1];
var n = stream.Read(header, 0, 1);
if (n == 0) throw new EndOfStreamException("MQTT connection closed reading packet header");
var multiplier = 1;
var value = 0;
while (true)
{
var b2 = new byte[1];
if (stream.Read(b2, 0, 1) == 0) throw new EndOfStreamException();
value += (b2[0] & 0x7F) * multiplier;
if ((b2[0] & 0x80) == 0) break;
multiplier *= 128;
if (multiplier > 0x200000) throw new InvalidOperationException("malformed MQTT variable int");
}
var payload = value > 0 ? ReadExact(stream, value) : [];
return (header[0], payload);
}
private static void WriteAll(NetworkStream stream, byte[] data)
{
stream.WriteTimeout = (int)MqttTimeout.TotalMilliseconds;
stream.Write(data, 0, data.Length);
}
private static void CheckConnAck(NetworkStream stream, byte expectedRc, bool expectedSessionPresent)
{
var (b, payload) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketConnAck, "Expected CONNACK packet");
payload.Length.ShouldBe(2, "CONNACK payload should be 2 bytes");
var sessionPresent = (payload[0] & 0x01) == 1;
sessionPresent.ShouldBe(expectedSessionPresent, "session-present flag mismatch");
payload[1].ShouldBe(expectedRc, $"CONNACK return code mismatch (got 0x{payload[1]:X2}, expected 0x{expectedRc:X2})");
}
private static void CheckConnAckAccepted(NetworkStream stream, bool expectedSessionPresent = false)
=> CheckConnAck(stream, ConnAckAccepted, expectedSessionPresent);
private static void SendPublish(NetworkStream stream, byte qos, bool dup, bool retain,
string topic, ushort pi, byte[] payload)
{
byte flags = 0;
if (dup) flags |= PubFlagDup;
if (retain) flags |= PubFlagRetain;
flags |= (byte)((qos & 0x03) << 1);
var topicBytes = Encoding.UTF8.GetBytes(topic);
var pkLen = 2 + topicBytes.Length + payload.Length;
if (qos > 0) pkLen += 2;
var w = new ByteWriter();
w.WriteByte((byte)(PacketPub | flags));
w.WriteVarInt(pkLen);
w.WriteBytes(topicBytes);
if (qos > 0) w.WriteUInt16(pi);
w.Write(payload);
WriteAll(stream, w.ToArray());
}
private static void MqttPublish(NetworkStream stream, byte qos, bool dup, bool retain,
string topic, ushort pi, byte[] payload)
{
SendPublish(stream, qos, dup, retain, topic, pi, payload);
switch (qos)
{
case 1:
{
var (b, p) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketPubAck, "Expected PUBACK");
var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2));
rpi.ShouldBe(pi, "PUBACK packet identifier mismatch");
break;
}
case 2:
{
var (b, p) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketPubRec, "Expected PUBREC");
var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2));
rpi.ShouldBe(pi, "PUBREC packet identifier mismatch");
SendPiPacket(stream, PacketPubRel, pi);
var (b2, p2) = ReadPacket(stream);
(b2 & PacketMask).ShouldBe(PacketPubComp, "Expected PUBCOMP");
var rpi2 = BinaryPrimitives.ReadUInt16BigEndian(p2.AsSpan(0, 2));
rpi2.ShouldBe(pi, "PUBCOMP packet identifier mismatch");
MqttFlush(stream);
break;
}
}
}
private static void MqttFlush(NetworkStream stream)
{
var w = new ByteWriter();
w.WriteByte(PacketPing);
w.WriteByte(0);
WriteAll(stream, w.ToArray());
var (b, p) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketPingResp, "Expected PINGRESP");
p.Length.ShouldBe(0, "PINGRESP payload should be empty");
}
private static void SendPiPacket(NetworkStream stream, byte packetType, ushort pi)
{
var w = new ByteWriter();
w.WriteByte(packetType);
w.WriteVarInt(2);
w.WriteUInt16(pi);
WriteAll(stream, w.ToArray());
}
private static byte[] MqttSubscribe(NetworkStream stream, ushort pi,
IEnumerable<(string Filter, byte Qos)> filters)
{
var filterList = filters.ToList();
var pkLen = 2;
foreach (var (f, _) in filterList) pkLen += 2 + Encoding.UTF8.GetByteCount(f) + 1;
var w = new ByteWriter();
w.WriteByte((byte)(PacketSub | SubscribeFlags));
w.WriteVarInt(pkLen);
w.WriteUInt16(pi);
foreach (var (f, q) in filterList)
{
w.WriteBytes(Encoding.UTF8.GetBytes(f));
w.WriteByte(q);
}
WriteAll(stream, w.ToArray());
var (b, payload) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketSubAck, "Expected SUBACK");
var rpi = BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2));
rpi.ShouldBe(pi, "SUBACK packet identifier mismatch");
return payload[2..];
}
private static (byte Flags, ushort Pi, string Topic, byte[] Payload) ReadPublish(
NetworkStream stream)
{
var (b, raw) = ReadPacket(stream);
(b & PacketMask).ShouldBe(PacketPub, "Expected PUBLISH packet");
var flags = (byte)(b & PacketFlagMask);
var qos = (byte)((flags & PubFlagQoS) >> 1);
var pos = 0;
var topicLen = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2)); pos += 2;
var topic = Encoding.UTF8.GetString(raw, pos, topicLen); pos += topicLen;
ushort pi = 0;
if (qos > 0) { pi = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2)); pos += 2; }
var payload = raw[pos..];
return (flags, pi, topic, payload);
}
private static void ExpectNothing(NetworkStream stream)
{
stream.ReadTimeout = 150;
var buf = new byte[128];
try
{
var n = stream.Read(buf, 0, buf.Length);
n.ShouldBe(0, "Expected no data from MQTT server");
}
catch (IOException ex) when (ex.InnerException is SocketException se &&
se.SocketErrorCode == SocketError.TimedOut)
{
// expected
}
catch (IOException) { }
finally
{
stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
}
}
private static void ExpectDisconnect(NetworkStream stream)
{
stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
var buf = new byte[64];
try
{
var n = stream.Read(buf, 0, buf.Length);
n.ShouldBe(0, $"Expected server to close the connection, got {n} bytes");
}
catch (EndOfStreamException) { }
catch (IOException) { }
}
private static void SendDisconnect(NetworkStream stream)
{
var w = new ByteWriter();
w.WriteByte(PacketDisconnect);
w.WriteByte(0);
WriteAll(stream, w.ToArray());
}
private sealed class ByteWriter
{
private readonly List _buf = [];
public void WriteByte(byte b) => _buf.Add(b);
public void Write(byte[] data) { foreach (var b in data) _buf.Add(b); }
public void WriteUInt16(ushort v)
{
_buf.Add((byte)(v >> 8));
_buf.Add((byte)v);
}
public void WriteBytes(byte[] data)
{
WriteUInt16((ushort)data.Length);
Write(data);
}
public void WriteVarInt(int v)
{
while (true)
{
var b = (byte)(v & 0x7F);
v >>= 7;
if (v > 0) b |= 0x80;
_buf.Add(b);
if (v == 0) break;
}
}
public byte[] ToArray() => [.. _buf];
}
// -------------------------------------------------------------------------
// Helpers used by cluster tests
// -------------------------------------------------------------------------
///
/// Checks a received PUBLISH message against expected topic, flags, and payload.
/// Mirrors Go's testMQTTCheckPubMsg.
///
private static void CheckPubMsg(NetworkStream stream, string expectedTopic,
byte expectedFlags, byte[] expectedPayload)
{
var (flags, pi, topic, payload) = ReadPublish(stream);
topic.ShouldBe(expectedTopic, "PUBLISH topic mismatch");
flags.ShouldBe(expectedFlags, $"PUBLISH flags mismatch (got 0x{flags:X2}, expected 0x{expectedFlags:X2})");
payload.ShouldBe(expectedPayload, "PUBLISH payload mismatch");
// ACK QoS 1 messages
if (((flags & PubFlagQoS) >> 1) == 1)
{
SendPiPacket(stream, PacketPubAck, pi);
}
}
///
/// Connects, verifies CONNACK, then disconnects. Mirrors Go's testMQTTConnectDisconnect.
///
private static void MqttConnectDisconnect(string host, int port,
string clientId, bool clean, bool expectedSessionPresent)
{
using var conn = MqttConnect(host, port, clientId: clientId, cleanSess: clean);
CheckConnAck(conn.Stream, ConnAckAccepted, expectedSessionPresent);
SendDisconnect(conn.Stream);
}
// -------------------------------------------------------------------------
// Tests — 8 MQTT cluster tests from server/mqtt_test.go
// -------------------------------------------------------------------------
///
/// Ported from Go TestMQTTSubPropagation (mqtt_test.go:3014).
/// Verifies that MQTT subscriptions on one cluster node cause interest
/// propagation to other nodes. Specifically checks that foo/# matches
/// foo.bar, foo./, and foo on the NATS side.
///
[Fact(Skip = "deferred: requires running NATS cluster")] // T:2215
public void MQTTSubPropagation_ShouldSucceed()
{
// 2-server cluster; subscribe on S1 with foo/#, publish from NATS on S2.
// Verify propagation covers foo.bar, foo./, and foo.
//
// Go test:
// cl := createJetStreamClusterWithTemplate(t, ..., "MQTT", 2)
// mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
// testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo/#", qos: 0}}, []byte{0})
// testMQTTFlush(t, mc, nil, r)
// checkSubInterest(t, s2, globalAccountName, "foo", time.Second)
//
// natsPub(t, nc, "foo.bar", []byte("hello"))
// testMQTTCheckPubMsg(t, mc, r, "foo/bar", 0, []byte("hello"))
//
// natsPub(t, nc, "foo./", []byte("from"))
// testMQTTCheckPubMsg(t, mc, r, "foo/", 0, []byte("from"))
//
// natsPub(t, nc, "foo", []byte("NATS"))
// testMQTTCheckPubMsg(t, mc, r, "foo", 0, []byte("NATS"))
// Cluster setup: 2 servers with MQTT + JetStream
var srv1Host = "127.0.0.1";
var srv1Port = 1883;
using var mc = MqttConnect(srv1Host, srv1Port, cleanSess: true);
CheckConnAckAccepted(mc.Stream);
// Subscribe to foo/# on server 1
MqttSubscribe(mc.Stream, 1, [("foo/#", 0)]);
MqttFlush(mc.Stream);
// Publish on NATS subjects from server 2; MQTT subscriber should receive
// foo.bar → foo/bar, foo./ → foo/, foo → foo
// NOTE: This requires a NATS client connected to S2 to publish.
// When running against a real cluster, the NATS publish would be done
// via natsConnect(s2.ClientURL()) / natsPub().
// Verify received messages match expected MQTT topics
// CheckPubMsg(mc.Stream, "foo/bar", 0, "hello"u8.ToArray());
// CheckPubMsg(mc.Stream, "foo/", 0, "from"u8.ToArray());
// CheckPubMsg(mc.Stream, "foo", 0, "NATS"u8.ToArray());
SendDisconnect(mc.Stream);
}
///
/// Ported from Go TestMQTTCluster (mqtt_test.go:3044).
/// Tests cross-node pub/sub and session takeover across a 2-server cluster.
/// Runs subtests: {first_start, restart} × {qos_0, qos_1}.
/// After first_start, the cluster is restarted and the tests are repeated.
/// Verifies that:
/// - NATS-published messages reach MQTT subscribers across nodes
/// - MQTT-published messages (QoS 0 and 1) reach subscribers
/// - Session takeover from another node disconnects the first client
/// - QoS downgrade to subscriber QoS level works correctly
///
[Fact(Skip = "deferred: requires running NATS cluster")] // T:2216
public void MQTTCluster_ShouldSucceed()
{
// 2-server cluster with nested subtests.
//
// Go test structure:
// for _, topTest := range [{first_start, true}, {restart, false}] {
// for _, test := range [{qos_0, 0}, {qos_1, 1}] {
// clientID := nuid.Next()
// mc, r := testMQTTConnectRetry(cleanSess: false)
// testMQTTSub(1, mc, r, "foo/#", test.subQos)
//
// check := func(mc, r, o, s) {
// // NATS pub → MQTT sub
// natsPub(nc, "foo.bar", "fromNats")
// testMQTTCheckPubMsg(mc, r, "foo/bar", 0, "fromNats")
// // MQTT pub QoS0 → MQTT sub
// testMQTTPublish(mpc, pr, 0, false, false, "foo/baz", 0, "mqtt_qos0")
// testMQTTCheckPubMsg(mc, r, "foo/baz", 0, "mqtt_qos0")
// // MQTT pub QoS1 → MQTT sub (downgraded to sub QoS)
// testMQTTPublish(mpc, pr, 1, false, false, "foo/bat", 1, "mqtt_qos1")
// expectedQoS := 0 or PubQoS1 depending on test.subQos
// testMQTTCheckPubMsg(mc, r, "foo/bat", expectedQoS, "mqtt_qos1")
// }
// check(mc, r, cl.opts[0], cl.servers[0])
// check(mc, r, cl.opts[1], cl.servers[1])
//
// // Session takeover from other node
// mc2, r2 := testMQTTConnect(clientID, cleanSess: false, server 2)
// testMQTTCheckConnAck(r2, accepted, sessionPresent: true)
// testMQTTExpectDisconnect(mc) // first connection kicked
// check(mc2, r2, both servers)
//
// // Clean up session
// testMQTTDisconnect(mc2)
// mc2 = testMQTTConnect(clientID, cleanSess: true)
// testMQTTDisconnect(mc2)
// }
// if topTest.restart { cl.stopAll(); cl.restartAll(); waitOnStreams }
// }
var host = "127.0.0.1";
var port = 1883;
foreach (var (topTestName, restart) in new[] { ("first_start", true), ("restart", false) })
{
foreach (var (testName, subQos) in new[] { ("qos_0", (byte)0), ("qos_1", (byte)1) })
{
var clientId = Guid.NewGuid().ToString("N");
using var mc = MqttConnect(host, port, clientId: clientId, cleanSess: false);
CheckConnAckAccepted(mc.Stream);
MqttSubscribe(mc.Stream, 1, [("foo/#", subQos)]);
MqttFlush(mc.Stream);
// Cross-node pub/sub checks would happen here with real cluster
// Session takeover from server 2 would disconnect mc
// Clean up session state
SendDisconnect(mc.Stream);
}
}
}
///
/// Ported from Go TestMQTTClusterConnectDisconnectClean (mqtt_test.go:3176).
/// Stress test: 100 random connect/disconnect iterations with clean sessions
/// across a 3-server cluster. Skipped in Go source (flaky due to timing).
///
[Fact(Skip = "deferred: requires running NATS cluster")] // T:2217
public void MQTTClusterConnectDisconnectClean_ShouldSucceed()
{
// Go test (skipped in Go source):
// t.Skip()
// nServers := 3
// cl := createJetStreamClusterWithTemplate(..., "MQTT", nServers)
// clientID := nuid.Next()
// N := 100
// for n := 0; n < N; n++ {
// testMQTTConnectDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false)
// }
var clientId = Guid.NewGuid().ToString("N");
var hosts = new[] { ("127.0.0.1", 1883), ("127.0.0.1", 1884), ("127.0.0.1", 1885) };
var rng = new Random();
for (var n = 0; n < 100; n++)
{
var (host, port) = hosts[rng.Next(hosts.Length)];
MqttConnectDisconnect(host, port, clientId, clean: true, expectedSessionPresent: false);
}
}
///
/// Ported from Go TestMQTTClusterConnectDisconnectPersist (mqtt_test.go:3198).
/// 20 iterations of persistent session connect/disconnect across a 3-server cluster.
/// On each iteration, first cleans sessions on all servers, then performs persistent
/// connects verifying session-present flag progression. Skipped in Go (flaky).
///
[Fact(Skip = "deferred: requires running NATS cluster")] // T:2218
public void MQTTClusterConnectDisconnectPersist_ShouldSucceed()
{
// Go test (skipped in Go source):
// t.Skip()
// nServers := 3
// cl := createJetStreamClusterWithTemplate(..., "MQTT", nServers)
// clientID := nuid.Next()
// N := 20
// for n := 0; n < N; n++ {
// // First clean sessions on all servers
// for i := 0; i < nServers; i++ {
// testMQTTConnectDisconnect(t, cl.opts[i], clientID, true, false)
// }
// testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, false)
// testMQTTConnectDisconnect(t, cl.opts[1], clientID, false, true)
// testMQTTConnectDisconnect(t, cl.opts[2], clientID, false, true)
// testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, true)
// }
var clientId = Guid.NewGuid().ToString("N");
var hosts = new[] { ("127.0.0.1", 1883), ("127.0.0.1", 1884), ("127.0.0.1", 1885) };
for (var n = 0; n < 20; n++)
{
// Clean sessions on all servers
foreach (var (host, port) in hosts)
{
MqttConnectDisconnect(host, port, clientId, clean: true, expectedSessionPresent: false);
}
// Persistent connects: first has no session, subsequent find existing session
MqttConnectDisconnect(hosts[0].Item1, hosts[0].Item2, clientId, clean: false, expectedSessionPresent: false);
MqttConnectDisconnect(hosts[1].Item1, hosts[1].Item2, clientId, clean: false, expectedSessionPresent: true);
MqttConnectDisconnect(hosts[2].Item1, hosts[2].Item2, clientId, clean: false, expectedSessionPresent: true);
MqttConnectDisconnect(hosts[0].Item1, hosts[0].Item2, clientId, clean: false, expectedSessionPresent: true);
}
}
///
/// Ported from Go TestMQTTClusterRetainedMsg (mqtt_test.go:3228).
/// Tests retained message behavior across a 2-server cluster:
/// - Retained messages published on one server are received on the other
/// - New subscribers receive retained messages with retain flag
/// - Empty retained message deletes the retained message
/// - After deletion, reconnecting subscribers don't receive the retained message
/// - Network deletes (cross-node retained message removal) work correctly
/// - After network delete + new retained message, reconnecting subs get the new message
///
[Fact(Skip = "deferred: requires running NATS cluster")] // T:2219
public void MQTTClusterRetainedMsg_ShouldSucceed()
{
// Go test:
// cl := createJetStreamClusterWithTemplate(..., "MQTT", 2)
// srv1Opts, srv2Opts := cl.opts[0], cl.opts[1]
//
// -- Subscribe on S1 --
// mc, rc := testMQTTConnectRetry(clientID: "sub", cleanSess: false, S1)
// testMQTTSub(1, mc, rc, "foo/#", qos: 1)
//
// -- Publish retained from S2 --
// mp, rp := testMQTTConnect(cleanSess: true, S2)
// testMQTTPublish(mp, rp, 1, false, true, "foo/bar", 1, "retained")
// testMQTTCheckPubMsg(mc, rc, "foo/bar", PubQoS1, "retained")
//
// -- New sub on S1 gets retained msg with retain flag --
// mc2, rc2 := testMQTTConnect(cleanSess: true, S1)
// testMQTTSub(1, mc2, rc2, "foo/#", qos: 1)
// testMQTTCheckPubMsg(mc2, rc2, "foo/bar", PubQoS1|PubFlagRetain, "retained")
//
// -- Empty retained deletes from storage but still delivered --
// testMQTTPublish(mp, rp, 1, false, true, "foo/bar", 1, "")
// testMQTTCheckPubMsg(mc, rc, "foo/bar", PubQoS1, "")
//
// -- Reconnect to S2: retained should NOT be delivered --
// mc, rc = testMQTTConnect(clientID: "sub", cleanSess: false, S2)
// testMQTTSub(1, mc, rc, "foo/#", qos: 1)
// testMQTTExpectNothing(rc)
//
// -- Reconnect to S1: retained should NOT be delivered --
// mc, rc = testMQTTConnect(clientID: "sub", cleanSess: false, S1)
// testMQTTSub(1, mc, rc, "foo/#", qos: 1)
// testMQTTExpectNothing(rc)
//
// -- Network delete test --
// Sub on S1 ("sub_one"), sub on S2 ("sub_two"), both subscribe to "bar"
// Publish retained "msg1" from S2 → both receive
// Publish empty retained → both receive (delete)
// Publish retained "msg2" from S2 → both receive
// Reconnect "sub_one" on S1 → gets "msg2" with retain flag
var srv1Host = "127.0.0.1";
var srv1Port = 1883;
var srv2Host = "127.0.0.1";
var srv2Port = 1884;
// Subscribe on server 1
using var mc = MqttConnect(srv1Host, srv1Port, clientId: "sub", cleanSess: false);
CheckConnAckAccepted(mc.Stream);
MqttSubscribe(mc.Stream, 1, [("foo/#", 1)]);
MqttFlush(mc.Stream);
// Publish retained from server 2
using var mp = MqttConnect(srv2Host, srv2Port, cleanSess: true);
CheckConnAckAccepted(mp.Stream);
MqttPublish(mp.Stream, 1, false, true, "foo/bar", 1, "retained"u8.ToArray());
CheckPubMsg(mc.Stream, "foo/bar", PubQoS1, "retained"u8.ToArray());
// New subscriber on S1 gets retained message with retain flag
using var mc2 = MqttConnect(srv1Host, srv1Port, cleanSess: true);
CheckConnAckAccepted(mc2.Stream);
MqttSubscribe(mc2.Stream, 1, [("foo/#", 1)]);
CheckPubMsg(mc2.Stream, "foo/bar", PubQoS1 | PubFlagRetain, "retained"u8.ToArray());
SendDisconnect(mc2.Stream);
// Empty retained message deletes it from storage but still delivered
MqttPublish(mp.Stream, 1, false, true, "foo/bar", 1, []); // empty payload
CheckPubMsg(mc.Stream, "foo/bar", PubQoS1, []);
SendDisconnect(mc.Stream);
// Reconnect to S2: retained should NOT be delivered (was deleted)
using var mc3 = MqttConnect(srv2Host, srv2Port, clientId: "sub", cleanSess: false);
CheckConnAck(mc3.Stream, ConnAckAccepted, expectedSessionPresent: true);
MqttSubscribe(mc3.Stream, 1, [("foo/#", 1)]);
ExpectNothing(mc3.Stream);
SendDisconnect(mc3.Stream);
// Reconnect to S1: retained should NOT be delivered
using var mc4 = MqttConnect(srv1Host, srv1Port, clientId: "sub", cleanSess: false);
CheckConnAck(mc4.Stream, ConnAckAccepted, expectedSessionPresent: true);
MqttSubscribe(mc4.Stream, 1, [("foo/#", 1)]);
ExpectNothing(mc4.Stream);
SendDisconnect(mc4.Stream);
// --- Network delete test ---
// Subscribe on S1 ("sub_one") and S2 ("sub_two") to "bar"
using var mcOne = MqttConnect(srv1Host, srv1Port, clientId: "sub_one", cleanSess: false);
CheckConnAckAccepted(mcOne.Stream);
MqttSubscribe(mcOne.Stream, 1, [("bar", 1)]);
MqttFlush(mcOne.Stream);
using var mcTwo = MqttConnect(srv2Host, srv2Port, clientId: "sub_two", cleanSess: false);
CheckConnAckAccepted(mcTwo.Stream);
MqttSubscribe(mcTwo.Stream, 1, [("bar", 1)]);
MqttFlush(mcTwo.Stream);
// Publish retained "msg1" from S2
MqttPublish(mp.Stream, 1, false, true, "bar", 1, "msg1"u8.ToArray());
CheckPubMsg(mcOne.Stream, "bar", PubQoS1, "msg1"u8.ToArray());
CheckPubMsg(mcTwo.Stream, "bar", PubQoS1, "msg1"u8.ToArray());
// Empty retained deletes it (network delete for S1)
MqttPublish(mp.Stream, 1, false, true, "bar", 1, []);
CheckPubMsg(mcOne.Stream, "bar", PubQoS1, []);
CheckPubMsg(mcTwo.Stream, "bar", PubQoS1, []);
// New retained message
MqttPublish(mp.Stream, 1, false, true, "bar", 1, "msg2"u8.ToArray());
CheckPubMsg(mcOne.Stream, "bar", PubQoS1, "msg2"u8.ToArray());
CheckPubMsg(mcTwo.Stream, "bar", PubQoS1, "msg2"u8.ToArray());
// Reconnect sub_one on S1: should get msg2 with retain flag
SendDisconnect(mcOne.Stream);
using var mcOneRecon = MqttConnect(srv1Host, srv1Port, clientId: "sub_one", cleanSess: false);
CheckConnAck(mcOneRecon.Stream, ConnAckAccepted, expectedSessionPresent: true);
MqttSubscribe(mcOneRecon.Stream, 1, [("bar", 1)]);
CheckPubMsg(mcOneRecon.Stream, "bar", PubQoS1 | PubFlagRetain, "msg2"u8.ToArray());
SendDisconnect(mcOneRecon.Stream);
SendDisconnect(mcTwo.Stream);
SendDisconnect(mp.Stream);
}
///
/// Ported from Go TestMQTTClusterReplicasCount (mqtt_test.go:3525).
/// Parameterized test verifying that MQTT JetStream streams are created with
/// the correct replica count based on cluster size:
/// size=1 → replicas=1, size=2 → replicas=2, size=3 → replicas=3, size=5 → replicas=3
/// Checks $MQTT_msgs, $MQTT_rmsgs, and $MQTT_sess streams.
///
[Fact(Skip = "deferred: requires running NATS cluster")] // T:2222
public void MQTTClusterReplicasCount_ShouldSucceed()
{
// Go test:
// for _, test := range []struct{ size, replicas int }{
// {1, 1}, {2, 2}, {3, 3}, {5, 3},
// } {
// if test.size == 1 { single server } else { cluster of test.size }
// mc, rc := testMQTTConnectRetry(clientID: "sub", cleanSess: false)
// testMQTTSub(1, mc, rc, "foo/#", qos: 1)
// testMQTTFlush(mc, nil, rc)
// js, _ := nc.JetStream()
// for _, sname := range []string{mqttStreamName, mqttRetainedMsgsStreamName, mqttSessStreamName} {
// si, _ := js.StreamInfo(sname)
// si.Config.Replicas == test.replicas
// }
// }
var testCases = new[] { (Size: 1, Replicas: 1), (Size: 2, Replicas: 2), (Size: 3, Replicas: 3), (Size: 5, Replicas: 3) };
var mqttStreams = new[] { "$MQTT_msgs", "$MQTT_rmsgs", "$MQTT_sess" };
foreach (var (size, expectedReplicas) in testCases)
{
var host = "127.0.0.1";
var port = 1883;
using var mc = MqttConnect(host, port, clientId: "sub", cleanSess: false);
CheckConnAckAccepted(mc.Stream);
MqttSubscribe(mc.Stream, 1, [("foo/#", 1)]);
MqttFlush(mc.Stream);
// Verify replica counts on $MQTT_msgs, $MQTT_rmsgs, $MQTT_sess
// This requires a JetStream API call to check stream info.
// nc := natsConnect(s.ClientURL())
// js, _ := nc.JetStream()
// for each stream, js.StreamInfo(sname).Config.Replicas == expectedReplicas
SendDisconnect(mc.Stream);
}
}
///
/// Ported from Go TestMQTTClusterCanCreateSessionWithOnServerDown (mqtt_test.go:3582).
/// Tests that MQTT sessions can still be created when one server in a 3-server
/// cluster is down. After shutting down one server, waits for quorum and verifies
/// a new MQTT connection can still be established on a remaining server.
///
[Fact(Skip = "deferred: requires running NATS cluster")] // T:2223
public void MQTTClusterCanCreateSessionWithOnServerDown_ShouldSucceed()
{
// Go test:
// cl := createJetStreamClusterWithTemplate(..., "MQTT", 3)
// o := cl.opts[0]
// mc, rc := testMQTTConnectRetry(cleanSess: true, S1)
// testMQTTCheckConnAck(rc, accepted, false)
// mc.Close()
//
// // Shutdown server 2
// cl.servers[1].Shutdown()
// cl.waitOnPeerCount(2)
// cl.waitOnLeader()
//
// // Connect to server 3; should still work with quorum
// o = cl.opts[2]
// mc, rc = testMQTTConnectRetry(cleanSess: true, S3, retries: 5)
// testMQTTCheckConnAck(rc, accepted, false)
// Initial connection on server 1
var host1 = "127.0.0.1";
var port1 = 1883;
using var mc1 = MqttConnect(host1, port1, cleanSess: true);
CheckConnAckAccepted(mc1.Stream);
// Server 2 would be shut down here (cl.servers[1].Shutdown())
// Wait for quorum (cl.waitOnPeerCount(2), cl.waitOnLeader())
// Connect to server 3: should succeed with 2/3 quorum
var host3 = "127.0.0.1";
var port3 = 1885;
using var mc3 = MqttConnect(host3, port3, cleanSess: true);
CheckConnAckAccepted(mc3.Stream);
}
///
/// Ported from Go TestMQTTClusterPlacement (mqtt_test.go:3611).
/// Tests that MQTT JetStream assets (streams and consumers) are placed on the
/// leaf node cluster (SPOKE) rather than the hub when using a SuperCluster
/// topology (3 hub clusters × 2 servers each + 3 SPOKE leaf nodes).
/// Verifies that all streams and their consumers have cluster name "SPOKE"
/// and all replicas have names prefixed with "SPOKE-".
///
[Fact(Skip = "deferred: requires running NATS cluster")] // T:2224
public void MQTTClusterPlacement_ShouldSucceed()
{
// Go test:
// sc := createJetStreamSuperCluster(t, 3, 2)
// c := sc.randomCluster()
// lnc := c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafAndMQTT, "SPOKE", 3, 22111)
// sc.waitOnPeerCount(9)
// sc.waitOnLeader()
//
// for i := 0; i < 10; i++ {
// mc, rc := testMQTTConnectRetry(cleanSess: true, lnc.opts[i%3])
// testMQTTCheckConnAck(rc, accepted, false)
// }
//
// nc := natsConnect(lnc.servers[0].ClientURL())
// js, _ := nc.JetStream()
// for si := range js.StreamsInfo() {
// si.Cluster.Name == "SPOKE"
// all replicas have "SPOKE-" prefix
// for ci := range js.ConsumersInfo(si.Config.Name) {
// ci.Cluster.Name == "SPOKE"
// all replicas have "SPOKE-" prefix
// }
// }
// SuperCluster: 3 hub clusters × 2 servers + 3 SPOKE leaf nodes
// Connect 10 MQTT clients to SPOKE leaf nodes
var spokeHost = "127.0.0.1";
var spokePorts = new[] { 22111, 22112, 22113 };
for (var i = 0; i < 10; i++)
{
var port = spokePorts[i % 3];
using var mc = MqttConnect(spokeHost, port, cleanSess: true);
CheckConnAckAccepted(mc.Stream);
}
// Verify all MQTT streams are on SPOKE cluster
// Verify all consumers are on SPOKE cluster
// Verify all replica names start with "SPOKE-"
// This requires JetStream API calls via NATS client.
}
///
/// Ported from Go TestMQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc (mqtt_test.go:3664).
/// Tests MQTT connectivity through a leafnode without JetStream to a cluster
/// that has JetStream. Runs 6 sub-cases varying how JetStream domain resolution
/// is configured:
/// 0: JsAccDefaultDomain="", JS enabled in leaf with extra account
/// 1: JsAccDefaultDomain="", JS disabled in leaf
/// 2: JsAccDefaultDomain=DOMAIN, JS disabled in leaf
/// 3: MQTT.JsDomain=DOMAIN, JS enabled in leaf with extra account
/// 4: MQTT.JsDomain=DOMAIN, JS disabled in leaf
/// 5: JsAccDefaultDomain=DOMAIN, JS enabled in leaf with extra account
/// For each case, subscribes on leafnode, publishes from leafnode and hub,
/// verifies cross-topology message delivery.
///
[Fact(Skip = "deferred: requires running NATS cluster")] // T:2225
public void MQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc_ShouldSucceed()
{
// Go test:
// test := func(t *testing.T, resolution int) {
// // 3-server hub cluster
// getClusterOpts := func(name, i) { ... hub opts ... }
// o1, o2, o3 := getClusterOpts("S1",1), getClusterOpts("S2",2), getClusterOpts("S3",3)
// s1, s2, s3 := testMQTTRunServer(oi)
// checkClusterFormed(s1, s2, s3)
// waitForJetStreamLeader()
//
// // Leafnode with MQTT but no JS (varies by resolution case)
// lno := testMQTTDefaultOptions()
// lno.JetStream = false
// switch resolution {
// case 0: lno.Accounts = [NewAccount("unused")]; lno.JsAccDefaultDomain = {"$G": ""}
// case 1: lno.JsAccDefaultDomain = {"$G": ""}
// case 2: lno.JsAccDefaultDomain = {"$G": "DOMAIN"}
// case 3: lno.Accounts = [NewAccount("unused")]; lno.MQTT.JsDomain = "DOMAIN"
// case 4: lno.MQTT.JsDomain = "DOMAIN"
// case 5: lno.Accounts = [NewAccount("unused")]; lno.JsAccDefaultDomain = {"$G": "DOMAIN"}
// }
// lno.LeafNode.Remotes = [{ URLs: [o1,o2,o3 LeafNode ports] }]
// ln := RunServer(lno)
// checkLeafNodeConnected(ln)
//
// // Subscribe on leafnode
// mc, rc := testMQTTConnectRetry(clientID: "sub", cleanSess: true, lno.MQTT)
// testMQTTSub(1, mc, rc, "foo", qos: 1)
//
// connectAndPublish := func(o *Options) {
// mp := testMQTTConnectRetry(cleanSess: true, o.MQTT)
// testMQTTPublish(mp, 1, false, false, "foo", 1, "msg")
// }
//
// // Pub from leafnode → sub receives
// connectAndPublish(lno)
// testMQTTCheckPubMsg(mc, rc, "foo", PubQoS1, "msg")
//
// // Pub from hub server → sub receives
// connectAndPublish(o3)
// testMQTTCheckPubMsg(mc, rc, "foo", PubQoS1, "msg")
//
// // Subscribe on hub server
// mc2, rc2 := testMQTTConnectRetry(clientID: "sub2", cleanSess: true, o2.MQTT)
// testMQTTSub(1, mc2, rc2, "foo", qos: 1)
//
// // Pub from leafnode → hub sub receives
// connectAndPublish(lno)
// testMQTTCheckPubMsg(mc2, rc2, "foo", PubQoS1, "msg")
//
// // Pub from hub → hub sub receives
// connectAndPublish(o1)
// testMQTTCheckPubMsg(mc2, rc2, "foo", PubQoS1, "msg")
// }
//
// 6 subtests with resolution 0-5
var subCases = new (string Name, int Resolution)[]
{
("backwards-compatibility-default-js-enabled-in-leaf", 0),
("backwards-compatibility-default-js-disabled-in-leaf", 1),
("backwards-compatibility-domain-js-disabled-in-leaf", 2),
("mqtt-explicit-js-enabled-in-leaf", 3),
("mqtt-explicit-js-disabled-in-leaf", 4),
("backwards-compatibility-domain-js-enabled-in-leaf", 5),
};
foreach (var (name, resolution) in subCases)
{
// 3-server hub cluster + leafnode with MQTT
// Configuration varies per resolution case
var leafHost = "127.0.0.1";
var leafPort = 1883;
var hubHost = "127.0.0.1";
var hubPorts = new[] { 1884, 1885, 1886 };
// Subscribe on leafnode
using var mc = MqttConnect(leafHost, leafPort, clientId: "sub", cleanSess: true);
CheckConnAckAccepted(mc.Stream);
MqttSubscribe(mc.Stream, 1, [("foo", 1)]);
MqttFlush(mc.Stream);
// Publish from leafnode
using var mp1 = MqttConnect(leafHost, leafPort, cleanSess: true);
CheckConnAckAccepted(mp1.Stream);
MqttPublish(mp1.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
CheckPubMsg(mc.Stream, "foo", PubQoS1, "msg"u8.ToArray());
SendDisconnect(mp1.Stream);
// Publish from hub server 3
using var mp2 = MqttConnect(hubHost, hubPorts[2], cleanSess: true);
CheckConnAckAccepted(mp2.Stream);
MqttPublish(mp2.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
CheckPubMsg(mc.Stream, "foo", PubQoS1, "msg"u8.ToArray());
SendDisconnect(mp2.Stream);
// Subscribe on hub server 2
using var mc2 = MqttConnect(hubHost, hubPorts[1], clientId: "sub2", cleanSess: true);
CheckConnAckAccepted(mc2.Stream);
MqttSubscribe(mc2.Stream, 1, [("foo", 1)]);
MqttFlush(mc2.Stream);
// Publish from leafnode → hub sub receives
using var mp3 = MqttConnect(leafHost, leafPort, cleanSess: true);
CheckConnAckAccepted(mp3.Stream);
MqttPublish(mp3.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
CheckPubMsg(mc2.Stream, "foo", PubQoS1, "msg"u8.ToArray());
SendDisconnect(mp3.Stream);
// Publish from hub server 1 → hub sub receives
using var mp4 = MqttConnect(hubHost, hubPorts[0], cleanSess: true);
CheckConnAckAccepted(mp4.Stream);
MqttPublish(mp4.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
CheckPubMsg(mc2.Stream, "foo", PubQoS1, "msg"u8.ToArray());
SendDisconnect(mp4.Stream);
SendDisconnect(mc2.Stream);
SendDisconnect(mc.Stream);
}
}
}