.NET CLI: bench-stream-events for max event-throughput characterization

New subcommand drives the gateway''s StreamEvents server-stream as fast
as it can from a single client process. Subscribes to --bulk-size tags
(rotating through all six TestMachine attributes by default) and counts
events received over a --duration-seconds steady-state window. Tracks
events/sec, end-to-end latency (now - event.worker_timestamp), and any
worker faults observed via a post-run DrainEvents probe.

--session-count opens N independent gateway sessions from the same
client process — each session is independent at the gateway (own
worker, own event subscriber, own item handles) so this measures how
the gateway multiplexes concurrent event streams without needing
multiple client processes. Sessions are staggered open by default
(--session-start-stagger-ms 750) because firing N concurrent
OpenSession calls forces N concurrent worker x86 spawns, and on a dev
rig that exceeds the gateway''s 30-second worker startup timeout
around N >= 6-8. The stagger gives each worker headroom to init its
COM apartment + attach the event sink before the next one starts.

Phase 1 of the bench opens + subscribes every session sequentially;
phase 2 opens the steady-state window once everyone is advised, so
the measurement isn''t skewed by late-arriving sessions still in
warmup. The latency sample is shared across sessions (locked
List<double>); event counts use Interlocked.

Initial sweep at --bulk-size 120 against the dev galaxy (20 machines
x 6 attributes = 120 unique tags) showed:

  - Linear throughput scaling with subscribed-tag count: N=6→2 ev/s,
    N=24→8 ev/s, N=60→20 ev/s, N=120→41 ev/s. The dev galaxy is
    producer-bound at ~0.34 events/sec per advised tag — gateway has
    plenty of headroom.
  - Latency stayed at p50 ≈17ms, p95 ≈34ms across the entire range —
    no degradation with subscribed-tag count.
  - Zero queue-overflow faults; gateway 10k-event buffer never came
    close to filling at this producer rate.
  - Linear scaling with session count too (staggered open): 1→44, 2→81,
    4→130, 8→324 events/sec at p50 16ms across all session counts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-20 06:30:24 -04:00
parent 61644e63fb
commit 1cd51bbda3
@@ -113,6 +113,8 @@ public static class MxGatewayClientCli
.ConfigureAwait(false),
"bench-read-bulk" => await BenchReadBulkAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"bench-stream-events" => await BenchStreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
@@ -787,6 +789,276 @@ public static class MxGatewayClientCli
}
}
/// <summary>
/// Single-client event-rate stress benchmark. Opens its own session,
/// subscribes to <c>--bulk-size</c> tags spanning the dev galaxy's
/// TestMachine_NNN.* set, then drains the gateway's StreamEvents
/// server-stream as fast as it can for <c>--duration-seconds</c>.
/// Tracks events received per second, end-to-end latency from
/// <c>event.worker_timestamp</c> to client receive time, and any
/// worker faults emitted by the gateway over the same window.
/// <para>
/// The companion <c>--all-attributes</c> flag (default <c>true</c>)
/// subscribes to all six TestMachine attributes per machine number
/// so the bench can drive event volume past one-attribute-per-machine
/// when the dev galaxy's TestChangingInt is the only churning tag.
/// </para>
/// </summary>
private static async Task<int> BenchStreamEventsAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
int durationSeconds = arguments.GetInt32("duration-seconds", 30);
int warmupSeconds = arguments.GetInt32("warmup-seconds", 3);
int bulkSize = arguments.GetInt32("bulk-size", 60);
int tagStart = arguments.GetInt32("tag-start", 1);
int sessionCount = arguments.GetInt32("session-count", 1);
// Concurrent OpenSession calls all the way up the stack force the
// gateway to spawn N x86 workers simultaneously; that hits the
// 30-second worker startup timeout around 6-8 concurrent opens on
// a dev rig. The stagger spreads the opens out so per-session
// OpenSession+Register completes inside that budget before the
// next session starts spawning. The steady-state event window
// begins only once all sessions are open and subscribed.
int sessionStartStaggerMs = arguments.GetInt32("session-start-stagger-ms", sessionCount > 1 ? 750 : 0);
string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_";
bool allAttributes = !arguments.HasFlag("single-attribute");
string singleAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt";
string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench-events";
if (sessionCount < 1)
{
throw new ArgumentException("--session-count must be >= 1.");
}
// Build the tag set. With --all-attributes (default) we rotate through
// all six TestMachine attributes to drive more event volume from a
// smaller machine range; with --single-attribute we use the same
// attribute on contiguous machine numbers (the original bench-read-
// bulk shape).
string[] attributes = allAttributes
? new[] { "TestChangingInt", "ProtectedValue", "TestBoolArray[]", "TestIntArray[]", "TestDateTimeArray[]", "TestStringArray[]" }
: new[] { singleAttribute };
List<string> tags = new(capacity: bulkSize);
for (int i = 0; i < bulkSize; i++)
{
int machineNumber = tagStart + (i / attributes.Length);
string attribute = attributes[i % attributes.Length];
tags.Add($"{tagPrefix}{machineNumber:D3}.{attribute}");
}
long warmupEvents = 0;
long steadyEvents = 0;
long steadyDataChangeEvents = 0;
List<double> endToEndLatencyMs = new(capacity: 262_144);
object latencyLock = new();
DateTime? firstSteadyEventUtc = null;
DateTime? lastSteadyEventUtc = null;
int totalSubscribeFailures = 0;
int totalSubscribedTagCount = 0;
int totalDrainedFaultCount = 0;
DateTime warmupStart = default;
DateTime warmupEnd = default;
DateTime steadyEnd = default;
// Phase 1: open + subscribe each session sequentially with a stagger,
// so worker spawn-up doesn't exceed the gateway's per-session startup
// timeout. Each session stashes its (sessionId, serverHandle, itemHandles)
// for phase 2.
var openedSessions = new List<(string SessionId, int ServerHandle, int[] ItemHandles, string ClientName)>(sessionCount);
for (int sessionIndex = 0; sessionIndex < sessionCount; sessionIndex++)
{
if (sessionIndex > 0 && sessionStartStaggerMs > 0)
{
await Task.Delay(sessionStartStaggerMs, cancellationToken).ConfigureAwait(false);
}
string thisClientName = sessionCount == 1
? clientName
: $"{clientName}-{sessionIndex:D2}";
OpenSessionReply openReply = await client.OpenSessionAsync(
new OpenSessionRequest { ClientSessionName = thisClientName, ClientCorrelationId = CreateCorrelationId() },
cancellationToken)
.ConfigureAwait(false);
string sessionId = openReply.SessionId;
MxCommandReply registerReply = await InvokeAndEnsureAsync(
client,
CreateCommandRequest(sessionId, new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = thisClientName },
}),
cancellationToken)
.ConfigureAwait(false);
int serverHandle = registerReply.Register?.ServerHandle ?? registerReply.ReturnValue.Int32Value;
SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle };
subscribe.TagAddresses.Add(tags);
MxCommandReply subscribeReply = await InvokeAndEnsureAsync(
client,
CreateCommandRequest(sessionId, new MxCommand
{
Kind = MxCommandKind.SubscribeBulk,
SubscribeBulk = subscribe,
}),
cancellationToken)
.ConfigureAwait(false);
SubscribeResult[] subscribeResults = subscribeReply.SubscribeBulk?.Results.ToArray() ?? [];
int[] itemHandles = subscribeResults.Where(r => r.WasSuccessful).Select(r => r.ItemHandle).ToArray();
totalSubscribedTagCount += itemHandles.Length;
totalSubscribeFailures += subscribeResults.Count(r => !r.WasSuccessful);
openedSessions.Add((sessionId, serverHandle, itemHandles, thisClientName));
}
// Phase 2: now every session is open + advised. Start the measurement
// window and run each session's StreamEvents reader in parallel.
warmupStart = DateTime.UtcNow;
warmupEnd = warmupStart + TimeSpan.FromSeconds(warmupSeconds);
steadyEnd = warmupEnd + TimeSpan.FromSeconds(durationSeconds);
async Task RunStreamAsync((string SessionId, int ServerHandle, int[] ItemHandles, string ClientName) ctx)
{
using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Task streamTask = Task.Run(async () =>
{
StreamEventsRequest streamRequest = new() { SessionId = ctx.SessionId };
await foreach (MxEvent mxEvent in client.StreamEventsAsync(streamRequest, streamCts.Token)
.ConfigureAwait(false))
{
DateTime nowUtc = DateTime.UtcNow;
if (nowUtc >= steadyEnd)
{
break;
}
if (nowUtc < warmupEnd)
{
Interlocked.Increment(ref warmupEvents);
continue;
}
if (firstSteadyEventUtc is null)
{
firstSteadyEventUtc = nowUtc;
}
lastSteadyEventUtc = nowUtc;
Interlocked.Increment(ref steadyEvents);
if (mxEvent.Family == MxEventFamily.OnDataChange)
{
Interlocked.Increment(ref steadyDataChangeEvents);
}
if (mxEvent.WorkerTimestamp is { } workerStamp)
{
double latency = (nowUtc - workerStamp.ToDateTime()).TotalMilliseconds;
if (latency >= 0)
{
lock (latencyLock) { endToEndLatencyMs.Add(latency); }
}
}
}
}, streamCts.Token);
await Task.Delay(steadyEnd - warmupStart, cancellationToken).ConfigureAwait(false);
streamCts.Cancel();
try { await streamTask.ConfigureAwait(false); }
catch (OperationCanceledException) { }
catch (Grpc.Core.RpcException ex) when (ex.StatusCode is Grpc.Core.StatusCode.Cancelled) { }
try
{
MxCommandReply drainReply = await client.InvokeAsync(
CreateCommandRequest(ctx.SessionId, new MxCommand
{
Kind = MxCommandKind.DrainEvents,
DrainEvents = new DrainEventsCommand { MaxEvents = 16 },
}),
cancellationToken)
.ConfigureAwait(false);
Interlocked.Add(ref totalDrainedFaultCount, drainReply.DrainEvents?.Events.Count ?? 0);
}
catch { /* fault probe is best-effort */ }
if (ctx.ItemHandles.Length > 0)
{
try
{
UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = ctx.ServerHandle };
unsubscribe.ItemHandles.Add(ctx.ItemHandles);
_ = await client.InvokeAsync(
CreateCommandRequest(ctx.SessionId, new MxCommand
{
Kind = MxCommandKind.UnsubscribeBulk,
UnsubscribeBulk = unsubscribe,
}),
cancellationToken)
.ConfigureAwait(false);
}
catch { /* best-effort */ }
}
try
{
await client.CloseSessionAsync(
new CloseSessionRequest { SessionId = ctx.SessionId, ClientCorrelationId = CreateCorrelationId() },
cancellationToken)
.ConfigureAwait(false);
}
catch { /* best-effort */ }
}
Task[] streamTasks = openedSessions.Select(RunStreamAsync).ToArray();
await Task.WhenAll(streamTasks).ConfigureAwait(false);
double steadyElapsedSeconds = (firstSteadyEventUtc.HasValue && lastSteadyEventUtc.HasValue)
? (lastSteadyEventUtc.Value - firstSteadyEventUtc.Value).TotalSeconds
: 0;
double eventsPerSecond = steadyElapsedSeconds > 0 ? steadyEvents / steadyElapsedSeconds : 0;
double dataChangeEventsPerSecond = steadyElapsedSeconds > 0
? steadyDataChangeEvents / steadyElapsedSeconds
: 0;
double[] latencySnapshot;
lock (latencyLock) { latencySnapshot = endToEndLatencyMs.ToArray(); }
object stats = new
{
language = "dotnet",
command = "bench-stream-events",
endpoint = arguments.GetOptional("endpoint") ?? "(default)",
clientName,
sessionCount,
bulkSize,
durationSeconds,
warmupSeconds,
allAttributes,
steadyElapsedSeconds = Math.Round(steadyElapsedSeconds, 3),
subscribedTagCount = totalSubscribedTagCount,
subscribeFailures = totalSubscribeFailures,
warmupEvents,
steadyEvents,
steadyDataChangeEvents,
eventsPerSecond = Math.Round(eventsPerSecond, 2),
dataChangeEventsPerSecond = Math.Round(dataChangeEventsPerSecond, 2),
drainedFaultCount = totalDrainedFaultCount,
endToEndLatencyMs = new
{
p50 = Percentile(latencySnapshot, 0.50),
p95 = Percentile(latencySnapshot, 0.95),
p99 = Percentile(latencySnapshot, 0.99),
max = latencySnapshot.Length > 0 ? Math.Round(latencySnapshot.Max(), 3) : 0,
mean = latencySnapshot.Length > 0 ? Math.Round(latencySnapshot.Average(), 3) : 0,
sampleCount = latencySnapshot.Length,
},
};
output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions));
return 0;
}
/// <summary>
/// Computes the requested percentile from an unsorted latency sample using
/// nearest-rank with linear interpolation. Rounds to 3 decimal places to
@@ -1473,6 +1745,7 @@ public static class MxGatewayClientCli
or "write-secured-bulk"
or "write-secured2-bulk"
or "bench-read-bulk"
or "bench-stream-events"
or "stream-events"
or "write"
or "write2"