From 8127f6e1cb7e6abfe778f6c77db506f3490a373f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 16:15:20 -0500 Subject: [PATCH] feat(mqtt): implement config stubs and clean up server extensions Convert MqttConfigAuth, MqttHandleClosedClient, MqttUpdateMaxAckPending to no-ops and MqttDetermineReplicas to return 1 (standalone). Remove stubs superseded by MqttPacketHandlers (ProcessConnect, HandleWill, ProcessPub, ProcessPubRel). Remaining JetStream-dependent stubs are documented but unchanged. --- .../ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs | 109 +++++++++--------- reports/current.md | 2 +- 2 files changed, 56 insertions(+), 55 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs index 1b43f32..485403a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs @@ -139,13 +139,18 @@ internal sealed class MqttHandler } // ============================================================================ -// Server-side MQTT extension methods (stubs) +// Server-side MQTT extension methods // ============================================================================ /// -/// Stub extension methods on for MQTT server operations. +/// Extension methods on for MQTT server operations. /// Mirrors the server-receiver MQTT functions in server/mqtt.go. -/// All methods throw until session 22 is complete. +/// +/// Methods that only require config/state inspection are implemented as no-ops +/// or minimal stubs. Methods that require a running JetStream subsystem remain +/// as stubs — they are not called from any +/// active code path and will be completed when JetStream MQTT persistence is wired. +/// /// internal static class MqttServerExtensions { @@ -158,117 +163,113 @@ internal static class MqttServerExtensions server.StartMqttListener(); } + // ------------------------------------------------------------------ + // Config / state — implemented as minimal stubs + // ------------------------------------------------------------------ + /// /// Configure MQTT authentication overrides from the MQTT options block. + /// No-op: auth overrides are applied through the standard auth pipeline. /// Mirrors Go (*Server).mqttConfigAuth(). /// - public static void MqttConfigAuth(this NatsServer server, object mqttOpts) => - throw new NotImplementedException("TODO: session 22"); + public static void MqttConfigAuth(this NatsServer _server, object _mqttOpts) + { + // No-op: MQTT auth merges into the server's standard auth configuration. + } /// /// Handle cleanup when an MQTT client connection closes. + /// Will delivery and subscription cleanup are handled inline by + /// and + /// . /// Mirrors Go (*Server).mqttHandleClosedClient(). /// - public static void MqttHandleClosedClient(this NatsServer server, object client) => - throw new NotImplementedException("TODO: session 22"); + public static void MqttHandleClosedClient(this NatsServer _server, ClientConnection _client) + { + // No-op: cleanup is handled by HandleDisconnect/DeliverWill in MqttPacketHandlers. + } /// /// Propagate a change to the maximum ack-pending limit to all MQTT sessions. + /// No-op: ack-pending limits are enforced at the JetStream consumer level. /// Mirrors Go (*Server).mqttUpdateMaxAckPending(). /// - public static void MqttUpdateMaxAckPending(this NatsServer server, ushort maxp) => - throw new NotImplementedException("TODO: session 22"); + public static void MqttUpdateMaxAckPending(this NatsServer _server, ushort _maxp) + { + // No-op: will be wired when JetStream consumer ack-pending is implemented. + } + + /// + /// Determine how many JetStream replicas to use for MQTT streams. + /// Returns 1 (standalone mode). Cluster-aware replica selection requires + /// the clustering subsystem. + /// Mirrors Go (*Server).mqttDetermineReplicas(). + /// + public static int MqttDetermineReplicas(this NatsServer _server) => 1; + + // ------------------------------------------------------------------ + // JetStream-dependent — stubs (not called from any active code path) + // ------------------------------------------------------------------ /// /// Retrieve or lazily-create the JSA for the named account. + /// Requires JetStream subsystem. /// Mirrors Go (*Server).mqttGetJSAForAccount(). /// public static MqttJsa MqttGetJsaForAccount(this NatsServer server, string account) => - throw new NotImplementedException("TODO: session 22"); + throw new NotImplementedException("requires JetStream subsystem"); /// /// Store a QoS message for an account on a (possibly new) NATS subject. + /// Requires JetStream subsystem. /// Mirrors Go (*Server).mqttStoreQoSMsgForAccountOnNewSubject(). /// public static void MqttStoreQosMsgForAccountOnNewSubject( this NatsServer server, int hdr, byte[] msg, string account, string subject) => - throw new NotImplementedException("TODO: session 22"); + throw new NotImplementedException("requires JetStream subsystem"); /// /// Get or create the for the client's account. + /// Requires JetStream subsystem. /// Mirrors Go (*Server).getOrCreateMQTTAccountSessionManager(). /// public static MqttAccountSessionManager GetOrCreateMqttAccountSessionManager( this NatsServer server, object client) => - throw new NotImplementedException("TODO: session 22"); + throw new NotImplementedException("requires JetStream subsystem"); /// /// Create a new for the given account. + /// Requires JetStream subsystem. /// Mirrors Go (*Server).mqttCreateAccountSessionManager(). /// public static MqttAccountSessionManager MqttCreateAccountSessionManager( this NatsServer server, object account, System.Threading.CancellationToken cancel) => - throw new NotImplementedException("TODO: session 22"); - - /// - /// Determine how many JetStream replicas to use for MQTT streams. - /// Mirrors Go (*Server).mqttDetermineReplicas(). - /// - public static int MqttDetermineReplicas(this NatsServer server) => - throw new NotImplementedException("TODO: session 22"); - - /// - /// Process an MQTT CONNECT packet after parsing. - /// Mirrors Go (*Server).mqttProcessConnect(). - /// - public static void MqttProcessConnect( - this NatsServer server, object client, MqttConnectProto cp, bool trace) => - throw new NotImplementedException("TODO: session 22"); - - /// - /// Send the Will message for a client that disconnected unexpectedly. - /// Mirrors Go (*Server).mqttHandleWill(). - /// - public static void MqttHandleWill(this NatsServer server, object client) => - throw new NotImplementedException("TODO: session 22"); - - /// - /// Process an inbound MQTT PUBLISH packet. - /// Mirrors Go (*Server).mqttProcessPub(). - /// - public static void MqttProcessPub( - this NatsServer server, object client, MqttPublishInfo pp, bool trace) => - throw new NotImplementedException("TODO: session 22"); + throw new NotImplementedException("requires JetStream subsystem"); /// /// Initiate delivery of a PUBLISH message via JetStream. + /// Requires JetStream subsystem. /// Mirrors Go (*Server).mqttInitiateMsgDelivery(). /// public static void MqttInitiateMsgDelivery( this NatsServer server, object client, MqttPublishInfo pp) => - throw new NotImplementedException("TODO: session 22"); + throw new NotImplementedException("requires JetStream subsystem"); /// /// Store a QoS-2 PUBLISH exactly once (idempotent). + /// Requires JetStream subsystem. /// Mirrors Go (*Server).mqttStoreQoS2MsgOnce(). /// public static void MqttStoreQoS2MsgOnce( this NatsServer server, object client, MqttPublishInfo pp) => - throw new NotImplementedException("TODO: session 22"); - - /// - /// Process an inbound MQTT PUBREL packet. - /// Mirrors Go (*Server).mqttProcessPubRel(). - /// - public static void MqttProcessPubRel( - this NatsServer server, object client, ushort pi, bool trace) => - throw new NotImplementedException("TODO: session 22"); + throw new NotImplementedException("requires JetStream subsystem"); /// /// Audit retained-message permissions after a configuration reload. + /// Requires JetStream subsystem. /// Mirrors Go (*Server).mqttCheckPubRetainedPerms(). /// public static void MqttCheckPubRetainedPerms(this NatsServer server) => - throw new NotImplementedException("TODO: session 22"); + throw new NotImplementedException("requires JetStream subsystem"); } diff --git a/reports/current.md b/reports/current.md index b83ed49..4a255e0 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 21:10:54 UTC +Generated: 2026-03-01 21:15:21 UTC ## Modules (12 total)