feat: deliver retained messages on MQTT SUBSCRIBE (Gap 6.5)
Add DeliverRetainedOnSubscribe to MqttRetainedStore, which iterates GetMatchingRetained and fires a callback with (topic, payload, qos, retain=true) for each match. Add 11 unit tests covering exact-match, '+' single-level, '#' multi-level, no-match, cross-level rejection, callback arguments, return count, and empty-store behaviour.
This commit is contained in:
@@ -89,6 +89,20 @@ public sealed class MqttRetainedStore
|
||||
return results;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Delivers all retained messages matching the given topic filter to the provided callback.
|
||||
/// The callback receives (topic, payload, qos, retain=true) for each matching message.
|
||||
/// Returns the number of messages delivered.
|
||||
/// Go reference: server/mqtt.go mqttGetRetainedMessages / mqttHandleRetainedMsg ~line 1650.
|
||||
/// </summary>
|
||||
public int DeliverRetainedOnSubscribe(string topicFilter, Action<string, byte[], byte, bool> deliver)
|
||||
{
|
||||
var matches = GetMatchingRetained(topicFilter);
|
||||
foreach (var msg in matches)
|
||||
deliver(msg.Topic, msg.Payload.ToArray(), 0, true);
|
||||
return matches.Count;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sets (or clears) the retained message and persists to backing store.
|
||||
/// Go reference: server/mqtt.go mqttHandleRetainedMsg with JetStream.
|
||||
|
||||
163
tests/NATS.Server.Tests/Mqtt/MqttRetainedDeliveryTests.cs
Normal file
163
tests/NATS.Server.Tests/Mqtt/MqttRetainedDeliveryTests.cs
Normal file
@@ -0,0 +1,163 @@
|
||||
// Tests for retained message delivery on MQTT SUBSCRIBE.
|
||||
// Covers GetMatchingRetained and DeliverRetainedOnSubscribe with MQTT wildcard matching.
|
||||
// Go reference: server/mqtt.go mqttGetRetainedMessages ~line 1650.
|
||||
|
||||
using System.Text;
|
||||
using NATS.Server.Mqtt;
|
||||
using Shouldly;
|
||||
|
||||
namespace NATS.Server.Tests.Mqtt;
|
||||
|
||||
public class MqttRetainedDeliveryTests
|
||||
{
|
||||
// Go ref: server/mqtt.go mqttGetRetainedMessages — exact topic lookup
|
||||
[Fact]
|
||||
public void GetMatchingRetained_exact_topic_match()
|
||||
{
|
||||
var store = new MqttRetainedStore();
|
||||
store.SetRetained("a/b", Encoding.UTF8.GetBytes("hello"));
|
||||
|
||||
var results = store.GetMatchingRetained("a/b");
|
||||
|
||||
results.Count.ShouldBe(1);
|
||||
results[0].Topic.ShouldBe("a/b");
|
||||
Encoding.UTF8.GetString(results[0].Payload.Span).ShouldBe("hello");
|
||||
}
|
||||
|
||||
// Go ref: server/mqtt.go mqttGetRetainedMessages — '+' single-level wildcard
|
||||
[Fact]
|
||||
public void GetMatchingRetained_plus_wildcard()
|
||||
{
|
||||
var store = new MqttRetainedStore();
|
||||
store.SetRetained("a/b", Encoding.UTF8.GetBytes("payload-b"));
|
||||
store.SetRetained("a/c", Encoding.UTF8.GetBytes("payload-c"));
|
||||
|
||||
var results = store.GetMatchingRetained("a/+");
|
||||
|
||||
results.Count.ShouldBe(2);
|
||||
results.Select(r => r.Topic).ShouldContain("a/b");
|
||||
results.Select(r => r.Topic).ShouldContain("a/c");
|
||||
}
|
||||
|
||||
// Go ref: server/mqtt.go mqttGetRetainedMessages — '#' multi-level wildcard
|
||||
[Fact]
|
||||
public void GetMatchingRetained_hash_wildcard()
|
||||
{
|
||||
var store = new MqttRetainedStore();
|
||||
store.SetRetained("a/b/c", Encoding.UTF8.GetBytes("deep"));
|
||||
|
||||
var results = store.GetMatchingRetained("a/#");
|
||||
|
||||
results.Count.ShouldBe(1);
|
||||
results[0].Topic.ShouldBe("a/b/c");
|
||||
}
|
||||
|
||||
// Go ref: server/mqtt.go mqttGetRetainedMessages — '#' alone matches all topics
|
||||
[Fact]
|
||||
public void GetMatchingRetained_hash_matches_all()
|
||||
{
|
||||
var store = new MqttRetainedStore();
|
||||
store.SetRetained("x/y", Encoding.UTF8.GetBytes("v1"));
|
||||
store.SetRetained("a/b", Encoding.UTF8.GetBytes("v2"));
|
||||
|
||||
var results = store.GetMatchingRetained("#");
|
||||
|
||||
results.Count.ShouldBe(2);
|
||||
results.Select(r => r.Topic).ShouldContain("x/y");
|
||||
results.Select(r => r.Topic).ShouldContain("a/b");
|
||||
}
|
||||
|
||||
// Go ref: server/mqtt.go mqttGetRetainedMessages — no match returns empty list
|
||||
[Fact]
|
||||
public void GetMatchingRetained_no_match()
|
||||
{
|
||||
var store = new MqttRetainedStore();
|
||||
store.SetRetained("a/b", Encoding.UTF8.GetBytes("data"));
|
||||
|
||||
var results = store.GetMatchingRetained("c/d");
|
||||
|
||||
results.Count.ShouldBe(0);
|
||||
}
|
||||
|
||||
// Go ref: server/mqtt.go mqttGetRetainedMessages — callback invoked for each match
|
||||
[Fact]
|
||||
public void DeliverRetainedOnSubscribe_calls_deliver_for_each_match()
|
||||
{
|
||||
var store = new MqttRetainedStore();
|
||||
store.SetRetained("sensor/temp", Encoding.UTF8.GetBytes("25"));
|
||||
store.SetRetained("sensor/humidity", Encoding.UTF8.GetBytes("60"));
|
||||
|
||||
var deliveredTopics = new List<string>();
|
||||
store.DeliverRetainedOnSubscribe("sensor/+", (topic, _, _, _) => deliveredTopics.Add(topic));
|
||||
|
||||
deliveredTopics.Count.ShouldBe(2);
|
||||
deliveredTopics.ShouldContain("sensor/temp");
|
||||
deliveredTopics.ShouldContain("sensor/humidity");
|
||||
}
|
||||
|
||||
// Go ref: server/mqtt.go mqttGetRetainedMessages — retain flag is always true on delivery
|
||||
[Fact]
|
||||
public void DeliverRetainedOnSubscribe_passes_retain_flag_true()
|
||||
{
|
||||
var store = new MqttRetainedStore();
|
||||
store.SetRetained("home/light", Encoding.UTF8.GetBytes("on"));
|
||||
|
||||
bool? capturedRetain = null;
|
||||
store.DeliverRetainedOnSubscribe("home/+", (_, _, _, retain) => capturedRetain = retain);
|
||||
|
||||
capturedRetain.ShouldBe(true);
|
||||
}
|
||||
|
||||
// Go ref: server/mqtt.go mqttGetRetainedMessages — return value equals number of deliveries
|
||||
[Fact]
|
||||
public void DeliverRetainedOnSubscribe_returns_count()
|
||||
{
|
||||
var store = new MqttRetainedStore();
|
||||
store.SetRetained("dev/a", Encoding.UTF8.GetBytes("1"));
|
||||
store.SetRetained("dev/b", Encoding.UTF8.GetBytes("2"));
|
||||
store.SetRetained("dev/c", Encoding.UTF8.GetBytes("3"));
|
||||
|
||||
var count = store.DeliverRetainedOnSubscribe("dev/+", (_, _, _, _) => { });
|
||||
|
||||
count.ShouldBe(3);
|
||||
}
|
||||
|
||||
// Go ref: server/mqtt.go mqttGetRetainedMessages — '+' does NOT match multiple levels
|
||||
[Fact]
|
||||
public void GetMatchingRetained_plus_does_not_cross_levels()
|
||||
{
|
||||
var store = new MqttRetainedStore();
|
||||
store.SetRetained("a/b/c", Encoding.UTF8.GetBytes("deep"));
|
||||
|
||||
// "a/+" matches exactly two levels: "a/<one token>". "a/b/c" has three levels.
|
||||
var results = store.GetMatchingRetained("a/+");
|
||||
|
||||
results.Count.ShouldBe(0);
|
||||
}
|
||||
|
||||
// Go ref: server/mqtt.go mqttGetRetainedMessages — empty store delivers nothing
|
||||
[Fact]
|
||||
public void DeliverRetainedOnSubscribe_empty_store_returns_zero()
|
||||
{
|
||||
var store = new MqttRetainedStore();
|
||||
|
||||
var count = store.DeliverRetainedOnSubscribe("#", (_, _, _, _) => { });
|
||||
|
||||
count.ShouldBe(0);
|
||||
}
|
||||
|
||||
// Go ref: server/mqtt.go mqttGetRetainedMessages — payload bytes are passed correctly
|
||||
[Fact]
|
||||
public void DeliverRetainedOnSubscribe_passes_correct_payload()
|
||||
{
|
||||
var store = new MqttRetainedStore();
|
||||
var expected = Encoding.UTF8.GetBytes("temperature=42");
|
||||
store.SetRetained("env/temp", expected);
|
||||
|
||||
byte[]? capturedPayload = null;
|
||||
store.DeliverRetainedOnSubscribe("env/+", (_, payload, _, _) => capturedPayload = payload);
|
||||
|
||||
capturedPayload.ShouldNotBeNull();
|
||||
capturedPayload.ShouldBe(expected);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user