using NATS.Client.Core;
using NATS.Server.Benchmark.Tests.Harness;
using NATS.Server.Benchmark.Tests.Infrastructure;
using Xunit.Abstractions;

namespace NATS.Server.Benchmark.Tests.CorePubSub;

/// <summary>
/// Core pub/sub benchmark that drives several publishers and several subscribers
/// concurrently, each publisher on its own subject, and reports the elapsed time.
/// </summary>
[Collection("Benchmark-Core")]
public class MultiPubSubTests(CoreServerPairFixture fixture, ITestOutputHelper output)
{
    /// <summary>Upper bound on how long a run waits for all deliveries to arrive.</summary>
    private static readonly TimeSpan DeliveryTimeout = TimeSpan.FromSeconds(120);

    /// <summary>
    /// Runs the 4-publisher / 4-subscriber scenario with 128-byte payloads against the
    /// .NET client factory and, when <c>fixture.GoAvailable</c> is true, against the Go
    /// client factory as well, then writes either a comparison or a single result.
    /// </summary>
    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task MultiPubSub4x4_128B()
    {
        const string benchmarkName = "Multi 4Px4S (128B)";
        const int payloadSize = 128;
        const int messagesPerPublisher = 5_000;
        const int pubCount = 4;
        const int subCount = 4;

        var dotnetResult = await RunMultiPubSub(
            benchmarkName, "DotNet", payloadSize, messagesPerPublisher, pubCount, subCount, fixture.CreateDotNetClient);

        if (fixture.GoAvailable)
        {
            var goResult = await RunMultiPubSub(
                benchmarkName, "Go", payloadSize, messagesPerPublisher, pubCount, subCount, fixture.CreateGoClient);
            BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
        }
        else
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
        }
    }

    /// <summary>
    /// Publishes <paramref name="messagesPerPublisher"/> messages from each of
    /// <paramref name="pubCount"/> publishers (one subject per publisher) while
    /// <paramref name="subCount"/> subscribers, assigned to subjects round-robin,
    /// count what they receive. The stopwatch covers publishing, the publisher
    /// flush and the wait for the last expected delivery.
    /// </summary>
    /// <param name="name">Label stored in the result.</param>
    /// <param name="serverType">Server label stored in the result; its lower-cased form is part of the subject names.</param>
    /// <param name="payloadSize">Size in bytes of the zero-filled payload sent with every message.</param>
    /// <param name="messagesPerPublisher">Number of messages each publisher sends; must be positive.</param>
    /// <param name="pubCount">Number of publisher connections (and subjects); must be positive.</param>
    /// <param name="subCount">Number of subscriber connections; must be positive.</param>
    /// <param name="createClient">Factory invoked once per publisher and once per subscriber to create a connection.</param>
    /// <returns>A result carrying the published message count, the published byte count and the elapsed time.</returns>
    /// <exception cref="ArgumentOutOfRangeException">A count argument is zero or negative.</exception>
    /// <exception cref="OperationCanceledException">Not all expected deliveries arrived within the timeout.</exception>
    private static async Task<BenchmarkResult> RunMultiPubSub(
        string name,
        string serverType,
        int payloadSize,
        int messagesPerPublisher,
        int pubCount,
        int subCount,
        Func<NatsConnection> createClient)
    {
        if (messagesPerPublisher <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(messagesPerPublisher));
        }

        if (pubCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(pubCount));
        }

        if (subCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(subCount));
        }

        var payload = new byte[payloadSize];
        var totalMessages = messagesPerPublisher * pubCount;

        // Every subscriber listens on exactly one subject and every subject has exactly
        // one publisher, so each subscriber receives messagesPerPublisher messages.
        // (Equal to totalMessages whenever subCount == pubCount.)
        var expectedDeliveries = (long)messagesPerPublisher * subCount;

        // A per-run id keeps subjects from colliding with other runs on the same server.
        var runId = Guid.NewGuid().ToString("N")[..8];
        var subjects = Enumerable.Range(0, pubCount)
            .Select(i => $"bench.multi.{serverType.ToLowerInvariant()}.{runId}.{i}")
            .ToArray();

        // Every connection is tracked here so the finally block can dispose all of
        // them even when setup, publishing or the delivery wait fails part-way.
        var connections = new List<NatsConnection>(pubCount + subCount);

        try
        {
            // Create subscribers, assigned to subjects round-robin.
            var subscriberClients = new List<NatsConnection>(subCount);
            var subscriptions = new List<INatsSub<byte[]>>(subCount);
            for (var i = 0; i < subCount; i++)
            {
                var subscriber = createClient();
                connections.Add(subscriber);
                subscriberClients.Add(subscriber);

                await subscriber.ConnectAsync();
                subscriptions.Add(await subscriber.SubscribeCoreAsync<byte[]>(subjects[i % subjects.Length]));
            }

            // Round-trip a ping on each subscriber connection before publishing starts.
            foreach (var subscriber in subscriberClients)
            {
                await subscriber.PingAsync();
            }

            long received = 0;
            var allDelivered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

            // Start reading. A reader that fails reports through the completion source
            // so the run fails promptly instead of sitting out the whole timeout.
            foreach (var subscription in subscriptions)
            {
                _ = Task.Run(async () =>
                {
                    try
                    {
                        await foreach (var message in subscription.Msgs.ReadAllAsync())
                        {
                            if (Interlocked.Increment(ref received) >= expectedDeliveries)
                            {
                                allDelivered.TrySetResult();
                                return;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        allDelivered.TrySetException(ex);
                    }
                });
            }

            // Create publishers, one per subject.
            var publisherClients = new List<NatsConnection>(pubCount);
            for (var i = 0; i < pubCount; i++)
            {
                var publisher = createClient();
                connections.Add(publisher);
                publisherClients.Add(publisher);

                await publisher.ConnectAsync();
            }

            var sw = System.Diagnostics.Stopwatch.StartNew();

            var publishTasks = new Task[pubCount];
            for (var p = 0; p < pubCount; p++)
            {
                var publisher = publisherClients[p];
                var subject = subjects[p];
                publishTasks[p] = Task.Run(async () =>
                {
                    for (var i = 0; i < messagesPerPublisher; i++)
                    {
                        await publisher.PublishAsync(subject, payload);
                    }
                });
            }

            await Task.WhenAll(publishTasks);

            // Flush all publishers.
            foreach (var publisher in publisherClients)
            {
                await publisher.PingAsync();
            }

            using var cts = new CancellationTokenSource(DeliveryTimeout);
            await allDelivered.Task.WaitAsync(cts.Token);
            sw.Stop();

            foreach (var subscription in subscriptions)
            {
                await subscription.UnsubscribeAsync();
            }

            return new BenchmarkResult
            {
                Name = name,
                ServerType = serverType,
                TotalMessages = totalMessages,
                TotalBytes = (long)totalMessages * payloadSize,
                Duration = sw.Elapsed,
            };
        }
        finally
        {
            foreach (var connection in connections)
            {
                await connection.DisposeAsync();
            }
        }
    }
}