perf: optimize MQTT cross-protocol path (0.30x → 0.78x Go)

Replace per-message async fire-and-forget with direct-buffer write loop
mirroring NatsClient pattern: SpinLock-guarded buffer append, double-
buffer swap, single WriteAsync per batch.

- MqttConnection: add _directBuf/_writeBuf + RunMqttWriteLoopAsync
- MqttConnection: add EnqueuePublishNoFlush (zero-alloc PUBLISH format)
- MqttPacketWriter: add WritePublishTo(Span<byte>) + MeasurePublish
- MqttTopicMapper: add NatsToMqttBytes with bounded ConcurrentDictionary
- MqttNatsClientAdapter: synchronous SendMessageNoFlush + SignalFlush
- Skip FlushAsync on plain TCP sockets (TCP auto-flushes)
This commit is contained in:
Joseph Doherty
2026-03-13 14:25:13 -04:00
parent 699449da6a
commit 11e01b9026
14 changed files with 1113 additions and 10 deletions

View File

@@ -1,8 +1,10 @@
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Channels;
using NATS.Server.Auth;
using static NATS.Server.Mqtt.MqttBinaryDecoder;
@@ -26,6 +28,16 @@ public sealed class MqttConnection : IAsyncDisposable
private readonly Dictionary<string, string> _topicToSid = new(StringComparer.Ordinal);
private int _nextSid;
// Direct-buffer write loop for high-throughput MQTT message delivery.
// Mirrors the NatsClient _directBuf/_writeBuf + SpinLock + write-loop pattern.
private byte[] _directBuf = new byte[65536];
private byte[] _writeBuf = new byte[65536];
private int _directBufUsed;
private SpinLock _directBufLock = new(enableThreadOwnerTracking: false);
private readonly Channel<byte> _flushSignal = Channel.CreateBounded<byte>(
new BoundedChannelOptions(1) { SingleReader = true, SingleWriter = false, FullMode = BoundedChannelFullMode.DropWrite });
private readonly bool _isPlainSocket;
/// <summary>Auth result after successful CONNECT (populated for AuthService path).</summary>
public AuthResult? AuthResult { get; private set; }
@@ -46,6 +58,7 @@ public sealed class MqttConnection : IAsyncDisposable
_stream = client.GetStream();
_listener = listener;
_useBinaryProtocol = useBinaryProtocol;
_isPlainSocket = true; // NetworkStream over TCP — plain socket
}
/// <summary>
@@ -56,6 +69,7 @@ public sealed class MqttConnection : IAsyncDisposable
_stream = stream;
_listener = listener;
_useBinaryProtocol = useBinaryProtocol;
_isPlainSocket = false; // Wrapped stream (TLS, test, etc.)
}
/// <summary>
@@ -68,6 +82,7 @@ public sealed class MqttConnection : IAsyncDisposable
_listener = listener;
_useBinaryProtocol = useBinaryProtocol;
_clientCert = clientCert;
_isPlainSocket = false; // TLS-wrapped stream
}
public async Task RunAsync(CancellationToken ct)
@@ -80,6 +95,9 @@ public sealed class MqttConnection : IAsyncDisposable
private async Task RunBinaryAsync(CancellationToken ct)
{
// Start the write loop alongside the read loop
var writeTask = RunMqttWriteLoopAsync(ct);
var pipeReader = PipeReader.Create(_stream, new StreamPipeReaderOptions(leaveOpen: true));
try
@@ -147,6 +165,10 @@ public sealed class MqttConnection : IAsyncDisposable
{
await pipeReader.CompleteAsync();
// Signal write loop to exit and wait for it
_flushSignal.Writer.TryComplete();
try { await writeTask; } catch { /* write loop may throw on cancel */ }
// Publish will message if not cleanly disconnected
if (_connected && !_willCleared && _connectInfo.WillTopic != null)
{
@@ -492,6 +514,98 @@ public sealed class MqttConnection : IAsyncDisposable
return WriteLineAsync($"MSG {topic} {payload}", ct);
}
/// <summary>
/// Enqueues an MQTT PUBLISH packet into the direct buffer under SpinLock.
/// Zero-allocation hot path — formats the packet directly into the buffer.
/// Called synchronously from the NATS delivery path (DeliverMessage).
/// </summary>
public void EnqueuePublishNoFlush(ReadOnlySpan<byte> topicUtf8, ReadOnlyMemory<byte> payload,
byte qos = 0, bool retain = false, ushort packetId = 0)
{
var totalLen = MqttPacketWriter.MeasurePublish(topicUtf8.Length, payload.Length, qos);
var lockTaken = false;
_directBufLock.Enter(ref lockTaken);
try
{
// Grow buffer if needed
var needed = _directBufUsed + totalLen;
if (needed > _directBuf.Length)
{
var newSize = Math.Max(_directBuf.Length * 2, needed);
var newBuf = new byte[newSize];
_directBuf.AsSpan(0, _directBufUsed).CopyTo(newBuf);
_directBuf = newBuf;
}
MqttPacketWriter.WritePublishTo(
_directBuf.AsSpan(_directBufUsed),
topicUtf8,
payload.Span,
qos, retain, dup: false, packetId);
_directBufUsed += totalLen;
}
finally
{
if (lockTaken) _directBufLock.Exit();
}
}
/// <summary>
/// Signals the write loop to flush buffered MQTT packets.
/// </summary>
public void SignalMqttFlush() => _flushSignal.Writer.TryWrite(0);
/// <summary>
/// Write loop that drains the direct buffer and writes to the stream in batches.
/// Mirrors NatsClient.RunWriteLoopAsync — swap buffers under SpinLock, single write+flush.
/// </summary>
private async Task RunMqttWriteLoopAsync(CancellationToken ct)
{
var flushReader = _flushSignal.Reader;
try
{
while (await flushReader.WaitToReadAsync(ct))
{
// Drain all pending signals
while (flushReader.TryRead(out _)) { }
// Swap buffers under SpinLock
int directLen = 0;
var lockTaken = false;
_directBufLock.Enter(ref lockTaken);
try
{
if (_directBufUsed > 0)
{
(_directBuf, _writeBuf) = (_writeBuf, _directBuf);
directLen = _directBufUsed;
_directBufUsed = 0;
}
}
finally
{
if (lockTaken) _directBufLock.Exit();
}
if (directLen > 0)
{
await _stream.WriteAsync(_writeBuf.AsMemory(0, directLen), ct);
// For plain TCP sockets (NetworkStream), TCP auto-flushes — skip FlushAsync.
// For TLS/wrapped streams, flush once per batch.
if (!_isPlainSocket)
await _stream.FlushAsync(ct);
}
}
}
catch (OperationCanceledException) { }
catch (IOException) { }
catch (ObjectDisposedException) { }
}
public async ValueTask DisposeAsync()
{
// Clean up adapter subscriptions and unregister from listener

View File

@@ -5,7 +5,6 @@
using NATS.Server.Auth;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
using System.Text;
namespace NATS.Server.Mqtt;
@@ -35,27 +34,32 @@ public sealed class MqttNatsClientAdapter : INatsClient
/// <summary>
/// Delivers a NATS message to this MQTT client by translating the NATS subject
/// to an MQTT topic and writing a binary PUBLISH packet.
/// to an MQTT topic and enqueueing a PUBLISH packet into the direct buffer.
/// </summary>
public void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
var mqttTopic = MqttTopicMapper.NatsToMqtt(subject);
// Fire-and-forget async send; MQTT delivery is best-effort for QoS 0
_ = _connection.SendBinaryPublishAsync(mqttTopic, payload, qos: 0,
retain: false, packetId: 0, CancellationToken.None);
SendMessageNoFlush(subject, sid, replyTo, headers, payload);
SignalFlush();
}
/// <summary>
/// Enqueues an MQTT PUBLISH into the connection's direct buffer without flushing.
/// Uses cached topic bytes to avoid re-encoding. Zero allocation on the hot path.
/// </summary>
public void SendMessageNoFlush(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
// MQTT has no concept of deferred flush — deliver immediately
SendMessage(subject, sid, replyTo, headers, payload);
var topicBytes = MqttTopicMapper.NatsToMqttBytes(subject);
_connection.EnqueuePublishNoFlush(topicBytes, payload, qos: 0, retain: false, packetId: 0);
}
/// <summary>
/// Signals the MQTT connection's write loop to flush buffered packets.
/// </summary>
public void SignalFlush()
{
// No-op for MQTT — each packet is written and flushed immediately
_connection.SignalMqttFlush();
}
public bool QueueOutbound(ReadOnlyMemory<byte> data)

View File

@@ -146,6 +146,92 @@ public static class MqttPacketWriter
return Write(MqttControlPacketType.Publish, totalPayload, flags);
}
/// <summary>
/// Writes a complete MQTT PUBLISH packet directly into a destination span.
/// Returns the number of bytes written. Zero-allocation hot path for message delivery.
/// </summary>
public static int WritePublishTo(Span<byte> dest, ReadOnlySpan<byte> topicUtf8,
ReadOnlySpan<byte> payload, byte qos = 0, bool retain = false, bool dup = false, ushort packetId = 0)
{
// Calculate remaining length: 2 (topic len) + topic + optional 2 (packet id) + payload
var remainingLength = 2 + topicUtf8.Length + (qos > 0 ? 2 : 0) + payload.Length;
// Encode remaining length into scratch
Span<byte> rlScratch = stackalloc byte[4];
var rlLen = EncodeRemainingLengthTo(rlScratch, remainingLength);
var totalLen = 1 + rlLen + remainingLength;
// Fixed header byte
byte flags = 0;
if (dup) flags |= 0x08;
flags |= (byte)((qos & 0x03) << 1);
if (retain) flags |= 0x01;
dest[0] = (byte)(((byte)MqttControlPacketType.Publish << 4) | flags);
var pos = 1;
// Remaining length
rlScratch[..rlLen].CopyTo(dest[pos..]);
pos += rlLen;
// Topic name (length-prefixed)
BinaryPrimitives.WriteUInt16BigEndian(dest[pos..], (ushort)topicUtf8.Length);
pos += 2;
topicUtf8.CopyTo(dest[pos..]);
pos += topicUtf8.Length;
// Packet ID (only for QoS > 0)
if (qos > 0)
{
BinaryPrimitives.WriteUInt16BigEndian(dest[pos..], packetId);
pos += 2;
}
// Application payload
payload.CopyTo(dest[pos..]);
pos += payload.Length;
return totalLen;
}
/// <summary>
/// Calculates the total wire size of a PUBLISH packet without writing it.
/// </summary>
public static int MeasurePublish(int topicLen, int payloadLen, byte qos)
{
var remainingLength = 2 + topicLen + (qos > 0 ? 2 : 0) + payloadLen;
var rlLen = MeasureRemainingLength(remainingLength);
return 1 + rlLen + remainingLength;
}
internal static int EncodeRemainingLengthTo(Span<byte> dest, int value)
{
var index = 0;
do
{
var digit = (byte)(value % 128);
value /= 128;
if (value > 0)
digit |= 0x80;
dest[index++] = digit;
} while (value > 0);
return index;
}
internal static int MeasureRemainingLength(int value)
{
var count = 0;
do
{
value /= 128;
count++;
} while (value > 0);
return count;
}
internal static byte[] EncodeRemainingLength(int value)
{
if (value < 0 || value > MqttProtocolConstants.MaxPayloadSize)

View File

@@ -15,6 +15,7 @@
// '*' → '+'
// '>' → '#'
using System.Collections.Concurrent;
using System.Text;
namespace NATS.Server.Mqtt;
@@ -25,6 +26,36 @@ namespace NATS.Server.Mqtt;
/// </summary>
public static class MqttTopicMapper
{
private const int MaxCacheEntries = 4096;
private static readonly ConcurrentDictionary<string, byte[]> TopicBytesCache = new(StringComparer.Ordinal);
private static int _cacheCount;
/// <summary>
/// Returns the MQTT topic as pre-encoded UTF-8 bytes, using a bounded cache
/// to avoid repeated string translation and encoding on the hot path.
/// </summary>
public static byte[] NatsToMqttBytes(string natsSubject)
{
if (TopicBytesCache.TryGetValue(natsSubject, out var cached))
return cached;
var mqttTopic = NatsToMqtt(natsSubject);
var bytes = Encoding.UTF8.GetBytes(mqttTopic);
// Bounded cache — stop adding after limit to avoid unbounded growth
if (Interlocked.Increment(ref _cacheCount) <= MaxCacheEntries)
{
if (!TopicBytesCache.TryAdd(natsSubject, bytes))
Interlocked.Decrement(ref _cacheCount);
}
else
{
Interlocked.Decrement(ref _cacheCount);
}
return bytes;
}
// Escape sequence for dots that appear in MQTT topic names.
// Go uses _DOT_ internally to represent a literal dot in the NATS subject.
private const string DotEscape = "_DOT_";