Files
mxaccessgw/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs
T
Joseph Doherty eaa7093cd6 .NET CLI: register the five new bulk subcommands in IsKnownGatewayCommand
The previous commit added read-bulk / write-bulk / write2-bulk /
write-secured-bulk / write-secured2-bulk dispatch cases to RunCoreAsync
but left them out of IsKnownGatewayCommand, so the .NET CLI rejected
them at the pre-dispatch gate and printed the usage banner instead of
running the new code paths. Surfaced when the live e2e exercised the
read-bulk phase against the deployed gateway — the call routed through
the unknown-command path before reaching the protobuf builder.

Also extends WriteUsage with one line per new subcommand so the banner
documents the new surface.

Live e2e against the deployed gateway now passes for all five clients
(dotnet, go, rust, python, java) with 4/4 tags returning was_cached=true
after the subscribe-bulk + read-bulk path, confirming the worker
MxAccessValueCache populates from real MXAccess OnDataChange events and
round-trips through every client's JSON parser.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 04:48:55 -04:00

1309 lines
48 KiB
C#

using System.Globalization;
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
using MxGateway.Contracts.Proto.Galaxy;
namespace MxGateway.Client.Cli;
/// <summary>Command-line interface for the MXAccess Gateway client, supporting session and command operations.</summary>
public static class MxGatewayClientCli
{
/// <summary>Largest value accepted for <c>--max-events</c> by the stream-events and galaxy-watch commands.</summary>
private const uint MaxAggregateEvents = 10_000;
/// <summary>Formatter used to render protobuf messages as JSON text.</summary>
private static readonly JsonFormatter ProtobufJsonFormatter = JsonFormatter.Default;
/// <summary>Serializer options (web defaults) used for the JSON envelopes the CLI builds itself.</summary>
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
/// <summary>Blocking entry point: executes the CLI and waits for its exit code.</summary>
/// <param name="args">Command-line arguments (command name followed by options).</param>
/// <param name="standardOutput">Writer that receives command output.</param>
/// <param name="standardError">Writer that receives error messages.</param>
/// <returns>The exit code produced by the asynchronous run.</returns>
public static int Run(
    string[] args,
    TextWriter standardOutput,
    TextWriter standardError)
{
    Task<int> pending = RunAsync(args, standardOutput, standardError);
    return pending.GetAwaiter().GetResult();
}
/// <summary>Executes the CLI asynchronously after validating that no required argument is null.</summary>
/// <param name="args">Command-line arguments (command name followed by options).</param>
/// <param name="standardOutput">Writer that receives command output.</param>
/// <param name="standardError">Writer that receives error messages.</param>
/// <param name="clientFactory">Optional factory for the gateway client; when omitted the default client factory is used.</param>
/// <returns>A task producing the process exit code.</returns>
/// <exception cref="ArgumentNullException">A required argument is <see langword="null"/>.</exception>
public static Task<int> RunAsync(
    string[] args,
    TextWriter standardOutput,
    TextWriter standardError,
    Func<MxGatewayClientOptions, IMxGatewayCliClient>? clientFactory = null)
{
    ArgumentNullException.ThrowIfNull(args);
    ArgumentNullException.ThrowIfNull(standardOutput);
    ArgumentNullException.ThrowIfNull(standardError);

    Func<MxGatewayClientOptions, IMxGatewayCliClient> effectiveFactory =
        clientFactory ?? CreateDefaultClient;
    return RunCoreAsync(args, standardOutput, standardError, effectiveFactory);
}
/// <summary>
/// Core command loop: handles help and <c>version</c> locally, rejects unknown
/// commands, and otherwise creates a client and dispatches to the matching handler.
/// </summary>
/// <param name="args">Raw command-line arguments; the first entry is the command name.</param>
/// <param name="standardOutput">Writer that receives command output.</param>
/// <param name="standardError">Writer that receives error messages.</param>
/// <param name="clientFactory">Factory used to build the gateway client from the parsed options.</param>
/// <returns>0 on success, 1 when a handler fails, or the unknown-command exit code.</returns>
private static async Task<int> RunCoreAsync(
    string[] args,
    TextWriter standardOutput,
    TextWriter standardError,
    Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory)
{
    bool helpRequested = args.Length == 0 || IsHelp(args[0]);
    if (helpRequested)
    {
        WriteUsage(standardOutput);
        return 0;
    }

    string command = args[0].ToLowerInvariant();
    CliArguments arguments = new(args.Skip(1));
    try
    {
        // "version" needs neither a client nor an API key, so it is served before the gate.
        if (command is "version")
        {
            WriteVersion(arguments, standardOutput);
            return 0;
        }

        if (!IsKnownGatewayCommand(command))
        {
            return WriteUnknownCommand(command, standardError);
        }

        await using IMxGatewayCliClient client = clientFactory(CreateOptions(arguments));
        using CancellationTokenSource cancellation = CreateCancellation(arguments, command);
        CancellationToken token = cancellation.Token;

        // Pick the handler first, then await it once; handlers that throw while
        // building their request still do so inside this try block.
        Task<int> pending = command switch
        {
            "open-session" => OpenSessionAsync(arguments, client, standardOutput, token),
            "close-session" => CloseSessionAsync(arguments, client, standardOutput, token),
            "ping" => PingAsync(arguments, client, standardOutput, token),
            "register" => RegisterAsync(arguments, client, standardOutput, token),
            "add-item" => AddItemAsync(arguments, client, standardOutput, token),
            "advise" => AdviseAsync(arguments, client, standardOutput, token),
            "subscribe-bulk" => SubscribeBulkAsync(arguments, client, standardOutput, token),
            "unsubscribe-bulk" => UnsubscribeBulkAsync(arguments, client, standardOutput, token),
            "read-bulk" => ReadBulkAsync(arguments, client, standardOutput, token),
            "write-bulk" => WriteBulkAsync(arguments, client, standardOutput, token),
            "write2-bulk" => Write2BulkAsync(arguments, client, standardOutput, token),
            "write-secured-bulk" => WriteSecuredBulkAsync(arguments, client, standardOutput, token),
            "write-secured2-bulk" => WriteSecured2BulkAsync(arguments, client, standardOutput, token),
            "stream-events" => StreamEventsAsync(arguments, client, standardOutput, token),
            "write" => WriteAsync(arguments, client, standardOutput, token),
            "write2" => Write2Async(arguments, client, standardOutput, token),
            "smoke" => SmokeAsync(arguments, client, standardOutput, token),
            "galaxy-test-connection" => GalaxyTestConnectionAsync(arguments, client, standardOutput, token),
            "galaxy-last-deploy" => GalaxyLastDeployAsync(arguments, client, standardOutput, token),
            "galaxy-discover" => GalaxyDiscoverAsync(arguments, client, standardOutput, token),
            "galaxy-watch" => GalaxyWatchAsync(arguments, client, standardOutput, token),
            _ => Task.FromResult(WriteUnknownCommand(command, standardError)),
        };
        return await pending.ConfigureAwait(false);
    }
    catch (Exception failure) when (failure is not OperationCanceledException)
    {
        // Redact the effective API key — whether it came from --api-key or from
        // the (documented default) --api-key-env environment variable — so a
        // transport error message that echoes the bearer token is never printed.
        string? apiKey = TryResolveApiKey(arguments);
        string safeMessage = MxGatewayCliSecretRedactor.Redact(failure.Message, apiKey);
        string errorLine = arguments.HasFlag("json")
            ? JsonSerializer.Serialize(
                new { error = safeMessage, type = failure.GetType().Name },
                JsonOptions)
            : safeMessage;
        standardError.WriteLine(errorLine);
        return 1;
    }
}
/// <summary>Default client factory: wraps a newly created gateway client in the CLI adapter.</summary>
/// <param name="options">Connection options for the gateway client.</param>
/// <returns>The adapter exposing the client through the CLI client interface.</returns>
private static IMxGatewayCliClient CreateDefaultClient(MxGatewayClientOptions options)
    => new MxGatewayCliClientAdapter(MxGatewayClient.Create(options));
/// <summary>Builds the gateway client options from CLI options and environment defaults.</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <returns>Options carrying endpoint, API key, TLS settings and timeouts.</returns>
/// <remarks>
/// The endpoint comes from <c>--endpoint</c>, then <c>MXGATEWAY_ENDPOINT</c>, then
/// <c>http://localhost:5000</c>. TLS is enabled by <c>--tls</c> or an <c>https://</c> endpoint.
/// </remarks>
private static MxGatewayClientOptions CreateOptions(CliArguments arguments)
{
    string endpoint = arguments.GetOptional("endpoint")
        ?? Environment.GetEnvironmentVariable("MXGATEWAY_ENDPOINT")
        ?? "http://localhost:5000";
    string apiKey = ResolveApiKey(arguments);

    Uri endpointUri = new(endpoint, UriKind.Absolute);
    bool useTls = arguments.HasFlag("tls")
        || endpoint.StartsWith("https://", StringComparison.OrdinalIgnoreCase);
    TimeSpan callTimeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30));
    TimeSpan connectTimeout = arguments.GetDuration("connect-timeout", TimeSpan.FromSeconds(10));

    return new MxGatewayClientOptions
    {
        Endpoint = endpointUri,
        ApiKey = apiKey,
        UseTls = useTls,
        DefaultCallTimeout = callTimeout,
        ConnectTimeout = connectTimeout,
        CaCertificatePath = arguments.GetOptional("ca-file"),
        ServerNameOverride = arguments.GetOptional("server-name"),
    };
}
/// <summary>Returns the effective API key, failing when none is configured.</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <returns>The non-blank API key.</returns>
/// <exception cref="ArgumentException">No key was supplied via <c>--api-key</c> or the environment variable.</exception>
private static string ResolveApiKey(CliArguments arguments)
{
    string? resolved = TryResolveApiKey(arguments);
    if (string.IsNullOrWhiteSpace(resolved))
    {
        // Name the variable that was actually consulted so the hint is actionable.
        string variableName = arguments.GetOptional("api-key-env")
            ?? "MXGATEWAY_API_KEY";
        throw new ArgumentException(
            $"Gateway API key is required. Pass --api-key or set {variableName}.");
    }

    return resolved;
}
/// <summary>
/// Looks up the API key without throwing: <c>--api-key</c> wins when it is
/// non-blank; otherwise the environment variable named by <c>--api-key-env</c>
/// (default <c>MXGATEWAY_API_KEY</c>) is read.
/// </summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <returns>The configured key, or <see langword="null"/> when the environment variable is not set.</returns>
private static string? TryResolveApiKey(CliArguments arguments)
{
    string? explicitKey = arguments.GetOptional("api-key");
    if (string.IsNullOrWhiteSpace(explicitKey))
    {
        string variableName = arguments.GetOptional("api-key-env")
            ?? "MXGATEWAY_API_KEY";
        return Environment.GetEnvironmentVariable(variableName);
    }

    return explicitKey;
}
/// <summary>Creates the cancellation source that bounds a single command run.</summary>
/// <param name="arguments">Parsed CLI options; <c>--timeout</c> is read from here.</param>
/// <param name="command">Lower-cased command name.</param>
/// <returns>
/// A source that cancels after <c>--timeout</c> (default 30 seconds), except for
/// <c>galaxy-watch</c> without an explicit <c>--timeout</c>, which gets no deadline.
/// </returns>
private static CancellationTokenSource CreateCancellation(CliArguments arguments, string command)
{
    string? rawTimeout = arguments.GetOptional("timeout");

    // Long-running streaming commands run until Ctrl+C / cancellation by default;
    // a caller-supplied --timeout still applies if present.
    bool runsUntilCancelled = command is "galaxy-watch" && string.IsNullOrWhiteSpace(rawTimeout);

    var source = new CancellationTokenSource();
    if (!runsUntilCancelled)
    {
        source.CancelAfter(arguments.GetDuration("timeout", TimeSpan.FromSeconds(30)));
    }

    return source;
}
/// <summary>Handles <c>open-session</c>: opens a gateway session and prints the reply.</summary>
/// <param name="arguments">Parsed CLI options (<c>--client-name</c>, <c>--backend</c>).</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> OpenSessionAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    var request = new OpenSessionRequest
    {
        ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli",
        ClientCorrelationId = CreateCorrelationId(),
        RequestedBackend = arguments.GetOptional("backend") ?? string.Empty,
    };

    return WriteReplyAsync(client.OpenSessionAsync(request, cancellationToken), arguments, output);
}
/// <summary>Handles <c>close-session</c>: closes the session named by <c>--session-id</c> and prints the reply.</summary>
/// <param name="arguments">Parsed CLI options; <c>--session-id</c> is required.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> CloseSessionAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    var request = new CloseSessionRequest
    {
        SessionId = arguments.GetRequired("session-id"),
        ClientCorrelationId = CreateCorrelationId(),
    };

    return WriteReplyAsync(client.CloseSessionAsync(request, cancellationToken), arguments, output);
}
/// <summary>Handles <c>ping</c>: sends a ping command carrying <c>--message</c> (default <c>ping</c>).</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> PingAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    var ping = new PingCommand
    {
        Message = arguments.GetOptional("message") ?? "ping",
    };
    var envelope = new MxCommand
    {
        Kind = MxCommandKind.Ping,
        Ping = ping,
    };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>Handles <c>register</c>: sends a register command using <c>--client-name</c> (default <c>mxgw-dotnet-cli</c>).</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> RegisterAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    var register = new RegisterCommand
    {
        ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli",
    };
    var envelope = new MxCommand
    {
        Kind = MxCommandKind.Register,
        Register = register,
    };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>Handles <c>add-item</c>: adds <c>--item</c> under <c>--server-handle</c> and prints the reply.</summary>
/// <param name="arguments">Parsed CLI options; <c>--server-handle</c> and <c>--item</c> are required.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> AddItemAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    int serverHandle = arguments.GetInt32("server-handle");
    string itemDefinition = arguments.GetRequired("item");

    var envelope = new MxCommand
    {
        Kind = MxCommandKind.AddItem,
        AddItem = new AddItemCommand
        {
            ServerHandle = serverHandle,
            ItemDefinition = itemDefinition,
        },
    };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>Handles <c>advise</c>: sends an advise command for <c>--item-handle</c> on <c>--server-handle</c>.</summary>
/// <param name="arguments">Parsed CLI options; both handles are required.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> AdviseAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    int serverHandle = arguments.GetInt32("server-handle");
    int itemHandle = arguments.GetInt32("item-handle");

    var envelope = new MxCommand
    {
        Kind = MxCommandKind.Advise,
        Advise = new AdviseCommand
        {
            ServerHandle = serverHandle,
            ItemHandle = itemHandle,
        },
    };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>Handles <c>subscribe-bulk</c>: subscribes to the tag addresses listed in <c>--items</c>.</summary>
/// <param name="arguments">Parsed CLI options; <c>--server-handle</c> and <c>--items</c> are required.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> SubscribeBulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    int serverHandle = arguments.GetInt32("server-handle");
    var tagAddresses = ParseStringList(arguments.GetRequired("items"));

    var subscribe = new SubscribeBulkCommand { ServerHandle = serverHandle };
    subscribe.TagAddresses.Add(tagAddresses);

    var envelope = new MxCommand
    {
        Kind = MxCommandKind.SubscribeBulk,
        SubscribeBulk = subscribe,
    };
    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>Handles <c>unsubscribe-bulk</c>: unsubscribes the item handles listed in <c>--item-handles</c>.</summary>
/// <param name="arguments">Parsed CLI options; <c>--server-handle</c> and <c>--item-handles</c> are required.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> UnsubscribeBulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    int serverHandle = arguments.GetInt32("server-handle");
    var itemHandles = ParseInt32List(arguments.GetRequired("item-handles"));

    var unsubscribe = new UnsubscribeBulkCommand { ServerHandle = serverHandle };
    unsubscribe.ItemHandles.Add(itemHandles);

    var envelope = new MxCommand
    {
        Kind = MxCommandKind.UnsubscribeBulk,
        UnsubscribeBulk = unsubscribe,
    };
    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>Handles <c>read-bulk</c>: reads the tag addresses listed in <c>--items</c>.</summary>
/// <param name="arguments">
/// Parsed CLI options; <c>--server-handle</c> and <c>--items</c> are required,
/// <c>--timeout-ms</c> is optional and defaults to 0.
/// </param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> ReadBulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    ReadBulkCommand command = new()
    {
        ServerHandle = arguments.GetInt32("server-handle"),
        // Read the timeout as an unsigned value, the same way --max-events is read.
        // Casting a signed parse result would turn e.g. -1 into a ~4.29 billion ms timeout.
        TimeoutMs = arguments.GetUInt32("timeout-ms", 0),
    };
    command.TagAddresses.Add(ParseStringList(arguments.GetRequired("items")));
    return InvokeAndWriteAsync(
        arguments,
        client,
        output,
        new MxCommand
        {
            Kind = MxCommandKind.ReadBulk,
            ReadBulk = command,
        },
        cancellationToken);
}
/// <summary>Handles <c>write-bulk</c>: pairs each handle in <c>--item-handles</c> with the value at the same position in <c>--values</c>.</summary>
/// <param name="arguments">Parsed CLI options; <c>--user-id</c> defaults to 0 and is applied to every entry.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
/// <exception cref="ArgumentException">The handle and value counts differ.</exception>
private static Task<int> WriteBulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    int serverHandle = arguments.GetInt32("server-handle");
    IReadOnlyList<int> itemHandles = ParseInt32List(arguments.GetRequired("item-handles"));
    IReadOnlyList<MxValue> parsedValues = ParseValuesList(arguments);
    int userId = arguments.GetInt32("user-id", 0);
    EnsureSameLength(itemHandles.Count, parsedValues.Count);

    var bulk = new WriteBulkCommand { ServerHandle = serverHandle };
    foreach ((int itemHandle, MxValue value) in itemHandles.Zip(parsedValues))
    {
        bulk.Entries.Add(new WriteBulkEntry
        {
            ItemHandle = itemHandle,
            Value = value,
            UserId = userId,
        });
    }

    var envelope = new MxCommand
    {
        Kind = MxCommandKind.WriteBulk,
        WriteBulk = bulk,
    };
    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>Handles <c>write2-bulk</c>: like the bulk write, with one shared timestamp value attached to every entry.</summary>
/// <param name="arguments">Parsed CLI options; <c>--user-id</c> defaults to 0.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
/// <exception cref="ArgumentException">The handle and value counts differ.</exception>
private static Task<int> Write2BulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    int serverHandle = arguments.GetInt32("server-handle");
    IReadOnlyList<int> itemHandles = ParseInt32List(arguments.GetRequired("item-handles"));
    IReadOnlyList<MxValue> parsedValues = ParseValuesList(arguments);
    MxValue sharedTimestamp = ParseTimestampValue(arguments);
    int userId = arguments.GetInt32("user-id", 0);
    EnsureSameLength(itemHandles.Count, parsedValues.Count);

    var bulk = new Write2BulkCommand { ServerHandle = serverHandle };
    foreach ((int itemHandle, MxValue value) in itemHandles.Zip(parsedValues))
    {
        bulk.Entries.Add(new Write2BulkEntry
        {
            ItemHandle = itemHandle,
            Value = value,
            TimestampValue = sharedTimestamp,
            UserId = userId,
        });
    }

    var envelope = new MxCommand
    {
        Kind = MxCommandKind.Write2Bulk,
        Write2Bulk = bulk,
    };
    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>Handles <c>write-secured-bulk</c>: bulk write where every entry carries the current and verifier user ids.</summary>
/// <param name="arguments">Parsed CLI options; <c>--current-user-id</c> is required, <c>--verifier-user-id</c> defaults to 0.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
/// <exception cref="ArgumentException">The handle and value counts differ.</exception>
private static Task<int> WriteSecuredBulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    int serverHandle = arguments.GetInt32("server-handle");
    IReadOnlyList<int> itemHandles = ParseInt32List(arguments.GetRequired("item-handles"));
    IReadOnlyList<MxValue> parsedValues = ParseValuesList(arguments);
    int currentUserId = arguments.GetInt32("current-user-id");
    int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
    EnsureSameLength(itemHandles.Count, parsedValues.Count);

    var bulk = new WriteSecuredBulkCommand { ServerHandle = serverHandle };
    foreach ((int itemHandle, MxValue value) in itemHandles.Zip(parsedValues))
    {
        bulk.Entries.Add(new WriteSecuredBulkEntry
        {
            ItemHandle = itemHandle,
            Value = value,
            CurrentUserId = currentUserId,
            VerifierUserId = verifierUserId,
        });
    }

    var envelope = new MxCommand
    {
        Kind = MxCommandKind.WriteSecuredBulk,
        WriteSecuredBulk = bulk,
    };
    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>Handles <c>write-secured2-bulk</c>: secured bulk write with one shared timestamp value attached to every entry.</summary>
/// <param name="arguments">Parsed CLI options; <c>--current-user-id</c> is required, <c>--verifier-user-id</c> defaults to 0.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
/// <exception cref="ArgumentException">The handle and value counts differ.</exception>
private static Task<int> WriteSecured2BulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    int serverHandle = arguments.GetInt32("server-handle");
    IReadOnlyList<int> itemHandles = ParseInt32List(arguments.GetRequired("item-handles"));
    IReadOnlyList<MxValue> parsedValues = ParseValuesList(arguments);
    MxValue sharedTimestamp = ParseTimestampValue(arguments);
    int currentUserId = arguments.GetInt32("current-user-id");
    int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
    EnsureSameLength(itemHandles.Count, parsedValues.Count);

    var bulk = new WriteSecured2BulkCommand { ServerHandle = serverHandle };
    foreach ((int itemHandle, MxValue value) in itemHandles.Zip(parsedValues))
    {
        bulk.Entries.Add(new WriteSecured2BulkEntry
        {
            ItemHandle = itemHandle,
            Value = value,
            TimestampValue = sharedTimestamp,
            CurrentUserId = currentUserId,
            VerifierUserId = verifierUserId,
        });
    }

    var envelope = new MxCommand
    {
        Kind = MxCommandKind.WriteSecured2Bulk,
        WriteSecured2Bulk = bulk,
    };
    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>
/// Reads the bulk-write <c>--values</c> list. Every element is converted with
/// the one shared <c>--type</c>, so a single invocation carries N values of a
/// single type; callers needing a different type per entry should use the
/// library directly rather than the CLI.
/// </summary>
/// <param name="arguments">Parsed CLI options; <c>--type</c> and <c>--values</c> are required.</param>
/// <returns>The parsed values, in the order they were listed.</returns>
private static IReadOnlyList<MxValue> ParseValuesList(CliArguments arguments)
{
    string type = arguments.GetRequired("type");
    string[] rawValues = ParseStringList(arguments.GetRequired("values")).ToArray();
    return rawValues
        .Select(rawValue => ParseValue(type, rawValue))
        .ToArray();
}
/// <summary>Guards a bulk write: the handle count and the value count must match.</summary>
/// <param name="handles">Number of item handles supplied.</param>
/// <param name="values">Number of values supplied.</param>
/// <exception cref="ArgumentException">The two counts differ.</exception>
private static void EnsureSameLength(int handles, int values)
{
    if (handles == values)
    {
        return;
    }

    throw new ArgumentException(
        $"Bulk write requires the same number of --item-handles ({handles}) and --values ({values}).");
}
/// <summary>Handles <c>write</c>: writes one typed value to <c>--item-handle</c>.</summary>
/// <param name="arguments">Parsed CLI options; <c>--user-id</c> defaults to 0.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> WriteAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    var write = new WriteCommand
    {
        ServerHandle = arguments.GetInt32("server-handle"),
        ItemHandle = arguments.GetInt32("item-handle"),
        UserId = arguments.GetInt32("user-id", 0),
        Value = ParseValue(arguments),
    };
    var envelope = new MxCommand
    {
        Kind = MxCommandKind.Write,
        Write = write,
    };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>Handles <c>write2</c>: writes one typed value together with a timestamp value.</summary>
/// <param name="arguments">Parsed CLI options; <c>--user-id</c> defaults to 0.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> Write2Async(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    var write = new Write2Command
    {
        ServerHandle = arguments.GetInt32("server-handle"),
        ItemHandle = arguments.GetInt32("item-handle"),
        UserId = arguments.GetInt32("user-id", 0),
        Value = ParseValue(arguments),
        TimestampValue = ParseTimestampValue(arguments),
    };
    var envelope = new MxCommand
    {
        Kind = MxCommandKind.Write2,
        Write2 = write,
    };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
/// <summary>
/// Handles <c>stream-events</c>: streams session events and prints them one per
/// line, or — with <c>--json</c> and without <c>--jsonl</c> — collects them and
/// prints a single JSON document at the end.
/// </summary>
/// <param name="arguments">Parsed CLI options; <c>--session-id</c> is required.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the events.</param>
/// <param name="cancellationToken">Token that ends the stream.</param>
/// <returns>A task producing the exit code.</returns>
/// <exception cref="ArgumentException">
/// Aggregate JSON output was requested without <c>--max-events</c>, or
/// <c>--max-events</c> exceeds the allowed maximum.
/// </exception>
private static async Task<int> StreamEventsAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    uint maxEvents = arguments.GetUInt32("max-events", 0);
    bool json = arguments.HasFlag("json");
    bool jsonLines = arguments.HasFlag("jsonl");

    // Aggregate mode buffers every event in memory, so it must be bounded.
    bool aggregate = json && !jsonLines;
    if (aggregate && maxEvents == 0)
    {
        throw new ArgumentException("--json stream-events requires --max-events to bound aggregate output.");
    }

    if (maxEvents > MaxAggregateEvents)
    {
        throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}.");
    }

    List<MxEvent> buffered = aggregate
        ? new List<MxEvent>(checked((int)maxEvents))
        : new List<MxEvent>();

    var request = new StreamEventsRequest
    {
        SessionId = arguments.GetRequired("session-id"),
        AfterWorkerSequence = arguments.GetUInt64("after-worker-sequence", 0),
    };

    uint received = 0;
    await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(request, cancellationToken)
        .WithCancellation(cancellationToken)
        .ConfigureAwait(false))
    {
        if (aggregate)
        {
            buffered.Add(gatewayEvent);
        }
        else
        {
            output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent));
        }

        received++;
        if (maxEvents != 0 && received >= maxEvents)
        {
            break;
        }
    }

    if (aggregate)
    {
        output.WriteLine(JsonSerializer.Serialize(
            new { events = buffered.Select(EventToJsonElement).ToArray() },
            JsonOptions));
    }

    return 0;
}
/// <summary>
/// Handles <c>smoke</c>: opens a session, registers, adds <c>--item</c>, advises it,
/// optionally writes <c>--value</c>, collects events for a short window, closes the
/// session, and prints a summary.
/// </summary>
/// <param name="arguments">
/// Parsed CLI options. <c>--item</c> is required; <c>--value</c> (with <c>--type</c>)
/// triggers the write step; <c>--event-timeout</c> defaults to 2 seconds and
/// <c>--max-events</c> to 1.
/// </param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the summary.</param>
/// <param name="cancellationToken">Token bounding the whole run.</param>
/// <returns>A task producing the exit code.</returns>
private static async Task<int> SmokeAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    // Parse every option before touching the gateway so a bad argument fails
    // without first opening a session and issuing commands, and so nothing is
    // re-parsed inside the event loop.
    string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke";
    string itemDefinition = arguments.GetRequired("item");
    MxValue? writeValue = null;
    int writeUserId = 0;
    if (arguments.GetOptional("value") is not null)
    {
        writeUserId = arguments.GetInt32("user-id", 0);
        writeValue = ParseValue(arguments);
    }

    TimeSpan eventTimeout = arguments.GetDuration("event-timeout", TimeSpan.FromSeconds(2));
    uint maxEvents = arguments.GetUInt32("max-events", 1);

    OpenSessionReply? openReply = null;
    CloseSessionReply? closeReply = null;
    var commandReplies = new List<MxCommandReply>();
    var events = new List<MxEvent>();
    try
    {
        openReply = await client.OpenSessionAsync(
            new OpenSessionRequest
            {
                ClientSessionName = clientName,
                ClientCorrelationId = CreateCorrelationId(),
            },
            cancellationToken)
            .ConfigureAwait(false);
        int serverHandle = await InvokeForHandleAsync(
            arguments,
            client,
            openReply.SessionId,
            new MxCommand
            {
                Kind = MxCommandKind.Register,
                Register = new RegisterCommand { ClientName = clientName },
            },
            reply => reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value,
            commandReplies,
            cancellationToken)
            .ConfigureAwait(false);
        int itemHandle = await InvokeForHandleAsync(
            arguments,
            client,
            openReply.SessionId,
            new MxCommand
            {
                Kind = MxCommandKind.AddItem,
                AddItem = new AddItemCommand
                {
                    ServerHandle = serverHandle,
                    ItemDefinition = itemDefinition,
                },
            },
            reply => reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value,
            commandReplies,
            cancellationToken)
            .ConfigureAwait(false);
        commandReplies.Add(await InvokeAndEnsureAsync(
            client,
            CreateCommandRequest(
                openReply.SessionId,
                new MxCommand
                {
                    Kind = MxCommandKind.Advise,
                    Advise = new AdviseCommand
                    {
                        ServerHandle = serverHandle,
                        ItemHandle = itemHandle,
                    },
                }),
            cancellationToken)
            .ConfigureAwait(false));
        if (writeValue is not null)
        {
            commandReplies.Add(await InvokeAndEnsureAsync(
                client,
                CreateCommandRequest(
                    openReply.SessionId,
                    new MxCommand
                    {
                        Kind = MxCommandKind.Write,
                        Write = new WriteCommand
                        {
                            ServerHandle = serverHandle,
                            ItemHandle = itemHandle,
                            UserId = writeUserId,
                            Value = writeValue,
                        },
                    }),
                cancellationToken)
                .ConfigureAwait(false));
        }

        // The event window has its own deadline, linked to the command token so
        // either one ends the stream.
        using CancellationTokenSource streamCancellation = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        streamCancellation.CancelAfter(eventTimeout);
        try
        {
            await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(
                new StreamEventsRequest { SessionId = openReply.SessionId },
                streamCancellation.Token)
                .WithCancellation(streamCancellation.Token)
                .ConfigureAwait(false))
            {
                events.Add(gatewayEvent);
                if (events.Count >= maxEvents)
                {
                    break;
                }
            }
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            // Only the event window expired; the events gathered so far are the result.
        }
    }
    finally
    {
        if (openReply is not null)
        {
            // CancellationToken.None: the close is still attempted when the
            // command token has already been cancelled.
            closeReply = await client.CloseSessionAsync(
                new CloseSessionRequest
                {
                    SessionId = openReply.SessionId,
                    ClientCorrelationId = CreateCorrelationId(),
                },
                CancellationToken.None)
                .ConfigureAwait(false);
        }
    }

    WriteSmokeResult(arguments, output, openReply, closeReply, commandReplies, events);
    return 0;
}
/// <summary>Sends <paramref name="command"/> on the session named by <c>--session-id</c> and prints the validated reply.</summary>
/// <param name="arguments">Parsed CLI options; <c>--session-id</c> is required.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="command">Command to invoke.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing exit code 0 once the reply has been written.</returns>
private static async Task<int> InvokeAndWriteAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    MxCommand command,
    CancellationToken cancellationToken)
{
    string sessionId = arguments.GetRequired("session-id");
    MxCommandRequest request = CreateCommandRequest(sessionId, command);

    MxCommandReply validatedReply = await InvokeAndEnsureAsync(client, request, cancellationToken)
        .ConfigureAwait(false);

    WriteMessage(arguments, output, validatedReply);
    return 0;
}
/// <summary>Invokes a command, records its reply, and extracts a handle from it.</summary>
/// <param name="arguments">Parsed CLI options (not read by this helper).</param>
/// <param name="client">Gateway client.</param>
/// <param name="sessionId">Session to run the command on.</param>
/// <param name="command">Command to invoke.</param>
/// <param name="handleSelector">Projection that picks the handle out of the reply.</param>
/// <param name="replies">List that the validated reply is appended to.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the selected handle.</returns>
private static async Task<int> InvokeForHandleAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    string sessionId,
    MxCommand command,
    Func<MxCommandReply, int> handleSelector,
    List<MxCommandReply> replies,
    CancellationToken cancellationToken)
{
    MxCommandRequest request = CreateCommandRequest(sessionId, command);
    MxCommandReply validatedReply = await InvokeAndEnsureAsync(client, request, cancellationToken)
        .ConfigureAwait(false);

    replies.Add(validatedReply);
    int handle = handleSelector(validatedReply);
    return handle;
}
/// <summary>Invokes a command request and runs the protocol and MXAccess success checks on the reply.</summary>
/// <param name="client">Gateway client.</param>
/// <param name="request">Fully built command request.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the reply after both checks have run.</returns>
private static async Task<MxCommandReply> InvokeAndEnsureAsync(
    IMxGatewayCliClient client,
    MxCommandRequest request,
    CancellationToken cancellationToken)
{
    MxCommandReply received = await client
        .InvokeAsync(request, cancellationToken)
        .ConfigureAwait(false);

    // Protocol-level check first, then the MXAccess-level one.
    received
        .EnsureProtocolSuccess()
        .EnsureMxAccessSuccess();
    return received;
}
/// <summary>Wraps a command in a request for the given session, stamped with a new correlation id.</summary>
/// <param name="sessionId">Session the command targets.</param>
/// <param name="command">Command payload.</param>
/// <returns>The request ready to be invoked.</returns>
private static MxCommandRequest CreateCommandRequest(
    string sessionId,
    MxCommand command)
    => new()
    {
        SessionId = sessionId,
        ClientCorrelationId = CreateCorrelationId(),
        Command = command,
    };
/// <summary>Awaits a reply and prints it in the format selected by the CLI options.</summary>
/// <typeparam name="TReply">Protobuf message type of the reply.</typeparam>
/// <param name="replyTask">Pending gateway call.</param>
/// <param name="arguments">Parsed CLI options.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <returns>A task producing exit code 0 once the reply has been written.</returns>
private static async Task<int> WriteReplyAsync<TReply>(
    Task<TReply> replyTask,
    CliArguments arguments,
    TextWriter output)
    where TReply : IMessage
{
    TReply completedReply = await replyTask.ConfigureAwait(false);

    WriteMessage(arguments, output, completedReply);
    return 0;
}
/// <summary>Prints the gateway and worker protocol versions, as JSON when <c>--json</c> is set.</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <param name="output">Writer that receives the version information.</param>
private static void WriteVersion(CliArguments arguments, TextWriter output)
{
    if (!arguments.HasFlag("json"))
    {
        output.WriteLine(
            $"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}");
        output.WriteLine(
            $"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}");
        return;
    }

    var versions = new
    {
        gatewayProtocolVersion = MxGatewayClientContractInfo.GatewayProtocolVersion,
        workerProtocolVersion = MxGatewayClientContractInfo.WorkerProtocolVersion,
    };
    output.WriteLine(JsonSerializer.Serialize(versions, JsonOptions));
}
/// <summary>Prints a protobuf message: protobuf JSON when <c>--json</c> is set, otherwise its <c>ToString()</c> form.</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <param name="output">Writer that receives the message.</param>
/// <param name="message">Message to print.</param>
private static void WriteMessage(
    CliArguments arguments,
    TextWriter output,
    IMessage message)
{
    if (arguments.HasFlag("json"))
    {
        output.WriteLine(ProtobufJsonFormatter.Format(message));
    }
    else
    {
        output.WriteLine(message.ToString());
    }
}
/// <summary>Prints the smoke-run summary as key=value lines, or as one JSON document when <c>--json</c> is set.</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <param name="output">Writer that receives the summary.</param>
/// <param name="openReply">Open-session reply, if a session was opened.</param>
/// <param name="closeReply">Close-session reply, if the session was closed.</param>
/// <param name="commandReplies">Replies collected from the commands that ran.</param>
/// <param name="events">Events collected during the event window.</param>
private static void WriteSmokeResult(
    CliArguments arguments,
    TextWriter output,
    OpenSessionReply? openReply,
    CloseSessionReply? closeReply,
    IReadOnlyList<MxCommandReply> commandReplies,
    IReadOnlyList<MxEvent> events)
{
    if (arguments.HasFlag("json"))
    {
        var summary = new
        {
            sessionId = openReply?.SessionId,
            closed = closeReply is not null,
            commandReplies = commandReplies.Select(CommandReplyToJsonElement).ToArray(),
            events = events.Select(EventToJsonElement).ToArray(),
        };
        output.WriteLine(JsonSerializer.Serialize(summary, JsonOptions));
        return;
    }

    output.WriteLine($"session-id={openReply?.SessionId}");
    output.WriteLine($"commands={commandReplies.Count}");
    output.WriteLine($"events={events.Count}");
    output.WriteLine($"closed={closeReply is not null}");
}
/// <summary>Converts a command reply into a standalone <see cref="JsonElement"/> via its protobuf JSON form.</summary>
/// <param name="reply">Reply to convert.</param>
/// <returns>A cloned element that stays valid after the parsing document is disposed.</returns>
private static JsonElement CommandReplyToJsonElement(MxCommandReply reply)
{
    // JsonDocument rents pooled buffers; dispose it so they are returned. The
    // Clone() detaches the element from the document before that happens.
    using JsonDocument document = JsonDocument.Parse(ProtobufJsonFormatter.Format(reply));
    return document.RootElement.Clone();
}
/// <summary>Converts a gateway event into a standalone <see cref="JsonElement"/> via its protobuf JSON form.</summary>
/// <param name="gatewayEvent">Event to convert.</param>
/// <returns>A cloned element that stays valid after the parsing document is disposed.</returns>
private static JsonElement EventToJsonElement(MxEvent gatewayEvent)
{
    // JsonDocument rents pooled buffers; dispose it so they are returned. The
    // Clone() detaches the element from the document before that happens.
    using JsonDocument document = JsonDocument.Parse(ProtobufJsonFormatter.Format(gatewayEvent));
    return document.RootElement.Clone();
}
/// <summary>Parses the single value described by the required <c>--type</c> and <c>--value</c> options.</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <returns>The typed value.</returns>
private static MxValue ParseValue(CliArguments arguments)
{
    string typeName = arguments.GetRequired("type");
    string rawValue = arguments.GetRequired("value");
    return ParseValue(typeName, rawValue);
}
/// <summary>Converts text into a typed value according to a case-insensitive type name.</summary>
/// <param name="typeName">
/// Type name such as <c>bool</c>, <c>int32</c>, <c>double</c>, <c>string</c> or
/// <c>timestamp</c>; an <c>-array</c> suffix selects the comma-separated array form.
/// </param>
/// <param name="value">Text to parse; numbers and timestamps use the invariant culture.</param>
/// <returns>The typed value.</returns>
/// <exception cref="ArgumentException">The type name is not supported.</exception>
private static MxValue ParseValue(string typeName, string value)
{
    static int ToInt32(string text) => int.Parse(text, CultureInfo.InvariantCulture);
    static long ToInt64(string text) => long.Parse(text, CultureInfo.InvariantCulture);
    static float ToSingle(string text) => float.Parse(text, CultureInfo.InvariantCulture);
    static double ToDouble(string text) => double.Parse(text, CultureInfo.InvariantCulture);
    static DateTimeOffset ToTimestamp(string text) =>
        DateTimeOffset.Parse(text, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal);

    string type = typeName.ToLowerInvariant();
    string[] elements = value.Split(',', StringSplitOptions.TrimEntries);
    switch (type)
    {
        case "bool":
        case "boolean":
            return bool.Parse(value).ToMxValue();
        case "bool-array":
        case "boolean-array":
            return elements.Select(bool.Parse).ToArray().ToMxValue();
        case "int32":
        case "integer":
            return ToInt32(value).ToMxValue();
        case "int32-array":
        case "integer-array":
            return elements.Select(ToInt32).ToArray().ToMxValue();
        case "int64":
            return ToInt64(value).ToMxValue();
        case "int64-array":
            return elements.Select(ToInt64).ToArray().ToMxValue();
        case "float":
            return ToSingle(value).ToMxValue();
        case "float-array":
            return elements.Select(ToSingle).ToArray().ToMxValue();
        case "double":
            return ToDouble(value).ToMxValue();
        case "double-array":
            return elements.Select(ToDouble).ToArray().ToMxValue();
        case "string":
            return value.ToMxValue();
        case "string-array":
            return elements.ToMxValue();
        case "time":
        case "timestamp":
            return ToTimestamp(value).ToMxValue();
        case "time-array":
        case "timestamp-array":
            return elements.Select(ToTimestamp).ToArray().ToMxValue();
        default:
            throw new ArgumentException($"Unsupported MX value type '{type}'.");
    }
}
/// <summary>Builds the timestamp value from <c>--timestamp</c>, falling back to the current UTC time.</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <returns>The timestamp as a typed value; input without an offset is treated as UTC.</returns>
private static MxValue ParseTimestampValue(CliArguments arguments)
{
    string? supplied = arguments.GetOptional("timestamp");
    string timestampText = supplied
        ?? DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture);

    DateTimeOffset parsed = DateTimeOffset.Parse(
        timestampText,
        CultureInfo.InvariantCulture,
        DateTimeStyles.AssumeUniversal);
    return parsed.ToMxValue();
}
/// <summary>Handles <c>galaxy-test-connection</c>: issues the Galaxy test-connection call and prints the reply.</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> GalaxyTestConnectionAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    var request = new TestConnectionRequest();
    return WriteReplyAsync(
        client.GalaxyTestConnectionAsync(request, cancellationToken),
        arguments,
        output);
}
/// <summary>Handles <c>galaxy-last-deploy</c>: requests the Galaxy last-deploy time and prints the reply.</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token bounding the call.</param>
/// <returns>A task producing the exit code.</returns>
private static Task<int> GalaxyLastDeployAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    var request = new GetLastDeployTimeRequest();
    return WriteReplyAsync(
        client.GalaxyGetLastDeployTimeAsync(request, cancellationToken),
        arguments,
        output);
}
/// <summary>Handles <c>galaxy-discover</c>: fetches the whole Galaxy hierarchy and prints it as JSON or as one line per object.</summary>
/// <param name="arguments">Parsed CLI options.</param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the listing.</param>
/// <param name="cancellationToken">Token bounding the calls.</param>
/// <returns>A task producing the exit code.</returns>
private static async Task<int> GalaxyDiscoverAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    DiscoverHierarchyReply hierarchy = await DiscoverAllGalaxyHierarchyAsync(client, cancellationToken)
        .ConfigureAwait(false);

    if (!arguments.HasFlag("json"))
    {
        output.WriteLine($"objects={hierarchy.Objects.Count}");
        foreach (GalaxyObject entry in hierarchy.Objects)
        {
            output.WriteLine($"- gobject_id={entry.GobjectId} tag_name={entry.TagName} contained_name={entry.ContainedName} parent={entry.ParentGobjectId} attributes={entry.Attributes.Count}");
        }

        return 0;
    }

    output.WriteLine(ProtobufJsonFormatter.Format(hierarchy));
    return 0;
}
/// <summary>Pages through the Galaxy hierarchy (5000 objects per request) and merges all pages into one reply.</summary>
/// <param name="client">Gateway client.</param>
/// <param name="cancellationToken">Token bounding every page request.</param>
/// <returns>A reply holding the objects of every page and the total count reported by the last page.</returns>
/// <exception cref="MxGatewayException">The server returned a page token that was already seen.</exception>
private static async Task<DiscoverHierarchyReply> DiscoverAllGalaxyHierarchyAsync(
    IMxGatewayCliClient client,
    CancellationToken cancellationToken)
{
    DiscoverHierarchyReply combined = new();
    HashSet<string> visitedTokens = new(StringComparer.Ordinal);
    string nextToken = string.Empty;
    while (true)
    {
        var request = new DiscoverHierarchyRequest
        {
            PageSize = 5000,
            PageToken = nextToken,
        };
        DiscoverHierarchyReply page = await client
            .GalaxyDiscoverHierarchyAsync(request, cancellationToken)
            .ConfigureAwait(false);

        combined.Objects.Add(page.Objects);
        combined.TotalObjectCount = page.TotalObjectCount;
        nextToken = page.NextPageToken;

        // A blank token marks the last page.
        if (string.IsNullOrWhiteSpace(nextToken))
        {
            return combined;
        }

        // A token seen before would make this loop page forever.
        if (!visitedTokens.Add(nextToken))
        {
            throw new MxGatewayException(
                $"Galaxy DiscoverHierarchy returned a repeated page token '{nextToken}'.");
        }
    }
}
/// <summary>
/// Handles <c>galaxy-watch</c>: streams deploy events and prints each one until
/// <c>--max-events</c> is reached, the token is cancelled, or Ctrl+C is pressed.
/// </summary>
/// <param name="arguments">
/// Parsed CLI options; <c>--last-seen-deploy-time</c> is optional and
/// <c>--max-events</c> defaults to 0 (no limit).
/// </param>
/// <param name="client">Gateway client.</param>
/// <param name="output">Writer that receives the events.</param>
/// <param name="cancellationToken">Token that ends the watch.</param>
/// <returns>A task producing the exit code.</returns>
/// <exception cref="ArgumentException"><c>--max-events</c> exceeds the allowed maximum.</exception>
private static async Task<int> GalaxyWatchAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    bool asJson = arguments.HasFlag("json");
    uint eventLimit = arguments.GetUInt32("max-events", 0);
    if (eventLimit > MaxAggregateEvents)
    {
        throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}.");
    }

    var watchRequest = new WatchDeployEventsRequest();
    string? lastSeenText = arguments.GetOptional("last-seen-deploy-time");
    if (!string.IsNullOrWhiteSpace(lastSeenText))
    {
        DateTimeStyles parseStyles = DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal;
        DateTimeOffset lastSeen = DateTimeOffset.Parse(
            lastSeenText,
            CultureInfo.InvariantCulture,
            parseStyles);
        watchRequest.LastSeenDeployTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTimeOffset(lastSeen);
    }

    using CancellationTokenSource watchCancellation =
        CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

    // Ctrl+C cancels the watch instead of terminating the process.
    ConsoleCancelEventHandler onCancelKeyPress = (_, pressed) =>
    {
        pressed.Cancel = true;
        try
        {
            watchCancellation.Cancel();
        }
        catch (ObjectDisposedException)
        {
        }
    };
    Console.CancelKeyPress += onCancelKeyPress;

    uint written = 0;
    try
    {
        await foreach (DeployEvent deployEvent in client
            .GalaxyWatchDeployEventsAsync(watchRequest, watchCancellation.Token)
            .WithCancellation(watchCancellation.Token)
            .ConfigureAwait(false))
        {
            string line = asJson
                ? ProtobufJsonFormatter.Format(deployEvent)
                : FormatDeployEvent(deployEvent);
            output.WriteLine(line);

            written++;
            if (eventLimit != 0 && written >= eventLimit)
            {
                break;
            }
        }
    }
    catch (OperationCanceledException) when (watchCancellation.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
    {
        // Ctrl+C-driven cancellation is a clean exit.
    }
    finally
    {
        Console.CancelKeyPress -= onCancelKeyPress;
    }

    return 0;
}
/// <summary>Renders a deploy event as a single line of space-separated <c>key=value</c> pairs.</summary>
/// <param name="deployEvent">Event to render.</param>
/// <returns>
/// A line containing the sequence, observed time, last deploy time, object count and attribute count.
/// Timestamps use the round-trip ("O") format; a missing last deploy time prints as <c>&lt;none&gt;</c>
/// and a missing observed time as <c>&lt;unknown&gt;</c>.
/// </returns>
private static string FormatDeployEvent(DeployEvent deployEvent)
{
    string lastDeployText = "<none>";
    if (deployEvent.TimeOfLastDeployPresent && deployEvent.TimeOfLastDeploy is { } lastDeploy)
    {
        lastDeployText = lastDeploy.ToDateTimeOffset().ToString("O", CultureInfo.InvariantCulture);
    }

    string observedText = "<unknown>";
    if (deployEvent.ObservedAt is { } observedAt)
    {
        observedText = observedAt.ToDateTimeOffset().ToString("O", CultureInfo.InvariantCulture);
    }

    return $"sequence={deployEvent.Sequence} observed_at={observedText} time_of_last_deploy={lastDeployText} objects={deployEvent.ObjectCount} attributes={deployEvent.AttributeCount}";
}
/// <summary>Reports an unrecognised command on the error writer, followed by the usage banner.</summary>
/// <param name="command">The command text that was not recognised.</param>
/// <param name="standardError">Writer that receives the message and the usage banner.</param>
/// <returns>The exit code 2.</returns>
private static int WriteUnknownCommand(string command, TextWriter standardError)
{
    const int ExitCode = 2;

    string message = $"Unknown command '{command}'.";
    standardError.WriteLine(message);
    WriteUsage(standardError);

    return ExitCode;
}
/// <summary>Determines whether an argument asks for the usage banner.</summary>
/// <param name="value">Argument text to inspect.</param>
/// <returns><see langword="true"/> when <paramref name="value"/> is <c>-h</c>, <c>--help</c> or <c>help</c>, ignoring case; otherwise <see langword="false"/>.</returns>
private static bool IsHelp(string value)
{
    StringComparer ignoreCase = StringComparer.OrdinalIgnoreCase;

    return ignoreCase.Equals(value, "-h")
        || ignoreCase.Equals(value, "--help")
        || ignoreCase.Equals(value, "help");
}
/// <summary>Determines whether a command name is one of the gateway subcommands listed here.</summary>
/// <param name="command">Command name to test; the match is exact and case-sensitive.</param>
/// <returns><see langword="true"/> when <paramref name="command"/> is a listed subcommand; otherwise <see langword="false"/>.</returns>
private static bool IsKnownGatewayCommand(string command) => command switch
{
    // Session lifecycle.
    "open-session" or "close-session" or "ping" or "register" => true,

    // Single-item operations.
    "add-item" or "advise" or "write" or "write2" => true,

    // Bulk operations.
    "subscribe-bulk" or "unsubscribe-bulk" or "read-bulk" => true,
    "write-bulk" or "write2-bulk" or "write-secured-bulk" or "write-secured2-bulk" => true,

    // Streaming and smoke test.
    "stream-events" or "smoke" => true,

    // Galaxy operations.
    "galaxy-test-connection" or "galaxy-last-deploy" or "galaxy-discover" or "galaxy-watch" => true,

    _ => false,
};
/// <summary>Splits a comma-separated option value into its trimmed, non-empty items.</summary>
/// <param name="value">Comma-separated text.</param>
/// <returns>The items in their original order.</returns>
/// <exception cref="ArgumentException">Thrown when no non-empty item remains after splitting.</exception>
private static IReadOnlyList<string> ParseStringList(string value)
{
    const StringSplitOptions SplitOptions =
        StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries;

    string[] parts = value.Split(',', SplitOptions);

    return parts.Length > 0
        ? parts
        : throw new ArgumentException("At least one item is required.");
}
/// <summary>Parses a comma-separated option value into a list of 32-bit integers.</summary>
/// <param name="value">Comma-separated integers; empty entries are dropped and surrounding whitespace is trimmed.</param>
/// <returns>The parsed integers in their original order.</returns>
/// <exception cref="ArgumentException">Thrown when no non-empty entry remains after splitting.</exception>
private static IReadOnlyList<int> ParseInt32List(string value)
{
    string[] tokens = value.Split(
        ',',
        StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);

    if (tokens.Length == 0)
    {
        throw new ArgumentException("At least one item handle is required.");
    }

    // Parsing is culture-invariant so the result does not depend on the machine's locale.
    int[] handles = new int[tokens.Length];
    for (int index = 0; index < tokens.Length; index++)
    {
        handles[index] = int.Parse(tokens[index], CultureInfo.InvariantCulture);
    }

    return handles;
}
/// <summary>Creates a new correlation identifier.</summary>
/// <returns>A freshly generated GUID formatted with the "N" specifier (32 hexadecimal digits, no separators).</returns>
private static string CreateCorrelationId() => Guid.NewGuid().ToString("N");
/// <summary>Writes the usage banner, one line per supported subcommand.</summary>
/// <param name="writer">Writer that receives the banner.</param>
private static void WriteUsage(TextWriter writer)
{
    string[] usageLines =
    {
        "mxgw-dotnet version [--json]",
        "mxgw-dotnet ping --session-id <id> [--json]",
        "mxgw-dotnet open-session [--client-name <name>] [--json]",
        "mxgw-dotnet close-session --session-id <id> [--json]",
        "mxgw-dotnet register --session-id <id> --client-name <name> [--json]",
        "mxgw-dotnet add-item --session-id <id> --server-handle <n> --item <ref> [--json]",
        "mxgw-dotnet advise --session-id <id> --server-handle <n> --item-handle <n> [--json]",
        "mxgw-dotnet read-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--timeout-ms <n>] [--json]",
        "mxgw-dotnet write-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--user-id <n>] [--json]",
        "mxgw-dotnet write2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--timestamp <iso>] [--user-id <n>] [--json]",
        "mxgw-dotnet write-secured-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> --current-user-id <n> [--verifier-user-id <n>] [--json]",
        "mxgw-dotnet write-secured2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--timestamp <iso>] --current-user-id <n> [--verifier-user-id <n>] [--json]",
        "mxgw-dotnet subscribe-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--json]",
        "mxgw-dotnet unsubscribe-bulk --session-id <id> --server-handle <n> --item-handles <n,n> [--json]",
        "mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]",
        "mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]",
        "mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]",
        "mxgw-dotnet smoke --item <ref> [--value <value> --type <type>] [--json]",
        "mxgw-dotnet galaxy-test-connection [--json]",
        "mxgw-dotnet galaxy-last-deploy [--json]",
        "mxgw-dotnet galaxy-discover [--json]",
        "mxgw-dotnet galaxy-watch [--last-seen-deploy-time <iso8601>] [--max-events <n>] [--json]",
    };

    foreach (string usageLine in usageLines)
    {
        writer.WriteLine(usageLine);
    }
}
}