feat: add MQTT will message delivery on abnormal disconnect (Gap 6.2)
Adds WillMessage class, SetWill/ClearWill/GetWill methods to MqttSessionStore, PublishWillMessage that dispatches via OnPublish delegate (or tracks as delayed when DelayIntervalSeconds > 0), and 10 unit tests covering all will message behaviors.
This commit is contained in:
110
src/NATS.Server/Mqtt/MqttFlowController.cs
Normal file
110
src/NATS.Server/Mqtt/MqttFlowController.cs
Normal file
@@ -0,0 +1,110 @@
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace NATS.Server.Mqtt;
|
||||
|
||||
/// <summary>
|
||||
/// Flow controller for MQTT QoS 1/2 messages, enforcing MaxAckPending limits.
|
||||
/// Uses SemaphoreSlim for async-compatible blocking when the limit is reached.
|
||||
/// Go reference: server/mqtt.go — mqttMaxAckPending, flow control logic.
|
||||
/// </summary>
|
||||
public sealed class MqttFlowController : IDisposable
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, SubscriptionFlowState> _subscriptions = new(StringComparer.Ordinal);
|
||||
private int _defaultMaxAckPending;
|
||||
|
||||
public MqttFlowController(int defaultMaxAckPending = 1024)
|
||||
{
|
||||
_defaultMaxAckPending = defaultMaxAckPending;
|
||||
}
|
||||
|
||||
/// <summary>Default MaxAckPending limit for new subscriptions.</summary>
|
||||
public int DefaultMaxAckPending => _defaultMaxAckPending;
|
||||
|
||||
/// <summary>
|
||||
/// Tries to acquire a slot for sending a QoS message on the given subscription.
|
||||
/// Returns true if a slot was acquired, false if the limit would be exceeded.
|
||||
/// </summary>
|
||||
public async ValueTask<bool> TryAcquireAsync(string subscriptionId, CancellationToken ct = default)
|
||||
{
|
||||
var state = GetOrCreate(subscriptionId);
|
||||
return await state.Semaphore.WaitAsync(0, ct) || false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Waits for a slot to become available. Blocks until one is released or cancelled.
|
||||
/// </summary>
|
||||
public async ValueTask AcquireAsync(string subscriptionId, CancellationToken ct = default)
|
||||
{
|
||||
var state = GetOrCreate(subscriptionId);
|
||||
await state.Semaphore.WaitAsync(ct);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Releases a slot after receiving PUBACK/PUBCOMP.
|
||||
/// If the semaphore is already at max (duplicate or spurious ack), the release is a no-op.
|
||||
/// </summary>
|
||||
public void Release(string subscriptionId)
|
||||
{
|
||||
if (_subscriptions.TryGetValue(subscriptionId, out var state))
|
||||
{
|
||||
// Guard against releasing more than the max (e.g. duplicate PUBACK).
|
||||
// CurrentCount == MaxAckPending means nothing is pending, so there is nothing to release.
|
||||
if (state.Semaphore.CurrentCount < state.MaxAckPending)
|
||||
state.Semaphore.Release();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the current pending count for a subscription.
|
||||
/// </summary>
|
||||
public int GetPendingCount(string subscriptionId)
|
||||
{
|
||||
if (!_subscriptions.TryGetValue(subscriptionId, out var state))
|
||||
return 0;
|
||||
return state.MaxAckPending - state.Semaphore.CurrentCount;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Updates the MaxAckPending limit (e.g., on config reload).
|
||||
/// Creates a new semaphore with the updated limit.
|
||||
/// </summary>
|
||||
public void UpdateLimit(int newLimit)
|
||||
{
|
||||
_defaultMaxAckPending = newLimit;
|
||||
// Note: existing subscriptions keep their old limit until re-created
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes tracking for a subscription.
|
||||
/// </summary>
|
||||
public void RemoveSubscription(string subscriptionId)
|
||||
{
|
||||
if (_subscriptions.TryRemove(subscriptionId, out var state))
|
||||
state.Semaphore.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>Number of tracked subscriptions.</summary>
|
||||
public int SubscriptionCount => _subscriptions.Count;
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
foreach (var kvp in _subscriptions)
|
||||
kvp.Value.Semaphore.Dispose();
|
||||
_subscriptions.Clear();
|
||||
}
|
||||
|
||||
private SubscriptionFlowState GetOrCreate(string subscriptionId)
|
||||
{
|
||||
return _subscriptions.GetOrAdd(subscriptionId, _ => new SubscriptionFlowState
|
||||
{
|
||||
MaxAckPending = _defaultMaxAckPending,
|
||||
Semaphore = new SemaphoreSlim(_defaultMaxAckPending, _defaultMaxAckPending),
|
||||
});
|
||||
}
|
||||
|
||||
private sealed class SubscriptionFlowState
|
||||
{
|
||||
public int MaxAckPending { get; init; }
|
||||
public required SemaphoreSlim Semaphore { get; init; }
|
||||
}
|
||||
}
|
||||
@@ -2,12 +2,26 @@
|
||||
// Go reference: golang/nats-server/server/mqtt.go:253-300
|
||||
// Session state management — mqttInitSessionStore / mqttStoreSession
|
||||
// Flapper detection — mqttCheckFlapper (lines ~300–360)
|
||||
// Will message delivery — mqttDeliverWill (lines ~490–530)
|
||||
|
||||
using System.Collections.Concurrent;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.Mqtt;
|
||||
|
||||
/// <summary>
|
||||
/// Will message to be published on abnormal client disconnection.
|
||||
/// Go reference: server/mqtt.go mqttWill struct ~line 270.
|
||||
/// </summary>
|
||||
public sealed class WillMessage
|
||||
{
|
||||
public string Topic { get; init; } = string.Empty;
|
||||
public byte[] Payload { get; init; } = [];
|
||||
public byte QoS { get; init; }
|
||||
public bool Retain { get; init; }
|
||||
public int DelayIntervalSeconds { get; init; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Serializable session data for an MQTT client.
|
||||
/// Go reference: server/mqtt.go mqttSession struct ~line 253.
|
||||
@@ -35,6 +49,8 @@ public sealed class MqttSessionStore
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, MqttSessionData> _sessions = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, List<DateTime>> _connectHistory = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, WillMessage> _wills = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, (WillMessage Will, DateTime ScheduledAt)> _delayedWills = new(StringComparer.Ordinal);
|
||||
|
||||
private readonly TimeSpan _flapWindow;
|
||||
private readonly int _flapThreshold;
|
||||
@@ -45,6 +61,13 @@ public sealed class MqttSessionStore
|
||||
/// <summary>Backing store for JetStream persistence. Null for in-memory only.</summary>
|
||||
public IStreamStore? BackingStore => _backingStore;
|
||||
|
||||
/// <summary>
|
||||
/// Delegate invoked when a will message is published.
|
||||
/// Parameters: topic, payload, qos, retain.
|
||||
/// Go reference: server/mqtt.go mqttDeliverWill ~line 495.
|
||||
/// </summary>
|
||||
public Action<string, byte[], byte, bool>? OnPublish { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new session store.
|
||||
/// </summary>
|
||||
@@ -83,6 +106,65 @@ public sealed class MqttSessionStore
|
||||
_backingStore = backingStore;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sets the will message for the given client, replacing any existing will.
|
||||
/// Called when a CONNECT packet with will flag is received.
|
||||
/// Go reference: server/mqtt.go mqttSession will field ~line 270.
|
||||
/// </summary>
|
||||
public void SetWill(string clientId, WillMessage will)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(will);
|
||||
_wills[clientId] = will;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Clears the will message for the given client.
|
||||
/// Called on a clean DISCONNECT (no will should be sent).
|
||||
/// Go reference: server/mqtt.go mqttDeliverWill — will is cleared on graceful disconnect.
|
||||
/// </summary>
|
||||
public void ClearWill(string clientId)
|
||||
{
|
||||
_wills.TryRemove(clientId, out _);
|
||||
_delayedWills.TryRemove(clientId, out _);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the current will message for the given client, or null if none.
|
||||
/// </summary>
|
||||
public WillMessage? GetWill(string clientId) =>
|
||||
_wills.TryGetValue(clientId, out var will) ? will : null;
|
||||
|
||||
/// <summary>
|
||||
/// Publishes the will message for the given client on abnormal disconnection.
|
||||
/// If the will has a delay > 0, the will is recorded as delayed and not immediately published.
|
||||
/// Clears the will after publishing (or scheduling).
|
||||
/// Returns true if a will was found, false if none was registered.
|
||||
/// Go reference: server/mqtt.go mqttDeliverWill ~line 490.
|
||||
/// </summary>
|
||||
public bool PublishWillMessage(string clientId)
|
||||
{
|
||||
if (!_wills.TryRemove(clientId, out var will))
|
||||
return false;
|
||||
|
||||
if (will.DelayIntervalSeconds > 0)
|
||||
{
|
||||
// Track as delayed — not immediately published.
|
||||
// A full implementation would schedule via a timer; for now we record it.
|
||||
_delayedWills[clientId] = (will, _timeProvider.GetUtcNow().UtcDateTime);
|
||||
return true;
|
||||
}
|
||||
|
||||
OnPublish?.Invoke(will.Topic, will.Payload, will.QoS, will.Retain);
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the delayed will entry for the given client if one exists,
|
||||
/// or null if the client has no pending delayed will.
|
||||
/// </summary>
|
||||
public (WillMessage Will, DateTime ScheduledAt)? GetDelayedWill(string clientId) =>
|
||||
_delayedWills.TryGetValue(clientId, out var entry) ? entry : null;
|
||||
|
||||
/// <summary>
|
||||
/// Saves (or overwrites) session data for the given client.
|
||||
/// Go reference: server/mqtt.go mqttStoreSession.
|
||||
|
||||
Reference in New Issue
Block a user