using System.Globalization;
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;

namespace MxGateway.Client.Cli;

/// <summary>
/// Command-line front end for the MX gateway client. Parses a command name plus
/// <c>--option</c> style arguments, invokes the matching gateway operation, and
/// writes the reply either as text or (with <c>--json</c>) as JSON.
/// </summary>
public static class MxGatewayClientCli
{
    /// <summary>Client name sent by <c>open-session</c> and <c>register</c> when <c>--client-name</c> is absent.</summary>
    private const string DefaultCliClientName = "mxgw-dotnet-cli";

    /// <summary>Client name sent by <c>smoke</c> when <c>--client-name</c> is absent.</summary>
    private const string DefaultSmokeClientName = "mxgw-dotnet-smoke";

    /// <summary>
    /// Default for <c>--timeout</c>. Used both as the client's default call timeout
    /// and as the overall cancellation deadline for a command.
    /// </summary>
    private static readonly TimeSpan DefaultCommandTimeout = TimeSpan.FromSeconds(30);

    private static readonly JsonFormatter ProtobufJsonFormatter = JsonFormatter.Default;

    private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);

    /// <summary>
    /// Synchronous entry point: runs <see cref="RunAsync"/> and blocks until it completes.
    /// </summary>
    /// <param name="args">Command name followed by its options.</param>
    /// <param name="standardOutput">Writer that receives normal output.</param>
    /// <param name="standardError">Writer that receives error and unknown-command output.</param>
    /// <returns>The process exit code (see <see cref="RunAsync"/>).</returns>
    public static int Run(
        string[] args,
        TextWriter standardOutput,
        TextWriter standardError)
    {
        return RunAsync(args, standardOutput, standardError)
            .GetAwaiter()
            .GetResult();
    }

    /// <summary>
    /// Runs a single CLI command.
    /// </summary>
    /// <param name="args">Command name followed by its options.</param>
    /// <param name="standardOutput">Writer that receives normal output.</param>
    /// <param name="standardError">Writer that receives error and unknown-command output.</param>
    /// <param name="clientFactory">
    /// Optional factory used to create the gateway client from the resolved options;
    /// when <see langword="null"/>, <see cref="CreateDefaultClient"/> is used.
    /// </param>
    /// <returns>
    /// <c>0</c> on success or when help is shown, <c>1</c> when a command fails with an
    /// exception, <c>2</c> for an unknown command.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="args"/>, <paramref name="standardOutput"/> or
    /// <paramref name="standardError"/> is <see langword="null"/>. Thrown synchronously,
    /// before any task is returned.
    /// </exception>
    public static Task<int> RunAsync(
        string[] args,
        TextWriter standardOutput,
        TextWriter standardError,
        Func<MxGatewayClientOptions, IMxGatewayCliClient>? clientFactory = null)
    {
        ArgumentNullException.ThrowIfNull(args);
        ArgumentNullException.ThrowIfNull(standardOutput);
        ArgumentNullException.ThrowIfNull(standardError);

        return RunCoreAsync(
            args,
            standardOutput,
            standardError,
            clientFactory ?? CreateDefaultClient);
    }

    /// <summary>
    /// Dispatches the command named by <c>args[0]</c> and converts failures into exit codes.
    /// </summary>
    private static async Task<int> RunCoreAsync(
        string[] args,
        TextWriter standardOutput,
        TextWriter standardError,
        Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory)
    {
        if (args.Length is 0 || IsHelp(args[0]))
        {
            WriteUsage(standardOutput);
            return 0;
        }

        string command = args[0].ToLowerInvariant();
        CliArguments arguments = new(args.Skip(1));

        try
        {
            // "version" needs neither a client nor an API key, so handle it before
            // any options are resolved.
            if (command is "version")
            {
                WriteVersion(arguments, standardOutput);
                return 0;
            }

            // Reject unknown commands before CreateOptions runs, so a typo is not
            // reported as a missing API key.
            if (!IsKnownGatewayCommand(command))
            {
                return WriteUnknownCommand(command, standardError);
            }

            await using IMxGatewayCliClient client = clientFactory(CreateOptions(arguments));
            using CancellationTokenSource cancellation = CreateCancellation(arguments);
            CancellationToken token = cancellation.Token;

            return command switch
            {
                "open-session" => await OpenSessionAsync(arguments, client, standardOutput, token)
                    .ConfigureAwait(false),
                "close-session" => await CloseSessionAsync(arguments, client, standardOutput, token)
                    .ConfigureAwait(false),
                "ping" => await PingAsync(arguments, client, standardOutput, token)
                    .ConfigureAwait(false),
                "register" => await RegisterAsync(arguments, client, standardOutput, token)
                    .ConfigureAwait(false),
                "add-item" => await AddItemAsync(arguments, client, standardOutput, token)
                    .ConfigureAwait(false),
                "advise" => await AdviseAsync(arguments, client, standardOutput, token)
                    .ConfigureAwait(false),
                "stream-events" => await StreamEventsAsync(arguments, client, standardOutput, token)
                    .ConfigureAwait(false),
                "write" => await WriteAsync(arguments, client, standardOutput, token)
                    .ConfigureAwait(false),
                "write2" => await Write2Async(arguments, client, standardOutput, token)
                    .ConfigureAwait(false),
                "smoke" => await SmokeAsync(arguments, client, standardOutput, token)
                    .ConfigureAwait(false),

                // Not reachable after the IsKnownGatewayCommand check; kept so the
                // switch expression is exhaustive.
                _ => WriteUnknownCommand(command, standardError),
            };
        }
        catch (Exception exception) when (exception is not OperationCanceledException)
        {
            // OperationCanceledException is deliberately excluded by the filter and
            // propagates to the caller.
            // NOTE(review): only the --api-key value is passed to the redactor; a key
            // resolved from an environment variable is not — confirm this is intended.
            string? apiKey = arguments.GetOptional("api-key");
            string message = MxGatewayCliSecretRedactor.Redact(exception.Message, apiKey);

            if (arguments.HasFlag("json"))
            {
                standardError.WriteLine(JsonSerializer.Serialize(
                    new { error = message, type = exception.GetType().Name },
                    JsonOptions));
            }
            else
            {
                standardError.WriteLine(message);
            }

            return 1;
        }
    }

    /// <summary>Creates the real gateway client wrapped in the CLI adapter.</summary>
    private static IMxGatewayCliClient CreateDefaultClient(MxGatewayClientOptions options)
    {
        return new MxGatewayCliClientAdapter(MxGatewayClient.Create(options));
    }

    /// <summary>
    /// Builds client options from the command line. The endpoint comes from
    /// <c>--endpoint</c>, then the <c>MXGATEWAY_ENDPOINT</c> environment variable,
    /// then <c>http://localhost:5000</c>. TLS is enabled by <c>--tls</c> or an
    /// <c>https://</c> endpoint.
    /// </summary>
    private static MxGatewayClientOptions CreateOptions(CliArguments arguments)
    {
        string endpoint = arguments.GetOptional("endpoint")
            ?? Environment.GetEnvironmentVariable("MXGATEWAY_ENDPOINT")
            ?? "http://localhost:5000";
        string apiKey = ResolveApiKey(arguments);

        return new MxGatewayClientOptions
        {
            Endpoint = new Uri(endpoint, UriKind.Absolute),
            ApiKey = apiKey,
            UseTls = arguments.HasFlag("tls")
                || endpoint.StartsWith("https://", StringComparison.OrdinalIgnoreCase),
            DefaultCallTimeout = arguments.GetDuration("timeout", DefaultCommandTimeout),
            ConnectTimeout = arguments.GetDuration("connect-timeout", TimeSpan.FromSeconds(10)),
            CaCertificatePath = arguments.GetOptional("ca-file"),
            ServerNameOverride = arguments.GetOptional("server-name"),
        };
    }

    /// <summary>
    /// Resolves the API key from <c>--api-key</c>, falling back to the environment
    /// variable named by <c>--api-key-env</c> (default <c>MXGATEWAY_API_KEY</c>).
    /// </summary>
    /// <exception cref="ArgumentException">No non-blank key was found in either place.</exception>
    private static string ResolveApiKey(CliArguments arguments)
    {
        string? apiKey = arguments.GetOptional("api-key");
        if (!string.IsNullOrWhiteSpace(apiKey))
        {
            return apiKey;
        }

        string apiKeyEnvironmentName = arguments.GetOptional("api-key-env") ?? "MXGATEWAY_API_KEY";
        apiKey = Environment.GetEnvironmentVariable(apiKeyEnvironmentName);
        if (!string.IsNullOrWhiteSpace(apiKey))
        {
            return apiKey;
        }

        throw new ArgumentException(
            $"Gateway API key is required. Pass --api-key or set {apiKeyEnvironmentName}.");
    }

    /// <summary>
    /// Creates the cancellation source that bounds a whole command; it cancels after
    /// <c>--timeout</c> (default 30 seconds). The caller owns and disposes the result.
    /// </summary>
    private static CancellationTokenSource CreateCancellation(CliArguments arguments)
    {
        var cancellation = new CancellationTokenSource();
        TimeSpan timeout = arguments.GetDuration("timeout", DefaultCommandTimeout);
        cancellation.CancelAfter(timeout);
        return cancellation;
    }

    /// <summary>Handles <c>open-session</c>: opens a gateway session and writes the reply.</summary>
    private static Task<int> OpenSessionAsync(
        CliArguments arguments,
        IMxGatewayCliClient client,
        TextWriter output,
        CancellationToken cancellationToken)
    {
        return WriteReplyAsync(
            client.OpenSessionAsync(
                new OpenSessionRequest
                {
                    ClientSessionName = arguments.GetOptional("client-name") ?? DefaultCliClientName,
                    ClientCorrelationId = CreateCorrelationId(),
                    RequestedBackend = arguments.GetOptional("backend") ?? string.Empty,
                },
                cancellationToken),
            arguments,
            output);
    }

    /// <summary>Handles <c>close-session</c>: closes the session named by <c>--session-id</c>.</summary>
    private static Task<int> CloseSessionAsync(
        CliArguments arguments,
        IMxGatewayCliClient client,
        TextWriter output,
        CancellationToken cancellationToken)
    {
        return WriteReplyAsync(
            client.CloseSessionAsync(
                new CloseSessionRequest
                {
                    SessionId = arguments.GetRequired("session-id"),
                    ClientCorrelationId = CreateCorrelationId(),
                },
                cancellationToken),
            arguments,
            output);
    }

    /// <summary>Handles <c>ping</c>: sends a Ping command (message defaults to <c>ping</c>).</summary>
    private static Task<int> PingAsync(
        CliArguments arguments,
        IMxGatewayCliClient client,
        TextWriter output,
        CancellationToken cancellationToken)
    {
        return InvokeAndWriteAsync(
            arguments,
            client,
            output,
            new MxCommand
            {
                Kind = MxCommandKind.Ping,
                Ping = new PingCommand { Message = arguments.GetOptional("message") ?? "ping" },
            },
            cancellationToken);
    }

    /// <summary>Handles <c>register</c>: sends a Register command.</summary>
    private static Task<int> RegisterAsync(
        CliArguments arguments,
        IMxGatewayCliClient client,
        TextWriter output,
        CancellationToken cancellationToken)
    {
        return InvokeAndWriteAsync(
            arguments,
            client,
            output,
            new MxCommand
            {
                Kind = MxCommandKind.Register,
                Register = new RegisterCommand
                {
                    ClientName = arguments.GetOptional("client-name") ?? DefaultCliClientName,
                },
            },
            cancellationToken);
    }

    /// <summary>Handles <c>add-item</c>: sends an AddItem command for <c>--item</c>.</summary>
    private static Task<int> AddItemAsync(
        CliArguments arguments,
        IMxGatewayCliClient client,
        TextWriter output,
        CancellationToken cancellationToken)
    {
        return InvokeAndWriteAsync(
            arguments,
            client,
            output,
            new MxCommand
            {
                Kind = MxCommandKind.AddItem,
                AddItem = new AddItemCommand
                {
                    ServerHandle = arguments.GetInt32("server-handle"),
                    ItemDefinition = arguments.GetRequired("item"),
                },
            },
            cancellationToken);
    }

    /// <summary>Handles <c>advise</c>: sends an Advise command for the given handles.</summary>
    private static Task<int> AdviseAsync(
        CliArguments arguments,
        IMxGatewayCliClient client,
        TextWriter output,
        CancellationToken cancellationToken)
    {
        return InvokeAndWriteAsync(
            arguments,
            client,
            output,
            new MxCommand
            {
                Kind = MxCommandKind.Advise,
                Advise = new AdviseCommand
                {
                    ServerHandle = arguments.GetInt32("server-handle"),
                    ItemHandle = arguments.GetInt32("item-handle"),
                },
            },
            cancellationToken);
    }

    /// <summary>
    /// Handles <c>write</c>: sends a Write command whose value is parsed from
    /// <c>--type</c>/<c>--value</c>. <c>--user-id</c> defaults to 0.
    /// </summary>
    private static Task<int> WriteAsync(
        CliArguments arguments,
        IMxGatewayCliClient client,
        TextWriter output,
        CancellationToken cancellationToken)
    {
        return InvokeAndWriteAsync(
            arguments,
            client,
            output,
            new MxCommand
            {
                Kind = MxCommandKind.Write,
                Write = new WriteCommand
                {
                    ServerHandle = arguments.GetInt32("server-handle"),
                    ItemHandle = arguments.GetInt32("item-handle"),
                    UserId = arguments.GetInt32("user-id", 0),
                    Value = ParseValue(arguments),
                },
            },
            cancellationToken);
    }

    /// <summary>
    /// Handles <c>write2</c>: like <c>write</c>, plus a timestamp taken from
    /// <c>--timestamp</c> or the current UTC time.
    /// </summary>
    private static Task<int> Write2Async(
        CliArguments arguments,
        IMxGatewayCliClient client,
        TextWriter output,
        CancellationToken cancellationToken)
    {
        return InvokeAndWriteAsync(
            arguments,
            client,
            output,
            new MxCommand
            {
                Kind = MxCommandKind.Write2,
                Write2 = new Write2Command
                {
                    ServerHandle = arguments.GetInt32("server-handle"),
                    ItemHandle = arguments.GetInt32("item-handle"),
                    UserId = arguments.GetInt32("user-id", 0),
                    Value = ParseValue(arguments),
                    TimestampValue = ParseTimestampValue(arguments),
                },
            },
            cancellationToken);
    }

    /// <summary>
    /// Handles <c>stream-events</c>. Without <c>--json</c> each event is written as it
    /// arrives; with <c>--json</c> events are buffered and written as one JSON object
    /// after the stream ends. <c>--max-events 0</c> (the default) means no limit.
    /// </summary>
    private static async Task<int> StreamEventsAsync(
        CliArguments arguments,
        IMxGatewayCliClient client,
        TextWriter output,
        CancellationToken cancellationToken)
    {
        var events = new List<MxEvent>();
        uint maxEvents = arguments.GetUInt32("max-events", 0);
        uint eventCount = 0;
        var request = new StreamEventsRequest
        {
            SessionId = arguments.GetRequired("session-id"),
            AfterWorkerSequence = arguments.GetUInt64("after-worker-sequence", 0),
        };

        // The output mode cannot change mid-stream; read the flag once.
        bool asJson = arguments.HasFlag("json");

        await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(request, cancellationToken)
            .WithCancellation(cancellationToken)
            .ConfigureAwait(false))
        {
            if (asJson)
            {
                events.Add(gatewayEvent);
            }
            else
            {
                output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent));
            }

            eventCount++;
            if (maxEvents > 0 && eventCount >= maxEvents)
            {
                break;
            }
        }

        if (asJson)
        {
            output.WriteLine(JsonSerializer.Serialize(
                new { events = events.Select(EventToJsonElement).ToArray() },
                JsonOptions));
        }

        return 0;
    }

    /// <summary>
    /// Handles <c>smoke</c>: opens a session, registers, adds <c>--item</c>, advises it,
    /// optionally writes <c>--value</c>, collects up to <c>--max-events</c> events
    /// (default 1) within <c>--event-timeout</c> (default 2 seconds), closes the
    /// session, and writes a summary.
    /// </summary>
    private static async Task<int> SmokeAsync(
        CliArguments arguments,
        IMxGatewayCliClient client,
        TextWriter output,
        CancellationToken cancellationToken)
    {
        OpenSessionReply? openReply = null;
        CloseSessionReply? closeReply = null;
        var commandReplies = new List<MxCommandReply>();
        var events = new List<MxEvent>();

        try
        {
            openReply = await client.OpenSessionAsync(
                    new OpenSessionRequest
                    {
                        ClientSessionName = arguments.GetOptional("client-name") ?? DefaultSmokeClientName,
                        ClientCorrelationId = CreateCorrelationId(),
                    },
                    cancellationToken)
                .ConfigureAwait(false);

            int serverHandle = await InvokeForHandleAsync(
                    arguments,
                    client,
                    openReply.SessionId,
                    new MxCommand
                    {
                        Kind = MxCommandKind.Register,
                        Register = new RegisterCommand
                        {
                            ClientName = arguments.GetOptional("client-name") ?? DefaultSmokeClientName,
                        },
                    },
                    reply => reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value,
                    commandReplies,
                    cancellationToken)
                .ConfigureAwait(false);

            int itemHandle = await InvokeForHandleAsync(
                    arguments,
                    client,
                    openReply.SessionId,
                    new MxCommand
                    {
                        Kind = MxCommandKind.AddItem,
                        AddItem = new AddItemCommand
                        {
                            ServerHandle = serverHandle,
                            ItemDefinition = arguments.GetRequired("item"),
                        },
                    },
                    reply => reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value,
                    commandReplies,
                    cancellationToken)
                .ConfigureAwait(false);

            commandReplies.Add(await InvokeAndEnsureAsync(
                    client,
                    CreateCommandRequest(
                        openReply.SessionId,
                        new MxCommand
                        {
                            Kind = MxCommandKind.Advise,
                            Advise = new AdviseCommand
                            {
                                ServerHandle = serverHandle,
                                ItemHandle = itemHandle,
                            },
                        }),
                    cancellationToken)
                .ConfigureAwait(false));

            // The write step is optional and only runs when --value was supplied.
            if (arguments.GetOptional("value") is not null)
            {
                commandReplies.Add(await InvokeAndEnsureAsync(
                        client,
                        CreateCommandRequest(
                            openReply.SessionId,
                            new MxCommand
                            {
                                Kind = MxCommandKind.Write,
                                Write = new WriteCommand
                                {
                                    ServerHandle = serverHandle,
                                    ItemHandle = itemHandle,
                                    UserId = arguments.GetInt32("user-id", 0),
                                    Value = ParseValue(arguments),
                                },
                            }),
                        cancellationToken)
                    .ConfigureAwait(false));
            }

            // Parse both streaming options once, before the stream starts, so an
            // invalid value fails up front instead of after the first event.
            TimeSpan eventTimeout = arguments.GetDuration("event-timeout", TimeSpan.FromSeconds(2));
            uint maxEvents = arguments.GetUInt32("max-events", 1);

            using CancellationTokenSource streamCancellation = CancellationTokenSource
                .CreateLinkedTokenSource(cancellationToken);
            streamCancellation.CancelAfter(eventTimeout);

            try
            {
                await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(
                        new StreamEventsRequest { SessionId = openReply.SessionId },
                        streamCancellation.Token)
                    .WithCancellation(streamCancellation.Token)
                    .ConfigureAwait(false))
                {
                    events.Add(gatewayEvent);
                    if (events.Count >= maxEvents)
                    {
                        break;
                    }
                }
            }
            catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
            {
                // Only the event timeout fired (the outer token is still live): the
                // smoke run continues with whatever events were collected.
            }
        }
        finally
        {
            if (openReply is not null)
            {
                // CancellationToken.None: the close must still be attempted when the
                // command token has already been cancelled.
                closeReply = await client.CloseSessionAsync(
                        new CloseSessionRequest
                        {
                            SessionId = openReply.SessionId,
                            ClientCorrelationId = CreateCorrelationId(),
                        },
                        CancellationToken.None)
                    .ConfigureAwait(false);
            }
        }

        WriteSmokeResult(arguments, output, openReply, closeReply, commandReplies, events);
        return 0;
    }

    /// <summary>
    /// Sends <paramref name="command"/> to the session named by <c>--session-id</c>,
    /// verifies the reply, and writes it to <paramref name="output"/>.
    /// </summary>
    private static async Task<int> InvokeAndWriteAsync(
        CliArguments arguments,
        IMxGatewayCliClient client,
        TextWriter output,
        MxCommand command,
        CancellationToken cancellationToken)
    {
        MxCommandReply reply = await InvokeAndEnsureAsync(
                client,
                CreateCommandRequest(arguments.GetRequired("session-id"), command),
                cancellationToken)
            .ConfigureAwait(false);

        WriteMessage(arguments, output, reply);
        return 0;
    }

    /// <summary>
    /// Sends <paramref name="command"/>, records the verified reply in
    /// <paramref name="replies"/>, and returns the handle extracted by
    /// <paramref name="handleSelector"/>.
    /// </summary>
    private static async Task<int> InvokeForHandleAsync(
        CliArguments arguments,
        IMxGatewayCliClient client,
        string sessionId,
        MxCommand command,
        Func<MxCommandReply, int> handleSelector,
        List<MxCommandReply> replies,
        CancellationToken cancellationToken)
    {
        MxCommandReply reply = await InvokeAndEnsureAsync(
                client,
                CreateCommandRequest(sessionId, command),
                cancellationToken)
            .ConfigureAwait(false);

        replies.Add(reply);
        return handleSelector(reply);
    }

    /// <summary>
    /// Invokes <paramref name="request"/> and runs the protocol and MX-access success
    /// checks on the reply before returning it.
    /// </summary>
    private static async Task<MxCommandReply> InvokeAndEnsureAsync(
        IMxGatewayCliClient client,
        MxCommandRequest request,
        CancellationToken cancellationToken)
    {
        MxCommandReply reply = await client.InvokeAsync(request, cancellationToken)
            .ConfigureAwait(false);

        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
        return reply;
    }

    /// <summary>Wraps <paramref name="command"/> in a request with a fresh correlation id.</summary>
    private static MxCommandRequest CreateCommandRequest(
        string sessionId,
        MxCommand command)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = CreateCorrelationId(),
            Command = command,
        };
    }

    /// <summary>Awaits <paramref name="replyTask"/> and writes the resulting message.</summary>
    private static async Task<int> WriteReplyAsync<TReply>(
        Task<TReply> replyTask,
        CliArguments arguments,
        TextWriter output)
        where TReply : IMessage
    {
        TReply reply = await replyTask.ConfigureAwait(false);
        WriteMessage(arguments, output, reply);
        return 0;
    }

    /// <summary>Writes the gateway and worker protocol versions as text or JSON.</summary>
    private static void WriteVersion(CliArguments arguments, TextWriter output)
    {
        if (arguments.HasFlag("json"))
        {
            output.WriteLine(JsonSerializer.Serialize(
                new
                {
                    gatewayProtocolVersion = MxGatewayClientContractInfo.GatewayProtocolVersion,
                    workerProtocolVersion = MxGatewayClientContractInfo.WorkerProtocolVersion,
                },
                JsonOptions));
            return;
        }

        output.WriteLine(
            $"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}");
        output.WriteLine(
            $"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}");
    }

    /// <summary>
    /// Writes a protobuf message: protobuf JSON when <c>--json</c> is set, otherwise
    /// the message's <c>ToString()</c> form.
    /// </summary>
    private static void WriteMessage(
        CliArguments arguments,
        TextWriter output,
        IMessage message)
    {
        output.WriteLine(arguments.HasFlag("json")
            ? ProtobufJsonFormatter.Format(message)
            : message.ToString());
    }

    /// <summary>Writes the summary of a <c>smoke</c> run as key=value lines or one JSON object.</summary>
    private static void WriteSmokeResult(
        CliArguments arguments,
        TextWriter output,
        OpenSessionReply? openReply,
        CloseSessionReply? closeReply,
        IReadOnlyList<MxCommandReply> commandReplies,
        IReadOnlyList<MxEvent> events)
    {
        if (!arguments.HasFlag("json"))
        {
            output.WriteLine($"session-id={openReply?.SessionId}");
            output.WriteLine($"commands={commandReplies.Count}");
            output.WriteLine($"events={events.Count}");
            output.WriteLine($"closed={closeReply is not null}");
            return;
        }

        output.WriteLine(JsonSerializer.Serialize(
            new
            {
                sessionId = openReply?.SessionId,
                closed = closeReply is not null,
                commandReplies = commandReplies.Select(CommandReplyToJsonElement).ToArray(),
                events = events.Select(EventToJsonElement).ToArray(),
            },
            JsonOptions));
    }

    /// <summary>Converts a command reply to a <see cref="JsonElement"/> via protobuf JSON.</summary>
    private static JsonElement CommandReplyToJsonElement(MxCommandReply reply)
    {
        return MessageToJsonElement(reply);
    }

    /// <summary>Converts an event to a <see cref="JsonElement"/> via protobuf JSON.</summary>
    private static JsonElement EventToJsonElement(MxEvent gatewayEvent)
    {
        return MessageToJsonElement(gatewayEvent);
    }

    /// <summary>
    /// Formats <paramref name="message"/> as protobuf JSON and re-parses it so it can
    /// be embedded in a System.Text.Json payload.
    /// </summary>
    private static JsonElement MessageToJsonElement(IMessage message)
    {
        // JsonDocument rents pooled buffers and must be disposed; Clone() detaches
        // the element so it stays valid after the document is gone.
        using JsonDocument document = JsonDocument.Parse(ProtobufJsonFormatter.Format(message));
        return document.RootElement.Clone();
    }

    /// <summary>
    /// Parses <c>--value</c> according to <c>--type</c>. Array types split the value on
    /// commas and trim each entry; numbers and timestamps use the invariant culture,
    /// and timestamps without an offset are assumed to be UTC.
    /// </summary>
    /// <exception cref="ArgumentException"><c>--type</c> is not a supported type name.</exception>
    private static MxValue ParseValue(CliArguments arguments)
    {
        string type = arguments.GetRequired("type").ToLowerInvariant();
        string value = arguments.GetRequired("value");
        string[] values = value.Split(',', StringSplitOptions.TrimEntries);

        return type switch
        {
            "bool" or "boolean" => bool.Parse(value).ToMxValue(),
            "bool-array" or "boolean-array" => values.Select(bool.Parse).ToArray().ToMxValue(),
            "int32" or "integer" => int.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
            "int32-array" or "integer-array" => values
                .Select(item => int.Parse(item, CultureInfo.InvariantCulture))
                .ToArray()
                .ToMxValue(),
            "int64" => long.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
            "int64-array" => values
                .Select(item => long.Parse(item, CultureInfo.InvariantCulture))
                .ToArray()
                .ToMxValue(),
            "float" => float.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
            "float-array" => values
                .Select(item => float.Parse(item, CultureInfo.InvariantCulture))
                .ToArray()
                .ToMxValue(),
            "double" => double.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
            "double-array" => values
                .Select(item => double.Parse(item, CultureInfo.InvariantCulture))
                .ToArray()
                .ToMxValue(),
            "string" => value.ToMxValue(),
            "string-array" => values.ToMxValue(),
            "time" or "timestamp" => DateTimeOffset
                .Parse(value, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal)
                .ToMxValue(),
            "time-array" or "timestamp-array" => values
                .Select(item => DateTimeOffset.Parse(
                    item,
                    CultureInfo.InvariantCulture,
                    DateTimeStyles.AssumeUniversal))
                .ToArray()
                .ToMxValue(),
            _ => throw new ArgumentException($"Unsupported MX value type '{type}'."),
        };
    }

    /// <summary>
    /// Parses <c>--timestamp</c>, defaulting to the current UTC time formatted with the
    /// round-trip ("O") format.
    /// </summary>
    private static MxValue ParseTimestampValue(CliArguments arguments)
    {
        string timestamp = arguments.GetOptional("timestamp")
            ?? DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture);

        return DateTimeOffset.Parse(
                timestamp,
                CultureInfo.InvariantCulture,
                DateTimeStyles.AssumeUniversal)
            .ToMxValue();
    }

    /// <summary>Reports an unknown command plus the usage text and returns exit code 2.</summary>
    private static int WriteUnknownCommand(string command, TextWriter standardError)
    {
        standardError.WriteLine($"Unknown command '{command}'.");
        WriteUsage(standardError);
        return 2;
    }

    /// <summary>Returns whether <paramref name="value"/> is <c>-h</c>, <c>--help</c> or <c>help</c> (case-insensitive).</summary>
    private static bool IsHelp(string value)
    {
        return string.Equals(value, "-h", StringComparison.OrdinalIgnoreCase)
            || string.Equals(value, "--help", StringComparison.OrdinalIgnoreCase)
            || string.Equals(value, "help", StringComparison.OrdinalIgnoreCase);
    }

    /// <summary>
    /// Returns whether <paramref name="command"/> (already lower-cased by the caller)
    /// is one of the commands that need a gateway client.
    /// </summary>
    private static bool IsKnownGatewayCommand(string command)
    {
        return command is "open-session"
            or "close-session"
            or "ping"
            or "register"
            or "add-item"
            or "advise"
            or "stream-events"
            or "write"
            or "write2"
            or "smoke";
    }

    /// <summary>Creates a correlation id: a new GUID as 32 hex digits without dashes.</summary>
    private static string CreateCorrelationId()
    {
        return Guid.NewGuid().ToString("N");
    }

    /// <summary>Writes one usage line per command to <paramref name="writer"/>.</summary>
    private static void WriteUsage(TextWriter writer)
    {
        // Every value-taking option carries a <placeholder> so it cannot be mistaken
        // for a bare flag such as --json.
        writer.WriteLine("mxgw-dotnet version [--json]");
        writer.WriteLine("mxgw-dotnet ping --session-id <id> [--json]");
        writer.WriteLine("mxgw-dotnet open-session [--client-name <name>] [--json]");
        writer.WriteLine("mxgw-dotnet close-session --session-id <id> [--json]");
        writer.WriteLine("mxgw-dotnet register --session-id <id> --client-name <name> [--json]");
        writer.WriteLine("mxgw-dotnet add-item --session-id <id> --server-handle <handle> --item <definition> [--json]");
        writer.WriteLine("mxgw-dotnet advise --session-id <id> --server-handle <handle> --item-handle <handle> [--json]");
        writer.WriteLine("mxgw-dotnet stream-events --session-id <id> [--max-events <count>] [--json]");
        writer.WriteLine("mxgw-dotnet write --session-id <id> --server-handle <handle> --item-handle <handle> --type <type> --value <value> [--json]");
        writer.WriteLine("mxgw-dotnet write2 --session-id <id> --server-handle <handle> --item-handle <handle> --type <type> --value <value> [--timestamp <timestamp>] [--json]");
        writer.WriteLine("mxgw-dotnet smoke --item <definition> [--value <value> --type <type>] [--json]");
    }
}