From c06b56172a97cc549ce09deee0a3066a0c85c42e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 05:59:38 -0500 Subject: [PATCH] Add E2E benchmark project and throughput scenarios for Surreal-backed peers --- CBDDC.slnx | 27 +-- .../BenchmarkPeerNode.cs | 176 ++++++++++++++++++ .../E2EThroughputBenchmarks.cs | 135 ++++++++++++++ .../Program.cs | 11 ++ ...ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests.csproj | 25 +++ 5 files changed, 361 insertions(+), 13 deletions(-) create mode 100644 tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/BenchmarkPeerNode.cs create mode 100644 tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/E2EThroughputBenchmarks.cs create mode 100644 tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/Program.cs create mode 100644 tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests.csproj diff --git a/CBDDC.slnx b/CBDDC.slnx index 9c7bc09..5c22ff5 100644 --- a/CBDDC.slnx +++ b/CBDDC.slnx @@ -1,23 +1,24 @@ - - - + + + - + - - - - + + + + - - - - - + + + + + + diff --git a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/BenchmarkPeerNode.cs b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/BenchmarkPeerNode.cs new file mode 100644 index 0000000..9357578 --- /dev/null +++ b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/BenchmarkPeerNode.cs @@ -0,0 +1,176 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.CBDDC.Core.Network; +using ZB.MOM.WW.CBDDC.Network; +using ZB.MOM.WW.CBDDC.Network.Security; +using ZB.MOM.WW.CBDDC.Persistence.Surreal; +using ZB.MOM.WW.CBDDC.Sample.Console; + +namespace ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests; + +internal sealed class BenchmarkPeerNode : IAsyncDisposable +{ + private readonly ICBDDCNode _node; + private readonly ServiceProvider _serviceProvider; + private readonly string _workDir; + private bool _started; + + private BenchmarkPeerNode( + ServiceProvider serviceProvider, + ICBDDCNode node, + SampleDbContext context, + string workDir) + { + _serviceProvider = serviceProvider; + _node = node; + Context = context; + _workDir = workDir; + } + + public SampleDbContext Context { get; } + + public static BenchmarkPeerNode Create( + string nodeId, + int tcpPort, + string authToken, + IReadOnlyList knownPeers) + { + string workDir = Path.Combine(Path.GetTempPath(), $"cbddc-benchmark-{nodeId}-{Guid.NewGuid():N}"); + Directory.CreateDirectory(workDir); + + string dbPath = Path.Combine(workDir, "node.rocksdb"); + string databaseName = nodeId.Replace("-", "_", StringComparison.Ordinal); + + var configurationProvider = new StaticPeerNodeConfigurationProvider(new PeerNodeConfiguration + { + NodeId = nodeId, + TcpPort = tcpPort, + AuthToken = authToken, + KnownPeers = knownPeers.ToList(), + RetryDelayMs = 25, + RetryAttempts = 5 + }); + + var services = new ServiceCollection(); + services.AddLogging(builder => builder.SetMinimumLevel(LogLevel.Warning)); + services.AddSingleton(configurationProvider); + services.AddSingleton(configurationProvider); + services.AddSingleton(); + services.AddSingleton(); + + services.AddCBDDCCore() + .AddCBDDCSurrealEmbedded(_ => new CBDDCSurrealEmbeddedOptions + { + Endpoint = "rocksdb://local", + DatabasePath = dbPath, + Namespace = "cbddc_benchmark", + Database = databaseName, + Cdc = new CBDDCSurrealCdcOptions + { + Enabled = true, + ConsumerId = $"{nodeId}-benchmark", + PollingInterval = TimeSpan.FromMilliseconds(50), + EnableLiveSelectAccelerator = true + } + }) + .AddCBDDCNetwork(false); + + // Benchmark runs use explicit known peers; disable UDP discovery and handshake overhead. + services.AddSingleton(); + services.AddSingleton(); + + ServiceProvider provider = services.BuildServiceProvider(); + ICBDDCNode node = provider.GetRequiredService(); + SampleDbContext context = provider.GetRequiredService(); + + return new BenchmarkPeerNode(provider, node, context, workDir); + } + + public async Task StartAsync() + { + if (_started) return; + await _node.Start(); + _started = true; + } + + public async Task StopAsync() + { + if (!_started) return; + + try + { + await _node.Stop(); + } + catch (ObjectDisposedException) + { + } + catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is ObjectDisposedException)) + { + } + + _started = false; + } + + public async Task UpsertUserAsync(User user) + { + User? existing = Context.Users.Find(u => u.Id == user.Id).FirstOrDefault(); + if (existing == null) + await Context.Users.InsertAsync(user); + else + await Context.Users.UpdateAsync(user); + + await Context.SaveChangesAsync(); + } + + public bool ContainsUser(string userId) + { + return Context.Users.Find(u => u.Id == userId).Any(); + } + + public async ValueTask DisposeAsync() + { + try + { + await StopAsync(); + } + finally + { + _serviceProvider.Dispose(); + TryDeleteDirectory(_workDir); + } + } + + private static void TryDeleteDirectory(string path) + { + if (!Directory.Exists(path)) return; + + for (var attempt = 0; attempt < 5; attempt++) + try + { + Directory.Delete(path, true); + return; + } + catch when (attempt < 4) + { + Thread.Sleep(50); + } + } + + private sealed class PassiveDiscoveryService : IDiscoveryService + { + public IEnumerable GetActivePeers() + { + return Array.Empty(); + } + + public Task Start() + { + return Task.CompletedTask; + } + + public Task Stop() + { + return Task.CompletedTask; + } + } +} diff --git a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/E2EThroughputBenchmarks.cs b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/E2EThroughputBenchmarks.cs new file mode 100644 index 0000000..6a01d65 --- /dev/null +++ b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/E2EThroughputBenchmarks.cs @@ -0,0 +1,135 @@ +using BenchmarkDotNet.Attributes; +using System.Net; +using System.Net.Sockets; +using ZB.MOM.WW.CBDDC.Core.Network; +using ZB.MOM.WW.CBDDC.Sample.Console; + +namespace ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests; + +[MemoryDiagnoser] +[SimpleJob(launchCount: 1, warmupCount: 1, iterationCount: 3)] +public class E2EThroughputBenchmarks +{ + private const int BatchSize = 50; + private BenchmarkPeerNode _nodeA = null!; + private BenchmarkPeerNode _nodeB = null!; + private int _sequence; + + [GlobalSetup] + public async Task GlobalSetupAsync() + { + int nodeAPort = GetAvailableTcpPort(); + int nodeBPort = GetAvailableTcpPort(); + while (nodeBPort == nodeAPort) + nodeBPort = GetAvailableTcpPort(); + + string clusterToken = Guid.NewGuid().ToString("N"); + + _nodeA = BenchmarkPeerNode.Create( + "benchmark-node-a", + nodeAPort, + clusterToken, + [ + new KnownPeerConfiguration + { + NodeId = "benchmark-node-b", + Host = "127.0.0.1", + Port = nodeBPort + } + ]); + + _nodeB = BenchmarkPeerNode.Create( + "benchmark-node-b", + nodeBPort, + clusterToken, + [ + new KnownPeerConfiguration + { + NodeId = "benchmark-node-a", + Host = "127.0.0.1", + Port = nodeAPort + } + ]); + + await _nodeA.StartAsync(); + await _nodeB.StartAsync(); + + // Allow initial network loop to settle before measurements. + await Task.Delay(500); + } + + [GlobalCleanup] + public Task GlobalCleanupAsync() + { + // Explicit Surreal embedded disposal can race native callbacks in benchmark child processes. + // Benchmarks run out-of-process, so process teardown is used for cleanup stability. + return Task.CompletedTask; + } + + [Benchmark(Description = "Local write throughput", OperationsPerInvoke = BatchSize)] + public async Task LocalWriteThroughput() + { + IReadOnlyList userIds = NextUserIds("local"); + foreach (string userId in userIds) + await _nodeA.UpsertUserAsync(CreateUser(userId)); + } + + [Benchmark(Description = "Cross-node replicated throughput", OperationsPerInvoke = BatchSize)] + public async Task ReplicatedWriteThroughput() + { + IReadOnlyList userIds = NextUserIds("replicated"); + foreach (string userId in userIds) + await _nodeA.UpsertUserAsync(CreateUser(userId)); + + await WaitForReplicationAsync(userIds, TimeSpan.FromSeconds(30)); + } + + private IReadOnlyList NextUserIds(string prefix) + { + int start = Interlocked.Add(ref _sequence, BatchSize) - BatchSize; + string[] ids = new string[BatchSize]; + for (var i = 0; i < BatchSize; i++) + ids[i] = $"{prefix}-{start + i:D8}"; + + return ids; + } + + private static User CreateUser(string userId) + { + return new User + { + Id = userId, + Name = $"user-{userId}", + Age = 30, + Address = new Address { City = "BenchmarkCity" } + }; + } + + private async Task WaitForReplicationAsync(IReadOnlyList userIds, TimeSpan timeout) + { + DateTime deadline = DateTime.UtcNow.Add(timeout); + while (DateTime.UtcNow < deadline) + { + bool allPresent = true; + foreach (string userId in userIds) + if (!_nodeB.ContainsUser(userId)) + { + allPresent = false; + break; + } + + if (allPresent) return; + + await Task.Delay(25); + } + + throw new TimeoutException($"Timed out waiting for replication of {userIds.Count} users."); + } + + private static int GetAvailableTcpPort() + { + using var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + return ((IPEndPoint)listener.LocalEndpoint).Port; + } +} diff --git a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/Program.cs b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/Program.cs new file mode 100644 index 0000000..0f257cd --- /dev/null +++ b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/Program.cs @@ -0,0 +1,11 @@ +using BenchmarkDotNet.Running; + +namespace ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests; + +internal static class Program +{ + private static void Main(string[] args) + { + BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args); + } +} diff --git a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests.csproj b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests.csproj new file mode 100644 index 0000000..0eb2d51 --- /dev/null +++ b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests.csproj @@ -0,0 +1,25 @@ + + + + ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests + ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests + ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests + Exe + net10.0 + enable + enable + false + + + + + + + + + + + + + +