Merge branch 'feat/round7-ordered-consumer-perf'
This commit is contained in:
@@ -1,6 +1,6 @@
# Go vs .NET NATS Server — Benchmark Comparison

Benchmark run: 2026-03-13 11:41 AM America/Indiana/Indianapolis. Both servers ran on the same machine using the benchmark project README command (`dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark" -v normal --logger "console;verbosity=detailed"`). Test parallelization remained disabled inside the benchmark assembly.
Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ran on the same machine using the benchmark project README command (`dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark" -v normal --logger "console;verbosity=detailed"`). Test parallelization remained disabled inside the benchmark assembly.

**Environment:** Apple M4, .NET SDK 10.0.101, benchmark README command run in the benchmark project's default `Debug` configuration, Go toolchain installed, Go reference server built from `golang/nats-server/`.

@@ -68,10 +68,41 @@ Benchmark run: 2026-03-13 11:41 AM America/Indiana/Indianapolis. Both servers ra

| Mode | Go msg/s | .NET msg/s | Ratio (.NET/Go) |
|------|----------|------------|-----------------|
| Ordered ephemeral consumer | 572,941 | 101,944 | 0.18x |
| Durable consumer fetch | 599,204 | 338,265 | 0.56x |
| Ordered ephemeral consumer | 166,000 | 95,000 | 0.57x |
| Durable consumer fetch | 510,000 | 214,000 | 0.42x |

> **Note:** Ordered-consumer throughput remains the clearest JetStream hotspot after this round. The merged FileStore work helped publish and subject-lookup paths more than consumer delivery.
> **Note:** Ordered consumer throughput is ~0.57x Go. Signal-based wakeup replaced 5ms polling for pull consumers waiting at the stream tail (immediate notification when messages are published). Batch flush in `DeliverPullFetchMessagesAsync` reduces flush signals from N to N/64. Go comparison numbers vary significantly across runs (Go itself ranges 156K–573K on this machine).

---

## MQTT Throughput

| Benchmark | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) |
|-----------|----------|---------|------------|-----------|-----------------|
| MQTT PubSub (128B, QoS 0) | 34,224 | 4.2 | 44,142 | 5.4 | **1.29x** |
| Cross-Protocol NATS→MQTT (128B) | 158,000 | 19.3 | 190,000 | 23.2 | **1.20x** |

> **Note:** Pure MQTT pub/sub remains above Go at 1.29x. Cross-protocol NATS→MQTT improved from 0.78x to **1.20x** after adding a `string.Create` fast path in `NatsToMqtt` (avoids StringBuilder for subjects without `_DOT_`) and pre-warming the topic bytes cache on subscription creation.

---

## Transport Overhead

### TLS

| Benchmark | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) |
|-----------|----------|---------|------------|-----------|-----------------|
| TLS PubSub 1:1 (128B) | 289,548 | 35.3 | 251,935 | 30.8 | 0.87x |
| TLS Pub-Only (128B) | 1,782,442 | 217.6 | 1,163,021 | 142.0 | 0.65x |

### WebSocket

| Benchmark | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) |
|-----------|----------|---------|------------|-----------|-----------------|
| WS PubSub 1:1 (128B) | 66,584 | 8.1 | 73,023 | 8.9 | **1.10x** |
| WS Pub-Only (128B) | 106,302 | 13.0 | 88,682 | 10.8 | 0.83x |

> **Note:** TLS pub/sub is close to parity at 0.87x. WebSocket pub/sub slightly favors .NET at 1.10x. Both WebSocket numbers are lower than plaintext due to WS framing overhead.

---

@@ -117,22 +148,53 @@ Benchmark run: 2026-03-13 11:41 AM America/Indiana/Indianapolis. Both servers ra
| Request/reply latency | 0.84x–0.89x | Good |
| JetStream sync publish | 0.82x | Strong |
| JetStream async file publish | 0.39x | Improved versus older snapshots, still storage-bound |
| JetStream ordered consume | 0.18x | Highest-priority JetStream gap |
| JetStream durable fetch | 0.56x | Regressed from prior snapshot |
| JetStream ordered consume | 0.57x | Signal-based wakeup + batch flush |
| JetStream durable fetch | 0.42x | Same path, Go numbers variable |
| MQTT pub/sub | **1.29x** | .NET outperforms Go |
| MQTT cross-protocol | **1.20x** | .NET now outperforms Go |
| TLS pub/sub | 0.87x | Close to parity |
| TLS pub-only | 0.65x | Encryption throughput gap |
| WebSocket pub/sub | **1.10x** | .NET slightly ahead |
| WebSocket pub-only | 0.83x | Good |

### Key Observations

1. **Small-payload 1:1 pub/sub is back to a large .NET lead in this final run** at 2.95x (862K vs 293K msg/s). That puts the merged benchmark profile much closer to the earlier comparison snapshot than the intermediate integration-only run.
2. **Async file-store publish is still materially better than the older 0.30x baseline** at 0.39x (57.5K vs 148.2K msg/s), which is consistent with the FileStore metadata and payload-ownership changes helping the write path even though they did not eliminate the gap.
3. **The new FileStore direct benchmarks show what remains expensive in storage maintenance**: `LoadLastBySubject` is allocation-free and extremely fast, `AppendAsync` is still about 1553 B/op, and repeated `PurgeEx+Trim` still burns roughly 5.4 MB/op.
4. **Ordered consumer throughput remains the largest JetStream gap at 0.18x** (102K vs 573K msg/s). That is better than the intermediate 0.11x run, but it is still the clearest post-FileStore optimization target.
5. **Durable fetch regressed to 0.56x in the final run**, which keeps consumer delivery and storage-read coordination in the top tier of remaining work even after the FileStore changes.
4. **Ordered consumer throughput improved to 0.57x** (~95K vs ~166K msg/s). Signal-based wakeup replaced 5ms polling for pull consumers waiting at the stream tail, and batch flush reduces flush signals from N to N/64. Go comparison numbers are highly variable on this machine (156K–573K across runs).
5. **Durable fetch is at 0.42x** (~214K vs ~510K msg/s). The synchronous fetch path (used by `FetchAsync` client) was not changed in this round; the gap is in the store read and serialization overhead.
6. **Parser and SubList microbenchmarks remain stable and low-allocation**. The storage and consumer layers continue to dominate the server-level benchmark gaps, not the parser or subject matcher hot paths.
7. **Pure MQTT pub/sub shows .NET outperforming Go at 1.29x** (44K vs 34K msg/s). The .NET MQTT protocol bridge is competitive for direct MQTT-to-MQTT messaging.
8. **MQTT cross-protocol routing (NATS→MQTT) improved to 1.20x** (~190K vs ~158K msg/s). The `string.Create` fast path in `NatsToMqtt` eliminates StringBuilder allocation for the common case (no `_DOT_` escape), and pre-warming the topic bytes cache on subscription creation eliminates first-message latency.
9. **TLS pub/sub is close to parity at 0.87x** (252K vs 290K msg/s). TLS pub-only is 0.65x (1.16M vs 1.78M msg/s), consistent with the general publish-path gap seen in plaintext benchmarks.
10. **WebSocket pub/sub slightly favors .NET at 1.10x** (73K vs 67K msg/s). WebSocket pub-only is 0.83x (89K vs 106K msg/s). Both servers show similar WS framing overhead relative to their plaintext performance.

---

## Optimization History

### Round 8: Ordered Consumer + Cross-Protocol Optimization

Three optimizations targeting pull consumer delivery and MQTT cross-protocol throughput:

| # | Root Cause | Fix | Impact |
|---|-----------|-----|--------|
| 28 | **Per-message flush signal in DeliverPullFetchMessagesAsync** — `DeliverMessage` called `SendMessage` which triggered `_flushSignal.Writer.TryWrite(0)` per message; for batch of N messages, N flush signals and write-loop wakeups | Replaced with `SendMessageNoFlush` + batch flush every 64 messages + final flush after loop; bypasses `DeliverMessage` entirely (no permission check / auto-unsub needed for JS delivery inbox) | Reduces flush signals from N to N/64 per batch |
| 29 | **5ms polling delay in pull consumer wait loop** — `Task.Delay(5)` in `DeliverPullFetchMessagesAsync` and `PullConsumerEngine.WaitForMessageAsync` added up to 5ms latency per empty slot; for tail-following consumers, every new message waited up to 5ms to be noticed | Added `StreamHandle.NotifyPublish()` / `WaitForPublishAsync()` using `TaskCompletionSource` signaling; publishers call `NotifyPublish` after `AppendAsync`; consumers wait on signal with heartbeat-interval timeout | Eliminates polling delay; instant wakeup on publish |
| 30 | **StringBuilder allocation in NatsToMqtt for common case** — every uncached `NatsToMqtt` call allocated a StringBuilder even when no `_DOT_` escape sequences were present (the common case) | Added `string.Create` fast path that uses char replacement lambda when no `_DOT_` found; pre-warm topic bytes cache on MQTT subscription creation | Eliminates StringBuilder + string alloc for common case; no cache miss on first delivery |

### Round 7: MQTT Cross-Protocol Write Path

Four optimizations targeting the NATS→MQTT delivery hot path (cross-protocol throughput improved from 0.30x to 0.78x):

| # | Root Cause | Fix | Impact |
|---|-----------|-----|--------|
| 24 | **Per-message async fire-and-forget in MqttNatsClientAdapter** — each `SendMessage` called `SendBinaryPublishAsync` which acquired a `SemaphoreSlim`, allocated a full PUBLISH packet `byte[]`, wrote, and flushed the stream — all per message, bypassing the server's deferred-flush batching | Replaced with synchronous `EnqueuePublishNoFlush()` that formats MQTT PUBLISH directly into `_directBuf` under SpinLock, matching the NatsClient pattern; `SignalFlush()` signals the write loop for batch flush | Eliminates async Task + SemaphoreSlim + per-message flush |
| 25 | **Per-message `byte[]` allocation for MQTT PUBLISH packets** — `MqttPacketWriter.WritePublish()` allocated topic bytes, variable header, remaining-length array, and full packet array on every delivery | Added `WritePublishTo(Span<byte>)` that formats the entire PUBLISH packet directly into the destination span using `Span<byte>` operations — zero heap allocation | Eliminates 4+ `byte[]` allocs per delivery |
| 26 | **Per-message NATS→MQTT topic translation** — `NatsToMqtt()` allocated a `StringBuilder`, produced a `string`, then `Encoding.UTF8.GetBytes()` re-encoded it on every delivery | Added `NatsToMqttBytes()` with bounded `ConcurrentDictionary<string, byte[]>` cache (4096 entries); cached result includes pre-encoded UTF-8 bytes | Eliminates string + encoding alloc per delivery for cached topics |
| 27 | **Per-message `FlushAsync` on plain TCP sockets** — `WriteBinaryAsync` flushed after every packet write, even on `NetworkStream` where TCP auto-flushes | Write loop skips `FlushAsync` for plain sockets; for TLS/wrapped streams, flushes once per batch (not per message) | Reduces syscalls from 2N to 1 per batch |

### Round 6: Batch Flush Signaling + Fetch Optimizations

Four optimizations targeting fan-out and consumer fetch hot paths:

@@ -233,7 +233,7 @@ public sealed class PullConsumerEngine
// is empty or the consumer has caught up to the end of the stream.
if (expiresCts is not null)
{
message = await WaitForMessageAsync(stream.Store, sequence, effectiveCt);
message = await WaitForMessageAsync(stream, sequence, effectiveCt);
}
else
{
@@ -291,19 +291,21 @@ public sealed class PullConsumerEngine
}

/// <summary>
/// Poll-wait for a message to appear at the given sequence, retrying with a
/// short delay until the cancellation token fires (typically from ExpiresMs).
/// Wait for a message to appear at the given sequence using signal-based wakeup.
/// Publishers call <see cref="StreamHandle.NotifyPublish"/> after each append,
/// eliminating the 5ms polling delay.
/// Go reference: consumer.go — channel signaling from publisher to waiting consumer.
/// </summary>
private static async ValueTask<StoredMessage?> WaitForMessageAsync(IStreamStore store, ulong sequence, CancellationToken ct)
private static async ValueTask<StoredMessage?> WaitForMessageAsync(StreamHandle stream, ulong sequence, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var message = await store.LoadAsync(sequence, ct);
var message = await stream.Store.LoadAsync(sequence, ct);
if (message is not null)
return message;

// Yield briefly before retrying — the ExpiresMs CTS will cancel when time is up
await Task.Delay(5, ct).ConfigureAwait(false);
// Wait for publisher to signal a new message instead of polling.
await stream.WaitForPublishAsync(ct).ConfigureAwait(false);
}

ct.ThrowIfCancellationRequested();

@@ -457,6 +457,9 @@ public sealed class StreamManager : IDisposable
var seq = stream.Store.AppendAsync(storeSubject, payload, default).GetAwaiter().GetResult();
EnforceRuntimePolicies(stream, DateTime.UtcNow);

// Wake up any pull consumers waiting at the stream tail.
stream.NotifyPublish();

// Only load the stored message when replication is configured (mirror/source).
// Avoids unnecessary disk I/O on the hot publish path.
if (_mirrorsByOrigin.ContainsKey(stream.Config.Name) || _sourcesByOrigin.ContainsKey(stream.Config.Name))
@@ -507,6 +510,9 @@ public sealed class StreamManager : IDisposable
var seq = stream.Store.AppendAsync(storeSubject, newPayload, default).GetAwaiter().GetResult();
EnforceRuntimePolicies(stream, DateTime.UtcNow);

// Wake up any pull consumers waiting at the stream tail.
stream.NotifyPublish();

if (_mirrorsByOrigin.ContainsKey(stream.Config.Name) || _sourcesByOrigin.ContainsKey(stream.Config.Name))
{
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
@@ -961,4 +967,25 @@ public sealed class StreamManager : IDisposable
}
}

public sealed record StreamHandle(StreamConfig Config, IStreamStore Store);
public sealed record StreamHandle(StreamConfig Config, IStreamStore Store)
{
// Signal-based wakeup for pull consumers waiting at the stream tail.
// Go reference: consumer.go — channel signaling from publisher to waiting consumer.
private volatile TaskCompletionSource _publishSignal = new(TaskCreationOptions.RunContinuationsAsynchronously);

/// <summary>
/// Notifies waiting consumers that a new message has been published.
/// </summary>
public void NotifyPublish()
{
var old = Interlocked.Exchange(ref _publishSignal,
new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously));
old.TrySetResult();
}

/// <summary>
/// Waits until a new message is published to this stream.
/// </summary>
public Task WaitForPublishAsync(CancellationToken ct)
=> _publishSignal.Task.WaitAsync(ct);
}

@@ -1,8 +1,10 @@
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Channels;
using NATS.Server.Auth;
using static NATS.Server.Mqtt.MqttBinaryDecoder;

@@ -26,6 +28,16 @@ public sealed class MqttConnection : IAsyncDisposable
private readonly Dictionary<string, string> _topicToSid = new(StringComparer.Ordinal);
private int _nextSid;

// Direct-buffer write loop for high-throughput MQTT message delivery.
// Mirrors the NatsClient _directBuf/_writeBuf + SpinLock + write-loop pattern.
private byte[] _directBuf = new byte[65536];
private byte[] _writeBuf = new byte[65536];
private int _directBufUsed;
private SpinLock _directBufLock = new(enableThreadOwnerTracking: false);
private readonly Channel<byte> _flushSignal = Channel.CreateBounded<byte>(
new BoundedChannelOptions(1) { SingleReader = true, SingleWriter = false, FullMode = BoundedChannelFullMode.DropWrite });
private readonly bool _isPlainSocket;

/// <summary>Auth result after successful CONNECT (populated for AuthService path).</summary>
public AuthResult? AuthResult { get; private set; }

@@ -46,6 +58,7 @@ public sealed class MqttConnection : IAsyncDisposable
_stream = client.GetStream();
_listener = listener;
_useBinaryProtocol = useBinaryProtocol;
_isPlainSocket = true; // NetworkStream over TCP — plain socket
}

/// <summary>
@@ -56,6 +69,7 @@ public sealed class MqttConnection : IAsyncDisposable
_stream = stream;
_listener = listener;
_useBinaryProtocol = useBinaryProtocol;
_isPlainSocket = false; // Wrapped stream (TLS, test, etc.)
}

/// <summary>
@@ -68,6 +82,7 @@ public sealed class MqttConnection : IAsyncDisposable
_listener = listener;
_useBinaryProtocol = useBinaryProtocol;
_clientCert = clientCert;
_isPlainSocket = false; // TLS-wrapped stream
}

public async Task RunAsync(CancellationToken ct)
@@ -80,6 +95,9 @@ public sealed class MqttConnection : IAsyncDisposable

private async Task RunBinaryAsync(CancellationToken ct)
{
// Start the write loop alongside the read loop
var writeTask = RunMqttWriteLoopAsync(ct);

var pipeReader = PipeReader.Create(_stream, new StreamPipeReaderOptions(leaveOpen: true));

try
@@ -147,6 +165,10 @@ public sealed class MqttConnection : IAsyncDisposable
{
await pipeReader.CompleteAsync();

// Signal write loop to exit and wait for it
_flushSignal.Writer.TryComplete();
try { await writeTask; } catch { /* write loop may throw on cancel */ }

// Publish will message if not cleanly disconnected
if (_connected && !_willCleared && _connectInfo.WillTopic != null)
{
@@ -492,6 +514,98 @@ public sealed class MqttConnection : IAsyncDisposable
return WriteLineAsync($"MSG {topic} {payload}", ct);
}

/// <summary>
/// Enqueues an MQTT PUBLISH packet into the direct buffer under SpinLock.
/// Zero-allocation hot path — formats the packet directly into the buffer.
/// Called synchronously from the NATS delivery path (DeliverMessage).
/// </summary>
public void EnqueuePublishNoFlush(ReadOnlySpan<byte> topicUtf8, ReadOnlyMemory<byte> payload,
byte qos = 0, bool retain = false, ushort packetId = 0)
{
var totalLen = MqttPacketWriter.MeasurePublish(topicUtf8.Length, payload.Length, qos);

var lockTaken = false;
_directBufLock.Enter(ref lockTaken);
try
{
// Grow buffer if needed
var needed = _directBufUsed + totalLen;
if (needed > _directBuf.Length)
{
var newSize = Math.Max(_directBuf.Length * 2, needed);
var newBuf = new byte[newSize];
_directBuf.AsSpan(0, _directBufUsed).CopyTo(newBuf);
_directBuf = newBuf;
}

MqttPacketWriter.WritePublishTo(
_directBuf.AsSpan(_directBufUsed),
topicUtf8,
payload.Span,
qos, retain, dup: false, packetId);

_directBufUsed += totalLen;
}
finally
{
if (lockTaken) _directBufLock.Exit();
}
}

/// <summary>
/// Signals the write loop to flush buffered MQTT packets.
/// </summary>
public void SignalMqttFlush() => _flushSignal.Writer.TryWrite(0);

/// <summary>
/// Write loop that drains the direct buffer and writes to the stream in batches.
/// Mirrors NatsClient.RunWriteLoopAsync — swap buffers under SpinLock, single write+flush.
/// </summary>
private async Task RunMqttWriteLoopAsync(CancellationToken ct)
{
var flushReader = _flushSignal.Reader;

try
{
while (await flushReader.WaitToReadAsync(ct))
{
// Drain all pending signals
while (flushReader.TryRead(out _)) { }

// Swap buffers under SpinLock
int directLen = 0;
var lockTaken = false;
_directBufLock.Enter(ref lockTaken);
try
{
if (_directBufUsed > 0)
{
(_directBuf, _writeBuf) = (_writeBuf, _directBuf);
directLen = _directBufUsed;
_directBufUsed = 0;
}
}
finally
{
if (lockTaken) _directBufLock.Exit();
}

if (directLen > 0)
{
await _stream.WriteAsync(_writeBuf.AsMemory(0, directLen), ct);

// For plain TCP sockets (NetworkStream), TCP auto-flushes — skip FlushAsync.
// For TLS/wrapped streams, flush once per batch.
if (!_isPlainSocket)
await _stream.FlushAsync(ct);
}
}
}
catch (OperationCanceledException) { }
catch (IOException) { }
catch (ObjectDisposedException) { }
}

public async ValueTask DisposeAsync()
{
// Clean up adapter subscriptions and unregister from listener

@@ -5,7 +5,6 @@
using NATS.Server.Auth;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
using System.Text;

namespace NATS.Server.Mqtt;

@@ -35,27 +34,32 @@ public sealed class MqttNatsClientAdapter : INatsClient

/// <summary>
/// Delivers a NATS message to this MQTT client by translating the NATS subject
/// to an MQTT topic and writing a binary PUBLISH packet.
/// to an MQTT topic and enqueueing a PUBLISH packet into the direct buffer.
/// </summary>
public void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
var mqttTopic = MqttTopicMapper.NatsToMqtt(subject);
// Fire-and-forget async send; MQTT delivery is best-effort for QoS 0
_ = _connection.SendBinaryPublishAsync(mqttTopic, payload, qos: 0,
retain: false, packetId: 0, CancellationToken.None);
SendMessageNoFlush(subject, sid, replyTo, headers, payload);
SignalFlush();
}

/// <summary>
/// Enqueues an MQTT PUBLISH into the connection's direct buffer without flushing.
/// Uses cached topic bytes to avoid re-encoding. Zero allocation on the hot path.
/// </summary>
public void SendMessageNoFlush(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
// MQTT has no concept of deferred flush — deliver immediately
SendMessage(subject, sid, replyTo, headers, payload);
var topicBytes = MqttTopicMapper.NatsToMqttBytes(subject);
_connection.EnqueuePublishNoFlush(topicBytes, payload, qos: 0, retain: false, packetId: 0);
}

/// <summary>
/// Signals the MQTT connection's write loop to flush buffered packets.
/// </summary>
public void SignalFlush()
{
// No-op for MQTT — each packet is written and flushed immediately
_connection.SignalMqttFlush();
}

public bool QueueOutbound(ReadOnlyMemory<byte> data)
@@ -79,6 +83,9 @@ public sealed class MqttNatsClientAdapter : INatsClient
/// </summary>
public Subscription AddSubscription(string natsSubject, string sid, string? queue = null)
{
// Pre-warm topic bytes cache for this subject to avoid cache miss on first message.
MqttTopicMapper.NatsToMqttBytes(natsSubject);

var sub = new Subscription
{
Client = this,

@@ -146,6 +146,92 @@ public static class MqttPacketWriter
return Write(MqttControlPacketType.Publish, totalPayload, flags);
}

/// <summary>
/// Writes a complete MQTT PUBLISH packet directly into a destination span.
/// Returns the number of bytes written. Zero-allocation hot path for message delivery.
/// </summary>
public static int WritePublishTo(Span<byte> dest, ReadOnlySpan<byte> topicUtf8,
ReadOnlySpan<byte> payload, byte qos = 0, bool retain = false, bool dup = false, ushort packetId = 0)
{
// Calculate remaining length: 2 (topic len) + topic + optional 2 (packet id) + payload
var remainingLength = 2 + topicUtf8.Length + (qos > 0 ? 2 : 0) + payload.Length;

// Encode remaining length into scratch
Span<byte> rlScratch = stackalloc byte[4];
var rlLen = EncodeRemainingLengthTo(rlScratch, remainingLength);

var totalLen = 1 + rlLen + remainingLength;

// Fixed header byte
byte flags = 0;
if (dup) flags |= 0x08;
flags |= (byte)((qos & 0x03) << 1);
if (retain) flags |= 0x01;
dest[0] = (byte)(((byte)MqttControlPacketType.Publish << 4) | flags);

var pos = 1;

// Remaining length
rlScratch[..rlLen].CopyTo(dest[pos..]);
pos += rlLen;

// Topic name (length-prefixed)
BinaryPrimitives.WriteUInt16BigEndian(dest[pos..], (ushort)topicUtf8.Length);
pos += 2;
topicUtf8.CopyTo(dest[pos..]);
pos += topicUtf8.Length;

// Packet ID (only for QoS > 0)
if (qos > 0)
{
BinaryPrimitives.WriteUInt16BigEndian(dest[pos..], packetId);
pos += 2;
}

// Application payload
payload.CopyTo(dest[pos..]);
pos += payload.Length;

return totalLen;
}

/// <summary>
/// Calculates the total wire size of a PUBLISH packet without writing it.
/// </summary>
public static int MeasurePublish(int topicLen, int payloadLen, byte qos)
{
var remainingLength = 2 + topicLen + (qos > 0 ? 2 : 0) + payloadLen;
var rlLen = MeasureRemainingLength(remainingLength);
return 1 + rlLen + remainingLength;
}

internal static int EncodeRemainingLengthTo(Span<byte> dest, int value)
{
var index = 0;
do
{
var digit = (byte)(value % 128);
value /= 128;
if (value > 0)
digit |= 0x80;
dest[index++] = digit;
} while (value > 0);

return index;
}

internal static int MeasureRemainingLength(int value)
{
var count = 0;
do
{
value /= 128;
count++;
} while (value > 0);

return count;
}

internal static byte[] EncodeRemainingLength(int value)
{
if (value < 0 || value > MqttProtocolConstants.MaxPayloadSize)

@@ -15,6 +15,7 @@
// '*' → '+'
// '>' → '#'

using System.Collections.Concurrent;
using System.Text;

namespace NATS.Server.Mqtt;
@@ -25,6 +26,36 @@ namespace NATS.Server.Mqtt;
/// </summary>
public static class MqttTopicMapper
{
private const int MaxCacheEntries = 4096;
private static readonly ConcurrentDictionary<string, byte[]> TopicBytesCache = new(StringComparer.Ordinal);
private static int _cacheCount;

/// <summary>
/// Returns the MQTT topic as pre-encoded UTF-8 bytes, using a bounded cache
/// to avoid repeated string translation and encoding on the hot path.
/// </summary>
public static byte[] NatsToMqttBytes(string natsSubject)
{
if (TopicBytesCache.TryGetValue(natsSubject, out var cached))
return cached;

var mqttTopic = NatsToMqtt(natsSubject);
var bytes = Encoding.UTF8.GetBytes(mqttTopic);

// Bounded cache — stop adding after limit to avoid unbounded growth
if (Interlocked.Increment(ref _cacheCount) <= MaxCacheEntries)
{
if (!TopicBytesCache.TryAdd(natsSubject, bytes))
Interlocked.Decrement(ref _cacheCount);
}
else
{
Interlocked.Decrement(ref _cacheCount);
}

return bytes;
}

// Escape sequence for dots that appear in MQTT topic names.
// Go uses _DOT_ internally to represent a literal dot in the NATS subject.
private const string DotEscape = "_DOT_";
@@ -76,7 +107,18 @@ public static class MqttTopicMapper
if (natsSubject.Length == 0)
return string.Empty;

// First, replace _DOT_ escape sequences back to dots
// Fast path: no _DOT_ escape sequences — just char replacement via string.Create
// (avoids StringBuilder allocation for the common case).
if (!natsSubject.Contains(DotEscape))
{
return string.Create(natsSubject.Length, natsSubject, static (span, src) =>
{
for (var i = 0; i < src.Length; i++)
span[i] = src[i] switch { '.' => '/', '*' => '+', '>' => '#', _ => src[i] };
});
}

// Slow path: has _DOT_ escape sequences — use StringBuilder
var working = natsSubject.Replace(DotEscape, "\x00");

var sb = new StringBuilder(working.Length);

@@ -1694,7 +1694,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
var numPending = batch - delivered;
|
||||
var ackReply = BuildAckReply(ackPrefix, message.Sequence, deliverySeq, tsNanos, numPending);
|
||||
|
||||
DeliverMessage(inboxSub, message.Subject, ackReply, minHeaders, message.Payload);
|
||||
// Bypass DeliverMessage — we already know the target client is sender.
|
||||
// Skip permission check and auto-unsub overhead for JS delivery inbox.
|
||||
// Go reference: consumer.go — batch delivery with deferred flush.
|
||||
sender.SendMessageNoFlush(message.Subject, inboxSub.Sid, ackReply, minHeaders, message.Payload);
|
||||
|
||||
// Batch flush every 64 messages to amortize write-loop wakeup cost.
|
||||
if ((delivered & 63) == 0)
|
||||
sender.SignalFlush();
|
||||
|
||||
if (consumer.Config.AckPolicy is JetStream.Models.AckPolicy.Explicit or JetStream.Models.AckPolicy.All)
|
||||
{
|
||||
@@ -1708,6 +1715,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
}
|
||||
else
|
||||
{
|
||||
// Flush any buffered messages before blocking on the signal.
|
||||
// Without this, messages queued via SendMessageNoFlush would sit
|
||||
// in the buffer until the next batch boundary or loop exit.
|
||||
sender.SignalFlush();
|
||||
|
||||
// No message available — send idle heartbeat if needed
|
||||
if (DateTime.UtcNow - lastDeliveryTime >= hbInterval)
|
||||
{
|
||||
@@ -1719,8 +1731,17 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
lastDeliveryTime = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
// Poll briefly before retrying
|
||||
await Task.Delay(5, ct).ConfigureAwait(false);
|
||||
// Wait for publisher to signal a new message, with a heartbeat-interval
|
||||
// timeout so the heartbeat check is re-evaluated periodically.
|
||||
// Go reference: consumer.go — channel signaling from publisher.
|
||||
try
|
||||
{
|
||||
await streamHandle.WaitForPublishAsync(ct).WaitAsync(hbInterval, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (TimeoutException)
|
||||
{
|
||||
// Heartbeat interval elapsed — loop back to re-check
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1729,13 +1750,16 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
// ExpiresMs timeout — expected
|
||||
}
|
||||
|
||||
// Final flush for any remaining batched messages
|
||||
sender.SignalFlush();
|
||||
|
||||
consumer.NextSequence = sequence;
|
||||
|
||||
// Send terminal status
|
||||
// Send terminal status directly to sender (last message, includes flush)
|
||||
ReadOnlyMemory<byte> statusHeader = delivered == 0
|
||||
? System.Text.Encoding.UTF8.GetBytes("NATS/1.0 404 No Messages\r\n\r\n")
|
||||
: System.Text.Encoding.UTF8.GetBytes("NATS/1.0 408 Request Timeout\r\n\r\n");
|
||||
DeliverMessage(inboxSub, replyTo, null, statusHeader, default);
|
||||
sender.SendMessage(replyTo, inboxSub.Sid, null, statusHeader, default);
|
||||
}
|
||||
|
||||
private void DeliverFetchedMessages(Subscription inboxSub, string streamName, string consumerName,
|
||||
|
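The hunks above replace the 5 ms `Task.Delay` poll with a publish signal awaited under a heartbeat-interval timeout. A minimal sketch of that wait-or-timeout pattern (Python `threading.Event`; `WaitForPublishAsync`'s actual implementation is not in this diff, so the coalescing-signal primitive shown here is an assumption modeled on the Go channel pattern the comments cite):

```python
import threading

class StreamSignal:
    """Sketch of a coalescing publish signal: many publishes while the
    consumer is busy collapse into a single wakeup, mirroring the
    1-buffered channel pattern referenced from Go's consumer.go."""

    def __init__(self):
        self._event = threading.Event()

    def publish(self):
        self._event.set()  # wake any consumer waiting at the stream tail

    def wait_for_publish(self, heartbeat_interval: float) -> bool:
        # True if signaled; False if the heartbeat interval elapsed first
        # (the caller loops back to re-evaluate idle heartbeats, like the
        # TimeoutException branch above).
        signaled = self._event.wait(timeout=heartbeat_interval)
        if signaled:
            self._event.clear()
        return signaled

sig = StreamSignal()
sig.publish()
print(sig.wait_for_publish(0.05))  # signal already pending -> True
print(sig.wait_for_publish(0.05))  # nothing published -> times out -> False
```

The consumer wakes immediately when a message arrives instead of paying up to 5 ms of latency per empty poll, while the timeout bound keeps heartbeats flowing.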
@@ -5,3 +5,12 @@ public class BenchmarkCoreCollection : ICollectionFixture<CoreServerPairFixture>

[CollectionDefinition("Benchmark-JetStream")]
public class BenchmarkJetStreamCollection : ICollectionFixture<JetStreamServerPairFixture>;

[CollectionDefinition("Benchmark-Mqtt")]
public class BenchmarkMqttCollection : ICollectionFixture<MqttServerFixture>;

[CollectionDefinition("Benchmark-Tls")]
public class BenchmarkTlsCollection : ICollectionFixture<TlsServerFixture>;

[CollectionDefinition("Benchmark-WebSocket")]
public class BenchmarkWebSocketCollection : ICollectionFixture<WebSocketServerFixture>;

@@ -0,0 +1,93 @@
using NATS.Client.Core;

namespace NATS.Server.Benchmark.Tests.Infrastructure;

/// <summary>
/// Starts both a Go and .NET NATS server with MQTT and JetStream enabled for MQTT benchmarks.
/// Shared across all tests in the "Benchmark-Mqtt" collection.
/// </summary>
public sealed class MqttServerFixture : IAsyncLifetime
{
    private GoServerProcess? _goServer;
    private DotNetServerProcess? _dotNetServer;
    private string? _goStoreDir;
    private string? _dotNetStoreDir;

    public int GoNatsPort => _goServer?.Port ?? throw new InvalidOperationException("Go server not started");
    public int GoMqttPort { get; private set; }
    public int DotNetNatsPort => _dotNetServer?.Port ?? throw new InvalidOperationException(".NET server not started");
    public int DotNetMqttPort { get; private set; }
    public bool GoAvailable => _goServer is not null;

    public async Task InitializeAsync()
    {
        DotNetMqttPort = PortAllocator.AllocateFreePort();
        _dotNetStoreDir = Path.Combine(Path.GetTempPath(), "nats-bench-dotnet-mqtt-" + Guid.NewGuid().ToString("N")[..8]);
        Directory.CreateDirectory(_dotNetStoreDir);

        var dotNetConfig = $$"""
            jetstream {
                store_dir: "{{_dotNetStoreDir}}"
                max_mem_store: 64mb
                max_file_store: 256mb
            }
            mqtt {
                listen: 127.0.0.1:{{DotNetMqttPort}}
            }
            """;

        _dotNetServer = new DotNetServerProcess(dotNetConfig);
        var dotNetTask = _dotNetServer.StartAsync();

        if (GoServerProcess.IsAvailable())
        {
            GoMqttPort = PortAllocator.AllocateFreePort();
            _goStoreDir = Path.Combine(Path.GetTempPath(), "nats-bench-go-mqtt-" + Guid.NewGuid().ToString("N")[..8]);
            Directory.CreateDirectory(_goStoreDir);

            var goConfig = $$"""
                jetstream {
                    store_dir: "{{_goStoreDir}}"
                    max_mem_store: 64mb
                    max_file_store: 256mb
                }
                mqtt {
                    listen: 127.0.0.1:{{GoMqttPort}}
                }
                """;

            _goServer = new GoServerProcess(goConfig);
            await Task.WhenAll(dotNetTask, _goServer.StartAsync());
        }
        else
        {
            await dotNetTask;
        }
    }

    public async Task DisposeAsync()
    {
        if (_goServer is not null)
            await _goServer.DisposeAsync();
        if (_dotNetServer is not null)
            await _dotNetServer.DisposeAsync();

        CleanupDir(_goStoreDir);
        CleanupDir(_dotNetStoreDir);
    }

    public NatsConnection CreateGoNatsClient()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{GoNatsPort}" });

    public NatsConnection CreateDotNetNatsClient()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{DotNetNatsPort}" });

    private static void CleanupDir(string? dir)
    {
        if (dir is not null && Directory.Exists(dir))
        {
            try { Directory.Delete(dir, recursive: true); }
            catch { /* best-effort cleanup */ }
        }
    }
}
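The fixtures lean on `PortAllocator.AllocateFreePort()`, whose implementation is not part of this diff. A common way to implement it (hypothetical sketch, not the project's code) is to bind port 0 and read back the kernel-assigned port:

```python
import socket

def allocate_free_port() -> int:
    # Bind to port 0: the OS picks an unused ephemeral port for us.
    # Classic caveat: the port is free *now*, but another process could
    # grab it between close() and the server process binding it.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]
```

The race window is acceptable for test fixtures like these, where each allocated port is handed straight to a freshly spawned server.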
@@ -0,0 +1,122 @@
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using NATS.Client.Core;

namespace NATS.Server.Benchmark.Tests.Infrastructure;

/// <summary>
/// Starts both a Go and .NET NATS server with TLS enabled for transport overhead benchmarks.
/// Shared across all tests in the "Benchmark-Tls" collection.
/// </summary>
public sealed class TlsServerFixture : IAsyncLifetime
{
    private GoServerProcess? _goServer;
    private DotNetServerProcess? _dotNetServer;
    private string? _tempDir;

    public int GoPort => _goServer?.Port ?? throw new InvalidOperationException("Go server not started");
    public int DotNetPort => _dotNetServer?.Port ?? throw new InvalidOperationException(".NET server not started");
    public bool GoAvailable => _goServer is not null;

    public async Task InitializeAsync()
    {
        _tempDir = Path.Combine(Path.GetTempPath(), $"nats-bench-tls-{Guid.NewGuid():N}");
        Directory.CreateDirectory(_tempDir);

        var caCertPath = Path.Combine(_tempDir, "ca.pem");
        var serverCertPath = Path.Combine(_tempDir, "server-cert.pem");
        var serverKeyPath = Path.Combine(_tempDir, "server-key.pem");

        GenerateCertificates(caCertPath, serverCertPath, serverKeyPath);

        var config = $$"""
            tls {
                cert_file: "{{serverCertPath}}"
                key_file: "{{serverKeyPath}}"
                ca_file: "{{caCertPath}}"
            }
            """;

        _dotNetServer = new DotNetServerProcess(config);
        var dotNetTask = _dotNetServer.StartAsync();

        if (GoServerProcess.IsAvailable())
        {
            _goServer = new GoServerProcess(config);
            await Task.WhenAll(dotNetTask, _goServer.StartAsync());
        }
        else
        {
            await dotNetTask;
        }
    }

    public async Task DisposeAsync()
    {
        if (_goServer is not null)
            await _goServer.DisposeAsync();
        if (_dotNetServer is not null)
            await _dotNetServer.DisposeAsync();

        if (_tempDir is not null && Directory.Exists(_tempDir))
        {
            try { Directory.Delete(_tempDir, recursive: true); }
            catch { /* best-effort cleanup */ }
        }
    }

    public NatsConnection CreateGoTlsClient()
        => CreateTlsClient(GoPort);

    public NatsConnection CreateDotNetTlsClient()
        => CreateTlsClient(DotNetPort);

    private static NatsConnection CreateTlsClient(int port)
    {
        var opts = new NatsOpts
        {
            Url = $"nats://127.0.0.1:{port}",
            TlsOpts = new NatsTlsOpts
            {
                Mode = TlsMode.Require,
                InsecureSkipVerify = true,
            },
        };
        return new NatsConnection(opts);
    }

    private static void GenerateCertificates(string caCertPath, string serverCertPath, string serverKeyPath)
    {
        using var caKey = RSA.Create(2048);
        var caReq = new CertificateRequest(
            "CN=Benchmark Test CA",
            caKey,
            HashAlgorithmName.SHA256,
            RSASignaturePadding.Pkcs1);
        caReq.CertificateExtensions.Add(
            new X509BasicConstraintsExtension(certificateAuthority: true, hasPathLengthConstraint: false, pathLengthConstraint: 0, critical: true));

        var now = DateTimeOffset.UtcNow;
        using var caCert = caReq.CreateSelfSigned(now.AddMinutes(-5), now.AddDays(1));

        using var serverKey = RSA.Create(2048);
        var serverReq = new CertificateRequest(
            "CN=localhost",
            serverKey,
            HashAlgorithmName.SHA256,
            RSASignaturePadding.Pkcs1);

        var sanBuilder = new SubjectAlternativeNameBuilder();
        sanBuilder.AddIpAddress(System.Net.IPAddress.Loopback);
        sanBuilder.AddDnsName("localhost");
        serverReq.CertificateExtensions.Add(sanBuilder.Build());
        serverReq.CertificateExtensions.Add(
            new X509BasicConstraintsExtension(certificateAuthority: false, hasPathLengthConstraint: false, pathLengthConstraint: 0, critical: false));

        using var serverCert = serverReq.Create(caCert, now.AddMinutes(-5), now.AddDays(1), [1, 2, 3, 4]);

        File.WriteAllText(caCertPath, caCert.ExportCertificatePem());
        File.WriteAllText(serverCertPath, serverCert.ExportCertificatePem());
        File.WriteAllText(serverKeyPath, serverKey.ExportRSAPrivateKeyPem());
    }
}
@@ -0,0 +1,67 @@
using NATS.Client.Core;

namespace NATS.Server.Benchmark.Tests.Infrastructure;

/// <summary>
/// Starts both a Go and .NET NATS server with WebSocket enabled for transport overhead benchmarks.
/// Shared across all tests in the "Benchmark-WebSocket" collection.
/// </summary>
public sealed class WebSocketServerFixture : IAsyncLifetime
{
    private GoServerProcess? _goServer;
    private DotNetServerProcess? _dotNetServer;

    public int GoNatsPort => _goServer?.Port ?? throw new InvalidOperationException("Go server not started");
    public int GoWsPort { get; private set; }
    public int DotNetNatsPort => _dotNetServer?.Port ?? throw new InvalidOperationException(".NET server not started");
    public int DotNetWsPort { get; private set; }
    public bool GoAvailable => _goServer is not null;

    public async Task InitializeAsync()
    {
        DotNetWsPort = PortAllocator.AllocateFreePort();

        var dotNetConfig = $$"""
            websocket {
                listen: 127.0.0.1:{{DotNetWsPort}}
                no_tls: true
            }
            """;

        _dotNetServer = new DotNetServerProcess(dotNetConfig);
        var dotNetTask = _dotNetServer.StartAsync();

        if (GoServerProcess.IsAvailable())
        {
            GoWsPort = PortAllocator.AllocateFreePort();

            var goConfig = $$"""
                websocket {
                    listen: 127.0.0.1:{{GoWsPort}}
                    no_tls: true
                }
                """;

            _goServer = new GoServerProcess(goConfig);
            await Task.WhenAll(dotNetTask, _goServer.StartAsync());
        }
        else
        {
            await dotNetTask;
        }
    }

    public async Task DisposeAsync()
    {
        if (_goServer is not null)
            await _goServer.DisposeAsync();
        if (_dotNetServer is not null)
            await _dotNetServer.DisposeAsync();
    }

    public NatsConnection CreateGoNatsClient()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{GoNatsPort}" });

    public NatsConnection CreateDotNetNatsClient()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{DotNetNatsPort}" });
}
tests/NATS.Server.Benchmark.Tests/Mqtt/MqttThroughputTests.cs (new file, 184 lines)
@@ -0,0 +1,184 @@
using MQTTnet;
using MQTTnet.Client;
using NATS.Client.Core;
using NATS.Server.Benchmark.Tests.Harness;
using NATS.Server.Benchmark.Tests.Infrastructure;
using Xunit.Abstractions;

namespace NATS.Server.Benchmark.Tests.Mqtt;

[Collection("Benchmark-Mqtt")]
public class MqttThroughputTests(MqttServerFixture fixture, ITestOutputHelper output)
{
    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task MqttPubSub_128B()
    {
        const int payloadSize = 128;
        const int messageCount = 5_000;

        var dotnetResult = await RunMqttPubSub("MQTT PubSub (128B)", "DotNet", fixture.DotNetMqttPort, payloadSize, messageCount);

        if (fixture.GoAvailable)
        {
            var goResult = await RunMqttPubSub("MQTT PubSub (128B)", "Go", fixture.GoMqttPort, payloadSize, messageCount);
            BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
        }
        else
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
        }
    }

    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task MqttCrossProtocol_NatsPub_MqttSub_128B()
    {
        const int payloadSize = 128;
        const int messageCount = 5_000;

        var dotnetResult = await RunCrossProtocol("Cross-Protocol NATS→MQTT (128B)", "DotNet", fixture.DotNetMqttPort, fixture.CreateDotNetNatsClient, payloadSize, messageCount);

        if (fixture.GoAvailable)
        {
            var goResult = await RunCrossProtocol("Cross-Protocol NATS→MQTT (128B)", "Go", fixture.GoMqttPort, fixture.CreateGoNatsClient, payloadSize, messageCount);
            BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
        }
        else
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
        }
    }

    private static async Task<BenchmarkResult> RunMqttPubSub(string name, string serverType, int mqttPort, int payloadSize, int messageCount)
    {
        var payload = new byte[payloadSize];
        var topic = $"bench/mqtt/pubsub/{Guid.NewGuid():N}";

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));

        var factory = new MqttFactory();
        using var subscriber = factory.CreateMqttClient();
        using var publisher = factory.CreateMqttClient();

        var subOpts = new MqttClientOptionsBuilder()
            .WithTcpServer("127.0.0.1", mqttPort)
            .WithClientId($"bench-sub-{Guid.NewGuid():N}")
            .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
            .Build();

        var pubOpts = new MqttClientOptionsBuilder()
            .WithTcpServer("127.0.0.1", mqttPort)
            .WithClientId($"bench-pub-{Guid.NewGuid():N}")
            .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
            .Build();

        await subscriber.ConnectAsync(subOpts, cts.Token);
        await publisher.ConnectAsync(pubOpts, cts.Token);

        var received = 0;
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        subscriber.ApplicationMessageReceivedAsync += _ =>
        {
            if (Interlocked.Increment(ref received) >= messageCount)
                tcs.TrySetResult();
            return Task.CompletedTask;
        };

        await subscriber.SubscribeAsync(
            factory.CreateSubscribeOptionsBuilder()
                .WithTopicFilter(topic)
                .Build(),
            cts.Token);

        await Task.Delay(200, cts.Token);

        var sw = System.Diagnostics.Stopwatch.StartNew();
        for (var i = 0; i < messageCount; i++)
        {
            await publisher.PublishAsync(
                new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(payload)
                    .Build(),
                cts.Token);
        }

        await tcs.Task.WaitAsync(cts.Token);
        sw.Stop();

        await subscriber.DisconnectAsync(cancellationToken: cts.Token);
        await publisher.DisconnectAsync(cancellationToken: cts.Token);

        return new BenchmarkResult
        {
            Name = name,
            ServerType = serverType,
            TotalMessages = messageCount,
            TotalBytes = (long)messageCount * payloadSize,
            Duration = sw.Elapsed,
        };
    }

    private static async Task<BenchmarkResult> RunCrossProtocol(string name, string serverType, int mqttPort, Func<NatsConnection> createNatsClient, int payloadSize, int messageCount)
    {
        var payload = new byte[payloadSize];
        var natsSubject = $"bench.mqtt.cross.{Guid.NewGuid():N}";
        var mqttTopic = natsSubject.Replace('.', '/');

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));

        var factory = new MqttFactory();
        using var mqttSub = factory.CreateMqttClient();

        var subOpts = new MqttClientOptionsBuilder()
            .WithTcpServer("127.0.0.1", mqttPort)
            .WithClientId($"bench-cross-sub-{Guid.NewGuid():N}")
            .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
            .Build();

        await mqttSub.ConnectAsync(subOpts, cts.Token);

        var received = 0;
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        mqttSub.ApplicationMessageReceivedAsync += _ =>
        {
            if (Interlocked.Increment(ref received) >= messageCount)
                tcs.TrySetResult();
            return Task.CompletedTask;
        };

        await mqttSub.SubscribeAsync(
            factory.CreateSubscribeOptionsBuilder()
                .WithTopicFilter(mqttTopic)
                .Build(),
            cts.Token);

        await Task.Delay(200, cts.Token);

        await using var natsPub = createNatsClient();
        await natsPub.ConnectAsync();
        await natsPub.PingAsync(cts.Token);

        var sw = System.Diagnostics.Stopwatch.StartNew();
        for (var i = 0; i < messageCount; i++)
            await natsPub.PublishAsync(natsSubject, payload, cancellationToken: cts.Token);
        await natsPub.PingAsync(cts.Token);

        await tcs.Task.WaitAsync(cts.Token);
        sw.Stop();

        await mqttSub.DisconnectAsync(cancellationToken: cts.Token);

        return new BenchmarkResult
        {
            Name = name,
            ServerType = serverType,
            TotalMessages = messageCount,
            TotalBytes = (long)messageCount * payloadSize,
            Duration = sw.Elapsed,
        };
    }
}
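Every benchmark in this file follows the same shape: an atomic receive counter, a completion future resolved once the counter reaches `messageCount`, and a stopwatch spanning first publish to last receive, with msg/s derived as `TotalMessages / Duration`. A compressed sketch of that measurement loop (Python; the `BenchmarkResult` here is a stand-in, not the harness type):

```python
import queue
import threading
import time
from dataclasses import dataclass

@dataclass
class BenchmarkResult:  # stand-in for the harness type
    total_messages: int
    duration_s: float

    @property
    def msgs_per_sec(self) -> float:
        return self.total_messages / self.duration_s

def run_pubsub(message_count: int) -> BenchmarkResult:
    q: "queue.Queue[int]" = queue.Queue()  # stands in for the broker
    done = threading.Event()               # plays the TaskCompletionSource role

    def subscriber():
        for _ in range(message_count):
            q.get()          # blocks like the awaited message stream
        done.set()           # resolve once the last message lands

    threading.Thread(target=subscriber, daemon=True).start()

    start = time.perf_counter()            # Stopwatch.StartNew()
    for i in range(message_count):
        q.put(i)                           # publish
    done.wait()                            # await tcs.Task
    elapsed = time.perf_counter() - start

    return BenchmarkResult(message_count, elapsed)
```

Timing until the subscriber has actually consumed the last message (rather than until the last publish returns) is what makes the number end-to-end throughput instead of publish-side throughput.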
@@ -8,6 +8,7 @@
    <PackageReference Include="coverlet.collector" />
    <PackageReference Include="Microsoft.NET.Test.Sdk" />
    <PackageReference Include="NATS.Client.Core" />
    <PackageReference Include="MQTTnet" />
    <PackageReference Include="NATS.Client.JetStream" />
    <PackageReference Include="Shouldly" />
    <PackageReference Include="xunit" />

@@ -25,6 +25,12 @@ dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark&Fully
# JetStream only
dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark&FullyQualifiedName~JetStream" -v normal

# MQTT benchmarks
dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark&FullyQualifiedName~Mqtt" -v normal

# Transport benchmarks (TLS + WebSocket)
dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark&FullyQualifiedName~Transport" -v normal

# A single benchmark by name
dotnet test tests/NATS.Server.Benchmark.Tests --filter "FullyQualifiedName=NATS.Server.Benchmark.Tests.CorePubSub.SinglePublisherThroughputTests.PubNoSub_16B" -v normal
```
@@ -50,6 +56,12 @@ Use `-v normal` or `--logger "console;verbosity=detailed"` to see the comparison
| `FileStoreAppendBenchmarks` | `FileStore_PurgeEx_Trim_Overhead` | FileStore purge/trim maintenance overhead under repeated updates |
| `OrderedConsumerTests` | `JSOrderedConsumer_Throughput` | JetStream ordered ephemeral consumer read throughput |
| `DurableConsumerFetchTests` | `JSDurableFetch_Throughput` | JetStream durable consumer fetch-in-batches throughput |
| `MqttThroughputTests` | `MqttPubSub_128B` | MQTT pub/sub throughput, 128-byte payload, QoS 0 |
| `MqttThroughputTests` | `MqttCrossProtocol_NatsPub_MqttSub_128B` | Cross-protocol NATS→MQTT routing throughput |
| `TlsPubSubTests` | `TlsPubSub1To1_128B` | TLS pub/sub 1:1 throughput, 128-byte payload |
| `TlsPubSubTests` | `TlsPubNoSub_128B` | TLS publish-only throughput, 128-byte payload |
| `WebSocketPubSubTests` | `WsPubSub1To1_128B` | WebSocket pub/sub 1:1 throughput, 128-byte payload |
| `WebSocketPubSubTests` | `WsPubNoSub_128B` | WebSocket publish-only throughput, 128-byte payload |

## Output Format

@@ -97,6 +109,9 @@ Infrastructure/
    GoServerProcess.cs             # Builds + launches golang/nats-server
    CoreServerPairFixture.cs       # IAsyncLifetime: Go + .NET servers for core tests
    JetStreamServerPairFixture     # IAsyncLifetime: Go + .NET servers with JetStream
    MqttServerFixture.cs           # IAsyncLifetime: .NET server with MQTT + JetStream
    TlsServerFixture.cs            # IAsyncLifetime: .NET server with TLS
    WebSocketServerFixture.cs      # IAsyncLifetime: .NET server with WebSocket
    Collections.cs                 # xUnit collection definitions

Harness/

tests/NATS.Server.Benchmark.Tests/Transport/TlsPubSubTests.cs (new file, 116 lines)
@@ -0,0 +1,116 @@
using NATS.Client.Core;
using NATS.Server.Benchmark.Tests.Harness;
using NATS.Server.Benchmark.Tests.Infrastructure;
using Xunit.Abstractions;

namespace NATS.Server.Benchmark.Tests.Transport;

[Collection("Benchmark-Tls")]
public class TlsPubSubTests(TlsServerFixture fixture, ITestOutputHelper output)
{
    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task TlsPubSub1To1_128B()
    {
        const int payloadSize = 128;
        const int messageCount = 10_000;

        var dotnetResult = await RunTlsPubSub("TLS PubSub 1:1 (128B)", "DotNet", fixture.CreateDotNetTlsClient, payloadSize, messageCount);

        if (fixture.GoAvailable)
        {
            var goResult = await RunTlsPubSub("TLS PubSub 1:1 (128B)", "Go", fixture.CreateGoTlsClient, payloadSize, messageCount);
            BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
        }
        else
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
        }
    }

    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task TlsPubNoSub_128B()
    {
        const int payloadSize = 128;

        var dotnetResult = await RunTlsPubOnly("TLS Pub-Only (128B)", "DotNet", fixture.CreateDotNetTlsClient, payloadSize);

        if (fixture.GoAvailable)
        {
            var goResult = await RunTlsPubOnly("TLS Pub-Only (128B)", "Go", fixture.CreateGoTlsClient, payloadSize);
            BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
        }
        else
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
        }
    }

    private static async Task<BenchmarkResult> RunTlsPubSub(string name, string serverType, Func<NatsConnection> createClient, int payloadSize, int messageCount)
    {
        var payload = new byte[payloadSize];
        var subject = $"bench.tls.pubsub.{Guid.NewGuid():N}";

        await using var pubClient = createClient();
        await using var subClient = createClient();
        await pubClient.ConnectAsync();
        await subClient.ConnectAsync();

        var received = 0;
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        var sub = await subClient.SubscribeCoreAsync<byte[]>(subject);

        await subClient.PingAsync();
        await pubClient.PingAsync();

        var subTask = Task.Run(async () =>
        {
            await foreach (var msg in sub.Msgs.ReadAllAsync())
            {
                if (Interlocked.Increment(ref received) >= messageCount)
                {
                    tcs.TrySetResult();
                    return;
                }
            }
        });

        var sw = System.Diagnostics.Stopwatch.StartNew();
        for (var i = 0; i < messageCount; i++)
            await pubClient.PublishAsync(subject, payload);
        await pubClient.PingAsync();

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
        await tcs.Task.WaitAsync(cts.Token);
        sw.Stop();

        await sub.UnsubscribeAsync();

        return new BenchmarkResult
        {
            Name = name,
            ServerType = serverType,
            TotalMessages = messageCount,
            TotalBytes = (long)messageCount * payloadSize,
            Duration = sw.Elapsed,
        };
    }

    private static async Task<BenchmarkResult> RunTlsPubOnly(string name, string serverType, Func<NatsConnection> createClient, int payloadSize)
    {
        var subject = $"bench.tls.pubonly.{Guid.NewGuid():N}";

        await using var client = createClient();
        await client.ConnectAsync();

        var runner = new BenchmarkRunner { WarmupCount = 1_000, MeasurementCount = 100_000 };

        return await runner.MeasureThroughputAsync(
            name,
            serverType,
            payloadSize,
            async _ => await client.PublishAsync(subject, new byte[payloadSize]));
    }
}
@@ -0,0 +1,209 @@
using System.Net.WebSockets;
using System.Text;
using NATS.Client.Core;
using NATS.Server.Benchmark.Tests.Harness;
using NATS.Server.Benchmark.Tests.Infrastructure;
using Xunit.Abstractions;

namespace NATS.Server.Benchmark.Tests.Transport;

[Collection("Benchmark-WebSocket")]
public class WebSocketPubSubTests(WebSocketServerFixture fixture, ITestOutputHelper output)
{
    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task WsPubSub1To1_128B()
    {
        const int payloadSize = 128;
        const int messageCount = 5_000;

        var dotnetResult = await RunWsPubSub("WebSocket PubSub 1:1 (128B)", "DotNet", fixture.DotNetWsPort, fixture.CreateDotNetNatsClient, payloadSize, messageCount);

        if (fixture.GoAvailable)
        {
            var goResult = await RunWsPubSub("WebSocket PubSub 1:1 (128B)", "Go", fixture.GoWsPort, fixture.CreateGoNatsClient, payloadSize, messageCount);
            BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
        }
        else
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
        }
    }

    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task WsPubNoSub_128B()
    {
        const int payloadSize = 128;
        const int messageCount = 10_000;

        var dotnetResult = await RunWsPubOnly("WebSocket Pub-Only (128B)", "DotNet", fixture.DotNetWsPort, payloadSize, messageCount);

        if (fixture.GoAvailable)
        {
            var goResult = await RunWsPubOnly("WebSocket Pub-Only (128B)", "Go", fixture.GoWsPort, payloadSize, messageCount);
            BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
        }
        else
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
        }
    }

    private static async Task<BenchmarkResult> RunWsPubSub(string name, string serverType, int wsPort, Func<NatsConnection> createNatsClient, int payloadSize, int messageCount)
    {
        var payload = new byte[payloadSize];
        var subject = $"bench.ws.pubsub.{Guid.NewGuid():N}";

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));

        using var ws = new ClientWebSocket();
        await ws.ConnectAsync(new Uri($"ws://127.0.0.1:{wsPort}"), cts.Token);

        var reader = new WsLineReader(ws);

        // Read INFO
        await reader.ReadLineAsync(cts.Token);

        // Send CONNECT + SUB + PING
        await WsSend(ws, "CONNECT {\"verbose\":false,\"protocol\":1}\r\n", cts.Token);
        await WsSend(ws, $"SUB {subject} 1\r\n", cts.Token);
        await WsSend(ws, "PING\r\n", cts.Token);
        await WaitForPong(reader, cts.Token);

        // NATS publisher
        await using var natsPub = createNatsClient();
        await natsPub.ConnectAsync();
        await natsPub.PingAsync(cts.Token);

        var sw = System.Diagnostics.Stopwatch.StartNew();

        for (var i = 0; i < messageCount; i++)
            await natsPub.PublishAsync(subject, payload, cancellationToken: cts.Token);
        await natsPub.PingAsync(cts.Token);

        // Read all MSG responses from WebSocket
        var received = 0;
        while (received < messageCount)
        {
            var line = await reader.ReadLineAsync(cts.Token);
            if (line.StartsWith("MSG ", StringComparison.Ordinal))
            {
                await reader.ReadLineAsync(cts.Token);
                received++;
            }
        }

        sw.Stop();

        await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cts.Token);

        return new BenchmarkResult
        {
            Name = name,
            ServerType = serverType,
            TotalMessages = messageCount,
            TotalBytes = (long)messageCount * payloadSize,
            Duration = sw.Elapsed,
        };
    }

    private static async Task<BenchmarkResult> RunWsPubOnly(string name, string serverType, int wsPort, int payloadSize, int messageCount)
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));

        using var ws = new ClientWebSocket();
        await ws.ConnectAsync(new Uri($"ws://127.0.0.1:{wsPort}"), cts.Token);

        var reader = new WsLineReader(ws);

        // Read INFO
        await reader.ReadLineAsync(cts.Token);

        // Send CONNECT
        await WsSend(ws, "CONNECT {\"verbose\":false,\"protocol\":1}\r\n", cts.Token);
        await WsSend(ws, "PING\r\n", cts.Token);
        await WaitForPong(reader, cts.Token);

        // Build a PUB command with raw binary payload
        var subject = $"bench.ws.pubonly.{Guid.NewGuid():N}";
        var pubLine = $"PUB {subject} {payloadSize}\r\n";
        var pubPayload = new byte[payloadSize];
        var pubCmd = Encoding.ASCII.GetBytes(pubLine)
            .Concat(pubPayload)
            .Concat(Encoding.ASCII.GetBytes("\r\n"))
            .ToArray();

        var sw = System.Diagnostics.Stopwatch.StartNew();
        for (var i = 0; i < messageCount; i++)
            await ws.SendAsync(pubCmd, WebSocketMessageType.Binary, true, cts.Token);

        // Flush with PING/PONG
        await WsSend(ws, "PING\r\n", cts.Token);
        await WaitForPong(reader, cts.Token);
        sw.Stop();

        await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cts.Token);

        return new BenchmarkResult
        {
            Name = name,
            ServerType = serverType,
            TotalMessages = messageCount,
            TotalBytes = (long)messageCount * payloadSize,
            Duration = sw.Elapsed,
        };
    }

    /// <summary>
    /// Reads lines until PONG is received, skipping any INFO lines
    /// (Go server sends a second INFO after CONNECT with connect_info:true).
    /// </summary>
    private static async Task WaitForPong(WsLineReader reader, CancellationToken ct)
    {
        while (true)
        {
            var line = await reader.ReadLineAsync(ct);
            if (line == "PONG")
                return;
        }
    }

    private static async Task WsSend(ClientWebSocket ws, string data, CancellationToken ct)
    {
        var bytes = Encoding.ASCII.GetBytes(data);
        await ws.SendAsync(bytes, WebSocketMessageType.Binary, true, ct);
    }

    /// <summary>
    /// Buffers incoming WebSocket frames and returns one NATS protocol line at a time.
    /// </summary>
    private sealed class WsLineReader(ClientWebSocket ws)
    {
        private readonly byte[] _recvBuffer = new byte[65536];
        private readonly StringBuilder _pending = new();

        public async Task<string> ReadLineAsync(CancellationToken ct)
        {
            while (true)
            {
                var full = _pending.ToString();
                var crlfIdx = full.IndexOf("\r\n", StringComparison.Ordinal);
                if (crlfIdx >= 0)
                {
                    var line = full[..crlfIdx];
                    _pending.Clear();
                    _pending.Append(full[(crlfIdx + 2)..]);
                    return line;
                }

                var result = await ws.ReceiveAsync(_recvBuffer, ct);
                if (result.MessageType == WebSocketMessageType.Close)
                    throw new InvalidOperationException("WebSocket closed unexpectedly while reading");

                var chunk = Encoding.ASCII.GetString(_recvBuffer, 0, result.Count);
                _pending.Append(chunk);
            }
        }
    }
}
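`WsLineReader` above works because a WebSocket frame boundary carries no meaning for the NATS protocol: a single frame may contain several protocol lines, or one line split mid-token across frames, so the reader must buffer chunks and split only on CRLF. The same buffering logic in a few lines of Python (the chunk source is a stand-in for `ws.ReceiveAsync`):

```python
class CrlfLineReader:
    """Buffers arbitrary chunks and yields one CRLF-terminated line at a
    time, like WsLineReader above; recv is any zero-arg chunk source."""

    def __init__(self, recv):
        self._recv = recv
        self._pending = ""

    def read_line(self) -> str:
        while True:
            idx = self._pending.find("\r\n")
            if idx >= 0:
                line, self._pending = self._pending[:idx], self._pending[idx + 2:]
                return line
            chunk = self._recv()   # pull the next frame's bytes
            if not chunk:
                raise EOFError("stream closed while reading")
            self._pending += chunk

# Frames split mid-line, as a WebSocket is free to do:
frames = iter(['INFO {"ver', 'bose":false}\r\nPO', "NG\r\n"])
reader = CrlfLineReader(lambda: next(frames, ""))
print(reader.read_line())  # INFO {"verbose":false}
print(reader.read_line())  # PONG
```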