diff --git a/src/NATS.Server/Mqtt/MqttQoS1Tracker.cs b/src/NATS.Server/Mqtt/MqttQoS1Tracker.cs new file mode 100644 index 0000000..c517cb1 --- /dev/null +++ b/src/NATS.Server/Mqtt/MqttQoS1Tracker.cs @@ -0,0 +1,96 @@ +// QoS 1 outgoing message tracker for MQTT. +// Go reference: golang/nats-server/server/mqtt.go +// QoS 1 outbound tracking — mqttProcessPub (~line 1200) + +using System.Collections.Concurrent; + +namespace NATS.Server.Mqtt; + +/// +/// Tracks outgoing QoS 1 messages pending PUBACK from the client. +/// Messages are stored with their packet ID and can be redelivered on reconnect. +/// Go reference: server/mqtt.go — mqttProcessPub (QoS 1 outbound tracking). +/// +public sealed class MqttQoS1Tracker +{ + private readonly ConcurrentDictionary _pending = new(); + private ushort _nextPacketId; + private readonly Lock _lock = new(); + + /// Number of messages pending PUBACK. + public int PendingCount => _pending.Count; + + /// + /// Registers an outgoing QoS 1 message and assigns a packet ID. + /// Returns the assigned packet ID. + /// + public ushort Register(string topic, byte[] payload) + { + var id = GetNextPacketId(); + _pending[id] = new QoS1PendingMessage + { + PacketId = id, + Topic = topic, + Payload = payload, + SentAtUtc = DateTime.UtcNow, + DeliveryCount = 1, + }; + return id; + } + + /// + /// Acknowledges receipt of a PUBACK for the given packet ID. + /// Returns true if the message was found and removed. + /// + public bool Acknowledge(ushort packetId) + { + return _pending.TryRemove(packetId, out _); + } + + /// + /// Returns all pending messages for redelivery (e.g., on reconnect). + /// Increments their delivery count. + /// + public IReadOnlyList GetPendingForRedelivery() + { + var result = new List(); + foreach (var kvp in _pending) + { + var msg = kvp.Value; + msg.DeliveryCount++; + msg.SentAtUtc = DateTime.UtcNow; + result.Add(msg); + } + return result; + } + + /// + /// Checks if a packet ID is pending acknowledgment. + /// + public bool IsPending(ushort packetId) => _pending.ContainsKey(packetId); + + /// Clears all pending messages. + public void Clear() => _pending.Clear(); + + private ushort GetNextPacketId() + { + lock (_lock) + { + _nextPacketId++; + if (_nextPacketId == 0) _nextPacketId = 1; // MQTT spec: packet ID must be non-zero + return _nextPacketId; + } + } +} + +/// +/// A QoS 1 message pending PUBACK from the client. +/// +public sealed class QoS1PendingMessage +{ + public ushort PacketId { get; init; } + public string Topic { get; init; } = string.Empty; + public byte[] Payload { get; init; } = []; + public DateTime SentAtUtc { get; set; } + public int DeliveryCount { get; set; } = 1; +} diff --git a/src/NATS.Server/Mqtt/MqttRetainedStore.cs b/src/NATS.Server/Mqtt/MqttRetainedStore.cs index 6c7b028..e66821c 100644 --- a/src/NATS.Server/Mqtt/MqttRetainedStore.cs +++ b/src/NATS.Server/Mqtt/MqttRetainedStore.cs @@ -310,4 +310,25 @@ public sealed class MqttQos2StateMachine /// public void RemoveFlow(ushort packetId) => _flows.TryRemove(packetId, out _); + + /// + /// Records that a PUBREC was received for the given packet ID. + /// Alias for — transitions AwaitingPubRec → AwaitingPubRel. + /// Returns false if the flow is not in the expected state. + /// + public bool RegisterPubRec(ushort packetId) => ProcessPubRec(packetId); + + /// + /// Records that a PUBREL was sent for the given packet ID. + /// Alias for — transitions AwaitingPubRel → AwaitingPubComp. + /// Returns false if the flow is not in the expected state. + /// + public bool RegisterPubRel(ushort packetId) => ProcessPubRel(packetId); + + /// + /// Completes the QoS 2 flow on receipt of PUBCOMP for the given packet ID. + /// Alias for — transitions AwaitingPubComp → Complete and removes the flow. + /// Returns false if the flow is not in the expected state. + /// + public bool CompletePubComp(ushort packetId) => ProcessPubComp(packetId); } diff --git a/tests/NATS.Server.Tests/Mqtt/MqttQoSTrackingTests.cs b/tests/NATS.Server.Tests/Mqtt/MqttQoSTrackingTests.cs new file mode 100644 index 0000000..0ca8676 --- /dev/null +++ b/tests/NATS.Server.Tests/Mqtt/MqttQoSTrackingTests.cs @@ -0,0 +1,135 @@ +// Tests for MqttQoS1Tracker (Gap 6.3 — JetStream-backed QoS 1/2 tracking). +// Go reference: golang/nats-server/server/mqtt.go mqttProcessPub (~line 1200). + +using NATS.Server.Mqtt; +using Shouldly; + +namespace NATS.Server.Tests.Mqtt; + +public sealed class MqttQoSTrackingTests +{ + // ── QoS 1 Tracker ──────────────────────────────────────────────────────── + + [Fact] + public void Register_assigns_packet_id() + { + // Go reference: server/mqtt.go mqttProcessPub — assigns non-zero packet ID for QoS 1 + var tracker = new MqttQoS1Tracker(); + + var id = tracker.Register("sensors/temp", [0x01, 0x02]); + + id.ShouldNotBe((ushort)0); + } + + [Fact] + public void Register_increments_packet_id() + { + // Go reference: server/mqtt.go — each outgoing QoS 1 message gets a unique packet ID + var tracker = new MqttQoS1Tracker(); + + var id1 = tracker.Register("sensors/temp", [0x01]); + var id2 = tracker.Register("sensors/humidity", [0x02]); + + id1.ShouldNotBe(id2); + } + + [Fact] + public void Acknowledge_removes_pending() + { + // Go reference: server/mqtt.go mqttProcessPubAck — removes message from pending set + var tracker = new MqttQoS1Tracker(); + var id = tracker.Register("sensors/temp", [0xAB]); + + tracker.PendingCount.ShouldBe(1); + var removed = tracker.Acknowledge(id); + + removed.ShouldBeTrue(); + tracker.PendingCount.ShouldBe(0); + } + + [Fact] + public void Acknowledge_returns_false_for_unknown() + { + // Go reference: server/mqtt.go — PUBACK for unknown packet ID is silently ignored + var tracker = new MqttQoS1Tracker(); + + var result = tracker.Acknowledge(9999); + + result.ShouldBeFalse(); + } + + [Fact] + public void PendingCount_reflects_current_state() + { + // Register 3 messages, acknowledge 1, expect count of 2 + var tracker = new MqttQoS1Tracker(); + var id1 = tracker.Register("a/b", [1]); + tracker.Register("c/d", [2]); + tracker.Register("e/f", [3]); + + tracker.Acknowledge(id1); + + tracker.PendingCount.ShouldBe(2); + } + + [Fact] + public void IsPending_true_for_registered() + { + // Go reference: server/mqtt.go — registered QoS 1 message is in the pending set + var tracker = new MqttQoS1Tracker(); + var id = tracker.Register("topic/x", [0xFF]); + + tracker.IsPending(id).ShouldBeTrue(); + } + + [Fact] + public void IsPending_false_after_acknowledge() + { + // Go reference: server/mqtt.go — message is removed from pending after PUBACK + var tracker = new MqttQoS1Tracker(); + var id = tracker.Register("topic/x", [0xFF]); + tracker.Acknowledge(id); + + tracker.IsPending(id).ShouldBeFalse(); + } + + [Fact] + public void GetPendingForRedelivery_returns_all_pending() + { + // Go reference: server/mqtt.go reconnect path — all unacked messages are redelivered + var tracker = new MqttQoS1Tracker(); + tracker.Register("a", [1]); + tracker.Register("b", [2]); + tracker.Register("c", [3]); + + var pending = tracker.GetPendingForRedelivery(); + + pending.Count.ShouldBe(3); + } + + [Fact] + public void GetPendingForRedelivery_increments_delivery_count() + { + // Go reference: server/mqtt.go reconnect redelivery — DUP flag set, delivery count increments + var tracker = new MqttQoS1Tracker(); + tracker.Register("test/topic", [0xDE, 0xAD]); + + var pending = tracker.GetPendingForRedelivery(); + + pending[0].DeliveryCount.ShouldBe(2); + } + + [Fact] + public void Clear_removes_all_pending() + { + // Go reference: server/mqtt.go session cleanup — all pending messages discarded on clean session + var tracker = new MqttQoS1Tracker(); + tracker.Register("x", [1]); + tracker.Register("y", [2]); + tracker.Register("z", [3]); + + tracker.Clear(); + + tracker.PendingCount.ShouldBe(0); + } +}