using System.Buffers;
using System.Diagnostics;
using System.Text;
using NATS.Server.Protocol;
using Xunit.Abstractions;

namespace NATS.Server.Benchmark.Tests.Protocol;

/// <summary>
/// Throughput and allocation measurements for <see cref="NatsParser"/> driven through
/// <c>TryParseView</c>. Each test parses a fixed protocol frame many times and writes
/// ops/s, MB/s, allocated bytes per operation and elapsed time to the xUnit output.
/// </summary>
public class ParserHotPathBenchmarks(ITestOutputHelper output)
{
    /// <summary>Measures parsing of a bare <c>PING</c> frame.</summary>
    [Fact]
    [Trait("Category", "Benchmark")]
    public void Parser_PING_Throughput()
    {
        var payload = "PING\r\n"u8.ToArray();
        MeasureSingleChunk("Parser PING", payload, iterations: 500_000);
    }

    /// <summary>Measures parsing of a <c>PUB</c> frame carrying a 16-byte payload.</summary>
    [Fact]
    [Trait("Category", "Benchmark")]
    public void Parser_PUB_Throughput()
    {
        var payload = "PUB bench.subject 16\r\n0123456789ABCDEF\r\n"u8.ToArray();
        MeasureSingleChunk("Parser PUB", payload, iterations: 250_000);
    }

    /// <summary>Measures parsing of an <c>HPUB</c> frame (12 header bytes, 28 total bytes).</summary>
    [Fact]
    [Trait("Category", "Benchmark")]
    public void Parser_HPUB_Throughput()
    {
        var payload = "HPUB bench.subject 12 28\r\nNATS/1.0\r\n\r\n0123456789ABCDEF\r\n"u8.ToArray();
        MeasureSingleChunk("Parser HPUB", payload, iterations: 200_000);
    }

    /// <summary>
    /// Measures parsing of a <c>PUB</c> frame whose 16-byte payload arrives in two chunks.
    /// </summary>
    [Fact]
    [Trait("Category", "Benchmark")]
    public void Parser_PUB_SplitPayload_Throughput()
    {
        var firstChunk = "PUB bench.subject 16\r\n01234567"u8.ToArray();
        var secondChunk = "89ABCDEF\r\n"u8.ToArray();
        MeasureSplitPayload("Parser PUB split payload", firstChunk, secondChunk, iterations: 200_000);
    }

    /// <summary>
    /// Parses <paramref name="commandBytes"/> as one contiguous buffer
    /// <paramref name="iterations"/> times with a single parser instance and reports the result.
    /// </summary>
    /// <param name="name">Label used in the report and in failure messages.</param>
    /// <param name="commandBytes">One complete protocol frame.</param>
    /// <param name="iterations">Number of times the frame is parsed.</param>
    /// <exception cref="InvalidOperationException">
    /// The parser did not produce a command, or a PUB/HPUB command had an empty payload.
    /// </exception>
    private void MeasureSingleChunk(string name, byte[] commandBytes, int iterations)
    {
        SettleGarbageCollector();

        var parser = new NatsParser();
        var totalBytes = (long)commandBytes.Length * iterations;
        var beforeAlloc = GC.GetAllocatedBytesForCurrentThread();
        var stopwatch = Stopwatch.StartNew();

        for (var i = 0; i < iterations; i++)
        {
            // A fresh sequence per iteration: TryParseView takes the buffer by ref,
            // so the previous iteration's sequence cannot be reused.
            ReadOnlySequence<byte> buffer = new(commandBytes);
            if (!parser.TryParseView(ref buffer, out var command))
            {
                throw new InvalidOperationException($"{name} did not produce a parsed command.");
            }

            if (command.Type is CommandType.Pub or CommandType.HPub)
            {
                // Touch the payload so the payload-access path is part of the measurement.
                var payload = command.GetPayloadMemory();
                if (payload.IsEmpty)
                {
                    throw new InvalidOperationException($"{name} produced an empty payload unexpectedly.");
                }
            }
        }

        stopwatch.Stop();
        var allocatedBytes = GC.GetAllocatedBytesForCurrentThread() - beforeAlloc;
        WriteResult(name, iterations, totalBytes, stopwatch.Elapsed, allocatedBytes);
    }

    /// <summary>
    /// Feeds a frame to the parser in two steps, <paramref name="iterations"/> times, and reports
    /// the result. The first step must not yield a command; the second must yield one with a
    /// 16-byte payload.
    /// </summary>
    /// <param name="name">Label used in the report and in failure messages.</param>
    /// <param name="firstChunkBytes">Leading part of the frame (incomplete on its own).</param>
    /// <param name="secondChunkBytes">Trailing part that completes the frame.</param>
    /// <param name="iterations">Number of times the split frame is parsed.</param>
    /// <exception cref="InvalidOperationException">
    /// The first chunk alone produced a command, the second step did not complete, or the payload
    /// length was not 16.
    /// </exception>
    private void MeasureSplitPayload(string name, byte[] firstChunkBytes, byte[] secondChunkBytes, int iterations)
    {
        SettleGarbageCollector();

        var parser = new NatsParser();
        var totalBytes = (long)(firstChunkBytes.Length + secondChunkBytes.Length) * iterations;
        var beforeAlloc = GC.GetAllocatedBytesForCurrentThread();
        var stopwatch = Stopwatch.StartNew();

        for (var i = 0; i < iterations; i++)
        {
            ReadOnlySequence<byte> firstChunk = new(firstChunkBytes);
            if (parser.TryParseView(ref firstChunk, out _))
            {
                throw new InvalidOperationException($"{name} should wait for the second payload chunk.");
            }

            // Whatever the parser left in firstChunk is stitched in front of the second chunk.
            // NOTE(review): presumably the parser has consumed the control line by now and left
            // only the partial payload bytes — confirm against NatsParser.
            // NOTE(review): CreateSequence allocates two segments per iteration, and those
            // allocations are included in the reported B/op figure.
            ReadOnlySequence<byte> secondChunk = CreateSequence(firstChunk.First, secondChunkBytes);
            if (!parser.TryParseView(ref secondChunk, out var command))
            {
                throw new InvalidOperationException($"{name} did not complete after the second payload chunk.");
            }

            if (command.GetPayloadMemory().Length != 16)
            {
                throw new InvalidOperationException($"{name} produced the wrong payload length.");
            }
        }

        stopwatch.Stop();
        var allocatedBytes = GC.GetAllocatedBytesForCurrentThread() - beforeAlloc;
        WriteResult(name, iterations, totalBytes, stopwatch.Elapsed, allocatedBytes);
    }

    /// <summary>
    /// Writes the derived metrics for one measurement run to the test output.
    /// </summary>
    /// <param name="name">Label printed in the header line.</param>
    /// <param name="iterations">Number of operations performed.</param>
    /// <param name="totalBytes">Total input bytes fed to the parser.</param>
    /// <param name="elapsed">Wall-clock duration of the measured loop.</param>
    /// <param name="allocatedBytes">Bytes allocated on the current thread during the loop.</param>
    private void WriteResult(string name, int iterations, long totalBytes, TimeSpan elapsed, long allocatedBytes)
    {
        var operationsPerSecond = iterations / elapsed.TotalSeconds;
        var megabytesPerSecond = totalBytes / elapsed.TotalSeconds / (1024.0 * 1024.0);
        var bytesPerOperation = allocatedBytes / (double)iterations;

        output.WriteLine($"=== {name} ===");
        output.WriteLine($"Ops: {operationsPerSecond:N0} ops/s");
        output.WriteLine($"Data: {megabytesPerSecond:F1} MB/s");
        output.WriteLine($"Alloc: {bytesPerOperation:F1} B/op");
        output.WriteLine($"Elapsed: {elapsed.TotalMilliseconds:F0} ms");
        output.WriteLine("");
    }

    /// <summary>
    /// Forces a full collection, drains finalizers, and collects again so that garbage from
    /// earlier work is cleared before a measurement starts.
    /// </summary>
    private static void SettleGarbageCollector()
    {
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
    }

    /// <summary>
    /// Builds a two-segment sequence: <paramref name="remainingBytes"/> followed by
    /// <paramref name="secondChunk"/>.
    /// </summary>
    /// <param name="remainingBytes">Memory placed in the first segment.</param>
    /// <param name="secondChunk">Bytes placed in the second segment.</param>
    /// <returns>A sequence spanning both segments in full.</returns>
    private static ReadOnlySequence<byte> CreateSequence(ReadOnlyMemory<byte> remainingBytes, byte[] secondChunk)
    {
        var first = new BufferSegment(remainingBytes);
        var second = first.Append(secondChunk);

        // Start at offset 0 of the first segment and end at the end of the second segment.
        return new ReadOnlySequence<byte>(first, 0, second, second.Memory.Length);
    }

    /// <summary>
    /// Minimal linked segment used to assemble a multi-segment <see cref="ReadOnlySequence{T}"/>.
    /// </summary>
    private sealed class BufferSegment : ReadOnlySequenceSegment<byte>
    {
        /// <summary>Creates a segment wrapping <paramref name="memory"/>.</summary>
        public BufferSegment(ReadOnlyMemory<byte> memory)
        {
            Memory = memory;
        }

        /// <summary>
        /// Links a new segment wrapping <paramref name="memory"/> after this one and returns it.
        /// </summary>
        /// <param name="memory">Memory for the appended segment.</param>
        /// <returns>The newly created tail segment.</returns>
        public BufferSegment Append(ReadOnlyMemory<byte> memory)
        {
            var next = new BufferSegment(memory)
            {
                // The new segment's running index is this segment's running index plus its length.
                RunningIndex = RunningIndex + Memory.Length,
            };

            Next = next;
            return next;
        }
    }
}