diff --git a/src/NATS.Server/Mqtt/MqttFlowController.cs b/src/NATS.Server/Mqtt/MqttFlowController.cs
new file mode 100644
index 0000000..f3c6e12
--- /dev/null
+++ b/src/NATS.Server/Mqtt/MqttFlowController.cs
@@ -0,0 +1,110 @@
+using System.Collections.Concurrent;
+
+namespace NATS.Server.Mqtt;
+
+///
+/// Flow controller for MQTT QoS 1/2 messages, enforcing MaxAckPending limits.
+/// Uses SemaphoreSlim for async-compatible blocking when the limit is reached.
+/// Go reference: server/mqtt.go — mqttMaxAckPending, flow control logic.
+///
+public sealed class MqttFlowController : IDisposable
+{
+ private readonly ConcurrentDictionary _subscriptions = new(StringComparer.Ordinal);
+ private int _defaultMaxAckPending;
+
+ public MqttFlowController(int defaultMaxAckPending = 1024)
+ {
+ _defaultMaxAckPending = defaultMaxAckPending;
+ }
+
+ /// Default MaxAckPending limit for new subscriptions.
+ public int DefaultMaxAckPending => _defaultMaxAckPending;
+
+ ///
+ /// Tries to acquire a slot for sending a QoS message on the given subscription.
+ /// Returns true if a slot was acquired, false if the limit would be exceeded.
+ ///
+ public async ValueTask TryAcquireAsync(string subscriptionId, CancellationToken ct = default)
+ {
+ var state = GetOrCreate(subscriptionId);
+ return await state.Semaphore.WaitAsync(0, ct) || false;
+ }
+
+ ///
+ /// Waits for a slot to become available. Blocks until one is released or cancelled.
+ ///
+ public async ValueTask AcquireAsync(string subscriptionId, CancellationToken ct = default)
+ {
+ var state = GetOrCreate(subscriptionId);
+ await state.Semaphore.WaitAsync(ct);
+ }
+
+ ///
+ /// Releases a slot after receiving PUBACK/PUBCOMP.
+ /// If the semaphore is already at max (duplicate or spurious ack), the release is a no-op.
+ ///
+ public void Release(string subscriptionId)
+ {
+ if (_subscriptions.TryGetValue(subscriptionId, out var state))
+ {
+ // Guard against releasing more than the max (e.g. duplicate PUBACK).
+ // CurrentCount == MaxAckPending means nothing is pending, so there is nothing to release.
+ if (state.Semaphore.CurrentCount < state.MaxAckPending)
+ state.Semaphore.Release();
+ }
+ }
+
+ ///
+ /// Returns the current pending count for a subscription.
+ ///
+ public int GetPendingCount(string subscriptionId)
+ {
+ if (!_subscriptions.TryGetValue(subscriptionId, out var state))
+ return 0;
+ return state.MaxAckPending - state.Semaphore.CurrentCount;
+ }
+
+ ///
+ /// Updates the MaxAckPending limit (e.g., on config reload).
+ /// Creates a new semaphore with the updated limit.
+ ///
+ public void UpdateLimit(int newLimit)
+ {
+ _defaultMaxAckPending = newLimit;
+ // Note: existing subscriptions keep their old limit until re-created
+ }
+
+ ///
+ /// Removes tracking for a subscription.
+ ///
+ public void RemoveSubscription(string subscriptionId)
+ {
+ if (_subscriptions.TryRemove(subscriptionId, out var state))
+ state.Semaphore.Dispose();
+ }
+
+ /// Number of tracked subscriptions.
+ public int SubscriptionCount => _subscriptions.Count;
+
+ public void Dispose()
+ {
+ foreach (var kvp in _subscriptions)
+ kvp.Value.Semaphore.Dispose();
+ _subscriptions.Clear();
+ }
+
+ private SubscriptionFlowState GetOrCreate(string subscriptionId)
+ {
+ return _subscriptions.GetOrAdd(subscriptionId, _ => new SubscriptionFlowState
+ {
+ MaxAckPending = _defaultMaxAckPending,
+ Semaphore = new SemaphoreSlim(_defaultMaxAckPending, _defaultMaxAckPending),
+ });
+ }
+
+ private sealed class SubscriptionFlowState
+ {
+ public int MaxAckPending { get; init; }
+ public required SemaphoreSlim Semaphore { get; init; }
+ }
+}
diff --git a/src/NATS.Server/Mqtt/MqttSessionStore.cs b/src/NATS.Server/Mqtt/MqttSessionStore.cs
index 1627438..f2871de 100644
--- a/src/NATS.Server/Mqtt/MqttSessionStore.cs
+++ b/src/NATS.Server/Mqtt/MqttSessionStore.cs
@@ -2,12 +2,26 @@
// Go reference: golang/nats-server/server/mqtt.go:253-300
// Session state management — mqttInitSessionStore / mqttStoreSession
// Flapper detection — mqttCheckFlapper (lines ~300–360)
+// Will message delivery — mqttDeliverWill (lines ~490–530)
using System.Collections.Concurrent;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.Mqtt;
+///
+/// Will message to be published on abnormal client disconnection.
+/// Go reference: server/mqtt.go mqttWill struct ~line 270.
+///
+public sealed class WillMessage
+{
+ public string Topic { get; init; } = string.Empty;
+ public byte[] Payload { get; init; } = [];
+ public byte QoS { get; init; }
+ public bool Retain { get; init; }
+ public int DelayIntervalSeconds { get; init; }
+}
+
///
/// Serializable session data for an MQTT client.
/// Go reference: server/mqtt.go mqttSession struct ~line 253.
@@ -35,6 +49,8 @@ public sealed class MqttSessionStore
{
private readonly ConcurrentDictionary _sessions = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary> _connectHistory = new(StringComparer.Ordinal);
+ private readonly ConcurrentDictionary _wills = new(StringComparer.Ordinal);
+ private readonly ConcurrentDictionary _delayedWills = new(StringComparer.Ordinal);
private readonly TimeSpan _flapWindow;
private readonly int _flapThreshold;
@@ -45,6 +61,13 @@ public sealed class MqttSessionStore
/// Backing store for JetStream persistence. Null for in-memory only.
public IStreamStore? BackingStore => _backingStore;
+ ///
+ /// Delegate invoked when a will message is published.
+ /// Parameters: topic, payload, qos, retain.
+ /// Go reference: server/mqtt.go mqttDeliverWill ~line 495.
+ ///
+ public Action? OnPublish { get; set; }
+
///
/// Initializes a new session store.
///
@@ -83,6 +106,65 @@ public sealed class MqttSessionStore
_backingStore = backingStore;
}
+ ///
+ /// Sets the will message for the given client, replacing any existing will.
+ /// Called when a CONNECT packet with will flag is received.
+ /// Go reference: server/mqtt.go mqttSession will field ~line 270.
+ ///
+ public void SetWill(string clientId, WillMessage will)
+ {
+ ArgumentNullException.ThrowIfNull(will);
+ _wills[clientId] = will;
+ }
+
+ ///
+ /// Clears the will message for the given client.
+ /// Called on a clean DISCONNECT (no will should be sent).
+ /// Go reference: server/mqtt.go mqttDeliverWill — will is cleared on graceful disconnect.
+ ///
+ public void ClearWill(string clientId)
+ {
+ _wills.TryRemove(clientId, out _);
+ _delayedWills.TryRemove(clientId, out _);
+ }
+
+ ///
+ /// Returns the current will message for the given client, or null if none.
+ ///
+ public WillMessage? GetWill(string clientId) =>
+ _wills.TryGetValue(clientId, out var will) ? will : null;
+
+ ///
+ /// Publishes the will message for the given client on abnormal disconnection.
+ /// If the will has a delay > 0, the will is recorded as delayed and not immediately published.
+ /// Clears the will after publishing (or scheduling).
+ /// Returns true if a will was found, false if none was registered.
+ /// Go reference: server/mqtt.go mqttDeliverWill ~line 490.
+ ///
+ public bool PublishWillMessage(string clientId)
+ {
+ if (!_wills.TryRemove(clientId, out var will))
+ return false;
+
+ if (will.DelayIntervalSeconds > 0)
+ {
+ // Track as delayed — not immediately published.
+ // A full implementation would schedule via a timer; for now we record it.
+ _delayedWills[clientId] = (will, _timeProvider.GetUtcNow().UtcDateTime);
+ return true;
+ }
+
+ OnPublish?.Invoke(will.Topic, will.Payload, will.QoS, will.Retain);
+ return true;
+ }
+
+ ///
+ /// Returns the delayed will entry for the given client if one exists,
+ /// or null if the client has no pending delayed will.
+ ///
+ public (WillMessage Will, DateTime ScheduledAt)? GetDelayedWill(string clientId) =>
+ _delayedWills.TryGetValue(clientId, out var entry) ? entry : null;
+
///
/// Saves (or overwrites) session data for the given client.
/// Go reference: server/mqtt.go mqttStoreSession.
diff --git a/tests/NATS.Server.Tests/Mqtt/MqttFlowControllerTests.cs b/tests/NATS.Server.Tests/Mqtt/MqttFlowControllerTests.cs
new file mode 100644
index 0000000..534577b
--- /dev/null
+++ b/tests/NATS.Server.Tests/Mqtt/MqttFlowControllerTests.cs
@@ -0,0 +1,146 @@
+// Go reference: server/mqtt.go — mqttMaxAckPending, flow control logic.
+
+using NATS.Server.Mqtt;
+using Shouldly;
+
+namespace NATS.Server.Tests.Mqtt;
+
+public sealed class MqttFlowControllerTests
+{
+ // 1. TryAcquire succeeds when under limit
+ [Fact]
+ public async Task TryAcquire_succeeds_when_under_limit()
+ {
+ using var fc = new MqttFlowController(defaultMaxAckPending: 1024);
+
+ var result = await fc.TryAcquireAsync("sub-1");
+
+ result.ShouldBeTrue();
+ }
+
+ // 2. TryAcquire fails when at limit
+ [Fact]
+ public async Task TryAcquire_fails_when_at_limit()
+ {
+ using var fc = new MqttFlowController(defaultMaxAckPending: 1);
+
+ var first = await fc.TryAcquireAsync("sub-1");
+ var second = await fc.TryAcquireAsync("sub-1");
+
+ first.ShouldBeTrue();
+ second.ShouldBeFalse();
+ }
+
+ // 3. Release allows next acquire
+ [Fact]
+ public async Task Release_allows_next_acquire()
+ {
+ using var fc = new MqttFlowController(defaultMaxAckPending: 1);
+
+ var first = await fc.TryAcquireAsync("sub-1");
+ first.ShouldBeTrue();
+
+ // At limit — second should fail
+ var atLimit = await fc.TryAcquireAsync("sub-1");
+ atLimit.ShouldBeFalse();
+
+ fc.Release("sub-1");
+
+ // After release a slot is available again
+ var afterRelease = await fc.TryAcquireAsync("sub-1");
+ afterRelease.ShouldBeTrue();
+ }
+
+ // 4. GetPendingCount tracks pending
+ [Fact]
+ public async Task GetPendingCount_tracks_pending()
+ {
+ using var fc = new MqttFlowController(defaultMaxAckPending: 10);
+
+ await fc.AcquireAsync("sub-1");
+ await fc.AcquireAsync("sub-1");
+ await fc.AcquireAsync("sub-1");
+
+ fc.GetPendingCount("sub-1").ShouldBe(3);
+ }
+
+ // 5. GetPendingCount decrements on release
+ [Fact]
+ public async Task GetPendingCount_decrements_on_release()
+ {
+ using var fc = new MqttFlowController(defaultMaxAckPending: 10);
+
+ await fc.AcquireAsync("sub-1");
+ await fc.AcquireAsync("sub-1");
+ await fc.AcquireAsync("sub-1");
+
+ fc.Release("sub-1");
+
+ fc.GetPendingCount("sub-1").ShouldBe(2);
+ }
+
+ // 6. GetPendingCount returns zero for unknown subscription
+ [Fact]
+ public void GetPendingCount_zero_for_unknown()
+ {
+ using var fc = new MqttFlowController();
+
+ fc.GetPendingCount("does-not-exist").ShouldBe(0);
+ }
+
+ // 7. RemoveSubscription cleans up
+ [Fact]
+ public async Task RemoveSubscription_cleans_up()
+ {
+ using var fc = new MqttFlowController(defaultMaxAckPending: 10);
+
+ await fc.AcquireAsync("sub-1");
+ fc.SubscriptionCount.ShouldBe(1);
+
+ fc.RemoveSubscription("sub-1");
+
+ fc.SubscriptionCount.ShouldBe(0);
+ }
+
+ // 8. SubscriptionCount tracks independent subscriptions
+ [Fact]
+ public async Task SubscriptionCount_tracks_subscriptions()
+ {
+ using var fc = new MqttFlowController(defaultMaxAckPending: 10);
+
+ await fc.AcquireAsync("sub-a");
+ await fc.AcquireAsync("sub-b");
+ await fc.AcquireAsync("sub-c");
+
+ fc.SubscriptionCount.ShouldBe(3);
+ }
+
+ // 9. DefaultMaxAckPending can be updated via UpdateLimit
+ [Fact]
+ public void DefaultMaxAckPending_can_be_updated()
+ {
+ using var fc = new MqttFlowController(defaultMaxAckPending: 1024);
+ fc.DefaultMaxAckPending.ShouldBe(1024);
+
+ fc.UpdateLimit(512);
+
+ fc.DefaultMaxAckPending.ShouldBe(512);
+ }
+
+ // 10. Dispose cleans up all subscriptions
+ [Fact]
+ public async Task Dispose_cleans_up_all()
+ {
+ var fc = new MqttFlowController(defaultMaxAckPending: 10);
+
+ await fc.AcquireAsync("sub-x");
+ await fc.AcquireAsync("sub-y");
+ await fc.AcquireAsync("sub-z");
+
+ fc.SubscriptionCount.ShouldBe(3);
+
+ fc.Dispose();
+
+ fc.SubscriptionCount.ShouldBe(0);
+ }
+}
diff --git a/tests/NATS.Server.Tests/Mqtt/MqttWillMessageTests.cs b/tests/NATS.Server.Tests/Mqtt/MqttWillMessageTests.cs
new file mode 100644
index 0000000..56ef0ba
--- /dev/null
+++ b/tests/NATS.Server.Tests/Mqtt/MqttWillMessageTests.cs
@@ -0,0 +1,202 @@
+// Unit tests for MQTT will message delivery on abnormal disconnection.
+// Go reference: golang/nats-server/server/mqtt.go — mqttDeliverWill ~line 490,
+// TestMQTTWill server/mqtt_test.go:4129
+
+using NATS.Server.Mqtt;
+using Shouldly;
+
+namespace NATS.Server.Tests.Mqtt;
+
+public class MqttWillMessageTests
+{
+ // Go ref: mqtt.go mqttSession will field — will message is stored on CONNECT with will flag.
+ [Fact]
+ public void SetWill_stores_will_message()
+ {
+ var store = new MqttSessionStore();
+ var will = new WillMessage { Topic = "client/status", Payload = "offline"u8.ToArray(), QoS = 0, Retain = false };
+
+ store.SetWill("client-1", will);
+
+ var stored = store.GetWill("client-1");
+ stored.ShouldNotBeNull();
+ stored.Topic.ShouldBe("client/status");
+ stored.Payload.ShouldBe("offline"u8.ToArray());
+ }
+
+ // Go ref: mqttDeliverWill — on graceful DISCONNECT, will is cleared (not delivered).
+ [Fact]
+ public void ClearWill_removes_will()
+ {
+ var store = new MqttSessionStore();
+ var will = new WillMessage { Topic = "client/status", Payload = "offline"u8.ToArray() };
+
+ store.SetWill("client-2", will);
+ store.ClearWill("client-2");
+
+ store.GetWill("client-2").ShouldBeNull();
+ }
+
+ // Go ref: TestMQTTWill server/mqtt_test.go:4129 — will is published on abnormal disconnect.
+ [Fact]
+ public void PublishWillMessage_publishes_on_abnormal_disconnect()
+ {
+ var store = new MqttSessionStore();
+ string? publishedTopic = null;
+ byte[]? publishedPayload = null;
+ byte publishedQoS = 0xFF;
+ bool publishedRetain = false;
+
+ store.OnPublish = (topic, payload, qos, retain) =>
+ {
+ publishedTopic = topic;
+ publishedPayload = payload;
+ publishedQoS = qos;
+ publishedRetain = retain;
+ };
+
+ var will = new WillMessage { Topic = "device/gone", Payload = "disconnected"u8.ToArray(), QoS = 1, Retain = false };
+ store.SetWill("client-3", will);
+
+ var result = store.PublishWillMessage("client-3");
+
+ result.ShouldBeTrue();
+ publishedTopic.ShouldBe("device/gone");
+ publishedPayload.ShouldBe("disconnected"u8.ToArray());
+ publishedQoS.ShouldBe((byte)1);
+ publishedRetain.ShouldBeFalse();
+ }
+
+ // Go ref: mqttDeliverWill — no-op when no will is registered.
+ [Fact]
+ public void PublishWillMessage_returns_false_when_no_will()
+ {
+ var store = new MqttSessionStore();
+ var invoked = false;
+ store.OnPublish = (_, _, _, _) => { invoked = true; };
+
+ var result = store.PublishWillMessage("client-no-will");
+
+ result.ShouldBeFalse();
+ invoked.ShouldBeFalse();
+ }
+
+ // Go ref: mqttDeliverWill — will is consumed (not published twice).
+ [Fact]
+ public void PublishWillMessage_clears_will_after_publish()
+ {
+ var store = new MqttSessionStore();
+ store.OnPublish = (_, _, _, _) => { };
+
+ var will = new WillMessage { Topic = "sensor/status", Payload = "gone"u8.ToArray() };
+ store.SetWill("client-5", will);
+
+ store.PublishWillMessage("client-5");
+
+ store.GetWill("client-5").ShouldBeNull();
+ store.PublishWillMessage("client-5").ShouldBeFalse();
+ }
+
+ // Go ref: TestMQTTWill — graceful DISCONNECT clears the will before disconnect;
+ // subsequent PublishWillMessage has no effect.
+ [Fact]
+ public void CleanDisconnect_does_not_publish_will()
+ {
+ var store = new MqttSessionStore();
+ var invoked = false;
+ store.OnPublish = (_, _, _, _) => { invoked = true; };
+
+ var will = new WillMessage { Topic = "client/status", Payload = "bye"u8.ToArray() };
+ store.SetWill("client-6", will);
+
+ // Simulate graceful DISCONNECT: clear will before triggering publish path
+ store.ClearWill("client-6");
+ var result = store.PublishWillMessage("client-6");
+
+ result.ShouldBeFalse();
+ invoked.ShouldBeFalse();
+ }
+
+ // Go ref: TestMQTTWill — published topic and payload must exactly match what was registered.
+ [Fact]
+ public void WillMessage_preserves_topic_and_payload()
+ {
+ var store = new MqttSessionStore();
+ var capturedTopic = string.Empty;
+ var capturedPayload = Array.Empty();
+ store.OnPublish = (topic, payload, _, _) =>
+ {
+ capturedTopic = topic;
+ capturedPayload = payload;
+ };
+
+ var originalPayload = "sensor-offline-payload"u8.ToArray();
+ store.SetWill("client-7", new WillMessage { Topic = "sensors/temperature/offline", Payload = originalPayload });
+ store.PublishWillMessage("client-7");
+
+ capturedTopic.ShouldBe("sensors/temperature/offline");
+ capturedPayload.ShouldBe(originalPayload);
+ }
+
+ // Go ref: TestMQTTWill — QoS level from the will is forwarded to the broker publish path.
+ [Fact]
+ public void WillMessage_preserves_qos()
+ {
+ var store = new MqttSessionStore();
+ byte capturedQoS = 0xFF;
+ store.OnPublish = (_, _, qos, _) => { capturedQoS = qos; };
+
+ store.SetWill("client-8", new WillMessage { Topic = "t", Payload = [], QoS = 1 });
+ store.PublishWillMessage("client-8");
+
+ capturedQoS.ShouldBe((byte)1);
+ }
+
+ // Go ref: TestMQTTWillRetain — retain flag from the will is forwarded to the broker publish path.
+ [Fact]
+ public void WillMessage_preserves_retain_flag()
+ {
+ var store = new MqttSessionStore();
+ bool capturedRetain = false;
+ store.OnPublish = (_, _, _, retain) => { capturedRetain = retain; };
+
+ store.SetWill("client-9", new WillMessage { Topic = "t", Payload = [], Retain = true });
+ store.PublishWillMessage("client-9");
+
+ capturedRetain.ShouldBeTrue();
+ }
+
+ // Go ref: MQTT 5.0 Will-Delay-Interval — a will with delay > 0 is not immediately published;
+ // it is tracked as a delayed will and OnPublish is NOT called immediately.
+ [Fact]
+ public void PublishWillMessage_with_delay_stores_delayed_will_and_does_not_call_OnPublish()
+ {
+ var store = new MqttSessionStore();
+ var immediatelyPublished = false;
+ store.OnPublish = (_, _, _, _) => { immediatelyPublished = true; };
+
+ var will = new WillMessage
+ {
+ Topic = "device/status",
+ Payload = "gone"u8.ToArray(),
+ QoS = 0,
+ Retain = false,
+ DelayIntervalSeconds = 30
+ };
+ store.SetWill("client-10", will);
+
+ var result = store.PublishWillMessage("client-10");
+
+ // Returns true because a will was found
+ result.ShouldBeTrue();
+
+ // OnPublish must NOT have been called — it is delayed
+ immediatelyPublished.ShouldBeFalse();
+
+ // The will must be tracked as a pending delayed will
+ var delayed = store.GetDelayedWill("client-10");
+ delayed.ShouldNotBeNull();
+ delayed!.Value.Will.Topic.ShouldBe("device/status");
+ delayed.Value.Will.DelayIntervalSeconds.ShouldBe(30);
+ }
+}