feat(mqtt): wire will message delivery on abnormal disconnect
Add DeliverWill to MqttPacketHandlers — extracts and clears the will, then delivers via ProcessInboundClientMsg. Call from CloseConnection when client is MQTT. Clean DISCONNECT already clears the will (Task 3), so DeliverWill is a no-op after DISCONNECT. 3 new tests: abnormal close delivers will, DISCONNECT suppresses will, no-will is a no-op.
This commit is contained in:
@@ -1330,6 +1330,10 @@ public sealed partial class ClientConnection
|
|||||||
ClearPingTimer();
|
ClearPingTimer();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deliver MQTT will message on abnormal disconnect.
|
||||||
|
if (IsMqtt())
|
||||||
|
MqttPacketHandlers.DeliverWill(this);
|
||||||
|
|
||||||
// Close the underlying network connection.
|
// Close the underlying network connection.
|
||||||
try { _nc?.Close(); } catch { /* ignore */ }
|
try { _nc?.Close(); } catch { /* ignore */ }
|
||||||
_nc = null;
|
_nc = null;
|
||||||
|
|||||||
@@ -238,6 +238,33 @@ internal static class MqttPacketHandlers
|
|||||||
c.CloseConnection(ClosedState.ClientClosed);
|
c.CloseConnection(ClosedState.ClientClosed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// =========================================================================
|
||||||
|
// Will message delivery
|
||||||
|
// =========================================================================
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Delivers the will message if one is set and the connection was not cleanly
|
||||||
|
/// disconnected (i.e., DISCONNECT was not received, so will is still non-null).
|
||||||
|
/// Called from CloseConnection(). Mirrors Go <c>mqttHandleWill()</c>.
|
||||||
|
/// </summary>
|
||||||
|
public static void DeliverWill(ClientConnection c)
|
||||||
|
{
|
||||||
|
MqttWill? will;
|
||||||
|
lock (c)
|
||||||
|
{
|
||||||
|
if (c.Mqtt == null) return;
|
||||||
|
will = c.Mqtt.Will;
|
||||||
|
c.Mqtt.Will = null; // Prevent duplicate delivery.
|
||||||
|
}
|
||||||
|
|
||||||
|
if (will == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// Deliver the will message via internal NATS routing.
|
||||||
|
var payload = will.Msg ?? [];
|
||||||
|
c.ProcessInboundClientMsg(payload);
|
||||||
|
}
|
||||||
|
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
// PUBLISH parsing + processing
|
// PUBLISH parsing + processing
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
|
|||||||
@@ -383,4 +383,68 @@ public sealed class MqttConnectTests
|
|||||||
// Will should be cleared.
|
// Will should be cleared.
|
||||||
c.Mqtt.Will.ShouldBeNull();
|
c.Mqtt.Will.ShouldBeNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// =========================================================================
|
||||||
|
// Will message delivery tests
|
||||||
|
// =========================================================================
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void DeliverWill_AbnormalClose_ShouldDeliverAndClearWill()
|
||||||
|
{
|
||||||
|
var c = CreateMqttClient();
|
||||||
|
|
||||||
|
// CONNECT with a will.
|
||||||
|
var connectBuf = BuildConnectPacket(
|
||||||
|
willTopic: "test/will",
|
||||||
|
willMessage: Encoding.UTF8.GetBytes("goodbye"));
|
||||||
|
|
||||||
|
var err = MqttParser.Parse(c, connectBuf, connectBuf.Length);
|
||||||
|
err.ShouldBeNull();
|
||||||
|
c.Mqtt!.Will.ShouldNotBeNull();
|
||||||
|
|
||||||
|
// Simulate abnormal close (not DISCONNECT) — will should be delivered.
|
||||||
|
MqttPacketHandlers.DeliverWill(c);
|
||||||
|
|
||||||
|
// Will should be cleared after delivery.
|
||||||
|
c.Mqtt.Will.ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void DeliverWill_AfterDisconnect_ShouldNotDeliver()
|
||||||
|
{
|
||||||
|
var c = CreateMqttClient();
|
||||||
|
|
||||||
|
// CONNECT with a will.
|
||||||
|
var connectBuf = BuildConnectPacket(
|
||||||
|
willTopic: "test/will",
|
||||||
|
willMessage: Encoding.UTF8.GetBytes("goodbye"));
|
||||||
|
|
||||||
|
var err = MqttParser.Parse(c, connectBuf, connectBuf.Length);
|
||||||
|
err.ShouldBeNull();
|
||||||
|
|
||||||
|
// Clean DISCONNECT — clears the will.
|
||||||
|
var disconnectBuf = new byte[] { MqttPacket.Disconnect, 0x00 };
|
||||||
|
err = MqttParser.Parse(c, disconnectBuf, disconnectBuf.Length);
|
||||||
|
err.ShouldBeNull();
|
||||||
|
|
||||||
|
// Will was already cleared by DISCONNECT, so DeliverWill should be a no-op.
|
||||||
|
MqttPacketHandlers.DeliverWill(c);
|
||||||
|
c.Mqtt!.Will.ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void DeliverWill_NoWill_ShouldBeNoOp()
|
||||||
|
{
|
||||||
|
var c = CreateMqttClient();
|
||||||
|
|
||||||
|
// CONNECT without a will.
|
||||||
|
var connectBuf = BuildConnectPacket();
|
||||||
|
var err = MqttParser.Parse(c, connectBuf, connectBuf.Length);
|
||||||
|
err.ShouldBeNull();
|
||||||
|
c.Mqtt!.Will.ShouldBeNull();
|
||||||
|
|
||||||
|
// DeliverWill with no will configured — should be a no-op.
|
||||||
|
MqttPacketHandlers.DeliverWill(c);
|
||||||
|
c.Mqtt.Will.ShouldBeNull();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
# NATS .NET Porting Status Report
|
# NATS .NET Porting Status Report
|
||||||
|
|
||||||
Generated: 2026-03-01 21:08:29 UTC
|
Generated: 2026-03-01 21:10:54 UTC
|
||||||
|
|
||||||
## Modules (12 total)
|
## Modules (12 total)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user