Files
natsdotnet/tests/NATS.Server.Tests/Mqtt/MqttQosTests.cs
Joseph Doherty e49e5895c1 feat(mqtt): add session persistence, QoS 2 state machine, and retained store (E2+E3)
Add MqttSessionStore with save/load/delete/list operations, flapper
detection (backoff on rapid reconnects), and TimeProvider-based testing.
Add MqttRetainedStore for per-topic retained messages with MQTT wildcard
matching (+/# filters). Add MqttQos2StateMachine tracking the full
PUBREC/PUBREL/PUBCOMP flow with duplicate rejection and timeout detection.

19 new tests: 9 session persistence, 10 QoS/retained message tests.
2026-02-24 15:13:34 -05:00

191 lines
6.7 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// MQTT QoS and retained message tests.
// Go reference: golang/nats-server/server/mqtt.go
// Retained messages — mqttHandleRetainedMsg / mqttGetRetainedMessages (~lines 1600–1700)
// QoS 2 flow — mqttProcessPubRec / mqttProcessPubRel / mqttProcessPubComp (~lines 1300–1400)
using System.Text;
using NATS.Server.Mqtt;
namespace NATS.Server.Tests.Mqtt;
/// <summary>
/// Exercises <see cref="MqttRetainedStore"/> (set / clear / overwrite / wildcard lookup),
/// the <see cref="MqttQos2StateMachine"/> packet flow, and pending-publish bookkeeping
/// persisted through <see cref="MqttSessionStore"/>.
/// </summary>
public class MqttQosTests
{
    // Every payload in these tests is a UTF-8 encoded string; keep the encoding in one place.
    private static byte[] Utf8(string text) => Encoding.UTF8.GetBytes(text);

    [Fact]
    public void RetainedStore_SetAndGet_RoundTrips()
    {
        // Go reference: server/mqtt.go mqttHandleRetainedMsg — a stored message can be read back
        const string topic = "sensors/temp";
        const string body = "temperature=72.5";
        var retained = new MqttRetainedStore();

        retained.SetRetained(topic, Utf8(body));

        var fetched = retained.GetRetained(topic);
        fetched.ShouldNotBeNull();
        Encoding.UTF8.GetString(fetched.Value.Span).ShouldBe(body);
    }

    [Fact]
    public void RetainedStore_EmptyPayload_ClearsRetained()
    {
        // Go reference: server/mqtt.go mqttHandleRetainedMsg — publishing an empty payload clears the topic
        const string topic = "sensors/temp";
        var retained = new MqttRetainedStore();
        retained.SetRetained(topic, Utf8("old-value"));

        retained.SetRetained(topic, ReadOnlyMemory<byte>.Empty);

        var afterClear = retained.GetRetained(topic);
        afterClear.ShouldBeNull();
    }

    [Fact]
    public void RetainedStore_Overwrite_ReplacesOld()
    {
        // Go reference: server/mqtt.go mqttHandleRetainedMsg — a second set on the same topic wins
        const string topic = "sensors/temp";
        var retained = new MqttRetainedStore();

        retained.SetRetained(topic, Utf8("first"));
        retained.SetRetained(topic, Utf8("second"));

        var fetched = retained.GetRetained(topic);
        fetched.ShouldNotBeNull();
        Encoding.UTF8.GetString(fetched.Value.Span).ShouldBe("second");
    }

    [Fact]
    public void RetainedStore_GetMatching_WildcardPlus()
    {
        // Go reference: server/mqtt.go mqttGetRetainedMessages — '+' single-level wildcard
        var retained = new MqttRetainedStore();
        var seed = new[]
        {
            ("sensors/temp", "72.5"),
            ("sensors/humidity", "45%"),
            ("alerts/fire", "!"),
        };
        foreach (var (topic, value) in seed)
        {
            retained.SetRetained(topic, Utf8(value));
        }

        var hits = retained.GetMatchingRetained("sensors/+");

        // Only the two topics under "sensors/" are expected; "alerts/fire" must be excluded.
        hits.Count.ShouldBe(2);
        var hitTopics = hits.Select(m => m.Topic);
        hitTopics.ShouldBe(
            new[] { "sensors/temp", "sensors/humidity" },
            ignoreOrder: true);
    }

    [Fact]
    public void RetainedStore_GetMatching_WildcardHash()
    {
        // Go reference: server/mqtt.go mqttGetRetainedMessages — '#' multi-level wildcard
        var retained = new MqttRetainedStore();
        var seed = new[]
        {
            ("home/living/temp", "22"),
            ("home/living/light", "on"),
            ("home/kitchen/temp", "24"),
            ("office/desk/light", "off"),
        };
        foreach (var (topic, value) in seed)
        {
            retained.SetRetained(topic, Utf8(value));
        }

        var hits = retained.GetMatchingRetained("home/#");

        // The three "home/..." topics are expected; the "office/..." topic must be excluded.
        hits.Count.ShouldBe(3);
        var hitTopics = hits.Select(m => m.Topic);
        hitTopics.ShouldBe(
            new[] { "home/living/temp", "home/living/light", "home/kitchen/temp" },
            ignoreOrder: true);
    }

    [Fact]
    public void Qos2_FullFlow_PubRecPubRelPubComp()
    {
        // Go reference: server/mqtt.go mqttProcessPubRec / mqttProcessPubRel / mqttProcessPubComp
        const int packetId = 100;
        var flow = new MqttQos2StateMachine();

        // Step 1: starting the publish leaves the flow waiting for PUBREC.
        flow.BeginPublish(packetId).ShouldBeTrue();
        flow.GetState(packetId).ShouldBe(MqttQos2State.AwaitingPubRec);

        // Step 2: PUBREC moves the flow on to waiting for PUBREL.
        flow.ProcessPubRec(packetId).ShouldBeTrue();
        flow.GetState(packetId).ShouldBe(MqttQos2State.AwaitingPubRel);

        // Step 3: PUBREL moves the flow on to waiting for PUBCOMP.
        flow.ProcessPubRel(packetId).ShouldBeTrue();
        flow.GetState(packetId).ShouldBe(MqttQos2State.AwaitingPubComp);

        // Step 4: PUBCOMP finishes the exchange, after which no state is reported for the id.
        flow.ProcessPubComp(packetId).ShouldBeTrue();
        flow.GetState(packetId).ShouldBeNull();
    }

    [Fact]
    public void Qos2_DuplicatePublish_Rejected()
    {
        // Go reference: server/mqtt.go — duplicate packet ID rejected during active flow
        const int packetId = 200;
        var flow = new MqttQos2StateMachine();

        flow.BeginPublish(packetId).ShouldBeTrue();

        // The first flow is still open, so reusing its packet id must be refused.
        flow.BeginPublish(packetId).ShouldBeFalse();
    }

    [Fact]
    public void Qos2_IncompleteFlow_TimesOut()
    {
        // Go reference: server/mqtt.go — incomplete QoS 2 flows time out
        const int packetId = 300;
        var step = TimeSpan.FromSeconds(3);
        var clock = new FakeTimeProvider(new DateTimeOffset(2026, 1, 15, 12, 0, 0, TimeSpan.Zero));
        var flow = new MqttQos2StateMachine(timeout: TimeSpan.FromSeconds(5), timeProvider: clock);
        flow.BeginPublish(packetId).ShouldBeTrue();

        // 3s elapsed: still inside the 5s window, nothing should be reported.
        clock.Advance(step);
        flow.GetTimedOutFlows().ShouldBeEmpty();

        // 6s elapsed: past the 5s window, the open flow should now be reported.
        clock.Advance(step);
        var expired = flow.GetTimedOutFlows();
        expired.Count.ShouldBe(1);
        expired[0].ShouldBe((ushort)packetId);

        // Dropping the flow leaves no state behind for the id.
        flow.RemoveFlow(packetId);
        flow.GetState(packetId).ShouldBeNull();
    }

    [Fact]
    public void Qos1_Puback_RemovesPending()
    {
        // Go reference: server/mqtt.go — QoS 1 PUBACK removes from pending.
        // Covers the existing MqttListener pending publish / ack mechanism
        // as seen through the session store.
        const string clientId = "qos1-client";
        var sessions = new MqttSessionStore();
        var initial = new MqttSessionData
        {
            ClientId = clientId,
            PendingPublishes =
            [
                new MqttPendingPublish(1, "topic/a", "payload-a"),
                new MqttPendingPublish(2, "topic/b", "payload-b"),
            ],
        };
        sessions.SaveSession(initial);

        // Emulate a PUBACK for packet 1: load, drop the acked entry, save back.
        var acked = sessions.LoadSession(clientId);
        acked.ShouldNotBeNull();
        acked.PendingPublishes.RemoveAll(p => p.PacketId == 1);
        sessions.SaveSession(acked);

        // Packet 2 must be the sole remaining pending publish.
        var reloaded = sessions.LoadSession(clientId);
        reloaded.ShouldNotBeNull();
        reloaded.PendingPublishes.Count.ShouldBe(1);
        reloaded.PendingPublishes[0].PacketId.ShouldBe(2);
    }

    [Fact]
    public void RetainedStore_GetMatching_NoMatch_ReturnsEmpty()
    {
        // Go reference: server/mqtt.go mqttGetRetainedMessages — no match returns empty
        var retained = new MqttRetainedStore();
        retained.SetRetained("sensors/temp", Utf8("72"));

        var hits = retained.GetMatchingRetained("alerts/+");

        hits.ShouldBeEmpty();
    }
}