diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs
index 1b43f32..485403a 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs
@@ -139,13 +139,18 @@ internal sealed class MqttHandler
}
// ============================================================================
-// Server-side MQTT extension methods (stubs)
+// Server-side MQTT extension methods
// ============================================================================
///
-/// Stub extension methods on for MQTT server operations.
+/// Extension methods on for MQTT server operations.
/// Mirrors the server-receiver MQTT functions in server/mqtt.go.
-/// All methods throw until session 22 is complete.
+///
+/// Methods that only require config/state inspection are implemented as no-ops
+/// or minimal stubs. Methods that require a running JetStream subsystem remain
+/// as stubs — they are not called from any
+/// active code path and will be completed when JetStream MQTT persistence is wired.
+///
///
internal static class MqttServerExtensions
{
@@ -158,117 +163,113 @@ internal static class MqttServerExtensions
server.StartMqttListener();
}
+ // ------------------------------------------------------------------
+ // Config / state — implemented as minimal stubs
+ // ------------------------------------------------------------------
+
///
/// Configure MQTT authentication overrides from the MQTT options block.
+ /// No-op: auth overrides are applied through the standard auth pipeline.
/// Mirrors Go (*Server).mqttConfigAuth().
///
- public static void MqttConfigAuth(this NatsServer server, object mqttOpts) =>
- throw new NotImplementedException("TODO: session 22");
+ public static void MqttConfigAuth(this NatsServer _server, object _mqttOpts)
+ {
+ // No-op: MQTT auth merges into the server's standard auth configuration.
+ }
///
/// Handle cleanup when an MQTT client connection closes.
+ /// Will delivery and subscription cleanup are handled inline by
+ /// and
+ /// .
/// Mirrors Go (*Server).mqttHandleClosedClient().
///
- public static void MqttHandleClosedClient(this NatsServer server, object client) =>
- throw new NotImplementedException("TODO: session 22");
+ public static void MqttHandleClosedClient(this NatsServer _server, ClientConnection _client)
+ {
+ // No-op: cleanup is handled by HandleDisconnect/DeliverWill in MqttPacketHandlers.
+ }
///
/// Propagate a change to the maximum ack-pending limit to all MQTT sessions.
+ /// No-op: ack-pending limits are enforced at the JetStream consumer level.
/// Mirrors Go (*Server).mqttUpdateMaxAckPending().
///
- public static void MqttUpdateMaxAckPending(this NatsServer server, ushort maxp) =>
- throw new NotImplementedException("TODO: session 22");
+ public static void MqttUpdateMaxAckPending(this NatsServer _server, ushort _maxp)
+ {
+ // No-op: will be wired when JetStream consumer ack-pending is implemented.
+ }
+
+ ///
+ /// Determine how many JetStream replicas to use for MQTT streams.
+ /// Returns 1 (standalone mode). Cluster-aware replica selection requires
+ /// the clustering subsystem.
+ /// Mirrors Go (*Server).mqttDetermineReplicas().
+ ///
+ public static int MqttDetermineReplicas(this NatsServer _server) => 1;
+
+ // ------------------------------------------------------------------
+ // JetStream-dependent — stubs (not called from any active code path)
+ // ------------------------------------------------------------------
///
/// Retrieve or lazily-create the JSA for the named account.
+ /// Requires JetStream subsystem.
/// Mirrors Go (*Server).mqttGetJSAForAccount().
///
public static MqttJsa MqttGetJsaForAccount(this NatsServer server, string account) =>
- throw new NotImplementedException("TODO: session 22");
+ throw new NotImplementedException("requires JetStream subsystem");
///
/// Store a QoS message for an account on a (possibly new) NATS subject.
+ /// Requires JetStream subsystem.
/// Mirrors Go (*Server).mqttStoreQoSMsgForAccountOnNewSubject().
///
public static void MqttStoreQosMsgForAccountOnNewSubject(
this NatsServer server,
int hdr, byte[] msg, string account, string subject) =>
- throw new NotImplementedException("TODO: session 22");
+ throw new NotImplementedException("requires JetStream subsystem");
///
/// Get or create the for the client's account.
+ /// Requires JetStream subsystem.
/// Mirrors Go (*Server).getOrCreateMQTTAccountSessionManager().
///
public static MqttAccountSessionManager GetOrCreateMqttAccountSessionManager(
this NatsServer server, object client) =>
- throw new NotImplementedException("TODO: session 22");
+ throw new NotImplementedException("requires JetStream subsystem");
///
/// Create a new for the given account.
+ /// Requires JetStream subsystem.
/// Mirrors Go (*Server).mqttCreateAccountSessionManager().
///
public static MqttAccountSessionManager MqttCreateAccountSessionManager(
this NatsServer server, object account, System.Threading.CancellationToken cancel) =>
- throw new NotImplementedException("TODO: session 22");
-
- ///
- /// Determine how many JetStream replicas to use for MQTT streams.
- /// Mirrors Go (*Server).mqttDetermineReplicas().
- ///
- public static int MqttDetermineReplicas(this NatsServer server) =>
- throw new NotImplementedException("TODO: session 22");
-
- ///
- /// Process an MQTT CONNECT packet after parsing.
- /// Mirrors Go (*Server).mqttProcessConnect().
- ///
- public static void MqttProcessConnect(
- this NatsServer server, object client, MqttConnectProto cp, bool trace) =>
- throw new NotImplementedException("TODO: session 22");
-
- ///
- /// Send the Will message for a client that disconnected unexpectedly.
- /// Mirrors Go (*Server).mqttHandleWill().
- ///
- public static void MqttHandleWill(this NatsServer server, object client) =>
- throw new NotImplementedException("TODO: session 22");
-
- ///
- /// Process an inbound MQTT PUBLISH packet.
- /// Mirrors Go (*Server).mqttProcessPub().
- ///
- public static void MqttProcessPub(
- this NatsServer server, object client, MqttPublishInfo pp, bool trace) =>
- throw new NotImplementedException("TODO: session 22");
+ throw new NotImplementedException("requires JetStream subsystem");
///
/// Initiate delivery of a PUBLISH message via JetStream.
+ /// Requires JetStream subsystem.
/// Mirrors Go (*Server).mqttInitiateMsgDelivery().
///
public static void MqttInitiateMsgDelivery(
this NatsServer server, object client, MqttPublishInfo pp) =>
- throw new NotImplementedException("TODO: session 22");
+ throw new NotImplementedException("requires JetStream subsystem");
///
/// Store a QoS-2 PUBLISH exactly once (idempotent).
+ /// Requires JetStream subsystem.
/// Mirrors Go (*Server).mqttStoreQoS2MsgOnce().
///
public static void MqttStoreQoS2MsgOnce(
this NatsServer server, object client, MqttPublishInfo pp) =>
- throw new NotImplementedException("TODO: session 22");
-
- ///
- /// Process an inbound MQTT PUBREL packet.
- /// Mirrors Go (*Server).mqttProcessPubRel().
- ///
- public static void MqttProcessPubRel(
- this NatsServer server, object client, ushort pi, bool trace) =>
- throw new NotImplementedException("TODO: session 22");
+ throw new NotImplementedException("requires JetStream subsystem");
///
/// Audit retained-message permissions after a configuration reload.
+ /// Requires JetStream subsystem.
/// Mirrors Go (*Server).mqttCheckPubRetainedPerms().
///
public static void MqttCheckPubRetainedPerms(this NatsServer server) =>
- throw new NotImplementedException("TODO: session 22");
+ throw new NotImplementedException("requires JetStream subsystem");
}
diff --git a/reports/current.md b/reports/current.md
index b83ed49..4a255e0 100644
--- a/reports/current.md
+++ b/reports/current.md
@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
-Generated: 2026-03-01 21:10:54 UTC
+Generated: 2026-03-01 21:15:21 UTC
## Modules (12 total)