9eedf9d6a9
A consuming project hit two MXAccess parity surprises: a plain Write only records its user_id when the item has an active supervisory advise (the path to take when not authenticating), and array writes replace the whole array rather than patching individual elements. Document both across the five client READMEs and gateway.md's compatibility baseline, and expose the missing advise-supervisory subcommand in the go/python/rust/java CLIs (plus the .NET help text) so callers can establish the supervisory advise without dropping to the raw command API.
2115 lines
82 KiB
C#
2115 lines
82 KiB
C#
using System.Globalization;
|
|
using System.Text.Json;
|
|
using Google.Protobuf;
|
|
using ZB.MOM.WW.MxGateway.Client;
|
|
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
|
using ZB.MOM.WW.MxGateway.Contracts.Proto.Galaxy;
|
|
|
|
namespace ZB.MOM.WW.MxGateway.Client.Cli;
|
|
|
|
/// <summary>Command-line interface for the MXAccess Gateway client, supporting session and command operations.</summary>
|
|
public static class MxGatewayClientCli
|
|
{
|
|
/// <summary>Sentinel line written to stdout after every batch-mode command result.</summary>
private const string BatchEndOfRecord = "__MXGW_BATCH_EOR__";

/// <summary>
/// Upper bound of 10,000 events. Not referenced in the visible part of this file;
/// presumably caps aggregated event collection — confirm against its usages.
/// </summary>
private const uint MaxAggregateEvents = 10_000;

/// <summary>Serializer options (web defaults) used for CLI-authored JSON such as error objects and bench stats.</summary>
private static readonly JsonSerializerOptions JsonOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);

/// <summary>Default protobuf JSON formatter shared by the CLI.</summary>
private static readonly JsonFormatter ProtobufJsonFormatter = JsonFormatter.Default;
|
|
|
|
/// <summary>
/// Synchronous entry point: forwards to <see cref="RunAsync"/> with the default
/// client factory and stdin, then blocks until the exit code is available.
/// </summary>
/// <param name="args">Command-line arguments (command name followed by options).</param>
/// <param name="standardOutput">Writer that receives command output.</param>
/// <param name="standardError">Writer that receives error messages.</param>
/// <returns>The process exit code produced by the command.</returns>
public static int Run(string[] args, TextWriter standardOutput, TextWriter standardError)
{
    Task<int> pending = RunAsync(
        args,
        standardOutput,
        standardError,
        clientFactory: null,
        standardInput: null);

    return pending.GetAwaiter().GetResult();
}
|
|
|
|
/// <summary>
/// Asynchronous entry point. Validates the mandatory arguments, substitutes defaults
/// for the optional ones, and hands off to the core dispatcher.
/// </summary>
/// <param name="args">Command-line arguments (command name followed by options).</param>
/// <param name="standardOutput">Writer that receives command output.</param>
/// <param name="standardError">Writer that receives error messages.</param>
/// <param name="clientFactory">Optional gateway-client factory; when <see langword="null"/>, <c>CreateDefaultClient</c> is used.</param>
/// <param name="standardInput">Optional reader for batch-mode input; when <see langword="null"/>, <see cref="Console.In"/> is used.</param>
/// <returns>A task producing the exit code.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="args"/>, <paramref name="standardOutput"/>, or <paramref name="standardError"/> is <see langword="null"/>.
/// </exception>
public static Task<int> RunAsync(
    string[] args,
    TextWriter standardOutput,
    TextWriter standardError,
    Func<MxGatewayClientOptions, IMxGatewayCliClient>? clientFactory = null,
    TextReader? standardInput = null)
{
    ArgumentNullException.ThrowIfNull(args);
    ArgumentNullException.ThrowIfNull(standardOutput);
    ArgumentNullException.ThrowIfNull(standardError);

    Func<MxGatewayClientOptions, IMxGatewayCliClient> effectiveFactory = clientFactory ?? CreateDefaultClient;
    TextReader effectiveInput = standardInput ?? Console.In;

    return RunCoreAsync(args, standardOutput, standardError, effectiveFactory, effectiveInput);
}
|
|
|
|
/// <summary>
/// Dispatches a single command line. Help and <c>batch</c> are handled before option
/// parsing; <c>version</c> and unknown commands are handled before a client is created;
/// every other command gets a client and a cancellation source and is routed to its handler.
/// </summary>
/// <param name="args">Command name followed by its options.</param>
/// <param name="standardOutput">Writer for command output.</param>
/// <param name="standardError">Writer for error text.</param>
/// <param name="clientFactory">Factory that builds the gateway client from parsed options.</param>
/// <param name="standardInput">Reader passed through to batch mode.</param>
/// <param name="forceJsonErrors">When <see langword="true"/>, errors are emitted as JSON even without <c>--json</c>.</param>
/// <returns>0 on success, 1 when a handled error was reported, or the handler's own exit code.</returns>
/// <remarks>
/// <see cref="OperationCanceledException"/> is deliberately not handled here and propagates to the caller.
/// </remarks>
private static async Task<int> RunCoreAsync(
    string[] args,
    TextWriter standardOutput,
    TextWriter standardError,
    Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory,
    TextReader standardInput,
    bool forceJsonErrors = false)
{
    bool wantsHelp = args.Length == 0 || IsHelp(args[0]);
    if (wantsHelp)
    {
        WriteUsage(standardOutput);
        return 0;
    }

    string command = args[0].ToLowerInvariant();
    if (command == "batch")
    {
        return await RunBatchAsync(standardOutput, clientFactory, standardInput).ConfigureAwait(false);
    }

    // Parsed outside the try block so the catch handler can consult the options
    // (API-key redaction and the --json flag).
    CliArguments arguments = new(args.Skip(1));

    try
    {
        if (command == "version")
        {
            WriteVersion(arguments, standardOutput);
            return 0;
        }

        if (!IsKnownGatewayCommand(command))
        {
            return WriteUnknownCommand(command, standardError);
        }

        await using IMxGatewayCliClient client = clientFactory(CreateOptions(arguments));
        using CancellationTokenSource cancellation = CreateCancellation(arguments, command);
        CancellationToken token = cancellation.Token;

        return command switch
        {
            "open-session" => await OpenSessionAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "close-session" => await CloseSessionAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "ping" => await PingAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "register" => await RegisterAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "add-item" => await AddItemAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "advise" => await AdviseAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "advise-supervisory" => await AdviseSupervisoryAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "subscribe-bulk" => await SubscribeBulkAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "unsubscribe-bulk" => await UnsubscribeBulkAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "read-bulk" => await ReadBulkAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "write-bulk" => await WriteBulkAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "write2-bulk" => await Write2BulkAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "write-secured-bulk" => await WriteSecuredBulkAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "write-secured2-bulk" => await WriteSecured2BulkAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "bench-read-bulk" => await BenchReadBulkAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "stream-events" => await StreamEventsAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "stream-alarms" => await StreamAlarmsAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "acknowledge-alarm" => await AcknowledgeAlarmAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "write" => await WriteAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "write2" => await Write2Async(arguments, client, standardOutput, token).ConfigureAwait(false),
            "smoke" => await SmokeAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "galaxy-test-connection" => await GalaxyTestConnectionAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "galaxy-last-deploy" => await GalaxyLastDeployAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "galaxy-discover" => await GalaxyDiscoverAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            "galaxy-browse" => await GalaxyBrowseAsync(arguments, client, standardOutput, standardError, token).ConfigureAwait(false),
            "galaxy-watch" => await GalaxyWatchAsync(arguments, client, standardOutput, token).ConfigureAwait(false),
            _ => WriteUnknownCommand(command, standardError),
        };
    }
    catch (Exception exception) when (exception is not OperationCanceledException)
    {
        // Client.Dotnet-028: redact the *effective* key — whether it came from --api-key
        // or from the --api-key-env environment variable — so a key echoed inside a
        // transport error never reaches stderr in the clear.
        string? effectiveKey = TryResolveApiKey(arguments);
        string message = MxGatewayCliSecretRedactor.Redact(exception.Message, effectiveKey);

        string errorLine = forceJsonErrors || arguments.HasFlag("json")
            ? JsonSerializer.Serialize(new { error = message, type = exception.GetType().Name }, JsonOptions)
            : message;

        standardError.WriteLine(errorLine);
        return 1;
    }
}
|
|
|
|
/// <summary>
/// Runs the CLI in batch mode: reads one command line at a time from
/// <paramref name="standardInput"/>, dispatches it through the normal
/// routing, writes all output to <paramref name="standardOutput"/>, and
/// then appends <see cref="BatchEndOfRecord"/> as a sentinel so the
/// caller can delimit command results. Continues on failure; errors are
/// written as JSON to <paramref name="standardOutput"/> (not stderr) so
/// that the harness sees them inside the same delimited block. Exits 0
/// on EOF or empty line.
/// </summary>
/// <remarks>
/// A <c>batch</c> line inside batch mode is rejected with a JSON error. Dispatching it
/// would start a nested loop that consumes the remaining input and writes every result
/// and sentinel into this iteration's in-memory buffer, so the harness would not see an
/// end-of-record marker until the nested loop ended.
/// </remarks>
/// <param name="standardOutput">Writer that receives each command's output, errors, and the sentinel.</param>
/// <param name="clientFactory">Factory forwarded to the per-command dispatcher.</param>
/// <param name="standardInput">Reader supplying one command line per line of text.</param>
/// <returns>Always 0, once the input ends or an empty line is read.</returns>
private static async Task<int> RunBatchAsync(
    TextWriter standardOutput,
    Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory,
    TextReader standardInput)
{
    while (true)
    {
        string? line = await standardInput.ReadLineAsync().ConfigureAwait(false);

        // EOF or empty line signals clean exit.
        if (string.IsNullOrEmpty(line))
        {
            return 0;
        }

        // Split on runs of white space (a null separator means any Unicode
        // white-space character) — no quoting support by design.
        string[] lineArgs = line.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);

        // Output and errors are buffered per command so both can be emitted on stdout,
        // in order, ahead of the end-of-record sentinel.
        using StringWriter commandOutput = new();
        using StringWriter commandError = new();

        try
        {
            if (lineArgs.Length > 0
                && string.Equals(lineArgs[0], "batch", StringComparison.OrdinalIgnoreCase))
            {
                // Reported through the catch block below like any other command error.
                throw new ArgumentException("The 'batch' command cannot be used inside batch mode.");
            }

            await RunCoreAsync(
                    lineArgs,
                    commandOutput,
                    commandError,
                    clientFactory,
                    standardInput,
                    forceJsonErrors: true)
                .ConfigureAwait(false);
        }
        catch (Exception exception)
        {
            // Anything that escaped RunCoreAsync lands here, including the
            // OperationCanceledException raised when a long-running streaming command
            // (e.g. galaxy-watch) hits --timeout. The batch process must continue with
            // the next command rather than unwinding.
            commandError.WriteLine(JsonSerializer.Serialize(
                new { error = exception.Message, type = exception.GetType().Name },
                JsonOptions));
        }

        // Normal output first.
        string commandOutputText = commandOutput.ToString();
        if (commandOutputText.Length > 0)
        {
            standardOutput.Write(commandOutputText);
        }

        // Then error output — in batch mode it belongs on stdout so the harness sees it
        // inside the delimited record.
        string commandErrorText = commandError.ToString();
        if (commandErrorText.Length > 0)
        {
            standardOutput.Write(commandErrorText);
        }

        // End-of-record sentinel, flushed so the harness can unblock.
        standardOutput.WriteLine(BatchEndOfRecord);
        await standardOutput.FlushAsync().ConfigureAwait(false);
    }
}
|
|
|
|
/// <summary>
/// Default client factory: creates an <c>MxGatewayClient</c> from <paramref name="options"/>
/// and wraps it in the CLI adapter.
/// </summary>
private static IMxGatewayCliClient CreateDefaultClient(MxGatewayClientOptions options)
    => new MxGatewayCliClientAdapter(MxGatewayClient.Create(options));
|
|
|
|
/// <summary>
/// Builds the client options from the parsed command line. The endpoint comes from
/// <c>--endpoint</c>, then the <c>MXGATEWAY_ENDPOINT</c> environment variable, then
/// <c>http://localhost:5000</c>. TLS is enabled by <c>--tls</c> or an <c>https://</c> endpoint.
/// </summary>
/// <param name="arguments">Parsed command-line options.</param>
/// <returns>Options for constructing the gateway client.</returns>
/// <exception cref="ArgumentException">No API key could be resolved.</exception>
private static MxGatewayClientOptions CreateOptions(CliArguments arguments)
{
    string? endpointOption = arguments.GetOptional("endpoint");
    string endpoint = endpointOption
        ?? Environment.GetEnvironmentVariable("MXGATEWAY_ENDPOINT")
        ?? "http://localhost:5000";

    // Resolved before the URI is parsed so a missing key is reported first.
    string apiKey = ResolveApiKey(arguments);

    Uri endpointUri = new(endpoint, UriKind.Absolute);
    bool useTls = arguments.HasFlag("tls")
        || endpoint.StartsWith("https://", StringComparison.OrdinalIgnoreCase);

    return new MxGatewayClientOptions
    {
        Endpoint = endpointUri,
        ApiKey = apiKey,
        UseTls = useTls,
        DefaultCallTimeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30)),
        ConnectTimeout = arguments.GetDuration("connect-timeout", TimeSpan.FromSeconds(10)),
        CaCertificatePath = arguments.GetOptional("ca-file"),
        ServerNameOverride = arguments.GetOptional("server-name"),
    };
}
|
|
|
|
/// <summary>
/// Returns the effective API key, or throws when neither <c>--api-key</c> nor the
/// configured environment variable supplies a non-blank value.
/// </summary>
/// <param name="arguments">Parsed command-line options.</param>
/// <returns>The non-blank API key.</returns>
/// <exception cref="ArgumentException">No API key is available.</exception>
private static string ResolveApiKey(CliArguments arguments)
{
    if (TryResolveApiKey(arguments) is { } resolved && !string.IsNullOrWhiteSpace(resolved))
    {
        return resolved;
    }

    // Name the variable the user actually configured in the error message.
    string variableName = arguments.GetOptional("api-key-env") ?? "MXGATEWAY_API_KEY";

    throw new ArgumentException(
        $"Gateway API key is required. Pass --api-key or set {variableName}.");
}
|
|
|
|
/// <summary>
/// Non-throwing API-key lookup: prefers a non-blank <c>--api-key</c>, otherwise reads
/// the environment variable named by <c>--api-key-env</c> (default
/// <c>MXGATEWAY_API_KEY</c>). Returns <see langword="null"/> when the variable is unset.
/// Because it never throws (unlike <see cref="ResolveApiKey"/>), the error-redaction
/// path can use it to strip an env-var-sourced key from output (Client.Dotnet-028).
/// </summary>
/// <param name="arguments">Parsed command-line options.</param>
/// <returns>The key from the command line or environment, or <see langword="null"/>.</returns>
private static string? TryResolveApiKey(CliArguments arguments)
{
    string? explicitKey = arguments.GetOptional("api-key");
    if (string.IsNullOrWhiteSpace(explicitKey))
    {
        string variableName = arguments.GetOptional("api-key-env") ?? "MXGATEWAY_API_KEY";
        return Environment.GetEnvironmentVariable(variableName);
    }

    return explicitKey;
}
|
|
|
|
/// <summary>
/// Creates the cancellation source for a command. <c>galaxy-watch</c> and
/// <c>galaxy-browse</c> run until cancelled unless <c>--timeout</c> is supplied; every
/// other command is cancelled after <c>--timeout</c> (default 30 seconds).
/// </summary>
/// <param name="arguments">Parsed command-line options.</param>
/// <param name="command">Lower-cased command name.</param>
/// <returns>A source the caller owns and must dispose.</returns>
private static CancellationTokenSource CreateCancellation(CliArguments arguments, string command)
{
    CancellationTokenSource source = new();

    bool runsUntilCancelled = command is "galaxy-watch" or "galaxy-browse";
    bool hasExplicitTimeout = !string.IsNullOrWhiteSpace(arguments.GetOptional("timeout"));

    // NOTE(review): bench-read-bulk defaults to 30 s of measurement plus 3 s of warm-up,
    // which exceeds the 30 s default applied here — confirm callers always pass --timeout.
    if (hasExplicitTimeout || !runsUntilCancelled)
    {
        source.CancelAfter(arguments.GetDuration("timeout", TimeSpan.FromSeconds(30)));
    }

    return source;
}
|
|
|
|
/// <summary>
/// Handles <c>open-session</c>: sends an <c>OpenSessionRequest</c> built from
/// <c>--client-name</c> (default <c>mxgw-dotnet-cli</c>) and <c>--backend</c> (default empty),
/// and passes the pending reply to <c>WriteReplyAsync</c> for output.
/// </summary>
private static Task<int> OpenSessionAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    OpenSessionRequest request = new()
    {
        ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli",
        ClientCorrelationId = CreateCorrelationId(),
        RequestedBackend = arguments.GetOptional("backend") ?? string.Empty,
    };

    return WriteReplyAsync(client.OpenSessionAsync(request, cancellationToken), arguments, output);
}
|
|
|
|
/// <summary>
/// Handles <c>close-session</c>: sends a <c>CloseSessionRequest</c> for the required
/// <c>--session-id</c> and passes the pending reply to <c>WriteReplyAsync</c> for output.
/// </summary>
private static Task<int> CloseSessionAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    CloseSessionRequest request = new()
    {
        SessionId = arguments.GetRequired("session-id"),
        ClientCorrelationId = CreateCorrelationId(),
    };

    return WriteReplyAsync(client.CloseSessionAsync(request, cancellationToken), arguments, output);
}
|
|
|
|
/// <summary>
/// Handles <c>ping</c>: builds a Ping command carrying <c>--message</c> (default
/// <c>ping</c>) and delegates invocation and output to <c>InvokeAndWriteAsync</c>.
/// </summary>
private static Task<int> PingAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    MxCommand envelope = new()
    {
        Kind = MxCommandKind.Ping,
        Ping = new PingCommand { Message = arguments.GetOptional("message") ?? "ping" },
    };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Handles <c>register</c>: builds a Register command with <c>--client-name</c>
/// (default <c>mxgw-dotnet-cli</c>) and delegates invocation and output to
/// <c>InvokeAndWriteAsync</c>.
/// </summary>
private static Task<int> RegisterAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli";
    MxCommand envelope = new()
    {
        Kind = MxCommandKind.Register,
        Register = new RegisterCommand { ClientName = clientName },
    };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Handles <c>add-item</c>: builds an AddItem command from <c>--server-handle</c> and the
/// required <c>--item</c> definition, and delegates invocation and output to
/// <c>InvokeAndWriteAsync</c>.
/// </summary>
private static Task<int> AddItemAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    AddItemCommand addItem = new()
    {
        ServerHandle = arguments.GetInt32("server-handle"),
        ItemDefinition = arguments.GetRequired("item"),
    };
    MxCommand envelope = new() { Kind = MxCommandKind.AddItem, AddItem = addItem };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Handles <c>advise</c>: builds an Advise command from <c>--server-handle</c> and
/// <c>--item-handle</c>, and delegates invocation and output to <c>InvokeAndWriteAsync</c>.
/// </summary>
private static Task<int> AdviseAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    AdviseCommand advise = new()
    {
        ServerHandle = arguments.GetInt32("server-handle"),
        ItemHandle = arguments.GetInt32("item-handle"),
    };
    MxCommand envelope = new() { Kind = MxCommandKind.Advise, Advise = advise };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Handles <c>advise-supervisory</c>: builds an AdviseSupervisory command from
/// <c>--server-handle</c> and <c>--item-handle</c>, and delegates invocation and output to
/// <c>InvokeAndWriteAsync</c>.
/// </summary>
private static Task<int> AdviseSupervisoryAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    AdviseSupervisoryCommand adviseSupervisory = new()
    {
        ServerHandle = arguments.GetInt32("server-handle"),
        ItemHandle = arguments.GetInt32("item-handle"),
    };
    MxCommand envelope = new()
    {
        Kind = MxCommandKind.AdviseSupervisory,
        AdviseSupervisory = adviseSupervisory,
    };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Handles <c>subscribe-bulk</c>: builds a SubscribeBulk command for <c>--server-handle</c>
/// with the tag addresses parsed from the required <c>--items</c> list, and delegates
/// invocation and output to <c>InvokeAndWriteAsync</c>.
/// </summary>
private static Task<int> SubscribeBulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    SubscribeBulkCommand subscribe = new() { ServerHandle = arguments.GetInt32("server-handle") };
    subscribe.TagAddresses.Add(ParseStringList(arguments.GetRequired("items")));

    MxCommand envelope = new() { Kind = MxCommandKind.SubscribeBulk, SubscribeBulk = subscribe };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Handles <c>unsubscribe-bulk</c>: builds an UnsubscribeBulk command for
/// <c>--server-handle</c> with the handles parsed from the required <c>--item-handles</c>
/// list, and delegates invocation and output to <c>InvokeAndWriteAsync</c>.
/// </summary>
private static Task<int> UnsubscribeBulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = arguments.GetInt32("server-handle") };
    unsubscribe.ItemHandles.Add(ParseInt32List(arguments.GetRequired("item-handles")));

    MxCommand envelope = new() { Kind = MxCommandKind.UnsubscribeBulk, UnsubscribeBulk = unsubscribe };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Handles <c>read-bulk</c>: builds a ReadBulk command for <c>--server-handle</c> with the
/// validated <c>--timeout-ms</c> (default 0) and the tag addresses parsed from the required
/// <c>--items</c> list, and delegates invocation and output to <c>InvokeAndWriteAsync</c>.
/// </summary>
private static Task<int> ReadBulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    ReadBulkCommand readBulk = new()
    {
        ServerHandle = arguments.GetInt32("server-handle"),
        TimeoutMs = ParseTimeoutMs(arguments, defaultValue: 0),
    };
    readBulk.TagAddresses.Add(ParseStringList(arguments.GetRequired("items")));

    MxCommand envelope = new() { Kind = MxCommandKind.ReadBulk, ReadBulk = readBulk };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Handles <c>write-bulk</c>: pairs each handle from <c>--item-handles</c> with the value at
/// the same position in <c>--values</c>, stamps every entry with <c>--user-id</c> (default 0),
/// and delegates invocation and output to <c>InvokeAndWriteAsync</c>.
/// </summary>
/// <exception cref="ArgumentException">The handle and value lists differ in length.</exception>
private static Task<int> WriteBulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    WriteBulkCommand writeBulk = new() { ServerHandle = arguments.GetInt32("server-handle") };

    IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
    IReadOnlyList<MxValue> values = ParseValuesList(arguments);
    int userId = arguments.GetInt32("user-id", 0);
    EnsureSameLength(handles.Count, values.Count);

    // Lengths are equal at this point, so Zip pairs every handle with its value.
    foreach ((int handle, MxValue value) in handles.Zip(values))
    {
        writeBulk.Entries.Add(new WriteBulkEntry
        {
            ItemHandle = handle,
            Value = value,
            UserId = userId,
        });
    }

    MxCommand envelope = new() { Kind = MxCommandKind.WriteBulk, WriteBulk = writeBulk };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Handles <c>write2-bulk</c>: like <c>write-bulk</c>, but every entry also carries the
/// single timestamp value produced by <c>ParseTimestampValue</c>. Delegates invocation and
/// output to <c>InvokeAndWriteAsync</c>.
/// </summary>
/// <exception cref="ArgumentException">The handle and value lists differ in length.</exception>
private static Task<int> Write2BulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    Write2BulkCommand write2Bulk = new() { ServerHandle = arguments.GetInt32("server-handle") };

    IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
    IReadOnlyList<MxValue> values = ParseValuesList(arguments);
    MxValue timestampValue = ParseTimestampValue(arguments);
    int userId = arguments.GetInt32("user-id", 0);
    EnsureSameLength(handles.Count, values.Count);

    // Lengths are equal at this point, so Zip pairs every handle with its value.
    foreach ((int handle, MxValue value) in handles.Zip(values))
    {
        write2Bulk.Entries.Add(new Write2BulkEntry
        {
            ItemHandle = handle,
            Value = value,
            TimestampValue = timestampValue,
            UserId = userId,
        });
    }

    MxCommand envelope = new() { Kind = MxCommandKind.Write2Bulk, Write2Bulk = write2Bulk };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Handles <c>write-secured-bulk</c>: pairs each handle from <c>--item-handles</c> with the
/// value at the same position in <c>--values</c>, stamps every entry with
/// <c>--current-user-id</c> and <c>--verifier-user-id</c> (default 0), and delegates
/// invocation and output to <c>InvokeAndWriteAsync</c>.
/// </summary>
/// <exception cref="ArgumentException">The handle and value lists differ in length.</exception>
private static Task<int> WriteSecuredBulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    WriteSecuredBulkCommand writeSecuredBulk = new() { ServerHandle = arguments.GetInt32("server-handle") };

    IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
    IReadOnlyList<MxValue> values = ParseValuesList(arguments);
    int currentUserId = arguments.GetInt32("current-user-id");
    int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
    EnsureSameLength(handles.Count, values.Count);

    // Lengths are equal at this point, so Zip pairs every handle with its value.
    foreach ((int handle, MxValue value) in handles.Zip(values))
    {
        writeSecuredBulk.Entries.Add(new WriteSecuredBulkEntry
        {
            ItemHandle = handle,
            Value = value,
            CurrentUserId = currentUserId,
            VerifierUserId = verifierUserId,
        });
    }

    MxCommand envelope = new()
    {
        Kind = MxCommandKind.WriteSecuredBulk,
        WriteSecuredBulk = writeSecuredBulk,
    };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Handles <c>write-secured2-bulk</c>: like <c>write-secured-bulk</c>, but every entry also
/// carries the single timestamp value produced by <c>ParseTimestampValue</c>. Delegates
/// invocation and output to <c>InvokeAndWriteAsync</c>.
/// </summary>
/// <exception cref="ArgumentException">The handle and value lists differ in length.</exception>
private static Task<int> WriteSecured2BulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    WriteSecured2BulkCommand writeSecured2Bulk = new() { ServerHandle = arguments.GetInt32("server-handle") };

    IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
    IReadOnlyList<MxValue> values = ParseValuesList(arguments);
    MxValue timestampValue = ParseTimestampValue(arguments);
    int currentUserId = arguments.GetInt32("current-user-id");
    int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
    EnsureSameLength(handles.Count, values.Count);

    // Lengths are equal at this point, so Zip pairs every handle with its value.
    foreach ((int handle, MxValue value) in handles.Zip(values))
    {
        writeSecured2Bulk.Entries.Add(new WriteSecured2BulkEntry
        {
            ItemHandle = handle,
            Value = value,
            TimestampValue = timestampValue,
            CurrentUserId = currentUserId,
            VerifierUserId = verifierUserId,
        });
    }

    MxCommand envelope = new()
    {
        Kind = MxCommandKind.WriteSecured2Bulk,
        WriteSecured2Bulk = writeSecured2Bulk,
    };

    return InvokeAndWriteAsync(arguments, client, output, envelope, cancellationToken);
}
|
|
|
|
/// <summary>
/// Converts the bulk-write <c>--values</c> list into typed values. One <c>--type</c>
/// applies to the whole list; each element is converted individually via
/// <see cref="ParseValue(string, string)"/>. This keeps the CLI simple for e2e use
/// (one type, N values) — callers that need a different type per entry should drive
/// the library directly.
/// </summary>
/// <param name="arguments">Parsed options supplying the required <c>--type</c> and <c>--values</c>.</param>
/// <returns>The parsed values, in the order given on the command line.</returns>
private static IReadOnlyList<MxValue> ParseValuesList(CliArguments arguments)
{
    string type = arguments.GetRequired("type");

    // Materialize the raw list first so list-level errors surface before any value is parsed.
    string[] rawValues = ParseStringList(arguments.GetRequired("values")).ToArray();

    return rawValues.Select(raw => ParseValue(type, raw)).ToArray();
}
|
|
|
|
/// <summary>
/// Guards bulk writes against mismatched lists: returns normally when the two counts
/// match and throws otherwise.
/// </summary>
/// <param name="handles">Number of entries parsed from <c>--item-handles</c>.</param>
/// <param name="values">Number of entries parsed from <c>--values</c>.</param>
/// <exception cref="ArgumentException">The counts differ.</exception>
private static void EnsureSameLength(int handles, int values)
{
    if (handles == values)
    {
        return;
    }

    throw new ArgumentException(
        $"Bulk write requires the same number of --item-handles ({handles}) and --values ({values}).");
}
|
|
|
|
/// <summary>
/// Reads the optional <c>--timeout-ms</c> argument as a non-negative millisecond count.
/// A negative value (e.g. <c>-1</c>, an easy copy-paste mistake for "unbounded") is
/// rejected loudly instead of being cast to a huge unsigned value of roughly 49.7 days.
/// Resolves Client.Dotnet-021.
/// </summary>
/// <param name="arguments">Parsed command-line options.</param>
/// <param name="defaultValue">Value used when <c>--timeout-ms</c> is absent.</param>
/// <returns>The timeout in milliseconds.</returns>
/// <exception cref="ArgumentException">The resolved value is negative.</exception>
private static uint ParseTimeoutMs(CliArguments arguments, int defaultValue)
{
    int milliseconds = arguments.GetInt32("timeout-ms", defaultValue);

    return milliseconds >= 0
        ? (uint)milliseconds
        : throw new ArgumentException(
            "--timeout-ms must be a non-negative integer (use 0 for the gateway default).");
}
|
|
|
|
/// <summary>
/// Reads <c>ServerHandle</c> from the typed <c>Register</c> payload of a reply. When the
/// payload is absent on an otherwise-successful reply, a descriptive
/// <see cref="MxGatewayException"/> is thrown rather than falling back to
/// <c>ReturnValue.Int32Value</c>, which would be <c>0</c> for an empty reply and would
/// drive the rest of the bench against an invalid handle. Resolves Client.Dotnet-019.
/// </summary>
/// <param name="reply">Reply to the Register command.</param>
/// <param name="sessionId">Session id, used only in the error message.</param>
/// <returns>The server handle carried by the Register payload.</returns>
/// <exception cref="MxGatewayException">The reply has no <c>Register</c> payload.</exception>
private static int RequireRegisterServerHandle(MxCommandReply reply, string sessionId)
{
    if (reply.Register is { } registerPayload)
    {
        return registerPayload.ServerHandle;
    }

    throw new MxGatewayException(
        $"Gateway reply for Register on session '{sessionId}' (correlation '{reply.CorrelationId}') "
        + "succeeded but is missing the typed 'register' payload required to read ServerHandle.");
}
|
|
|
|
/// <summary>
/// Cross-language stress benchmark for ReadBulk. Opens its own session,
/// subscribes to N tags so the worker's MxAccessValueCache populates from
/// real OnDataChange events, then hammers ReadBulk in a tight in-process
/// loop with per-call Stopwatch timing. Emits a single JSON object on
/// stdout that the scripts/bench-read-bulk.ps1 driver collates across
/// all five language clients.
/// </summary>
/// <remarks>
/// The warm-up and measurement windows are timed with a monotonic
/// <see cref="System.Diagnostics.Stopwatch"/> so wall-clock adjustments cannot stretch or
/// shorten them. The session is always closed in a <c>finally</c> block with its own
/// short-lived token, so the close is still attempted after <paramref name="cancellationToken"/>
/// has been cancelled.
/// </remarks>
/// <param name="arguments">
/// Parsed options: <c>--duration-seconds</c> (30), <c>--warmup-seconds</c> (3),
/// <c>--bulk-size</c> (6), <c>--tag-start</c> (1), <c>--tag-prefix</c> (<c>TestMachine_</c>),
/// <c>--tag-attribute</c> (<c>TestChangingInt</c>), <c>--timeout-ms</c> (1500),
/// <c>--client-name</c> (<c>mxgw-dotnet-bench</c>).
/// </param>
/// <param name="client">Gateway client used for every call.</param>
/// <param name="output">Writer that receives the JSON statistics line.</param>
/// <param name="cancellationToken">Cancels the benchmark; cancellation propagates to the caller.</param>
/// <returns>0 once the statistics have been written.</returns>
/// <exception cref="ArgumentException"><c>--bulk-size</c> or <c>--timeout-ms</c> is negative.</exception>
private static async Task<int> BenchReadBulkAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    // Upper bound for the best-effort session close performed in the finally block.
    const int CloseSessionGraceSeconds = 5;

    int durationSeconds = arguments.GetInt32("duration-seconds", 30);
    int warmupSeconds = arguments.GetInt32("warmup-seconds", 3);
    int bulkSize = arguments.GetInt32("bulk-size", 6);
    int tagStart = arguments.GetInt32("tag-start", 1);
    string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_";
    string tagAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt";
    uint timeoutMs = ParseTimeoutMs(arguments, defaultValue: 1500);
    string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench";

    // Fail with a readable message instead of the OverflowException a negative
    // array length would raise below.
    if (bulkSize < 0)
    {
        throw new ArgumentException("--bulk-size must be a non-negative integer.");
    }

    string[] tags = new string[bulkSize];
    for (int i = 0; i < bulkSize; i++)
    {
        // TestMachine_NNN.<attribute>, three-digit machine numbers matching
        // the existing e2e tag-discovery convention.
        tags[i] = $"{tagPrefix}{(tagStart + i):D3}.{tagAttribute}";
    }

    // Open + register + subscribe-bulk so the cache populates before the
    // measurement window opens.
    OpenSessionReply openReply = await client.OpenSessionAsync(
            new OpenSessionRequest { ClientSessionName = clientName, ClientCorrelationId = CreateCorrelationId() },
            cancellationToken)
        .ConfigureAwait(false);
    string sessionId = openReply.SessionId;

    try
    {
        MxCommandReply registerReply = await InvokeAndEnsureAsync(
                client,
                CreateCommandRequest(sessionId, new MxCommand
                {
                    Kind = MxCommandKind.Register,
                    Register = new RegisterCommand { ClientName = clientName },
                }),
                cancellationToken)
            .ConfigureAwait(false);
        int serverHandle = RequireRegisterServerHandle(registerReply, sessionId);

        SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle };
        subscribe.TagAddresses.Add(tags);
        MxCommandReply subscribeReply = await InvokeAndEnsureAsync(
                client,
                CreateCommandRequest(sessionId, new MxCommand
                {
                    Kind = MxCommandKind.SubscribeBulk,
                    SubscribeBulk = subscribe,
                }),
                cancellationToken)
            .ConfigureAwait(false);

        // Only successfully subscribed items need to be unsubscribed afterwards.
        int[] itemHandles = subscribeReply.SubscribeBulk?.Results
            .Where(r => r.WasSuccessful)
            .Select(r => r.ItemHandle)
            .ToArray() ?? [];

        ReadBulkCommand readBulkCommand = new()
        {
            ServerHandle = serverHandle,
            TimeoutMs = timeoutMs,
        };
        readBulkCommand.TagAddresses.Add(tags);
        MxCommand readBulkMxCommand = new() { Kind = MxCommandKind.ReadBulk, ReadBulk = readBulkCommand };

        // Warm-up: drive the same call shape so the JIT / connection
        // pipelines settle before the measurement window opens.
        TimeSpan warmupWindow = TimeSpan.FromSeconds(warmupSeconds);
        System.Diagnostics.Stopwatch warmupClock = System.Diagnostics.Stopwatch.StartNew();
        while (warmupClock.Elapsed < warmupWindow)
        {
            _ = await client.InvokeAsync(
                    CreateCommandRequest(sessionId, readBulkMxCommand),
                    cancellationToken)
                .ConfigureAwait(false);
        }

        // Steady state — capture per-call wall latency with a high-resolution Stopwatch.
        List<double> latencyMillis = new(capacity: 65536);
        long totalReadResults = 0;
        long cachedReadResults = 0;
        int successfulCalls = 0;
        int failedCalls = 0;

        TimeSpan steadyWindow = TimeSpan.FromSeconds(durationSeconds);
        System.Diagnostics.Stopwatch callTimer = new();
        System.Diagnostics.Stopwatch steadyClock = System.Diagnostics.Stopwatch.StartNew();

        while (steadyClock.Elapsed < steadyWindow)
        {
            callTimer.Restart();
            MxCommandReply reply;
            try
            {
                reply = await client.InvokeAsync(
                        CreateCommandRequest(sessionId, readBulkMxCommand),
                        cancellationToken)
                    .ConfigureAwait(false);
                callTimer.Stop();
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Client.Dotnet-020: never swallow OperationCanceledException
                // here. A bare `catch` would let Ctrl+C / parent CTS /
                // wall-clock timeouts keep spinning until --duration-seconds
                // elapsed, burning CPU and skewing the p99/max latency numbers
                // with hundreds of immediate-OCE iterations.
                callTimer.Stop();
                failedCalls++;
                latencyMillis.Add(callTimer.Elapsed.TotalMilliseconds);
                continue;
            }

            latencyMillis.Add(callTimer.Elapsed.TotalMilliseconds);
            if (reply.ProtocolStatus?.Code != ProtocolStatusCode.Ok)
            {
                failedCalls++;
                continue;
            }

            successfulCalls++;
            if (reply.ReadBulk is not null)
            {
                foreach (BulkReadResult r in reply.ReadBulk.Results)
                {
                    totalReadResults++;
                    if (r.WasCached)
                    {
                        cachedReadResults++;
                    }
                }
            }
        }

        double steadyElapsedSeconds = steadyClock.Elapsed.TotalSeconds;

        if (itemHandles.Length > 0)
        {
            UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = serverHandle };
            unsubscribe.ItemHandles.Add(itemHandles);
            _ = await client.InvokeAsync(
                    CreateCommandRequest(sessionId, new MxCommand
                    {
                        Kind = MxCommandKind.UnsubscribeBulk,
                        UnsubscribeBulk = unsubscribe,
                    }),
                    cancellationToken)
                .ConfigureAwait(false);
        }

        int totalCalls = successfulCalls + failedCalls;
        double callsPerSecond = steadyElapsedSeconds > 0
            ? totalCalls / steadyElapsedSeconds
            : 0;

        // Property names below form the JSON contract consumed by the bench driver;
        // several are inferred from the local variable names, so do not rename those locals.
        object stats = new
        {
            language = "dotnet",
            command = "bench-read-bulk",
            endpoint = arguments.GetOptional("endpoint") ?? "(default)",
            clientName,
            bulkSize,
            durationSeconds,
            warmupSeconds,
            durationMs = (long)(steadyElapsedSeconds * 1000),
            tags,
            totalCalls,
            successfulCalls,
            failedCalls,
            totalReadResults,
            cachedReadResults,
            callsPerSecond = Math.Round(callsPerSecond, 2),
            latencyMs = new
            {
                p50 = Percentile(latencyMillis, 0.50),
                p95 = Percentile(latencyMillis, 0.95),
                p99 = Percentile(latencyMillis, 0.99),
                max = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Max(), 3) : 0,
                mean = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Average(), 3) : 0,
            },
        };
        output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions));
        return 0;
    }
    finally
    {
        try
        {
            // Use a dedicated token: when the bench ends because cancellationToken fired,
            // that token is already cancelled and the close would not be attempted.
            using CancellationTokenSource closeTimeout = new(TimeSpan.FromSeconds(CloseSessionGraceSeconds));
            await client.CloseSessionAsync(
                    new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = CreateCorrelationId() },
                    closeTimeout.Token)
                .ConfigureAwait(false);
        }
        catch
        {
            // Closing the session is best-effort — never let it mask a real bench error.
        }
    }
}
|
|
|
|
/// <summary>
/// Computes the requested percentile of an unsorted latency sample by
/// linear interpolation between the two closest ranks of the sorted data
/// (rank = quantile * (count - 1)). The result is rounded to 3 decimal
/// places to match the JSON schema the PS driver collates.
/// </summary>
/// <param name="sample">Unsorted observations. The list is copied before sorting and is never mutated.</param>
/// <param name="quantile">Requested quantile in the inclusive range [0, 1] (0.95 = p95).</param>
/// <returns>The interpolated value rounded to 3 decimals, or 0 when <paramref name="sample"/> is empty.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="quantile"/> is NaN or lies outside [0, 1].
/// </exception>
private static double Percentile(IReadOnlyList<double> sample, double quantile)
{
    // The negated form also rejects NaN. Without this guard an out-of-range
    // quantile produced a rank outside the array and failed with an opaque
    // IndexOutOfRangeException.
    if (!(quantile >= 0.0 && quantile <= 1.0))
    {
        throw new ArgumentOutOfRangeException(
            nameof(quantile),
            quantile,
            "Quantile must be between 0 and 1 inclusive.");
    }

    if (sample.Count == 0)
    {
        return 0;
    }

    double[] sorted = sample.ToArray();
    Array.Sort(sorted);

    double rank = quantile * (sorted.Length - 1);
    int lower = (int)Math.Floor(rank);
    int upper = (int)Math.Ceiling(rank);

    // The rank landed exactly on an element (this includes the single-element
    // sample), so there is nothing to interpolate.
    if (lower == upper)
    {
        return Math.Round(sorted[lower], 3);
    }

    double fraction = rank - lower;
    double value = sorted[lower] + (sorted[upper] - sorted[lower]) * fraction;
    return Math.Round(value, 3);
}
|
|
|
|
/// <summary>
/// Builds a <c>Write</c> command from the CLI options and hands it to
/// <c>InvokeAndWriteAsync</c>, which sends it and prints the reply.
/// </summary>
/// <param name="arguments">Parsed options; reads server-handle, item-handle, user-id (fallback argument 0) and the type/value pair.</param>
/// <param name="client">Gateway client used to send the command.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token forwarded to the gateway call.</param>
/// <returns>The task produced by <c>InvokeAndWriteAsync</c>.</returns>
private static Task<int> WriteAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    // Options are read in declaration order, so the first missing or
    // malformed one is the one reported.
    WriteCommand write = new()
    {
        ServerHandle = arguments.GetInt32("server-handle"),
        ItemHandle = arguments.GetInt32("item-handle"),
        UserId = arguments.GetInt32("user-id", 0),
        Value = ParseValue(arguments),
    };

    MxCommand command = new()
    {
        Kind = MxCommandKind.Write,
        Write = write,
    };

    return InvokeAndWriteAsync(arguments, client, output, command, cancellationToken);
}
|
|
|
|
/// <summary>
/// Builds a <c>Write2</c> command (a write that also carries a timestamp
/// value) from the CLI options and hands it to <c>InvokeAndWriteAsync</c>.
/// </summary>
/// <param name="arguments">Parsed options; reads server-handle, item-handle, user-id (fallback argument 0), the type/value pair and the optional timestamp.</param>
/// <param name="client">Gateway client used to send the command.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token forwarded to the gateway call.</param>
/// <returns>The task produced by <c>InvokeAndWriteAsync</c>.</returns>
private static Task<int> Write2Async(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    // Options are read in declaration order, so the first missing or
    // malformed one is the one reported.
    Write2Command write2 = new()
    {
        ServerHandle = arguments.GetInt32("server-handle"),
        ItemHandle = arguments.GetInt32("item-handle"),
        UserId = arguments.GetInt32("user-id", 0),
        Value = ParseValue(arguments),
        TimestampValue = ParseTimestampValue(arguments),
    };

    MxCommand command = new()
    {
        Kind = MxCommandKind.Write2,
        Write2 = write2,
    };

    return InvokeAndWriteAsync(arguments, client, output, command, cancellationToken);
}
|
|
|
|
/// <summary>
/// Streams gateway events for a session. Events are written one per line as
/// protobuf JSON, except in pure <c>--json</c> mode (without <c>--jsonl</c>)
/// where they are buffered and emitted as a single <c>{ "events": [...] }</c>
/// document once the stream ends.
/// </summary>
/// <param name="arguments">Parsed options: max-events, json, jsonl, session-id, after-worker-sequence.</param>
/// <param name="client">Gateway client that supplies the event stream.</param>
/// <param name="output">Writer that receives the events.</param>
/// <param name="cancellationToken">Cancelling this token ends the stream gracefully.</param>
/// <returns>Always 0 once the stream ends, the limit is hit, or the token is cancelled.</returns>
/// <exception cref="ArgumentException">
/// Aggregate JSON was requested without <c>--max-events</c>, or the limit exceeds <c>MaxAggregateEvents</c>.
/// </exception>
private static async Task<int> StreamEventsAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    uint limit = arguments.GetUInt32("max-events", 0);
    bool jsonRequested = arguments.HasFlag("json");
    bool lineMode = arguments.HasFlag("jsonl");

    // --jsonl takes precedence over --json: only a pure --json run buffers.
    bool aggregate = jsonRequested && !lineMode;

    if (aggregate && limit == 0)
    {
        throw new ArgumentException("--json stream-events requires --max-events to bound aggregate output.");
    }

    if (limit > MaxAggregateEvents)
    {
        throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}.");
    }

    List<MxEvent> buffered = aggregate
        ? new List<MxEvent>(checked((int)limit))
        : [];

    StreamEventsRequest request = new()
    {
        SessionId = arguments.GetRequired("session-id"),
        AfterWorkerSequence = arguments.GetUInt64("after-worker-sequence", 0),
    };

    uint received = 0;
    try
    {
        await foreach (MxEvent item in client.StreamEventsAsync(request, cancellationToken)
            .WithCancellation(cancellationToken)
            .ConfigureAwait(false))
        {
            if (aggregate)
            {
                buffered.Add(item);
            }
            else
            {
                // Both --jsonl and the plain text mode print one protobuf-JSON line per event.
                output.WriteLine(ProtobufJsonFormatter.Format(item));
            }

            received++;

            // A limit of 0 means "unbounded".
            if (limit != 0 && received >= limit)
            {
                break;
            }
        }
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Client.Dotnet-017: cancellation is the graceful end-of-window
        // completion mode for a finite-window event collector. Fall through,
        // emit whatever was aggregated, and exit 0.
    }

    if (aggregate)
    {
        var document = new { events = buffered.Select(EventToJsonElement).ToArray() };
        output.WriteLine(JsonSerializer.Serialize(document, JsonOptions));
    }

    return 0;
}
|
|
|
|
/// <summary>
/// Streams the alarm feed. <c>--jsonl</c> prints one protobuf-JSON line per
/// message, pure <c>--json</c> buffers messages into one
/// <c>{ "alarms": [...] }</c> document, and the default mode prints the
/// human-readable form from <c>FormatAlarmFeedMessage</c>.
/// </summary>
/// <param name="arguments">Parsed options: max-events, json, jsonl, filter-prefix.</param>
/// <param name="client">Gateway client that supplies the alarm feed.</param>
/// <param name="output">Writer that receives the messages.</param>
/// <param name="cancellationToken">Cancelling this token ends the stream gracefully.</param>
/// <returns>Always 0 once the stream ends, the limit is hit, or the token is cancelled.</returns>
/// <exception cref="ArgumentException">
/// Aggregate JSON was requested without <c>--max-events</c>, or the limit exceeds <c>MaxAggregateEvents</c>.
/// </exception>
private static async Task<int> StreamAlarmsAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    uint limit = arguments.GetUInt32("max-events", 0);
    bool jsonRequested = arguments.HasFlag("json");
    bool lineMode = arguments.HasFlag("jsonl");

    // --jsonl takes precedence over --json: only a pure --json run buffers.
    bool aggregate = jsonRequested && !lineMode;

    if (aggregate && limit == 0)
    {
        throw new ArgumentException("--json stream-alarms requires --max-events to bound aggregate output.");
    }

    if (limit > MaxAggregateEvents)
    {
        throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}.");
    }

    List<AlarmFeedMessage> buffered = aggregate
        ? new List<AlarmFeedMessage>(checked((int)limit))
        : [];

    StreamAlarmsRequest request = new()
    {
        ClientCorrelationId = CreateCorrelationId(),
        AlarmFilterPrefix = arguments.GetOptional("filter-prefix") ?? string.Empty,
    };

    uint received = 0;
    try
    {
        await foreach (AlarmFeedMessage item in client.StreamAlarmsAsync(request, cancellationToken)
            .WithCancellation(cancellationToken)
            .ConfigureAwait(false))
        {
            if (aggregate)
            {
                buffered.Add(item);
            }
            else if (lineMode)
            {
                output.WriteLine(ProtobufJsonFormatter.Format(item));
            }
            else
            {
                output.WriteLine(FormatAlarmFeedMessage(item));
            }

            received++;

            // A limit of 0 means "unbounded".
            if (limit != 0 && received >= limit)
            {
                break;
            }
        }
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Mirrors stream-events (Client.Dotnet-017): the supplied token covers
        // the user's --timeout budget and external Ctrl+C / parent CTS
        // cancellation. All of them are graceful completion modes for a
        // finite-window alarm-feed collector: emit what arrived and exit 0.
    }

    if (aggregate)
    {
        var document = new { alarms = buffered.Select(AlarmFeedMessageToJsonElement).ToArray() };
        output.WriteLine(JsonSerializer.Serialize(document, JsonOptions));
    }

    return 0;
}
|
|
|
|
/// <summary>
/// Sends an alarm acknowledgement built from the CLI options and prints the reply.
/// </summary>
/// <param name="arguments">Parsed options: required <c>reference</c>; optional <c>comment</c> and <c>operator</c> (empty string when absent).</param>
/// <param name="client">Gateway client used to send the acknowledgement.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token forwarded to the gateway call.</param>
/// <returns>The task produced by <c>WriteReplyAsync</c>.</returns>
private static Task<int> AcknowledgeAlarmAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    AcknowledgeAlarmRequest acknowledgement = new()
    {
        ClientCorrelationId = CreateCorrelationId(),
        AlarmFullReference = arguments.GetRequired("reference"),
        Comment = arguments.GetOptional("comment") ?? string.Empty,
        OperatorUser = arguments.GetOptional("operator") ?? string.Empty,
    };

    var pendingReply = client.AcknowledgeAlarmAsync(acknowledgement, cancellationToken);
    return WriteReplyAsync(pendingReply, arguments, output);
}
|
|
|
|
/// <summary>
/// Produces the human-readable (non-JSON) stream-alarms line for one
/// <see cref="AlarmFeedMessage"/>. The line is prefixed with the populated
/// <c>payload</c> oneof arm: <c>active-alarm</c>, <c>snapshot-complete</c>,
/// <c>transition</c>, or <c>unknown-payload</c> for anything else.
/// </summary>
/// <param name="feedMessage">The feed message to render.</param>
/// <returns>A single line of text describing the message.</returns>
private static string FormatAlarmFeedMessage(AlarmFeedMessage feedMessage)
{
    switch (feedMessage.PayloadCase)
    {
        case AlarmFeedMessage.PayloadOneofCase.ActiveAlarm:
            return $"active-alarm {ProtobufJsonFormatter.Format(feedMessage.ActiveAlarm)}";

        case AlarmFeedMessage.PayloadOneofCase.SnapshotComplete:
            return $"snapshot-complete {feedMessage.SnapshotComplete}";

        case AlarmFeedMessage.PayloadOneofCase.Transition:
            return $"transition {ProtobufJsonFormatter.Format(feedMessage.Transition)}";

        default:
            return $"unknown-payload {feedMessage.PayloadCase}";
    }
}
|
|
|
|
/// <summary>
/// Converts an <see cref="AlarmFeedMessage"/> to a standalone
/// <see cref="JsonElement"/> by round-tripping it through the protobuf JSON formatter.
/// </summary>
/// <param name="feedMessage">The feed message to convert.</param>
/// <returns>A cloned element that remains valid after the parsing document is disposed.</returns>
private static JsonElement AlarmFeedMessageToJsonElement(AlarmFeedMessage feedMessage)
{
    // JsonDocument rents pooled buffers; dispose it so they are returned.
    // Clone() detaches the element from the document first.
    using JsonDocument document = JsonDocument.Parse(ProtobufJsonFormatter.Format(feedMessage));
    return document.RootElement.Clone();
}
|
|
|
|
/// <summary>
/// End-to-end smoke test: opens a session, registers, adds the item,
/// advises it, optionally writes a value (when <c>--value</c> is supplied),
/// collects events until <c>--max-events</c> (fallback argument 1) or
/// <c>--event-timeout</c> (fallback argument 2 seconds) is reached, closes
/// the session, and prints a summary.
/// </summary>
/// <param name="arguments">Parsed options: client-name, item, value/type, user-id, event-timeout, max-events, json.</param>
/// <param name="client">Gateway client used for every call.</param>
/// <param name="output">Writer that receives the summary.</param>
/// <param name="cancellationToken">Token for the whole run; cancelling it aborts the smoke test with an exception.</param>
/// <returns>0 when every step succeeded.</returns>
private static async Task<int> SmokeAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    OpenSessionReply? openReply = null;
    CloseSessionReply? closeReply = null;
    var commandReplies = new List<MxCommandReply>();
    var events = new List<MxEvent>();

    // Set only after every step inside the try block finished. The finally
    // block uses it to tell "close failed on the happy path" (must surface)
    // from "close failed while another error is already propagating"
    // (must not replace that error).
    bool completed = false;

    try
    {
        string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke";

        openReply = await client.OpenSessionAsync(
            new OpenSessionRequest
            {
                ClientSessionName = clientName,
                ClientCorrelationId = CreateCorrelationId(),
            },
            cancellationToken)
            .ConfigureAwait(false);

        int serverHandle = await InvokeForHandleAsync(
            arguments,
            client,
            openReply.SessionId,
            new MxCommand
            {
                Kind = MxCommandKind.Register,
                Register = new RegisterCommand { ClientName = clientName },
            },
            reply => reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value,
            commandReplies,
            cancellationToken)
            .ConfigureAwait(false);

        int itemHandle = await InvokeForHandleAsync(
            arguments,
            client,
            openReply.SessionId,
            new MxCommand
            {
                Kind = MxCommandKind.AddItem,
                AddItem = new AddItemCommand
                {
                    ServerHandle = serverHandle,
                    ItemDefinition = arguments.GetRequired("item"),
                },
            },
            reply => reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value,
            commandReplies,
            cancellationToken)
            .ConfigureAwait(false);

        commandReplies.Add(await InvokeAndEnsureAsync(
            client,
            CreateCommandRequest(
                openReply.SessionId,
                new MxCommand
                {
                    Kind = MxCommandKind.Advise,
                    Advise = new AdviseCommand
                    {
                        ServerHandle = serverHandle,
                        ItemHandle = itemHandle,
                    },
                }),
            cancellationToken)
            .ConfigureAwait(false));

        // The write step is optional: it only runs when --value was supplied.
        if (arguments.GetOptional("value") is not null)
        {
            commandReplies.Add(await InvokeAndEnsureAsync(
                client,
                CreateCommandRequest(
                    openReply.SessionId,
                    new MxCommand
                    {
                        Kind = MxCommandKind.Write,
                        Write = new WriteCommand
                        {
                            ServerHandle = serverHandle,
                            ItemHandle = itemHandle,
                            UserId = arguments.GetInt32("user-id", 0),
                            Value = ParseValue(arguments),
                        },
                    }),
                cancellationToken)
                .ConfigureAwait(false));
        }

        // Parse the limit once, before streaming, instead of once per event.
        uint maxEvents = arguments.GetUInt32("max-events", 1);

        using CancellationTokenSource streamCancellation = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        streamCancellation.CancelAfter(arguments.GetDuration(
            "event-timeout",
            TimeSpan.FromSeconds(2)));

        try
        {
            await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(
                new StreamEventsRequest { SessionId = openReply.SessionId },
                streamCancellation.Token)
                .WithCancellation(streamCancellation.Token)
                .ConfigureAwait(false))
            {
                events.Add(gatewayEvent);
                if (events.Count >= maxEvents)
                {
                    break;
                }
            }
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            // The event-timeout window elapsed. That is the expected way for
            // the collection phase to end; report whatever events arrived.
            // Cancellation of the caller's own token is not filtered here and
            // still propagates.
        }

        completed = true;
    }
    finally
    {
        if (openReply is not null)
        {
            try
            {
                closeReply = await client.CloseSessionAsync(
                    new CloseSessionRequest
                    {
                        SessionId = openReply.SessionId,
                        ClientCorrelationId = CreateCorrelationId(),
                    },
                    CancellationToken.None)
                    .ConfigureAwait(false);
            }
            catch (Exception) when (!completed)
            {
                // An earlier step already failed and its exception is in
                // flight. Closing is best-effort here so a close failure
                // never masks the real smoke-test error. On the happy path
                // (completed == true) the filter does not match and the
                // close failure propagates as before.
            }
        }
    }

    WriteSmokeResult(arguments, output, openReply, closeReply, commandReplies, events);
    return 0;
}
|
|
|
|
/// <summary>
/// Sends <paramref name="command"/> on the session named by the required
/// <c>session-id</c> option, verifies the reply via
/// <c>InvokeAndEnsureAsync</c>, and prints it.
/// </summary>
/// <param name="arguments">Parsed options; supplies <c>session-id</c> and the output format.</param>
/// <param name="client">Gateway client used to send the command.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="command">The command to send.</param>
/// <param name="cancellationToken">Token forwarded to the gateway call.</param>
/// <returns>Always 0 when the call succeeds.</returns>
private static async Task<int> InvokeAndWriteAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    MxCommand command,
    CancellationToken cancellationToken)
{
    string sessionId = arguments.GetRequired("session-id");
    MxCommandRequest request = CreateCommandRequest(sessionId, command);

    MxCommandReply verifiedReply = await InvokeAndEnsureAsync(client, request, cancellationToken)
        .ConfigureAwait(false);

    WriteMessage(arguments, output, verifiedReply);
    return 0;
}
|
|
|
|
/// <summary>
/// Sends <paramref name="command"/>, records the verified reply in
/// <paramref name="replies"/>, and returns the handle that
/// <paramref name="handleSelector"/> extracts from it.
/// </summary>
/// <param name="arguments">Parsed options; not read by this method.</param>
/// <param name="client">Gateway client used to send the command.</param>
/// <param name="sessionId">Session the command is sent on.</param>
/// <param name="command">The command to send.</param>
/// <param name="handleSelector">Extracts the handle from the reply.</param>
/// <param name="replies">List the reply is appended to.</param>
/// <param name="cancellationToken">Token forwarded to the gateway call.</param>
/// <returns>The handle chosen by <paramref name="handleSelector"/>.</returns>
private static async Task<int> InvokeForHandleAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    string sessionId,
    MxCommand command,
    Func<MxCommandReply, int> handleSelector,
    List<MxCommandReply> replies,
    CancellationToken cancellationToken)
{
    MxCommandRequest request = CreateCommandRequest(sessionId, command);
    MxCommandReply verifiedReply = await InvokeAndEnsureAsync(client, request, cancellationToken)
        .ConfigureAwait(false);

    // Record the reply before running the selector, so it is kept even if the selector throws.
    replies.Add(verifiedReply);
    int handle = handleSelector(verifiedReply);
    return handle;
}
|
|
|
|
/// <summary>
/// Invokes <paramref name="request"/> and runs the reply through the
/// <c>EnsureProtocolSuccess</c> and <c>EnsureMxAccessSuccess</c> checks
/// before returning it.
/// </summary>
/// <param name="client">Gateway client used to send the request.</param>
/// <param name="request">The command request to send.</param>
/// <param name="cancellationToken">Token forwarded to the gateway call.</param>
/// <returns>The reply, after both checks have run.</returns>
private static async Task<MxCommandReply> InvokeAndEnsureAsync(
    IMxGatewayCliClient client,
    MxCommandRequest request,
    CancellationToken cancellationToken)
{
    MxCommandReply response = await client
        .InvokeAsync(request, cancellationToken)
        .ConfigureAwait(false);

    // Protocol-level check first, then the MXAccess-level check on its result.
    var protocolChecked = response.EnsureProtocolSuccess();
    protocolChecked.EnsureMxAccessSuccess();

    return response;
}
|
|
|
|
/// <summary>
/// Wraps <paramref name="command"/> in an <see cref="MxCommandRequest"/> for
/// the given session, stamped with a new correlation id from
/// <c>CreateCorrelationId</c>.
/// </summary>
/// <param name="sessionId">Session the command targets.</param>
/// <param name="command">The command payload.</param>
/// <returns>A new request instance.</returns>
private static MxCommandRequest CreateCommandRequest(
    string sessionId,
    MxCommand command)
{
    MxCommandRequest request = new()
    {
        SessionId = sessionId,
        ClientCorrelationId = CreateCorrelationId(),
        Command = command,
    };

    return request;
}
|
|
|
|
/// <summary>
/// Awaits <paramref name="replyTask"/> and prints the resulting message
/// through <c>WriteMessage</c>.
/// </summary>
/// <typeparam name="TReply">Protobuf message type of the reply.</typeparam>
/// <param name="replyTask">The in-flight gateway call.</param>
/// <param name="arguments">Parsed options; selects the output format.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <returns>Always 0 when the task completes successfully.</returns>
private static async Task<int> WriteReplyAsync<TReply>(
    Task<TReply> replyTask,
    CliArguments arguments,
    TextWriter output)
    where TReply : IMessage
{
    TReply message = await replyTask.ConfigureAwait(false);
    WriteMessage(arguments, output, message);
    return 0;
}
|
|
|
|
/// <summary>
/// Prints the gateway and worker protocol versions from
/// <c>MxGatewayClientContractInfo</c>: two <c>key=value</c> lines by
/// default, or one JSON object when <c>--json</c> is set.
/// </summary>
/// <param name="arguments">Parsed options; only the <c>json</c> flag is read.</param>
/// <param name="output">Writer that receives the version information.</param>
private static void WriteVersion(CliArguments arguments, TextWriter output)
{
    if (!arguments.HasFlag("json"))
    {
        output.WriteLine(
            $"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}");
        output.WriteLine(
            $"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}");
        return;
    }

    var versions = new
    {
        gatewayProtocolVersion = MxGatewayClientContractInfo.GatewayProtocolVersion,
        workerProtocolVersion = MxGatewayClientContractInfo.WorkerProtocolVersion,
    };

    output.WriteLine(JsonSerializer.Serialize(versions, JsonOptions));
}
|
|
|
|
/// <summary>
/// Prints a protobuf message: protobuf JSON when <c>--json</c> is set,
/// otherwise the message's own <c>ToString()</c> text.
/// </summary>
/// <param name="arguments">Parsed options; only the <c>json</c> flag is read.</param>
/// <param name="output">Writer that receives the message.</param>
/// <param name="message">The message to print.</param>
private static void WriteMessage(
    CliArguments arguments,
    TextWriter output,
    IMessage message)
{
    if (arguments.HasFlag("json"))
    {
        output.WriteLine(ProtobufJsonFormatter.Format(message));
    }
    else
    {
        output.WriteLine(message.ToString());
    }
}
|
|
|
|
/// <summary>
/// Prints the smoke-test summary: four <c>key=value</c> lines by default,
/// or a JSON object carrying the full command replies and events when
/// <c>--json</c> is set.
/// </summary>
/// <param name="arguments">Parsed options; only the <c>json</c> flag is read.</param>
/// <param name="output">Writer that receives the summary.</param>
/// <param name="openReply">Open-session reply, or null when no session was opened.</param>
/// <param name="closeReply">Close-session reply; a null value is reported as <c>closed=False</c>.</param>
/// <param name="commandReplies">Replies gathered during the run.</param>
/// <param name="events">Events gathered during the run.</param>
private static void WriteSmokeResult(
    CliArguments arguments,
    TextWriter output,
    OpenSessionReply? openReply,
    CloseSessionReply? closeReply,
    IReadOnlyList<MxCommandReply> commandReplies,
    IReadOnlyList<MxEvent> events)
{
    string? sessionId = openReply?.SessionId;
    bool closed = closeReply is not null;

    if (arguments.HasFlag("json"))
    {
        var summary = new
        {
            sessionId,
            closed,
            commandReplies = commandReplies.Select(CommandReplyToJsonElement).ToArray(),
            events = events.Select(EventToJsonElement).ToArray(),
        };

        output.WriteLine(JsonSerializer.Serialize(summary, JsonOptions));
        return;
    }

    output.WriteLine($"session-id={sessionId}");
    output.WriteLine($"commands={commandReplies.Count}");
    output.WriteLine($"events={events.Count}");
    output.WriteLine($"closed={closed}");
}
|
|
|
|
/// <summary>
/// Converts an <see cref="MxCommandReply"/> to a standalone
/// <see cref="JsonElement"/> by round-tripping it through the protobuf JSON formatter.
/// </summary>
/// <param name="reply">The command reply to convert.</param>
/// <returns>A cloned element that remains valid after the parsing document is disposed.</returns>
private static JsonElement CommandReplyToJsonElement(MxCommandReply reply)
{
    // JsonDocument rents pooled buffers; dispose it so they are returned.
    // Clone() detaches the element from the document first.
    using JsonDocument document = JsonDocument.Parse(ProtobufJsonFormatter.Format(reply));
    return document.RootElement.Clone();
}
|
|
|
|
/// <summary>
/// Converts an <see cref="MxEvent"/> to a standalone
/// <see cref="JsonElement"/> by round-tripping it through the protobuf JSON formatter.
/// </summary>
/// <param name="gatewayEvent">The event to convert.</param>
/// <returns>A cloned element that remains valid after the parsing document is disposed.</returns>
private static JsonElement EventToJsonElement(MxEvent gatewayEvent)
{
    // JsonDocument rents pooled buffers; dispose it so they are returned.
    // Clone() detaches the element from the document first.
    using JsonDocument document = JsonDocument.Parse(ProtobufJsonFormatter.Format(gatewayEvent));
    return document.RootElement.Clone();
}
|
|
|
|
/// <summary>
/// Reads the required <c>type</c> and <c>value</c> options and converts
/// them to an <see cref="MxValue"/> via the string-based overload.
/// </summary>
/// <param name="arguments">Parsed options supplying <c>type</c> and <c>value</c>.</param>
/// <returns>The parsed value.</returns>
private static MxValue ParseValue(CliArguments arguments)
{
    // "type" is read before "value", so a missing type is reported first.
    string typeName = arguments.GetRequired("type");
    string rawValue = arguments.GetRequired("value");
    return ParseValue(typeName, rawValue);
}
|
|
|
|
/// <summary>
/// Converts the text <paramref name="value"/> to an <see cref="MxValue"/> of
/// the kind named by <paramref name="type"/> (matched case-insensitively).
/// Scalar kinds parse the whole text; <c>*-array</c> kinds parse each
/// comma-separated, trimmed entry. Numbers and timestamps use the invariant
/// culture, and timestamps without an offset are assumed to be UTC.
/// </summary>
/// <param name="type">Type name, e.g. <c>bool</c>, <c>int32-array</c>, <c>timestamp</c>.</param>
/// <param name="value">Text to parse.</param>
/// <returns>The parsed value.</returns>
/// <exception cref="ArgumentException"><paramref name="type"/> is not a supported type name.</exception>
private static MxValue ParseValue(string type, string value)
{
    string kind = type.ToLowerInvariant();
    string[] entries = value.Split(',', StringSplitOptions.TrimEntries);
    CultureInfo invariant = CultureInfo.InvariantCulture;

    switch (kind)
    {
        case "bool":
        case "boolean":
            return bool.Parse(value).ToMxValue();

        case "bool-array":
        case "boolean-array":
            return Array.ConvertAll(entries, entry => bool.Parse(entry)).ToMxValue();

        case "int32":
        case "integer":
            return int.Parse(value, invariant).ToMxValue();

        case "int32-array":
        case "integer-array":
            return Array.ConvertAll(entries, entry => int.Parse(entry, invariant)).ToMxValue();

        case "int64":
            return long.Parse(value, invariant).ToMxValue();

        case "int64-array":
            return Array.ConvertAll(entries, entry => long.Parse(entry, invariant)).ToMxValue();

        case "float":
            return float.Parse(value, invariant).ToMxValue();

        case "float-array":
            return Array.ConvertAll(entries, entry => float.Parse(entry, invariant)).ToMxValue();

        case "double":
            return double.Parse(value, invariant).ToMxValue();

        case "double-array":
            return Array.ConvertAll(entries, entry => double.Parse(entry, invariant)).ToMxValue();

        case "string":
            // A scalar string is passed through whole, commas included.
            return value.ToMxValue();

        case "string-array":
            return entries.ToMxValue();

        case "time":
        case "timestamp":
            return DateTimeOffset.Parse(value, invariant, DateTimeStyles.AssumeUniversal).ToMxValue();

        case "time-array":
        case "timestamp-array":
            return Array
                .ConvertAll(entries, entry => DateTimeOffset.Parse(entry, invariant, DateTimeStyles.AssumeUniversal))
                .ToMxValue();

        default:
            throw new ArgumentException($"Unsupported MX value type '{kind}'.");
    }
}
|
|
|
|
/// <summary>
/// Produces the timestamp <see cref="MxValue"/> for a Write2 command from
/// the optional <c>timestamp</c> option. When the option is absent the
/// current UTC time is used, formatted as round-trip ("O") text and parsed
/// like a supplied value.
/// </summary>
/// <param name="arguments">Parsed options; only <c>timestamp</c> is read.</param>
/// <returns>The timestamp as an <see cref="MxValue"/>.</returns>
private static MxValue ParseTimestampValue(CliArguments arguments)
{
    string? supplied = arguments.GetOptional("timestamp");
    string text = supplied is null
        ? DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture)
        : supplied;

    // Text without an explicit offset is interpreted as UTC.
    DateTimeOffset parsed = DateTimeOffset.Parse(
        text,
        CultureInfo.InvariantCulture,
        DateTimeStyles.AssumeUniversal);

    return parsed.ToMxValue();
}
|
|
|
|
/// <summary>Issues a Galaxy test-connection request with an empty request message and prints the reply.</summary>
/// <param name="arguments">Parsed options; selects the output format.</param>
/// <param name="client">Gateway client used to send the request.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token forwarded to the gateway call.</param>
/// <returns>The task produced by <c>WriteReplyAsync</c>.</returns>
private static Task<int> GalaxyTestConnectionAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    TestConnectionRequest request = new();
    var pendingReply = client.GalaxyTestConnectionAsync(request, cancellationToken);
    return WriteReplyAsync(pendingReply, arguments, output);
}
|
|
|
|
/// <summary>Requests the Galaxy's last deploy time with an empty request message and prints the reply.</summary>
/// <param name="arguments">Parsed options; selects the output format.</param>
/// <param name="client">Gateway client used to send the request.</param>
/// <param name="output">Writer that receives the reply.</param>
/// <param name="cancellationToken">Token forwarded to the gateway call.</param>
/// <returns>The task produced by <c>WriteReplyAsync</c>.</returns>
private static Task<int> GalaxyLastDeployAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    GetLastDeployTimeRequest request = new();
    var pendingReply = client.GalaxyGetLastDeployTimeAsync(request, cancellationToken);
    return WriteReplyAsync(pendingReply, arguments, output);
}
|
|
|
|
/// <summary>
/// Fetches the full Galaxy hierarchy (all pages) and prints it: the
/// aggregated reply as protobuf JSON when <c>--json</c> is set, otherwise
/// an object count followed by one summary line per object.
/// </summary>
/// <param name="arguments">Parsed options; only the <c>json</c> flag is read.</param>
/// <param name="client">Gateway client used for discovery.</param>
/// <param name="output">Writer that receives the listing.</param>
/// <param name="cancellationToken">Token forwarded to the gateway calls.</param>
/// <returns>Always 0 when discovery succeeds.</returns>
private static async Task<int> GalaxyDiscoverAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    DiscoverHierarchyReply hierarchy = await DiscoverAllGalaxyHierarchyAsync(client, cancellationToken)
        .ConfigureAwait(false);

    if (!arguments.HasFlag("json"))
    {
        output.WriteLine($"objects={hierarchy.Objects.Count}");
        foreach (GalaxyObject entry in hierarchy.Objects)
        {
            output.WriteLine($"- gobject_id={entry.GobjectId} tag_name={entry.TagName} contained_name={entry.ContainedName} parent={entry.ParentGobjectId} attributes={entry.Attributes.Count}");
        }

        return 0;
    }

    output.WriteLine(ProtobufJsonFormatter.Format(hierarchy));
    return 0;
}
|
|
|
|
/// <summary>
/// Pages through Galaxy DiscoverHierarchy (5000 objects per request) and
/// merges every page into one reply. <c>TotalObjectCount</c> is taken from
/// the most recently received page.
/// </summary>
/// <param name="client">Gateway client used for discovery.</param>
/// <param name="cancellationToken">Token forwarded to every page request.</param>
/// <returns>A reply containing the objects of all pages.</returns>
/// <exception cref="MxGatewayException">The server returned a page token that was already seen.</exception>
private static async Task<DiscoverHierarchyReply> DiscoverAllGalaxyHierarchyAsync(
    IMxGatewayCliClient client,
    CancellationToken cancellationToken)
{
    DiscoverHierarchyReply merged = new();
    HashSet<string> visitedTokens = new(StringComparer.Ordinal);
    string token = string.Empty;

    while (true)
    {
        DiscoverHierarchyRequest request = new()
        {
            PageSize = 5000,
            PageToken = token,
        };

        DiscoverHierarchyReply page = await client
            .GalaxyDiscoverHierarchyAsync(request, cancellationToken)
            .ConfigureAwait(false);

        merged.Objects.Add(page.Objects);
        merged.TotalObjectCount = page.TotalObjectCount;
        token = page.NextPageToken;

        // A blank token marks the last page.
        if (string.IsNullOrWhiteSpace(token))
        {
            break;
        }

        // A repeated token would make this loop spin forever; fail instead.
        if (!visitedTokens.Add(token))
        {
            throw new MxGatewayException(
                $"Galaxy DiscoverHierarchy returned a repeated page token '{token}'.");
        }
    }

    return merged;
}
|
|
|
|
/// <summary>
/// Page size sent with every BrowseChildren request issued by the
/// galaxy-browse command. Kept equal to the library's
/// <c>BrowseChildrenPageSize</c> so that the CLI and the lazy-browse helper
/// page the same way.
/// </summary>
private const int BrowseChildrenCliPageSize = 500;
|
|
|
|
/// <summary>
/// CLI entry point for the lazy-browse Galaxy surface. With no
/// <c>--parent</c> (or a negative one) it lists the root objects and
/// eagerly expands <c>--depth</c> additional levels, reusing the same
/// <see cref="BrowseChildrenOptions"/> at every level. With a non-negative
/// <c>--parent</c> it lists exactly one level of children of that gobject
/// id; <c>--depth</c> has no effect there and a warning is written to
/// <paramref name="standardError"/> if both are given (mirroring the
/// Go/Rust CLIs).
/// </summary>
/// <param name="arguments">Parsed options: filter options, json, parent, depth.</param>
/// <param name="client">Gateway client used for browsing.</param>
/// <param name="output">Writer that receives the listing.</param>
/// <param name="standardError">Writer that receives the ignored-depth warning.</param>
/// <param name="cancellationToken">Token forwarded to the gateway calls.</param>
/// <returns>Always 0 when browsing succeeds.</returns>
private static async Task<int> GalaxyBrowseAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    TextWriter standardError,
    CancellationToken cancellationToken)
{
    BrowseChildrenOptions filter = ParseBrowseChildrenOptions(arguments);
    bool asJson = arguments.HasFlag("json");
    int parentId = arguments.GetInt32("parent", -1);
    int levels = arguments.GetInt32("depth", 0);

    if (parentId < 0)
    {
        // No parent given: walk the root objects, eagerly expanding --depth levels.
        IReadOnlyList<BrowseTreeNode> rootNodes = await BrowseTreeAsync(
            client,
            filter,
            parentGobjectId: 0,
            remainingDepth: levels,
            cancellationToken)
            .ConfigureAwait(false);

        if (asJson)
        {
            var treeDocument = new
            {
                command = "galaxy-browse",
                nodes = rootNodes.Select(BrowseTreeNodeToJson).ToArray(),
            };
            output.WriteLine(JsonSerializer.Serialize(treeDocument, JsonOptions));
            return 0;
        }

        output.WriteLine(rootNodes.Count.ToString(CultureInfo.InvariantCulture));
        foreach (BrowseTreeNode rootNode in rootNodes)
        {
            WriteBrowseTreeNode(output, rootNode, level: 0);
        }

        return 0;
    }

    // A specific parent: one level of children via the parent-scoped request.
    if (levels > 0)
    {
        standardError.WriteLine("warning: --depth is ignored when --parent is specified.");
    }

    IReadOnlyList<GalaxyObject> directChildren = await BrowseOneLevelAsync(
        client,
        filter,
        parentId,
        cancellationToken)
        .ConfigureAwait(false);

    if (asJson)
    {
        var levelDocument = new
        {
            command = "galaxy-browse",
            parentId = parentId,
            children = directChildren.Select(GalaxyObjectToJsonElement).ToArray(),
        };
        output.WriteLine(JsonSerializer.Serialize(levelDocument, JsonOptions));
        return 0;
    }

    output.WriteLine(directChildren.Count.ToString(CultureInfo.InvariantCulture));
    foreach (GalaxyObject directChild in directChildren)
    {
        output.WriteLine(FormatGalaxyObject(directChild, level: 0, hasChildrenHint: null));
    }

    return 0;
}
|
|
|
|
/// <summary>
/// A node of the eagerly-expanded galaxy-browse tree.
/// </summary>
/// <param name="Object">The Galaxy object at this node.</param>
/// <param name="HasChildrenHint">The has-children hint the server returned for this object.</param>
/// <param name="Children">Child nodes fetched up to the requested depth; empty when not expanded.</param>
private sealed record BrowseTreeNode(
    GalaxyObject Object,
    bool HasChildrenHint,
    IReadOnlyList<BrowseTreeNode> Children);
|
|
|
|
/// <summary>
/// Lists the direct children of <paramref name="parentGobjectId"/>
/// (0 = root), following paging until the server returns a blank page
/// token, and recursively expands each child while
/// <paramref name="remainingDepth"/> is greater than zero.
/// </summary>
/// <param name="client">Gateway client used for browsing.</param>
/// <param name="options">Filter options applied at every level.</param>
/// <param name="parentGobjectId">Parent gobject id to list children of.</param>
/// <param name="remainingDepth">Number of further levels to expand below the listed children.</param>
/// <param name="cancellationToken">Token forwarded to every request.</param>
/// <returns>The child nodes in server order, with any expanded descendants attached.</returns>
/// <exception cref="MxGatewayException">The server returned a page token that was already seen.</exception>
private static async Task<IReadOnlyList<BrowseTreeNode>> BrowseTreeAsync(
    IMxGatewayCliClient client,
    BrowseChildrenOptions options,
    int parentGobjectId,
    int remainingDepth,
    CancellationToken cancellationToken)
{
    List<BrowseTreeNode> collected = [];
    HashSet<string> visitedTokens = new(StringComparer.Ordinal);
    string token = string.Empty;

    while (true)
    {
        BrowseChildrenReply page = await client
            .GalaxyBrowseChildrenAsync(
                BuildBrowseChildrenRequest(options, parentGobjectId, token),
                cancellationToken)
            .ConfigureAwait(false);

        int index = 0;
        foreach (GalaxyObject child in page.Children)
        {
            // The hint list may be shorter than the child list; a missing hint counts as false.
            bool hasChildren = index < page.ChildHasChildren.Count && page.ChildHasChildren[index];
            index++;

            IReadOnlyList<BrowseTreeNode> descendants = [];
            if (remainingDepth > 0)
            {
                descendants = await BrowseTreeAsync(
                    client,
                    options,
                    child.GobjectId,
                    remainingDepth - 1,
                    cancellationToken)
                    .ConfigureAwait(false);
            }

            collected.Add(new BrowseTreeNode(child, hasChildren, descendants));
        }

        token = page.NextPageToken;

        // A blank token marks the last page of this level.
        if (string.IsNullOrWhiteSpace(token))
        {
            break;
        }

        // A repeated token would make this loop spin forever; fail instead.
        if (!visitedTokens.Add(token))
        {
            throw new MxGatewayException(
                $"Galaxy BrowseChildren returned a repeated page token '{token}'.");
        }
    }

    return collected;
}
|
|
|
|
/// <summary>
/// Lists a single level of children below <paramref name="parentGobjectId"/>,
/// following paging until the server returns a blank page token.
/// </summary>
/// <param name="client">Gateway client used for browsing.</param>
/// <param name="options">Filter options applied to every page request.</param>
/// <param name="parentGobjectId">Parent gobject id to list children of.</param>
/// <param name="cancellationToken">Token forwarded to every request.</param>
/// <returns>The children of all pages, in server order.</returns>
/// <exception cref="MxGatewayException">The server returned a page token that was already seen.</exception>
private static async Task<IReadOnlyList<GalaxyObject>> BrowseOneLevelAsync(
    IMxGatewayCliClient client,
    BrowseChildrenOptions options,
    int parentGobjectId,
    CancellationToken cancellationToken)
{
    List<GalaxyObject> collected = [];
    HashSet<string> visitedTokens = new(StringComparer.Ordinal);
    string token = string.Empty;

    while (true)
    {
        BrowseChildrenReply page = await client
            .GalaxyBrowseChildrenAsync(
                BuildBrowseChildrenRequest(options, parentGobjectId, token),
                cancellationToken)
            .ConfigureAwait(false);

        foreach (GalaxyObject child in page.Children)
        {
            collected.Add(child);
        }

        token = page.NextPageToken;

        // A blank token marks the last page.
        if (string.IsNullOrWhiteSpace(token))
        {
            break;
        }

        // A repeated token would make this loop spin forever; fail instead.
        if (!visitedTokens.Add(token))
        {
            throw new MxGatewayException(
                $"Galaxy BrowseChildren returned a repeated page token '{token}'.");
        }
    }

    return collected;
}
|
|
|
|
/// <summary>
/// Translates the galaxy-browse filter options from the command line into
/// a <see cref="BrowseChildrenOptions"/> instance.
/// </summary>
/// <param name="arguments">Parsed options: category-ids, template-contains, tag-name-glob, alarm-bearing-only, historized-only, include-attributes.</param>
/// <returns>The populated options object.</returns>
private static BrowseChildrenOptions ParseBrowseChildrenOptions(CliArguments arguments)
{
    IReadOnlyList<int> categoryIds = ParseOptionalInt32List(arguments.GetOptional("category-ids"));
    IReadOnlyList<string> templateFragments = ParseOptionalStringList(arguments.GetOptional("template-contains"));
    string? tagNameGlob = arguments.GetOptional("tag-name-glob");
    bool alarmBearingOnly = arguments.HasFlag("alarm-bearing-only");
    bool historizedOnly = arguments.HasFlag("historized-only");

    // Tri-state: stays null (server default applies) unless the flag is present.
    bool? includeAttributes = arguments.HasFlag("include-attributes") ? true : null;

    return new BrowseChildrenOptions
    {
        CategoryIds = categoryIds,
        TemplateChainContains = templateFragments,
        TagNameGlob = tagNameGlob,
        AlarmBearingOnly = alarmBearingOnly,
        HistorizedOnly = historizedOnly,
        IncludeAttributes = includeAttributes,
    };
}
|
|
|
|
/// <summary>
/// Builds one BrowseChildren page request from the filter options, the
/// parent id, and the page token. The optional <c>TagNameGlob</c> and
/// <c>IncludeAttributes</c> fields are only assigned when the options
/// carry a value for them.
/// </summary>
/// <param name="options">Filter options to copy into the request.</param>
/// <param name="parentGobjectId">Parent gobject id whose children are requested.</param>
/// <param name="pageToken">Token of the page to fetch; empty for the first page.</param>
/// <returns>The request, with <c>PageSize</c> set to <c>BrowseChildrenCliPageSize</c>.</returns>
private static BrowseChildrenRequest BuildBrowseChildrenRequest(
    BrowseChildrenOptions options,
    int parentGobjectId,
    string pageToken)
{
    var pageRequest = new BrowseChildrenRequest
    {
        PageSize = BrowseChildrenCliPageSize,
        PageToken = pageToken,
        ParentGobjectId = parentGobjectId,
        AlarmBearingOnly = options.AlarmBearingOnly,
        HistorizedOnly = options.HistorizedOnly,
    };

    pageRequest.CategoryIds.Add(options.CategoryIds);
    pageRequest.TemplateChainContains.Add(options.TemplateChainContains);

    string? glob = options.TagNameGlob;
    if (!string.IsNullOrWhiteSpace(glob))
    {
        pageRequest.TagNameGlob = glob;
    }

    if (options.IncludeAttributes is bool includeAttributes)
    {
        pageRequest.IncludeAttributes = includeAttributes;
    }

    return pageRequest;
}
|
|
|
|
/// <summary>
/// Prints <paramref name="node"/> and then, depth-first, all of its
/// descendants, each one level deeper than its parent.
/// </summary>
/// <param name="output">Writer that receives the lines.</param>
/// <param name="node">Node to print.</param>
/// <param name="level">Nesting level of <paramref name="node"/>, used for indentation.</param>
private static void WriteBrowseTreeNode(TextWriter output, BrowseTreeNode node, int level)
{
    string line = FormatGalaxyObject(node.Object, level, node.HasChildrenHint);
    output.WriteLine(line);

    int childLevel = level + 1;
    foreach (BrowseTreeNode descendant in node.Children)
    {
        WriteBrowseTreeNode(output, descendant, childLevel);
    }
}
|
|
|
|
/// <summary>
/// Formats one Galaxy object as a tab-separated line: gobject id, tag
/// name, browse name, and a parenthesised suffix with the attribute count
/// (plus the has-children hint when one is supplied). The line is indented
/// by two spaces per <paramref name="level"/>.
/// </summary>
/// <param name="galaxyObject">Object to format.</param>
/// <param name="level">Nesting level; controls the indentation.</param>
/// <param name="hasChildrenHint">Server hint to include, or null to omit it.</param>
/// <returns>The formatted line.</returns>
private static string FormatGalaxyObject(GalaxyObject galaxyObject, int level, bool? hasChildrenHint)
{
    string padding = new string(' ', level * 2);

    string details;
    if (hasChildrenHint is bool hint)
    {
        details = $"(attrs={galaxyObject.Attributes.Count}, hasChildrenHint={hint})";
    }
    else
    {
        details = $"(attrs={galaxyObject.Attributes.Count})";
    }

    return $"{padding}{galaxyObject.GobjectId}\t{galaxyObject.TagName}\t{galaxyObject.BrowseName}\t{details}";
}
|
|
|
|
/// <summary>
/// Converts a browse-tree node, recursively, into an anonymous object with
/// <c>object</c>, <c>hasChildrenHint</c> and <c>children</c> members for
/// JSON serialization.
/// </summary>
/// <param name="node">Node to convert.</param>
/// <returns>The serializable representation of the node and its descendants.</returns>
private static object BrowseTreeNodeToJson(BrowseTreeNode node)
{
    JsonElement objectJson = GalaxyObjectToJsonElement(node.Object);

    object[] childrenJson = new object[node.Children.Count];
    for (int i = 0; i < childrenJson.Length; i++)
    {
        childrenJson[i] = BrowseTreeNodeToJson(node.Children[i]);
    }

    return new
    {
        @object = objectJson,
        hasChildrenHint = node.HasChildrenHint,
        children = childrenJson,
    };
}
|
|
|
|
/// <summary>
/// Converts a <see cref="GalaxyObject"/> to a standalone
/// <see cref="JsonElement"/> by round-tripping it through the protobuf JSON formatter.
/// </summary>
/// <param name="galaxyObject">The Galaxy object to convert.</param>
/// <returns>A cloned element that remains valid after the parsing document is disposed.</returns>
private static JsonElement GalaxyObjectToJsonElement(GalaxyObject galaxyObject)
{
    // JsonDocument rents pooled buffers; dispose it so they are returned.
    // Clone() detaches the element from the document first.
    using JsonDocument document = JsonDocument.Parse(ProtobufJsonFormatter.Format(galaxyObject));
    return document.RootElement.Clone();
}
|
|
|
|
/// <summary>
/// Parses an optional integer-list option: a null, empty, or
/// whitespace-only value yields an empty list, anything else is delegated
/// to <c>ParseInt32List</c>.
/// </summary>
/// <param name="value">Raw option text, or null when the option was not given.</param>
/// <returns>The parsed integers, or an empty list.</returns>
private static IReadOnlyList<int> ParseOptionalInt32List(string? value)
{
    if (string.IsNullOrWhiteSpace(value))
    {
        return [];
    }

    return ParseInt32List(value);
}
|
|
|
|
/// <summary>
/// Parses an optional string-list option: a null, empty, or
/// whitespace-only value yields an empty list, anything else is delegated
/// to <c>ParseStringList</c>.
/// </summary>
/// <param name="value">Raw option text, or null when the option was not given.</param>
/// <returns>The parsed strings, or an empty list.</returns>
private static IReadOnlyList<string> ParseOptionalStringList(string? value)
{
    if (string.IsNullOrWhiteSpace(value))
    {
        return [];
    }

    return ParseStringList(value);
}
|
|
|
|
/// <summary>
/// Watches Galaxy deploy events and prints each one (protobuf JSON with
/// <c>--json</c>, otherwise the <c>FormatDeployEvent</c> line) until the
/// stream ends, <c>--max-events</c> is reached, or Ctrl+C is pressed.
/// </summary>
/// <param name="arguments">Parsed options: json, max-events, last-seen-deploy-time.</param>
/// <param name="client">Gateway client that supplies the deploy-event stream.</param>
/// <param name="output">Writer that receives the events.</param>
/// <param name="cancellationToken">Caller's token; its cancellation is not swallowed and propagates as an exception.</param>
/// <returns>Always 0 on a clean end of the watch.</returns>
/// <exception cref="ArgumentException"><c>--max-events</c> exceeds <c>MaxAggregateEvents</c>.</exception>
private static async Task<int> GalaxyWatchAsync(
    CliArguments arguments,
    IMxGatewayCliClient client,
    TextWriter output,
    CancellationToken cancellationToken)
{
    bool asJson = arguments.HasFlag("json");
    uint limit = arguments.GetUInt32("max-events", 0);
    if (limit > MaxAggregateEvents)
    {
        throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}.");
    }

    WatchDeployEventsRequest watchRequest = new();
    string? lastSeenText = arguments.GetOptional("last-seen-deploy-time");
    if (!string.IsNullOrWhiteSpace(lastSeenText))
    {
        // Text without an offset is taken as UTC and the result is normalised to UTC.
        DateTimeOffset lastSeen = DateTimeOffset.Parse(
            lastSeenText,
            CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);
        watchRequest.LastSeenDeployTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTimeOffset(lastSeen);
    }

    using CancellationTokenSource watchCancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

    // Ctrl+C cancels the watch instead of terminating the process.
    ConsoleCancelEventHandler onCancelKeyPress = (_, keyPress) =>
    {
        keyPress.Cancel = true;
        try
        {
            watchCancellation.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The watch already finished and disposed the source; nothing left to cancel.
        }
    };
    Console.CancelKeyPress += onCancelKeyPress;

    uint written = 0;
    try
    {
        await foreach (DeployEvent item in client
            .GalaxyWatchDeployEventsAsync(watchRequest, watchCancellation.Token)
            .WithCancellation(watchCancellation.Token)
            .ConfigureAwait(false))
        {
            string line = asJson
                ? ProtobufJsonFormatter.Format(item)
                : FormatDeployEvent(item);
            output.WriteLine(line);

            written++;

            // A limit of 0 means "unbounded".
            if (limit != 0 && written >= limit)
            {
                break;
            }
        }
    }
    catch (OperationCanceledException) when (watchCancellation.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
    {
        // Ctrl+C-driven cancellation (linked source cancelled, caller's token not) is a clean exit.
    }
    finally
    {
        Console.CancelKeyPress -= onCancelKeyPress;
    }

    return 0;
}
|
|
|
|
/// <summary>Formats a deploy event as a single line of space-separated <c>key=value</c> pairs.</summary>
/// <param name="deployEvent">The event to format.</param>
/// <returns>
/// A line carrying the sequence, observation time, last deploy time, object count, and attribute count.
/// Timestamps use the round-trip ("O") format; missing ones render as <c>&lt;none&gt;</c> or <c>&lt;unknown&gt;</c>.
/// </returns>
private static string FormatDeployEvent(DeployEvent deployEvent)
{
    string deployTime = "<none>";
    if (deployEvent.TimeOfLastDeployPresent && deployEvent.TimeOfLastDeploy is { } lastDeploy)
    {
        deployTime = lastDeploy.ToDateTimeOffset().ToString("O", CultureInfo.InvariantCulture);
    }

    string observed = "<unknown>";
    if (deployEvent.ObservedAt is { } observedAt)
    {
        observed = observedAt.ToDateTimeOffset().ToString("O", CultureInfo.InvariantCulture);
    }

    return $"sequence={deployEvent.Sequence} observed_at={observed} time_of_last_deploy={deployTime} objects={deployEvent.ObjectCount} attributes={deployEvent.AttributeCount}";
}
|
|
|
|
/// <summary>Reports an unrecognized command on the error writer, followed by the usage text.</summary>
/// <param name="command">The command name that was not recognized.</param>
/// <param name="standardError">Writer that receives the error message and usage listing.</param>
/// <returns>The exit code <c>2</c>.</returns>
private static int WriteUnknownCommand(string command, TextWriter standardError)
{
    const int exitCode = 2;

    string message = "Unknown command '" + command + "'.";
    standardError.WriteLine(message);
    WriteUsage(standardError);

    return exitCode;
}
|
|
|
|
/// <summary>Determines whether an argument is a help request.</summary>
/// <param name="value">The argument text to test.</param>
/// <returns><c>true</c> for <c>-h</c>, <c>--help</c>, or <c>help</c>, compared ordinally and ignoring case.</returns>
private static bool IsHelp(string value)
{
    StringComparer ignoreCase = StringComparer.OrdinalIgnoreCase;

    return ignoreCase.Equals(value, "-h")
        || ignoreCase.Equals(value, "--help")
        || ignoreCase.Equals(value, "help");
}
|
|
|
|
/// <summary>Determines whether a command name is one of the recognized gateway commands.</summary>
/// <param name="command">The command name to test; matching is exact and case-sensitive.</param>
/// <returns><c>true</c> when <paramref name="command"/> is in the list below; otherwise <c>false</c>.</returns>
/// <remarks>
/// <c>advise-supervisory</c> is listed so this predicate agrees with the usage text, which advertises it.
/// NOTE(review): the command dispatcher is not visible from here - confirm it handles <c>advise-supervisory</c>.
/// </remarks>
private static bool IsKnownGatewayCommand(string command)
{
    return command is "open-session"
        or "close-session"
        or "ping"
        or "register"
        or "add-item"
        or "advise"
        or "advise-supervisory"
        or "subscribe-bulk"
        or "unsubscribe-bulk"
        or "read-bulk"
        or "write-bulk"
        or "write2-bulk"
        or "write-secured-bulk"
        or "write-secured2-bulk"
        or "bench-read-bulk"
        or "stream-events"
        or "stream-alarms"
        or "acknowledge-alarm"
        or "write"
        or "write2"
        or "smoke"
        or "galaxy-test-connection"
        or "galaxy-last-deploy"
        or "galaxy-discover"
        or "galaxy-browse"
        or "galaxy-watch";
}
|
|
|
|
/// <summary>Splits a comma-separated value into its trimmed, non-empty entries.</summary>
/// <param name="value">The comma-separated text to split.</param>
/// <returns>The entries in their original order.</returns>
/// <exception cref="ArgumentException">Thrown when no non-empty entry remains after splitting.</exception>
private static IReadOnlyList<string> ParseStringList(string value)
{
    const StringSplitOptions splitOptions =
        StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries;

    string[] entries = value.Split(',', splitOptions);

    return entries.Length > 0
        ? entries
        : throw new ArgumentException("At least one item is required.");
}
|
|
|
|
/// <summary>Parses a comma-separated value into 32-bit integers using the invariant culture.</summary>
/// <param name="value">The comma-separated text to parse; entries are trimmed and empty entries are dropped.</param>
/// <returns>The parsed integers in their original order.</returns>
/// <exception cref="ArgumentException">Thrown when no non-empty entry remains after splitting.</exception>
private static IReadOnlyList<int> ParseInt32List(string value)
{
    string[] tokens = value.Split(
        ',',
        StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);

    if (tokens.Length == 0)
    {
        throw new ArgumentException("At least one item handle is required.");
    }

    // int.Parse surfaces its own exception for entries that are not valid integers.
    return Array.ConvertAll(tokens, token => int.Parse(token, CultureInfo.InvariantCulture));
}
|
|
|
|
/// <summary>Creates a new correlation identifier.</summary>
/// <returns>A freshly generated GUID in "N" format (32 hexadecimal digits, no separators).</returns>
private static string CreateCorrelationId()
{
    Guid correlationGuid = Guid.NewGuid();
    return $"{correlationGuid:N}";
}
|
|
|
|
/// <summary>Writes the command usage listing, one command synopsis per line.</summary>
/// <param name="writer">Writer that receives the usage text.</param>
private static void WriteUsage(TextWriter writer)
{
    // Kept as a table so adding a command is a one-line change; order is the display order.
    string[] usageLines =
    [
        "mxgw-dotnet batch (reads commands from stdin; writes output + __MXGW_BATCH_EOR__ after each)",
        "mxgw-dotnet version [--json]",
        "mxgw-dotnet ping --session-id <id> [--json]",
        "mxgw-dotnet open-session [--client-name <name>] [--json]",
        "mxgw-dotnet close-session --session-id <id> [--json]",
        "mxgw-dotnet register --session-id <id> --client-name <name> [--json]",
        "mxgw-dotnet add-item --session-id <id> --server-handle <n> --item <ref> [--json]",
        "mxgw-dotnet advise --session-id <id> --server-handle <n> --item-handle <n> [--json]",
        "mxgw-dotnet advise-supervisory --session-id <id> --server-handle <n> --item-handle <n> [--json]",
        "mxgw-dotnet subscribe-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--json]",
        "mxgw-dotnet unsubscribe-bulk --session-id <id> --server-handle <n> --item-handles <n,n> [--json]",
        "mxgw-dotnet read-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--timeout-ms <n>] [--json]",
        "mxgw-dotnet write-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--user-id <n>] [--json]",
        "mxgw-dotnet write2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--timestamp <iso>] [--user-id <n>] [--json]",
        "mxgw-dotnet write-secured-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> --current-user-id <n> [--verifier-user-id <n>] [--json]",
        "mxgw-dotnet write-secured2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> --current-user-id <n> [--verifier-user-id <n>] [--timestamp <iso>] [--json]",
        "mxgw-dotnet bench-read-bulk [--duration-seconds <n>] [--warmup-seconds <n>] [--bulk-size <n>] [--tag-start <n>] [--tag-prefix <s>] [--tag-attribute <s>] [--timeout-ms <n>] [--client-name <name>]",
        "mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]",
        "mxgw-dotnet stream-alarms [--filter-prefix <ref>] [--max-events <n>] [--json] [--jsonl]",
        "mxgw-dotnet acknowledge-alarm --reference <ref> [--comment <text>] [--operator <user>] [--json]",
        "mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]",
        "mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]",
        "mxgw-dotnet smoke --item <ref> [--value <value> --type <type>] [--json]",
        "mxgw-dotnet galaxy-test-connection [--json]",
        "mxgw-dotnet galaxy-last-deploy [--json]",
        "mxgw-dotnet galaxy-discover [--json]",
        "mxgw-dotnet galaxy-browse [--parent <gobject-id>] [--depth <n>] [--category-ids <n,n>] [--template-contains <s,s>] [--tag-name-glob <glob>] [--alarm-bearing-only] [--historized-only] [--include-attributes] [--json]",
        "mxgw-dotnet galaxy-watch [--last-seen-deploy-time <iso8601>] [--max-events <n>] [--json]",
    ];

    foreach (string usageLine in usageLines)
    {
        writer.WriteLine(usageLine);
    }
}
|
|
}
|