using System.Globalization; using System.Text.Json; using Google.Protobuf; using MxGateway.Client; using MxGateway.Contracts.Proto; using MxGateway.Contracts.Proto.Galaxy; namespace MxGateway.Client.Cli; /// Command-line interface for the MXAccess Gateway client, supporting session and command operations. public static class MxGatewayClientCli { private const uint MaxAggregateEvents = 10_000; private static readonly JsonFormatter ProtobufJsonFormatter = JsonFormatter.Default; private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web); /// Runs the CLI synchronously with the given arguments, writing output and errors. /// Command-line arguments (command name followed by options). /// TextWriter for command output. /// TextWriter for error messages. public static int Run( string[] args, TextWriter standardOutput, TextWriter standardError) { return RunAsync(args, standardOutput, standardError, clientFactory: null, standardInput: null) .GetAwaiter() .GetResult(); } /// Runs the CLI asynchronously with the given arguments, writing output and errors. /// Command-line arguments (command name followed by options). /// TextWriter for command output. /// TextWriter for error messages. /// Optional factory to create the gateway client; defaults to MxGatewayClient.Create. /// Optional TextReader for batch-mode stdin; defaults to . public static Task RunAsync( string[] args, TextWriter standardOutput, TextWriter standardError, Func? clientFactory = null, TextReader? standardInput = null) { ArgumentNullException.ThrowIfNull(args); ArgumentNullException.ThrowIfNull(standardOutput); ArgumentNullException.ThrowIfNull(standardError); return RunCoreAsync( args, standardOutput, standardError, clientFactory ?? CreateDefaultClient, standardInput ?? Console.In); } private const string BatchEndOfRecord = "__MXGW_BATCH_EOR__"; private static async Task RunCoreAsync( string[] args, TextWriter standardOutput, TextWriter standardError, Func clientFactory, TextReader standardInput, bool forceJsonErrors = false) { if (args.Length is 0 || IsHelp(args[0])) { WriteUsage(standardOutput); return 0; } string command = args[0].ToLowerInvariant(); if (command is "batch") { return await RunBatchAsync(standardOutput, clientFactory, standardInput).ConfigureAwait(false); } CliArguments arguments = new(args.Skip(1)); try { if (command is "version") { WriteVersion(arguments, standardOutput); return 0; } if (!IsKnownGatewayCommand(command)) { return WriteUnknownCommand(command, standardError); } await using IMxGatewayCliClient client = clientFactory(CreateOptions(arguments)); using CancellationTokenSource cancellation = CreateCancellation(arguments, command); return command switch { "open-session" => await OpenSessionAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "close-session" => await CloseSessionAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "ping" => await PingAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "register" => await RegisterAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "add-item" => await AddItemAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "advise" => await AdviseAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "subscribe-bulk" => await SubscribeBulkAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "unsubscribe-bulk" => await UnsubscribeBulkAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "read-bulk" => await ReadBulkAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "write-bulk" => await WriteBulkAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "write2-bulk" => await Write2BulkAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "write-secured-bulk" => await WriteSecuredBulkAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "write-secured2-bulk" => await WriteSecured2BulkAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "bench-read-bulk" => await BenchReadBulkAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "bench-stream-events" => await BenchStreamEventsAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "stream-alarms" => await StreamAlarmsAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "acknowledge-alarm" => await AcknowledgeAlarmAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "write2" => await Write2Async(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "smoke" => await SmokeAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "galaxy-test-connection" => await GalaxyTestConnectionAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "galaxy-last-deploy" => await GalaxyLastDeployAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "galaxy-discover" => await GalaxyDiscoverAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "galaxy-watch" => await GalaxyWatchAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), _ => WriteUnknownCommand(command, standardError), }; } catch (Exception exception) when (exception is not OperationCanceledException) { // Redact the effective API key — whether it came from --api-key or from // the (documented default) --api-key-env environment variable — so a // transport error message that echoes the bearer token is never printed. string? apiKey = TryResolveApiKey(arguments); string message = MxGatewayCliSecretRedactor.Redact(exception.Message, apiKey); if (forceJsonErrors || arguments.HasFlag("json")) { standardError.WriteLine(JsonSerializer.Serialize( new { error = message, type = exception.GetType().Name }, JsonOptions)); } else { standardError.WriteLine(message); } return 1; } } private static IMxGatewayCliClient CreateDefaultClient(MxGatewayClientOptions options) { return new MxGatewayCliClientAdapter(MxGatewayClient.Create(options)); } private static MxGatewayClientOptions CreateOptions(CliArguments arguments) { string endpoint = arguments.GetOptional("endpoint") ?? Environment.GetEnvironmentVariable("MXGATEWAY_ENDPOINT") ?? "http://localhost:5000"; string apiKey = ResolveApiKey(arguments); return new MxGatewayClientOptions { Endpoint = new Uri(endpoint, UriKind.Absolute), ApiKey = apiKey, UseTls = arguments.HasFlag("tls") || endpoint.StartsWith("https://", StringComparison.OrdinalIgnoreCase), DefaultCallTimeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30)), ConnectTimeout = arguments.GetDuration("connect-timeout", TimeSpan.FromSeconds(10)), CaCertificatePath = arguments.GetOptional("ca-file"), ServerNameOverride = arguments.GetOptional("server-name"), }; } private static string ResolveApiKey(CliArguments arguments) { string? apiKey = TryResolveApiKey(arguments); if (!string.IsNullOrWhiteSpace(apiKey)) { return apiKey; } string apiKeyEnvironmentName = arguments.GetOptional("api-key-env") ?? "MXGATEWAY_API_KEY"; throw new ArgumentException( $"Gateway API key is required. Pass --api-key or set {apiKeyEnvironmentName}."); } /// /// Resolves the effective API key from --api-key or, failing that, the /// environment variable named by --api-key-env (default /// MXGATEWAY_API_KEY). Returns when no key is /// configured; used for redaction where a missing key must not throw. /// private static string? TryResolveApiKey(CliArguments arguments) { string? apiKey = arguments.GetOptional("api-key"); if (!string.IsNullOrWhiteSpace(apiKey)) { return apiKey; } string apiKeyEnvironmentName = arguments.GetOptional("api-key-env") ?? "MXGATEWAY_API_KEY"; return Environment.GetEnvironmentVariable(apiKeyEnvironmentName); } private static CancellationTokenSource CreateCancellation(CliArguments arguments, string command) { var cancellation = new CancellationTokenSource(); // Long-running streaming / bench commands run until they finish (or Ctrl+C) // by default; a caller-supplied --timeout still applies if present. The // bench commands default to --duration-seconds=30 --warmup-seconds=3 plus // a per-session stagger, which already exceeds the default 30 s wall-clock // budget, so applying that budget would cancel them mid-window and emit a // zero-throughput JSON payload (see Client.Dotnet-015). bool isLongRunning = command is "galaxy-watch" or "bench-read-bulk" or "bench-stream-events"; string? rawTimeout = arguments.GetOptional("timeout"); if (isLongRunning && string.IsNullOrWhiteSpace(rawTimeout)) { return cancellation; } TimeSpan timeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30)); cancellation.CancelAfter(timeout); return cancellation; } /// /// Runs the CLI in batch mode: reads one command line at a time from /// , dispatches it through the normal /// routing, writes all output to , and /// then appends as a sentinel so the /// caller can delimit command results. Continues on failure; errors are /// written as JSON to (not stderr) so /// that the harness sees them inside the same delimited block. Exits 0 /// on EOF or empty line. /// private static async Task RunBatchAsync( TextWriter standardOutput, Func clientFactory, TextReader standardInput) { while (true) { string? line = await standardInput.ReadLineAsync().ConfigureAwait(false); // EOF or empty line signals clean exit. if (line is null || line.Length is 0) { return 0; } // Split on runs of ASCII whitespace — no quoting support by design. string[] lineArgs = line.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries); // Per-command output is buffered so we can redirect errors to stdout. using StringWriter commandOutput = new(); // Errors in batch mode go to stdout (same delimited block), formatted as JSON. // We use a capturing error writer and re-emit through commandOutput after the // command returns, so the EOR sentinel always follows the complete result. using StringWriter commandError = new(); try { await RunCoreAsync( lineArgs, commandOutput, commandError, clientFactory, standardInput, forceJsonErrors: true) .ConfigureAwait(false); } catch (Exception exception) when (exception is not OperationCanceledException) { // Unexpected exception that escaped RunCoreAsync (shouldn't happen, but be safe). commandError.WriteLine(JsonSerializer.Serialize( new { error = exception.Message, type = exception.GetType().Name }, JsonOptions)); } // Write any buffered normal output first. string commandOutputText = commandOutput.ToString(); if (commandOutputText.Length > 0) { standardOutput.Write(commandOutputText); } // Then any error output — in batch mode it belongs on stdout so the harness // sees it inside the delimited record. string commandErrorText = commandError.ToString(); if (commandErrorText.Length > 0) { standardOutput.Write(commandErrorText); } // Write the end-of-record sentinel and flush so the harness can unblock. standardOutput.WriteLine(BatchEndOfRecord); await standardOutput.FlushAsync().ConfigureAwait(false); } } private static Task OpenSessionAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { return WriteReplyAsync( client.OpenSessionAsync( new OpenSessionRequest { ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli", ClientCorrelationId = CreateCorrelationId(), RequestedBackend = arguments.GetOptional("backend") ?? string.Empty, }, cancellationToken), arguments, output); } private static Task CloseSessionAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { return WriteReplyAsync( client.CloseSessionAsync( new CloseSessionRequest { SessionId = arguments.GetRequired("session-id"), ClientCorrelationId = CreateCorrelationId(), }, cancellationToken), arguments, output); } private static Task PingAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.Ping, Ping = new PingCommand { Message = arguments.GetOptional("message") ?? "ping" }, }, cancellationToken); } private static Task RegisterAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.Register, Register = new RegisterCommand { ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli" }, }, cancellationToken); } private static Task AddItemAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.AddItem, AddItem = new AddItemCommand { ServerHandle = arguments.GetInt32("server-handle"), ItemDefinition = arguments.GetRequired("item"), }, }, cancellationToken); } private static Task AdviseAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.Advise, Advise = new AdviseCommand { ServerHandle = arguments.GetInt32("server-handle"), ItemHandle = arguments.GetInt32("item-handle"), }, }, cancellationToken); } private static Task SubscribeBulkAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { SubscribeBulkCommand command = new() { ServerHandle = arguments.GetInt32("server-handle"), }; command.TagAddresses.Add(ParseStringList(arguments.GetRequired("items"))); return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.SubscribeBulk, SubscribeBulk = command, }, cancellationToken); } private static Task UnsubscribeBulkAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { UnsubscribeBulkCommand command = new() { ServerHandle = arguments.GetInt32("server-handle"), }; command.ItemHandles.Add(ParseInt32List(arguments.GetRequired("item-handles"))); return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.UnsubscribeBulk, UnsubscribeBulk = command, }, cancellationToken); } private static Task ReadBulkAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { ReadBulkCommand command = new() { ServerHandle = arguments.GetInt32("server-handle"), TimeoutMs = (uint)arguments.GetInt32("timeout-ms", 0), }; command.TagAddresses.Add(ParseStringList(arguments.GetRequired("items"))); return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.ReadBulk, ReadBulk = command, }, cancellationToken); } private static Task WriteBulkAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { WriteBulkCommand command = new() { ServerHandle = arguments.GetInt32("server-handle"), }; IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); IReadOnlyList values = ParseValuesList(arguments); int userId = arguments.GetInt32("user-id", 0); EnsureSameLength(handles.Count, values.Count); for (int i = 0; i < handles.Count; i++) { command.Entries.Add(new WriteBulkEntry { ItemHandle = handles[i], Value = values[i], UserId = userId, }); } return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.WriteBulk, WriteBulk = command, }, cancellationToken); } private static Task Write2BulkAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { Write2BulkCommand command = new() { ServerHandle = arguments.GetInt32("server-handle"), }; IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); IReadOnlyList values = ParseValuesList(arguments); MxValue timestampValue = ParseTimestampValue(arguments); int userId = arguments.GetInt32("user-id", 0); EnsureSameLength(handles.Count, values.Count); for (int i = 0; i < handles.Count; i++) { command.Entries.Add(new Write2BulkEntry { ItemHandle = handles[i], Value = values[i], TimestampValue = timestampValue, UserId = userId, }); } return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.Write2Bulk, Write2Bulk = command, }, cancellationToken); } private static Task WriteSecuredBulkAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { WriteSecuredBulkCommand command = new() { ServerHandle = arguments.GetInt32("server-handle"), }; IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); IReadOnlyList values = ParseValuesList(arguments); int currentUserId = arguments.GetInt32("current-user-id"); int verifierUserId = arguments.GetInt32("verifier-user-id", 0); EnsureSameLength(handles.Count, values.Count); for (int i = 0; i < handles.Count; i++) { command.Entries.Add(new WriteSecuredBulkEntry { ItemHandle = handles[i], Value = values[i], CurrentUserId = currentUserId, VerifierUserId = verifierUserId, }); } return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.WriteSecuredBulk, WriteSecuredBulk = command, }, cancellationToken); } private static Task WriteSecured2BulkAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { WriteSecured2BulkCommand command = new() { ServerHandle = arguments.GetInt32("server-handle"), }; IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); IReadOnlyList values = ParseValuesList(arguments); MxValue timestampValue = ParseTimestampValue(arguments); int currentUserId = arguments.GetInt32("current-user-id"); int verifierUserId = arguments.GetInt32("verifier-user-id", 0); EnsureSameLength(handles.Count, values.Count); for (int i = 0; i < handles.Count; i++) { command.Entries.Add(new WriteSecured2BulkEntry { ItemHandle = handles[i], Value = values[i], TimestampValue = timestampValue, CurrentUserId = currentUserId, VerifierUserId = verifierUserId, }); } return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.WriteSecured2Bulk, WriteSecured2Bulk = command, }, cancellationToken); } /// /// Cross-language stress benchmark for ReadBulk. Opens its own session, /// subscribes to N tags so the worker's MxAccessValueCache populates from /// real OnDataChange events, then hammers ReadBulk in a tight in-process /// loop with per-call Stopwatch timing. Emits a single JSON object on /// stdout that the scripts/bench-read-bulk.ps1 driver collates across /// all five language clients. /// private static async Task BenchReadBulkAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { int durationSeconds = arguments.GetInt32("duration-seconds", 30); int warmupSeconds = arguments.GetInt32("warmup-seconds", 3); int bulkSize = arguments.GetInt32("bulk-size", 6); int tagStart = arguments.GetInt32("tag-start", 1); string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_"; string tagAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt"; uint timeoutMs = (uint)arguments.GetInt32("timeout-ms", 1500); string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench"; string[] tags = new string[bulkSize]; for (int i = 0; i < bulkSize; i++) { // TestMachine_NNN., three-digit machine numbers matching // the existing e2e tag-discovery convention. tags[i] = $"{tagPrefix}{(tagStart + i):D3}.{tagAttribute}"; } // Open + register + subscribe-bulk so the cache populates before the // measurement window opens. OpenSessionReply openReply = await client.OpenSessionAsync( new OpenSessionRequest { ClientSessionName = clientName, ClientCorrelationId = CreateCorrelationId() }, cancellationToken) .ConfigureAwait(false); string sessionId = openReply.SessionId; try { MxCommandReply registerReply = await InvokeAndEnsureAsync( client, CreateCommandRequest(sessionId, new MxCommand { Kind = MxCommandKind.Register, Register = new RegisterCommand { ClientName = clientName }, }), cancellationToken) .ConfigureAwait(false); int serverHandle = RequireRegisterServerHandle(registerReply); SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle }; subscribe.TagAddresses.Add(tags); MxCommandReply subscribeReply = await InvokeAndEnsureAsync( client, CreateCommandRequest(sessionId, new MxCommand { Kind = MxCommandKind.SubscribeBulk, SubscribeBulk = subscribe, }), cancellationToken) .ConfigureAwait(false); int[] itemHandles = subscribeReply.SubscribeBulk?.Results .Where(r => r.WasSuccessful) .Select(r => r.ItemHandle) .ToArray() ?? []; // Warm-up: drive the same call shape so the JIT / connection // pipelines settle before the measurement window opens. DateTime warmupDeadline = DateTime.UtcNow + TimeSpan.FromSeconds(warmupSeconds); ReadBulkCommand readBulkCommand = new() { ServerHandle = serverHandle, TimeoutMs = timeoutMs, }; readBulkCommand.TagAddresses.Add(tags); MxCommand readBulkMxCommand = new() { Kind = MxCommandKind.ReadBulk, ReadBulk = readBulkCommand }; while (DateTime.UtcNow < warmupDeadline) { _ = await client.InvokeAsync( CreateCommandRequest(sessionId, readBulkMxCommand), cancellationToken) .ConfigureAwait(false); } // Steady state — capture per-call wall latency with a high-res // Stopwatch so the resolution is sub-millisecond on modern Windows. List latencyMillis = new(capacity: 65536); long totalReadResults = 0; long cachedReadResults = 0; int successfulCalls = 0; int failedCalls = 0; DateTime steadyDeadline = DateTime.UtcNow + TimeSpan.FromSeconds(durationSeconds); DateTime steadyStart = DateTime.UtcNow; while (DateTime.UtcNow < steadyDeadline) { System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew(); MxCommandReply reply; try { reply = await client.InvokeAsync( CreateCommandRequest(sessionId, readBulkMxCommand), cancellationToken) .ConfigureAwait(false); sw.Stop(); } catch { sw.Stop(); failedCalls++; latencyMillis.Add(sw.Elapsed.TotalMilliseconds); continue; } latencyMillis.Add(sw.Elapsed.TotalMilliseconds); if (reply.ProtocolStatus?.Code != ProtocolStatusCode.Ok) { failedCalls++; continue; } successfulCalls++; if (reply.ReadBulk is not null) { foreach (BulkReadResult r in reply.ReadBulk.Results) { totalReadResults++; if (r.WasCached) { cachedReadResults++; } } } } double steadyElapsedSeconds = (DateTime.UtcNow - steadyStart).TotalSeconds; if (itemHandles.Length > 0) { UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = serverHandle }; unsubscribe.ItemHandles.Add(itemHandles); _ = await client.InvokeAsync( CreateCommandRequest(sessionId, new MxCommand { Kind = MxCommandKind.UnsubscribeBulk, UnsubscribeBulk = unsubscribe, }), cancellationToken) .ConfigureAwait(false); } int totalCalls = successfulCalls + failedCalls; double callsPerSecond = steadyElapsedSeconds > 0 ? totalCalls / steadyElapsedSeconds : 0; object stats = new { language = "dotnet", command = "bench-read-bulk", endpoint = arguments.GetOptional("endpoint") ?? "(default)", clientName, bulkSize, durationSeconds, warmupSeconds, durationMs = (long)(steadyElapsedSeconds * 1000), tags, totalCalls, successfulCalls, failedCalls, totalReadResults, cachedReadResults, callsPerSecond = Math.Round(callsPerSecond, 2), latencyMs = new { p50 = Percentile(latencyMillis, 0.50), p95 = Percentile(latencyMillis, 0.95), p99 = Percentile(latencyMillis, 0.99), max = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Max(), 3) : 0, mean = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Average(), 3) : 0, }, }; output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions)); return 0; } finally { try { await client.CloseSessionAsync( new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = CreateCorrelationId() }, cancellationToken) .ConfigureAwait(false); } catch { // Closing the session is best-effort — never let it mask a real bench error. } } } /// /// Single-client event-rate stress benchmark. Opens its own session, /// subscribes to --bulk-size tags spanning the dev galaxy's /// TestMachine_NNN.* set, then drains the gateway's StreamEvents /// server-stream as fast as it can for --duration-seconds. /// Tracks events received per second, end-to-end latency from /// event.worker_timestamp to client receive time, and any /// worker faults emitted by the gateway over the same window. /// /// The companion --all-attributes flag (default true) /// subscribes to all six TestMachine attributes per machine number /// so the bench can drive event volume past one-attribute-per-machine /// when the dev galaxy's TestChangingInt is the only churning tag. /// /// private static async Task BenchStreamEventsAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { int durationSeconds = arguments.GetInt32("duration-seconds", 30); int warmupSeconds = arguments.GetInt32("warmup-seconds", 3); int bulkSize = arguments.GetInt32("bulk-size", 60); int tagStart = arguments.GetInt32("tag-start", 1); int sessionCount = arguments.GetInt32("session-count", 1); // Concurrent OpenSession calls all the way up the stack force the // gateway to spawn N x86 workers simultaneously; that hits the // 30-second worker startup timeout around 6-8 concurrent opens on // a dev rig. The stagger spreads the opens out so per-session // OpenSession+Register completes inside that budget before the // next session starts spawning. The steady-state event window // begins only once all sessions are open and subscribed. int sessionStartStaggerMs = arguments.GetInt32("session-start-stagger-ms", sessionCount > 1 ? 750 : 0); string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_"; bool allAttributes = !arguments.HasFlag("single-attribute"); string singleAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt"; string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench-events"; if (sessionCount < 1) { throw new ArgumentException("--session-count must be >= 1."); } // Build the tag set. With --all-attributes (default) we rotate through // all six TestMachine attributes to drive more event volume from a // smaller machine range; with --single-attribute we use the same // attribute on contiguous machine numbers (the original bench-read- // bulk shape). string[] attributes = allAttributes ? new[] { "TestChangingInt", "ProtectedValue", "TestBoolArray[]", "TestIntArray[]", "TestDateTimeArray[]", "TestStringArray[]" } : new[] { singleAttribute }; List tags = new(capacity: bulkSize); for (int i = 0; i < bulkSize; i++) { int machineNumber = tagStart + (i / attributes.Length); string attribute = attributes[i % attributes.Length]; tags.Add($"{tagPrefix}{machineNumber:D3}.{attribute}"); } long warmupEvents = 0; long steadyEvents = 0; long steadyDataChangeEvents = 0; List endToEndLatencyMs = new(capacity: 262_144); object latencyLock = new(); DateTime? firstSteadyEventUtc = null; DateTime? lastSteadyEventUtc = null; int totalSubscribeFailures = 0; int totalSubscribedTagCount = 0; int totalDrainedFaultCount = 0; DateTime warmupStart = default; DateTime warmupEnd = default; DateTime steadyEnd = default; // Phase 1: open + subscribe each session sequentially with a stagger, // so worker spawn-up doesn't exceed the gateway's per-session startup // timeout. Each session stashes its (sessionId, serverHandle, itemHandles) // for phase 2. var openedSessions = new List<(string SessionId, int ServerHandle, int[] ItemHandles, string ClientName)>(sessionCount); for (int sessionIndex = 0; sessionIndex < sessionCount; sessionIndex++) { if (sessionIndex > 0 && sessionStartStaggerMs > 0) { await Task.Delay(sessionStartStaggerMs, cancellationToken).ConfigureAwait(false); } string thisClientName = sessionCount == 1 ? clientName : $"{clientName}-{sessionIndex:D2}"; OpenSessionReply openReply = await client.OpenSessionAsync( new OpenSessionRequest { ClientSessionName = thisClientName, ClientCorrelationId = CreateCorrelationId() }, cancellationToken) .ConfigureAwait(false); string sessionId = openReply.SessionId; MxCommandReply registerReply = await InvokeAndEnsureAsync( client, CreateCommandRequest(sessionId, new MxCommand { Kind = MxCommandKind.Register, Register = new RegisterCommand { ClientName = thisClientName }, }), cancellationToken) .ConfigureAwait(false); int serverHandle = RequireRegisterServerHandle(registerReply); SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle }; subscribe.TagAddresses.Add(tags); MxCommandReply subscribeReply = await InvokeAndEnsureAsync( client, CreateCommandRequest(sessionId, new MxCommand { Kind = MxCommandKind.SubscribeBulk, SubscribeBulk = subscribe, }), cancellationToken) .ConfigureAwait(false); SubscribeResult[] subscribeResults = subscribeReply.SubscribeBulk?.Results.ToArray() ?? []; int[] itemHandles = subscribeResults.Where(r => r.WasSuccessful).Select(r => r.ItemHandle).ToArray(); totalSubscribedTagCount += itemHandles.Length; totalSubscribeFailures += subscribeResults.Count(r => !r.WasSuccessful); openedSessions.Add((sessionId, serverHandle, itemHandles, thisClientName)); } // Phase 2: now every session is open + advised. Start the measurement // window and run each session's StreamEvents reader in parallel. warmupStart = DateTime.UtcNow; warmupEnd = warmupStart + TimeSpan.FromSeconds(warmupSeconds); steadyEnd = warmupEnd + TimeSpan.FromSeconds(durationSeconds); async Task RunStreamAsync((string SessionId, int ServerHandle, int[] ItemHandles, string ClientName) ctx) { using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); Task streamTask = Task.Run(async () => { StreamEventsRequest streamRequest = new() { SessionId = ctx.SessionId }; await foreach (MxEvent mxEvent in client.StreamEventsAsync(streamRequest, streamCts.Token) .ConfigureAwait(false)) { DateTime nowUtc = DateTime.UtcNow; if (nowUtc >= steadyEnd) { break; } if (nowUtc < warmupEnd) { Interlocked.Increment(ref warmupEvents); continue; } // Guarded by latencyLock so parallel sessions can't tear a 64-bit // DateTime? read or stomp an already-set firstSteadyEventUtc with // a later timestamp from a slower-to-start session. The lock is // already held by the latency append a few lines below, so the // extra cost is one uncontended lock acquisition per event. lock (latencyLock) { firstSteadyEventUtc ??= nowUtc; lastSteadyEventUtc = nowUtc; } Interlocked.Increment(ref steadyEvents); if (mxEvent.Family == MxEventFamily.OnDataChange) { Interlocked.Increment(ref steadyDataChangeEvents); } if (mxEvent.WorkerTimestamp is { } workerStamp) { double latency = (nowUtc - workerStamp.ToDateTime()).TotalMilliseconds; if (latency >= 0) { lock (latencyLock) { endToEndLatencyMs.Add(latency); } } } } }, streamCts.Token); // The inner streamTask MUST be observed on every path — including when // the outer cancellationToken cancels during the Task.Delay below — or // its fault surfaces as a TaskScheduler.UnobservedTaskException after // GC. Use try/finally so the cancel + await pair always runs (see // Client.Dotnet-016). RpcException(Cancelled) never reaches here in // production because GrpcMxGatewayClientTransport.StreamEventsAsync // routes through RpcExceptionMapper.Map, which returns OCE for // StatusCode.Cancelled. try { await Task.Delay(steadyEnd - warmupStart, cancellationToken).ConfigureAwait(false); } finally { streamCts.Cancel(); try { await streamTask.ConfigureAwait(false); } catch (OperationCanceledException) { } catch (MxGatewayException) { } } try { MxCommandReply drainReply = await client.InvokeAsync( CreateCommandRequest(ctx.SessionId, new MxCommand { Kind = MxCommandKind.DrainEvents, DrainEvents = new DrainEventsCommand { MaxEvents = 16 }, }), cancellationToken) .ConfigureAwait(false); Interlocked.Add(ref totalDrainedFaultCount, drainReply.DrainEvents?.Events.Count ?? 0); } catch { /* fault probe is best-effort */ } if (ctx.ItemHandles.Length > 0) { try { UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = ctx.ServerHandle }; unsubscribe.ItemHandles.Add(ctx.ItemHandles); _ = await client.InvokeAsync( CreateCommandRequest(ctx.SessionId, new MxCommand { Kind = MxCommandKind.UnsubscribeBulk, UnsubscribeBulk = unsubscribe, }), cancellationToken) .ConfigureAwait(false); } catch { /* best-effort */ } } try { await client.CloseSessionAsync( new CloseSessionRequest { SessionId = ctx.SessionId, ClientCorrelationId = CreateCorrelationId() }, cancellationToken) .ConfigureAwait(false); } catch { /* best-effort */ } } Task[] streamTasks = openedSessions.Select(RunStreamAsync).ToArray(); await Task.WhenAll(streamTasks).ConfigureAwait(false); double steadyElapsedSeconds = (firstSteadyEventUtc.HasValue && lastSteadyEventUtc.HasValue) ? (lastSteadyEventUtc.Value - firstSteadyEventUtc.Value).TotalSeconds : 0; double eventsPerSecond = steadyElapsedSeconds > 0 ? steadyEvents / steadyElapsedSeconds : 0; double dataChangeEventsPerSecond = steadyElapsedSeconds > 0 ? steadyDataChangeEvents / steadyElapsedSeconds : 0; double[] latencySnapshot; lock (latencyLock) { latencySnapshot = endToEndLatencyMs.ToArray(); } object stats = new { language = "dotnet", command = "bench-stream-events", endpoint = arguments.GetOptional("endpoint") ?? "(default)", clientName, sessionCount, bulkSize, durationSeconds, warmupSeconds, allAttributes, steadyElapsedSeconds = Math.Round(steadyElapsedSeconds, 3), subscribedTagCount = totalSubscribedTagCount, subscribeFailures = totalSubscribeFailures, warmupEvents, steadyEvents, steadyDataChangeEvents, eventsPerSecond = Math.Round(eventsPerSecond, 2), dataChangeEventsPerSecond = Math.Round(dataChangeEventsPerSecond, 2), drainedFaultCount = totalDrainedFaultCount, endToEndLatencyMs = new { p50 = Percentile(latencySnapshot, 0.50), p95 = Percentile(latencySnapshot, 0.95), p99 = Percentile(latencySnapshot, 0.99), max = latencySnapshot.Length > 0 ? Math.Round(latencySnapshot.Max(), 3) : 0, mean = latencySnapshot.Length > 0 ? Math.Round(latencySnapshot.Average(), 3) : 0, sampleCount = latencySnapshot.Length, }, }; output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions)); return 0; } /// /// Computes the requested percentile from an unsorted latency sample using /// nearest-rank with linear interpolation. Rounds to 3 decimal places to /// match the JSON schema the PS driver collates. /// private static double Percentile(IReadOnlyList sample, double quantile) { if (sample.Count == 0) { return 0; } double[] sorted = sample.ToArray(); Array.Sort(sorted); if (sorted.Length == 1) { return Math.Round(sorted[0], 3); } double rank = quantile * (sorted.Length - 1); int lower = (int)Math.Floor(rank); int upper = (int)Math.Ceiling(rank); double fraction = rank - lower; double value = sorted[lower] + (sorted[upper] - sorted[lower]) * fraction; return Math.Round(value, 3); } /// /// Parses the bulk-write CLI's --values list. All entries share /// the single --type argument; the comma-separated values are /// each parsed via on a per-entry basis. This /// keeps the CLI simple for e2e use (one type, N values) — callers /// that need heterogeneous types per entry should drive the library /// directly. /// private static IReadOnlyList ParseValuesList(CliArguments arguments) { string type = arguments.GetRequired("type"); string[] values = ParseStringList(arguments.GetRequired("values")).ToArray(); MxValue[] result = new MxValue[values.Length]; for (int i = 0; i < values.Length; i++) { result[i] = ParseValue(type, values[i]); } return result; } private static void EnsureSameLength(int handles, int values) { if (handles != values) { throw new ArgumentException( $"Bulk write requires the same number of --item-handles ({handles}) and --values ({values})."); } } private static Task WriteAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.Write, Write = new WriteCommand { ServerHandle = arguments.GetInt32("server-handle"), ItemHandle = arguments.GetInt32("item-handle"), UserId = arguments.GetInt32("user-id", 0), Value = ParseValue(arguments), }, }, cancellationToken); } private static Task Write2Async( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { return InvokeAndWriteAsync( arguments, client, output, new MxCommand { Kind = MxCommandKind.Write2, Write2 = new Write2Command { ServerHandle = arguments.GetInt32("server-handle"), ItemHandle = arguments.GetInt32("item-handle"), UserId = arguments.GetInt32("user-id", 0), Value = ParseValue(arguments), TimestampValue = ParseTimestampValue(arguments), }, }, cancellationToken); } private static async Task StreamEventsAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { uint maxEvents = arguments.GetUInt32("max-events", 0); bool json = arguments.HasFlag("json"); bool jsonLines = arguments.HasFlag("jsonl"); if (json && !jsonLines && maxEvents is 0) { throw new ArgumentException("--json stream-events requires --max-events to bound aggregate output."); } if (maxEvents > MaxAggregateEvents) { throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}."); } var events = json && !jsonLines ? new List(checked((int)maxEvents)) : []; uint eventCount = 0; var request = new StreamEventsRequest { SessionId = arguments.GetRequired("session-id"), AfterWorkerSequence = arguments.GetUInt64("after-worker-sequence", 0), }; try { await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(request, cancellationToken) .WithCancellation(cancellationToken) .ConfigureAwait(false)) { if (jsonLines) { output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent)); } else if (json) { events.Add(gatewayEvent); } else { output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent)); } eventCount++; if (maxEvents > 0 && eventCount >= maxEvents) { break; } } } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { // Client.Dotnet-017: the supplied cancellation token covers both the // user's --timeout wall-clock budget (via CreateCancellation's // CancelAfter) and external Ctrl+C / parent CTS cancellation. All // three are graceful completion modes for a finite-window event // collector: emit the events that arrived before the window closed // and exit 0. The events list is well-formed at this point; the // aggregate JSON below still runs. This matches how the Go, Rust, // Python, and Java CLIs treat their equivalent timeouts. } if (json && !jsonLines) { output.WriteLine(JsonSerializer.Serialize( new { events = events.Select(EventToJsonElement).ToArray() }, JsonOptions)); } return 0; } private static async Task StreamAlarmsAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { uint maxEvents = arguments.GetUInt32("max-events", 0); bool json = arguments.HasFlag("json"); bool jsonLines = arguments.HasFlag("jsonl"); if (json && !jsonLines && maxEvents is 0) { throw new ArgumentException("--json stream-alarms requires --max-events to bound aggregate output."); } if (maxEvents > MaxAggregateEvents) { throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}."); } var messages = json && !jsonLines ? new List(checked((int)maxEvents)) : []; uint messageCount = 0; var request = new StreamAlarmsRequest { ClientCorrelationId = CreateCorrelationId(), AlarmFilterPrefix = arguments.GetOptional("filter-prefix") ?? string.Empty, }; try { await foreach (AlarmFeedMessage feedMessage in client.StreamAlarmsAsync(request, cancellationToken) .WithCancellation(cancellationToken) .ConfigureAwait(false)) { if (jsonLines) { output.WriteLine(ProtobufJsonFormatter.Format(feedMessage)); } else if (json) { messages.Add(feedMessage); } else { output.WriteLine(FormatAlarmFeedMessage(feedMessage)); } messageCount++; if (maxEvents > 0 && messageCount >= maxEvents) { break; } } } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { // Mirrors stream-events (Client.Dotnet-017): the supplied token covers // the user's --timeout wall-clock budget and external Ctrl+C / parent // CTS cancellation. All are graceful completion modes for a // finite-window alarm-feed collector: emit what arrived and exit 0. } if (json && !jsonLines) { output.WriteLine(JsonSerializer.Serialize( new { alarms = messages.Select(AlarmFeedMessageToJsonElement).ToArray() }, JsonOptions)); } return 0; } private static Task AcknowledgeAlarmAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { var request = new AcknowledgeAlarmRequest { ClientCorrelationId = CreateCorrelationId(), AlarmFullReference = arguments.GetRequired("reference"), Comment = arguments.GetOptional("comment") ?? string.Empty, OperatorUser = arguments.GetOptional("operator") ?? string.Empty, }; return WriteReplyAsync( client.AcknowledgeAlarmAsync(request, cancellationToken), arguments, output); } /// /// Renders one for the human-readable /// (non-JSON) stream-alarms output, distinguishing the payload oneof /// arms: a snapshot active alarm, the snapshot-complete sentinel, or a live /// transition. /// private static string FormatAlarmFeedMessage(AlarmFeedMessage feedMessage) { return feedMessage.PayloadCase switch { AlarmFeedMessage.PayloadOneofCase.ActiveAlarm => $"active-alarm {ProtobufJsonFormatter.Format(feedMessage.ActiveAlarm)}", AlarmFeedMessage.PayloadOneofCase.SnapshotComplete => $"snapshot-complete {feedMessage.SnapshotComplete}", AlarmFeedMessage.PayloadOneofCase.Transition => $"transition {ProtobufJsonFormatter.Format(feedMessage.Transition)}", _ => $"unknown-payload {feedMessage.PayloadCase}", }; } private static JsonElement AlarmFeedMessageToJsonElement(AlarmFeedMessage feedMessage) { return JsonDocument.Parse(ProtobufJsonFormatter.Format(feedMessage)).RootElement.Clone(); } private static async Task SmokeAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { OpenSessionReply? openReply = null; CloseSessionReply? closeReply = null; var commandReplies = new List(); var events = new List(); try { openReply = await client.OpenSessionAsync( new OpenSessionRequest { ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke", ClientCorrelationId = CreateCorrelationId(), }, cancellationToken) .ConfigureAwait(false); int serverHandle = await InvokeForHandleAsync( arguments, client, openReply.SessionId, new MxCommand { Kind = MxCommandKind.Register, Register = new RegisterCommand { ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke" }, }, RequireRegisterServerHandle, commandReplies, cancellationToken) .ConfigureAwait(false); int itemHandle = await InvokeForHandleAsync( arguments, client, openReply.SessionId, new MxCommand { Kind = MxCommandKind.AddItem, AddItem = new AddItemCommand { ServerHandle = serverHandle, ItemDefinition = arguments.GetRequired("item"), }, }, RequireAddItemItemHandle, commandReplies, cancellationToken) .ConfigureAwait(false); commandReplies.Add(await InvokeAndEnsureAsync( client, CreateCommandRequest( openReply.SessionId, new MxCommand { Kind = MxCommandKind.Advise, Advise = new AdviseCommand { ServerHandle = serverHandle, ItemHandle = itemHandle, }, }), cancellationToken) .ConfigureAwait(false)); if (arguments.GetOptional("value") is not null) { commandReplies.Add(await InvokeAndEnsureAsync( client, CreateCommandRequest( openReply.SessionId, new MxCommand { Kind = MxCommandKind.Write, Write = new WriteCommand { ServerHandle = serverHandle, ItemHandle = itemHandle, UserId = arguments.GetInt32("user-id", 0), Value = ParseValue(arguments), }, }), cancellationToken) .ConfigureAwait(false)); } using CancellationTokenSource streamCancellation = CancellationTokenSource .CreateLinkedTokenSource(cancellationToken); streamCancellation.CancelAfter(arguments.GetDuration( "event-timeout", TimeSpan.FromSeconds(2))); try { await foreach (MxEvent gatewayEvent in client.StreamEventsAsync( new StreamEventsRequest { SessionId = openReply.SessionId }, streamCancellation.Token) .WithCancellation(streamCancellation.Token) .ConfigureAwait(false)) { events.Add(gatewayEvent); if (events.Count >= arguments.GetUInt32("max-events", 1)) { break; } } } catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) { } } finally { if (openReply is not null) { closeReply = await client.CloseSessionAsync( new CloseSessionRequest { SessionId = openReply.SessionId, ClientCorrelationId = CreateCorrelationId(), }, CancellationToken.None) .ConfigureAwait(false); } } WriteSmokeResult(arguments, output, openReply, closeReply, commandReplies, events); return 0; } private static async Task InvokeAndWriteAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, MxCommand command, CancellationToken cancellationToken) { MxCommandReply reply = await InvokeAndEnsureAsync( client, CreateCommandRequest(arguments.GetRequired("session-id"), command), cancellationToken) .ConfigureAwait(false); WriteMessage(arguments, output, reply); return 0; } private static async Task InvokeForHandleAsync( CliArguments arguments, IMxGatewayCliClient client, string sessionId, MxCommand command, Func handleSelector, List replies, CancellationToken cancellationToken) { MxCommandReply reply = await InvokeAndEnsureAsync( client, CreateCommandRequest(sessionId, command), cancellationToken) .ConfigureAwait(false); replies.Add(reply); return handleSelector(reply); } private static async Task InvokeAndEnsureAsync( IMxGatewayCliClient client, MxCommandRequest request, CancellationToken cancellationToken) { MxCommandReply reply = await client.InvokeAsync(request, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply; } /// /// Returns the server handle from a successful register reply, or throws /// when the typed /// payload is absent. Mirrors the SDK-level /// contract: a successful reply without the typed payload is a gateway protocol /// error, not a license to fall through to ReturnValue.Int32Value (which is 0 /// when the reply carries no return value). /// private static int RequireRegisterServerHandle(MxCommandReply reply) { return reply.Register?.ServerHandle ?? throw CreateMissingPayloadException(reply, "register"); } /// /// Returns the item handle from a successful add_item reply, or throws /// when the typed /// payload is absent. See for the rationale. /// private static int RequireAddItemItemHandle(MxCommandReply reply) { return reply.AddItem?.ItemHandle ?? throw CreateMissingPayloadException(reply, "add_item"); } private static MxGatewayException CreateMissingPayloadException( MxCommandReply reply, string expectedPayload) { return new MxGatewayException( $"Gateway reply for command kind={reply.Kind} reported success but is missing " + $"the required '{expectedPayload}' payload; cannot resolve a handle. " + $"session={reply.SessionId}; correlation={reply.CorrelationId}"); } private static MxCommandRequest CreateCommandRequest( string sessionId, MxCommand command) { return new MxCommandRequest { SessionId = sessionId, ClientCorrelationId = CreateCorrelationId(), Command = command, }; } private static async Task WriteReplyAsync( Task replyTask, CliArguments arguments, TextWriter output) where TReply : IMessage { TReply reply = await replyTask.ConfigureAwait(false); WriteMessage(arguments, output, reply); return 0; } private static void WriteVersion(CliArguments arguments, TextWriter output) { if (arguments.HasFlag("json")) { output.WriteLine(JsonSerializer.Serialize( new { gatewayProtocolVersion = MxGatewayClientContractInfo.GatewayProtocolVersion, workerProtocolVersion = MxGatewayClientContractInfo.WorkerProtocolVersion, }, JsonOptions)); return; } output.WriteLine( $"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}"); output.WriteLine( $"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}"); } private static void WriteMessage( CliArguments arguments, TextWriter output, IMessage message) { output.WriteLine(arguments.HasFlag("json") ? ProtobufJsonFormatter.Format(message) : message.ToString()); } private static void WriteSmokeResult( CliArguments arguments, TextWriter output, OpenSessionReply? openReply, CloseSessionReply? closeReply, IReadOnlyList commandReplies, IReadOnlyList events) { if (!arguments.HasFlag("json")) { output.WriteLine($"session-id={openReply?.SessionId}"); output.WriteLine($"commands={commandReplies.Count}"); output.WriteLine($"events={events.Count}"); output.WriteLine($"closed={closeReply is not null}"); return; } output.WriteLine(JsonSerializer.Serialize( new { sessionId = openReply?.SessionId, closed = closeReply is not null, commandReplies = commandReplies.Select(CommandReplyToJsonElement).ToArray(), events = events.Select(EventToJsonElement).ToArray(), }, JsonOptions)); } private static JsonElement CommandReplyToJsonElement(MxCommandReply reply) { return JsonDocument.Parse(ProtobufJsonFormatter.Format(reply)).RootElement.Clone(); } private static JsonElement EventToJsonElement(MxEvent gatewayEvent) { return JsonDocument.Parse(ProtobufJsonFormatter.Format(gatewayEvent)).RootElement.Clone(); } private static MxValue ParseValue(CliArguments arguments) { return ParseValue(arguments.GetRequired("type"), arguments.GetRequired("value")); } private static MxValue ParseValue(string typeName, string value) { string type = typeName.ToLowerInvariant(); string[] values = value.Split(',', StringSplitOptions.TrimEntries); return type switch { "bool" or "boolean" => bool.Parse(value).ToMxValue(), "bool-array" or "boolean-array" => values.Select(bool.Parse).ToArray().ToMxValue(), "int32" or "integer" => int.Parse(value, CultureInfo.InvariantCulture).ToMxValue(), "int32-array" or "integer-array" => values.Select(item => int.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(), "int64" => long.Parse(value, CultureInfo.InvariantCulture).ToMxValue(), "int64-array" => values.Select(item => long.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(), "float" => float.Parse(value, CultureInfo.InvariantCulture).ToMxValue(), "float-array" => values.Select(item => float.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(), "double" => double.Parse(value, CultureInfo.InvariantCulture).ToMxValue(), "double-array" => values.Select(item => double.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(), "string" => value.ToMxValue(), "string-array" => values.ToMxValue(), "time" or "timestamp" => DateTimeOffset.Parse(value, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal).ToMxValue(), "time-array" or "timestamp-array" => values .Select(item => DateTimeOffset.Parse(item, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal)) .ToArray() .ToMxValue(), _ => throw new ArgumentException($"Unsupported MX value type '{type}'."), }; } private static MxValue ParseTimestampValue(CliArguments arguments) { string timestamp = arguments.GetOptional("timestamp") ?? DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture); return DateTimeOffset.Parse( timestamp, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal) .ToMxValue(); } private static Task GalaxyTestConnectionAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { return WriteReplyAsync( client.GalaxyTestConnectionAsync(new TestConnectionRequest(), cancellationToken), arguments, output); } private static Task GalaxyLastDeployAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { return WriteReplyAsync( client.GalaxyGetLastDeployTimeAsync(new GetLastDeployTimeRequest(), cancellationToken), arguments, output); } private static async Task GalaxyDiscoverAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { DiscoverHierarchyReply reply = await DiscoverAllGalaxyHierarchyAsync(client, cancellationToken) .ConfigureAwait(false); if (arguments.HasFlag("json")) { output.WriteLine(ProtobufJsonFormatter.Format(reply)); return 0; } output.WriteLine($"objects={reply.Objects.Count}"); foreach (GalaxyObject galaxyObject in reply.Objects) { output.WriteLine($"- gobject_id={galaxyObject.GobjectId} tag_name={galaxyObject.TagName} contained_name={galaxyObject.ContainedName} parent={galaxyObject.ParentGobjectId} attributes={galaxyObject.Attributes.Count}"); } return 0; } private static async Task DiscoverAllGalaxyHierarchyAsync( IMxGatewayCliClient client, CancellationToken cancellationToken) { DiscoverHierarchyReply aggregate = new(); HashSet seenPageTokens = new(StringComparer.Ordinal); string pageToken = string.Empty; do { DiscoverHierarchyReply page = await client.GalaxyDiscoverHierarchyAsync( new DiscoverHierarchyRequest { PageSize = 5000, PageToken = pageToken, }, cancellationToken) .ConfigureAwait(false); aggregate.Objects.Add(page.Objects); aggregate.TotalObjectCount = page.TotalObjectCount; pageToken = page.NextPageToken; if (!string.IsNullOrWhiteSpace(pageToken) && !seenPageTokens.Add(pageToken)) { throw new MxGatewayException( $"Galaxy DiscoverHierarchy returned a repeated page token '{pageToken}'."); } } while (!string.IsNullOrWhiteSpace(pageToken)); return aggregate; } private static async Task GalaxyWatchAsync( CliArguments arguments, IMxGatewayCliClient client, TextWriter output, CancellationToken cancellationToken) { bool json = arguments.HasFlag("json"); uint maxEvents = arguments.GetUInt32("max-events", 0); if (maxEvents > MaxAggregateEvents) { throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}."); } WatchDeployEventsRequest request = new(); string? lastSeen = arguments.GetOptional("last-seen-deploy-time"); if (!string.IsNullOrWhiteSpace(lastSeen)) { DateTimeOffset parsed = DateTimeOffset.Parse( lastSeen, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal); request.LastSeenDeployTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTimeOffset(parsed); } using CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); ConsoleCancelEventHandler handler = (_, eventArgs) => { eventArgs.Cancel = true; try { linked.Cancel(); } catch (ObjectDisposedException) { } }; Console.CancelKeyPress += handler; uint emitted = 0; try { await foreach (DeployEvent deployEvent in client .GalaxyWatchDeployEventsAsync(request, linked.Token) .WithCancellation(linked.Token) .ConfigureAwait(false)) { if (json) { output.WriteLine(ProtobufJsonFormatter.Format(deployEvent)); } else { output.WriteLine(FormatDeployEvent(deployEvent)); } emitted++; if (maxEvents > 0 && emitted >= maxEvents) { break; } } } catch (OperationCanceledException) when (linked.IsCancellationRequested && !cancellationToken.IsCancellationRequested) { // Ctrl+C-driven cancellation is a clean exit. } finally { Console.CancelKeyPress -= handler; } return 0; } private static string FormatDeployEvent(DeployEvent deployEvent) { string deployTime = deployEvent.TimeOfLastDeployPresent && deployEvent.TimeOfLastDeploy is not null ? deployEvent.TimeOfLastDeploy .ToDateTimeOffset() .ToString("O", CultureInfo.InvariantCulture) : ""; string observed = deployEvent.ObservedAt is not null ? deployEvent.ObservedAt .ToDateTimeOffset() .ToString("O", CultureInfo.InvariantCulture) : ""; return $"sequence={deployEvent.Sequence} observed_at={observed} time_of_last_deploy={deployTime} objects={deployEvent.ObjectCount} attributes={deployEvent.AttributeCount}"; } private static int WriteUnknownCommand(string command, TextWriter standardError) { standardError.WriteLine($"Unknown command '{command}'."); WriteUsage(standardError); return 2; } private static bool IsHelp(string value) { return string.Equals(value, "-h", StringComparison.OrdinalIgnoreCase) || string.Equals(value, "--help", StringComparison.OrdinalIgnoreCase) || string.Equals(value, "help", StringComparison.OrdinalIgnoreCase); } private static bool IsKnownGatewayCommand(string command) { return command is "open-session" or "close-session" or "ping" or "register" or "add-item" or "advise" or "subscribe-bulk" or "unsubscribe-bulk" or "read-bulk" or "write-bulk" or "write2-bulk" or "write-secured-bulk" or "write-secured2-bulk" or "bench-read-bulk" or "bench-stream-events" or "stream-events" or "stream-alarms" or "acknowledge-alarm" or "write" or "write2" or "smoke" or "galaxy-test-connection" or "galaxy-last-deploy" or "galaxy-discover" or "galaxy-watch"; } private static IReadOnlyList ParseStringList(string value) { string[] items = value .Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); if (items.Length is 0) { throw new ArgumentException("At least one item is required."); } return items; } private static IReadOnlyList ParseInt32List(string value) { string[] items = value .Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); if (items.Length is 0) { throw new ArgumentException("At least one item handle is required."); } return items .Select(item => int.Parse(item, CultureInfo.InvariantCulture)) .ToArray(); } private static string CreateCorrelationId() { return Guid.NewGuid().ToString("N"); } private static void WriteUsage(TextWriter writer) { writer.WriteLine("mxgw-dotnet batch (reads commands from stdin; writes output + __MXGW_BATCH_EOR__ after each)"); writer.WriteLine("mxgw-dotnet version [--json]"); writer.WriteLine("mxgw-dotnet ping --session-id [--json]"); writer.WriteLine("mxgw-dotnet open-session [--client-name ] [--json]"); writer.WriteLine("mxgw-dotnet close-session --session-id [--json]"); writer.WriteLine("mxgw-dotnet register --session-id --client-name [--json]"); writer.WriteLine("mxgw-dotnet add-item --session-id --server-handle --item [--json]"); writer.WriteLine("mxgw-dotnet advise --session-id --server-handle --item-handle [--json]"); writer.WriteLine("mxgw-dotnet read-bulk --session-id --server-handle --items [--timeout-ms ] [--json]"); writer.WriteLine("mxgw-dotnet write-bulk --session-id --server-handle --item-handles --type --values [--user-id ] [--json]"); writer.WriteLine("mxgw-dotnet write2-bulk --session-id --server-handle --item-handles --type --values [--timestamp ] [--user-id ] [--json]"); writer.WriteLine("mxgw-dotnet write-secured-bulk --session-id --server-handle --item-handles --type --values --current-user-id [--verifier-user-id ] [--json]"); writer.WriteLine("mxgw-dotnet write-secured2-bulk --session-id --server-handle --item-handles --type --values [--timestamp ] --current-user-id [--verifier-user-id ] [--json]"); writer.WriteLine("mxgw-dotnet subscribe-bulk --session-id --server-handle --items [--json]"); writer.WriteLine("mxgw-dotnet unsubscribe-bulk --session-id --server-handle --item-handles [--json]"); writer.WriteLine("mxgw-dotnet stream-events --session-id [--max-events ] [--json]"); writer.WriteLine("mxgw-dotnet stream-alarms [--filter-prefix ] [--max-events ] [--json] [--jsonl]"); writer.WriteLine("mxgw-dotnet acknowledge-alarm --reference [--comment ] [--operator ] [--json]"); writer.WriteLine("mxgw-dotnet write --session-id --server-handle --item-handle --type --value [--json]"); writer.WriteLine("mxgw-dotnet write2 --session-id --server-handle --item-handle --type --value [--timestamp ] [--json]"); writer.WriteLine("mxgw-dotnet smoke --item [--value --type ] [--json]"); writer.WriteLine("mxgw-dotnet galaxy-test-connection [--json]"); writer.WriteLine("mxgw-dotnet galaxy-last-deploy [--json]"); writer.WriteLine("mxgw-dotnet galaxy-discover [--json]"); writer.WriteLine("mxgw-dotnet galaxy-watch [--last-seen-deploy-time ] [--max-events ] [--json]"); } }