using NATS.Server.JetStream;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
using NATS.Server.Mqtt;
// Retained/Session store tests use MemStore + StreamConfig directly
namespace NATS.Server.Mqtt.Tests;
/// <summary>
/// Tests for MQTT JetStream persistence: stream initialization, consumer management,
/// QoS 1/2 flow with JetStream backing, session persistence, and retained message persistence.
/// Go reference: server/mqtt.go mqttCreateAccountSessionManager, mqttStoreSession,
/// mqttHandleRetainedMsg, trackPublish.
/// </summary>
public class MqttJetStreamPersistenceTests
{
    // -----------------------------------------------------------------------
    // MqttStreamInitializer
    // -----------------------------------------------------------------------

    [Fact]
    public void StreamInitializer_creates_all_five_streams()
    {
        // Go reference: server/mqtt.go mqttCreateAccountSessionManager creates 5 streams
        var (streamMgr, _, initializer) = CreateJetStreamInfra();
        initializer.IsInitialized.ShouldBeFalse();

        initializer.EnsureStreams();

        initializer.IsInitialized.ShouldBeTrue();
        streamMgr.Exists(MqttProtocolConstants.SessStreamName).ShouldBeTrue();
        streamMgr.Exists(MqttProtocolConstants.StreamName).ShouldBeTrue();
        streamMgr.Exists(MqttProtocolConstants.RetainedMsgsStreamName).ShouldBeTrue();
        streamMgr.Exists(MqttProtocolConstants.QoS2IncomingMsgsStreamName).ShouldBeTrue();
        streamMgr.Exists(MqttProtocolConstants.OutStreamName).ShouldBeTrue();
    }

    [Fact]
    public void StreamInitializer_is_idempotent()
    {
        var (streamMgr, _, initializer) = CreateJetStreamInfra();

        initializer.EnsureStreams();
        initializer.EnsureStreams(); // second call must neither throw nor create duplicates

        streamMgr.StreamNames.Count.ShouldBe(5);
    }

    // -----------------------------------------------------------------------
    // MqttConsumerManager — subscription consumers
    // -----------------------------------------------------------------------

    [Fact]
    public void ConsumerManager_creates_subscription_consumer()
    {
        // Go reference: server/mqtt.go mqttProcessSub — creates durable consumer per QoS>0 sub
        var (_, mqttConsumerMgr) = CreateInitializedConsumerManager();

        var binding = mqttConsumerMgr.CreateSubscriptionConsumer("client1", "sensor.temp", qos: 1, maxAckPending: 100);

        binding.ShouldNotBeNull();
        binding.Stream.ShouldBe(MqttProtocolConstants.StreamName);
        binding.FilterSubject.ShouldBe($"{MqttProtocolConstants.StreamSubjectPrefix}sensor.temp");
    }

    [Fact]
    public void ConsumerManager_removes_subscription_consumer()
    {
        var (_, mqttConsumerMgr) = CreateInitializedConsumerManager();
        mqttConsumerMgr.CreateSubscriptionConsumer("client1", "sensor.temp", qos: 1, maxAckPending: 100);
        mqttConsumerMgr.GetBinding("client1", "sensor.temp").ShouldNotBeNull();

        mqttConsumerMgr.RemoveSubscriptionConsumer("client1", "sensor.temp");

        mqttConsumerMgr.GetBinding("client1", "sensor.temp").ShouldBeNull();
    }

    [Fact]
    public void ConsumerManager_removes_all_consumers_for_client()
    {
        // Go reference: clean session disconnect removes all consumers
        var (_, mqttConsumerMgr) = CreateInitializedConsumerManager();
        mqttConsumerMgr.CreateSubscriptionConsumer("client1", "sensor.temp", qos: 1, maxAckPending: 100);
        mqttConsumerMgr.CreateSubscriptionConsumer("client1", "sensor.humidity", qos: 1, maxAckPending: 100);
        mqttConsumerMgr.GetClientBindings("client1").Count.ShouldBe(2);

        mqttConsumerMgr.RemoveAllConsumers("client1");

        mqttConsumerMgr.GetClientBindings("client1").Count.ShouldBe(0);
    }

    // -----------------------------------------------------------------------
    // QoS 1 with JetStream
    // -----------------------------------------------------------------------

    [Fact]
    public async Task QoS1_publish_stores_to_stream()
    {
        // Go reference: server/mqtt.go QoS 1 publish stores message in $MQTT_msgs
        var (streamMgr, mqttConsumerMgr) = CreateInitializedConsumerManager();

        var seq = mqttConsumerMgr.PublishToStream("sensor.temp", "72.5"u8.ToArray());

        seq.ShouldBeGreaterThan(0UL);

        // Verify message is in the stream
        streamMgr.TryGet(MqttProtocolConstants.StreamName, out var handle).ShouldBeTrue();
        var msg = await handle.Store.LoadAsync(seq, default);
        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg.Payload.Span).ShouldBe("72.5");
    }

    [Fact]
    public async Task QoS1_acknowledge_removes_from_stream()
    {
        // Go reference: PUBACK acks the JetStream message
        var (streamMgr, mqttConsumerMgr) = CreateInitializedConsumerManager();
        var seq = mqttConsumerMgr.PublishToStream("sensor.temp", "72.5"u8.ToArray());

        mqttConsumerMgr.AcknowledgeMessage(seq).ShouldBeTrue();

        // Message should be removed
        streamMgr.TryGet(MqttProtocolConstants.StreamName, out var handle).ShouldBeTrue();
        var msg = await handle.Store.LoadAsync(seq, default);
        msg.ShouldBeNull();
    }

    [Fact]
    public void QoS1_tracker_records_stream_sequence()
    {
        // Go reference: server/mqtt.go trackPublish — maps packet ID → stream sequence
        var tracker = new MqttQoS1Tracker();

        var packetId = tracker.Register("sensor/temp", "72.5"u8.ToArray(), streamSequence: 42);

        tracker.IsPending(packetId).ShouldBeTrue();
        var acked = tracker.Acknowledge(packetId);
        acked.ShouldNotBeNull();
        acked.StreamSequence.ShouldBe(42UL);
    }

    [Fact]
    public void QoS1_tracker_redelivery_preserves_stream_sequence()
    {
        var tracker = new MqttQoS1Tracker();
        tracker.Register("sensor/temp", "72.5"u8.ToArray(), streamSequence: 99);

        var pending = tracker.GetPendingForRedelivery();

        pending.Count.ShouldBe(1);
        pending[0].StreamSequence.ShouldBe(99UL);
        pending[0].DeliveryCount.ShouldBe(2); // incremented
    }

    // -----------------------------------------------------------------------
    // QoS 2 with JetStream
    // -----------------------------------------------------------------------

    [Fact]
    public async Task QoS2_incoming_stores_for_dedup()
    {
        // Go reference: server/mqtt.go QoS 2 incoming stored in $MQTT_qos2in for dedup
        var (_, mqttConsumerMgr) = CreateInitializedConsumerManager();

        var seq = mqttConsumerMgr.StoreQoS2Incoming("client1", 1, "payload"u8.ToArray());

        seq.ShouldBeGreaterThan(0UL);
        var msg = await mqttConsumerMgr.LoadQoS2IncomingAsync("client1", 1);
        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg.Payload.Span).ShouldBe("payload");
    }

    [Fact]
    public async Task QoS2_incoming_removed_after_pubcomp()
    {
        // Go reference: server/mqtt.go QoS 2 state removed on PUBCOMP
        var (_, mqttConsumerMgr) = CreateInitializedConsumerManager();
        mqttConsumerMgr.StoreQoS2Incoming("client1", 1, "payload"u8.ToArray());

        var removed = await mqttConsumerMgr.RemoveQoS2IncomingAsync("client1", 1);

        removed.ShouldBeTrue();
        var msg = await mqttConsumerMgr.LoadQoS2IncomingAsync("client1", 1);
        msg.ShouldBeNull();
    }

    // -----------------------------------------------------------------------
    // Session persistence with JetStream backing
    // -----------------------------------------------------------------------

    [Fact]
    public async Task Session_persists_and_recovers_from_jetstream()
    {
        // Go reference: server/mqtt.go mqttStoreSession + mqttLoadSession via JetStream
        var backingStore = CreateSessionBackingStore();
        var store1 = new MqttSessionStore(backingStore);
        await store1.ConnectAsync("client-js", cleanSession: false);
        store1.AddSubscription("client-js", "topic/a", 1);
        store1.AddSubscription("client-js", "topic/b", 0);
        await store1.SaveSessionAsync("client-js");

        // Simulate restart with same backing store
        var store2 = new MqttSessionStore(backingStore);
        await store2.ConnectAsync("client-js", cleanSession: false);

        var subs = store2.GetSubscriptions("client-js");
        subs.Count.ShouldBe(2);
        subs["topic/a"].ShouldBe(1);
        subs["topic/b"].ShouldBe(0);
    }

    [Fact]
    public async Task Clean_session_removes_from_jetstream()
    {
        var backingStore = CreateSessionBackingStore();
        var store = new MqttSessionStore(backingStore);
        await store.ConnectAsync("client-clean", cleanSession: false);
        store.AddSubscription("client-clean", "topic/x", 1);
        await store.SaveSessionAsync("client-clean");

        // Clean session connect
        await store.ConnectAsync("client-clean", cleanSession: true);

        // Simulate restart — should not find session
        var store2 = new MqttSessionStore(backingStore);
        await store2.ConnectAsync("client-clean", cleanSession: false);
        store2.GetSubscriptions("client-clean").ShouldBeEmpty();
    }

    // -----------------------------------------------------------------------
    // Retained messages with JetStream backing
    // -----------------------------------------------------------------------

    [Fact]
    public async Task Retained_persists_and_recovers_from_jetstream()
    {
        // Go reference: server/mqtt.go retained messages stored in $MQTT_rmsgs
        var backingStore = CreateRetainedBackingStore();
        var retained1 = new MqttRetainedStore(backingStore);
        await retained1.SetRetainedAsync("sensors/temp", "72.5"u8.ToArray());

        // Simulate restart — new store backed by same JetStream
        var retained2 = new MqttRetainedStore(backingStore);
        var msg = await retained2.GetRetainedAsync("sensors/temp");

        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg).ShouldBe("72.5");
    }

    [Fact]
    public async Task Retained_tombstone_removes_from_jetstream()
    {
        // Go reference: empty payload + retain = delete retained
        var backingStore = CreateRetainedBackingStore();
        var retained = new MqttRetainedStore(backingStore);
        await retained.SetRetainedAsync("sensors/temp", "72.5"u8.ToArray());

        // Tombstone: an empty payload. The generic argument is required here because
        // there is no non-generic ReadOnlyMemory type.
        await retained.SetRetainedAsync("sensors/temp", ReadOnlyMemory<byte>.Empty);

        // Should be gone even from backing store
        var recovered = new MqttRetainedStore(backingStore);
        var msg = await recovered.GetRetainedAsync("sensors/temp");
        msg.ShouldBeNull();
    }

    // -----------------------------------------------------------------------
    // Flow controller — JetStream integration
    // -----------------------------------------------------------------------

    [Fact]
    public async Task FlowController_IsAtCapacity_when_max_reached()
    {
        // Go reference: server/mqtt.go mqttMaxAckPending flow control
        using var fc = new MqttFlowController(defaultMaxAckPending: 2);

        // Acquire 2 slots
        (await fc.TryAcquireAsync("sub1")).ShouldBeTrue();
        (await fc.TryAcquireAsync("sub1")).ShouldBeTrue();

        // Now at capacity
        fc.IsAtCapacity("sub1").ShouldBeTrue();
        (await fc.TryAcquireAsync("sub1")).ShouldBeFalse();

        // Release one
        fc.Release("sub1");
        fc.IsAtCapacity("sub1").ShouldBeFalse();
        (await fc.TryAcquireAsync("sub1")).ShouldBeTrue();
    }

    // -----------------------------------------------------------------------
    // Helpers
    // -----------------------------------------------------------------------

    /// <summary>
    /// Creates a <see cref="ConsumerManager"/> and a <see cref="StreamManager"/> wired to each other,
    /// plus an <see cref="MqttStreamInitializer"/> over the stream manager. No streams are created here.
    /// </summary>
    private static (StreamManager StreamMgr, ConsumerManager ConsumerMgr, MqttStreamInitializer Initializer) CreateJetStreamInfra()
    {
        var consumerMgr = new ConsumerManager();
        var streamMgr = new StreamManager(consumerManager: consumerMgr);
        consumerMgr.StreamManager = streamMgr;
        var initializer = new MqttStreamInitializer(streamMgr);
        return (streamMgr, consumerMgr, initializer);
    }

    /// <summary>
    /// Builds the JetStream infrastructure, calls <c>EnsureStreams</c>, and returns the stream manager
    /// together with a new <see cref="MqttConsumerManager"/> over the same managers.
    /// </summary>
    private static (StreamManager StreamMgr, MqttConsumerManager MqttConsumerMgr) CreateInitializedConsumerManager()
    {
        var (streamMgr, consumerMgr, initializer) = CreateJetStreamInfra();
        initializer.EnsureStreams();
        return (streamMgr, new MqttConsumerManager(streamMgr, consumerMgr));
    }

    /// <summary>
    /// Creates a <see cref="MemStore"/> configured with the MQTT session stream name and subject
    /// wildcard, keeping one message per subject.
    /// </summary>
    private static MemStore CreateSessionBackingStore()
    {
        return new MemStore(new StreamConfig
        {
            Name = MqttProtocolConstants.SessStreamName,
            Subjects = [$"{MqttProtocolConstants.SessStreamSubjectPrefix}>"],
            MaxMsgsPer = 1,
        });
    }

    /// <summary>
    /// Creates a <see cref="MemStore"/> configured with the MQTT retained-messages stream name and
    /// subject wildcard, keeping one message per subject.
    /// </summary>
    private static MemStore CreateRetainedBackingStore()
    {
        return new MemStore(new StreamConfig
        {
            Name = MqttProtocolConstants.RetainedMsgsStreamName,
            Subjects = [$"{MqttProtocolConstants.RetainedMsgsStreamSubject}>"],
            MaxMsgsPer = 1,
        });
    }
}