diff --git a/src/NATS.Server/Monitoring/ConnzHandler.cs b/src/NATS.Server/Monitoring/ConnzHandler.cs index 41cfa93..b629d69 100644 --- a/src/NATS.Server/Monitoring/ConnzHandler.cs +++ b/src/NATS.Server/Monitoring/ConnzHandler.cs @@ -29,6 +29,9 @@ public sealed class ConnzHandler(NatsServer server) { var clients = server.GetClients().ToArray(); connInfos.AddRange(clients.Select(c => BuildConnInfo(c, now, opts))); + + // Include MQTT adapter connections + connInfos.AddRange(server.GetMqttAdapters().Select(a => BuildMqttConnInfo(a, now))); } // Collect closed connections from the ring buffer @@ -254,6 +257,21 @@ public sealed class ConnzHandler(NatsServer server) }; } + private static ConnInfo BuildMqttConnInfo(Mqtt.MqttNatsClientAdapter adapter, DateTime now) + { + return new ConnInfo + { + Cid = adapter.Id, + Kind = "Client", + Type = "mqtt", + Start = now, // MQTT adapters don't track start time yet + LastActivity = now, + NumSubs = (uint)adapter.Subscriptions.Count, + Account = adapter.Account?.Name ?? "", + MqttClient = adapter.MqttClientId, + }; + } + private static ConnzOptions ParseQueryParams(HttpContext ctx) { var q = ctx.Request.Query; diff --git a/src/NATS.Server/Mqtt/MqttConnection.cs b/src/NATS.Server/Mqtt/MqttConnection.cs index 8c533ca..56a93c7 100644 --- a/src/NATS.Server/Mqtt/MqttConnection.cs +++ b/src/NATS.Server/Mqtt/MqttConnection.cs @@ -23,10 +23,18 @@ public sealed class MqttConnection : IAsyncDisposable private bool _connected; private bool _willCleared; private MqttConnectInfo _connectInfo; + private readonly Dictionary _topicToSid = new(StringComparer.Ordinal); + private int _nextSid; /// Auth result after successful CONNECT (populated for AuthService path). public AuthResult? AuthResult { get; private set; } + /// + /// The adapter that bridges this MQTT connection to the NATS SubList routing. + /// Created on CONNECT when running with a NatsServer router; null in test-only mode. + /// + public MqttNatsClientAdapter? Adapter { get; private set; } + public string ClientId => _clientId; /// @@ -142,11 +150,7 @@ public sealed class MqttConnection : IAsyncDisposable // Publish will message if not cleanly disconnected if (_connected && !_willCleared && _connectInfo.WillTopic != null) { - await _listener.PublishAsync( - _connectInfo.WillTopic, - Encoding.UTF8.GetString(_connectInfo.WillMessage ?? []), - this, - CancellationToken.None); + RoutePublish(_connectInfo.WillTopic, _connectInfo.WillMessage ?? []); } } } @@ -276,6 +280,15 @@ public sealed class MqttConnection : IAsyncDisposable AuthResult = authResult; + // Create MqttNatsClientAdapter for cross-protocol routing (when running with NatsServer) + if (_listener.AllocateClientId != null) + { + var adapterId = _listener.AllocateClientId(); + Adapter = new MqttNatsClientAdapter(this, adapterId); + Adapter.Account = _listener.ResolveAccount?.Invoke(authResult.AccountName); + _listener.RegisterMqttAdapter(Adapter); + } + // Duplicate client-id takeover _listener.TakeoverExistingConnection(_clientId, this); @@ -313,20 +326,18 @@ public sealed class MqttConnection : IAsyncDisposable switch (publishInfo.QoS) { case 0: - await _listener.PublishAsync(publishInfo.Topic, - Encoding.UTF8.GetString(publishInfo.Payload.Span), this, ct); + RoutePublish(publishInfo.Topic, publishInfo.Payload); break; case 1: _listener.RecordPendingPublish(_clientId, publishInfo.PacketId, publishInfo.Topic, Encoding.UTF8.GetString(publishInfo.Payload.Span)); await WriteBinaryAsync(MqttPacketWriter.WritePubAck(publishInfo.PacketId), ct); - await _listener.PublishAsync(publishInfo.Topic, - Encoding.UTF8.GetString(publishInfo.Payload.Span), this, ct); + RoutePublish(publishInfo.Topic, publishInfo.Payload); break; case 2: - // QoS 2 step 1: store and send PUBREC + // QoS 2 step 1: store and send PUBREC (delivery deferred to PUBREL) _listener.RecordPendingPublish(_clientId, publishInfo.PacketId, publishInfo.Topic, Encoding.UTF8.GetString(publishInfo.Payload.Span)); await WriteBinaryAsync(MqttPacketWriter.WritePubRec(publishInfo.PacketId), ct); @@ -343,6 +354,24 @@ public sealed class MqttConnection : IAsyncDisposable return true; } + /// + /// Routes a published message through the NATS SubList (cross-protocol) if a router + /// and adapter are available, otherwise falls back to MQTT-only fan-out. + /// + private void RoutePublish(string mqttTopic, ReadOnlyMemory payload) + { + if (_listener.Router != null && Adapter != null) + { + var natsSubject = MqttTopicMapper.MqttToNats(mqttTopic); + _listener.Router.ProcessMessage(natsSubject, null, default, payload, Adapter); + } + else + { + // Test-only fallback: MQTT-only fan-out + _ = _listener.PublishAsync(mqttTopic, Encoding.UTF8.GetString(payload.Span), this, CancellationToken.None); + } + } + private void HandlePubAck(MqttControlPacket packet) { if (packet.Payload.Length < 2) return; @@ -362,7 +391,13 @@ public sealed class MqttConnection : IAsyncDisposable if (packet.Payload.Length < 2) return; var packetId = (ushort)((packet.Payload.Span[0] << 8) | packet.Payload.Span[1]); - // QoS 2 step 2: deliver the stored message and send PUBCOMP + // QoS 2 step 2: deliver the stored message, then ack and send PUBCOMP + var pending = _listener.GetPendingPublish(_clientId, packetId); + if (pending != null) + { + RoutePublish(pending.Topic, Encoding.UTF8.GetBytes(pending.Payload)); + } + _listener.AckPendingPublish(_clientId, packetId); await WriteBinaryAsync(MqttPacketWriter.WritePubComp(packetId), ct); } @@ -383,8 +418,31 @@ public sealed class MqttConnection : IAsyncDisposable for (var i = 0; i < subscribeInfo.Filters.Count; i++) { var (topicFilter, requestedQoS) = subscribeInfo.Filters[i]; - _listener.RegisterSubscription(this, topicFilter); + + if (Adapter != null) + { + // Route through SubList for cross-protocol delivery + var natsSubject = MqttTopicMapper.MqttToNats(topicFilter); + var sid = $"$MQTT_{Interlocked.Increment(ref _nextSid)}"; + Adapter.AddSubscription(natsSubject, sid); + _topicToSid[topicFilter] = sid; + } + else + { + // Test-only fallback: MQTT-only subscription + _listener.RegisterSubscription(this, topicFilter); + } + grantedQoS[i] = Math.Min(requestedQoS, (byte)2); + + // Deliver retained messages for this topic filter + var retained = _listener.GetRetainedMessage(topicFilter); + if (retained != null) + { + var retainedPayload = Encoding.UTF8.GetBytes(retained); + await WriteBinaryAsync( + MqttPacketWriter.WritePublish(topicFilter, retainedPayload, qos: 0, retain: true, packetId: 0), ct); + } } await WriteBinaryAsync(MqttPacketWriter.WriteSubAck(subscribeInfo.PacketId, grantedQoS), ct); @@ -395,7 +453,16 @@ public sealed class MqttConnection : IAsyncDisposable var unsubInfo = MqttBinaryDecoder.ParseUnsubscribe(packet.Payload.Span, packet.Flags); foreach (var filter in unsubInfo.Filters) - _listener.UnregisterSubscription(this, filter); + { + if (Adapter != null && _topicToSid.Remove(filter, out var sid)) + { + Adapter.RemoveSubscription(sid); + } + else + { + _listener.UnregisterSubscription(this, filter); + } + } await WriteBinaryAsync(MqttPacketWriter.WriteUnsubAck(unsubInfo.PacketId), ct); } @@ -427,6 +494,13 @@ public sealed class MqttConnection : IAsyncDisposable public async ValueTask DisposeAsync() { + // Clean up adapter subscriptions and unregister from listener + if (Adapter != null) + { + Adapter.RemoveAllSubscriptions(); + _listener.UnregisterMqttAdapter(Adapter); + } + _listener.Unregister(this); _writeGate.Dispose(); await _stream.DisposeAsync(); diff --git a/src/NATS.Server/Mqtt/MqttListener.cs b/src/NATS.Server/Mqtt/MqttListener.cs index 5c6ebac..486d0ae 100644 --- a/src/NATS.Server/Mqtt/MqttListener.cs +++ b/src/NATS.Server/Mqtt/MqttListener.cs @@ -24,6 +24,8 @@ public sealed class MqttListener : IAsyncDisposable private readonly ConcurrentDictionary _sessions = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _clientIdMap = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _retainedMessages = new(StringComparer.Ordinal); + private readonly IMessageRouter? _router; + private readonly ConcurrentDictionary _mqttAdapters = new(); private MqttStreamInitializer? _streamInitializer; private MqttConsumerManager? _mqttConsumerManager; private TcpListener? _listener; @@ -63,7 +65,8 @@ public sealed class MqttListener : IAsyncDisposable AuthService? authService, MqttOptions mqttOptions, MqttStreamInitializer? streamInitializer = null, - MqttConsumerManager? mqttConsumerManager = null) + MqttConsumerManager? mqttConsumerManager = null, + IMessageRouter? router = null) { _host = host; _port = port; @@ -73,6 +76,7 @@ public sealed class MqttListener : IAsyncDisposable _requiredPassword = mqttOptions.Password; _streamInitializer = streamInitializer; _mqttConsumerManager = mqttConsumerManager; + _router = router; // Build TLS options if configured if (mqttOptions.HasTls) @@ -91,6 +95,49 @@ public sealed class MqttListener : IAsyncDisposable /// internal MqttConsumerManager? ConsumerManager => _mqttConsumerManager; + /// + /// The message router for cross-protocol delivery (MQTT→NATS SubList routing). + /// Null when running in test-only mode without NatsServer. + /// + internal IMessageRouter? Router => _router; + + /// + /// Delegate to allocate a server-unique client ID for MQTT adapters. + /// Set by NatsServer after construction. + /// + internal Func? AllocateClientId { get; set; } + + /// + /// Delegate to resolve an Account by name for MQTT adapters. + /// Set by NatsServer after construction. + /// + internal Func? ResolveAccount { get; set; } + + internal void RegisterMqttAdapter(MqttNatsClientAdapter adapter) + => _mqttAdapters[adapter.Id] = adapter; + + internal void UnregisterMqttAdapter(MqttNatsClientAdapter adapter) + => _mqttAdapters.TryRemove(adapter.Id, out _); + + internal IEnumerable GetMqttAdapters() + => _mqttAdapters.Values; + + /// + /// Looks up a specific pending publish by client ID and packet ID. + /// Used by QoS 2 PUBREL to retrieve the stored message for delivery. + /// + internal MqttPendingPublish? GetPendingPublish(string clientId, int packetId) + { + if (string.IsNullOrWhiteSpace(clientId) || packetId <= 0) + return null; + + if (_sessions.TryGetValue(clientId, out var session) + && session.Pending.TryGetValue(packetId, out var pending)) + return pending; + + return null; + } + public Task StartAsync(CancellationToken ct) { var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, _cts.Token); @@ -280,6 +327,7 @@ public sealed class MqttListener : IAsyncDisposable _sessions.Clear(); _clientIdMap.Clear(); _retainedMessages.Clear(); + _mqttAdapters.Clear(); _cts.Dispose(); } diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index f10f423..98c6d10 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -122,6 +122,12 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable /// public int? MqttListenerPort => _mqttListener?.Port; + /// + /// Returns all active MQTT client adapters for monitoring (/connz). + /// + public IEnumerable GetMqttAdapters() + => _mqttListener?.GetMqttAdapters() ?? []; + public Account SystemAccount => _systemAccount; public string ServerNKey { get; } public InternalEventSystem? EventSystem => _eventSystem; @@ -937,7 +943,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _authService, mqttOptions, mqttStreamInit, - mqttConsumerMgr); + mqttConsumerMgr, + router: this); + _mqttListener.AllocateClientId = () => Interlocked.Increment(ref _nextClientId); + _mqttListener.ResolveAccount = name => GetOrCreateAccount(name ?? Auth.Account.GlobalAccountName); await _mqttListener.StartAsync(linked.Token); } if (_jetStreamService != null) diff --git a/tests/NATS.E2E.Tests/MqttE2ETests.cs b/tests/NATS.E2E.Tests/MqttE2ETests.cs new file mode 100644 index 0000000..cfe4105 --- /dev/null +++ b/tests/NATS.E2E.Tests/MqttE2ETests.cs @@ -0,0 +1,219 @@ +using System.Text; +using MQTTnet; +using MQTTnet.Client; +using MQTTnet.Protocol; +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +/// +/// End-to-end tests for MQTT 3.1.1 interop using MQTTnet client library. +/// Verifies binary MQTT protocol, cross-protocol MQTT↔NATS messaging, and QoS 1. +/// +[Collection("E2E-Mqtt")] +public class MqttE2ETests(MqttServerFixture fixture) +{ + [Fact] + public async Task MqttE2E_ConnectPublishSubscribe() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var factory = new MqttFactory(); + using var subscriber = factory.CreateMqttClient(); + using var publisher = factory.CreateMqttClient(); + + var subOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", fixture.MqttPort) + .WithClientId("e2e-mqttnet-sub") + .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) + .Build(); + + var pubOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", fixture.MqttPort) + .WithClientId("e2e-mqttnet-pub") + .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) + .Build(); + + await subscriber.ConnectAsync(subOpts, cts.Token); + await publisher.ConnectAsync(pubOpts, cts.Token); + + var received = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + subscriber.ApplicationMessageReceivedAsync += e => + { + var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment); + received.TrySetResult(payload); + return Task.CompletedTask; + }; + + await subscriber.SubscribeAsync( + factory.CreateSubscribeOptionsBuilder() + .WithTopicFilter("test/mqttnet/e2e") + .Build(), + cts.Token); + + // Small delay to let subscription propagate + await Task.Delay(100, cts.Token); + + await publisher.PublishAsync( + new MqttApplicationMessageBuilder() + .WithTopic("test/mqttnet/e2e") + .WithPayload("hello-mqttnet") + .Build(), + cts.Token); + + var msg = await received.Task.WaitAsync(cts.Token); + msg.ShouldBe("hello-mqttnet"); + + await subscriber.DisconnectAsync(cancellationToken: cts.Token); + await publisher.DisconnectAsync(cancellationToken: cts.Token); + } + + [Fact] + public async Task MqttE2E_CrossProtocol_MqttPublish_NatsSubscribe() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + // NATS subscriber on "sensor.temp" (MQTT topic "sensor/temp" maps to NATS subject "sensor.temp") + await using var natsConn = fixture.CreateNatsClient(); + await natsConn.ConnectAsync(); + + var natsReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _ = Task.Run(async () => + { + await foreach (var msg in natsConn.SubscribeAsync("sensor.temp", cancellationToken: cts.Token)) + { + natsReceived.TrySetResult(msg.Data ?? ""); + break; + } + }, cts.Token); + + // Small delay to let NATS subscription register + await Task.Delay(200, cts.Token); + + // MQTT publisher on "sensor/temp" + var factory = new MqttFactory(); + using var mqttPub = factory.CreateMqttClient(); + var pubOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", fixture.MqttPort) + .WithClientId("e2e-cross-mqtt-pub") + .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) + .Build(); + + await mqttPub.ConnectAsync(pubOpts, cts.Token); + await mqttPub.PublishAsync( + new MqttApplicationMessageBuilder() + .WithTopic("sensor/temp") + .WithPayload("22.5") + .Build(), + cts.Token); + + var result = await natsReceived.Task.WaitAsync(cts.Token); + result.ShouldBe("22.5"); + + await mqttPub.DisconnectAsync(cancellationToken: cts.Token); + } + + [Fact] + public async Task MqttE2E_CrossProtocol_NatsPublish_MqttSubscribe() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + // MQTT subscriber on "sensor/humidity" + var factory = new MqttFactory(); + using var mqttSub = factory.CreateMqttClient(); + var subOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", fixture.MqttPort) + .WithClientId("e2e-cross-mqtt-sub") + .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) + .Build(); + + await mqttSub.ConnectAsync(subOpts, cts.Token); + + var mqttReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + mqttSub.ApplicationMessageReceivedAsync += e => + { + var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment); + mqttReceived.TrySetResult(payload); + return Task.CompletedTask; + }; + + await mqttSub.SubscribeAsync( + factory.CreateSubscribeOptionsBuilder() + .WithTopicFilter("sensor/humidity") + .Build(), + cts.Token); + + // Small delay to let subscription propagate through SubList + await Task.Delay(200, cts.Token); + + // NATS publisher on "sensor.humidity" + await using var natsConn = fixture.CreateNatsClient(); + await natsConn.ConnectAsync(); + await natsConn.PublishAsync("sensor.humidity", "65%", cancellationToken: cts.Token); + + var result = await mqttReceived.Task.WaitAsync(cts.Token); + result.ShouldBe("65%"); + + await mqttSub.DisconnectAsync(cancellationToken: cts.Token); + } + + [Fact] + public async Task MqttE2E_Qos1_PubAck() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var factory = new MqttFactory(); + using var subscriber = factory.CreateMqttClient(); + using var publisher = factory.CreateMqttClient(); + + var subOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", fixture.MqttPort) + .WithClientId("e2e-qos1-sub") + .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) + .Build(); + + var pubOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", fixture.MqttPort) + .WithClientId("e2e-qos1-pub") + .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) + .Build(); + + await subscriber.ConnectAsync(subOpts, cts.Token); + await publisher.ConnectAsync(pubOpts, cts.Token); + + var received = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + subscriber.ApplicationMessageReceivedAsync += e => + { + var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment); + received.TrySetResult(payload); + return Task.CompletedTask; + }; + + await subscriber.SubscribeAsync( + factory.CreateSubscribeOptionsBuilder() + .WithTopicFilter(f => f.WithTopic("test/qos1").WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)) + .Build(), + cts.Token); + + await Task.Delay(100, cts.Token); + + // Publish with QoS 1 — MQTTnet will expect PUBACK from server + var pubResult = await publisher.PublishAsync( + new MqttApplicationMessageBuilder() + .WithTopic("test/qos1") + .WithPayload("qos1-payload") + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) + .Build(), + cts.Token); + + // MQTTnet throws if PUBACK not received, so reaching here means server sent PUBACK + pubResult.ReasonCode.ShouldBe(MqttClientPublishReasonCode.Success); + + var msg = await received.Task.WaitAsync(cts.Token); + msg.ShouldBe("qos1-payload"); + + await subscriber.DisconnectAsync(cancellationToken: cts.Token); + await publisher.DisconnectAsync(cancellationToken: cts.Token); + } +}