Files
Joseph Doherty 11e01b9026 perf: optimize MQTT cross-protocol path (0.30x → 0.78x Go)
Replace per-message async fire-and-forget with direct-buffer write loop
mirroring NatsClient pattern: SpinLock-guarded buffer append, double-
buffer swap, single WriteAsync per batch.

- MqttConnection: add _directBuf/_writeBuf + RunMqttWriteLoopAsync
- MqttConnection: add EnqueuePublishNoFlush (zero-alloc PUBLISH format)
- MqttPacketWriter: add WritePublishTo(Span<byte>) + MeasurePublish
- MqttTopicMapper: add NatsToMqttBytes with bounded ConcurrentDictionary
- MqttNatsClientAdapter: synchronous SendMessageNoFlush + SignalFlush
- Skip FlushAsync on plain TCP sockets (TCP auto-flushes)
2026-03-13 14:25:13 -04:00

68 lines
2.1 KiB
C#

using NATS.Client.Core;
namespace NATS.Server.Benchmark.Tests.Infrastructure;
/// <summary>
/// Starts both a Go and .NET NATS server with WebSocket enabled for transport overhead benchmarks.
/// Shared across all tests in the "Benchmark-WebSocket" collection.
/// </summary>
/// <remarks>
/// The Go server is only started when <c>GoServerProcess.IsAvailable()</c> returns true;
/// check <see cref="GoAvailable"/> before using any of the Go-specific members.
/// </remarks>
public sealed class WebSocketServerFixture : IAsyncLifetime
{
    private GoServerProcess? _goServer;
    private DotNetServerProcess? _dotNetServer;

    /// <summary>Port reported by the Go server process.</summary>
    /// <exception cref="InvalidOperationException">The Go server has not been started.</exception>
    public int GoNatsPort => _goServer?.Port ?? throw new InvalidOperationException("Go server not started");

    /// <summary>
    /// WebSocket listen port written into the Go server config.
    /// Remains 0 when the Go server is not started.
    /// </summary>
    public int GoWsPort { get; private set; }

    /// <summary>Port reported by the .NET server process.</summary>
    /// <exception cref="InvalidOperationException">The .NET server has not been started.</exception>
    public int DotNetNatsPort => _dotNetServer?.Port ?? throw new InvalidOperationException(".NET server not started");

    /// <summary>WebSocket listen port written into the .NET server config.</summary>
    public int DotNetWsPort { get; private set; }

    /// <summary>True once a Go server process has been created by <see cref="InitializeAsync"/>.</summary>
    public bool GoAvailable => _goServer is not null;

    /// <summary>
    /// Allocates WebSocket ports and starts the .NET server, plus the Go server when it is available.
    /// When both servers are started, their startups are awaited together.
    /// </summary>
    public async Task InitializeAsync()
    {
        DotNetWsPort = PortAllocator.AllocateFreePort();
        _dotNetServer = new DotNetServerProcess(BuildWebSocketConfig(DotNetWsPort));

        // Kick off the .NET server first so its startup overlaps with the Go setup below.
        var dotNetTask = _dotNetServer.StartAsync();

        if (!GoServerProcess.IsAvailable())
        {
            await dotNetTask;
            return;
        }

        GoWsPort = PortAllocator.AllocateFreePort();
        _goServer = new GoServerProcess(BuildWebSocketConfig(GoWsPort));
        await Task.WhenAll(dotNetTask, _goServer.StartAsync());
    }

    /// <summary>
    /// Disposes the Go server (if one was started) and then the .NET server.
    /// </summary>
    /// <remarks>
    /// The .NET server is disposed in a <c>finally</c> block so that a failure while
    /// disposing the Go server cannot leave the .NET server process running.
    /// </remarks>
    public async Task DisposeAsync()
    {
        try
        {
            if (_goServer is not null)
            {
                await _goServer.DisposeAsync();
            }
        }
        finally
        {
            if (_dotNetServer is not null)
            {
                await _dotNetServer.DisposeAsync();
            }
        }
    }

    /// <summary>Creates a new <see cref="NatsConnection"/> whose URL targets the Go server on 127.0.0.1.</summary>
    /// <exception cref="InvalidOperationException">The Go server has not been started.</exception>
    public NatsConnection CreateGoNatsClient()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{GoNatsPort}" });

    /// <summary>Creates a new <see cref="NatsConnection"/> whose URL targets the .NET server on 127.0.0.1.</summary>
    /// <exception cref="InvalidOperationException">The .NET server has not been started.</exception>
    public NatsConnection CreateDotNetNatsClient()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{DotNetNatsPort}" });

    /// <summary>
    /// Builds the server config fragment that enables a non-TLS WebSocket listener
    /// on 127.0.0.1 at the given port. Both servers are given the same template.
    /// </summary>
    private static string BuildWebSocketConfig(int port)
    {
        // Content lines are aligned with the closing delimiter so no leading
        // whitespace ends up in the generated config text.
        return $$"""
            websocket {
            listen: 127.0.0.1:{{port}}
            no_tls: true
            }
            """;
    }
}