feat(mqtt): add session persistence, QoS 2 state machine, and retained store (E2+E3)

Add MqttSessionStore with save/load/delete/list operations, flapper
detection (backoff on rapid reconnects), and TimeProvider-based testing.
Add MqttRetainedStore for per-topic retained messages with MQTT wildcard
matching (+/# filters). Add MqttQos2StateMachine tracking the full
PUBREC/PUBREL/PUBCOMP flow with duplicate rejection and timeout detection.

19 new tests: 9 session persistence, 10 QoS/retained message tests.
This commit is contained in:
Joseph Doherty
2026-02-24 15:13:34 -05:00
parent 662b2e0d87
commit e49e5895c1
5 changed files with 774 additions and 1 deletions

View File

@@ -163,4 +163,4 @@ public sealed class MqttListener(
}
}
internal sealed record MqttPendingPublish(int PacketId, string Topic, string Payload);
/// <summary>
/// A publish that is still pending, identified by its packet id, with the topic and payload text.
/// Declared public because the public <c>MqttSessionData.PendingPublishes</c> property exposes it.
/// </summary>
public sealed record MqttPendingPublish(int PacketId, string Topic, string Payload);

View File

@@ -0,0 +1,241 @@
// MQTT retained message store and QoS 2 state machine.
// Go reference: golang/nats-server/server/mqtt.go
// Retained messages — mqttHandleRetainedMsg / mqttGetRetainedMessages (~lines 1600-1700)
// QoS 2 flow — mqttProcessPubRec / mqttProcessPubRel / mqttProcessPubComp (~lines 1300-1400)
using System.Collections.Concurrent;
namespace NATS.Server.Mqtt;
/// <summary>
/// A retained message: the topic it is stored under together with its payload bytes.
/// Instances are created by <see cref="MqttRetainedStore.GetMatchingRetained"/> for each matching topic.
/// </summary>
/// <remarks>
/// The record's generated equality compares <paramref name="Payload"/> with
/// <see cref="ReadOnlyMemory{T}"/>'s own equality (same underlying memory region), not byte-by-byte content.
/// </remarks>
/// <param name="Topic">Topic name the payload is retained for.</param>
/// <param name="Payload">The retained payload bytes.</param>
public sealed record MqttRetainedMessage(string Topic, ReadOnlyMemory<byte> Payload);
/// <summary>
/// In-memory store for MQTT retained messages, keyed by exact topic name (ordinal comparison).
/// Go reference: server/mqtt.go mqttHandleRetainedMsg ~line 1600.
/// </summary>
public sealed class MqttRetainedStore
{
    private readonly ConcurrentDictionary<string, ReadOnlyMemory<byte>> _retained = new(StringComparer.Ordinal);

    /// <summary>
    /// Sets (or clears) the retained message for a topic.
    /// An empty payload clears the retained message.
    /// Go reference: server/mqtt.go mqttHandleRetainedMsg.
    /// </summary>
    /// <param name="topic">Exact topic name; wildcards are not interpreted here.</param>
    /// <param name="payload">Payload to retain. An empty payload removes any existing entry.</param>
    public void SetRetained(string topic, ReadOnlyMemory<byte> payload)
    {
        if (payload.IsEmpty)
        {
            _retained.TryRemove(topic, out _);
            return;
        }

        // NOTE(review): the memory is stored as-is (no copy). Presumably callers must not reuse
        // the underlying buffer after this call — confirm against the publish path.
        _retained[topic] = payload;
    }

    /// <summary>
    /// Gets the retained message payload for a topic, or null if none.
    /// </summary>
    /// <param name="topic">Exact topic name to look up.</param>
    /// <returns>The stored payload, or null when nothing is retained for <paramref name="topic"/>.</returns>
    public ReadOnlyMemory<byte>? GetRetained(string topic)
    {
        if (_retained.TryGetValue(topic, out var payload))
            return payload;

        return null;
    }

    /// <summary>
    /// Returns all retained messages matching an MQTT topic filter pattern.
    /// Supports '+' (single-level) and '#' (multi-level, final level only) wildcards.
    /// Go reference: server/mqtt.go mqttGetRetainedMessages ~line 1650.
    /// </summary>
    /// <param name="filter">MQTT topic filter.</param>
    /// <returns>A new list with one entry per matching topic; empty when nothing matches.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="filter"/> is null.</exception>
    public IReadOnlyList<MqttRetainedMessage> GetMatchingRetained(string filter)
    {
        ArgumentNullException.ThrowIfNull(filter);

        // Split the filter once instead of once per stored topic.
        var filterLevels = filter.Split('/');
        var results = new List<MqttRetainedMessage>();
        foreach (var (topic, payload) in _retained)
        {
            if (MatchLevels(topic, filterLevels))
                results.Add(new MqttRetainedMessage(topic, payload));
        }

        return results;
    }

    /// <summary>
    /// Matches an MQTT topic against a filter pattern.
    /// '+' matches exactly one level; '#' matches zero or more levels and is only honoured as the
    /// final filter level. Topics beginning with '$' are never matched by a filter whose first
    /// level is a wildcard.
    /// </summary>
    /// <param name="topic">Concrete topic name.</param>
    /// <param name="filter">Topic filter, possibly containing wildcards.</param>
    /// <returns>True when <paramref name="topic"/> is selected by <paramref name="filter"/>.</returns>
    internal static bool MqttTopicMatch(string topic, string filter) =>
        MatchLevels(topic, filter.Split('/'));

    /// <summary>
    /// Level-by-level comparison of a topic against an already-split filter.
    /// </summary>
    private static bool MatchLevels(string topic, string[] filterLevels)
    {
        // MQTT 3.1.1 [MQTT-4.7.2-1]: filters starting with a wildcard must not match '$' topics
        // (e.g. "#" must not deliver "$SYS/..."). Split always yields at least one element.
        if (topic.StartsWith('$') && (filterLevels[0] is "#" or "+"))
            return false;

        var topicLevels = topic.Split('/');
        var lastFilterIndex = filterLevels.Length - 1;
        for (var i = 0; i < filterLevels.Length; i++)
        {
            var level = filterLevels[i];

            // '#' swallows the remainder (including the parent level itself), but it is only a
            // wildcard when it is the last level; "a/#/b" is malformed and matches nothing.
            if (level == "#")
                return i == lastFilterIndex;

            if (i >= topicLevels.Length)
                return false; // filter has more levels than topic

            if (level != "+" && level != topicLevels[i])
                return false;
        }

        // No trailing '#': the topic must not have more levels than the filter.
        return topicLevels.Length == filterLevels.Length;
    }
}
/// <summary>
/// Stages of a QoS 2 packet exchange, listed in the order a flow moves through them.
/// Go reference: server/mqtt.go ~line 1300.
/// </summary>
public enum MqttQos2State
{
    /// <summary>The PUBLISH has been recorded; the next expected packet is PUBREC.</summary>
    AwaitingPubRec = 0,

    /// <summary>PUBREC has been seen; the next expected packet is PUBREL.</summary>
    AwaitingPubRel = 1,

    /// <summary>PUBREL has been seen; the next expected packet is PUBCOMP.</summary>
    AwaitingPubComp = 2,

    /// <summary>PUBCOMP has been seen; the exchange is finished.</summary>
    Complete = 3,
}
/// <summary>
/// Tracks QoS 2 flow state for a single packet ID.
/// One instance is stored per packet id by <see cref="MqttQos2StateMachine"/>.
/// </summary>
internal sealed class MqttQos2Flow
{
    /// <summary>Current stage of the flow; mutated in place as acknowledgements are processed.</summary>
    public MqttQos2State State { get; set; }
    /// <summary>UTC time at which the flow was started; set once at creation and used for timeout checks.</summary>
    public DateTime StartedAtUtc { get; init; }
}
/// <summary>
/// Per-connection bookkeeping for QoS 2 (exactly-once) delivery.
/// Each packet id moves through PUBLISH -> PUBREC -> PUBREL -> PUBCOMP, one step per call.
/// Go reference: server/mqtt.go mqttProcessPubRec / mqttProcessPubRel / mqttProcessPubComp.
/// </summary>
public sealed class MqttQos2StateMachine
{
    private readonly ConcurrentDictionary<ushort, MqttQos2Flow> _flows = new();
    private readonly TimeSpan _timeout;
    private readonly TimeProvider _timeProvider;

    /// <summary>
    /// Creates an empty state machine.
    /// </summary>
    /// <param name="timeout">Age after which an unfinished flow is reported as timed out. Defaults to 30 seconds.</param>
    /// <param name="timeProvider">Clock used for flow timestamps; defaults to the system clock.</param>
    public MqttQos2StateMachine(TimeSpan? timeout = null, TimeProvider? timeProvider = null)
    {
        _timeProvider = timeProvider ?? TimeProvider.System;
        _timeout = timeout ?? TimeSpan.FromSeconds(30);
    }

    /// <summary>
    /// Registers a new flow for <paramref name="packetId"/>, stamped with the current time.
    /// </summary>
    /// <returns>False when a flow with this packet id is already tracked (duplicate publish).</returns>
    public bool BeginPublish(ushort packetId) =>
        _flows.TryAdd(packetId, new MqttQos2Flow
        {
            State = MqttQos2State.AwaitingPubRec,
            StartedAtUtc = _timeProvider.GetUtcNow().UtcDateTime,
        });

    /// <summary>
    /// Handles PUBREC: moves the flow from AwaitingPubRec to AwaitingPubRel.
    /// </summary>
    /// <returns>False when the packet id is unknown or the flow is in a different state.</returns>
    public bool ProcessPubRec(ushort packetId) =>
        TryAdvance(packetId, MqttQos2State.AwaitingPubRec, MqttQos2State.AwaitingPubRel);

    /// <summary>
    /// Handles PUBREL: moves the flow from AwaitingPubRel to AwaitingPubComp.
    /// </summary>
    /// <returns>False when the packet id is unknown or the flow is in a different state.</returns>
    public bool ProcessPubRel(ushort packetId) =>
        TryAdvance(packetId, MqttQos2State.AwaitingPubRel, MqttQos2State.AwaitingPubComp);

    /// <summary>
    /// Handles PUBCOMP: marks the flow Complete and stops tracking the packet id.
    /// </summary>
    /// <returns>False when the packet id is unknown or the flow is not awaiting PUBCOMP.</returns>
    public bool ProcessPubComp(ushort packetId)
    {
        if (!TryAdvance(packetId, MqttQos2State.AwaitingPubComp, MqttQos2State.Complete))
            return false;

        // A finished flow is dropped so the packet id can be reused.
        _flows.TryRemove(packetId, out _);
        return true;
    }

    /// <summary>
    /// Looks up the stage a packet id is currently in.
    /// </summary>
    /// <returns>The flow's state, or null when the packet id is not tracked.</returns>
    public MqttQos2State? GetState(ushort packetId) =>
        _flows.TryGetValue(packetId, out var flow) ? flow.State : null;

    /// <summary>
    /// Lists the packet ids whose flows are older than the configured timeout.
    /// Nothing is removed; pair with <see cref="RemoveFlow"/> to clean up.
    /// </summary>
    public IReadOnlyList<ushort> GetTimedOutFlows()
    {
        var expired = new List<ushort>();
        var now = _timeProvider.GetUtcNow().UtcDateTime;
        foreach (var (packetId, flow) in _flows)
        {
            var age = now - flow.StartedAtUtc;
            if (age <= _timeout)
                continue;

            expired.Add(packetId);
        }

        return expired;
    }

    /// <summary>
    /// Stops tracking a packet id (for example after a timeout). No-op when it is not tracked.
    /// </summary>
    public void RemoveFlow(ushort packetId)
    {
        _flows.TryRemove(packetId, out _);
    }

    /// <summary>
    /// Moves the flow for <paramref name="packetId"/> to <paramref name="next"/>, but only if it
    /// exists and is currently in <paramref name="expected"/>.
    /// </summary>
    private bool TryAdvance(ushort packetId, MqttQos2State expected, MqttQos2State next)
    {
        if (_flows.TryGetValue(packetId, out var flow) && flow.State == expected)
        {
            flow.State = next;
            return true;
        }

        return false;
    }
}

View File

@@ -0,0 +1,133 @@
// MQTT session persistence store.
// Go reference: golang/nats-server/server/mqtt.go:253-300
// Session state management — mqttInitSessionStore / mqttStoreSession
// Flapper detection — mqttCheckFlapper (lines ~300-360)
using System.Collections.Concurrent;
namespace NATS.Server.Mqtt;
/// <summary>
/// Serializable session data for an MQTT client.
/// Go reference: server/mqtt.go mqttSession struct ~line 253.
/// </summary>
/// <remarks>
/// Although this is a record, the collection properties are mutable collections and
/// <see cref="LastActivityUtc"/> has a setter, so instances are not deeply immutable.
/// </remarks>
public sealed record MqttSessionData
{
    /// <summary>Client identifier; must be supplied at construction. Used as the key in <see cref="MqttSessionStore"/>.</summary>
    public required string ClientId { get; init; }
    /// <summary>Subscriptions as a string-to-int map (presumably topic filter to QoS — confirm with the caller). Defaults to empty.</summary>
    public Dictionary<string, int> Subscriptions { get; init; } = [];
    /// <summary>Publishes still pending for this session. Defaults to empty.</summary>
    public List<MqttPendingPublish> PendingPublishes { get; init; } = [];
    /// <summary>Will topic, or null when not set.</summary>
    public string? WillTopic { get; init; }
    /// <summary>Will payload bytes, or null when not set.</summary>
    public byte[]? WillPayload { get; init; }
    /// <summary>Will QoS value (presumably 0-2; no range check is performed here).</summary>
    public int WillQoS { get; init; }
    /// <summary>Will retain flag.</summary>
    public bool WillRetain { get; init; }
    /// <summary>Clean-session flag.</summary>
    public bool CleanSession { get; init; }
    /// <summary>Connect time (UTC). Defaults to the system clock at construction, not an injected TimeProvider.</summary>
    public DateTime ConnectedAtUtc { get; init; } = DateTime.UtcNow;
    /// <summary>Last activity time (UTC). Settable after construction; defaults to the system clock at construction.</summary>
    public DateTime LastActivityUtc { get; set; } = DateTime.UtcNow;
}
/// <summary>
/// In-memory MQTT session store with flapper detection.
/// The abstraction allows future JetStream backing.
/// Go reference: server/mqtt.go mqttInitSessionStore ~line 260.
/// </summary>
/// <remarks>
/// Connect history is bounded: entries whose timestamps have all left the flap window are evicted,
/// so client ids that never reconnect do not accumulate for the lifetime of the store.
/// </remarks>
public sealed class MqttSessionStore
{
    private readonly ConcurrentDictionary<string, MqttSessionData> _sessions = new(StringComparer.Ordinal);

    // Recent connect timestamps per client id. Every access goes through _historyGate so that
    // pruning a list and evicting its (now empty) key happen atomically.
    private readonly Dictionary<string, List<DateTime>> _connectHistory = new(StringComparer.Ordinal);
    private readonly object _historyGate = new();

    // Earliest time at which the next full sweep of _connectHistory may run. Guarded by _historyGate.
    private DateTime _nextSweepUtc = DateTime.MinValue;

    private readonly TimeSpan _flapWindow;
    private readonly int _flapThreshold;
    private readonly TimeSpan _flapBackoff;
    private readonly TimeProvider _timeProvider;

    /// <summary>
    /// Initializes a new session store.
    /// </summary>
    /// <param name="flapWindow">Window in which repeated connects trigger flap detection. Default 10 seconds.</param>
    /// <param name="flapThreshold">Number of connects within the window to trigger backoff. Default 3.</param>
    /// <param name="flapBackoff">Backoff delay to apply when flapping. Default 1 second.</param>
    /// <param name="timeProvider">Optional time provider for testing. Default uses system clock.</param>
    public MqttSessionStore(
        TimeSpan? flapWindow = null,
        int flapThreshold = 3,
        TimeSpan? flapBackoff = null,
        TimeProvider? timeProvider = null)
    {
        _flapWindow = flapWindow ?? TimeSpan.FromSeconds(10);
        _flapThreshold = flapThreshold;
        _flapBackoff = flapBackoff ?? TimeSpan.FromSeconds(1);
        _timeProvider = timeProvider ?? TimeProvider.System;
    }

    /// <summary>
    /// Saves (or overwrites) session data for the given client.
    /// Go reference: server/mqtt.go mqttStoreSession.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="session"/> is null.</exception>
    public void SaveSession(MqttSessionData session)
    {
        ArgumentNullException.ThrowIfNull(session);
        _sessions[session.ClientId] = session;
    }

    /// <summary>
    /// Loads session data for the given client, or null if not found.
    /// Go reference: server/mqtt.go mqttLoadSession.
    /// </summary>
    public MqttSessionData? LoadSession(string clientId) =>
        _sessions.TryGetValue(clientId, out var session) ? session : null;

    /// <summary>
    /// Deletes the session for the given client. No-op if not found.
    /// Connect history is intentionally left alone so flap detection spans sessions.
    /// Go reference: server/mqtt.go mqttDeleteSession.
    /// </summary>
    public void DeleteSession(string clientId) =>
        _sessions.TryRemove(clientId, out _);

    /// <summary>
    /// Returns a snapshot list of all stored sessions.
    /// </summary>
    public IReadOnlyList<MqttSessionData> ListSessions() =>
        _sessions.Values.ToList();

    /// <summary>
    /// Tracks a connect or disconnect event for flapper detection.
    /// Only connects are recorded; disconnects are ignored.
    /// Go reference: server/mqtt.go mqttCheckFlapper ~line 300.
    /// </summary>
    /// <param name="clientId">The MQTT client identifier.</param>
    /// <param name="connected">True for connect, false for disconnect.</param>
    /// <exception cref="ArgumentNullException"><paramref name="clientId"/> is null on a connect event.</exception>
    public void TrackConnectDisconnect(string clientId, bool connected)
    {
        if (!connected)
            return;

        ArgumentNullException.ThrowIfNull(clientId);

        var now = _timeProvider.GetUtcNow().UtcDateTime;
        var cutoff = now - _flapWindow;
        lock (_historyGate)
        {
            // Connect events are the only steady source of calls, so the periodic sweep that
            // bounds the dictionary is driven from here.
            SweepStaleHistory(now, cutoff);

            if (!_connectHistory.TryGetValue(clientId, out var history))
            {
                history = [];
                _connectHistory.Add(clientId, history);
            }

            // Prune entries outside the flap window, then record this connect.
            history.RemoveAll(t => t < cutoff);
            history.Add(now);
        }
    }

    /// <summary>
    /// Returns the backoff delay if the client is flapping, otherwise <see cref="TimeSpan.Zero"/>.
    /// Go reference: server/mqtt.go mqttCheckFlapper ~line 320.
    /// </summary>
    /// <param name="clientId">The MQTT client identifier.</param>
    /// <exception cref="ArgumentNullException"><paramref name="clientId"/> is null.</exception>
    public TimeSpan ShouldApplyBackoff(string clientId)
    {
        ArgumentNullException.ThrowIfNull(clientId);

        lock (_historyGate)
        {
            if (!_connectHistory.TryGetValue(clientId, out var history))
                return TimeSpan.Zero;

            var now = _timeProvider.GetUtcNow().UtcDateTime;
            var cutoff = now - _flapWindow;
            history.RemoveAll(t => t < cutoff);

            var flapping = history.Count >= _flapThreshold;

            // Nothing left in the window: drop the key so idle clients do not pin memory.
            if (history.Count == 0)
                _connectHistory.Remove(clientId);

            return flapping ? _flapBackoff : TimeSpan.Zero;
        }
    }

    /// <summary>
    /// Removes every client whose recorded connects are all older than <paramref name="cutoff"/>.
    /// Runs at most once per flap window so the amortized cost per connect stays small.
    /// Must be called with <c>_historyGate</c> held.
    /// </summary>
    private void SweepStaleHistory(DateTime now, DateTime cutoff)
    {
        if (now < _nextSweepUtc)
            return;

        // A non-positive window keeps nothing across time, so sweeping on every call is cheap.
        _nextSweepUtc = _flapWindow > TimeSpan.Zero ? now + _flapWindow : now;

        List<string>? stale = null;
        foreach (var (clientId, history) in _connectHistory)
        {
            // TrueForAll (rather than checking only the last element) stays correct even if
            // the clock moved backwards between connects. Empty lists count as stale.
            if (history.TrueForAll(t => t < cutoff))
                (stale ??= []).Add(clientId);
        }

        if (stale is null)
            return;

        foreach (var clientId in stale)
            _connectHistory.Remove(clientId);
    }
}