feat(mqtt): add MQTT listener, client creation, and shutdown wiring

Wire up the MQTT server-side orchestration layer (Task 1 of 7):
- Create NatsServer.Mqtt.cs with StartMqttListener(), CreateMqttClient(), MqttAddr()
- Forward MqttHandler.StartMqtt() to server.StartMqttListener()
- Add _mqttListener to Shutdown() doneExpected counting
- Fix ReadyForConnections to recognize active MQTT listener
- Handle RandomPort (-1) as ephemeral for MQTT listener
- Remove duplicate Mqtt field from ClientConnection.cs (already in ClientConnection.Mqtt.cs)
- Add 2 MQTT boot integration tests (accept + shutdown lifecycle)
This commit is contained in:
Joseph Doherty
2026-03-01 15:35:41 -05:00
parent 60bb56a90c
commit 6fb7f43335
7 changed files with 266 additions and 6 deletions

View File

@@ -1803,7 +1803,7 @@ public sealed partial class ClientConnection
// IsMqtt / IsWebSocket helpers (used by clientType, not separately tracked)
// =========================================================================
internal bool IsMqtt() => false; // Deferred to session 22 (MQTT).
internal bool IsMqtt() => Mqtt != null;
internal bool IsWebSocket() => Ws != null;
internal bool IsHubLeafNode() => Kind == ClientKind.Leaf && Leaf?.IsSpoke != true;
internal string RemoteCluster() => Leaf?.RemoteCluster ?? string.Empty;

View File

@@ -135,7 +135,7 @@ internal static class MqttServerExtensions
/// </summary>
public static void StartMqtt(this NatsServer server)
{
server.Warnf("MQTT listener not yet implemented; skipping MQTT startup");
server.StartMqttListener();
}
/// <summary>

View File

@@ -79,9 +79,11 @@ public sealed partial class NatsServer
_listener = null;
}
doneExpected += CloseWebsocketServer();
if (_gateway.Enabled)
if (_mqttListener != null)
{
// mqtt listener managed by session 22
doneExpected++;
_mqttListener.Stop();
_mqttListener = null;
}
if (_leafNodeListener != null)
{
@@ -505,7 +507,7 @@ public sealed partial class NatsServer
leafOk = opts.LeafNode.Port == 0 || _leafNodeListener != null;
leafErr = _leafNodeListenerErr;
wsOk = opts.Websocket.Port == 0 || _websocket.Listener != null;
mqttOk = opts.Mqtt.Port == 0;
mqttOk = opts.Mqtt.Port == 0 || _mqttListener != null;
_mu.ExitReadLock();
checks["server"] = (serverOk, serverErr);

View File

@@ -0,0 +1,183 @@
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/mqtt.go in the NATS server Go source.
using System.Net;
using System.Net.Sockets;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Mqtt;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
// =========================================================================
// MQTT Listener and Client Creation
// Mirrors Go startMQTT() and createMQTTClient() in server/mqtt.go.
// =========================================================================
/// <summary>
/// Starts the MQTT TCP listener and accept loop.
/// Called from <see cref="MqttServerExtensions.StartMqtt"/> extension method.
/// Mirrors Go <c>(*Server).startMQTT()</c>.
/// </summary>
internal void StartMqttListener()
{
if (IsShuttingDown()) return;
var opts = GetOpts();
var port = opts.Mqtt.Port;
var host = opts.Mqtt.Host;
if (string.IsNullOrEmpty(host))
host = "0.0.0.0";
// RandomPort (-1) means ephemeral — pass 0 to TcpListener.
var listenPort = port < 0 ? 0 : port;
TcpListener listener;
try
{
var addr = host == "0.0.0.0" || host == "::"
? (host == "::" ? IPAddress.IPv6Any : IPAddress.Any)
: IPAddress.Parse(host);
listener = new TcpListener(addr, listenPort);
listener.Start();
}
catch (Exception ex)
{
Fatalf("Can't listen for MQTT client connections: {0}", ex.Message);
return;
}
var ep = (IPEndPoint)listener.LocalEndpoint!;
var scheme = opts.Mqtt.TlsConfig != null ? "tls" : "mqtt";
Noticef("Listening for MQTT client connections on {0}://{1}:{2}", scheme, host, ep.Port);
// Write back resolved port if ephemeral (0 or -1).
if (port <= 0)
opts.Mqtt.Port = ep.Port;
_mu.EnterWriteLock();
_mqttListener = listener;
_mu.ExitWriteLock();
// Start accept loop in a goroutine.
_ = Task.Run(() =>
{
AcceptConnections(listener, "Mqtt", tc => CreateMqttClient(tc));
});
}
/// <summary>
/// Creates and registers a new MQTT client connection from the accepted TCP client.
/// Unlike NATS clients, MQTT clients do not receive an INFO line — the MQTT CONNECT
/// handshake is initiated by the client.
/// Mirrors Go <c>createMQTTClient()</c> in server/mqtt.go.
/// </summary>
private ClientConnection? CreateMqttClient(TcpClient tc)
{
var opts = GetOpts();
var now = DateTime.UtcNow;
var nc = tc.GetStream();
var c = new ClientConnection(ClientKind.Client, this, nc)
{
Start = now,
Last = now,
Opts = ClientOptions.Default,
Headers = true, // MQTT always uses NATS headers for QoS metadata
};
c.InitMqtt(new MqttHandler
{
RejectQoS2Pub = opts.Mqtt.RejectQoS2Pub,
DowngradeQoS2Sub = opts.Mqtt.DowngradeQoS2Sub,
});
// Register with the global account.
var globalAcc = GlobalAccount();
if (globalAcc != null)
{
try { c.RegisterWithAccount(globalAcc); }
catch (Exception ex)
{
c.ReportErrRegisterAccount(globalAcc, ex);
return null;
}
}
// Register with the server — no INFO sent for MQTT clients.
_mu.EnterWriteLock();
if (!IsRunning() || _ldm)
{
if (IsShuttingDown())
nc.Close();
_mu.ExitWriteLock();
return c;
}
_totalClients++;
if (opts.MaxConn > 0 && _clients.Count >= opts.MaxConn)
{
_mu.ExitWriteLock();
c.MaxConnExceeded();
return null;
}
_clients[c.Cid] = c;
_mu.ExitWriteLock();
lock (c)
{
if (c.IsClosed())
{
c.CloseConnection(ClosedState.WriteError);
return null;
}
c.InitClient();
c.Debugf("MQTT client connection created");
// Set auth timer if authentication is required.
if (_info.AuthRequired)
{
c.Flags |= ClientFlags.ExpectConnect;
c.SetAuthTimer(TimeSpan.FromSeconds(
opts.Mqtt.AuthTimeout > 0 ? opts.Mqtt.AuthTimeout : opts.AuthTimeout));
}
c.SetPingTimer();
// Start read/write loops.
StartGoRoutine(() => c.ReadLoop(null));
StartGoRoutine(() => c.WriteLoop());
}
return c;
}
/// <summary>
/// Returns the MQTT listener address, or null if not listening.
/// Mirrors Go <c>(*Server).MQTTAddr()</c>.
/// </summary>
public IPEndPoint? MqttAddr()
{
_mu.EnterReadLock();
try { return _mqttListener?.LocalEndpoint as IPEndPoint; }
finally { _mu.ExitReadLock(); }
}
}

View File

@@ -121,6 +121,9 @@ public sealed partial class NatsServer : INatsServer
private System.Net.Sockets.TcpListener? _leafNodeListener;
private Exception? _leafNodeListenerErr;
// MQTT listener
private System.Net.Sockets.TcpListener? _mqttListener;
// Profiling listener
private System.Net.Sockets.TcpListener? _profiler;

View File

@@ -88,6 +88,78 @@ public sealed class ServerBootTests : IDisposable
}
}
/// <summary>
/// Validates that a server can boot with an MQTT listener on an ephemeral port,
/// accept a TCP connection on the MQTT port, and register it as a client.
/// </summary>
[Fact]
public async Task MqttBoot_AcceptsConnection_ShouldSucceed()
{
var opts = new ServerOptions
{
Host = "127.0.0.1",
Port = 0,
Mqtt = { Port = -1, Host = "127.0.0.1" },
};
var (server, err) = NatsServer.NewServer(opts);
err.ShouldBeNull("NewServer should succeed");
server.ShouldNotBeNull();
try
{
server!.Start();
// Verify MQTT listener is up
var mqttAddr = server.MqttAddr();
mqttAddr.ShouldNotBeNull("MqttAddr should return the MQTT listener address");
mqttAddr!.Port.ShouldBeGreaterThan(0);
// ReadyForConnections should include MQTT
server.ReadyForConnections(TimeSpan.FromSeconds(5)).ShouldBeTrue();
// Connect a raw TCP client to the MQTT port
using var tcp = new System.Net.Sockets.TcpClient();
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await tcp.ConnectAsync(mqttAddr.Address, mqttAddr.Port, cts.Token);
tcp.Connected.ShouldBeTrue();
// Give CreateMqttClient a moment to register
await Task.Delay(100);
server.NumClients().ShouldBeGreaterThan(0);
}
finally
{
server!.Shutdown();
}
}
/// <summary>
/// Validates that an MQTT listener starts and shuts down cleanly.
/// </summary>
[Fact]
public void MqttBoot_StartAndShutdown_ShouldSucceed()
{
var opts = new ServerOptions
{
Host = "127.0.0.1",
Port = 0,
DontListen = true,
Mqtt = { Port = -1, Host = "127.0.0.1" },
};
var (server, err) = NatsServer.NewServer(opts);
err.ShouldBeNull();
server.ShouldNotBeNull();
server!.Start();
server.Running().ShouldBeTrue();
server.MqttAddr().ShouldNotBeNull();
server.Shutdown();
server.Running().ShouldBeFalse();
}
/// <summary>
/// Validates that Shutdown() after Start() completes cleanly.
/// Uses DontListen to skip TCP binding — tests lifecycle only.

View File

@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
Generated: 2026-03-01 20:25:04 UTC
Generated: 2026-03-01 20:35:42 UTC
## Modules (12 total)