56949c967b
stream-alarms attaches to the gateway's central alarm feed (mirrors stream-events: --max-events cap, --json/--jsonl, --filter-prefix); acknowledge-alarm is a unary session-less ack (--reference required, --comment, --operator). Both wired through IMxGatewayCliClient and the adapter. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2104 lines
83 KiB
C#
2104 lines
83 KiB
C#
using System.Globalization;
|
|
using System.Text.Json;
|
|
using Google.Protobuf;
|
|
using MxGateway.Client;
|
|
using MxGateway.Contracts.Proto;
|
|
using MxGateway.Contracts.Proto.Galaxy;
|
|
|
|
namespace MxGateway.Client.Cli;
|
|
|
|
/// <summary>Command-line interface for the MXAccess Gateway client, supporting session and command operations.</summary>
|
|
public static class MxGatewayClientCli
|
|
{
|
|
private const uint MaxAggregateEvents = 10_000;
|
|
|
|
private static readonly JsonFormatter ProtobufJsonFormatter = JsonFormatter.Default;
|
|
|
|
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
|
|
|
|
/// <summary>Runs the CLI synchronously with the given arguments, writing output and errors.</summary>
|
|
/// <param name="args">Command-line arguments (command name followed by options).</param>
|
|
/// <param name="standardOutput">TextWriter for command output.</param>
|
|
/// <param name="standardError">TextWriter for error messages.</param>
|
|
public static int Run(
|
|
string[] args,
|
|
TextWriter standardOutput,
|
|
TextWriter standardError)
|
|
{
|
|
return RunAsync(args, standardOutput, standardError, clientFactory: null, standardInput: null)
|
|
.GetAwaiter()
|
|
.GetResult();
|
|
}
|
|
|
|
/// <summary>Runs the CLI asynchronously with the given arguments, writing output and errors.</summary>
|
|
/// <param name="args">Command-line arguments (command name followed by options).</param>
|
|
/// <param name="standardOutput">TextWriter for command output.</param>
|
|
/// <param name="standardError">TextWriter for error messages.</param>
|
|
/// <param name="clientFactory">Optional factory to create the gateway client; defaults to MxGatewayClient.Create.</param>
|
|
/// <param name="standardInput">Optional TextReader for batch-mode stdin; defaults to <see cref="Console.In"/>.</param>
|
|
public static Task<int> RunAsync(
|
|
string[] args,
|
|
TextWriter standardOutput,
|
|
TextWriter standardError,
|
|
Func<MxGatewayClientOptions, IMxGatewayCliClient>? clientFactory = null,
|
|
TextReader? standardInput = null)
|
|
{
|
|
ArgumentNullException.ThrowIfNull(args);
|
|
ArgumentNullException.ThrowIfNull(standardOutput);
|
|
ArgumentNullException.ThrowIfNull(standardError);
|
|
|
|
return RunCoreAsync(
|
|
args,
|
|
standardOutput,
|
|
standardError,
|
|
clientFactory ?? CreateDefaultClient,
|
|
standardInput ?? Console.In);
|
|
}
|
|
|
|
private const string BatchEndOfRecord = "__MXGW_BATCH_EOR__";
|
|
|
|
private static async Task<int> RunCoreAsync(
|
|
string[] args,
|
|
TextWriter standardOutput,
|
|
TextWriter standardError,
|
|
Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory,
|
|
TextReader standardInput,
|
|
bool forceJsonErrors = false)
|
|
{
|
|
if (args.Length is 0 || IsHelp(args[0]))
|
|
{
|
|
WriteUsage(standardOutput);
|
|
return 0;
|
|
}
|
|
|
|
string command = args[0].ToLowerInvariant();
|
|
|
|
if (command is "batch")
|
|
{
|
|
return await RunBatchAsync(standardOutput, clientFactory, standardInput).ConfigureAwait(false);
|
|
}
|
|
|
|
CliArguments arguments = new(args.Skip(1));
|
|
|
|
try
|
|
{
|
|
if (command is "version")
|
|
{
|
|
WriteVersion(arguments, standardOutput);
|
|
return 0;
|
|
}
|
|
|
|
if (!IsKnownGatewayCommand(command))
|
|
{
|
|
return WriteUnknownCommand(command, standardError);
|
|
}
|
|
|
|
await using IMxGatewayCliClient client = clientFactory(CreateOptions(arguments));
|
|
using CancellationTokenSource cancellation = CreateCancellation(arguments, command);
|
|
|
|
return command switch
|
|
{
|
|
"open-session" => await OpenSessionAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"close-session" => await CloseSessionAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"ping" => await PingAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"register" => await RegisterAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"add-item" => await AddItemAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"advise" => await AdviseAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"subscribe-bulk" => await SubscribeBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"unsubscribe-bulk" => await UnsubscribeBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"read-bulk" => await ReadBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"write-bulk" => await WriteBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"write2-bulk" => await Write2BulkAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"write-secured-bulk" => await WriteSecuredBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"write-secured2-bulk" => await WriteSecured2BulkAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"bench-read-bulk" => await BenchReadBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"bench-stream-events" => await BenchStreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"stream-alarms" => await StreamAlarmsAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"acknowledge-alarm" => await AcknowledgeAlarmAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"write2" => await Write2Async(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"smoke" => await SmokeAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"galaxy-test-connection" => await GalaxyTestConnectionAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"galaxy-last-deploy" => await GalaxyLastDeployAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"galaxy-discover" => await GalaxyDiscoverAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
"galaxy-watch" => await GalaxyWatchAsync(arguments, client, standardOutput, cancellation.Token)
|
|
.ConfigureAwait(false),
|
|
_ => WriteUnknownCommand(command, standardError),
|
|
};
|
|
}
|
|
catch (Exception exception) when (exception is not OperationCanceledException)
|
|
{
|
|
// Redact the effective API key — whether it came from --api-key or from
|
|
// the (documented default) --api-key-env environment variable — so a
|
|
// transport error message that echoes the bearer token is never printed.
|
|
string? apiKey = TryResolveApiKey(arguments);
|
|
string message = MxGatewayCliSecretRedactor.Redact(exception.Message, apiKey);
|
|
|
|
if (forceJsonErrors || arguments.HasFlag("json"))
|
|
{
|
|
standardError.WriteLine(JsonSerializer.Serialize(
|
|
new { error = message, type = exception.GetType().Name },
|
|
JsonOptions));
|
|
}
|
|
else
|
|
{
|
|
standardError.WriteLine(message);
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
}
|
|
|
|
private static IMxGatewayCliClient CreateDefaultClient(MxGatewayClientOptions options)
|
|
{
|
|
return new MxGatewayCliClientAdapter(MxGatewayClient.Create(options));
|
|
}
|
|
|
|
private static MxGatewayClientOptions CreateOptions(CliArguments arguments)
|
|
{
|
|
string endpoint = arguments.GetOptional("endpoint")
|
|
?? Environment.GetEnvironmentVariable("MXGATEWAY_ENDPOINT")
|
|
?? "http://localhost:5000";
|
|
|
|
string apiKey = ResolveApiKey(arguments);
|
|
|
|
return new MxGatewayClientOptions
|
|
{
|
|
Endpoint = new Uri(endpoint, UriKind.Absolute),
|
|
ApiKey = apiKey,
|
|
UseTls = arguments.HasFlag("tls")
|
|
|| endpoint.StartsWith("https://", StringComparison.OrdinalIgnoreCase),
|
|
DefaultCallTimeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30)),
|
|
ConnectTimeout = arguments.GetDuration("connect-timeout", TimeSpan.FromSeconds(10)),
|
|
CaCertificatePath = arguments.GetOptional("ca-file"),
|
|
ServerNameOverride = arguments.GetOptional("server-name"),
|
|
};
|
|
}
|
|
|
|
private static string ResolveApiKey(CliArguments arguments)
|
|
{
|
|
string? apiKey = TryResolveApiKey(arguments);
|
|
if (!string.IsNullOrWhiteSpace(apiKey))
|
|
{
|
|
return apiKey;
|
|
}
|
|
|
|
string apiKeyEnvironmentName = arguments.GetOptional("api-key-env")
|
|
?? "MXGATEWAY_API_KEY";
|
|
|
|
throw new ArgumentException(
|
|
$"Gateway API key is required. Pass --api-key or set {apiKeyEnvironmentName}.");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Resolves the effective API key from <c>--api-key</c> or, failing that, the
|
|
/// environment variable named by <c>--api-key-env</c> (default
|
|
/// <c>MXGATEWAY_API_KEY</c>). Returns <see langword="null"/> when no key is
|
|
/// configured; used for redaction where a missing key must not throw.
|
|
/// </summary>
|
|
private static string? TryResolveApiKey(CliArguments arguments)
|
|
{
|
|
string? apiKey = arguments.GetOptional("api-key");
|
|
if (!string.IsNullOrWhiteSpace(apiKey))
|
|
{
|
|
return apiKey;
|
|
}
|
|
|
|
string apiKeyEnvironmentName = arguments.GetOptional("api-key-env")
|
|
?? "MXGATEWAY_API_KEY";
|
|
|
|
return Environment.GetEnvironmentVariable(apiKeyEnvironmentName);
|
|
}
|
|
|
|
private static CancellationTokenSource CreateCancellation(CliArguments arguments, string command)
|
|
{
|
|
var cancellation = new CancellationTokenSource();
|
|
// Long-running streaming / bench commands run until they finish (or Ctrl+C)
|
|
// by default; a caller-supplied --timeout still applies if present. The
|
|
// bench commands default to --duration-seconds=30 --warmup-seconds=3 plus
|
|
// a per-session stagger, which already exceeds the default 30 s wall-clock
|
|
// budget, so applying that budget would cancel them mid-window and emit a
|
|
// zero-throughput JSON payload (see Client.Dotnet-015).
|
|
bool isLongRunning = command is "galaxy-watch" or "bench-read-bulk" or "bench-stream-events";
|
|
string? rawTimeout = arguments.GetOptional("timeout");
|
|
if (isLongRunning && string.IsNullOrWhiteSpace(rawTimeout))
|
|
{
|
|
return cancellation;
|
|
}
|
|
|
|
TimeSpan timeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30));
|
|
cancellation.CancelAfter(timeout);
|
|
return cancellation;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Runs the CLI in batch mode: reads one command line at a time from
|
|
/// <paramref name="standardInput"/>, dispatches it through the normal
|
|
/// routing, writes all output to <paramref name="standardOutput"/>, and
|
|
/// then appends <see cref="BatchEndOfRecord"/> as a sentinel so the
|
|
/// caller can delimit command results. Continues on failure; errors are
|
|
/// written as JSON to <paramref name="standardOutput"/> (not stderr) so
|
|
/// that the harness sees them inside the same delimited block. Exits 0
|
|
/// on EOF or empty line.
|
|
/// </summary>
|
|
private static async Task<int> RunBatchAsync(
|
|
TextWriter standardOutput,
|
|
Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory,
|
|
TextReader standardInput)
|
|
{
|
|
while (true)
|
|
{
|
|
string? line = await standardInput.ReadLineAsync().ConfigureAwait(false);
|
|
|
|
// EOF or empty line signals clean exit.
|
|
if (line is null || line.Length is 0)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
// Split on runs of ASCII whitespace — no quoting support by design.
|
|
string[] lineArgs = line.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);
|
|
|
|
// Per-command output is buffered so we can redirect errors to stdout.
|
|
using StringWriter commandOutput = new();
|
|
|
|
// Errors in batch mode go to stdout (same delimited block), formatted as JSON.
|
|
// We use a capturing error writer and re-emit through commandOutput after the
|
|
// command returns, so the EOR sentinel always follows the complete result.
|
|
using StringWriter commandError = new();
|
|
|
|
try
|
|
{
|
|
await RunCoreAsync(
|
|
lineArgs,
|
|
commandOutput,
|
|
commandError,
|
|
clientFactory,
|
|
standardInput,
|
|
forceJsonErrors: true)
|
|
.ConfigureAwait(false);
|
|
}
|
|
catch (Exception exception) when (exception is not OperationCanceledException)
|
|
{
|
|
// Unexpected exception that escaped RunCoreAsync (shouldn't happen, but be safe).
|
|
commandError.WriteLine(JsonSerializer.Serialize(
|
|
new { error = exception.Message, type = exception.GetType().Name },
|
|
JsonOptions));
|
|
}
|
|
|
|
// Write any buffered normal output first.
|
|
string commandOutputText = commandOutput.ToString();
|
|
if (commandOutputText.Length > 0)
|
|
{
|
|
standardOutput.Write(commandOutputText);
|
|
}
|
|
|
|
// Then any error output — in batch mode it belongs on stdout so the harness
|
|
// sees it inside the delimited record.
|
|
string commandErrorText = commandError.ToString();
|
|
if (commandErrorText.Length > 0)
|
|
{
|
|
standardOutput.Write(commandErrorText);
|
|
}
|
|
|
|
// Write the end-of-record sentinel and flush so the harness can unblock.
|
|
standardOutput.WriteLine(BatchEndOfRecord);
|
|
await standardOutput.FlushAsync().ConfigureAwait(false);
|
|
}
|
|
}
|
|
|
|
private static Task<int> OpenSessionAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
return WriteReplyAsync(
|
|
client.OpenSessionAsync(
|
|
new OpenSessionRequest
|
|
{
|
|
ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli",
|
|
ClientCorrelationId = CreateCorrelationId(),
|
|
RequestedBackend = arguments.GetOptional("backend") ?? string.Empty,
|
|
},
|
|
cancellationToken),
|
|
arguments,
|
|
output);
|
|
}
|
|
|
|
private static Task<int> CloseSessionAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
return WriteReplyAsync(
|
|
client.CloseSessionAsync(
|
|
new CloseSessionRequest
|
|
{
|
|
SessionId = arguments.GetRequired("session-id"),
|
|
ClientCorrelationId = CreateCorrelationId(),
|
|
},
|
|
cancellationToken),
|
|
arguments,
|
|
output);
|
|
}
|
|
|
|
private static Task<int> PingAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Ping,
|
|
Ping = new PingCommand { Message = arguments.GetOptional("message") ?? "ping" },
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static Task<int> RegisterAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Register,
|
|
Register = new RegisterCommand { ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli" },
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static Task<int> AddItemAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.AddItem,
|
|
AddItem = new AddItemCommand
|
|
{
|
|
ServerHandle = arguments.GetInt32("server-handle"),
|
|
ItemDefinition = arguments.GetRequired("item"),
|
|
},
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static Task<int> AdviseAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Advise,
|
|
Advise = new AdviseCommand
|
|
{
|
|
ServerHandle = arguments.GetInt32("server-handle"),
|
|
ItemHandle = arguments.GetInt32("item-handle"),
|
|
},
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static Task<int> SubscribeBulkAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
SubscribeBulkCommand command = new()
|
|
{
|
|
ServerHandle = arguments.GetInt32("server-handle"),
|
|
};
|
|
command.TagAddresses.Add(ParseStringList(arguments.GetRequired("items")));
|
|
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.SubscribeBulk,
|
|
SubscribeBulk = command,
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static Task<int> UnsubscribeBulkAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
UnsubscribeBulkCommand command = new()
|
|
{
|
|
ServerHandle = arguments.GetInt32("server-handle"),
|
|
};
|
|
command.ItemHandles.Add(ParseInt32List(arguments.GetRequired("item-handles")));
|
|
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.UnsubscribeBulk,
|
|
UnsubscribeBulk = command,
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static Task<int> ReadBulkAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
ReadBulkCommand command = new()
|
|
{
|
|
ServerHandle = arguments.GetInt32("server-handle"),
|
|
TimeoutMs = (uint)arguments.GetInt32("timeout-ms", 0),
|
|
};
|
|
command.TagAddresses.Add(ParseStringList(arguments.GetRequired("items")));
|
|
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.ReadBulk,
|
|
ReadBulk = command,
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static Task<int> WriteBulkAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
WriteBulkCommand command = new()
|
|
{
|
|
ServerHandle = arguments.GetInt32("server-handle"),
|
|
};
|
|
|
|
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
|
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
|
int userId = arguments.GetInt32("user-id", 0);
|
|
EnsureSameLength(handles.Count, values.Count);
|
|
|
|
for (int i = 0; i < handles.Count; i++)
|
|
{
|
|
command.Entries.Add(new WriteBulkEntry
|
|
{
|
|
ItemHandle = handles[i],
|
|
Value = values[i],
|
|
UserId = userId,
|
|
});
|
|
}
|
|
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.WriteBulk,
|
|
WriteBulk = command,
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static Task<int> Write2BulkAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
Write2BulkCommand command = new()
|
|
{
|
|
ServerHandle = arguments.GetInt32("server-handle"),
|
|
};
|
|
|
|
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
|
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
|
MxValue timestampValue = ParseTimestampValue(arguments);
|
|
int userId = arguments.GetInt32("user-id", 0);
|
|
EnsureSameLength(handles.Count, values.Count);
|
|
|
|
for (int i = 0; i < handles.Count; i++)
|
|
{
|
|
command.Entries.Add(new Write2BulkEntry
|
|
{
|
|
ItemHandle = handles[i],
|
|
Value = values[i],
|
|
TimestampValue = timestampValue,
|
|
UserId = userId,
|
|
});
|
|
}
|
|
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Write2Bulk,
|
|
Write2Bulk = command,
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static Task<int> WriteSecuredBulkAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
WriteSecuredBulkCommand command = new()
|
|
{
|
|
ServerHandle = arguments.GetInt32("server-handle"),
|
|
};
|
|
|
|
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
|
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
|
int currentUserId = arguments.GetInt32("current-user-id");
|
|
int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
|
|
EnsureSameLength(handles.Count, values.Count);
|
|
|
|
for (int i = 0; i < handles.Count; i++)
|
|
{
|
|
command.Entries.Add(new WriteSecuredBulkEntry
|
|
{
|
|
ItemHandle = handles[i],
|
|
Value = values[i],
|
|
CurrentUserId = currentUserId,
|
|
VerifierUserId = verifierUserId,
|
|
});
|
|
}
|
|
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.WriteSecuredBulk,
|
|
WriteSecuredBulk = command,
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static Task<int> WriteSecured2BulkAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
WriteSecured2BulkCommand command = new()
|
|
{
|
|
ServerHandle = arguments.GetInt32("server-handle"),
|
|
};
|
|
|
|
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
|
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
|
MxValue timestampValue = ParseTimestampValue(arguments);
|
|
int currentUserId = arguments.GetInt32("current-user-id");
|
|
int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
|
|
EnsureSameLength(handles.Count, values.Count);
|
|
|
|
for (int i = 0; i < handles.Count; i++)
|
|
{
|
|
command.Entries.Add(new WriteSecured2BulkEntry
|
|
{
|
|
ItemHandle = handles[i],
|
|
Value = values[i],
|
|
TimestampValue = timestampValue,
|
|
CurrentUserId = currentUserId,
|
|
VerifierUserId = verifierUserId,
|
|
});
|
|
}
|
|
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.WriteSecured2Bulk,
|
|
WriteSecured2Bulk = command,
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Cross-language stress benchmark for ReadBulk. Opens its own session,
|
|
/// subscribes to N tags so the worker's MxAccessValueCache populates from
|
|
/// real OnDataChange events, then hammers ReadBulk in a tight in-process
|
|
/// loop with per-call Stopwatch timing. Emits a single JSON object on
|
|
/// stdout that the scripts/bench-read-bulk.ps1 driver collates across
|
|
/// all five language clients.
|
|
/// </summary>
|
|
private static async Task<int> BenchReadBulkAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
int durationSeconds = arguments.GetInt32("duration-seconds", 30);
|
|
int warmupSeconds = arguments.GetInt32("warmup-seconds", 3);
|
|
int bulkSize = arguments.GetInt32("bulk-size", 6);
|
|
int tagStart = arguments.GetInt32("tag-start", 1);
|
|
string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_";
|
|
string tagAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt";
|
|
uint timeoutMs = (uint)arguments.GetInt32("timeout-ms", 1500);
|
|
string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench";
|
|
|
|
string[] tags = new string[bulkSize];
|
|
for (int i = 0; i < bulkSize; i++)
|
|
{
|
|
// TestMachine_NNN.<attribute>, three-digit machine numbers matching
|
|
// the existing e2e tag-discovery convention.
|
|
tags[i] = $"{tagPrefix}{(tagStart + i):D3}.{tagAttribute}";
|
|
}
|
|
|
|
// Open + register + subscribe-bulk so the cache populates before the
|
|
// measurement window opens.
|
|
OpenSessionReply openReply = await client.OpenSessionAsync(
|
|
new OpenSessionRequest { ClientSessionName = clientName, ClientCorrelationId = CreateCorrelationId() },
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
string sessionId = openReply.SessionId;
|
|
|
|
try
|
|
{
|
|
MxCommandReply registerReply = await InvokeAndEnsureAsync(
|
|
client,
|
|
CreateCommandRequest(sessionId, new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Register,
|
|
Register = new RegisterCommand { ClientName = clientName },
|
|
}),
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
int serverHandle = RequireRegisterServerHandle(registerReply);
|
|
|
|
SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle };
|
|
subscribe.TagAddresses.Add(tags);
|
|
MxCommandReply subscribeReply = await InvokeAndEnsureAsync(
|
|
client,
|
|
CreateCommandRequest(sessionId, new MxCommand
|
|
{
|
|
Kind = MxCommandKind.SubscribeBulk,
|
|
SubscribeBulk = subscribe,
|
|
}),
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
int[] itemHandles = subscribeReply.SubscribeBulk?.Results
|
|
.Where(r => r.WasSuccessful)
|
|
.Select(r => r.ItemHandle)
|
|
.ToArray() ?? [];
|
|
|
|
// Warm-up: drive the same call shape so the JIT / connection
|
|
// pipelines settle before the measurement window opens.
|
|
DateTime warmupDeadline = DateTime.UtcNow + TimeSpan.FromSeconds(warmupSeconds);
|
|
ReadBulkCommand readBulkCommand = new()
|
|
{
|
|
ServerHandle = serverHandle,
|
|
TimeoutMs = timeoutMs,
|
|
};
|
|
readBulkCommand.TagAddresses.Add(tags);
|
|
MxCommand readBulkMxCommand = new() { Kind = MxCommandKind.ReadBulk, ReadBulk = readBulkCommand };
|
|
|
|
while (DateTime.UtcNow < warmupDeadline)
|
|
{
|
|
_ = await client.InvokeAsync(
|
|
CreateCommandRequest(sessionId, readBulkMxCommand),
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
}
|
|
|
|
// Steady state — capture per-call wall latency with a high-res
|
|
// Stopwatch so the resolution is sub-millisecond on modern Windows.
|
|
List<double> latencyMillis = new(capacity: 65536);
|
|
long totalReadResults = 0;
|
|
long cachedReadResults = 0;
|
|
int successfulCalls = 0;
|
|
int failedCalls = 0;
|
|
DateTime steadyDeadline = DateTime.UtcNow + TimeSpan.FromSeconds(durationSeconds);
|
|
DateTime steadyStart = DateTime.UtcNow;
|
|
|
|
while (DateTime.UtcNow < steadyDeadline)
|
|
{
|
|
System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();
|
|
MxCommandReply reply;
|
|
try
|
|
{
|
|
reply = await client.InvokeAsync(
|
|
CreateCommandRequest(sessionId, readBulkMxCommand),
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
sw.Stop();
|
|
}
|
|
catch
|
|
{
|
|
sw.Stop();
|
|
failedCalls++;
|
|
latencyMillis.Add(sw.Elapsed.TotalMilliseconds);
|
|
continue;
|
|
}
|
|
|
|
latencyMillis.Add(sw.Elapsed.TotalMilliseconds);
|
|
if (reply.ProtocolStatus?.Code != ProtocolStatusCode.Ok)
|
|
{
|
|
failedCalls++;
|
|
continue;
|
|
}
|
|
|
|
successfulCalls++;
|
|
if (reply.ReadBulk is not null)
|
|
{
|
|
foreach (BulkReadResult r in reply.ReadBulk.Results)
|
|
{
|
|
totalReadResults++;
|
|
if (r.WasCached)
|
|
{
|
|
cachedReadResults++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
double steadyElapsedSeconds = (DateTime.UtcNow - steadyStart).TotalSeconds;
|
|
|
|
if (itemHandles.Length > 0)
|
|
{
|
|
UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = serverHandle };
|
|
unsubscribe.ItemHandles.Add(itemHandles);
|
|
_ = await client.InvokeAsync(
|
|
CreateCommandRequest(sessionId, new MxCommand
|
|
{
|
|
Kind = MxCommandKind.UnsubscribeBulk,
|
|
UnsubscribeBulk = unsubscribe,
|
|
}),
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
}
|
|
|
|
int totalCalls = successfulCalls + failedCalls;
|
|
double callsPerSecond = steadyElapsedSeconds > 0
|
|
? totalCalls / steadyElapsedSeconds
|
|
: 0;
|
|
|
|
object stats = new
|
|
{
|
|
language = "dotnet",
|
|
command = "bench-read-bulk",
|
|
endpoint = arguments.GetOptional("endpoint") ?? "(default)",
|
|
clientName,
|
|
bulkSize,
|
|
durationSeconds,
|
|
warmupSeconds,
|
|
durationMs = (long)(steadyElapsedSeconds * 1000),
|
|
tags,
|
|
totalCalls,
|
|
successfulCalls,
|
|
failedCalls,
|
|
totalReadResults,
|
|
cachedReadResults,
|
|
callsPerSecond = Math.Round(callsPerSecond, 2),
|
|
latencyMs = new
|
|
{
|
|
p50 = Percentile(latencyMillis, 0.50),
|
|
p95 = Percentile(latencyMillis, 0.95),
|
|
p99 = Percentile(latencyMillis, 0.99),
|
|
max = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Max(), 3) : 0,
|
|
mean = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Average(), 3) : 0,
|
|
},
|
|
};
|
|
output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions));
|
|
return 0;
|
|
}
|
|
finally
|
|
{
|
|
try
|
|
{
|
|
await client.CloseSessionAsync(
|
|
new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = CreateCorrelationId() },
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
}
|
|
catch
|
|
{
|
|
// Closing the session is best-effort — never let it mask a real bench error.
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Single-client event-rate stress benchmark. Opens its own session,
|
|
/// subscribes to <c>--bulk-size</c> tags spanning the dev galaxy's
|
|
/// TestMachine_NNN.* set, then drains the gateway's StreamEvents
|
|
/// server-stream as fast as it can for <c>--duration-seconds</c>.
|
|
/// Tracks events received per second, end-to-end latency from
|
|
/// <c>event.worker_timestamp</c> to client receive time, and any
|
|
/// worker faults emitted by the gateway over the same window.
|
|
/// <para>
|
|
/// The companion <c>--all-attributes</c> flag (default <c>true</c>)
|
|
/// subscribes to all six TestMachine attributes per machine number
|
|
/// so the bench can drive event volume past one-attribute-per-machine
|
|
/// when the dev galaxy's TestChangingInt is the only churning tag.
|
|
/// </para>
|
|
/// </summary>
|
|
private static async Task<int> BenchStreamEventsAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
int durationSeconds = arguments.GetInt32("duration-seconds", 30);
|
|
int warmupSeconds = arguments.GetInt32("warmup-seconds", 3);
|
|
int bulkSize = arguments.GetInt32("bulk-size", 60);
|
|
int tagStart = arguments.GetInt32("tag-start", 1);
|
|
int sessionCount = arguments.GetInt32("session-count", 1);
|
|
// Concurrent OpenSession calls all the way up the stack force the
|
|
// gateway to spawn N x86 workers simultaneously; that hits the
|
|
// 30-second worker startup timeout around 6-8 concurrent opens on
|
|
// a dev rig. The stagger spreads the opens out so per-session
|
|
// OpenSession+Register completes inside that budget before the
|
|
// next session starts spawning. The steady-state event window
|
|
// begins only once all sessions are open and subscribed.
|
|
int sessionStartStaggerMs = arguments.GetInt32("session-start-stagger-ms", sessionCount > 1 ? 750 : 0);
|
|
string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_";
|
|
bool allAttributes = !arguments.HasFlag("single-attribute");
|
|
string singleAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt";
|
|
string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench-events";
|
|
|
|
if (sessionCount < 1)
|
|
{
|
|
throw new ArgumentException("--session-count must be >= 1.");
|
|
}
|
|
|
|
// Build the tag set. With --all-attributes (default) we rotate through
|
|
// all six TestMachine attributes to drive more event volume from a
|
|
// smaller machine range; with --single-attribute we use the same
|
|
// attribute on contiguous machine numbers (the original bench-read-
|
|
// bulk shape).
|
|
string[] attributes = allAttributes
|
|
? new[] { "TestChangingInt", "ProtectedValue", "TestBoolArray[]", "TestIntArray[]", "TestDateTimeArray[]", "TestStringArray[]" }
|
|
: new[] { singleAttribute };
|
|
List<string> tags = new(capacity: bulkSize);
|
|
for (int i = 0; i < bulkSize; i++)
|
|
{
|
|
int machineNumber = tagStart + (i / attributes.Length);
|
|
string attribute = attributes[i % attributes.Length];
|
|
tags.Add($"{tagPrefix}{machineNumber:D3}.{attribute}");
|
|
}
|
|
|
|
long warmupEvents = 0;
|
|
long steadyEvents = 0;
|
|
long steadyDataChangeEvents = 0;
|
|
List<double> endToEndLatencyMs = new(capacity: 262_144);
|
|
object latencyLock = new();
|
|
DateTime? firstSteadyEventUtc = null;
|
|
DateTime? lastSteadyEventUtc = null;
|
|
int totalSubscribeFailures = 0;
|
|
int totalSubscribedTagCount = 0;
|
|
int totalDrainedFaultCount = 0;
|
|
DateTime warmupStart = default;
|
|
DateTime warmupEnd = default;
|
|
DateTime steadyEnd = default;
|
|
|
|
// Phase 1: open + subscribe each session sequentially with a stagger,
|
|
// so worker spawn-up doesn't exceed the gateway's per-session startup
|
|
// timeout. Each session stashes its (sessionId, serverHandle, itemHandles)
|
|
// for phase 2.
|
|
var openedSessions = new List<(string SessionId, int ServerHandle, int[] ItemHandles, string ClientName)>(sessionCount);
|
|
for (int sessionIndex = 0; sessionIndex < sessionCount; sessionIndex++)
|
|
{
|
|
if (sessionIndex > 0 && sessionStartStaggerMs > 0)
|
|
{
|
|
await Task.Delay(sessionStartStaggerMs, cancellationToken).ConfigureAwait(false);
|
|
}
|
|
|
|
string thisClientName = sessionCount == 1
|
|
? clientName
|
|
: $"{clientName}-{sessionIndex:D2}";
|
|
OpenSessionReply openReply = await client.OpenSessionAsync(
|
|
new OpenSessionRequest { ClientSessionName = thisClientName, ClientCorrelationId = CreateCorrelationId() },
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
string sessionId = openReply.SessionId;
|
|
|
|
MxCommandReply registerReply = await InvokeAndEnsureAsync(
|
|
client,
|
|
CreateCommandRequest(sessionId, new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Register,
|
|
Register = new RegisterCommand { ClientName = thisClientName },
|
|
}),
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
int serverHandle = RequireRegisterServerHandle(registerReply);
|
|
|
|
SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle };
|
|
subscribe.TagAddresses.Add(tags);
|
|
MxCommandReply subscribeReply = await InvokeAndEnsureAsync(
|
|
client,
|
|
CreateCommandRequest(sessionId, new MxCommand
|
|
{
|
|
Kind = MxCommandKind.SubscribeBulk,
|
|
SubscribeBulk = subscribe,
|
|
}),
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
SubscribeResult[] subscribeResults = subscribeReply.SubscribeBulk?.Results.ToArray() ?? [];
|
|
int[] itemHandles = subscribeResults.Where(r => r.WasSuccessful).Select(r => r.ItemHandle).ToArray();
|
|
totalSubscribedTagCount += itemHandles.Length;
|
|
totalSubscribeFailures += subscribeResults.Count(r => !r.WasSuccessful);
|
|
openedSessions.Add((sessionId, serverHandle, itemHandles, thisClientName));
|
|
}
|
|
|
|
// Phase 2: now every session is open + advised. Start the measurement
|
|
// window and run each session's StreamEvents reader in parallel.
|
|
warmupStart = DateTime.UtcNow;
|
|
warmupEnd = warmupStart + TimeSpan.FromSeconds(warmupSeconds);
|
|
steadyEnd = warmupEnd + TimeSpan.FromSeconds(durationSeconds);
|
|
|
|
async Task RunStreamAsync((string SessionId, int ServerHandle, int[] ItemHandles, string ClientName) ctx)
|
|
{
|
|
using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
|
|
|
Task streamTask = Task.Run(async () =>
|
|
{
|
|
StreamEventsRequest streamRequest = new() { SessionId = ctx.SessionId };
|
|
await foreach (MxEvent mxEvent in client.StreamEventsAsync(streamRequest, streamCts.Token)
|
|
.ConfigureAwait(false))
|
|
{
|
|
DateTime nowUtc = DateTime.UtcNow;
|
|
if (nowUtc >= steadyEnd)
|
|
{
|
|
break;
|
|
}
|
|
|
|
if (nowUtc < warmupEnd)
|
|
{
|
|
Interlocked.Increment(ref warmupEvents);
|
|
continue;
|
|
}
|
|
|
|
// Guarded by latencyLock so parallel sessions can't tear a 64-bit
|
|
// DateTime? read or stomp an already-set firstSteadyEventUtc with
|
|
// a later timestamp from a slower-to-start session. The lock is
|
|
// already held by the latency append a few lines below, so the
|
|
// extra cost is one uncontended lock acquisition per event.
|
|
lock (latencyLock)
|
|
{
|
|
firstSteadyEventUtc ??= nowUtc;
|
|
lastSteadyEventUtc = nowUtc;
|
|
}
|
|
Interlocked.Increment(ref steadyEvents);
|
|
if (mxEvent.Family == MxEventFamily.OnDataChange)
|
|
{
|
|
Interlocked.Increment(ref steadyDataChangeEvents);
|
|
}
|
|
|
|
if (mxEvent.WorkerTimestamp is { } workerStamp)
|
|
{
|
|
double latency = (nowUtc - workerStamp.ToDateTime()).TotalMilliseconds;
|
|
if (latency >= 0)
|
|
{
|
|
lock (latencyLock) { endToEndLatencyMs.Add(latency); }
|
|
}
|
|
}
|
|
}
|
|
}, streamCts.Token);
|
|
|
|
// The inner streamTask MUST be observed on every path — including when
|
|
// the outer cancellationToken cancels during the Task.Delay below — or
|
|
// its fault surfaces as a TaskScheduler.UnobservedTaskException after
|
|
// GC. Use try/finally so the cancel + await pair always runs (see
|
|
// Client.Dotnet-016). RpcException(Cancelled) never reaches here in
|
|
// production because GrpcMxGatewayClientTransport.StreamEventsAsync
|
|
// routes through RpcExceptionMapper.Map, which returns OCE for
|
|
// StatusCode.Cancelled.
|
|
try
|
|
{
|
|
await Task.Delay(steadyEnd - warmupStart, cancellationToken).ConfigureAwait(false);
|
|
}
|
|
finally
|
|
{
|
|
streamCts.Cancel();
|
|
try { await streamTask.ConfigureAwait(false); }
|
|
catch (OperationCanceledException) { }
|
|
catch (MxGatewayException) { }
|
|
}
|
|
|
|
try
|
|
{
|
|
MxCommandReply drainReply = await client.InvokeAsync(
|
|
CreateCommandRequest(ctx.SessionId, new MxCommand
|
|
{
|
|
Kind = MxCommandKind.DrainEvents,
|
|
DrainEvents = new DrainEventsCommand { MaxEvents = 16 },
|
|
}),
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
Interlocked.Add(ref totalDrainedFaultCount, drainReply.DrainEvents?.Events.Count ?? 0);
|
|
}
|
|
catch { /* fault probe is best-effort */ }
|
|
|
|
if (ctx.ItemHandles.Length > 0)
|
|
{
|
|
try
|
|
{
|
|
UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = ctx.ServerHandle };
|
|
unsubscribe.ItemHandles.Add(ctx.ItemHandles);
|
|
_ = await client.InvokeAsync(
|
|
CreateCommandRequest(ctx.SessionId, new MxCommand
|
|
{
|
|
Kind = MxCommandKind.UnsubscribeBulk,
|
|
UnsubscribeBulk = unsubscribe,
|
|
}),
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
}
|
|
catch { /* best-effort */ }
|
|
}
|
|
|
|
try
|
|
{
|
|
await client.CloseSessionAsync(
|
|
new CloseSessionRequest { SessionId = ctx.SessionId, ClientCorrelationId = CreateCorrelationId() },
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
}
|
|
catch { /* best-effort */ }
|
|
}
|
|
|
|
Task[] streamTasks = openedSessions.Select(RunStreamAsync).ToArray();
|
|
await Task.WhenAll(streamTasks).ConfigureAwait(false);
|
|
|
|
double steadyElapsedSeconds = (firstSteadyEventUtc.HasValue && lastSteadyEventUtc.HasValue)
|
|
? (lastSteadyEventUtc.Value - firstSteadyEventUtc.Value).TotalSeconds
|
|
: 0;
|
|
double eventsPerSecond = steadyElapsedSeconds > 0 ? steadyEvents / steadyElapsedSeconds : 0;
|
|
double dataChangeEventsPerSecond = steadyElapsedSeconds > 0
|
|
? steadyDataChangeEvents / steadyElapsedSeconds
|
|
: 0;
|
|
|
|
double[] latencySnapshot;
|
|
lock (latencyLock) { latencySnapshot = endToEndLatencyMs.ToArray(); }
|
|
|
|
object stats = new
|
|
{
|
|
language = "dotnet",
|
|
command = "bench-stream-events",
|
|
endpoint = arguments.GetOptional("endpoint") ?? "(default)",
|
|
clientName,
|
|
sessionCount,
|
|
bulkSize,
|
|
durationSeconds,
|
|
warmupSeconds,
|
|
allAttributes,
|
|
steadyElapsedSeconds = Math.Round(steadyElapsedSeconds, 3),
|
|
subscribedTagCount = totalSubscribedTagCount,
|
|
subscribeFailures = totalSubscribeFailures,
|
|
warmupEvents,
|
|
steadyEvents,
|
|
steadyDataChangeEvents,
|
|
eventsPerSecond = Math.Round(eventsPerSecond, 2),
|
|
dataChangeEventsPerSecond = Math.Round(dataChangeEventsPerSecond, 2),
|
|
drainedFaultCount = totalDrainedFaultCount,
|
|
endToEndLatencyMs = new
|
|
{
|
|
p50 = Percentile(latencySnapshot, 0.50),
|
|
p95 = Percentile(latencySnapshot, 0.95),
|
|
p99 = Percentile(latencySnapshot, 0.99),
|
|
max = latencySnapshot.Length > 0 ? Math.Round(latencySnapshot.Max(), 3) : 0,
|
|
mean = latencySnapshot.Length > 0 ? Math.Round(latencySnapshot.Average(), 3) : 0,
|
|
sampleCount = latencySnapshot.Length,
|
|
},
|
|
};
|
|
output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions));
|
|
return 0;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Computes the requested percentile from an unsorted latency sample using
|
|
/// nearest-rank with linear interpolation. Rounds to 3 decimal places to
|
|
/// match the JSON schema the PS driver collates.
|
|
/// </summary>
|
|
private static double Percentile(IReadOnlyList<double> sample, double quantile)
|
|
{
|
|
if (sample.Count == 0)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
double[] sorted = sample.ToArray();
|
|
Array.Sort(sorted);
|
|
if (sorted.Length == 1)
|
|
{
|
|
return Math.Round(sorted[0], 3);
|
|
}
|
|
|
|
double rank = quantile * (sorted.Length - 1);
|
|
int lower = (int)Math.Floor(rank);
|
|
int upper = (int)Math.Ceiling(rank);
|
|
double fraction = rank - lower;
|
|
double value = sorted[lower] + (sorted[upper] - sorted[lower]) * fraction;
|
|
return Math.Round(value, 3);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Parses the bulk-write CLI's <c>--values</c> list. All entries share
|
|
/// the single <c>--type</c> argument; the comma-separated values are
|
|
/// each parsed via <see cref="ParseValue"/> on a per-entry basis. This
|
|
/// keeps the CLI simple for e2e use (one type, N values) — callers
|
|
/// that need heterogeneous types per entry should drive the library
|
|
/// directly.
|
|
/// </summary>
|
|
private static IReadOnlyList<MxValue> ParseValuesList(CliArguments arguments)
|
|
{
|
|
string type = arguments.GetRequired("type");
|
|
string[] values = ParseStringList(arguments.GetRequired("values")).ToArray();
|
|
MxValue[] result = new MxValue[values.Length];
|
|
for (int i = 0; i < values.Length; i++)
|
|
{
|
|
result[i] = ParseValue(type, values[i]);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
private static void EnsureSameLength(int handles, int values)
|
|
{
|
|
if (handles != values)
|
|
{
|
|
throw new ArgumentException(
|
|
$"Bulk write requires the same number of --item-handles ({handles}) and --values ({values}).");
|
|
}
|
|
}
|
|
|
|
private static Task<int> WriteAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Write,
|
|
Write = new WriteCommand
|
|
{
|
|
ServerHandle = arguments.GetInt32("server-handle"),
|
|
ItemHandle = arguments.GetInt32("item-handle"),
|
|
UserId = arguments.GetInt32("user-id", 0),
|
|
Value = ParseValue(arguments),
|
|
},
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static Task<int> Write2Async(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
return InvokeAndWriteAsync(
|
|
arguments,
|
|
client,
|
|
output,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Write2,
|
|
Write2 = new Write2Command
|
|
{
|
|
ServerHandle = arguments.GetInt32("server-handle"),
|
|
ItemHandle = arguments.GetInt32("item-handle"),
|
|
UserId = arguments.GetInt32("user-id", 0),
|
|
Value = ParseValue(arguments),
|
|
TimestampValue = ParseTimestampValue(arguments),
|
|
},
|
|
},
|
|
cancellationToken);
|
|
}
|
|
|
|
private static async Task<int> StreamEventsAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
uint maxEvents = arguments.GetUInt32("max-events", 0);
|
|
bool json = arguments.HasFlag("json");
|
|
bool jsonLines = arguments.HasFlag("jsonl");
|
|
if (json && !jsonLines && maxEvents is 0)
|
|
{
|
|
throw new ArgumentException("--json stream-events requires --max-events to bound aggregate output.");
|
|
}
|
|
|
|
if (maxEvents > MaxAggregateEvents)
|
|
{
|
|
throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}.");
|
|
}
|
|
|
|
var events = json && !jsonLines
|
|
? new List<MxEvent>(checked((int)maxEvents))
|
|
: [];
|
|
uint eventCount = 0;
|
|
var request = new StreamEventsRequest
|
|
{
|
|
SessionId = arguments.GetRequired("session-id"),
|
|
AfterWorkerSequence = arguments.GetUInt64("after-worker-sequence", 0),
|
|
};
|
|
|
|
try
|
|
{
|
|
await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(request, cancellationToken)
|
|
.WithCancellation(cancellationToken)
|
|
.ConfigureAwait(false))
|
|
{
|
|
if (jsonLines)
|
|
{
|
|
output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent));
|
|
}
|
|
else if (json)
|
|
{
|
|
events.Add(gatewayEvent);
|
|
}
|
|
else
|
|
{
|
|
output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent));
|
|
}
|
|
|
|
eventCount++;
|
|
if (maxEvents > 0 && eventCount >= maxEvents)
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
|
{
|
|
// Client.Dotnet-017: the supplied cancellation token covers both the
|
|
// user's --timeout wall-clock budget (via CreateCancellation's
|
|
// CancelAfter) and external Ctrl+C / parent CTS cancellation. All
|
|
// three are graceful completion modes for a finite-window event
|
|
// collector: emit the events that arrived before the window closed
|
|
// and exit 0. The events list is well-formed at this point; the
|
|
// aggregate JSON below still runs. This matches how the Go, Rust,
|
|
// Python, and Java CLIs treat their equivalent timeouts.
|
|
}
|
|
|
|
if (json && !jsonLines)
|
|
{
|
|
output.WriteLine(JsonSerializer.Serialize(
|
|
new { events = events.Select(EventToJsonElement).ToArray() },
|
|
JsonOptions));
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
private static async Task<int> StreamAlarmsAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
uint maxEvents = arguments.GetUInt32("max-events", 0);
|
|
bool json = arguments.HasFlag("json");
|
|
bool jsonLines = arguments.HasFlag("jsonl");
|
|
if (json && !jsonLines && maxEvents is 0)
|
|
{
|
|
throw new ArgumentException("--json stream-alarms requires --max-events to bound aggregate output.");
|
|
}
|
|
|
|
if (maxEvents > MaxAggregateEvents)
|
|
{
|
|
throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}.");
|
|
}
|
|
|
|
var messages = json && !jsonLines
|
|
? new List<AlarmFeedMessage>(checked((int)maxEvents))
|
|
: [];
|
|
uint messageCount = 0;
|
|
var request = new StreamAlarmsRequest
|
|
{
|
|
ClientCorrelationId = CreateCorrelationId(),
|
|
AlarmFilterPrefix = arguments.GetOptional("filter-prefix") ?? string.Empty,
|
|
};
|
|
|
|
try
|
|
{
|
|
await foreach (AlarmFeedMessage feedMessage in client.StreamAlarmsAsync(request, cancellationToken)
|
|
.WithCancellation(cancellationToken)
|
|
.ConfigureAwait(false))
|
|
{
|
|
if (jsonLines)
|
|
{
|
|
output.WriteLine(ProtobufJsonFormatter.Format(feedMessage));
|
|
}
|
|
else if (json)
|
|
{
|
|
messages.Add(feedMessage);
|
|
}
|
|
else
|
|
{
|
|
output.WriteLine(FormatAlarmFeedMessage(feedMessage));
|
|
}
|
|
|
|
messageCount++;
|
|
if (maxEvents > 0 && messageCount >= maxEvents)
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
|
{
|
|
// Mirrors stream-events (Client.Dotnet-017): the supplied token covers
|
|
// the user's --timeout wall-clock budget and external Ctrl+C / parent
|
|
// CTS cancellation. All are graceful completion modes for a
|
|
// finite-window alarm-feed collector: emit what arrived and exit 0.
|
|
}
|
|
|
|
if (json && !jsonLines)
|
|
{
|
|
output.WriteLine(JsonSerializer.Serialize(
|
|
new { alarms = messages.Select(AlarmFeedMessageToJsonElement).ToArray() },
|
|
JsonOptions));
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
private static Task<int> AcknowledgeAlarmAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
var request = new AcknowledgeAlarmRequest
|
|
{
|
|
ClientCorrelationId = CreateCorrelationId(),
|
|
AlarmFullReference = arguments.GetRequired("reference"),
|
|
Comment = arguments.GetOptional("comment") ?? string.Empty,
|
|
OperatorUser = arguments.GetOptional("operator") ?? string.Empty,
|
|
};
|
|
|
|
return WriteReplyAsync(
|
|
client.AcknowledgeAlarmAsync(request, cancellationToken),
|
|
arguments,
|
|
output);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Renders one <see cref="AlarmFeedMessage"/> for the human-readable
|
|
/// (non-JSON) stream-alarms output, distinguishing the <c>payload</c> oneof
|
|
/// arms: a snapshot active alarm, the snapshot-complete sentinel, or a live
|
|
/// transition.
|
|
/// </summary>
|
|
private static string FormatAlarmFeedMessage(AlarmFeedMessage feedMessage)
|
|
{
|
|
return feedMessage.PayloadCase switch
|
|
{
|
|
AlarmFeedMessage.PayloadOneofCase.ActiveAlarm =>
|
|
$"active-alarm {ProtobufJsonFormatter.Format(feedMessage.ActiveAlarm)}",
|
|
AlarmFeedMessage.PayloadOneofCase.SnapshotComplete =>
|
|
$"snapshot-complete {feedMessage.SnapshotComplete}",
|
|
AlarmFeedMessage.PayloadOneofCase.Transition =>
|
|
$"transition {ProtobufJsonFormatter.Format(feedMessage.Transition)}",
|
|
_ => $"unknown-payload {feedMessage.PayloadCase}",
|
|
};
|
|
}
|
|
|
|
private static JsonElement AlarmFeedMessageToJsonElement(AlarmFeedMessage feedMessage)
|
|
{
|
|
return JsonDocument.Parse(ProtobufJsonFormatter.Format(feedMessage)).RootElement.Clone();
|
|
}
|
|
|
|
private static async Task<int> SmokeAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
OpenSessionReply? openReply = null;
|
|
CloseSessionReply? closeReply = null;
|
|
var commandReplies = new List<MxCommandReply>();
|
|
var events = new List<MxEvent>();
|
|
|
|
try
|
|
{
|
|
openReply = await client.OpenSessionAsync(
|
|
new OpenSessionRequest
|
|
{
|
|
ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke",
|
|
ClientCorrelationId = CreateCorrelationId(),
|
|
},
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
|
|
int serverHandle = await InvokeForHandleAsync(
|
|
arguments,
|
|
client,
|
|
openReply.SessionId,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Register,
|
|
Register = new RegisterCommand { ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke" },
|
|
},
|
|
RequireRegisterServerHandle,
|
|
commandReplies,
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
|
|
int itemHandle = await InvokeForHandleAsync(
|
|
arguments,
|
|
client,
|
|
openReply.SessionId,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.AddItem,
|
|
AddItem = new AddItemCommand
|
|
{
|
|
ServerHandle = serverHandle,
|
|
ItemDefinition = arguments.GetRequired("item"),
|
|
},
|
|
},
|
|
RequireAddItemItemHandle,
|
|
commandReplies,
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
|
|
commandReplies.Add(await InvokeAndEnsureAsync(
|
|
client,
|
|
CreateCommandRequest(
|
|
openReply.SessionId,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Advise,
|
|
Advise = new AdviseCommand
|
|
{
|
|
ServerHandle = serverHandle,
|
|
ItemHandle = itemHandle,
|
|
},
|
|
}),
|
|
cancellationToken)
|
|
.ConfigureAwait(false));
|
|
|
|
if (arguments.GetOptional("value") is not null)
|
|
{
|
|
commandReplies.Add(await InvokeAndEnsureAsync(
|
|
client,
|
|
CreateCommandRequest(
|
|
openReply.SessionId,
|
|
new MxCommand
|
|
{
|
|
Kind = MxCommandKind.Write,
|
|
Write = new WriteCommand
|
|
{
|
|
ServerHandle = serverHandle,
|
|
ItemHandle = itemHandle,
|
|
UserId = arguments.GetInt32("user-id", 0),
|
|
Value = ParseValue(arguments),
|
|
},
|
|
}),
|
|
cancellationToken)
|
|
.ConfigureAwait(false));
|
|
}
|
|
|
|
using CancellationTokenSource streamCancellation = CancellationTokenSource
|
|
.CreateLinkedTokenSource(cancellationToken);
|
|
streamCancellation.CancelAfter(arguments.GetDuration(
|
|
"event-timeout",
|
|
TimeSpan.FromSeconds(2)));
|
|
|
|
try
|
|
{
|
|
await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(
|
|
new StreamEventsRequest { SessionId = openReply.SessionId },
|
|
streamCancellation.Token)
|
|
.WithCancellation(streamCancellation.Token)
|
|
.ConfigureAwait(false))
|
|
{
|
|
events.Add(gatewayEvent);
|
|
if (events.Count >= arguments.GetUInt32("max-events", 1))
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
|
|
{
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
if (openReply is not null)
|
|
{
|
|
closeReply = await client.CloseSessionAsync(
|
|
new CloseSessionRequest
|
|
{
|
|
SessionId = openReply.SessionId,
|
|
ClientCorrelationId = CreateCorrelationId(),
|
|
},
|
|
CancellationToken.None)
|
|
.ConfigureAwait(false);
|
|
}
|
|
}
|
|
|
|
WriteSmokeResult(arguments, output, openReply, closeReply, commandReplies, events);
|
|
return 0;
|
|
}
|
|
|
|
private static async Task<int> InvokeAndWriteAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
MxCommand command,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
MxCommandReply reply = await InvokeAndEnsureAsync(
|
|
client,
|
|
CreateCommandRequest(arguments.GetRequired("session-id"), command),
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
|
|
WriteMessage(arguments, output, reply);
|
|
return 0;
|
|
}
|
|
|
|
private static async Task<int> InvokeForHandleAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
string sessionId,
|
|
MxCommand command,
|
|
Func<MxCommandReply, int> handleSelector,
|
|
List<MxCommandReply> replies,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
MxCommandReply reply = await InvokeAndEnsureAsync(
|
|
client,
|
|
CreateCommandRequest(sessionId, command),
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
replies.Add(reply);
|
|
return handleSelector(reply);
|
|
}
|
|
|
|
private static async Task<MxCommandReply> InvokeAndEnsureAsync(
|
|
IMxGatewayCliClient client,
|
|
MxCommandRequest request,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
MxCommandReply reply = await client.InvokeAsync(request, cancellationToken)
|
|
.ConfigureAwait(false);
|
|
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
|
|
return reply;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Returns the server handle from a successful <c>register</c> reply, or throws
|
|
/// <see cref="MxGatewayException"/> when the typed <see cref="MxCommandReply.Register"/>
|
|
/// payload is absent. Mirrors the SDK-level <see cref="MxGatewaySession.RegisterAsync"/>
|
|
/// contract: a successful reply without the typed payload is a gateway protocol
|
|
/// error, not a license to fall through to <c>ReturnValue.Int32Value</c> (which is 0
|
|
/// when the reply carries no return value).
|
|
/// </summary>
|
|
private static int RequireRegisterServerHandle(MxCommandReply reply)
|
|
{
|
|
return reply.Register?.ServerHandle
|
|
?? throw CreateMissingPayloadException(reply, "register");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Returns the item handle from a successful <c>add_item</c> reply, or throws
|
|
/// <see cref="MxGatewayException"/> when the typed <see cref="MxCommandReply.AddItem"/>
|
|
/// payload is absent. See <see cref="RequireRegisterServerHandle"/> for the rationale.
|
|
/// </summary>
|
|
private static int RequireAddItemItemHandle(MxCommandReply reply)
|
|
{
|
|
return reply.AddItem?.ItemHandle
|
|
?? throw CreateMissingPayloadException(reply, "add_item");
|
|
}
|
|
|
|
private static MxGatewayException CreateMissingPayloadException(
|
|
MxCommandReply reply,
|
|
string expectedPayload)
|
|
{
|
|
return new MxGatewayException(
|
|
$"Gateway reply for command kind={reply.Kind} reported success but is missing "
|
|
+ $"the required '{expectedPayload}' payload; cannot resolve a handle. "
|
|
+ $"session={reply.SessionId}; correlation={reply.CorrelationId}");
|
|
}
|
|
|
|
private static MxCommandRequest CreateCommandRequest(
|
|
string sessionId,
|
|
MxCommand command)
|
|
{
|
|
return new MxCommandRequest
|
|
{
|
|
SessionId = sessionId,
|
|
ClientCorrelationId = CreateCorrelationId(),
|
|
Command = command,
|
|
};
|
|
}
|
|
|
|
private static async Task<int> WriteReplyAsync<TReply>(
|
|
Task<TReply> replyTask,
|
|
CliArguments arguments,
|
|
TextWriter output)
|
|
where TReply : IMessage
|
|
{
|
|
TReply reply = await replyTask.ConfigureAwait(false);
|
|
WriteMessage(arguments, output, reply);
|
|
return 0;
|
|
}
|
|
|
|
private static void WriteVersion(CliArguments arguments, TextWriter output)
|
|
{
|
|
if (arguments.HasFlag("json"))
|
|
{
|
|
output.WriteLine(JsonSerializer.Serialize(
|
|
new
|
|
{
|
|
gatewayProtocolVersion = MxGatewayClientContractInfo.GatewayProtocolVersion,
|
|
workerProtocolVersion = MxGatewayClientContractInfo.WorkerProtocolVersion,
|
|
},
|
|
JsonOptions));
|
|
return;
|
|
}
|
|
|
|
output.WriteLine(
|
|
$"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}");
|
|
output.WriteLine(
|
|
$"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}");
|
|
}
|
|
|
|
private static void WriteMessage(
|
|
CliArguments arguments,
|
|
TextWriter output,
|
|
IMessage message)
|
|
{
|
|
output.WriteLine(arguments.HasFlag("json")
|
|
? ProtobufJsonFormatter.Format(message)
|
|
: message.ToString());
|
|
}
|
|
|
|
private static void WriteSmokeResult(
|
|
CliArguments arguments,
|
|
TextWriter output,
|
|
OpenSessionReply? openReply,
|
|
CloseSessionReply? closeReply,
|
|
IReadOnlyList<MxCommandReply> commandReplies,
|
|
IReadOnlyList<MxEvent> events)
|
|
{
|
|
if (!arguments.HasFlag("json"))
|
|
{
|
|
output.WriteLine($"session-id={openReply?.SessionId}");
|
|
output.WriteLine($"commands={commandReplies.Count}");
|
|
output.WriteLine($"events={events.Count}");
|
|
output.WriteLine($"closed={closeReply is not null}");
|
|
return;
|
|
}
|
|
|
|
output.WriteLine(JsonSerializer.Serialize(
|
|
new
|
|
{
|
|
sessionId = openReply?.SessionId,
|
|
closed = closeReply is not null,
|
|
commandReplies = commandReplies.Select(CommandReplyToJsonElement).ToArray(),
|
|
events = events.Select(EventToJsonElement).ToArray(),
|
|
},
|
|
JsonOptions));
|
|
}
|
|
|
|
private static JsonElement CommandReplyToJsonElement(MxCommandReply reply)
|
|
{
|
|
return JsonDocument.Parse(ProtobufJsonFormatter.Format(reply)).RootElement.Clone();
|
|
}
|
|
|
|
private static JsonElement EventToJsonElement(MxEvent gatewayEvent)
|
|
{
|
|
return JsonDocument.Parse(ProtobufJsonFormatter.Format(gatewayEvent)).RootElement.Clone();
|
|
}
|
|
|
|
private static MxValue ParseValue(CliArguments arguments)
|
|
{
|
|
return ParseValue(arguments.GetRequired("type"), arguments.GetRequired("value"));
|
|
}
|
|
|
|
private static MxValue ParseValue(string typeName, string value)
|
|
{
|
|
string type = typeName.ToLowerInvariant();
|
|
string[] values = value.Split(',', StringSplitOptions.TrimEntries);
|
|
|
|
return type switch
|
|
{
|
|
"bool" or "boolean" => bool.Parse(value).ToMxValue(),
|
|
"bool-array" or "boolean-array" => values.Select(bool.Parse).ToArray().ToMxValue(),
|
|
"int32" or "integer" => int.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
|
|
"int32-array" or "integer-array" => values.Select(item => int.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
|
|
"int64" => long.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
|
|
"int64-array" => values.Select(item => long.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
|
|
"float" => float.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
|
|
"float-array" => values.Select(item => float.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
|
|
"double" => double.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
|
|
"double-array" => values.Select(item => double.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
|
|
"string" => value.ToMxValue(),
|
|
"string-array" => values.ToMxValue(),
|
|
"time" or "timestamp" => DateTimeOffset.Parse(value, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal).ToMxValue(),
|
|
"time-array" or "timestamp-array" => values
|
|
.Select(item => DateTimeOffset.Parse(item, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal))
|
|
.ToArray()
|
|
.ToMxValue(),
|
|
_ => throw new ArgumentException($"Unsupported MX value type '{type}'."),
|
|
};
|
|
}
|
|
|
|
private static MxValue ParseTimestampValue(CliArguments arguments)
|
|
{
|
|
string timestamp = arguments.GetOptional("timestamp")
|
|
?? DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture);
|
|
|
|
return DateTimeOffset.Parse(
|
|
timestamp,
|
|
CultureInfo.InvariantCulture,
|
|
DateTimeStyles.AssumeUniversal)
|
|
.ToMxValue();
|
|
}
|
|
|
|
private static Task<int> GalaxyTestConnectionAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
return WriteReplyAsync(
|
|
client.GalaxyTestConnectionAsync(new TestConnectionRequest(), cancellationToken),
|
|
arguments,
|
|
output);
|
|
}
|
|
|
|
private static Task<int> GalaxyLastDeployAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
return WriteReplyAsync(
|
|
client.GalaxyGetLastDeployTimeAsync(new GetLastDeployTimeRequest(), cancellationToken),
|
|
arguments,
|
|
output);
|
|
}
|
|
|
|
private static async Task<int> GalaxyDiscoverAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
DiscoverHierarchyReply reply = await DiscoverAllGalaxyHierarchyAsync(client, cancellationToken)
|
|
.ConfigureAwait(false);
|
|
|
|
if (arguments.HasFlag("json"))
|
|
{
|
|
output.WriteLine(ProtobufJsonFormatter.Format(reply));
|
|
return 0;
|
|
}
|
|
|
|
output.WriteLine($"objects={reply.Objects.Count}");
|
|
foreach (GalaxyObject galaxyObject in reply.Objects)
|
|
{
|
|
output.WriteLine($"- gobject_id={galaxyObject.GobjectId} tag_name={galaxyObject.TagName} contained_name={galaxyObject.ContainedName} parent={galaxyObject.ParentGobjectId} attributes={galaxyObject.Attributes.Count}");
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
private static async Task<DiscoverHierarchyReply> DiscoverAllGalaxyHierarchyAsync(
|
|
IMxGatewayCliClient client,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
DiscoverHierarchyReply aggregate = new();
|
|
HashSet<string> seenPageTokens = new(StringComparer.Ordinal);
|
|
string pageToken = string.Empty;
|
|
do
|
|
{
|
|
DiscoverHierarchyReply page = await client.GalaxyDiscoverHierarchyAsync(
|
|
new DiscoverHierarchyRequest
|
|
{
|
|
PageSize = 5000,
|
|
PageToken = pageToken,
|
|
},
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
|
|
aggregate.Objects.Add(page.Objects);
|
|
aggregate.TotalObjectCount = page.TotalObjectCount;
|
|
pageToken = page.NextPageToken;
|
|
if (!string.IsNullOrWhiteSpace(pageToken)
|
|
&& !seenPageTokens.Add(pageToken))
|
|
{
|
|
throw new MxGatewayException(
|
|
$"Galaxy DiscoverHierarchy returned a repeated page token '{pageToken}'.");
|
|
}
|
|
}
|
|
while (!string.IsNullOrWhiteSpace(pageToken));
|
|
|
|
return aggregate;
|
|
}
|
|
|
|
private static async Task<int> GalaxyWatchAsync(
|
|
CliArguments arguments,
|
|
IMxGatewayCliClient client,
|
|
TextWriter output,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
bool json = arguments.HasFlag("json");
|
|
uint maxEvents = arguments.GetUInt32("max-events", 0);
|
|
if (maxEvents > MaxAggregateEvents)
|
|
{
|
|
throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}.");
|
|
}
|
|
|
|
WatchDeployEventsRequest request = new();
|
|
string? lastSeen = arguments.GetOptional("last-seen-deploy-time");
|
|
if (!string.IsNullOrWhiteSpace(lastSeen))
|
|
{
|
|
DateTimeOffset parsed = DateTimeOffset.Parse(
|
|
lastSeen,
|
|
CultureInfo.InvariantCulture,
|
|
DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);
|
|
request.LastSeenDeployTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTimeOffset(parsed);
|
|
}
|
|
|
|
using CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
|
ConsoleCancelEventHandler handler = (_, eventArgs) =>
|
|
{
|
|
eventArgs.Cancel = true;
|
|
try
|
|
{
|
|
linked.Cancel();
|
|
}
|
|
catch (ObjectDisposedException)
|
|
{
|
|
}
|
|
};
|
|
Console.CancelKeyPress += handler;
|
|
|
|
uint emitted = 0;
|
|
try
|
|
{
|
|
await foreach (DeployEvent deployEvent in client
|
|
.GalaxyWatchDeployEventsAsync(request, linked.Token)
|
|
.WithCancellation(linked.Token)
|
|
.ConfigureAwait(false))
|
|
{
|
|
if (json)
|
|
{
|
|
output.WriteLine(ProtobufJsonFormatter.Format(deployEvent));
|
|
}
|
|
else
|
|
{
|
|
output.WriteLine(FormatDeployEvent(deployEvent));
|
|
}
|
|
|
|
emitted++;
|
|
if (maxEvents > 0 && emitted >= maxEvents)
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
catch (OperationCanceledException) when (linked.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
|
|
{
|
|
// Ctrl+C-driven cancellation is a clean exit.
|
|
}
|
|
finally
|
|
{
|
|
Console.CancelKeyPress -= handler;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
private static string FormatDeployEvent(DeployEvent deployEvent)
|
|
{
|
|
string deployTime = deployEvent.TimeOfLastDeployPresent && deployEvent.TimeOfLastDeploy is not null
|
|
? deployEvent.TimeOfLastDeploy
|
|
.ToDateTimeOffset()
|
|
.ToString("O", CultureInfo.InvariantCulture)
|
|
: "<none>";
|
|
string observed = deployEvent.ObservedAt is not null
|
|
? deployEvent.ObservedAt
|
|
.ToDateTimeOffset()
|
|
.ToString("O", CultureInfo.InvariantCulture)
|
|
: "<unknown>";
|
|
|
|
return $"sequence={deployEvent.Sequence} observed_at={observed} time_of_last_deploy={deployTime} objects={deployEvent.ObjectCount} attributes={deployEvent.AttributeCount}";
|
|
}
|
|
|
|
private static int WriteUnknownCommand(string command, TextWriter standardError)
|
|
{
|
|
standardError.WriteLine($"Unknown command '{command}'.");
|
|
WriteUsage(standardError);
|
|
return 2;
|
|
}
|
|
|
|
private static bool IsHelp(string value)
|
|
{
|
|
return string.Equals(value, "-h", StringComparison.OrdinalIgnoreCase)
|
|
|| string.Equals(value, "--help", StringComparison.OrdinalIgnoreCase)
|
|
|| string.Equals(value, "help", StringComparison.OrdinalIgnoreCase);
|
|
}
|
|
|
|
private static bool IsKnownGatewayCommand(string command)
|
|
{
|
|
return command is "open-session"
|
|
or "close-session"
|
|
or "ping"
|
|
or "register"
|
|
or "add-item"
|
|
or "advise"
|
|
or "subscribe-bulk"
|
|
or "unsubscribe-bulk"
|
|
or "read-bulk"
|
|
or "write-bulk"
|
|
or "write2-bulk"
|
|
or "write-secured-bulk"
|
|
or "write-secured2-bulk"
|
|
or "bench-read-bulk"
|
|
or "bench-stream-events"
|
|
or "stream-events"
|
|
or "stream-alarms"
|
|
or "acknowledge-alarm"
|
|
or "write"
|
|
or "write2"
|
|
or "smoke"
|
|
or "galaxy-test-connection"
|
|
or "galaxy-last-deploy"
|
|
or "galaxy-discover"
|
|
or "galaxy-watch";
|
|
}
|
|
|
|
private static IReadOnlyList<string> ParseStringList(string value)
|
|
{
|
|
string[] items = value
|
|
.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
|
|
if (items.Length is 0)
|
|
{
|
|
throw new ArgumentException("At least one item is required.");
|
|
}
|
|
|
|
return items;
|
|
}
|
|
|
|
private static IReadOnlyList<int> ParseInt32List(string value)
|
|
{
|
|
string[] items = value
|
|
.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
|
|
if (items.Length is 0)
|
|
{
|
|
throw new ArgumentException("At least one item handle is required.");
|
|
}
|
|
|
|
return items
|
|
.Select(item => int.Parse(item, CultureInfo.InvariantCulture))
|
|
.ToArray();
|
|
}
|
|
|
|
private static string CreateCorrelationId()
|
|
{
|
|
return Guid.NewGuid().ToString("N");
|
|
}
|
|
|
|
private static void WriteUsage(TextWriter writer)
|
|
{
|
|
writer.WriteLine("mxgw-dotnet batch (reads commands from stdin; writes output + __MXGW_BATCH_EOR__ after each)");
|
|
writer.WriteLine("mxgw-dotnet version [--json]");
|
|
writer.WriteLine("mxgw-dotnet ping --session-id <id> [--json]");
|
|
writer.WriteLine("mxgw-dotnet open-session [--client-name <name>] [--json]");
|
|
writer.WriteLine("mxgw-dotnet close-session --session-id <id> [--json]");
|
|
writer.WriteLine("mxgw-dotnet register --session-id <id> --client-name <name> [--json]");
|
|
writer.WriteLine("mxgw-dotnet add-item --session-id <id> --server-handle <n> --item <ref> [--json]");
|
|
writer.WriteLine("mxgw-dotnet advise --session-id <id> --server-handle <n> --item-handle <n> [--json]");
|
|
writer.WriteLine("mxgw-dotnet read-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--timeout-ms <n>] [--json]");
|
|
writer.WriteLine("mxgw-dotnet write-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--user-id <n>] [--json]");
|
|
writer.WriteLine("mxgw-dotnet write2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--timestamp <iso>] [--user-id <n>] [--json]");
|
|
writer.WriteLine("mxgw-dotnet write-secured-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> --current-user-id <n> [--verifier-user-id <n>] [--json]");
|
|
writer.WriteLine("mxgw-dotnet write-secured2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--timestamp <iso>] --current-user-id <n> [--verifier-user-id <n>] [--json]");
|
|
writer.WriteLine("mxgw-dotnet subscribe-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--json]");
|
|
writer.WriteLine("mxgw-dotnet unsubscribe-bulk --session-id <id> --server-handle <n> --item-handles <n,n> [--json]");
|
|
writer.WriteLine("mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]");
|
|
writer.WriteLine("mxgw-dotnet stream-alarms [--filter-prefix <ref>] [--max-events <n>] [--json] [--jsonl]");
|
|
writer.WriteLine("mxgw-dotnet acknowledge-alarm --reference <ref> [--comment <text>] [--operator <user>] [--json]");
|
|
writer.WriteLine("mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]");
|
|
writer.WriteLine("mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]");
|
|
writer.WriteLine("mxgw-dotnet smoke --item <ref> [--value <value> --type <type>] [--json]");
|
|
writer.WriteLine("mxgw-dotnet galaxy-test-connection [--json]");
|
|
writer.WriteLine("mxgw-dotnet galaxy-last-deploy [--json]");
|
|
writer.WriteLine("mxgw-dotnet galaxy-discover [--json]");
|
|
writer.WriteLine("mxgw-dotnet galaxy-watch [--last-seen-deploy-time <iso8601>] [--max-events <n>] [--json]");
|
|
}
|
|
}
|