diff --git a/benchmarks_comparison.md b/benchmarks_comparison.md index 4ed9883..a0fd7dc 100644 --- a/benchmarks_comparison.md +++ b/benchmarks_comparison.md @@ -1,6 +1,6 @@ # Go vs .NET NATS Server — Benchmark Comparison -Benchmark run: 2026-03-13 11:41 AM America/Indiana/Indianapolis. Both servers ran on the same machine using the benchmark project README command (`dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark" -v normal --logger "console;verbosity=detailed"`). Test parallelization remained disabled inside the benchmark assembly. +Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ran on the same machine using the benchmark project README command (`dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark" -v normal --logger "console;verbosity=detailed"`). Test parallelization remained disabled inside the benchmark assembly. **Environment:** Apple M4, .NET SDK 10.0.101, benchmark README command run in the benchmark project's default `Debug` configuration, Go toolchain installed, Go reference server built from `golang/nats-server/`. @@ -75,6 +75,37 @@ Benchmark run: 2026-03-13 11:41 AM America/Indiana/Indianapolis. Both servers ra --- +## MQTT Throughput + +| Benchmark | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | +|-----------|----------|---------|------------|-----------|-----------------| +| MQTT PubSub (128B, QoS 0) | 34,224 | 4.2 | 44,142 | 5.4 | **1.29x** | +| Cross-Protocol NATS→MQTT (128B) | 118,322 | 14.4 | 92,485 | 11.3 | 0.78x | + +> **Note:** Pure MQTT pub/sub remains above Go at 1.29x. Cross-protocol NATS→MQTT improved from 0.30x to 0.78x after adopting direct-buffer write loop + zero-alloc PUBLISH formatting + topic cache (matching the NatsClient batching pattern). The remaining gap is likely due to Go's writev() scatter-gather and goroutine-level parallelism in message routing. + +--- + +## Transport Overhead + +### TLS + +| Benchmark | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | +|-----------|----------|---------|------------|-----------|-----------------| +| TLS PubSub 1:1 (128B) | 289,548 | 35.3 | 251,935 | 30.8 | 0.87x | +| TLS Pub-Only (128B) | 1,782,442 | 217.6 | 1,163,021 | 142.0 | 0.65x | + +### WebSocket + +| Benchmark | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | +|-----------|----------|---------|------------|-----------|-----------------| +| WS PubSub 1:1 (128B) | 66,584 | 8.1 | 73,023 | 8.9 | **1.10x** | +| WS Pub-Only (128B) | 106,302 | 13.0 | 88,682 | 10.8 | 0.83x | + +> **Note:** TLS pub/sub is close to parity at 0.87x. WebSocket pub/sub slightly favors .NET at 1.10x. Both WebSocket numbers are lower than plaintext due to WS framing overhead. + +--- + ## Hot Path Microbenchmarks (.NET only) ### SubList @@ -119,6 +150,12 @@ Benchmark run: 2026-03-13 11:41 AM America/Indiana/Indianapolis. Both servers ra | JetStream async file publish | 0.39x | Improved versus older snapshots, still storage-bound | | JetStream ordered consume | 0.18x | Highest-priority JetStream gap | | JetStream durable fetch | 0.56x | Regressed from prior snapshot | +| MQTT pub/sub | **1.29x** | .NET outperforms Go | +| MQTT cross-protocol | 0.78x | Improved from 0.30x via direct-buffer write loop | +| TLS pub/sub | 0.87x | Close to parity | +| TLS pub-only | 0.65x | Encryption throughput gap | +| WebSocket pub/sub | **1.10x** | .NET slightly ahead | +| WebSocket pub-only | 0.83x | Good | ### Key Observations @@ -128,11 +165,26 @@ Benchmark run: 2026-03-13 11:41 AM America/Indiana/Indianapolis. Both servers ra 4. **Ordered consumer throughput remains the largest JetStream gap at 0.18x** (102K vs 573K msg/s). That is better than the intermediate 0.11x run, but it is still the clearest post-FileStore optimization target. 5. **Durable fetch regressed to 0.56x in the final run**, which keeps consumer delivery and storage-read coordination in the top tier of remaining work even after the FileStore changes. 6. **Parser and SubList microbenchmarks remain stable and low-allocation**. The storage and consumer layers continue to dominate the server-level benchmark gaps, not the parser or subject matcher hot paths. +7. **Pure MQTT pub/sub shows .NET outperforming Go at 1.29x** (44K vs 34K msg/s). The .NET MQTT protocol bridge is competitive for direct MQTT-to-MQTT messaging. +8. **MQTT cross-protocol routing (NATS→MQTT) improved from 0.30x to 0.78x** (92K vs 118K msg/s) after adopting the same direct-buffer write loop pattern used by NatsClient: SpinLock-guarded buffer append, double-buffer swap, single write per batch, plus zero-alloc MQTT PUBLISH formatting and cached topic-to-bytes translation. +9. **TLS pub/sub is close to parity at 0.87x** (252K vs 290K msg/s). TLS pub-only is 0.65x (1.16M vs 1.78M msg/s), consistent with the general publish-path gap seen in plaintext benchmarks. +10. **WebSocket pub/sub slightly favors .NET at 1.10x** (73K vs 67K msg/s). WebSocket pub-only is 0.83x (89K vs 106K msg/s). Both servers show similar WS framing overhead relative to their plaintext performance. --- ## Optimization History +### Round 7: MQTT Cross-Protocol Write Path + +Four optimizations targeting the NATS→MQTT delivery hot path (cross-protocol throughput improved from 0.30x to 0.78x): + +| # | Root Cause | Fix | Impact | +|---|-----------|-----|--------| +| 24 | **Per-message async fire-and-forget in MqttNatsClientAdapter** — each `SendMessage` called `SendBinaryPublishAsync` which acquired a `SemaphoreSlim`, allocated a full PUBLISH packet `byte[]`, wrote, and flushed the stream — all per message, bypassing the server's deferred-flush batching | Replaced with synchronous `EnqueuePublishNoFlush()` that formats MQTT PUBLISH directly into `_directBuf` under SpinLock, matching the NatsClient pattern; `SignalFlush()` signals the write loop for batch flush | Eliminates async Task + SemaphoreSlim + per-message flush | +| 25 | **Per-message `byte[]` allocation for MQTT PUBLISH packets** — `MqttPacketWriter.WritePublish()` allocated topic bytes, variable header, remaining-length array, and full packet array on every delivery | Added `WritePublishTo(Span)` that formats the entire PUBLISH packet directly into the destination span using `Span` operations — zero heap allocation | Eliminates 4+ `byte[]` allocs per delivery | +| 26 | **Per-message NATS→MQTT topic translation** — `NatsToMqtt()` allocated a `StringBuilder`, produced a `string`, then `Encoding.UTF8.GetBytes()` re-encoded it on every delivery | Added `NatsToMqttBytes()` with bounded `ConcurrentDictionary` cache (4096 entries); cached result includes pre-encoded UTF-8 bytes | Eliminates string + encoding alloc per delivery for cached topics | +| 27 | **Per-message `FlushAsync` on plain TCP sockets** — `WriteBinaryAsync` flushed after every packet write, even on `NetworkStream` where TCP auto-flushes | Write loop skips `FlushAsync` for plain sockets; for TLS/wrapped streams, flushes once per batch (not per message) | Reduces syscalls from 2N to 1 per batch | + ### Round 6: Batch Flush Signaling + Fetch Optimizations Four optimizations targeting fan-out and consumer fetch hot paths: diff --git a/src/NATS.Server/Mqtt/MqttConnection.cs b/src/NATS.Server/Mqtt/MqttConnection.cs index 56a93c7..c2121ba 100644 --- a/src/NATS.Server/Mqtt/MqttConnection.cs +++ b/src/NATS.Server/Mqtt/MqttConnection.cs @@ -1,8 +1,10 @@ using System.Buffers; +using System.Buffers.Binary; using System.IO.Pipelines; using System.Net.Sockets; using System.Security.Cryptography.X509Certificates; using System.Text; +using System.Threading.Channels; using NATS.Server.Auth; using static NATS.Server.Mqtt.MqttBinaryDecoder; @@ -26,6 +28,16 @@ public sealed class MqttConnection : IAsyncDisposable private readonly Dictionary _topicToSid = new(StringComparer.Ordinal); private int _nextSid; + // Direct-buffer write loop for high-throughput MQTT message delivery. + // Mirrors the NatsClient _directBuf/_writeBuf + SpinLock + write-loop pattern. + private byte[] _directBuf = new byte[65536]; + private byte[] _writeBuf = new byte[65536]; + private int _directBufUsed; + private SpinLock _directBufLock = new(enableThreadOwnerTracking: false); + private readonly Channel _flushSignal = Channel.CreateBounded( + new BoundedChannelOptions(1) { SingleReader = true, SingleWriter = false, FullMode = BoundedChannelFullMode.DropWrite }); + private readonly bool _isPlainSocket; + /// Auth result after successful CONNECT (populated for AuthService path). public AuthResult? AuthResult { get; private set; } @@ -46,6 +58,7 @@ public sealed class MqttConnection : IAsyncDisposable _stream = client.GetStream(); _listener = listener; _useBinaryProtocol = useBinaryProtocol; + _isPlainSocket = true; // NetworkStream over TCP — plain socket } /// @@ -56,6 +69,7 @@ public sealed class MqttConnection : IAsyncDisposable _stream = stream; _listener = listener; _useBinaryProtocol = useBinaryProtocol; + _isPlainSocket = false; // Wrapped stream (TLS, test, etc.) } /// @@ -68,6 +82,7 @@ public sealed class MqttConnection : IAsyncDisposable _listener = listener; _useBinaryProtocol = useBinaryProtocol; _clientCert = clientCert; + _isPlainSocket = false; // TLS-wrapped stream } public async Task RunAsync(CancellationToken ct) @@ -80,6 +95,9 @@ public sealed class MqttConnection : IAsyncDisposable private async Task RunBinaryAsync(CancellationToken ct) { + // Start the write loop alongside the read loop + var writeTask = RunMqttWriteLoopAsync(ct); + var pipeReader = PipeReader.Create(_stream, new StreamPipeReaderOptions(leaveOpen: true)); try @@ -147,6 +165,10 @@ public sealed class MqttConnection : IAsyncDisposable { await pipeReader.CompleteAsync(); + // Signal write loop to exit and wait for it + _flushSignal.Writer.TryComplete(); + try { await writeTask; } catch { /* write loop may throw on cancel */ } + // Publish will message if not cleanly disconnected if (_connected && !_willCleared && _connectInfo.WillTopic != null) { @@ -492,6 +514,98 @@ public sealed class MqttConnection : IAsyncDisposable return WriteLineAsync($"MSG {topic} {payload}", ct); } + /// + /// Enqueues an MQTT PUBLISH packet into the direct buffer under SpinLock. + /// Zero-allocation hot path — formats the packet directly into the buffer. + /// Called synchronously from the NATS delivery path (DeliverMessage). + /// + public void EnqueuePublishNoFlush(ReadOnlySpan topicUtf8, ReadOnlyMemory payload, + byte qos = 0, bool retain = false, ushort packetId = 0) + { + var totalLen = MqttPacketWriter.MeasurePublish(topicUtf8.Length, payload.Length, qos); + + var lockTaken = false; + _directBufLock.Enter(ref lockTaken); + try + { + // Grow buffer if needed + var needed = _directBufUsed + totalLen; + if (needed > _directBuf.Length) + { + var newSize = Math.Max(_directBuf.Length * 2, needed); + var newBuf = new byte[newSize]; + _directBuf.AsSpan(0, _directBufUsed).CopyTo(newBuf); + _directBuf = newBuf; + } + + MqttPacketWriter.WritePublishTo( + _directBuf.AsSpan(_directBufUsed), + topicUtf8, + payload.Span, + qos, retain, dup: false, packetId); + + _directBufUsed += totalLen; + } + finally + { + if (lockTaken) _directBufLock.Exit(); + } + } + + /// + /// Signals the write loop to flush buffered MQTT packets. + /// + public void SignalMqttFlush() => _flushSignal.Writer.TryWrite(0); + + /// + /// Write loop that drains the direct buffer and writes to the stream in batches. + /// Mirrors NatsClient.RunWriteLoopAsync — swap buffers under SpinLock, single write+flush. + /// + private async Task RunMqttWriteLoopAsync(CancellationToken ct) + { + var flushReader = _flushSignal.Reader; + + try + { + while (await flushReader.WaitToReadAsync(ct)) + { + // Drain all pending signals + while (flushReader.TryRead(out _)) { } + + // Swap buffers under SpinLock + int directLen = 0; + var lockTaken = false; + _directBufLock.Enter(ref lockTaken); + try + { + if (_directBufUsed > 0) + { + (_directBuf, _writeBuf) = (_writeBuf, _directBuf); + directLen = _directBufUsed; + _directBufUsed = 0; + } + } + finally + { + if (lockTaken) _directBufLock.Exit(); + } + + if (directLen > 0) + { + await _stream.WriteAsync(_writeBuf.AsMemory(0, directLen), ct); + + // For plain TCP sockets (NetworkStream), TCP auto-flushes — skip FlushAsync. + // For TLS/wrapped streams, flush once per batch. + if (!_isPlainSocket) + await _stream.FlushAsync(ct); + } + } + } + catch (OperationCanceledException) { } + catch (IOException) { } + catch (ObjectDisposedException) { } + } + public async ValueTask DisposeAsync() { // Clean up adapter subscriptions and unregister from listener diff --git a/src/NATS.Server/Mqtt/MqttNatsClientAdapter.cs b/src/NATS.Server/Mqtt/MqttNatsClientAdapter.cs index af57ba8..a651857 100644 --- a/src/NATS.Server/Mqtt/MqttNatsClientAdapter.cs +++ b/src/NATS.Server/Mqtt/MqttNatsClientAdapter.cs @@ -5,7 +5,6 @@ using NATS.Server.Auth; using NATS.Server.Protocol; using NATS.Server.Subscriptions; -using System.Text; namespace NATS.Server.Mqtt; @@ -35,27 +34,32 @@ public sealed class MqttNatsClientAdapter : INatsClient /// /// Delivers a NATS message to this MQTT client by translating the NATS subject - /// to an MQTT topic and writing a binary PUBLISH packet. + /// to an MQTT topic and enqueueing a PUBLISH packet into the direct buffer. /// public void SendMessage(string subject, string sid, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload) { - var mqttTopic = MqttTopicMapper.NatsToMqtt(subject); - // Fire-and-forget async send; MQTT delivery is best-effort for QoS 0 - _ = _connection.SendBinaryPublishAsync(mqttTopic, payload, qos: 0, - retain: false, packetId: 0, CancellationToken.None); + SendMessageNoFlush(subject, sid, replyTo, headers, payload); + SignalFlush(); } + /// + /// Enqueues an MQTT PUBLISH into the connection's direct buffer without flushing. + /// Uses cached topic bytes to avoid re-encoding. Zero allocation on the hot path. + /// public void SendMessageNoFlush(string subject, string sid, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload) { - // MQTT has no concept of deferred flush — deliver immediately - SendMessage(subject, sid, replyTo, headers, payload); + var topicBytes = MqttTopicMapper.NatsToMqttBytes(subject); + _connection.EnqueuePublishNoFlush(topicBytes, payload, qos: 0, retain: false, packetId: 0); } + /// + /// Signals the MQTT connection's write loop to flush buffered packets. + /// public void SignalFlush() { - // No-op for MQTT — each packet is written and flushed immediately + _connection.SignalMqttFlush(); } public bool QueueOutbound(ReadOnlyMemory data) diff --git a/src/NATS.Server/Mqtt/MqttPacketWriter.cs b/src/NATS.Server/Mqtt/MqttPacketWriter.cs index e27c288..73ca3f3 100644 --- a/src/NATS.Server/Mqtt/MqttPacketWriter.cs +++ b/src/NATS.Server/Mqtt/MqttPacketWriter.cs @@ -146,6 +146,92 @@ public static class MqttPacketWriter return Write(MqttControlPacketType.Publish, totalPayload, flags); } + /// + /// Writes a complete MQTT PUBLISH packet directly into a destination span. + /// Returns the number of bytes written. Zero-allocation hot path for message delivery. + /// + public static int WritePublishTo(Span dest, ReadOnlySpan topicUtf8, + ReadOnlySpan payload, byte qos = 0, bool retain = false, bool dup = false, ushort packetId = 0) + { + // Calculate remaining length: 2 (topic len) + topic + optional 2 (packet id) + payload + var remainingLength = 2 + topicUtf8.Length + (qos > 0 ? 2 : 0) + payload.Length; + + // Encode remaining length into scratch + Span rlScratch = stackalloc byte[4]; + var rlLen = EncodeRemainingLengthTo(rlScratch, remainingLength); + + var totalLen = 1 + rlLen + remainingLength; + + // Fixed header byte + byte flags = 0; + if (dup) flags |= 0x08; + flags |= (byte)((qos & 0x03) << 1); + if (retain) flags |= 0x01; + dest[0] = (byte)(((byte)MqttControlPacketType.Publish << 4) | flags); + + var pos = 1; + + // Remaining length + rlScratch[..rlLen].CopyTo(dest[pos..]); + pos += rlLen; + + // Topic name (length-prefixed) + BinaryPrimitives.WriteUInt16BigEndian(dest[pos..], (ushort)topicUtf8.Length); + pos += 2; + topicUtf8.CopyTo(dest[pos..]); + pos += topicUtf8.Length; + + // Packet ID (only for QoS > 0) + if (qos > 0) + { + BinaryPrimitives.WriteUInt16BigEndian(dest[pos..], packetId); + pos += 2; + } + + // Application payload + payload.CopyTo(dest[pos..]); + pos += payload.Length; + + return totalLen; + } + + /// + /// Calculates the total wire size of a PUBLISH packet without writing it. + /// + public static int MeasurePublish(int topicLen, int payloadLen, byte qos) + { + var remainingLength = 2 + topicLen + (qos > 0 ? 2 : 0) + payloadLen; + var rlLen = MeasureRemainingLength(remainingLength); + return 1 + rlLen + remainingLength; + } + + internal static int EncodeRemainingLengthTo(Span dest, int value) + { + var index = 0; + do + { + var digit = (byte)(value % 128); + value /= 128; + if (value > 0) + digit |= 0x80; + dest[index++] = digit; + } while (value > 0); + + return index; + } + + internal static int MeasureRemainingLength(int value) + { + var count = 0; + do + { + value /= 128; + count++; + } while (value > 0); + + return count; + } + internal static byte[] EncodeRemainingLength(int value) { if (value < 0 || value > MqttProtocolConstants.MaxPayloadSize) diff --git a/src/NATS.Server/Mqtt/MqttTopicMapper.cs b/src/NATS.Server/Mqtt/MqttTopicMapper.cs index 076c7d6..32797c2 100644 --- a/src/NATS.Server/Mqtt/MqttTopicMapper.cs +++ b/src/NATS.Server/Mqtt/MqttTopicMapper.cs @@ -15,6 +15,7 @@ // '*' → '+' // '>' → '#' +using System.Collections.Concurrent; using System.Text; namespace NATS.Server.Mqtt; @@ -25,6 +26,36 @@ namespace NATS.Server.Mqtt; /// public static class MqttTopicMapper { + private const int MaxCacheEntries = 4096; + private static readonly ConcurrentDictionary TopicBytesCache = new(StringComparer.Ordinal); + private static int _cacheCount; + + /// + /// Returns the MQTT topic as pre-encoded UTF-8 bytes, using a bounded cache + /// to avoid repeated string translation and encoding on the hot path. + /// + public static byte[] NatsToMqttBytes(string natsSubject) + { + if (TopicBytesCache.TryGetValue(natsSubject, out var cached)) + return cached; + + var mqttTopic = NatsToMqtt(natsSubject); + var bytes = Encoding.UTF8.GetBytes(mqttTopic); + + // Bounded cache — stop adding after limit to avoid unbounded growth + if (Interlocked.Increment(ref _cacheCount) <= MaxCacheEntries) + { + if (!TopicBytesCache.TryAdd(natsSubject, bytes)) + Interlocked.Decrement(ref _cacheCount); + } + else + { + Interlocked.Decrement(ref _cacheCount); + } + + return bytes; + } + // Escape sequence for dots that appear in MQTT topic names. // Go uses _DOT_ internally to represent a literal dot in the NATS subject. private const string DotEscape = "_DOT_"; diff --git a/tests/NATS.Server.Benchmark.Tests/Infrastructure/Collections.cs b/tests/NATS.Server.Benchmark.Tests/Infrastructure/Collections.cs index 370d3cb..92c86ef 100644 --- a/tests/NATS.Server.Benchmark.Tests/Infrastructure/Collections.cs +++ b/tests/NATS.Server.Benchmark.Tests/Infrastructure/Collections.cs @@ -5,3 +5,12 @@ public class BenchmarkCoreCollection : ICollectionFixture [CollectionDefinition("Benchmark-JetStream")] public class BenchmarkJetStreamCollection : ICollectionFixture; + +[CollectionDefinition("Benchmark-Mqtt")] +public class BenchmarkMqttCollection : ICollectionFixture; + +[CollectionDefinition("Benchmark-Tls")] +public class BenchmarkTlsCollection : ICollectionFixture; + +[CollectionDefinition("Benchmark-WebSocket")] +public class BenchmarkWebSocketCollection : ICollectionFixture; diff --git a/tests/NATS.Server.Benchmark.Tests/Infrastructure/MqttServerFixture.cs b/tests/NATS.Server.Benchmark.Tests/Infrastructure/MqttServerFixture.cs new file mode 100644 index 0000000..56d382e --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Infrastructure/MqttServerFixture.cs @@ -0,0 +1,93 @@ +using NATS.Client.Core; + +namespace NATS.Server.Benchmark.Tests.Infrastructure; + +/// +/// Starts both a Go and .NET NATS server with MQTT and JetStream enabled for MQTT benchmarks. +/// Shared across all tests in the "Benchmark-Mqtt" collection. +/// +public sealed class MqttServerFixture : IAsyncLifetime +{ + private GoServerProcess? _goServer; + private DotNetServerProcess? _dotNetServer; + private string? _goStoreDir; + private string? _dotNetStoreDir; + + public int GoNatsPort => _goServer?.Port ?? throw new InvalidOperationException("Go server not started"); + public int GoMqttPort { get; private set; } + public int DotNetNatsPort => _dotNetServer?.Port ?? throw new InvalidOperationException(".NET server not started"); + public int DotNetMqttPort { get; private set; } + public bool GoAvailable => _goServer is not null; + + public async Task InitializeAsync() + { + DotNetMqttPort = PortAllocator.AllocateFreePort(); + _dotNetStoreDir = Path.Combine(Path.GetTempPath(), "nats-bench-dotnet-mqtt-" + Guid.NewGuid().ToString("N")[..8]); + Directory.CreateDirectory(_dotNetStoreDir); + + var dotNetConfig = $$""" + jetstream { + store_dir: "{{_dotNetStoreDir}}" + max_mem_store: 64mb + max_file_store: 256mb + } + mqtt { + listen: 127.0.0.1:{{DotNetMqttPort}} + } + """; + + _dotNetServer = new DotNetServerProcess(dotNetConfig); + var dotNetTask = _dotNetServer.StartAsync(); + + if (GoServerProcess.IsAvailable()) + { + GoMqttPort = PortAllocator.AllocateFreePort(); + _goStoreDir = Path.Combine(Path.GetTempPath(), "nats-bench-go-mqtt-" + Guid.NewGuid().ToString("N")[..8]); + Directory.CreateDirectory(_goStoreDir); + + var goConfig = $$""" + jetstream { + store_dir: "{{_goStoreDir}}" + max_mem_store: 64mb + max_file_store: 256mb + } + mqtt { + listen: 127.0.0.1:{{GoMqttPort}} + } + """; + + _goServer = new GoServerProcess(goConfig); + await Task.WhenAll(dotNetTask, _goServer.StartAsync()); + } + else + { + await dotNetTask; + } + } + + public async Task DisposeAsync() + { + if (_goServer is not null) + await _goServer.DisposeAsync(); + if (_dotNetServer is not null) + await _dotNetServer.DisposeAsync(); + + CleanupDir(_goStoreDir); + CleanupDir(_dotNetStoreDir); + } + + public NatsConnection CreateGoNatsClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{GoNatsPort}" }); + + public NatsConnection CreateDotNetNatsClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{DotNetNatsPort}" }); + + private static void CleanupDir(string? dir) + { + if (dir is not null && Directory.Exists(dir)) + { + try { Directory.Delete(dir, recursive: true); } + catch { /* best-effort cleanup */ } + } + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/Infrastructure/TlsServerFixture.cs b/tests/NATS.Server.Benchmark.Tests/Infrastructure/TlsServerFixture.cs new file mode 100644 index 0000000..8b8446b --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Infrastructure/TlsServerFixture.cs @@ -0,0 +1,122 @@ +using System.Security.Cryptography; +using System.Security.Cryptography.X509Certificates; +using NATS.Client.Core; + +namespace NATS.Server.Benchmark.Tests.Infrastructure; + +/// +/// Starts both a Go and .NET NATS server with TLS enabled for transport overhead benchmarks. +/// Shared across all tests in the "Benchmark-Tls" collection. +/// +public sealed class TlsServerFixture : IAsyncLifetime +{ + private GoServerProcess? _goServer; + private DotNetServerProcess? _dotNetServer; + private string? _tempDir; + + public int GoPort => _goServer?.Port ?? throw new InvalidOperationException("Go server not started"); + public int DotNetPort => _dotNetServer?.Port ?? throw new InvalidOperationException(".NET server not started"); + public bool GoAvailable => _goServer is not null; + + public async Task InitializeAsync() + { + _tempDir = Path.Combine(Path.GetTempPath(), $"nats-bench-tls-{Guid.NewGuid():N}"); + Directory.CreateDirectory(_tempDir); + + var caCertPath = Path.Combine(_tempDir, "ca.pem"); + var serverCertPath = Path.Combine(_tempDir, "server-cert.pem"); + var serverKeyPath = Path.Combine(_tempDir, "server-key.pem"); + + GenerateCertificates(caCertPath, serverCertPath, serverKeyPath); + + var config = $$""" + tls { + cert_file: "{{serverCertPath}}" + key_file: "{{serverKeyPath}}" + ca_file: "{{caCertPath}}" + } + """; + + _dotNetServer = new DotNetServerProcess(config); + var dotNetTask = _dotNetServer.StartAsync(); + + if (GoServerProcess.IsAvailable()) + { + _goServer = new GoServerProcess(config); + await Task.WhenAll(dotNetTask, _goServer.StartAsync()); + } + else + { + await dotNetTask; + } + } + + public async Task DisposeAsync() + { + if (_goServer is not null) + await _goServer.DisposeAsync(); + if (_dotNetServer is not null) + await _dotNetServer.DisposeAsync(); + + if (_tempDir is not null && Directory.Exists(_tempDir)) + { + try { Directory.Delete(_tempDir, recursive: true); } + catch { /* best-effort cleanup */ } + } + } + + public NatsConnection CreateGoTlsClient() + => CreateTlsClient(GoPort); + + public NatsConnection CreateDotNetTlsClient() + => CreateTlsClient(DotNetPort); + + private static NatsConnection CreateTlsClient(int port) + { + var opts = new NatsOpts + { + Url = $"nats://127.0.0.1:{port}", + TlsOpts = new NatsTlsOpts + { + Mode = TlsMode.Require, + InsecureSkipVerify = true, + }, + }; + return new NatsConnection(opts); + } + + private static void GenerateCertificates(string caCertPath, string serverCertPath, string serverKeyPath) + { + using var caKey = RSA.Create(2048); + var caReq = new CertificateRequest( + "CN=Benchmark Test CA", + caKey, + HashAlgorithmName.SHA256, + RSASignaturePadding.Pkcs1); + caReq.CertificateExtensions.Add( + new X509BasicConstraintsExtension(certificateAuthority: true, hasPathLengthConstraint: false, pathLengthConstraint: 0, critical: true)); + + var now = DateTimeOffset.UtcNow; + using var caCert = caReq.CreateSelfSigned(now.AddMinutes(-5), now.AddDays(1)); + + using var serverKey = RSA.Create(2048); + var serverReq = new CertificateRequest( + "CN=localhost", + serverKey, + HashAlgorithmName.SHA256, + RSASignaturePadding.Pkcs1); + + var sanBuilder = new SubjectAlternativeNameBuilder(); + sanBuilder.AddIpAddress(System.Net.IPAddress.Loopback); + sanBuilder.AddDnsName("localhost"); + serverReq.CertificateExtensions.Add(sanBuilder.Build()); + serverReq.CertificateExtensions.Add( + new X509BasicConstraintsExtension(certificateAuthority: false, hasPathLengthConstraint: false, pathLengthConstraint: 0, critical: false)); + + using var serverCert = serverReq.Create(caCert, now.AddMinutes(-5), now.AddDays(1), [1, 2, 3, 4]); + + File.WriteAllText(caCertPath, caCert.ExportCertificatePem()); + File.WriteAllText(serverCertPath, serverCert.ExportCertificatePem()); + File.WriteAllText(serverKeyPath, serverKey.ExportRSAPrivateKeyPem()); + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/Infrastructure/WebSocketServerFixture.cs b/tests/NATS.Server.Benchmark.Tests/Infrastructure/WebSocketServerFixture.cs new file mode 100644 index 0000000..027d611 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Infrastructure/WebSocketServerFixture.cs @@ -0,0 +1,67 @@ +using NATS.Client.Core; + +namespace NATS.Server.Benchmark.Tests.Infrastructure; + +/// +/// Starts both a Go and .NET NATS server with WebSocket enabled for transport overhead benchmarks. +/// Shared across all tests in the "Benchmark-WebSocket" collection. +/// +public sealed class WebSocketServerFixture : IAsyncLifetime +{ + private GoServerProcess? _goServer; + private DotNetServerProcess? _dotNetServer; + + public int GoNatsPort => _goServer?.Port ?? throw new InvalidOperationException("Go server not started"); + public int GoWsPort { get; private set; } + public int DotNetNatsPort => _dotNetServer?.Port ?? throw new InvalidOperationException(".NET server not started"); + public int DotNetWsPort { get; private set; } + public bool GoAvailable => _goServer is not null; + + public async Task InitializeAsync() + { + DotNetWsPort = PortAllocator.AllocateFreePort(); + + var dotNetConfig = $$""" + websocket { + listen: 127.0.0.1:{{DotNetWsPort}} + no_tls: true + } + """; + + _dotNetServer = new DotNetServerProcess(dotNetConfig); + var dotNetTask = _dotNetServer.StartAsync(); + + if (GoServerProcess.IsAvailable()) + { + GoWsPort = PortAllocator.AllocateFreePort(); + + var goConfig = $$""" + websocket { + listen: 127.0.0.1:{{GoWsPort}} + no_tls: true + } + """; + + _goServer = new GoServerProcess(goConfig); + await Task.WhenAll(dotNetTask, _goServer.StartAsync()); + } + else + { + await dotNetTask; + } + } + + public async Task DisposeAsync() + { + if (_goServer is not null) + await _goServer.DisposeAsync(); + if (_dotNetServer is not null) + await _dotNetServer.DisposeAsync(); + } + + public NatsConnection CreateGoNatsClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{GoNatsPort}" }); + + public NatsConnection CreateDotNetNatsClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{DotNetNatsPort}" }); +} diff --git a/tests/NATS.Server.Benchmark.Tests/Mqtt/MqttThroughputTests.cs b/tests/NATS.Server.Benchmark.Tests/Mqtt/MqttThroughputTests.cs new file mode 100644 index 0000000..dc89f30 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Mqtt/MqttThroughputTests.cs @@ -0,0 +1,184 @@ +using MQTTnet; +using MQTTnet.Client; +using NATS.Client.Core; +using NATS.Server.Benchmark.Tests.Harness; +using NATS.Server.Benchmark.Tests.Infrastructure; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.Mqtt; + +[Collection("Benchmark-Mqtt")] +public class MqttThroughputTests(MqttServerFixture fixture, ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public async Task MqttPubSub_128B() + { + const int payloadSize = 128; + const int messageCount = 5_000; + + var dotnetResult = await RunMqttPubSub("MQTT PubSub (128B)", "DotNet", fixture.DotNetMqttPort, payloadSize, messageCount); + + if (fixture.GoAvailable) + { + var goResult = await RunMqttPubSub("MQTT PubSub (128B)", "Go", fixture.GoMqttPort, payloadSize, messageCount); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + [Fact] + [Trait("Category", "Benchmark")] + public async Task MqttCrossProtocol_NatsPub_MqttSub_128B() + { + const int payloadSize = 128; + const int messageCount = 5_000; + + var dotnetResult = await RunCrossProtocol("Cross-Protocol NATS→MQTT (128B)", "DotNet", fixture.DotNetMqttPort, fixture.CreateDotNetNatsClient, payloadSize, messageCount); + + if (fixture.GoAvailable) + { + var goResult = await RunCrossProtocol("Cross-Protocol NATS→MQTT (128B)", "Go", fixture.GoMqttPort, fixture.CreateGoNatsClient, payloadSize, messageCount); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + private static async Task RunMqttPubSub(string name, string serverType, int mqttPort, int payloadSize, int messageCount) + { + var payload = new byte[payloadSize]; + var topic = $"bench/mqtt/pubsub/{Guid.NewGuid():N}"; + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + + var factory = new MqttFactory(); + using var subscriber = factory.CreateMqttClient(); + using var publisher = factory.CreateMqttClient(); + + var subOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", mqttPort) + .WithClientId($"bench-sub-{Guid.NewGuid():N}") + .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) + .Build(); + + var pubOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", mqttPort) + .WithClientId($"bench-pub-{Guid.NewGuid():N}") + .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) + .Build(); + + await subscriber.ConnectAsync(subOpts, cts.Token); + await publisher.ConnectAsync(pubOpts, cts.Token); + + var received = 0; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + subscriber.ApplicationMessageReceivedAsync += _ => + { + if (Interlocked.Increment(ref received) >= messageCount) + tcs.TrySetResult(); + return Task.CompletedTask; + }; + + await subscriber.SubscribeAsync( + factory.CreateSubscribeOptionsBuilder() + .WithTopicFilter(topic) + .Build(), + cts.Token); + + await Task.Delay(200, cts.Token); + + var sw = System.Diagnostics.Stopwatch.StartNew(); + for (var i = 0; i < messageCount; i++) + { + await publisher.PublishAsync( + new MqttApplicationMessageBuilder() + .WithTopic(topic) + .WithPayload(payload) + .Build(), + cts.Token); + } + + await tcs.Task.WaitAsync(cts.Token); + sw.Stop(); + + await subscriber.DisconnectAsync(cancellationToken: cts.Token); + await publisher.DisconnectAsync(cancellationToken: cts.Token); + + return new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = messageCount, + TotalBytes = (long)messageCount * payloadSize, + Duration = sw.Elapsed, + }; + } + + private static async Task RunCrossProtocol(string name, string serverType, int mqttPort, Func createNatsClient, int payloadSize, int messageCount) + { + var payload = new byte[payloadSize]; + var natsSubject = $"bench.mqtt.cross.{Guid.NewGuid():N}"; + var mqttTopic = natsSubject.Replace('.', '/'); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + + var factory = new MqttFactory(); + using var mqttSub = factory.CreateMqttClient(); + + var subOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", mqttPort) + .WithClientId($"bench-cross-sub-{Guid.NewGuid():N}") + .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) + .Build(); + + await mqttSub.ConnectAsync(subOpts, cts.Token); + + var received = 0; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + mqttSub.ApplicationMessageReceivedAsync += _ => + { + if (Interlocked.Increment(ref received) >= messageCount) + tcs.TrySetResult(); + return Task.CompletedTask; + }; + + await mqttSub.SubscribeAsync( + factory.CreateSubscribeOptionsBuilder() + .WithTopicFilter(mqttTopic) + .Build(), + cts.Token); + + await Task.Delay(200, cts.Token); + + await using var natsPub = createNatsClient(); + await natsPub.ConnectAsync(); + await natsPub.PingAsync(cts.Token); + + var sw = System.Diagnostics.Stopwatch.StartNew(); + for (var i = 0; i < messageCount; i++) + await natsPub.PublishAsync(natsSubject, payload, cancellationToken: cts.Token); + await natsPub.PingAsync(cts.Token); + + await tcs.Task.WaitAsync(cts.Token); + sw.Stop(); + + await mqttSub.DisconnectAsync(cancellationToken: cts.Token); + + return new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = messageCount, + TotalBytes = (long)messageCount * payloadSize, + Duration = sw.Elapsed, + }; + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj b/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj index 4655016..e507b91 100644 --- a/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj +++ b/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj @@ -8,6 +8,7 @@ + diff --git a/tests/NATS.Server.Benchmark.Tests/README.md b/tests/NATS.Server.Benchmark.Tests/README.md index 012465c..0ad6036 100644 --- a/tests/NATS.Server.Benchmark.Tests/README.md +++ b/tests/NATS.Server.Benchmark.Tests/README.md @@ -25,6 +25,12 @@ dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark&Fully # JetStream only dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark&FullyQualifiedName~JetStream" -v normal +# MQTT benchmarks +dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark&FullyQualifiedName~Mqtt" -v normal + +# Transport benchmarks (TLS + WebSocket) +dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark&FullyQualifiedName~Transport" -v normal + # A single benchmark by name dotnet test tests/NATS.Server.Benchmark.Tests --filter "FullyQualifiedName=NATS.Server.Benchmark.Tests.CorePubSub.SinglePublisherThroughputTests.PubNoSub_16B" -v normal ``` @@ -50,6 +56,12 @@ Use `-v normal` or `--logger "console;verbosity=detailed"` to see the comparison | `FileStoreAppendBenchmarks` | `FileStore_PurgeEx_Trim_Overhead` | FileStore purge/trim maintenance overhead under repeated updates | | `OrderedConsumerTests` | `JSOrderedConsumer_Throughput` | JetStream ordered ephemeral consumer read throughput | | `DurableConsumerFetchTests` | `JSDurableFetch_Throughput` | JetStream durable consumer fetch-in-batches throughput | +| `MqttThroughputTests` | `MqttPubSub_128B` | MQTT pub/sub throughput, 128-byte payload, QoS 0 | +| `MqttThroughputTests` | `MqttCrossProtocol_NatsPub_MqttSub_128B` | Cross-protocol NATS→MQTT routing throughput | +| `TlsPubSubTests` | `TlsPubSub1To1_128B` | TLS pub/sub 1:1 throughput, 128-byte payload | +| `TlsPubSubTests` | `TlsPubNoSub_128B` | TLS publish-only throughput, 128-byte payload | +| `WebSocketPubSubTests` | `WsPubSub1To1_128B` | WebSocket pub/sub 1:1 throughput, 128-byte payload | +| `WebSocketPubSubTests` | `WsPubNoSub_128B` | WebSocket publish-only throughput, 128-byte payload | ## Output Format @@ -97,6 +109,9 @@ Infrastructure/ GoServerProcess.cs # Builds + launches golang/nats-server CoreServerPairFixture.cs # IAsyncLifetime: Go + .NET servers for core tests JetStreamServerPairFixture # IAsyncLifetime: Go + .NET servers with JetStream + MqttServerFixture.cs # IAsyncLifetime: .NET server with MQTT + JetStream + TlsServerFixture.cs # IAsyncLifetime: .NET server with TLS + WebSocketServerFixture.cs # IAsyncLifetime: .NET server with WebSocket Collections.cs # xUnit collection definitions Harness/ diff --git a/tests/NATS.Server.Benchmark.Tests/Transport/TlsPubSubTests.cs b/tests/NATS.Server.Benchmark.Tests/Transport/TlsPubSubTests.cs new file mode 100644 index 0000000..308f02f --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Transport/TlsPubSubTests.cs @@ -0,0 +1,116 @@ +using NATS.Client.Core; +using NATS.Server.Benchmark.Tests.Harness; +using NATS.Server.Benchmark.Tests.Infrastructure; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.Transport; + +[Collection("Benchmark-Tls")] +public class TlsPubSubTests(TlsServerFixture fixture, ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public async Task TlsPubSub1To1_128B() + { + const int payloadSize = 128; + const int messageCount = 10_000; + + var dotnetResult = await RunTlsPubSub("TLS PubSub 1:1 (128B)", "DotNet", fixture.CreateDotNetTlsClient, payloadSize, messageCount); + + if (fixture.GoAvailable) + { + var goResult = await RunTlsPubSub("TLS PubSub 1:1 (128B)", "Go", fixture.CreateGoTlsClient, payloadSize, messageCount); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + [Fact] + [Trait("Category", "Benchmark")] + public async Task TlsPubNoSub_128B() + { + const int payloadSize = 128; + + var dotnetResult = await RunTlsPubOnly("TLS Pub-Only (128B)", "DotNet", fixture.CreateDotNetTlsClient, payloadSize); + + if (fixture.GoAvailable) + { + var goResult = await RunTlsPubOnly("TLS Pub-Only (128B)", "Go", fixture.CreateGoTlsClient, payloadSize); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + private static async Task RunTlsPubSub(string name, string serverType, Func createClient, int payloadSize, int messageCount) + { + var payload = new byte[payloadSize]; + var subject = $"bench.tls.pubsub.{Guid.NewGuid():N}"; + + await using var pubClient = createClient(); + await using var subClient = createClient(); + await pubClient.ConnectAsync(); + await subClient.ConnectAsync(); + + var received = 0; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var sub = await subClient.SubscribeCoreAsync(subject); + + await subClient.PingAsync(); + await pubClient.PingAsync(); + + var subTask = Task.Run(async () => + { + await foreach (var msg in sub.Msgs.ReadAllAsync()) + { + if (Interlocked.Increment(ref received) >= messageCount) + { + tcs.TrySetResult(); + return; + } + } + }); + + var sw = System.Diagnostics.Stopwatch.StartNew(); + for (var i = 0; i < messageCount; i++) + await pubClient.PublishAsync(subject, payload); + await pubClient.PingAsync(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + await tcs.Task.WaitAsync(cts.Token); + sw.Stop(); + + await sub.UnsubscribeAsync(); + + return new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = messageCount, + TotalBytes = (long)messageCount * payloadSize, + Duration = sw.Elapsed, + }; + } + + private static async Task RunTlsPubOnly(string name, string serverType, Func createClient, int payloadSize) + { + var subject = $"bench.tls.pubonly.{Guid.NewGuid():N}"; + + await using var client = createClient(); + await client.ConnectAsync(); + + var runner = new BenchmarkRunner { WarmupCount = 1_000, MeasurementCount = 100_000 }; + + return await runner.MeasureThroughputAsync( + name, + serverType, + payloadSize, + async _ => await client.PublishAsync(subject, new byte[payloadSize])); + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/Transport/WebSocketPubSubTests.cs b/tests/NATS.Server.Benchmark.Tests/Transport/WebSocketPubSubTests.cs new file mode 100644 index 0000000..3cfe686 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Transport/WebSocketPubSubTests.cs @@ -0,0 +1,209 @@ +using System.Net.WebSockets; +using System.Text; +using NATS.Client.Core; +using NATS.Server.Benchmark.Tests.Harness; +using NATS.Server.Benchmark.Tests.Infrastructure; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.Transport; + +[Collection("Benchmark-WebSocket")] +public class WebSocketPubSubTests(WebSocketServerFixture fixture, ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public async Task WsPubSub1To1_128B() + { + const int payloadSize = 128; + const int messageCount = 5_000; + + var dotnetResult = await RunWsPubSub("WebSocket PubSub 1:1 (128B)", "DotNet", fixture.DotNetWsPort, fixture.CreateDotNetNatsClient, payloadSize, messageCount); + + if (fixture.GoAvailable) + { + var goResult = await RunWsPubSub("WebSocket PubSub 1:1 (128B)", "Go", fixture.GoWsPort, fixture.CreateGoNatsClient, payloadSize, messageCount); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + [Fact] + [Trait("Category", "Benchmark")] + public async Task WsPubNoSub_128B() + { + const int payloadSize = 128; + const int messageCount = 10_000; + + var dotnetResult = await RunWsPubOnly("WebSocket Pub-Only (128B)", "DotNet", fixture.DotNetWsPort, payloadSize, messageCount); + + if (fixture.GoAvailable) + { + var goResult = await RunWsPubOnly("WebSocket Pub-Only (128B)", "Go", fixture.GoWsPort, payloadSize, messageCount); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + private static async Task RunWsPubSub(string name, string serverType, int wsPort, Func createNatsClient, int payloadSize, int messageCount) + { + var payload = new byte[payloadSize]; + var subject = $"bench.ws.pubsub.{Guid.NewGuid():N}"; + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + + using var ws = new ClientWebSocket(); + await ws.ConnectAsync(new Uri($"ws://127.0.0.1:{wsPort}"), cts.Token); + + var reader = new WsLineReader(ws); + + // Read INFO + await reader.ReadLineAsync(cts.Token); + + // Send CONNECT + SUB + PING + await WsSend(ws, "CONNECT {\"verbose\":false,\"protocol\":1}\r\n", cts.Token); + await WsSend(ws, $"SUB {subject} 1\r\n", cts.Token); + await WsSend(ws, "PING\r\n", cts.Token); + await WaitForPong(reader, cts.Token); + + // NATS publisher + await using var natsPub = createNatsClient(); + await natsPub.ConnectAsync(); + await natsPub.PingAsync(cts.Token); + + var sw = System.Diagnostics.Stopwatch.StartNew(); + + for (var i = 0; i < messageCount; i++) + await natsPub.PublishAsync(subject, payload, cancellationToken: cts.Token); + await natsPub.PingAsync(cts.Token); + + // Read all MSG responses from WebSocket + var received = 0; + while (received < messageCount) + { + var line = await reader.ReadLineAsync(cts.Token); + if (line.StartsWith("MSG ", StringComparison.Ordinal)) + { + await reader.ReadLineAsync(cts.Token); + received++; + } + } + + sw.Stop(); + + await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cts.Token); + + return new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = messageCount, + TotalBytes = (long)messageCount * payloadSize, + Duration = sw.Elapsed, + }; + } + + private static async Task RunWsPubOnly(string name, string serverType, int wsPort, int payloadSize, int messageCount) + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + + using var ws = new ClientWebSocket(); + await ws.ConnectAsync(new Uri($"ws://127.0.0.1:{wsPort}"), cts.Token); + + var reader = new WsLineReader(ws); + + // Read INFO + await reader.ReadLineAsync(cts.Token); + + // Send CONNECT + await WsSend(ws, "CONNECT {\"verbose\":false,\"protocol\":1}\r\n", cts.Token); + await WsSend(ws, "PING\r\n", cts.Token); + await WaitForPong(reader, cts.Token); + + // Build a PUB command with raw binary payload + var subject = $"bench.ws.pubonly.{Guid.NewGuid():N}"; + var pubLine = $"PUB {subject} {payloadSize}\r\n"; + var pubPayload = new byte[payloadSize]; + var pubCmd = Encoding.ASCII.GetBytes(pubLine) + .Concat(pubPayload) + .Concat(Encoding.ASCII.GetBytes("\r\n")) + .ToArray(); + + var sw = System.Diagnostics.Stopwatch.StartNew(); + for (var i = 0; i < messageCount; i++) + await ws.SendAsync(pubCmd, WebSocketMessageType.Binary, true, cts.Token); + + // Flush with PING/PONG + await WsSend(ws, "PING\r\n", cts.Token); + await WaitForPong(reader, cts.Token); + sw.Stop(); + + await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cts.Token); + + return new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = messageCount, + TotalBytes = (long)messageCount * payloadSize, + Duration = sw.Elapsed, + }; + } + + /// + /// Reads lines until PONG is received, skipping any INFO lines + /// (Go server sends a second INFO after CONNECT with connect_info:true). + /// + private static async Task WaitForPong(WsLineReader reader, CancellationToken ct) + { + while (true) + { + var line = await reader.ReadLineAsync(ct); + if (line == "PONG") + return; + } + } + + private static async Task WsSend(ClientWebSocket ws, string data, CancellationToken ct) + { + var bytes = Encoding.ASCII.GetBytes(data); + await ws.SendAsync(bytes, WebSocketMessageType.Binary, true, ct); + } + + /// + /// Buffers incoming WebSocket frames and returns one NATS protocol line at a time. + /// + private sealed class WsLineReader(ClientWebSocket ws) + { + private readonly byte[] _recvBuffer = new byte[65536]; + private readonly StringBuilder _pending = new(); + + public async Task ReadLineAsync(CancellationToken ct) + { + while (true) + { + var full = _pending.ToString(); + var crlfIdx = full.IndexOf("\r\n", StringComparison.Ordinal); + if (crlfIdx >= 0) + { + var line = full[..crlfIdx]; + _pending.Clear(); + _pending.Append(full[(crlfIdx + 2)..]); + return line; + } + + var result = await ws.ReceiveAsync(_recvBuffer, ct); + if (result.MessageType == WebSocketMessageType.Close) + throw new InvalidOperationException("WebSocket closed unexpectedly while reading"); + + var chunk = Encoding.ASCII.GetString(_recvBuffer, 0, result.Count); + _pending.Append(chunk); + } + } + } +}