diff --git a/src/NATS.Server/Mqtt/MqttRetainedStore.cs b/src/NATS.Server/Mqtt/MqttRetainedStore.cs index e66821c..7ae70d4 100644 --- a/src/NATS.Server/Mqtt/MqttRetainedStore.cs +++ b/src/NATS.Server/Mqtt/MqttRetainedStore.cs @@ -89,6 +89,20 @@ public sealed class MqttRetainedStore return results; } + /// + /// Delivers all retained messages matching the given topic filter to the provided callback. + /// The callback receives (topic, payload, qos, retain=true) for each matching message. + /// Returns the number of messages delivered. + /// Go reference: server/mqtt.go mqttGetRetainedMessages / mqttHandleRetainedMsg ~line 1650. + /// + public int DeliverRetainedOnSubscribe(string topicFilter, Action deliver) + { + var matches = GetMatchingRetained(topicFilter); + foreach (var msg in matches) + deliver(msg.Topic, msg.Payload.ToArray(), 0, true); + return matches.Count; + } + /// /// Sets (or clears) the retained message and persists to backing store. /// Go reference: server/mqtt.go mqttHandleRetainedMsg with JetStream. diff --git a/tests/NATS.Server.Tests/Mqtt/MqttRetainedDeliveryTests.cs b/tests/NATS.Server.Tests/Mqtt/MqttRetainedDeliveryTests.cs new file mode 100644 index 0000000..c5df2a4 --- /dev/null +++ b/tests/NATS.Server.Tests/Mqtt/MqttRetainedDeliveryTests.cs @@ -0,0 +1,163 @@ +// Tests for retained message delivery on MQTT SUBSCRIBE. +// Covers GetMatchingRetained and DeliverRetainedOnSubscribe with MQTT wildcard matching. +// Go reference: server/mqtt.go mqttGetRetainedMessages ~line 1650. + +using System.Text; +using NATS.Server.Mqtt; +using Shouldly; + +namespace NATS.Server.Tests.Mqtt; + +public class MqttRetainedDeliveryTests +{ + // Go ref: server/mqtt.go mqttGetRetainedMessages — exact topic lookup + [Fact] + public void GetMatchingRetained_exact_topic_match() + { + var store = new MqttRetainedStore(); + store.SetRetained("a/b", Encoding.UTF8.GetBytes("hello")); + + var results = store.GetMatchingRetained("a/b"); + + results.Count.ShouldBe(1); + results[0].Topic.ShouldBe("a/b"); + Encoding.UTF8.GetString(results[0].Payload.Span).ShouldBe("hello"); + } + + // Go ref: server/mqtt.go mqttGetRetainedMessages — '+' single-level wildcard + [Fact] + public void GetMatchingRetained_plus_wildcard() + { + var store = new MqttRetainedStore(); + store.SetRetained("a/b", Encoding.UTF8.GetBytes("payload-b")); + store.SetRetained("a/c", Encoding.UTF8.GetBytes("payload-c")); + + var results = store.GetMatchingRetained("a/+"); + + results.Count.ShouldBe(2); + results.Select(r => r.Topic).ShouldContain("a/b"); + results.Select(r => r.Topic).ShouldContain("a/c"); + } + + // Go ref: server/mqtt.go mqttGetRetainedMessages — '#' multi-level wildcard + [Fact] + public void GetMatchingRetained_hash_wildcard() + { + var store = new MqttRetainedStore(); + store.SetRetained("a/b/c", Encoding.UTF8.GetBytes("deep")); + + var results = store.GetMatchingRetained("a/#"); + + results.Count.ShouldBe(1); + results[0].Topic.ShouldBe("a/b/c"); + } + + // Go ref: server/mqtt.go mqttGetRetainedMessages — '#' alone matches all topics + [Fact] + public void GetMatchingRetained_hash_matches_all() + { + var store = new MqttRetainedStore(); + store.SetRetained("x/y", Encoding.UTF8.GetBytes("v1")); + store.SetRetained("a/b", Encoding.UTF8.GetBytes("v2")); + + var results = store.GetMatchingRetained("#"); + + results.Count.ShouldBe(2); + results.Select(r => r.Topic).ShouldContain("x/y"); + results.Select(r => r.Topic).ShouldContain("a/b"); + } + + // Go ref: server/mqtt.go mqttGetRetainedMessages — no match returns empty list + [Fact] + public void GetMatchingRetained_no_match() + { + var store = new MqttRetainedStore(); + store.SetRetained("a/b", Encoding.UTF8.GetBytes("data")); + + var results = store.GetMatchingRetained("c/d"); + + results.Count.ShouldBe(0); + } + + // Go ref: server/mqtt.go mqttGetRetainedMessages — callback invoked for each match + [Fact] + public void DeliverRetainedOnSubscribe_calls_deliver_for_each_match() + { + var store = new MqttRetainedStore(); + store.SetRetained("sensor/temp", Encoding.UTF8.GetBytes("25")); + store.SetRetained("sensor/humidity", Encoding.UTF8.GetBytes("60")); + + var deliveredTopics = new List(); + store.DeliverRetainedOnSubscribe("sensor/+", (topic, _, _, _) => deliveredTopics.Add(topic)); + + deliveredTopics.Count.ShouldBe(2); + deliveredTopics.ShouldContain("sensor/temp"); + deliveredTopics.ShouldContain("sensor/humidity"); + } + + // Go ref: server/mqtt.go mqttGetRetainedMessages — retain flag is always true on delivery + [Fact] + public void DeliverRetainedOnSubscribe_passes_retain_flag_true() + { + var store = new MqttRetainedStore(); + store.SetRetained("home/light", Encoding.UTF8.GetBytes("on")); + + bool? capturedRetain = null; + store.DeliverRetainedOnSubscribe("home/+", (_, _, _, retain) => capturedRetain = retain); + + capturedRetain.ShouldBe(true); + } + + // Go ref: server/mqtt.go mqttGetRetainedMessages — return value equals number of deliveries + [Fact] + public void DeliverRetainedOnSubscribe_returns_count() + { + var store = new MqttRetainedStore(); + store.SetRetained("dev/a", Encoding.UTF8.GetBytes("1")); + store.SetRetained("dev/b", Encoding.UTF8.GetBytes("2")); + store.SetRetained("dev/c", Encoding.UTF8.GetBytes("3")); + + var count = store.DeliverRetainedOnSubscribe("dev/+", (_, _, _, _) => { }); + + count.ShouldBe(3); + } + + // Go ref: server/mqtt.go mqttGetRetainedMessages — '+' does NOT match multiple levels + [Fact] + public void GetMatchingRetained_plus_does_not_cross_levels() + { + var store = new MqttRetainedStore(); + store.SetRetained("a/b/c", Encoding.UTF8.GetBytes("deep")); + + // "a/+" matches exactly two levels: "a/". "a/b/c" has three levels. + var results = store.GetMatchingRetained("a/+"); + + results.Count.ShouldBe(0); + } + + // Go ref: server/mqtt.go mqttGetRetainedMessages — empty store delivers nothing + [Fact] + public void DeliverRetainedOnSubscribe_empty_store_returns_zero() + { + var store = new MqttRetainedStore(); + + var count = store.DeliverRetainedOnSubscribe("#", (_, _, _, _) => { }); + + count.ShouldBe(0); + } + + // Go ref: server/mqtt.go mqttGetRetainedMessages — payload bytes are passed correctly + [Fact] + public void DeliverRetainedOnSubscribe_passes_correct_payload() + { + var store = new MqttRetainedStore(); + var expected = Encoding.UTF8.GetBytes("temperature=42"); + store.SetRetained("env/temp", expected); + + byte[]? capturedPayload = null; + store.DeliverRetainedOnSubscribe("env/+", (_, payload, _, _) => capturedPayload = payload); + + capturedPayload.ShouldNotBeNull(); + capturedPayload.ShouldBe(expected); + } +}