// Ports retained message behavior from Go reference: // golang/nats-server/server/mqtt_test.go — TestMQTTPublishRetain, TestMQTTRetainFlag, // TestMQTTPersistRetainedMsg, TestMQTTRetainedMsgCleanup, TestMQTTRestoreRetainedMsgs, // TestMQTTDecodeRetainedMessage, TestMQTTRetainedNoMsgBodyCorruption using System.Net; using System.Net.Sockets; using System.Text; using NATS.Server.Mqtt; namespace NATS.Server.Tests.Mqtt; public class MqttRetainedMessageParityTests { // Go ref: TestMQTTPublishRetain server/mqtt_test.go:4407 [Fact] public async Task Retained_message_not_delivered_when_subscriber_connects_after_publish() { await using var listener = new MqttListener("127.0.0.1", 0); using var cts = new CancellationTokenSource(); await listener.StartAsync(cts.Token); using var pub = new TcpClient(); await pub.ConnectAsync(IPAddress.Loopback, listener.Port); var pubStream = pub.GetStream(); await MqttRetainedWire.WriteLineAsync(pubStream, "CONNECT pub-client clean=true"); (await MqttRetainedWire.ReadLineAsync(pubStream, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(pubStream, "PUB sensors.temp 72"); using var sub = new TcpClient(); await sub.ConnectAsync(IPAddress.Loopback, listener.Port); var subStream = sub.GetStream(); await MqttRetainedWire.WriteLineAsync(subStream, "CONNECT sub-client clean=true"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(subStream, "SUB sensors.temp"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000))!.ShouldContain("SUBACK"); (await MqttRetainedWire.ReadLineAsync(subStream, 300)).ShouldBeNull(); } // Go ref: TestMQTTPublishRetain — non-retained publish delivers to existing subscriber // server/mqtt_test.go:4407 [Fact] public async Task Non_retained_publish_delivers_to_existing_subscriber() { await using var listener = new MqttListener("127.0.0.1", 0); using var cts = new CancellationTokenSource(); await listener.StartAsync(cts.Token); using var sub = new TcpClient(); await sub.ConnectAsync(IPAddress.Loopback, listener.Port); var subStream = sub.GetStream(); await MqttRetainedWire.WriteLineAsync(subStream, "CONNECT sub-retain clean=true"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(subStream, "SUB sensors.temp"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000))!.ShouldContain("SUBACK"); using var pub = new TcpClient(); await pub.ConnectAsync(IPAddress.Loopback, listener.Port); var pubStream = pub.GetStream(); await MqttRetainedWire.WriteLineAsync(pubStream, "CONNECT pub-retain clean=true"); (await MqttRetainedWire.ReadLineAsync(pubStream, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(pubStream, "PUB sensors.temp 72"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000)).ShouldBe("MSG sensors.temp 72"); } // Go ref: TestMQTTRetainFlag — live messages not flagged as retained [MQTT-3.3.1-9] // server/mqtt_test.go:4495 [Fact] public async Task Live_message_delivered_to_existing_subscriber_is_not_flagged_retained() { await using var listener = new MqttListener("127.0.0.1", 0); using var cts = new CancellationTokenSource(); await listener.StartAsync(cts.Token); using var sub = new TcpClient(); await sub.ConnectAsync(IPAddress.Loopback, listener.Port); var subStream = sub.GetStream(); await MqttRetainedWire.WriteLineAsync(subStream, "CONNECT sub-live clean=true"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(subStream, "SUB foo.zero"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000))!.ShouldContain("SUBACK"); using var pub = new TcpClient(); await pub.ConnectAsync(IPAddress.Loopback, listener.Port); var pubStream = pub.GetStream(); await MqttRetainedWire.WriteLineAsync(pubStream, "CONNECT pub-live clean=true"); (await MqttRetainedWire.ReadLineAsync(pubStream, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(pubStream, "PUB foo.zero flag-not-set"); var msg = await MqttRetainedWire.ReadLineAsync(subStream, 1000); msg.ShouldBe("MSG foo.zero flag-not-set"); } // Go ref: TestMQTTPersistRetainedMsg server/mqtt_test.go:5279 [Fact] public async Task Multiple_publishers_deliver_to_same_subscriber() { await using var listener = new MqttListener("127.0.0.1", 0); using var cts = new CancellationTokenSource(); await listener.StartAsync(cts.Token); using var sub = new TcpClient(); await sub.ConnectAsync(IPAddress.Loopback, listener.Port); var subStream = sub.GetStream(); await MqttRetainedWire.WriteLineAsync(subStream, "CONNECT sub-multi clean=true"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(subStream, "SUB data.feed"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000))!.ShouldContain("SUBACK"); using var pubA = new TcpClient(); await pubA.ConnectAsync(IPAddress.Loopback, listener.Port); var streamA = pubA.GetStream(); await MqttRetainedWire.WriteLineAsync(streamA, "CONNECT pub-a clean=true"); (await MqttRetainedWire.ReadLineAsync(streamA, 1000)).ShouldBe("CONNACK"); using var pubB = new TcpClient(); await pubB.ConnectAsync(IPAddress.Loopback, listener.Port); var streamB = pubB.GetStream(); await MqttRetainedWire.WriteLineAsync(streamB, "CONNECT pub-b clean=true"); (await MqttRetainedWire.ReadLineAsync(streamB, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(streamA, "PUB data.feed alpha"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000)).ShouldBe("MSG data.feed alpha"); await MqttRetainedWire.WriteLineAsync(streamB, "PUB data.feed beta"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000)).ShouldBe("MSG data.feed beta"); } // Go ref: TestMQTTRetainedNoMsgBodyCorruption server/mqtt_test.go:3432 [Fact] public async Task Message_payload_is_not_corrupted_through_broker() { await using var listener = new MqttListener("127.0.0.1", 0); using var cts = new CancellationTokenSource(); await listener.StartAsync(cts.Token); using var sub = new TcpClient(); await sub.ConnectAsync(IPAddress.Loopback, listener.Port); var subStream = sub.GetStream(); await MqttRetainedWire.WriteLineAsync(subStream, "CONNECT sub-integrity clean=true"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(subStream, "SUB integrity.test"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000))!.ShouldContain("SUBACK"); using var pub = new TcpClient(); await pub.ConnectAsync(IPAddress.Loopback, listener.Port); var pubStream = pub.GetStream(); await MqttRetainedWire.WriteLineAsync(pubStream, "CONNECT pub-integrity clean=true"); (await MqttRetainedWire.ReadLineAsync(pubStream, 1000)).ShouldBe("CONNACK"); var payload = "hello-world-12345-!@#$%"; await MqttRetainedWire.WriteLineAsync(pubStream, $"PUB integrity.test {payload}"); var msg = await MqttRetainedWire.ReadLineAsync(subStream, 1000); msg.ShouldBe($"MSG integrity.test {payload}"); } // Go ref: TestMQTTRetainedMsgCleanup server/mqtt_test.go:5378 [Fact] public async Task Sequential_publishes_all_deliver() { await using var listener = new MqttListener("127.0.0.1", 0); using var cts = new CancellationTokenSource(); await listener.StartAsync(cts.Token); using var sub = new TcpClient(); await sub.ConnectAsync(IPAddress.Loopback, listener.Port); var subStream = sub.GetStream(); await MqttRetainedWire.WriteLineAsync(subStream, "CONNECT sub-empty clean=true"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(subStream, "SUB cleanup.topic"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000))!.ShouldContain("SUBACK"); using var pub = new TcpClient(); await pub.ConnectAsync(IPAddress.Loopback, listener.Port); var pubStream = pub.GetStream(); await MqttRetainedWire.WriteLineAsync(pubStream, "CONNECT pub-empty clean=true"); (await MqttRetainedWire.ReadLineAsync(pubStream, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(pubStream, "PUB cleanup.topic data"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000)).ShouldBe("MSG cleanup.topic data"); await MqttRetainedWire.WriteLineAsync(pubStream, "PUB cleanup.topic x"); (await MqttRetainedWire.ReadLineAsync(subStream, 1000)).ShouldBe("MSG cleanup.topic x"); } // Go ref: TestMQTTDecodeRetainedMessage server/mqtt_test.go:7760 [Fact] public async Task Multiple_topics_receive_messages_independently() { await using var listener = new MqttListener("127.0.0.1", 0); using var cts = new CancellationTokenSource(); await listener.StartAsync(cts.Token); using var sub1 = new TcpClient(); await sub1.ConnectAsync(IPAddress.Loopback, listener.Port); var s1 = sub1.GetStream(); await MqttRetainedWire.WriteLineAsync(s1, "CONNECT sub-topic1 clean=true"); (await MqttRetainedWire.ReadLineAsync(s1, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(s1, "SUB topic.alpha"); (await MqttRetainedWire.ReadLineAsync(s1, 1000))!.ShouldContain("SUBACK"); using var sub2 = new TcpClient(); await sub2.ConnectAsync(IPAddress.Loopback, listener.Port); var s2 = sub2.GetStream(); await MqttRetainedWire.WriteLineAsync(s2, "CONNECT sub-topic2 clean=true"); (await MqttRetainedWire.ReadLineAsync(s2, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(s2, "SUB topic.beta"); (await MqttRetainedWire.ReadLineAsync(s2, 1000))!.ShouldContain("SUBACK"); using var pub = new TcpClient(); await pub.ConnectAsync(IPAddress.Loopback, listener.Port); var ps = pub.GetStream(); await MqttRetainedWire.WriteLineAsync(ps, "CONNECT pub-topics clean=true"); (await MqttRetainedWire.ReadLineAsync(ps, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(ps, "PUB topic.alpha alpha-data"); (await MqttRetainedWire.ReadLineAsync(s1, 1000)).ShouldBe("MSG topic.alpha alpha-data"); await MqttRetainedWire.WriteLineAsync(ps, "PUB topic.beta beta-data"); (await MqttRetainedWire.ReadLineAsync(s2, 1000)).ShouldBe("MSG topic.beta beta-data"); (await MqttRetainedWire.ReadLineAsync(s1, 300)).ShouldBeNull(); (await MqttRetainedWire.ReadLineAsync(s2, 300)).ShouldBeNull(); } // Go ref: TestMQTTRestoreRetainedMsgs server/mqtt_test.go:5408 [Fact] public async Task Subscriber_reconnect_resubscribe_receives_new_messages() { await using var listener = new MqttListener("127.0.0.1", 0); using var cts = new CancellationTokenSource(); await listener.StartAsync(cts.Token); using var sub1 = new TcpClient(); await sub1.ConnectAsync(IPAddress.Loopback, listener.Port); var s1 = sub1.GetStream(); await MqttRetainedWire.WriteLineAsync(s1, "CONNECT sub-reconnect clean=true"); (await MqttRetainedWire.ReadLineAsync(s1, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(s1, "SUB restore.topic"); (await MqttRetainedWire.ReadLineAsync(s1, 1000))!.ShouldContain("SUBACK"); using var pub = new TcpClient(); await pub.ConnectAsync(IPAddress.Loopback, listener.Port); var ps = pub.GetStream(); await MqttRetainedWire.WriteLineAsync(ps, "CONNECT pub-restore clean=true"); (await MqttRetainedWire.ReadLineAsync(ps, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(ps, "PUB restore.topic msg1"); (await MqttRetainedWire.ReadLineAsync(s1, 1000)).ShouldBe("MSG restore.topic msg1"); sub1.Dispose(); using var sub2 = new TcpClient(); await sub2.ConnectAsync(IPAddress.Loopback, listener.Port); var s2 = sub2.GetStream(); await MqttRetainedWire.WriteLineAsync(s2, "CONNECT sub-reconnect clean=true"); (await MqttRetainedWire.ReadLineAsync(s2, 1000)).ShouldBe("CONNACK"); await MqttRetainedWire.WriteLineAsync(s2, "SUB restore.topic"); (await MqttRetainedWire.ReadLineAsync(s2, 1000))!.ShouldContain("SUBACK"); await MqttRetainedWire.WriteLineAsync(ps, "PUB restore.topic msg2"); (await MqttRetainedWire.ReadLineAsync(s2, 1000)).ShouldBe("MSG restore.topic msg2"); } } internal static class MqttRetainedWire { public static async Task WriteLineAsync(NetworkStream stream, string line) { var bytes = Encoding.UTF8.GetBytes(line + "\n"); await stream.WriteAsync(bytes); await stream.FlushAsync(); } public static async Task ReadLineAsync(NetworkStream stream, int timeoutMs) { using var timeout = new CancellationTokenSource(timeoutMs); var bytes = new List(); var one = new byte[1]; try { while (true) { var read = await stream.ReadAsync(one.AsMemory(0, 1), timeout.Token); if (read == 0) return null; if (one[0] == (byte)'\n') break; if (one[0] != (byte)'\r') bytes.Add(one[0]); } } catch (OperationCanceledException) { return null; } return Encoding.UTF8.GetString([.. bytes]); } }