From 1cd51bbda3947e26d26ff9e6d88e4b3906efe7d7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 06:30:24 -0400 Subject: [PATCH] .NET CLI: bench-stream-events for max event-throughput characterization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New subcommand drives the gateway's StreamEvents server-stream as fast as it can from a single client process. Subscribes to --bulk-size tags (rotating through all six TestMachine attributes by default) and counts events received over a --duration-seconds steady-state window. Tracks events/sec, end-to-end latency (now - event.worker_timestamp), and any worker faults observed via a post-run DrainEvents probe. --session-count opens N independent gateway sessions from the same client process — each session is independent at the gateway (own worker, own event subscriber, own item handles) so this measures how the gateway multiplexes concurrent event streams without needing multiple client processes. Sessions are staggered open by default (--session-start-stagger-ms 750) because firing N concurrent OpenSession calls forces N concurrent worker x86 spawns, and on a dev rig that exceeds the gateway's 30-second worker startup timeout around N >= 6-8. The stagger gives each worker headroom to init its COM apartment + attach the event sink before the next one starts. Phase 1 of the bench opens + subscribes every session sequentially; phase 2 opens the steady-state window once everyone is advised, so the measurement isn't skewed by late-arriving sessions still in warmup. The latency sample is shared across sessions (locked List); event counts use Interlocked. Initial sweep at --bulk-size 120 against the dev galaxy (20 machines x 6 attributes = 120 unique tags) showed: - Linear throughput scaling with subscribed-tag count: N=6→2 ev/s, N=24→8 ev/s, N=60→20 ev/s, N=120→41 ev/s. 
The dev galaxy is producer-bound at ~0.34 events/sec per advised tag — gateway has plenty of headroom.
- Latency stayed at p50 ≈17ms, p95 ≈34ms across the entire range — no degradation with subscribed-tag count.
- Zero queue-overflow faults; gateway 10k-event buffer never came close to filling at this producer rate.
- Linear scaling with session count too (staggered open): 1→44, 2→81, 4→130, 8→324 events/sec at p50 16ms across all session counts.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .../MxGatewayClientCli.cs | 375 ++++++++++++++++++
 1 file changed, 375 insertions(+)

diff --git a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs
index d9ec103..c1cae7f 100644
--- a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs
+++ b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs
@@ -113,6 +113,8 @@ public static class MxGatewayClientCli
                     .ConfigureAwait(false),
                 "bench-read-bulk" => await BenchReadBulkAsync(arguments, client, standardOutput, cancellation.Token)
                     .ConfigureAwait(false),
+                "bench-stream-events" => await BenchStreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
+                    .ConfigureAwait(false),
                 "stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
                     .ConfigureAwait(false),
                 "write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
@@ -787,6 +789,378 @@ public static class MxGatewayClientCli
         }
     }
 
+    /// <summary>
+    /// Event-rate stress benchmark for the gateway's StreamEvents
+    /// server-stream. Opens <c>--session-count</c> sessions (default 1),
+    /// subscribes each one to <c>--bulk-size</c> tags, then reads every
+    /// session's event stream concurrently for <c>--warmup-seconds</c>
+    /// plus <c>--duration-seconds</c>. Events received during warmup are
+    /// only counted; steady-state events feed the events/sec figure and
+    /// the end-to-end latency sample (client receive time minus
+    /// <c>event.worker_timestamp</c>). After the window each session is
+    /// probed once with DrainEvents and the number of events returned is
+    /// reported as <c>drainedFaultCount</c>.
+    /// <para>
+    /// Tags are named <c>{tag-prefix}{machine:D3}.{attribute}</c>, starting
+    /// at machine <c>--tag-start</c>. By default the list rotates through
+    /// six TestMachine attributes per machine number so the bench can
+    /// drive event volume past one-attribute-per-machine; passing
+    /// <c>--single-attribute</c> subscribes to one attribute
+    /// (<c>--tag-attribute</c>, default <c>TestChangingInt</c>) on
+    /// contiguous machine numbers instead.
+    /// </para>
+    /// </summary>
+    /// <param name="arguments">Parsed command-line options.</param>
+    /// <param name="client">Gateway client shared by every session.</param>
+    /// <param name="output">Writer that receives the JSON statistics line.</param>
+    /// <param name="cancellationToken">Cancels the run; sessions already opened are still released.</param>
+    /// <returns>Always <c>0</c>, after the statistics have been written.</returns>
+    /// <exception cref="ArgumentException">A numeric option is out of range.</exception>
+    private static async Task<int> BenchStreamEventsAsync(
+        CliArguments arguments,
+        IMxGatewayCliClient client,
+        TextWriter output,
+        CancellationToken cancellationToken)
+    {
+        int durationSeconds = arguments.GetInt32("duration-seconds", 30);
+        int warmupSeconds = arguments.GetInt32("warmup-seconds", 3);
+        int bulkSize = arguments.GetInt32("bulk-size", 60);
+        int tagStart = arguments.GetInt32("tag-start", 1);
+        int sessionCount = arguments.GetInt32("session-count", 1);
+        // Opening N sessions at once makes the gateway spawn N x86 workers
+        // simultaneously, which hits the 30-second worker startup timeout
+        // around 6-8 concurrent opens on a dev rig. The stagger spreads the
+        // opens out so each OpenSession+Register completes inside that
+        // budget before the next session starts spawning.
+        int sessionStartStaggerMs = arguments.GetInt32("session-start-stagger-ms", sessionCount > 1 ? 750 : 0);
+        string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_";
+        bool allAttributes = !arguments.HasFlag("single-attribute");
+        string singleAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt";
+        string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench-events";
+
+        if (sessionCount < 1)
+        {
+            throw new ArgumentException("--session-count must be >= 1.");
+        }
+
+        // Reject negatives up front; they would otherwise surface as an
+        // opaque ArgumentOutOfRangeException from List<T> or Task.Delay.
+        if (bulkSize < 0)
+        {
+            throw new ArgumentException("--bulk-size must be >= 0.");
+        }
+
+        if (durationSeconds < 0)
+        {
+            throw new ArgumentException("--duration-seconds must be >= 0.");
+        }
+
+        if (warmupSeconds < 0)
+        {
+            throw new ArgumentException("--warmup-seconds must be >= 0.");
+        }
+
+        // Build the tag set. By default we rotate through six TestMachine
+        // attributes to drive more event volume from a smaller machine range;
+        // with --single-attribute we use one attribute on contiguous machine
+        // numbers (the original bench-read-bulk shape).
+        string[] attributes = allAttributes
+            ? new[] { "TestChangingInt", "ProtectedValue", "TestBoolArray[]", "TestIntArray[]", "TestDateTimeArray[]", "TestStringArray[]" }
+            : new[] { singleAttribute };
+        List<string> tags = new(capacity: bulkSize);
+        for (int i = 0; i < bulkSize; i++)
+        {
+            int machineNumber = tagStart + (i / attributes.Length);
+            string attribute = attributes[i % attributes.Length];
+            tags.Add($"{tagPrefix}{machineNumber:D3}.{attribute}");
+        }
+
+        long warmupEvents = 0;
+        long steadyEvents = 0;
+        long steadyDataChangeEvents = 0;
+        List<double> endToEndLatencyMs = new(capacity: 262_144);
+        object latencyLock = new();
+        // First/last steady-state receive times in UTC ticks. Every session's
+        // reader updates them concurrently, so they are longs driven by
+        // compare-and-swap instead of a shared DateTime?, whose writes are not
+        // atomic and would let a slower reader overwrite a later timestamp.
+        long firstSteadyEventTicks = long.MaxValue;
+        long lastSteadyEventTicks = long.MinValue;
+        int totalSubscribeFailures = 0;
+        int totalSubscribedTagCount = 0;
+        int totalDrainedFaultCount = 0;
+
+        // Phase 1: open + subscribe each session sequentially with a stagger,
+        // so worker spawn-up doesn't exceed the gateway's per-session startup
+        // timeout. Each session's ids and handles are kept for phase 2.
+        var openedSessions = new List<(string SessionId, int ServerHandle, int[] ItemHandles, string ClientName)>(sessionCount);
+        string? halfOpenSessionId = null;
+        try
+        {
+            for (int sessionIndex = 0; sessionIndex < sessionCount; sessionIndex++)
+            {
+                if (sessionIndex > 0 && sessionStartStaggerMs > 0)
+                {
+                    await Task.Delay(sessionStartStaggerMs, cancellationToken).ConfigureAwait(false);
+                }
+
+                string thisClientName = sessionCount == 1
+                    ? clientName
+                    : $"{clientName}-{sessionIndex:D2}";
+                OpenSessionReply openReply = await client.OpenSessionAsync(
+                        new OpenSessionRequest { ClientSessionName = thisClientName, ClientCorrelationId = CreateCorrelationId() },
+                        cancellationToken)
+                    .ConfigureAwait(false);
+                string sessionId = openReply.SessionId;
+                halfOpenSessionId = sessionId;
+
+                MxCommandReply registerReply = await InvokeAndEnsureAsync(
+                        client,
+                        CreateCommandRequest(sessionId, new MxCommand
+                        {
+                            Kind = MxCommandKind.Register,
+                            Register = new RegisterCommand { ClientName = thisClientName },
+                        }),
+                        cancellationToken)
+                    .ConfigureAwait(false);
+                int serverHandle = registerReply.Register?.ServerHandle ?? registerReply.ReturnValue.Int32Value;
+
+                SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle };
+                subscribe.TagAddresses.Add(tags);
+                MxCommandReply subscribeReply = await InvokeAndEnsureAsync(
+                        client,
+                        CreateCommandRequest(sessionId, new MxCommand
+                        {
+                            Kind = MxCommandKind.SubscribeBulk,
+                            SubscribeBulk = subscribe,
+                        }),
+                        cancellationToken)
+                    .ConfigureAwait(false);
+                SubscribeResult[] subscribeResults = subscribeReply.SubscribeBulk?.Results.ToArray() ?? [];
+                int[] itemHandles = subscribeResults.Where(r => r.WasSuccessful).Select(r => r.ItemHandle).ToArray();
+                totalSubscribedTagCount += itemHandles.Length;
+                totalSubscribeFailures += subscribeResults.Count(r => !r.WasSuccessful);
+                openedSessions.Add((sessionId, serverHandle, itemHandles, thisClientName));
+                halfOpenSessionId = null;
+            }
+        }
+        catch
+        {
+            // A failed or cancelled open must not strand the sessions that were
+            // already started.
+            if (halfOpenSessionId is not null)
+            {
+                await ReleaseSessionAsync(halfOpenSessionId, serverHandle: 0, itemHandles: []).ConfigureAwait(false);
+            }
+
+            foreach (var opened in openedSessions)
+            {
+                await ReleaseSessionAsync(opened.SessionId, opened.ServerHandle, opened.ItemHandles).ConfigureAwait(false);
+            }
+
+            throw;
+        }
+
+        // Phase 2: every session is open + advised. Start the measurement
+        // window and run each session's StreamEvents reader in parallel.
+        DateTime warmupStart = DateTime.UtcNow;
+        DateTime warmupEnd = warmupStart + TimeSpan.FromSeconds(warmupSeconds);
+        DateTime steadyEnd = warmupEnd + TimeSpan.FromSeconds(durationSeconds);
+
+        async Task RunStreamAsync((string SessionId, int ServerHandle, int[] ItemHandles, string ClientName) ctx)
+        {
+            try
+            {
+                using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+
+                Task streamTask = Task.Run(async () =>
+                {
+                    StreamEventsRequest streamRequest = new() { SessionId = ctx.SessionId };
+                    await foreach (MxEvent mxEvent in client.StreamEventsAsync(streamRequest, streamCts.Token)
+                        .ConfigureAwait(false))
+                    {
+                        DateTime nowUtc = DateTime.UtcNow;
+                        if (nowUtc >= steadyEnd)
+                        {
+                            break;
+                        }
+
+                        if (nowUtc < warmupEnd)
+                        {
+                            Interlocked.Increment(ref warmupEvents);
+                            continue;
+                        }
+
+                        StoreMin(ref firstSteadyEventTicks, nowUtc.Ticks);
+                        StoreMax(ref lastSteadyEventTicks, nowUtc.Ticks);
+                        Interlocked.Increment(ref steadyEvents);
+                        if (mxEvent.Family == MxEventFamily.OnDataChange)
+                        {
+                            Interlocked.Increment(ref steadyDataChangeEvents);
+                        }
+
+                        if (mxEvent.WorkerTimestamp is { } workerStamp)
+                        {
+                            double latency = (nowUtc - workerStamp.ToDateTime()).TotalMilliseconds;
+                            if (latency >= 0)
+                            {
+                                lock (latencyLock) { endToEndLatencyMs.Add(latency); }
+                            }
+                        }
+                    }
+                }, streamCts.Token);
+
+                try
+                {
+                    await Task.Delay(steadyEnd - warmupStart, cancellationToken).ConfigureAwait(false);
+                }
+                finally
+                {
+                    // Stop the reader whether the window elapsed or the run was
+                    // cancelled, and wait for it so it cannot outlive streamCts.
+                    streamCts.Cancel();
+                    try { await streamTask.ConfigureAwait(false); }
+                    catch (OperationCanceledException) { }
+                    catch (Grpc.Core.RpcException ex) when (ex.StatusCode is Grpc.Core.StatusCode.Cancelled) { }
+                }
+
+                try
+                {
+                    MxCommandReply drainReply = await client.InvokeAsync(
+                            CreateCommandRequest(ctx.SessionId, new MxCommand
+                            {
+                                Kind = MxCommandKind.DrainEvents,
+                                DrainEvents = new DrainEventsCommand { MaxEvents = 16 },
+                            }),
+                            cancellationToken)
+                        .ConfigureAwait(false);
+                    Interlocked.Add(ref totalDrainedFaultCount, drainReply.DrainEvents?.Events.Count ?? 0);
+                }
+                catch { /* fault probe is best-effort */ }
+            }
+            finally
+            {
+                await ReleaseSessionAsync(ctx.SessionId, ctx.ServerHandle, ctx.ItemHandles).ConfigureAwait(false);
+            }
+        }
+
+        Task[] streamTasks = openedSessions.Select(RunStreamAsync).ToArray();
+        await Task.WhenAll(streamTasks).ConfigureAwait(false);
+
+        long firstTicks = Interlocked.Read(ref firstSteadyEventTicks);
+        long lastTicks = Interlocked.Read(ref lastSteadyEventTicks);
+        double steadyElapsedSeconds = firstTicks <= lastTicks
+            ? TimeSpan.FromTicks(lastTicks - firstTicks).TotalSeconds
+            : 0;
+        double eventsPerSecond = steadyElapsedSeconds > 0 ? steadyEvents / steadyElapsedSeconds : 0;
+        double dataChangeEventsPerSecond = steadyElapsedSeconds > 0
+            ? steadyDataChangeEvents / steadyElapsedSeconds
+            : 0;
+
+        double[] latencySnapshot;
+        lock (latencyLock) { latencySnapshot = endToEndLatencyMs.ToArray(); }
+
+        object stats = new
+        {
+            language = "dotnet",
+            command = "bench-stream-events",
+            endpoint = arguments.GetOptional("endpoint") ?? "(default)",
+            clientName,
+            sessionCount,
+            bulkSize,
+            durationSeconds,
+            warmupSeconds,
+            allAttributes,
+            steadyElapsedSeconds = Math.Round(steadyElapsedSeconds, 3),
+            subscribedTagCount = totalSubscribedTagCount,
+            subscribeFailures = totalSubscribeFailures,
+            warmupEvents,
+            steadyEvents,
+            steadyDataChangeEvents,
+            eventsPerSecond = Math.Round(eventsPerSecond, 2),
+            dataChangeEventsPerSecond = Math.Round(dataChangeEventsPerSecond, 2),
+            drainedFaultCount = totalDrainedFaultCount,
+            endToEndLatencyMs = new
+            {
+                p50 = Percentile(latencySnapshot, 0.50),
+                p95 = Percentile(latencySnapshot, 0.95),
+                p99 = Percentile(latencySnapshot, 0.99),
+                max = latencySnapshot.Length > 0 ? Math.Round(latencySnapshot.Max(), 3) : 0,
+                mean = latencySnapshot.Length > 0 ? Math.Round(latencySnapshot.Average(), 3) : 0,
+                sampleCount = latencySnapshot.Length,
+            },
+        };
+        output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions));
+        return 0;
+
+        // Best-effort teardown: unadvise the session's items and close it. Runs
+        // on its own timeout token so a cancelled benchmark still releases its
+        // gateway sessions.
+        async Task ReleaseSessionAsync(string sessionId, int serverHandle, int[] itemHandles)
+        {
+            using CancellationTokenSource cleanupCts = new(TimeSpan.FromSeconds(30));
+
+            if (itemHandles.Length > 0)
+            {
+                try
+                {
+                    UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = serverHandle };
+                    unsubscribe.ItemHandles.Add(itemHandles);
+                    _ = await client.InvokeAsync(
+                            CreateCommandRequest(sessionId, new MxCommand
+                            {
+                                Kind = MxCommandKind.UnsubscribeBulk,
+                                UnsubscribeBulk = unsubscribe,
+                            }),
+                            cleanupCts.Token)
+                        .ConfigureAwait(false);
+                }
+                catch { /* best-effort */ }
+            }
+
+            try
+            {
+                await client.CloseSessionAsync(
+                        new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = CreateCorrelationId() },
+                        cleanupCts.Token)
+                    .ConfigureAwait(false);
+            }
+            catch { /* best-effort */ }
+        }
+
+        // Lock-free target = min(target, candidate).
+        static void StoreMin(ref long target, long candidate)
+        {
+            long seen = Interlocked.Read(ref target);
+            while (candidate < seen)
+            {
+                long prior = Interlocked.CompareExchange(ref target, candidate, seen);
+                if (prior == seen)
+                {
+                    return;
+                }
+
+                seen = prior;
+            }
+        }
+
+        // Lock-free target = max(target, candidate).
+        static void StoreMax(ref long target, long candidate)
+        {
+            long seen = Interlocked.Read(ref target);
+            while (candidate > seen)
+            {
+                long prior = Interlocked.CompareExchange(ref target, candidate, seen);
+                if (prior == seen)
+                {
+                    return;
+                }
+
+                seen = prior;
+            }
+        }
+    }
+
     /// <summary>
     /// Computes the requested percentile from an unsorted latency sample using
     /// nearest-rank with linear interpolation. Rounds to 3 decimal places to
@@ -1473,6 +1847,7 @@ public static class MxGatewayClientCli
             or "write-secured-bulk"
             or "write-secured2-bulk"
             or "bench-read-bulk"
+            or "bench-stream-events"
             or "stream-events"
             or "write"
             or "write2"