From 710f443edaa12c07682eed994f8092e9f9a57200 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 16:10:53 -0500 Subject: [PATCH] feat(mqtt): wire will message delivery on abnormal disconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add DeliverWill to MqttPacketHandlers — extracts and clears the will, then delivers via ProcessInboundClientMsg. Call from CloseConnection when client is MQTT. Clean DISCONNECT already clears the will (Task 3), so DeliverWill is a no-op after DISCONNECT. 3 new tests: abnormal close delivers will, DISCONNECT suppresses will, no-will is a no-op. --- .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 4 ++ .../Mqtt/MqttPacketHandlers.cs | 27 ++++++++ .../Mqtt/MqttConnectTests.cs | 64 +++++++++++++++++++ reports/current.md | 2 +- 4 files changed, 96 insertions(+), 1 deletion(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index abead62..73ba94a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1330,6 +1330,10 @@ public sealed partial class ClientConnection ClearPingTimer(); } + // Deliver MQTT will message on abnormal disconnect. + if (IsMqtt()) + MqttPacketHandlers.DeliverWill(this); + // Close the underlying network connection. try { _nc?.Close(); } catch { /* ignore */ } _nc = null; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs index 0f93b51..48692f0 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs @@ -238,6 +238,33 @@ internal static class MqttPacketHandlers c.CloseConnection(ClosedState.ClientClosed); } + // ========================================================================= + // Will message delivery + // ========================================================================= + + /// + /// Delivers the will message if one is set and the connection was not cleanly + /// disconnected (i.e., DISCONNECT was not received, so will is still non-null). + /// Called from CloseConnection(). Mirrors Go mqttHandleWill(). + /// + public static void DeliverWill(ClientConnection c) + { + MqttWill? will; + lock (c) + { + if (c.Mqtt == null) return; + will = c.Mqtt.Will; + c.Mqtt.Will = null; // Prevent duplicate delivery. + } + + if (will == null) + return; + + // Deliver the will message via internal NATS routing. + var payload = will.Msg ?? []; + c.ProcessInboundClientMsg(payload); + } + // ========================================================================= // PUBLISH parsing + processing // ========================================================================= diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttConnectTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttConnectTests.cs index d6ed543..28bac5a 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttConnectTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttConnectTests.cs @@ -383,4 +383,68 @@ public sealed class MqttConnectTests // Will should be cleared. c.Mqtt.Will.ShouldBeNull(); } + + // ========================================================================= + // Will message delivery tests + // ========================================================================= + + [Fact] + public void DeliverWill_AbnormalClose_ShouldDeliverAndClearWill() + { + var c = CreateMqttClient(); + + // CONNECT with a will. + var connectBuf = BuildConnectPacket( + willTopic: "test/will", + willMessage: Encoding.UTF8.GetBytes("goodbye")); + + var err = MqttParser.Parse(c, connectBuf, connectBuf.Length); + err.ShouldBeNull(); + c.Mqtt!.Will.ShouldNotBeNull(); + + // Simulate abnormal close (not DISCONNECT) — will should be delivered. + MqttPacketHandlers.DeliverWill(c); + + // Will should be cleared after delivery. + c.Mqtt.Will.ShouldBeNull(); + } + + [Fact] + public void DeliverWill_AfterDisconnect_ShouldNotDeliver() + { + var c = CreateMqttClient(); + + // CONNECT with a will. + var connectBuf = BuildConnectPacket( + willTopic: "test/will", + willMessage: Encoding.UTF8.GetBytes("goodbye")); + + var err = MqttParser.Parse(c, connectBuf, connectBuf.Length); + err.ShouldBeNull(); + + // Clean DISCONNECT — clears the will. + var disconnectBuf = new byte[] { MqttPacket.Disconnect, 0x00 }; + err = MqttParser.Parse(c, disconnectBuf, disconnectBuf.Length); + err.ShouldBeNull(); + + // Will was already cleared by DISCONNECT, so DeliverWill should be a no-op. + MqttPacketHandlers.DeliverWill(c); + c.Mqtt!.Will.ShouldBeNull(); + } + + [Fact] + public void DeliverWill_NoWill_ShouldBeNoOp() + { + var c = CreateMqttClient(); + + // CONNECT without a will. + var connectBuf = BuildConnectPacket(); + var err = MqttParser.Parse(c, connectBuf, connectBuf.Length); + err.ShouldBeNull(); + c.Mqtt!.Will.ShouldBeNull(); + + // DeliverWill with no will configured — should be a no-op. + MqttPacketHandlers.DeliverWill(c); + c.Mqtt.Will.ShouldBeNull(); + } } diff --git a/reports/current.md b/reports/current.md index 21756a4..b83ed49 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 21:08:29 UTC +Generated: 2026-03-01 21:10:54 UTC ## Modules (12 total)