From 6fb7f4333596e40a01a00b02734954d667fe234c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 15:35:41 -0500 Subject: [PATCH] feat(mqtt): add MQTT listener, client creation, and shutdown wiring Wire up the MQTT server-side orchestration layer (Task 1 of 7): - Create NatsServer.Mqtt.cs with StartMqttListener(), CreateMqttClient(), MqttAddr() - Forward MqttHandler.StartMqtt() to server.StartMqttListener() - Add _mqttListener to Shutdown() doneExpected counting - Fix ReadyForConnections to recognize active MQTT listener - Handle RandomPort (-1) as ephemeral for MQTT listener - Remove duplicate Mqtt field from ClientConnection.cs (already in ClientConnection.Mqtt.cs) - Add 2 MQTT boot integration tests (accept + shutdown lifecycle) --- .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 2 +- .../ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs | 2 +- .../NatsServer.Lifecycle.cs | 8 +- .../ZB.MOM.NatsNet.Server/NatsServer.Mqtt.cs | 183 ++++++++++++++++++ .../src/ZB.MOM.NatsNet.Server/NatsServer.cs | 3 + .../ServerBootTests.cs | 72 +++++++ reports/current.md | 2 +- 7 files changed, 266 insertions(+), 6 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Mqtt.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 3232812..c9c5028 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1803,7 +1803,7 @@ public sealed partial class ClientConnection // IsMqtt / IsWebSocket helpers (used by clientType, not separately tracked) // ========================================================================= - internal bool IsMqtt() => false; // Deferred to session 22 (MQTT). + internal bool IsMqtt() => Mqtt != null; internal bool IsWebSocket() => Ws != null; internal bool IsHubLeafNode() => Kind == ClientKind.Leaf && Leaf?.IsSpoke != true; internal string RemoteCluster() => Leaf?.RemoteCluster ?? string.Empty; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs index aa94c9f..fa510eb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs @@ -135,7 +135,7 @@ internal static class MqttServerExtensions /// public static void StartMqtt(this NatsServer server) { - server.Warnf("MQTT listener not yet implemented; skipping MQTT startup"); + server.StartMqttListener(); } /// diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs index a0f9eb1..ad1653f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs @@ -79,9 +79,11 @@ public sealed partial class NatsServer _listener = null; } doneExpected += CloseWebsocketServer(); - if (_gateway.Enabled) + if (_mqttListener != null) { - // mqtt listener managed by session 22 + doneExpected++; + _mqttListener.Stop(); + _mqttListener = null; } if (_leafNodeListener != null) { @@ -505,7 +507,7 @@ public sealed partial class NatsServer leafOk = opts.LeafNode.Port == 0 || _leafNodeListener != null; leafErr = _leafNodeListenerErr; wsOk = opts.Websocket.Port == 0 || _websocket.Listener != null; - mqttOk = opts.Mqtt.Port == 0; + mqttOk = opts.Mqtt.Port == 0 || _mqttListener != null; _mu.ExitReadLock(); checks["server"] = (serverOk, serverErr); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Mqtt.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Mqtt.cs new file mode 100644 index 0000000..ef070c3 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Mqtt.cs @@ -0,0 +1,183 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/mqtt.go in the NATS server Go source. + +using System.Net; +using System.Net.Sockets; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Mqtt; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + // ========================================================================= + // MQTT Listener and Client Creation + // Mirrors Go startMQTT() and createMQTTClient() in server/mqtt.go. + // ========================================================================= + + /// + /// Starts the MQTT TCP listener and accept loop. + /// Called from extension method. + /// Mirrors Go (*Server).startMQTT(). + /// + internal void StartMqttListener() + { + if (IsShuttingDown()) return; + + var opts = GetOpts(); + var port = opts.Mqtt.Port; + var host = opts.Mqtt.Host; + + if (string.IsNullOrEmpty(host)) + host = "0.0.0.0"; + + // RandomPort (-1) means ephemeral — pass 0 to TcpListener. + var listenPort = port < 0 ? 0 : port; + + TcpListener listener; + try + { + var addr = host == "0.0.0.0" || host == "::" + ? (host == "::" ? IPAddress.IPv6Any : IPAddress.Any) + : IPAddress.Parse(host); + + listener = new TcpListener(addr, listenPort); + listener.Start(); + } + catch (Exception ex) + { + Fatalf("Can't listen for MQTT client connections: {0}", ex.Message); + return; + } + + var ep = (IPEndPoint)listener.LocalEndpoint!; + var scheme = opts.Mqtt.TlsConfig != null ? "tls" : "mqtt"; + Noticef("Listening for MQTT client connections on {0}://{1}:{2}", scheme, host, ep.Port); + + // Write back resolved port if ephemeral (0 or -1). + if (port <= 0) + opts.Mqtt.Port = ep.Port; + + _mu.EnterWriteLock(); + _mqttListener = listener; + _mu.ExitWriteLock(); + + // Start accept loop in a goroutine. + _ = Task.Run(() => + { + AcceptConnections(listener, "Mqtt", tc => CreateMqttClient(tc)); + }); + } + + /// + /// Creates and registers a new MQTT client connection from the accepted TCP client. + /// Unlike NATS clients, MQTT clients do not receive an INFO line — the MQTT CONNECT + /// handshake is initiated by the client. + /// Mirrors Go createMQTTClient() in server/mqtt.go. + /// + private ClientConnection? CreateMqttClient(TcpClient tc) + { + var opts = GetOpts(); + var now = DateTime.UtcNow; + var nc = tc.GetStream(); + + var c = new ClientConnection(ClientKind.Client, this, nc) + { + Start = now, + Last = now, + Opts = ClientOptions.Default, + Headers = true, // MQTT always uses NATS headers for QoS metadata + }; + + c.InitMqtt(new MqttHandler + { + RejectQoS2Pub = opts.Mqtt.RejectQoS2Pub, + DowngradeQoS2Sub = opts.Mqtt.DowngradeQoS2Sub, + }); + + // Register with the global account. + var globalAcc = GlobalAccount(); + if (globalAcc != null) + { + try { c.RegisterWithAccount(globalAcc); } + catch (Exception ex) + { + c.ReportErrRegisterAccount(globalAcc, ex); + return null; + } + } + + // Register with the server — no INFO sent for MQTT clients. + _mu.EnterWriteLock(); + if (!IsRunning() || _ldm) + { + if (IsShuttingDown()) + nc.Close(); + _mu.ExitWriteLock(); + return c; + } + + _totalClients++; + + if (opts.MaxConn > 0 && _clients.Count >= opts.MaxConn) + { + _mu.ExitWriteLock(); + c.MaxConnExceeded(); + return null; + } + + _clients[c.Cid] = c; + _mu.ExitWriteLock(); + + lock (c) + { + if (c.IsClosed()) + { + c.CloseConnection(ClosedState.WriteError); + return null; + } + + c.InitClient(); + c.Debugf("MQTT client connection created"); + + // Set auth timer if authentication is required. + if (_info.AuthRequired) + { + c.Flags |= ClientFlags.ExpectConnect; + c.SetAuthTimer(TimeSpan.FromSeconds( + opts.Mqtt.AuthTimeout > 0 ? opts.Mqtt.AuthTimeout : opts.AuthTimeout)); + } + + c.SetPingTimer(); + + // Start read/write loops. + StartGoRoutine(() => c.ReadLoop(null)); + StartGoRoutine(() => c.WriteLoop()); + } + + return c; + } + + /// + /// Returns the MQTT listener address, or null if not listening. + /// Mirrors Go (*Server).MQTTAddr(). + /// + public IPEndPoint? MqttAddr() + { + _mu.EnterReadLock(); + try { return _mqttListener?.LocalEndpoint as IPEndPoint; } + finally { _mu.ExitReadLock(); } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs index e0177f3..98ad7a3 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -121,6 +121,9 @@ public sealed partial class NatsServer : INatsServer private System.Net.Sockets.TcpListener? _leafNodeListener; private Exception? _leafNodeListenerErr; + // MQTT listener + private System.Net.Sockets.TcpListener? _mqttListener; + // Profiling listener private System.Net.Sockets.TcpListener? _profiler; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ServerBootTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ServerBootTests.cs index fe44ff1..1cc98db 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ServerBootTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ServerBootTests.cs @@ -88,6 +88,78 @@ public sealed class ServerBootTests : IDisposable } } + /// + /// Validates that a server can boot with an MQTT listener on an ephemeral port, + /// accept a TCP connection on the MQTT port, and register it as a client. + /// + [Fact] + public async Task MqttBoot_AcceptsConnection_ShouldSucceed() + { + var opts = new ServerOptions + { + Host = "127.0.0.1", + Port = 0, + Mqtt = { Port = -1, Host = "127.0.0.1" }, + }; + + var (server, err) = NatsServer.NewServer(opts); + err.ShouldBeNull("NewServer should succeed"); + server.ShouldNotBeNull(); + + try + { + server!.Start(); + + // Verify MQTT listener is up + var mqttAddr = server.MqttAddr(); + mqttAddr.ShouldNotBeNull("MqttAddr should return the MQTT listener address"); + mqttAddr!.Port.ShouldBeGreaterThan(0); + + // ReadyForConnections should include MQTT + server.ReadyForConnections(TimeSpan.FromSeconds(5)).ShouldBeTrue(); + + // Connect a raw TCP client to the MQTT port + using var tcp = new System.Net.Sockets.TcpClient(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await tcp.ConnectAsync(mqttAddr.Address, mqttAddr.Port, cts.Token); + tcp.Connected.ShouldBeTrue(); + + // Give CreateMqttClient a moment to register + await Task.Delay(100); + server.NumClients().ShouldBeGreaterThan(0); + } + finally + { + server!.Shutdown(); + } + } + + /// + /// Validates that an MQTT listener starts and shuts down cleanly. + /// + [Fact] + public void MqttBoot_StartAndShutdown_ShouldSucceed() + { + var opts = new ServerOptions + { + Host = "127.0.0.1", + Port = 0, + DontListen = true, + Mqtt = { Port = -1, Host = "127.0.0.1" }, + }; + + var (server, err) = NatsServer.NewServer(opts); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + server!.Start(); + server.Running().ShouldBeTrue(); + server.MqttAddr().ShouldNotBeNull(); + + server.Shutdown(); + server.Running().ShouldBeFalse(); + } + /// /// Validates that Shutdown() after Start() completes cleanly. /// Uses DontListen to skip TCP binding — tests lifecycle only. diff --git a/reports/current.md b/reports/current.md index 9e15cc2..7f98db9 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 20:25:04 UTC +Generated: 2026-03-01 20:35:42 UTC ## Modules (12 total)