diff --git a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs index d9ec103..c1cae7f 100644 --- a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs +++ b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs @@ -113,6 +113,8 @@ public static class MxGatewayClientCli .ConfigureAwait(false), "bench-read-bulk" => await BenchReadBulkAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), + "bench-stream-events" => await BenchStreamEventsAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), "stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token) @@ -787,6 +789,276 @@ public static class MxGatewayClientCli } } + /// + /// Single-client event-rate stress benchmark. Opens its own session, + /// subscribes to --bulk-size tags spanning the dev galaxy's + /// TestMachine_NNN.* set, then drains the gateway's StreamEvents + /// server-stream as fast as it can for --duration-seconds. + /// Tracks events received per second, end-to-end latency from + /// event.worker_timestamp to client receive time, and any + /// worker faults emitted by the gateway over the same window. + /// + /// The companion --all-attributes flag (default true) + /// subscribes to all six TestMachine attributes per machine number + /// so the bench can drive event volume past one-attribute-per-machine + /// when the dev galaxy's TestChangingInt is the only churning tag. + /// + /// + private static async Task BenchStreamEventsAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + int durationSeconds = arguments.GetInt32("duration-seconds", 30); + int warmupSeconds = arguments.GetInt32("warmup-seconds", 3); + int bulkSize = arguments.GetInt32("bulk-size", 60); + int tagStart = arguments.GetInt32("tag-start", 1); + int sessionCount = arguments.GetInt32("session-count", 1); + // Concurrent OpenSession calls all the way up the stack force the + // gateway to spawn N x86 workers simultaneously; that hits the + // 30-second worker startup timeout around 6-8 concurrent opens on + // a dev rig. The stagger spreads the opens out so per-session + // OpenSession+Register completes inside that budget before the + // next session starts spawning. The steady-state event window + // begins only once all sessions are open and subscribed. + int sessionStartStaggerMs = arguments.GetInt32("session-start-stagger-ms", sessionCount > 1 ? 750 : 0); + string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_"; + bool allAttributes = !arguments.HasFlag("single-attribute"); + string singleAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt"; + string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench-events"; + + if (sessionCount < 1) + { + throw new ArgumentException("--session-count must be >= 1."); + } + + // Build the tag set. With --all-attributes (default) we rotate through + // all six TestMachine attributes to drive more event volume from a + // smaller machine range; with --single-attribute we use the same + // attribute on contiguous machine numbers (the original bench-read- + // bulk shape). + string[] attributes = allAttributes + ? new[] { "TestChangingInt", "ProtectedValue", "TestBoolArray[]", "TestIntArray[]", "TestDateTimeArray[]", "TestStringArray[]" } + : new[] { singleAttribute }; + List tags = new(capacity: bulkSize); + for (int i = 0; i < bulkSize; i++) + { + int machineNumber = tagStart + (i / attributes.Length); + string attribute = attributes[i % attributes.Length]; + tags.Add($"{tagPrefix}{machineNumber:D3}.{attribute}"); + } + + long warmupEvents = 0; + long steadyEvents = 0; + long steadyDataChangeEvents = 0; + List endToEndLatencyMs = new(capacity: 262_144); + object latencyLock = new(); + DateTime? firstSteadyEventUtc = null; + DateTime? lastSteadyEventUtc = null; + int totalSubscribeFailures = 0; + int totalSubscribedTagCount = 0; + int totalDrainedFaultCount = 0; + DateTime warmupStart = default; + DateTime warmupEnd = default; + DateTime steadyEnd = default; + + // Phase 1: open + subscribe each session sequentially with a stagger, + // so worker spawn-up doesn't exceed the gateway's per-session startup + // timeout. Each session stashes its (sessionId, serverHandle, itemHandles) + // for phase 2. + var openedSessions = new List<(string SessionId, int ServerHandle, int[] ItemHandles, string ClientName)>(sessionCount); + for (int sessionIndex = 0; sessionIndex < sessionCount; sessionIndex++) + { + if (sessionIndex > 0 && sessionStartStaggerMs > 0) + { + await Task.Delay(sessionStartStaggerMs, cancellationToken).ConfigureAwait(false); + } + + string thisClientName = sessionCount == 1 + ? clientName + : $"{clientName}-{sessionIndex:D2}"; + OpenSessionReply openReply = await client.OpenSessionAsync( + new OpenSessionRequest { ClientSessionName = thisClientName, ClientCorrelationId = CreateCorrelationId() }, + cancellationToken) + .ConfigureAwait(false); + string sessionId = openReply.SessionId; + + MxCommandReply registerReply = await InvokeAndEnsureAsync( + client, + CreateCommandRequest(sessionId, new MxCommand + { + Kind = MxCommandKind.Register, + Register = new RegisterCommand { ClientName = thisClientName }, + }), + cancellationToken) + .ConfigureAwait(false); + int serverHandle = registerReply.Register?.ServerHandle ?? registerReply.ReturnValue.Int32Value; + + SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle }; + subscribe.TagAddresses.Add(tags); + MxCommandReply subscribeReply = await InvokeAndEnsureAsync( + client, + CreateCommandRequest(sessionId, new MxCommand + { + Kind = MxCommandKind.SubscribeBulk, + SubscribeBulk = subscribe, + }), + cancellationToken) + .ConfigureAwait(false); + SubscribeResult[] subscribeResults = subscribeReply.SubscribeBulk?.Results.ToArray() ?? []; + int[] itemHandles = subscribeResults.Where(r => r.WasSuccessful).Select(r => r.ItemHandle).ToArray(); + totalSubscribedTagCount += itemHandles.Length; + totalSubscribeFailures += subscribeResults.Count(r => !r.WasSuccessful); + openedSessions.Add((sessionId, serverHandle, itemHandles, thisClientName)); + } + + // Phase 2: now every session is open + advised. Start the measurement + // window and run each session's StreamEvents reader in parallel. + warmupStart = DateTime.UtcNow; + warmupEnd = warmupStart + TimeSpan.FromSeconds(warmupSeconds); + steadyEnd = warmupEnd + TimeSpan.FromSeconds(durationSeconds); + + async Task RunStreamAsync((string SessionId, int ServerHandle, int[] ItemHandles, string ClientName) ctx) + { + using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + Task streamTask = Task.Run(async () => + { + StreamEventsRequest streamRequest = new() { SessionId = ctx.SessionId }; + await foreach (MxEvent mxEvent in client.StreamEventsAsync(streamRequest, streamCts.Token) + .ConfigureAwait(false)) + { + DateTime nowUtc = DateTime.UtcNow; + if (nowUtc >= steadyEnd) + { + break; + } + + if (nowUtc < warmupEnd) + { + Interlocked.Increment(ref warmupEvents); + continue; + } + + if (firstSteadyEventUtc is null) + { + firstSteadyEventUtc = nowUtc; + } + lastSteadyEventUtc = nowUtc; + Interlocked.Increment(ref steadyEvents); + if (mxEvent.Family == MxEventFamily.OnDataChange) + { + Interlocked.Increment(ref steadyDataChangeEvents); + } + + if (mxEvent.WorkerTimestamp is { } workerStamp) + { + double latency = (nowUtc - workerStamp.ToDateTime()).TotalMilliseconds; + if (latency >= 0) + { + lock (latencyLock) { endToEndLatencyMs.Add(latency); } + } + } + } + }, streamCts.Token); + + await Task.Delay(steadyEnd - warmupStart, cancellationToken).ConfigureAwait(false); + streamCts.Cancel(); + try { await streamTask.ConfigureAwait(false); } + catch (OperationCanceledException) { } + catch (Grpc.Core.RpcException ex) when (ex.StatusCode is Grpc.Core.StatusCode.Cancelled) { } + + try + { + MxCommandReply drainReply = await client.InvokeAsync( + CreateCommandRequest(ctx.SessionId, new MxCommand + { + Kind = MxCommandKind.DrainEvents, + DrainEvents = new DrainEventsCommand { MaxEvents = 16 }, + }), + cancellationToken) + .ConfigureAwait(false); + Interlocked.Add(ref totalDrainedFaultCount, drainReply.DrainEvents?.Events.Count ?? 0); + } + catch { /* fault probe is best-effort */ } + + if (ctx.ItemHandles.Length > 0) + { + try + { + UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = ctx.ServerHandle }; + unsubscribe.ItemHandles.Add(ctx.ItemHandles); + _ = await client.InvokeAsync( + CreateCommandRequest(ctx.SessionId, new MxCommand + { + Kind = MxCommandKind.UnsubscribeBulk, + UnsubscribeBulk = unsubscribe, + }), + cancellationToken) + .ConfigureAwait(false); + } + catch { /* best-effort */ } + } + + try + { + await client.CloseSessionAsync( + new CloseSessionRequest { SessionId = ctx.SessionId, ClientCorrelationId = CreateCorrelationId() }, + cancellationToken) + .ConfigureAwait(false); + } + catch { /* best-effort */ } + } + + Task[] streamTasks = openedSessions.Select(RunStreamAsync).ToArray(); + await Task.WhenAll(streamTasks).ConfigureAwait(false); + + double steadyElapsedSeconds = (firstSteadyEventUtc.HasValue && lastSteadyEventUtc.HasValue) + ? (lastSteadyEventUtc.Value - firstSteadyEventUtc.Value).TotalSeconds + : 0; + double eventsPerSecond = steadyElapsedSeconds > 0 ? steadyEvents / steadyElapsedSeconds : 0; + double dataChangeEventsPerSecond = steadyElapsedSeconds > 0 + ? steadyDataChangeEvents / steadyElapsedSeconds + : 0; + + double[] latencySnapshot; + lock (latencyLock) { latencySnapshot = endToEndLatencyMs.ToArray(); } + + object stats = new + { + language = "dotnet", + command = "bench-stream-events", + endpoint = arguments.GetOptional("endpoint") ?? "(default)", + clientName, + sessionCount, + bulkSize, + durationSeconds, + warmupSeconds, + allAttributes, + steadyElapsedSeconds = Math.Round(steadyElapsedSeconds, 3), + subscribedTagCount = totalSubscribedTagCount, + subscribeFailures = totalSubscribeFailures, + warmupEvents, + steadyEvents, + steadyDataChangeEvents, + eventsPerSecond = Math.Round(eventsPerSecond, 2), + dataChangeEventsPerSecond = Math.Round(dataChangeEventsPerSecond, 2), + drainedFaultCount = totalDrainedFaultCount, + endToEndLatencyMs = new + { + p50 = Percentile(latencySnapshot, 0.50), + p95 = Percentile(latencySnapshot, 0.95), + p99 = Percentile(latencySnapshot, 0.99), + max = latencySnapshot.Length > 0 ? Math.Round(latencySnapshot.Max(), 3) : 0, + mean = latencySnapshot.Length > 0 ? Math.Round(latencySnapshot.Average(), 3) : 0, + sampleCount = latencySnapshot.Length, + }, + }; + output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions)); + return 0; + } + /// /// Computes the requested percentile from an unsorted latency sample using /// nearest-rank with linear interpolation. Rounds to 3 decimal places to @@ -1473,6 +1745,7 @@ public static class MxGatewayClientCli or "write-secured-bulk" or "write-secured2-bulk" or "bench-read-bulk" + or "bench-stream-events" or "stream-events" or "write" or "write2"