using NATS.Server.JetStream; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Storage; using NATS.Server.Mqtt; // Retained/Session store tests use MemStore + StreamConfig directly namespace NATS.Server.Mqtt.Tests; /// /// Tests for MQTT JetStream persistence: stream initialization, consumer management, /// QoS 1/2 flow with JetStream backing, session persistence, and retained message persistence. /// Go reference: server/mqtt.go mqttCreateAccountSessionManager, mqttStoreSession, /// mqttHandleRetainedMsg, trackPublish. /// public class MqttJetStreamPersistenceTests { // ----------------------------------------------------------------------- // MqttStreamInitializer // ----------------------------------------------------------------------- [Fact] public void StreamInitializer_creates_all_five_streams() { // Go reference: server/mqtt.go mqttCreateAccountSessionManager creates 5 streams var (streamMgr, _, initializer) = CreateJetStreamInfra(); initializer.IsInitialized.ShouldBeFalse(); initializer.EnsureStreams(); initializer.IsInitialized.ShouldBeTrue(); streamMgr.Exists(MqttProtocolConstants.SessStreamName).ShouldBeTrue(); streamMgr.Exists(MqttProtocolConstants.StreamName).ShouldBeTrue(); streamMgr.Exists(MqttProtocolConstants.RetainedMsgsStreamName).ShouldBeTrue(); streamMgr.Exists(MqttProtocolConstants.QoS2IncomingMsgsStreamName).ShouldBeTrue(); streamMgr.Exists(MqttProtocolConstants.OutStreamName).ShouldBeTrue(); } [Fact] public void StreamInitializer_is_idempotent() { var (streamMgr, _, initializer) = CreateJetStreamInfra(); initializer.EnsureStreams(); initializer.EnsureStreams(); // should not throw streamMgr.StreamNames.Count.ShouldBe(5); } // ----------------------------------------------------------------------- // MqttConsumerManager — subscription consumers // ----------------------------------------------------------------------- [Fact] public void ConsumerManager_creates_subscription_consumer() { // Go reference: server/mqtt.go mqttProcessSub — creates durable consumer per QoS>0 sub var (streamMgr, consumerMgr, initializer) = CreateJetStreamInfra(); initializer.EnsureStreams(); var mqttConsumerMgr = new MqttConsumerManager(streamMgr, consumerMgr); var binding = mqttConsumerMgr.CreateSubscriptionConsumer("client1", "sensor.temp", qos: 1, maxAckPending: 100); binding.ShouldNotBeNull(); binding.Stream.ShouldBe(MqttProtocolConstants.StreamName); binding.FilterSubject.ShouldBe($"{MqttProtocolConstants.StreamSubjectPrefix}sensor.temp"); } [Fact] public void ConsumerManager_removes_subscription_consumer() { var (streamMgr, consumerMgr, initializer) = CreateJetStreamInfra(); initializer.EnsureStreams(); var mqttConsumerMgr = new MqttConsumerManager(streamMgr, consumerMgr); mqttConsumerMgr.CreateSubscriptionConsumer("client1", "sensor.temp", qos: 1, maxAckPending: 100); mqttConsumerMgr.GetBinding("client1", "sensor.temp").ShouldNotBeNull(); mqttConsumerMgr.RemoveSubscriptionConsumer("client1", "sensor.temp"); mqttConsumerMgr.GetBinding("client1", "sensor.temp").ShouldBeNull(); } [Fact] public void ConsumerManager_removes_all_consumers_for_client() { // Go reference: clean session disconnect removes all consumers var (streamMgr, consumerMgr, initializer) = CreateJetStreamInfra(); initializer.EnsureStreams(); var mqttConsumerMgr = new MqttConsumerManager(streamMgr, consumerMgr); mqttConsumerMgr.CreateSubscriptionConsumer("client1", "sensor.temp", qos: 1, maxAckPending: 100); mqttConsumerMgr.CreateSubscriptionConsumer("client1", "sensor.humidity", qos: 1, maxAckPending: 100); mqttConsumerMgr.GetClientBindings("client1").Count.ShouldBe(2); mqttConsumerMgr.RemoveAllConsumers("client1"); mqttConsumerMgr.GetClientBindings("client1").Count.ShouldBe(0); } // ----------------------------------------------------------------------- // QoS 1 with JetStream // ----------------------------------------------------------------------- [Fact] public async Task QoS1_publish_stores_to_stream() { // Go reference: server/mqtt.go QoS 1 publish stores message in $MQTT_msgs var (streamMgr, consumerMgr, initializer) = CreateJetStreamInfra(); initializer.EnsureStreams(); var mqttConsumerMgr = new MqttConsumerManager(streamMgr, consumerMgr); var seq = mqttConsumerMgr.PublishToStream("sensor.temp", "72.5"u8.ToArray()); seq.ShouldBeGreaterThan((ulong)0); // Verify message is in the stream streamMgr.TryGet(MqttProtocolConstants.StreamName, out var handle).ShouldBeTrue(); var msg = await handle.Store.LoadAsync(seq, default); msg.ShouldNotBeNull(); System.Text.Encoding.UTF8.GetString(msg.Payload.Span).ShouldBe("72.5"); } [Fact] public async Task QoS1_acknowledge_removes_from_stream() { // Go reference: PUBACK acks the JetStream message var (streamMgr, consumerMgr, initializer) = CreateJetStreamInfra(); initializer.EnsureStreams(); var mqttConsumerMgr = new MqttConsumerManager(streamMgr, consumerMgr); var seq = mqttConsumerMgr.PublishToStream("sensor.temp", "72.5"u8.ToArray()); mqttConsumerMgr.AcknowledgeMessage(seq).ShouldBeTrue(); // Message should be removed streamMgr.TryGet(MqttProtocolConstants.StreamName, out var handle).ShouldBeTrue(); var msg = await handle.Store.LoadAsync(seq, default); msg.ShouldBeNull(); } [Fact] public void QoS1_tracker_records_stream_sequence() { // Go reference: server/mqtt.go trackPublish — maps packet ID → stream sequence var tracker = new MqttQoS1Tracker(); var packetId = tracker.Register("sensor/temp", "72.5"u8.ToArray(), streamSequence: 42); tracker.IsPending(packetId).ShouldBeTrue(); var acked = tracker.Acknowledge(packetId); acked.ShouldNotBeNull(); acked.StreamSequence.ShouldBe((ulong)42); } [Fact] public void QoS1_tracker_redelivery_preserves_stream_sequence() { var tracker = new MqttQoS1Tracker(); tracker.Register("sensor/temp", "72.5"u8.ToArray(), streamSequence: 99); var pending = tracker.GetPendingForRedelivery(); pending.Count.ShouldBe(1); pending[0].StreamSequence.ShouldBe((ulong)99); pending[0].DeliveryCount.ShouldBe(2); // incremented } // ----------------------------------------------------------------------- // QoS 2 with JetStream // ----------------------------------------------------------------------- [Fact] public async Task QoS2_incoming_stores_for_dedup() { // Go reference: server/mqtt.go QoS 2 incoming stored in $MQTT_qos2in for dedup var (streamMgr, consumerMgr, initializer) = CreateJetStreamInfra(); initializer.EnsureStreams(); var mqttConsumerMgr = new MqttConsumerManager(streamMgr, consumerMgr); var seq = mqttConsumerMgr.StoreQoS2Incoming("client1", 1, "payload"u8.ToArray()); seq.ShouldBeGreaterThan((ulong)0); var msg = await mqttConsumerMgr.LoadQoS2IncomingAsync("client1", 1); msg.ShouldNotBeNull(); System.Text.Encoding.UTF8.GetString(msg.Payload.Span).ShouldBe("payload"); } [Fact] public async Task QoS2_incoming_removed_after_pubcomp() { // Go reference: server/mqtt.go QoS 2 state removed on PUBCOMP var (streamMgr, consumerMgr, initializer) = CreateJetStreamInfra(); initializer.EnsureStreams(); var mqttConsumerMgr = new MqttConsumerManager(streamMgr, consumerMgr); mqttConsumerMgr.StoreQoS2Incoming("client1", 1, "payload"u8.ToArray()); var removed = await mqttConsumerMgr.RemoveQoS2IncomingAsync("client1", 1); removed.ShouldBeTrue(); var msg = await mqttConsumerMgr.LoadQoS2IncomingAsync("client1", 1); msg.ShouldBeNull(); } // ----------------------------------------------------------------------- // Session persistence with JetStream backing // ----------------------------------------------------------------------- [Fact] public async Task Session_persists_and_recovers_from_jetstream() { // Go reference: server/mqtt.go mqttStoreSession + mqttLoadSession via JetStream var backingStore = new MemStore(new StreamConfig { Name = MqttProtocolConstants.SessStreamName, Subjects = [$"{MqttProtocolConstants.SessStreamSubjectPrefix}>"], MaxMsgsPer = 1, }); var store1 = new MqttSessionStore(backingStore); await store1.ConnectAsync("client-js", cleanSession: false); store1.AddSubscription("client-js", "topic/a", 1); store1.AddSubscription("client-js", "topic/b", 0); await store1.SaveSessionAsync("client-js"); // Simulate restart with same backing store var store2 = new MqttSessionStore(backingStore); await store2.ConnectAsync("client-js", cleanSession: false); var subs = store2.GetSubscriptions("client-js"); subs.Count.ShouldBe(2); subs["topic/a"].ShouldBe(1); subs["topic/b"].ShouldBe(0); } [Fact] public async Task Clean_session_removes_from_jetstream() { var backingStore = new MemStore(new StreamConfig { Name = MqttProtocolConstants.SessStreamName, Subjects = [$"{MqttProtocolConstants.SessStreamSubjectPrefix}>"], MaxMsgsPer = 1, }); var store = new MqttSessionStore(backingStore); await store.ConnectAsync("client-clean", cleanSession: false); store.AddSubscription("client-clean", "topic/x", 1); await store.SaveSessionAsync("client-clean"); // Clean session connect await store.ConnectAsync("client-clean", cleanSession: true); // Simulate restart — should not find session var store2 = new MqttSessionStore(backingStore); await store2.ConnectAsync("client-clean", cleanSession: false); store2.GetSubscriptions("client-clean").ShouldBeEmpty(); } // ----------------------------------------------------------------------- // Retained messages with JetStream backing // ----------------------------------------------------------------------- [Fact] public async Task Retained_persists_and_recovers_from_jetstream() { // Go reference: server/mqtt.go retained messages stored in $MQTT_rmsgs var backingStore = new MemStore(new StreamConfig { Name = MqttProtocolConstants.RetainedMsgsStreamName, Subjects = [$"{MqttProtocolConstants.RetainedMsgsStreamSubject}>"], MaxMsgsPer = 1, }); var retained1 = new MqttRetainedStore(backingStore); await retained1.SetRetainedAsync("sensors/temp", "72.5"u8.ToArray()); // Simulate restart — new store backed by same JetStream var retained2 = new MqttRetainedStore(backingStore); var msg = await retained2.GetRetainedAsync("sensors/temp"); msg.ShouldNotBeNull(); System.Text.Encoding.UTF8.GetString(msg).ShouldBe("72.5"); } [Fact] public async Task Retained_tombstone_removes_from_jetstream() { // Go reference: empty payload + retain = delete retained var backingStore = new MemStore(new StreamConfig { Name = MqttProtocolConstants.RetainedMsgsStreamName, Subjects = [$"{MqttProtocolConstants.RetainedMsgsStreamSubject}>"], MaxMsgsPer = 1, }); var retained = new MqttRetainedStore(backingStore); await retained.SetRetainedAsync("sensors/temp", "72.5"u8.ToArray()); await retained.SetRetainedAsync("sensors/temp", ReadOnlyMemory.Empty); // tombstone // Should be gone even from backing store var recovered = new MqttRetainedStore(backingStore); var msg = await recovered.GetRetainedAsync("sensors/temp"); msg.ShouldBeNull(); } // ----------------------------------------------------------------------- // Flow controller — JetStream integration // ----------------------------------------------------------------------- [Fact] public async Task FlowController_IsAtCapacity_when_max_reached() { // Go reference: server/mqtt.go mqttMaxAckPending flow control using var fc = new MqttFlowController(defaultMaxAckPending: 2); // Acquire 2 slots (await fc.TryAcquireAsync("sub1")).ShouldBeTrue(); (await fc.TryAcquireAsync("sub1")).ShouldBeTrue(); // Now at capacity fc.IsAtCapacity("sub1").ShouldBeTrue(); (await fc.TryAcquireAsync("sub1")).ShouldBeFalse(); // Release one fc.Release("sub1"); fc.IsAtCapacity("sub1").ShouldBeFalse(); (await fc.TryAcquireAsync("sub1")).ShouldBeTrue(); } // ----------------------------------------------------------------------- // Helpers // ----------------------------------------------------------------------- private static (StreamManager StreamMgr, ConsumerManager ConsumerMgr, MqttStreamInitializer Initializer) CreateJetStreamInfra() { var consumerMgr = new ConsumerManager(); var streamMgr = new StreamManager(consumerManager: consumerMgr); consumerMgr.StreamManager = streamMgr; var initializer = new MqttStreamInitializer(streamMgr); return (streamMgr, consumerMgr, initializer); } }