This captures the iterative CommentChecker cleanup plus updated snapshot/report outputs used to validate and benchmark the latest JetStream and transport work.
109 lines
3.6 KiB
C#
109 lines
3.6 KiB
C#
using System.Diagnostics;
|
|
using NATS.Client.Core;
|
|
using NATS.Client.JetStream;
|
|
using NATS.Client.JetStream.Models;
|
|
using NATS.Server.Benchmark.Tests.Harness;
|
|
using NATS.Server.Benchmark.Tests.Infrastructure;
|
|
using Xunit.Abstractions;
|
|
|
|
namespace NATS.Server.Benchmark.Tests.JetStream;
|
|
|
|
/// <summary>
/// Benchmark measuring how quickly a JetStream ordered consumer drains a pre-populated
/// in-memory stream. The run always uses the fixture's DotNet client and, when the fixture
/// reports Go as available, repeats the run with its Go client so the two can be compared.
/// </summary>
[Collection("Benchmark-JetStream")]
public class OrderedConsumerTests(JetStreamServerPairFixture fixture, ITestOutputHelper output)
{
    /// <summary>Number of publishes kept in flight before their acks are awaited.</summary>
    private const int PublishBatchSize = 1000;

    /// <summary>
    /// Upper bound on the consume phase. Without it a delivery shortfall would block the
    /// test run indefinitely instead of failing it.
    /// </summary>
    private static readonly TimeSpan ConsumeTimeout = TimeSpan.FromMinutes(5);

    /// <summary>
    /// Runs the ordered-consumer benchmark with 128-byte payloads and writes either a
    /// Go-vs-DotNet comparison or a single result, depending on which runs produced one.
    /// </summary>
    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task JSOrderedConsumer_Throughput()
    {
        const int payloadSize = 128;
        const int messageCount = 200_000;
        const string benchmarkName = "JS Ordered Consumer (128B)";

        BenchmarkResult? dotnetResult = null;
        try
        {
            dotnetResult = await RunOrderedConsume(benchmarkName, "DotNet", payloadSize, messageCount, fixture.CreateDotNetClient);
        }
        catch (NatsJSException ex)
        {
            // A JetStream-level failure on the DotNet side is tolerated so that the Go
            // run can still be reported; anything else is allowed to fail the test.
            output.WriteLine($"[DotNet] Ordered consumer not fully supported: {ex.Message}");
        }

        if (fixture.GoAvailable)
        {
            var goResult = await RunOrderedConsume(benchmarkName, "Go", payloadSize, messageCount, fixture.CreateGoClient);
            if (dotnetResult is not null)
            {
                BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
            }
            else
            {
                BenchmarkResultWriter.WriteSingle(output, goResult);
            }
        }
        else if (dotnetResult is not null)
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
        }
    }

    /// <summary>
    /// Creates a temporary memory-backed stream, fills it with <paramref name="messageCount"/>
    /// messages, then times how long an ordered consumer takes to receive them all.
    /// The stream is deleted afterwards whether or not the run succeeded.
    /// </summary>
    /// <param name="name">Benchmark name copied into the result.</param>
    /// <param name="serverType">Label copied into the result and embedded in the stream name and subject.</param>
    /// <param name="payloadSize">Size in bytes of each published message body.</param>
    /// <param name="messageCount">Number of messages to publish and then consume.</param>
    /// <param name="createClient">Factory producing the connection to benchmark against.</param>
    /// <returns>The message count, byte count and elapsed time of the consume phase.</returns>
    /// <exception cref="TimeoutException">
    /// Fewer than <paramref name="messageCount"/> messages arrived within <see cref="ConsumeTimeout"/>.
    /// </exception>
    private static async Task<BenchmarkResult> RunOrderedConsume(string name, string serverType, int payloadSize, int messageCount, Func<NatsConnection> createClient)
    {
        var payload = new byte[payloadSize];

        // The GUID suffix keeps concurrent or repeated runs from colliding; the name is
        // then cut to 30 characters.
        var streamName = $"BENCH_ORD_{serverType.ToUpperInvariant()}_{Guid.NewGuid():N}"[..30];
        var subject = $"bench.js.ordered.{serverType.ToLowerInvariant()}";

        await using var nats = createClient();
        await nats.ConnectAsync();
        var js = new NatsJSContext(nats);

        await js.CreateStreamAsync(new StreamConfig(streamName, [subject])
        {
            Storage = StreamConfigStorage.Memory,
            Retention = StreamConfigRetention.Limits,
            MaxMsgs = 10_000_000,
        });

        try
        {
            await PopulateStreamAsync(js, subject, payload, messageCount);

            // Consume via ordered consumer
            var consumer = await js.CreateOrderedConsumerAsync(streamName);
            var received = 0;

            using var timeout = new CancellationTokenSource(ConsumeTimeout);
            var sw = Stopwatch.StartNew();
            try
            {
                await foreach (var _ in consumer.ConsumeAsync<byte[]>(cancellationToken: timeout.Token))
                {
                    received++;
                    if (received >= messageCount)
                    {
                        break;
                    }
                }
            }
            catch (OperationCanceledException) when (timeout.IsCancellationRequested)
            {
                // Handled by the shortfall check below so that both a throwing and a
                // quietly-ending enumeration report the timeout the same way.
            }

            sw.Stop();

            if (received < messageCount)
            {
                throw new TimeoutException(
                    $"[{serverType}] Ordered consumer delivered {received} of {messageCount} messages within {ConsumeTimeout}.");
            }

            return new BenchmarkResult
            {
                Name = name,
                ServerType = serverType,
                TotalMessages = received,
                TotalBytes = (long)received * payloadSize,
                Duration = sw.Elapsed,
            };
        }
        finally
        {
            await js.DeleteStreamAsync(streamName);
        }
    }

    /// <summary>
    /// Publishes <paramref name="messageCount"/> copies of <paramref name="payload"/> to
    /// <paramref name="subject"/>, awaiting acks in batches of <see cref="PublishBatchSize"/>.
    /// </summary>
    private static async Task PopulateStreamAsync(NatsJSContext js, string subject, byte[] payload, int messageCount)
    {
        var pending = new List<ValueTask<PubAckResponse>>(PublishBatchSize);
        for (var i = 0; i < messageCount; i++)
        {
            pending.Add(js.PublishAsync(subject, payload));
            if (pending.Count >= PublishBatchSize)
            {
                await AwaitAcksAsync(pending);
            }
        }

        // Flush the final partial batch.
        await AwaitAcksAsync(pending);
    }

    /// <summary>
    /// Awaits every pending publish exactly once, fails on the first rejected ack, and
    /// empties the list so it can be reused for the next batch.
    /// </summary>
    private static async Task AwaitAcksAsync(List<ValueTask<PubAckResponse>> pending)
    {
        foreach (var publish in pending)
        {
            var ack = await publish;

            // A rejected publish would leave the stream short of the expected count,
            // so surface it here rather than letting the consume phase wait for it.
            ack.EnsureSuccess();
        }

        pending.Clear();
    }
}
|