Compare commits

...

4 Commits

Author SHA1 Message Date
Joseph Doherty f861a8b3b8 Issue #45: scaffold Python package 2026-04-26 20:22:35 -04:00
dohertj2 8d312a6d2e Merge pull request #93 from agent-1/issue-40-implement-dotnet-values-status-errors-and-cli
Issue #40: implement .NET values status errors and CLI
2026-04-26 20:22:58 -04:00
Joseph Doherty 499708b2a2 Issue #40: implement .NET values status errors and CLI 2026-04-26 20:17:02 -04:00
dohertj2 191b724f95 Merge pull request #92 from agent-3/issue-42-implement-go-client-session-values-errors-and-cli
Issue #42: implement Go client session values errors and CLI
2026-04-26 20:14:56 -04:00
40 changed files with 2852 additions and 24 deletions
@@ -0,0 +1,117 @@
using System.Globalization;
namespace MxGateway.Client.Cli;
internal sealed class CliArguments
{
private readonly Dictionary<string, string> _values = new(StringComparer.OrdinalIgnoreCase);
private readonly HashSet<string> _flags = new(StringComparer.OrdinalIgnoreCase);
public CliArguments(IEnumerable<string> args)
{
string? pendingName = null;
foreach (string arg in args)
{
if (arg.StartsWith("--", StringComparison.Ordinal))
{
if (pendingName is not null)
{
_flags.Add(pendingName);
}
pendingName = arg[2..];
continue;
}
if (pendingName is null)
{
throw new ArgumentException($"Unexpected argument '{arg}'.");
}
_values[pendingName] = arg;
pendingName = null;
}
if (pendingName is not null)
{
_flags.Add(pendingName);
}
}
public bool HasFlag(string name)
{
return _flags.Contains(name);
}
public string? GetOptional(string name)
{
return _values.TryGetValue(name, out string? value)
? value
: null;
}
public string GetRequired(string name)
{
string? value = GetOptional(name);
if (string.IsNullOrWhiteSpace(value))
{
throw new ArgumentException($"Missing required option --{name}.");
}
return value;
}
public int GetInt32(string name, int? defaultValue = null)
{
string? value = GetOptional(name);
if (string.IsNullOrWhiteSpace(value))
{
if (defaultValue.HasValue)
{
return defaultValue.Value;
}
throw new ArgumentException($"Missing required option --{name}.");
}
return int.Parse(value, CultureInfo.InvariantCulture);
}
public uint GetUInt32(string name, uint defaultValue)
{
string? value = GetOptional(name);
return string.IsNullOrWhiteSpace(value)
? defaultValue
: uint.Parse(value, CultureInfo.InvariantCulture);
}
public ulong GetUInt64(string name, ulong defaultValue)
{
string? value = GetOptional(name);
return string.IsNullOrWhiteSpace(value)
? defaultValue
: ulong.Parse(value, CultureInfo.InvariantCulture);
}
public TimeSpan GetDuration(string name, TimeSpan defaultValue)
{
string? value = GetOptional(name);
if (string.IsNullOrWhiteSpace(value))
{
return defaultValue;
}
if (value.EndsWith("ms", StringComparison.OrdinalIgnoreCase))
{
return TimeSpan.FromMilliseconds(double.Parse(value[..^2], CultureInfo.InvariantCulture));
}
if (value.EndsWith('s'))
{
return TimeSpan.FromSeconds(double.Parse(value[..^1], CultureInfo.InvariantCulture));
}
return TimeSpan.Parse(value, CultureInfo.InvariantCulture);
}
}
@@ -0,0 +1,22 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Cli;
public interface IMxGatewayCliClient : IAsyncDisposable
{
Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CancellationToken cancellationToken);
Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CancellationToken cancellationToken);
Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken);
IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken);
}
@@ -0,0 +1,40 @@
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Cli;
internal sealed class MxGatewayCliClientAdapter(MxGatewayClient client) : IMxGatewayCliClient
{
public Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CancellationToken cancellationToken)
{
return client.OpenSessionRawAsync(request, cancellationToken);
}
public Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CancellationToken cancellationToken)
{
return client.CloseSessionRawAsync(request, cancellationToken);
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken)
{
return client.InvokeAsync(request, cancellationToken);
}
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken)
{
return client.StreamEventsAsync(request, cancellationToken);
}
public ValueTask DisposeAsync()
{
return client.DisposeAsync();
}
}
@@ -0,0 +1,14 @@
namespace MxGateway.Client.Cli;
internal static class MxGatewayCliSecretRedactor
{
public static string Redact(string value, string? apiKey)
{
if (string.IsNullOrEmpty(value) || string.IsNullOrEmpty(apiKey))
{
return value;
}
return value.Replace(apiKey, "[redacted]", StringComparison.Ordinal);
}
}
@@ -1,34 +1,702 @@
using System.Globalization;
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Client; using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Cli; namespace MxGateway.Client.Cli;
public static class MxGatewayClientCli public static class MxGatewayClientCli
{ {
private static readonly JsonFormatter ProtobufJsonFormatter = JsonFormatter.Default;
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
public static int Run( public static int Run(
string[] args, string[] args,
TextWriter standardOutput, TextWriter standardOutput,
TextWriter standardError) TextWriter standardError)
{
return RunAsync(args, standardOutput, standardError)
.GetAwaiter()
.GetResult();
}
public static Task<int> RunAsync(
string[] args,
TextWriter standardOutput,
TextWriter standardError,
Func<MxGatewayClientOptions, IMxGatewayCliClient>? clientFactory = null)
{ {
ArgumentNullException.ThrowIfNull(args); ArgumentNullException.ThrowIfNull(args);
ArgumentNullException.ThrowIfNull(standardOutput); ArgumentNullException.ThrowIfNull(standardOutput);
ArgumentNullException.ThrowIfNull(standardError); ArgumentNullException.ThrowIfNull(standardError);
return RunCoreAsync(
args,
standardOutput,
standardError,
clientFactory ?? CreateDefaultClient);
}
private static async Task<int> RunCoreAsync(
string[] args,
TextWriter standardOutput,
TextWriter standardError,
Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory)
{
if (args.Length is 0 || IsHelp(args[0])) if (args.Length is 0 || IsHelp(args[0]))
{ {
WriteUsage(standardOutput); WriteUsage(standardOutput);
return 0; return 0;
} }
if (string.Equals(args[0], "version", StringComparison.OrdinalIgnoreCase)) string command = args[0].ToLowerInvariant();
CliArguments arguments = new(args.Skip(1));
try
{ {
standardOutput.WriteLine( if (command is "version")
$"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}"); {
standardOutput.WriteLine( WriteVersion(arguments, standardOutput);
$"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}"); return 0;
return 0; }
if (!IsKnownGatewayCommand(command))
{
return WriteUnknownCommand(command, standardError);
}
await using IMxGatewayCliClient client = clientFactory(CreateOptions(arguments));
using CancellationTokenSource cancellation = CreateCancellation(arguments);
return command switch
{
"open-session" => await OpenSessionAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"close-session" => await CloseSessionAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"ping" => await PingAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"register" => await RegisterAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"add-item" => await AddItemAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"advise" => await AdviseAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write2" => await Write2Async(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"smoke" => await SmokeAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
_ => WriteUnknownCommand(command, standardError),
};
}
catch (Exception exception) when (exception is not OperationCanceledException)
{
string? apiKey = arguments.GetOptional("api-key");
string message = MxGatewayCliSecretRedactor.Redact(exception.Message, apiKey);
if (arguments.HasFlag("json"))
{
standardError.WriteLine(JsonSerializer.Serialize(
new { error = message, type = exception.GetType().Name },
JsonOptions));
}
else
{
standardError.WriteLine(message);
}
return 1;
}
}
private static IMxGatewayCliClient CreateDefaultClient(MxGatewayClientOptions options)
{
return new MxGatewayCliClientAdapter(MxGatewayClient.Create(options));
}
private static MxGatewayClientOptions CreateOptions(CliArguments arguments)
{
string endpoint = arguments.GetOptional("endpoint")
?? Environment.GetEnvironmentVariable("MXGATEWAY_ENDPOINT")
?? "http://localhost:5000";
string apiKey = ResolveApiKey(arguments);
return new MxGatewayClientOptions
{
Endpoint = new Uri(endpoint, UriKind.Absolute),
ApiKey = apiKey,
UseTls = arguments.HasFlag("tls")
|| endpoint.StartsWith("https://", StringComparison.OrdinalIgnoreCase),
DefaultCallTimeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30)),
ConnectTimeout = arguments.GetDuration("connect-timeout", TimeSpan.FromSeconds(10)),
CaCertificatePath = arguments.GetOptional("ca-file"),
ServerNameOverride = arguments.GetOptional("server-name"),
};
}
private static string ResolveApiKey(CliArguments arguments)
{
string? apiKey = arguments.GetOptional("api-key");
if (!string.IsNullOrWhiteSpace(apiKey))
{
return apiKey;
} }
standardError.WriteLine($"Unknown command '{args[0]}'."); string apiKeyEnvironmentName = arguments.GetOptional("api-key-env")
?? "MXGATEWAY_API_KEY";
apiKey = Environment.GetEnvironmentVariable(apiKeyEnvironmentName);
if (!string.IsNullOrWhiteSpace(apiKey))
{
return apiKey;
}
throw new ArgumentException(
$"Gateway API key is required. Pass --api-key or set {apiKeyEnvironmentName}.");
}
private static CancellationTokenSource CreateCancellation(CliArguments arguments)
{
var cancellation = new CancellationTokenSource();
TimeSpan timeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30));
cancellation.CancelAfter(timeout);
return cancellation;
}
private static Task<int> OpenSessionAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return WriteReplyAsync(
client.OpenSessionAsync(
new OpenSessionRequest
{
ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli",
ClientCorrelationId = CreateCorrelationId(),
RequestedBackend = arguments.GetOptional("backend") ?? string.Empty,
},
cancellationToken),
arguments,
output);
}
private static Task<int> CloseSessionAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return WriteReplyAsync(
client.CloseSessionAsync(
new CloseSessionRequest
{
SessionId = arguments.GetRequired("session-id"),
ClientCorrelationId = CreateCorrelationId(),
},
cancellationToken),
arguments,
output);
}
private static Task<int> PingAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Ping,
Ping = new PingCommand { Message = arguments.GetOptional("message") ?? "ping" },
},
cancellationToken);
}
private static Task<int> RegisterAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli" },
},
cancellationToken);
}
private static Task<int> AddItemAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemDefinition = arguments.GetRequired("item"),
},
},
cancellationToken);
}
private static Task<int> AdviseAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemHandle = arguments.GetInt32("item-handle"),
},
},
cancellationToken);
}
private static Task<int> WriteAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemHandle = arguments.GetInt32("item-handle"),
UserId = arguments.GetInt32("user-id", 0),
Value = ParseValue(arguments),
},
},
cancellationToken);
}
private static Task<int> Write2Async(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Write2,
Write2 = new Write2Command
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemHandle = arguments.GetInt32("item-handle"),
UserId = arguments.GetInt32("user-id", 0),
Value = ParseValue(arguments),
TimestampValue = ParseTimestampValue(arguments),
},
},
cancellationToken);
}
private static async Task<int> StreamEventsAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
var events = new List<MxEvent>();
uint maxEvents = arguments.GetUInt32("max-events", 0);
uint eventCount = 0;
var request = new StreamEventsRequest
{
SessionId = arguments.GetRequired("session-id"),
AfterWorkerSequence = arguments.GetUInt64("after-worker-sequence", 0),
};
await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(request, cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
if (arguments.HasFlag("json"))
{
events.Add(gatewayEvent);
}
else
{
output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent));
}
eventCount++;
if (maxEvents > 0 && eventCount >= maxEvents)
{
break;
}
}
if (arguments.HasFlag("json"))
{
output.WriteLine(JsonSerializer.Serialize(
new { events = events.Select(EventToJsonElement).ToArray() },
JsonOptions));
}
return 0;
}
private static async Task<int> SmokeAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
OpenSessionReply? openReply = null;
CloseSessionReply? closeReply = null;
var commandReplies = new List<MxCommandReply>();
var events = new List<MxEvent>();
try
{
openReply = await client.OpenSessionAsync(
new OpenSessionRequest
{
ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke",
ClientCorrelationId = CreateCorrelationId(),
},
cancellationToken)
.ConfigureAwait(false);
int serverHandle = await InvokeForHandleAsync(
arguments,
client,
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke" },
},
reply => reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value,
commandReplies,
cancellationToken)
.ConfigureAwait(false);
int itemHandle = await InvokeForHandleAsync(
arguments,
client,
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = arguments.GetRequired("item"),
},
},
reply => reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value,
commandReplies,
cancellationToken)
.ConfigureAwait(false);
commandReplies.Add(await InvokeAndEnsureAsync(
client,
CreateCommandRequest(
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
}),
cancellationToken)
.ConfigureAwait(false));
if (arguments.GetOptional("value") is not null)
{
commandReplies.Add(await InvokeAndEnsureAsync(
client,
CreateCommandRequest(
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
UserId = arguments.GetInt32("user-id", 0),
Value = ParseValue(arguments),
},
}),
cancellationToken)
.ConfigureAwait(false));
}
using CancellationTokenSource streamCancellation = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
streamCancellation.CancelAfter(arguments.GetDuration(
"event-timeout",
TimeSpan.FromSeconds(2)));
try
{
await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(
new StreamEventsRequest { SessionId = openReply.SessionId },
streamCancellation.Token)
.WithCancellation(streamCancellation.Token)
.ConfigureAwait(false))
{
events.Add(gatewayEvent);
if (events.Count >= arguments.GetUInt32("max-events", 1))
{
break;
}
}
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
}
}
finally
{
if (openReply is not null)
{
closeReply = await client.CloseSessionAsync(
new CloseSessionRequest
{
SessionId = openReply.SessionId,
ClientCorrelationId = CreateCorrelationId(),
},
CancellationToken.None)
.ConfigureAwait(false);
}
}
WriteSmokeResult(arguments, output, openReply, closeReply, commandReplies, events);
return 0;
}
private static async Task<int> InvokeAndWriteAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
MxCommand command,
CancellationToken cancellationToken)
{
MxCommandReply reply = await InvokeAndEnsureAsync(
client,
CreateCommandRequest(arguments.GetRequired("session-id"), command),
cancellationToken)
.ConfigureAwait(false);
WriteMessage(arguments, output, reply);
return 0;
}
private static async Task<int> InvokeForHandleAsync(
CliArguments arguments,
IMxGatewayCliClient client,
string sessionId,
MxCommand command,
Func<MxCommandReply, int> handleSelector,
List<MxCommandReply> replies,
CancellationToken cancellationToken)
{
MxCommandReply reply = await InvokeAndEnsureAsync(
client,
CreateCommandRequest(sessionId, command),
cancellationToken)
.ConfigureAwait(false);
replies.Add(reply);
return handleSelector(reply);
}
private static async Task<MxCommandReply> InvokeAndEnsureAsync(
IMxGatewayCliClient client,
MxCommandRequest request,
CancellationToken cancellationToken)
{
MxCommandReply reply = await client.InvokeAsync(request, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply;
}
private static MxCommandRequest CreateCommandRequest(
string sessionId,
MxCommand command)
{
return new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = CreateCorrelationId(),
Command = command,
};
}
private static async Task<int> WriteReplyAsync<TReply>(
Task<TReply> replyTask,
CliArguments arguments,
TextWriter output)
where TReply : IMessage
{
TReply reply = await replyTask.ConfigureAwait(false);
WriteMessage(arguments, output, reply);
return 0;
}
private static void WriteVersion(CliArguments arguments, TextWriter output)
{
if (arguments.HasFlag("json"))
{
output.WriteLine(JsonSerializer.Serialize(
new
{
gatewayProtocolVersion = MxGatewayClientContractInfo.GatewayProtocolVersion,
workerProtocolVersion = MxGatewayClientContractInfo.WorkerProtocolVersion,
},
JsonOptions));
return;
}
output.WriteLine(
$"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}");
output.WriteLine(
$"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}");
}
private static void WriteMessage(
CliArguments arguments,
TextWriter output,
IMessage message)
{
output.WriteLine(arguments.HasFlag("json")
? ProtobufJsonFormatter.Format(message)
: message.ToString());
}
private static void WriteSmokeResult(
CliArguments arguments,
TextWriter output,
OpenSessionReply? openReply,
CloseSessionReply? closeReply,
IReadOnlyList<MxCommandReply> commandReplies,
IReadOnlyList<MxEvent> events)
{
if (!arguments.HasFlag("json"))
{
output.WriteLine($"session-id={openReply?.SessionId}");
output.WriteLine($"commands={commandReplies.Count}");
output.WriteLine($"events={events.Count}");
output.WriteLine($"closed={closeReply is not null}");
return;
}
output.WriteLine(JsonSerializer.Serialize(
new
{
sessionId = openReply?.SessionId,
closed = closeReply is not null,
commandReplies = commandReplies.Select(CommandReplyToJsonElement).ToArray(),
events = events.Select(EventToJsonElement).ToArray(),
},
JsonOptions));
}
private static JsonElement CommandReplyToJsonElement(MxCommandReply reply)
{
return JsonDocument.Parse(ProtobufJsonFormatter.Format(reply)).RootElement.Clone();
}
private static JsonElement EventToJsonElement(MxEvent gatewayEvent)
{
return JsonDocument.Parse(ProtobufJsonFormatter.Format(gatewayEvent)).RootElement.Clone();
}
private static MxValue ParseValue(CliArguments arguments)
{
string type = arguments.GetRequired("type").ToLowerInvariant();
string value = arguments.GetRequired("value");
string[] values = value.Split(',', StringSplitOptions.TrimEntries);
return type switch
{
"bool" or "boolean" => bool.Parse(value).ToMxValue(),
"bool-array" or "boolean-array" => values.Select(bool.Parse).ToArray().ToMxValue(),
"int32" or "integer" => int.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"int32-array" or "integer-array" => values.Select(item => int.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"int64" => long.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"int64-array" => values.Select(item => long.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"float" => float.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"float-array" => values.Select(item => float.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"double" => double.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"double-array" => values.Select(item => double.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"string" => value.ToMxValue(),
"string-array" => values.ToMxValue(),
"time" or "timestamp" => DateTimeOffset.Parse(value, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal).ToMxValue(),
"time-array" or "timestamp-array" => values
.Select(item => DateTimeOffset.Parse(item, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal))
.ToArray()
.ToMxValue(),
_ => throw new ArgumentException($"Unsupported MX value type '{type}'."),
};
}
private static MxValue ParseTimestampValue(CliArguments arguments)
{
string timestamp = arguments.GetOptional("timestamp")
?? DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture);
return DateTimeOffset.Parse(
timestamp,
CultureInfo.InvariantCulture,
DateTimeStyles.AssumeUniversal)
.ToMxValue();
}
private static int WriteUnknownCommand(string command, TextWriter standardError)
{
standardError.WriteLine($"Unknown command '{command}'.");
WriteUsage(standardError); WriteUsage(standardError);
return 2; return 2;
} }
@@ -40,9 +708,37 @@ public static class MxGatewayClientCli
|| string.Equals(value, "help", StringComparison.OrdinalIgnoreCase); || string.Equals(value, "help", StringComparison.OrdinalIgnoreCase);
} }
private static bool IsKnownGatewayCommand(string command)
{
return command is "open-session"
or "close-session"
or "ping"
or "register"
or "add-item"
or "advise"
or "stream-events"
or "write"
or "write2"
or "smoke";
}
private static string CreateCorrelationId()
{
return Guid.NewGuid().ToString("N");
}
private static void WriteUsage(TextWriter writer) private static void WriteUsage(TextWriter writer)
{ {
writer.WriteLine("mxgw-dotnet version"); writer.WriteLine("mxgw-dotnet version [--json]");
writer.WriteLine("mxgw-dotnet --help"); writer.WriteLine("mxgw-dotnet ping --session-id <id> [--json]");
writer.WriteLine("mxgw-dotnet open-session [--client-name <name>] [--json]");
writer.WriteLine("mxgw-dotnet close-session --session-id <id> [--json]");
writer.WriteLine("mxgw-dotnet register --session-id <id> --client-name <name> [--json]");
writer.WriteLine("mxgw-dotnet add-item --session-id <id> --server-handle <n> --item <ref> [--json]");
writer.WriteLine("mxgw-dotnet advise --session-id <id> --server-handle <n> --item-handle <n> [--json]");
writer.WriteLine("mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]");
writer.WriteLine("mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]");
writer.WriteLine("mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]");
writer.WriteLine("mxgw-dotnet smoke --item <ref> [--value <value> --type <type>] [--json]");
} }
} }
@@ -1,3 +1,3 @@
using MxGateway.Client.Cli; using MxGateway.Client.Cli;
return MxGatewayClientCli.Run(args, Console.Out, Console.Error); return await MxGatewayClientCli.RunAsync(args, Console.Out, Console.Error);
@@ -0,0 +1,78 @@
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxCommandReplyExtensionsTests
{
[Fact]
public void EnsureSuccess_WithRegisterFixture_ReturnsReply()
{
MxCommandReply reply = ReadReplyFixture("register.ok.reply.json");
Assert.Same(reply, reply.EnsureProtocolSuccess());
Assert.Same(reply, reply.EnsureMxAccessSuccess());
}
[Fact]
public void EnsureMxAccessSuccess_WithFailureFixture_PreservesHResultAndStatuses()
{
MxCommandReply reply = ReadReplyFixture("write.mxaccess-failure.reply.json");
reply.EnsureProtocolSuccess();
MxAccessException exception = Assert.Throws<MxAccessException>(
reply.EnsureMxAccessSuccess);
Assert.Equal(-2147220992, exception.HResultCode);
Assert.Equal(reply.Statuses.Count, exception.Statuses.Count);
Assert.Equal(reply, exception.Reply);
Assert.Contains("0x80040200", exception.Message);
}
[Fact]
public void EnsureProtocolSuccess_WithSessionFailure_ThrowsSessionException()
{
MxCommandReply reply = new()
{
SessionId = "session-missing",
CorrelationId = "correlation",
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.SessionNotFound,
Message = "Session was not found.",
},
};
MxGatewaySessionException exception = Assert.Throws<MxGatewaySessionException>(
reply.EnsureProtocolSuccess);
Assert.Equal("session-missing", exception.SessionId);
Assert.Equal(ProtocolStatusCode.SessionNotFound, exception.ProtocolStatus?.Code);
}
private static MxCommandReply ReadReplyFixture(string fileName)
{
DirectoryInfo directory = new(AppContext.BaseDirectory);
while (directory is not null)
{
string path = Path.Combine(
directory.FullName,
"clients",
"proto",
"fixtures",
"behavior",
"command-replies",
fileName);
if (File.Exists(path))
{
return JsonParser.Default.Parse<MxCommandReply>(File.ReadAllText(path));
}
directory = directory.Parent!;
}
throw new FileNotFoundException(fileName);
}
}
@@ -1,4 +1,5 @@
using MxGateway.Client.Cli; using MxGateway.Client.Cli;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests; namespace MxGateway.Client.Tests;
@@ -17,4 +18,224 @@ public sealed class MxGatewayClientCliTests
Assert.Contains("worker-protocol=1", output.ToString()); Assert.Contains("worker-protocol=1", output.ToString());
Assert.Equal(string.Empty, error.ToString()); Assert.Equal(string.Empty, error.ToString());
} }
[Fact]
public async Task RunAsync_VersionJson_PrintsJsonProtocolVersions()
{
using var output = new StringWriter();
using var error = new StringWriter();
int exitCode = await MxGatewayClientCli.RunAsync(["version", "--json"], output, error);
Assert.Equal(0, exitCode);
Assert.Contains("\"gatewayProtocolVersion\":1", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
[Fact]
public async Task RunAsync_Write_BuildsWriteCommandAndPrintsJsonReply()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.InvokeReplies.Enqueue(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Write,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
int exitCode = await MxGatewayClientCli.RunAsync(
[
"write",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--session-id",
"session-fixture",
"--server-handle",
"12",
"--item-handle",
"34",
"--type",
"int32",
"--value",
"123",
"--json",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
MxCommandRequest request = Assert.Single(fakeClient.InvokeRequests);
Assert.Equal(MxCommandKind.Write, request.Command.Kind);
Assert.Equal(123, request.Command.Write.Value.Int32Value);
Assert.Contains("MX_COMMAND_KIND_WRITE", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
[Fact]
public async Task RunAsync_ErrorOutput_RedactsApiKey()
{
using var output = new StringWriter();
using var error = new StringWriter();
int exitCode = await MxGatewayClientCli.RunAsync(
[
"open-session",
"--endpoint",
"http://localhost:5000",
"--api-key",
"secret-api-key",
],
output,
error,
_ => throw new InvalidOperationException("boom secret-api-key"));
Assert.Equal(1, exitCode);
Assert.DoesNotContain("secret-api-key", error.ToString());
Assert.Contains("[redacted]", error.ToString());
}
[Fact]
public async Task RunAsync_StreamEvents_WithMaxEventsStopsNonJsonOutput()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.Events.Add(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnDataChange,
WorkerSequence = 1,
});
fakeClient.Events.Add(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnWriteComplete,
WorkerSequence = 2,
});
int exitCode = await MxGatewayClientCli.RunAsync(
[
"stream-events",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--session-id",
"session-fixture",
"--max-events",
"1",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
Assert.Contains("workerSequence", output.ToString());
Assert.DoesNotContain("ON_WRITE_COMPLETE", output.ToString());
}
[Fact]
public async Task RunAsync_Smoke_WhenCommandFails_ClosesOpenedSession()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new()
{
InvokeFailure = new InvalidOperationException("register failed"),
};
int exitCode = await MxGatewayClientCli.RunAsync(
[
"smoke",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--item",
"Area001.Pump001.Speed",
"--json",
],
output,
error,
_ => fakeClient);
Assert.Equal(1, exitCode);
CloseSessionRequest closeRequest = Assert.Single(fakeClient.CloseSessionRequests);
Assert.Equal("session-fixture", closeRequest.SessionId);
}
private sealed class FakeCliClient : IMxGatewayCliClient
{
public Queue<MxCommandReply> InvokeReplies { get; } = new();
public List<MxCommandRequest> InvokeRequests { get; } = [];
public List<CloseSessionRequest> CloseSessionRequests { get; } = [];
public List<MxEvent> Events { get; } = [];
public Exception? InvokeFailure { get; init; }
public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
public Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CancellationToken cancellationToken)
{
return Task.FromResult(new OpenSessionReply
{
SessionId = "session-fixture",
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
GatewayProtocolVersion = 1,
WorkerProtocolVersion = 1,
});
}
public Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CancellationToken cancellationToken)
{
CloseSessionRequests.Add(request);
return Task.FromResult(new CloseSessionReply
{
SessionId = request.SessionId,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
FinalState = SessionState.Closed,
});
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken)
{
InvokeRequests.Add(request);
if (InvokeFailure is not null)
{
throw InvokeFailure;
}
return Task.FromResult(InvokeReplies.Dequeue());
}
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
foreach (MxEvent gatewayEvent in Events)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
yield return gatewayEvent;
}
}
}
} }
@@ -110,6 +110,33 @@ public sealed class MxGatewayClientSessionTests
Assert.Equal(56, request.Command.Write.UserId); Assert.Equal(56, request.Command.Write.UserId);
} }
[Fact]
public async Task Write2RawAsync_BuildsWrite2CommandWithValueAndTimestamp()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Write2,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
MxValue value = 123.ToMxValue();
MxValue timestampValue = DateTimeOffset.Parse("2026-01-01T00:00:00Z").ToMxValue();
MxCommandReply reply = await session.Write2RawAsync(12, 34, value, timestampValue, 56);
Assert.Equal(MxCommandKind.Write2, reply.Kind);
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
Assert.Equal(MxCommandKind.Write2, request.Command.Kind);
Assert.Equal(12, request.Command.Write2.ServerHandle);
Assert.Equal(34, request.Command.Write2.ItemHandle);
Assert.Same(value, request.Command.Write2.Value);
Assert.Same(timestampValue, request.Command.Write2.TimestampValue);
Assert.Equal(56, request.Command.Write2.UserId);
}
[Fact] [Fact]
public async Task StreamEventsAsync_YieldsEventsInGatewayOrder() public async Task StreamEventsAsync_YieldsEventsInGatewayOrder()
{ {
@@ -0,0 +1,57 @@
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxStatusProxyExtensionsTests
{
[Fact]
public void FixtureStatuses_ProjectSuccessAndPreserveRawFields()
{
using JsonDocument document = JsonDocument.Parse(ReadFixture(
"statuses",
"status-conversion-cases.json"));
foreach (JsonElement testCase in document.RootElement.GetProperty("cases").EnumerateArray())
{
MxStatusProxy status = JsonParser.Default.Parse<MxStatusProxy>(
testCase.GetProperty("status").GetRawText());
int success = testCase.GetProperty("status").GetProperty("success").GetInt32();
Assert.Equal(success != 0 && status.Category is MxStatusCategory.Ok, status.IsSuccess());
Assert.Equal(
testCase.GetProperty("status").GetProperty("rawCategory").GetInt32(),
status.RawCategory);
Assert.Equal(
testCase.GetProperty("status").GetProperty("rawDetectedBy").GetInt32(),
status.RawDetectedBy);
}
}
private static string ReadFixture(string category, string fileName)
{
DirectoryInfo directory = new(AppContext.BaseDirectory);
while (directory is not null)
{
string path = Path.Combine(
directory.FullName,
"clients",
"proto",
"fixtures",
"behavior",
category,
fileName);
if (File.Exists(path))
{
return File.ReadAllText(path);
}
directory = directory.Parent!;
}
throw new FileNotFoundException(fileName);
}
}
@@ -0,0 +1,79 @@
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxValueExtensionsTests
{
[Fact]
public void ToMxValue_WithScalarValues_CreatesTypedProtobufValues()
{
Assert.Equal(MxValue.KindOneofCase.BoolValue, true.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.Int32Value, 123.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.Int64Value, 123L.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.FloatValue, 1.25F.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.DoubleValue, 2.5D.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.StringValue, "alpha".ToMxValue().KindCase);
}
[Fact]
public void ToMxValue_WithArrays_CreatesTypedArrayProtobufValues()
{
MxValue value = new[] { "alpha", "beta" }.ToMxValue();
Assert.Equal(MxValue.KindOneofCase.ArrayValue, value.KindCase);
Assert.Equal(MxArray.ValuesOneofCase.StringValues, value.ArrayValue.ValuesCase);
Assert.Equal(["alpha", "beta"], value.ArrayValue.StringValues.Values);
Assert.Equal([2U], value.ArrayValue.Dimensions);
}
[Fact]
public void FixtureValues_ProjectExpectedKindsAndPreserveRawMetadata()
{
using JsonDocument document = JsonDocument.Parse(ReadFixture(
"values",
"value-conversion-cases.json"));
foreach (JsonElement testCase in document.RootElement.GetProperty("cases").EnumerateArray())
{
string expectedKind = testCase.GetProperty("expectedKind").GetString()!;
MxValue value = JsonParser.Default.Parse<MxValue>(
testCase.GetProperty("value").GetRawText());
Assert.Equal(expectedKind, value.GetProjectionKind());
if (testCase.GetProperty("id").GetString() is "raw-fallback.variant")
{
Assert.Equal(32767, value.RawDataType);
Assert.Equal([1, 2, 3, 4, 5], Assert.IsType<byte[]>(value.ToClrValue()));
}
}
}
private static string ReadFixture(string category, string fileName)
{
DirectoryInfo directory = new(AppContext.BaseDirectory);
while (directory is not null)
{
string path = Path.Combine(
directory.FullName,
"clients",
"proto",
"fixtures",
"behavior",
category,
fileName);
if (File.Exists(path))
{
return File.ReadAllText(path);
}
directory = directory.Parent!;
}
throw new FileNotFoundException(fileName);
}
}
@@ -17,27 +17,48 @@ internal sealed class GrpcMxGatewayClientTransport(
OpenSessionRequest request, OpenSessionRequest request,
CallOptions callOptions) CallOptions callOptions)
{ {
return await RawClient.OpenSessionAsync(request, callOptions) try
.ResponseAsync {
.ConfigureAwait(false); return await RawClient.OpenSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
} }
public async Task<CloseSessionReply> CloseSessionAsync( public async Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request, CloseSessionRequest request,
CallOptions callOptions) CallOptions callOptions)
{ {
return await RawClient.CloseSessionAsync(request, callOptions) try
.ResponseAsync {
.ConfigureAwait(false); return await RawClient.CloseSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
} }
public async Task<MxCommandReply> InvokeAsync( public async Task<MxCommandReply> InvokeAsync(
MxCommandRequest request, MxCommandRequest request,
CallOptions callOptions) CallOptions callOptions)
{ {
return await RawClient.InvokeAsync(request, callOptions) try
.ResponseAsync {
.ConfigureAwait(false); return await RawClient.InvokeAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
} }
public async IAsyncEnumerable<MxEvent> StreamEventsAsync( public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
@@ -51,10 +72,24 @@ internal sealed class GrpcMxGatewayClientTransport(
using AsyncServerStreamingCall<MxEvent> call = RawClient.StreamEvents(request, callOptions); using AsyncServerStreamingCall<MxEvent> call = RawClient.StreamEvents(request, callOptions);
await foreach (MxEvent gatewayEvent in call.ResponseStream IAsyncStreamReader<MxEvent> responseStream = call.ResponseStream;
.ReadAllAsync(effectiveCancellationToken) while (true)
.ConfigureAwait(false))
{ {
MxEvent? gatewayEvent;
try
{
if (!await responseStream.MoveNext(effectiveCancellationToken).ConfigureAwait(false))
{
break;
}
gatewayEvent = responseStream.Current;
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
yield return gatewayEvent; yield return gatewayEvent;
} }
} }
@@ -65,4 +100,18 @@ internal sealed class GrpcMxGatewayClientTransport(
{ {
return StreamEventsAsync(request, callOptions); return StreamEventsAsync(request, callOptions);
} }
private static MxGatewayException MapRpcException(RpcException exception)
{
return exception.StatusCode switch
{
StatusCode.Unauthenticated => new MxGatewayAuthenticationException(
exception.Status.Detail,
innerException: exception),
StatusCode.PermissionDenied => new MxGatewayAuthorizationException(
exception.Status.Detail,
innerException: exception),
_ => new MxGatewayException(exception.Status.Detail, exception),
};
}
} }
@@ -0,0 +1,24 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxAccessException : MxGatewayCommandException
{
public MxAccessException(
string message,
MxCommandReply reply,
Exception? innerException = null)
: base(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
reply.HasHresult ? reply.Hresult : null,
reply.Statuses.ToArray(),
innerException)
{
Reply = reply;
}
public MxCommandReply Reply { get; }
}
@@ -0,0 +1,96 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public static class MxCommandReplyExtensions
{
public static MxCommandReply EnsureProtocolSuccess(this MxCommandReply reply)
{
ArgumentNullException.ThrowIfNull(reply);
ProtocolStatusCode code = reply.ProtocolStatus?.Code
?? ProtocolStatusCode.Unspecified;
if (code is ProtocolStatusCode.Ok or ProtocolStatusCode.MxaccessFailure)
{
return reply;
}
throw CreateProtocolException(reply, code);
}
public static MxCommandReply EnsureMxAccessSuccess(this MxCommandReply reply)
{
ArgumentNullException.ThrowIfNull(reply);
bool mxAccessFailure = reply.ProtocolStatus?.Code is ProtocolStatusCode.MxaccessFailure;
bool hResultFailure = reply.HasHresult && reply.Hresult != 0;
bool statusFailure = reply.Statuses.Any(status => !status.IsSuccess());
if (!mxAccessFailure && !hResultFailure && !statusFailure)
{
return reply;
}
throw new MxAccessException(CreateMxAccessMessage(reply), reply);
}
private static MxGatewayException CreateProtocolException(
MxCommandReply reply,
ProtocolStatusCode code)
{
string message = CreateProtocolMessage(reply);
int? hResult = reply.HasHresult ? reply.Hresult : null;
MxStatusProxy[] statuses = reply.Statuses.ToArray();
return code switch
{
ProtocolStatusCode.SessionNotFound or ProtocolStatusCode.SessionNotReady
=> new MxGatewaySessionException(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
hResult,
statuses),
ProtocolStatusCode.WorkerUnavailable
=> new MxGatewayWorkerException(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
hResult,
statuses),
_
=> new MxGatewayCommandException(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
hResult,
statuses),
};
}
private static string CreateProtocolMessage(MxCommandReply reply)
{
string statusMessage = string.IsNullOrWhiteSpace(reply.ProtocolStatus?.Message)
? "Gateway protocol failure."
: reply.ProtocolStatus.Message;
return $"{statusMessage} code={reply.ProtocolStatus?.Code}; session={reply.SessionId}; correlation={reply.CorrelationId}";
}
private static string CreateMxAccessMessage(MxCommandReply reply)
{
string statusSummary = reply.Statuses.Count is 0
? "no MXSTATUS_PROXY entries"
: string.Join("; ", reply.Statuses.Select(status => status.ToDiagnosticSummary()));
string hResult = reply.HasHresult
? $"0x{reply.Hresult:X8}"
: "none";
return $"MXAccess command failed. kind={reply.Kind}; hresult={hResult}; statuses={statusSummary}";
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewayAuthenticationException : MxGatewayException
{
public MxGatewayAuthenticationException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewayAuthorizationException : MxGatewayException
{
public MxGatewayAuthorizationException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public class MxGatewayCommandException : MxGatewayException
{
public MxGatewayCommandException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,45 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public class MxGatewayException : Exception
{
public MxGatewayException(string message)
: base(message)
{
Statuses = [];
}
public MxGatewayException(string message, Exception? innerException)
: base(message, innerException)
{
Statuses = [];
}
public MxGatewayException(
string message,
string? sessionId,
string? correlationId,
ProtocolStatus? protocolStatus,
int? hResult,
IReadOnlyList<MxStatusProxy> statuses,
Exception? innerException = null)
: base(message, innerException)
{
SessionId = sessionId;
CorrelationId = correlationId;
ProtocolStatus = protocolStatus;
HResultCode = hResult;
Statuses = statuses;
}
public string? SessionId { get; }
public string? CorrelationId { get; }
public ProtocolStatus? ProtocolStatus { get; }
public int? HResultCode { get; }
public IReadOnlyList<MxStatusProxy> Statuses { get; }
}
@@ -56,6 +56,7 @@ public sealed class MxGatewaySession : IAsyncDisposable
{ {
MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken) MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken)
.ConfigureAwait(false); .ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value; return reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value;
} }
@@ -84,6 +85,7 @@ public sealed class MxGatewaySession : IAsyncDisposable
itemDefinition, itemDefinition,
cancellationToken) cancellationToken)
.ConfigureAwait(false); .ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value; return reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value;
} }
@@ -119,6 +121,7 @@ public sealed class MxGatewaySession : IAsyncDisposable
itemContext, itemContext,
cancellationToken) cancellationToken)
.ConfigureAwait(false); .ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AddItem2?.ItemHandle ?? reply.ReturnValue.Int32Value; return reply.AddItem2?.ItemHandle ?? reply.ReturnValue.Int32Value;
} }
@@ -149,8 +152,9 @@ public sealed class MxGatewaySession : IAsyncDisposable
int itemHandle, int itemHandle,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
await AdviseRawAsync(serverHandle, itemHandle, cancellationToken) MxCommandReply reply = await AdviseRawAsync(serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false); .ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
} }
public Task<MxCommandReply> AdviseRawAsync( public Task<MxCommandReply> AdviseRawAsync(
@@ -178,8 +182,9 @@ public sealed class MxGatewaySession : IAsyncDisposable
int userId, int userId,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken) MxCommandReply reply = await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken)
.ConfigureAwait(false); .ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
} }
public Task<MxCommandReply> WriteRawAsync( public Task<MxCommandReply> WriteRawAsync(
@@ -206,6 +211,52 @@ public sealed class MxGatewaySession : IAsyncDisposable
cancellationToken); cancellationToken);
} }
public async Task Write2Async(
int serverHandle,
int itemHandle,
MxValue value,
MxValue timestampValue,
int userId,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await Write2RawAsync(
serverHandle,
itemHandle,
value,
timestampValue,
userId,
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
public Task<MxCommandReply> Write2RawAsync(
int serverHandle,
int itemHandle,
MxValue value,
MxValue timestampValue,
int userId,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(value);
ArgumentNullException.ThrowIfNull(timestampValue);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Write2,
Write2 = new Write2Command
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
Value = value,
TimestampValue = timestampValue,
UserId = userId,
},
},
cancellationToken);
}
public Task<MxCommandReply> InvokeAsync( public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request, MxCommandRequest request,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewaySessionException : MxGatewayException
{
public MxGatewaySessionException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewayWorkerException : MxGatewayException
{
public MxGatewayWorkerException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public static class MxStatusProxyExtensions
{
public static bool IsSuccess(this MxStatusProxy status)
{
ArgumentNullException.ThrowIfNull(status);
return status.Success != 0
&& status.Category is MxStatusCategory.Ok;
}
public static string ToDiagnosticSummary(this MxStatusProxy status)
{
ArgumentNullException.ThrowIfNull(status);
string diagnosticText = string.IsNullOrWhiteSpace(status.DiagnosticText)
? "no diagnostic text"
: status.DiagnosticText;
return $"{status.Category} by {status.DetectedBy}; detail={status.Detail}; {diagnosticText}";
}
}
@@ -0,0 +1,284 @@
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
/// <summary>
/// Creates and projects gateway MXAccess values without hiding the raw
/// protobuf value carried by command replies and events.
/// </summary>
public static class MxValueExtensions
{
public static MxValue ToMxValue(this bool value)
{
return new MxValue
{
DataType = MxDataType.Boolean,
VariantType = "VT_BOOL",
BoolValue = value,
};
}
public static MxValue ToMxValue(this int value)
{
return new MxValue
{
DataType = MxDataType.Integer,
VariantType = "VT_I4",
Int32Value = value,
};
}
public static MxValue ToMxValue(this long value)
{
return new MxValue
{
DataType = MxDataType.Integer,
VariantType = "VT_I8",
Int64Value = value,
};
}
public static MxValue ToMxValue(this float value)
{
return new MxValue
{
DataType = MxDataType.Float,
VariantType = "VT_R4",
FloatValue = value,
};
}
public static MxValue ToMxValue(this double value)
{
return new MxValue
{
DataType = MxDataType.Double,
VariantType = "VT_R8",
DoubleValue = value,
};
}
public static MxValue ToMxValue(this string value)
{
ArgumentNullException.ThrowIfNull(value);
return new MxValue
{
DataType = MxDataType.String,
VariantType = "VT_BSTR",
StringValue = value,
};
}
public static MxValue ToMxValue(this DateTimeOffset value)
{
return new MxValue
{
DataType = MxDataType.Time,
VariantType = "VT_DATE",
TimestampValue = Timestamp.FromDateTimeOffset(value),
};
}
public static MxValue ToMxValue(this DateTime value)
{
return new DateTimeOffset(
value.Kind == DateTimeKind.Unspecified
? DateTime.SpecifyKind(value, DateTimeKind.Utc)
: value.ToUniversalTime())
.ToMxValue();
}
public static MxValue ToMxValue(this IReadOnlyList<bool> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new BoolArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Boolean, "VT_ARRAY|VT_BOOL", values.Count, new MxArray
{
ElementDataType = MxDataType.Boolean,
VariantType = "VT_ARRAY|VT_BOOL",
BoolValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<int> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new Int32Array();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Integer, "VT_ARRAY|VT_I4", values.Count, new MxArray
{
ElementDataType = MxDataType.Integer,
VariantType = "VT_ARRAY|VT_I4",
Int32Values = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<long> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new Int64Array();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Integer, "VT_ARRAY|VT_I8", values.Count, new MxArray
{
ElementDataType = MxDataType.Integer,
VariantType = "VT_ARRAY|VT_I8",
Int64Values = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<float> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new FloatArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Float, "VT_ARRAY|VT_R4", values.Count, new MxArray
{
ElementDataType = MxDataType.Float,
VariantType = "VT_ARRAY|VT_R4",
FloatValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<double> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new DoubleArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Double, "VT_ARRAY|VT_R8", values.Count, new MxArray
{
ElementDataType = MxDataType.Double,
VariantType = "VT_ARRAY|VT_R8",
DoubleValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<string> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new StringArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.String, "VT_ARRAY|VT_BSTR", values.Count, new MxArray
{
ElementDataType = MxDataType.String,
VariantType = "VT_ARRAY|VT_BSTR",
StringValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<DateTimeOffset> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new TimestampArray();
array.Values.Add(values.Select(Timestamp.FromDateTimeOffset));
return CreateArrayValue(MxDataType.Time, "VT_ARRAY|VT_DATE", values.Count, new MxArray
{
ElementDataType = MxDataType.Time,
VariantType = "VT_ARRAY|VT_DATE",
TimestampValues = array,
});
}
public static string GetProjectionKind(this MxValue value)
{
ArgumentNullException.ThrowIfNull(value);
return value.KindCase switch
{
MxValue.KindOneofCase.BoolValue => "boolValue",
MxValue.KindOneofCase.Int32Value => "int32Value",
MxValue.KindOneofCase.Int64Value => "int64Value",
MxValue.KindOneofCase.FloatValue => "floatValue",
MxValue.KindOneofCase.DoubleValue => "doubleValue",
MxValue.KindOneofCase.StringValue => "stringValue",
MxValue.KindOneofCase.TimestampValue => "timestampValue",
MxValue.KindOneofCase.ArrayValue => "arrayValue",
MxValue.KindOneofCase.RawValue => "rawValue",
_ => value.IsNull ? "nullValue" : "unspecified",
};
}
public static object? ToClrValue(this MxValue value)
{
ArgumentNullException.ThrowIfNull(value);
return value.KindCase switch
{
MxValue.KindOneofCase.BoolValue => value.BoolValue,
MxValue.KindOneofCase.Int32Value => value.Int32Value,
MxValue.KindOneofCase.Int64Value => value.Int64Value,
MxValue.KindOneofCase.FloatValue => value.FloatValue,
MxValue.KindOneofCase.DoubleValue => value.DoubleValue,
MxValue.KindOneofCase.StringValue => value.StringValue,
MxValue.KindOneofCase.TimestampValue => value.TimestampValue.ToDateTimeOffset(),
MxValue.KindOneofCase.ArrayValue => value.ArrayValue.ToClrArrayValue(),
MxValue.KindOneofCase.RawValue => value.RawValue.ToByteArray(),
_ => value.IsNull ? null : value,
};
}
public static object? ToClrArrayValue(this MxArray array)
{
ArgumentNullException.ThrowIfNull(array);
return array.ValuesCase switch
{
MxArray.ValuesOneofCase.BoolValues => array.BoolValues.Values.ToArray(),
MxArray.ValuesOneofCase.Int32Values => array.Int32Values.Values.ToArray(),
MxArray.ValuesOneofCase.Int64Values => array.Int64Values.Values.ToArray(),
MxArray.ValuesOneofCase.FloatValues => array.FloatValues.Values.ToArray(),
MxArray.ValuesOneofCase.DoubleValues => array.DoubleValues.Values.ToArray(),
MxArray.ValuesOneofCase.StringValues => array.StringValues.Values.ToArray(),
MxArray.ValuesOneofCase.TimestampValues => array.TimestampValues.Values
.Select(timestamp => timestamp.ToDateTimeOffset())
.ToArray(),
MxArray.ValuesOneofCase.RawValues => array.RawValues.Values
.Select(value => value.ToByteArray())
.ToArray(),
_ => null,
};
}
public static MxValue ToRawMxValue(
byte[] value,
string variantType,
string rawDiagnostic,
int rawDataType = 0)
{
ArgumentNullException.ThrowIfNull(value);
return new MxValue
{
DataType = MxDataType.Unknown,
VariantType = variantType,
RawDiagnostic = rawDiagnostic,
RawDataType = rawDataType,
RawValue = ByteString.CopyFrom(value),
};
}
private static MxValue CreateArrayValue(
MxDataType dataType,
string variantType,
int length,
MxArray array)
{
array.Dimensions.Add((uint)length);
return new MxValue
{
DataType = dataType,
VariantType = variantType,
ArrayValue = array,
};
}
}
+46
View File
@@ -63,3 +63,49 @@ complete `MxCommandReply`.
`MxGatewaySession.CloseAsync` is explicit and idempotent. Repeated calls return `MxGatewaySession.CloseAsync` is explicit and idempotent. Repeated calls return
the first `CloseSessionReply` instead of sending another close request. the first `CloseSessionReply` instead of sending another close request.
## Values, Status, And Errors
The client provides extension helpers for generated protobuf values. Use
`ToMxValue()` on .NET scalar values and typed arrays to create `MxValue`
instances for `Write` and `Write2`. Use `ToClrValue()` and
`GetProjectionKind()` when test or diagnostic code needs to inspect generated
`MxValue` replies while preserving `rawDiagnostic`, raw data type fields, and
raw byte payloads.
`MxStatusProxy.IsSuccess()` and `ToDiagnosticSummary()` expose MXAccess status
arrays without collapsing them into a single gateway success flag. Command
reply helpers follow the same split:
```csharp
reply.EnsureProtocolSuccess();
reply.EnsureMxAccessSuccess();
```
`EnsureProtocolSuccess()` raises gateway, session, worker, or command
exceptions for gateway-level failures. It leaves
`PROTOCOL_STATUS_CODE_MXACCESS_FAILURE` to `EnsureMxAccessSuccess()` so callers
can keep the full `MxCommandReply`, HRESULT, and status array when MXAccess
itself rejects a command. `MxAccessException.Reply` contains the raw generated
reply.
## CLI Usage
The test CLI supports deterministic JSON output for automation:
```powershell
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- version --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- open-session --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- register --session-id <id> --client-name mxgw-dotnet-cli --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- add-item --session-id <id> --server-handle 1 --item Area001.Pump001.Speed --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- advise --session-id <id> --server-handle 1 --item-handle 1 --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- write --session-id <id> --server-handle 1 --item-handle 1 --type int32 --value 123 --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- write2 --session-id <id> --server-handle 1 --item-handle 1 --type int32 --value 123 --timestamp 2026-01-01T00:00:00Z --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- stream-events --session-id <id> --max-events 1 --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- smoke --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --item Area001.Pump001.Speed --json
```
`smoke` opens a session, registers a client, adds one item, advises it,
optionally writes a value when `--type` and `--value` are supplied, reads a
bounded event stream, and closes the session in a `finally` block. CLI error
output redacts API keys supplied through `--api-key`.
+57
View File
@@ -0,0 +1,57 @@
# Python Client
The Python client package contains generated MXAccess Gateway protobuf
bindings, the `mxgateway` package scaffold, and the `mxgw-py` test CLI
scaffold. The package uses the shared proto inputs documented in
`../../docs/client-proto-generation.md` so gateway and client contracts stay in
sync.
## Layout
```text
clients/python/
pyproject.toml
generate-proto.ps1
src/mxgateway/
src/mxgateway/generated/
src/mxgateway_cli/
tests/
```
`src/mxgateway/generated` contains code produced by `grpc_tools.protoc`. Do not
edit generated files by hand.
## Regenerating Protobuf Bindings
Run generation after the shared `.proto` files or the Python output path
changes:
```powershell
./generate-proto.ps1
```
The script uses the Python tool path recorded in
`../../docs/toolchain-links.md`.
## Build And Test
Run the Python checks from `clients/python`:
```powershell
python -m pip install -e ".[dev]"
python -m pytest
python -m pip wheel . --no-deps --wheel-dir "$env:TEMP\mxgateway-python-wheel"
```
The scaffold tests import the generated gateway and worker stubs and exercise
the deterministic CLI version output.
## CLI
The scaffold CLI exposes version information:
```powershell
mxgw-py version --json
```
Additional commands are implemented with the async client/session wrapper work.
+22
View File
@@ -0,0 +1,22 @@
Set-StrictMode -Version Latest
$ErrorActionPreference = 'Stop'
$repoRoot = Resolve-Path (Join-Path $PSScriptRoot '..\..')
$protoRoot = Join-Path $repoRoot 'src\MxGateway.Contracts\Protos'
$outputRoot = Join-Path $PSScriptRoot 'src\mxgateway\generated'
$python = 'C:\Users\dohertj2\AppData\Local\Programs\Python\Python312\python.exe'
if (-not (Test-Path $python)) {
throw "Python was not found at $python. See docs/toolchain-links.md."
}
New-Item -ItemType Directory -Path $outputRoot -Force | Out-Null
Get-ChildItem -Path (Join-Path $outputRoot '*_pb2.py') -File | Remove-Item
Get-ChildItem -Path (Join-Path $outputRoot '*_pb2_grpc.py') -File | Remove-Item
& $python -m grpc_tools.protoc `
"-I$protoRoot" `
"--python_out=$outputRoot" `
"--grpc_python_out=$outputRoot" `
mxaccess_gateway.proto `
mxaccess_worker.proto
+33
View File
@@ -0,0 +1,33 @@
[build-system]
requires = ["setuptools>=69", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "mxaccess-gateway-client"
version = "0.1.0"
description = "Async Python client scaffold for MXAccess Gateway."
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"click>=8.3,<9",
"grpcio>=1.80,<2",
"protobuf>=6.33,<7",
]
[project.optional-dependencies]
dev = [
"grpcio-tools>=1.80,<2",
"pytest>=9,<10",
"pytest-asyncio>=1.3,<2",
]
[project.scripts]
mxgw-py = "mxgateway_cli.commands:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
addopts = "-ra"
pythonpath = ["src"]
testpaths = ["tests"]
+5
View File
@@ -0,0 +1,5 @@
"""MXAccess Gateway Python client package."""
from .version import __version__
__all__ = ["__version__"]
@@ -0,0 +1,29 @@
"""Generated protobuf and gRPC modules for MXAccess Gateway.
The Python protobuf generator emits absolute imports between generated modules.
This package initializer registers package-local aliases so callers can import
the generated stubs through `mxgateway.generated` without moving the modules to
the top-level import namespace.
"""
from importlib import import_module
import sys
mxaccess_gateway_pb2 = import_module(f"{__name__}.mxaccess_gateway_pb2")
sys.modules.setdefault("mxaccess_gateway_pb2", mxaccess_gateway_pb2)
mxaccess_gateway_pb2_grpc = import_module(f"{__name__}.mxaccess_gateway_pb2_grpc")
sys.modules.setdefault("mxaccess_gateway_pb2_grpc", mxaccess_gateway_pb2_grpc)
mxaccess_worker_pb2 = import_module(f"{__name__}.mxaccess_worker_pb2")
sys.modules.setdefault("mxaccess_worker_pb2", mxaccess_worker_pb2)
mxaccess_worker_pb2_grpc = import_module(f"{__name__}.mxaccess_worker_pb2_grpc")
sys.modules.setdefault("mxaccess_worker_pb2_grpc", mxaccess_worker_pb2_grpc)
__all__ = [
"mxaccess_gateway_pb2",
"mxaccess_gateway_pb2_grpc",
"mxaccess_worker_pb2",
"mxaccess_worker_pb2_grpc",
]
File diff suppressed because one or more lines are too long
@@ -0,0 +1,229 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
import mxaccess_gateway_pb2 as mxaccess__gateway__pb2
GRPC_GENERATED_VERSION = '1.80.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ ' but the generated code in mxaccess_gateway_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
class MxAccessGatewayStub(object):
"""Public client API for MXAccess sessions hosted by the gateway.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.OpenSession = channel.unary_unary(
'/mxaccess_gateway.v1.MxAccessGateway/OpenSession',
request_serializer=mxaccess__gateway__pb2.OpenSessionRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.OpenSessionReply.FromString,
_registered_method=True)
self.CloseSession = channel.unary_unary(
'/mxaccess_gateway.v1.MxAccessGateway/CloseSession',
request_serializer=mxaccess__gateway__pb2.CloseSessionRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.CloseSessionReply.FromString,
_registered_method=True)
self.Invoke = channel.unary_unary(
'/mxaccess_gateway.v1.MxAccessGateway/Invoke',
request_serializer=mxaccess__gateway__pb2.MxCommandRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.MxCommandReply.FromString,
_registered_method=True)
self.StreamEvents = channel.unary_stream(
'/mxaccess_gateway.v1.MxAccessGateway/StreamEvents',
request_serializer=mxaccess__gateway__pb2.StreamEventsRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.MxEvent.FromString,
_registered_method=True)
class MxAccessGatewayServicer(object):
"""Public client API for MXAccess sessions hosted by the gateway.
"""
def OpenSession(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def CloseSession(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Invoke(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def StreamEvents(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_MxAccessGatewayServicer_to_server(servicer, server):
rpc_method_handlers = {
'OpenSession': grpc.unary_unary_rpc_method_handler(
servicer.OpenSession,
request_deserializer=mxaccess__gateway__pb2.OpenSessionRequest.FromString,
response_serializer=mxaccess__gateway__pb2.OpenSessionReply.SerializeToString,
),
'CloseSession': grpc.unary_unary_rpc_method_handler(
servicer.CloseSession,
request_deserializer=mxaccess__gateway__pb2.CloseSessionRequest.FromString,
response_serializer=mxaccess__gateway__pb2.CloseSessionReply.SerializeToString,
),
'Invoke': grpc.unary_unary_rpc_method_handler(
servicer.Invoke,
request_deserializer=mxaccess__gateway__pb2.MxCommandRequest.FromString,
response_serializer=mxaccess__gateway__pb2.MxCommandReply.SerializeToString,
),
'StreamEvents': grpc.unary_stream_rpc_method_handler(
servicer.StreamEvents,
request_deserializer=mxaccess__gateway__pb2.StreamEventsRequest.FromString,
response_serializer=mxaccess__gateway__pb2.MxEvent.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'mxaccess_gateway.v1.MxAccessGateway', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('mxaccess_gateway.v1.MxAccessGateway', rpc_method_handlers)
# This class is part of an EXPERIMENTAL API.
class MxAccessGateway(object):
"""Public client API for MXAccess sessions hosted by the gateway.
"""
@staticmethod
def OpenSession(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/OpenSession',
mxaccess__gateway__pb2.OpenSessionRequest.SerializeToString,
mxaccess__gateway__pb2.OpenSessionReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def CloseSession(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/CloseSession',
mxaccess__gateway__pb2.CloseSessionRequest.SerializeToString,
mxaccess__gateway__pb2.CloseSessionReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def Invoke(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/Invoke',
mxaccess__gateway__pb2.MxCommandRequest.SerializeToString,
mxaccess__gateway__pb2.MxCommandReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def StreamEvents(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/StreamEvents',
mxaccess__gateway__pb2.StreamEventsRequest.SerializeToString,
mxaccess__gateway__pb2.MxEvent.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: mxaccess_worker.proto
# Protobuf Python Version: 6.31.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
_runtime_version.Domain.PUBLIC,
6,
31,
1,
'',
'mxaccess_worker.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
import mxaccess_gateway_pb2 as mxaccess__gateway__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15mxaccess_worker.proto\x12\x12mxaccess_worker.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x16mxaccess_gateway.proto\"\x95\x06\n\x0eWorkerEnvelope\x12\x18\n\x10protocol_version\x18\x01 \x01(\r\x12\x12\n\nsession_id\x18\x02 \x01(\t\x12\x10\n\x08sequence\x18\x03 \x01(\x04\x12\x16\n\x0e\x63orrelation_id\x18\x04 \x01(\t\x12\x39\n\rgateway_hello\x18\n \x01(\x0b\x32 .mxaccess_worker.v1.GatewayHelloH\x00\x12\x37\n\x0cworker_hello\x18\x0b \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerHelloH\x00\x12\x37\n\x0cworker_ready\x18\x0c \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerReadyH\x00\x12;\n\x0eworker_command\x18\r \x01(\x0b\x32!.mxaccess_worker.v1.WorkerCommandH\x00\x12\x46\n\x14worker_command_reply\x18\x0e \x01(\x0b\x32&.mxaccess_worker.v1.WorkerCommandReplyH\x00\x12\x39\n\rworker_cancel\x18\x0f \x01(\x0b\x32 .mxaccess_worker.v1.WorkerCancelH\x00\x12=\n\x0fworker_shutdown\x18\x10 \x01(\x0b\x32\".mxaccess_worker.v1.WorkerShutdownH\x00\x12\x44\n\x13worker_shutdown_ack\x18\x11 \x01(\x0b\x32%.mxaccess_worker.v1.WorkerShutdownAckH\x00\x12\x37\n\x0cworker_event\x18\x12 \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerEventH\x00\x12?\n\x10worker_heartbeat\x18\x13 \x01(\x0b\x32#.mxaccess_worker.v1.WorkerHeartbeatH\x00\x12\x37\n\x0cworker_fault\x18\x14 \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerFaultH\x00\x42\x06\n\x04\x62ody\"Z\n\x0cGatewayHello\x12\"\n\x1asupported_protocol_version\x18\x01 \x01(\r\x12\r\n\x05nonce\x18\x02 \x01(\t\x12\x17\n\x0fgateway_version\x18\x03 \x01(\t\"i\n\x0bWorkerHello\x12\x18\n\x10protocol_version\x18\x01 \x01(\r\x12\r\n\x05nonce\x18\x02 \x01(\t\x12\x19\n\x11worker_process_id\x18\x03 \x01(\x05\x12\x16\n\x0eworker_version\x18\x04 \x01(\t\"\x8e\x01\n\x0bWorkerReady\x12\x19\n\x11worker_process_id\x18\x01 \x01(\x05\x12\x17\n\x0fmxaccess_progid\x18\x02 \x01(\t\x12\x16\n\x0emxaccess_clsid\x18\x03 \x01(\t\x12\x33\n\x0fready_timestamp\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"w\n\rWorkerCommand\x12/\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x1e.mxaccess_gateway.v1.MxCommand\x12\x35\n\x11\x65nqueue_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x81\x01\n\x12WorkerCommandReply\x12\x32\n\x05reply\x18\x01 \x01(\x0b\x32#.mxaccess_gateway.v1.MxCommandReply\x12\x37\n\x13\x63ompleted_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x1e\n\x0cWorkerCancel\x12\x0e\n\x06reason\x18\x01 \x01(\t\"Q\n\x0eWorkerShutdown\x12/\n\x0cgrace_period\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x0e\n\x06reason\x18\x02 \x01(\t\"H\n\x11WorkerShutdownAck\x12\x33\n\x06status\x18\x01 \x01(\x0b\x32#.mxaccess_gateway.v1.ProtocolStatus\":\n\x0bWorkerEvent\x12+\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.mxaccess_gateway.v1.MxEvent\"\xa5\x02\n\x0fWorkerHeartbeat\x12\x19\n\x11worker_process_id\x18\x01 \x01(\x05\x12.\n\x05state\x18\x02 \x01(\x0e\x32\x1f.mxaccess_worker.v1.WorkerState\x12?\n\x1blast_sta_activity_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x1d\n\x15pending_command_count\x18\x04 \x01(\r\x12\"\n\x1aoutbound_event_queue_depth\x18\x05 \x01(\r\x12\x1b\n\x13last_event_sequence\x18\x06 \x01(\x04\x12&\n\x1e\x63urrent_command_correlation_id\x18\x07 \x01(\t\"\xf4\x01\n\x0bWorkerFault\x12\x39\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\'.mxaccess_worker.v1.WorkerFaultCategory\x12\x16\n\x0e\x63ommand_method\x18\x02 \x01(\t\x12\x14\n\x07hresult\x18\x03 \x01(\x05H\x00\x88\x01\x01\x12\x16\n\x0e\x65xception_type\x18\x04 \x01(\t\x12\x1a\n\x12\x64iagnostic_message\x18\x05 \x01(\t\x12<\n\x0fprotocol_status\x18\x06 \x01(\x0b\x32#.mxaccess_gateway.v1.ProtocolStatusB\n\n\x08_hresult*\x97\x02\n\x0bWorkerState\x12\x1c\n\x18WORKER_STATE_UNSPECIFIED\x10\x00\x12\x19\n\x15WORKER_STATE_STARTING\x10\x01\x12\x1c\n\x18WORKER_STATE_HANDSHAKING\x10\x02\x12!\n\x1dWORKER_STATE_INITIALIZING_STA\x10\x03\x12\x16\n\x12WORKER_STATE_READY\x10\x04\x12\"\n\x1eWORKER_STATE_EXECUTING_COMMAND\x10\x05\x12\x1e\n\x1aWORKER_STATE_SHUTTING_DOWN\x10\x06\x12\x18\n\x14WORKER_STATE_STOPPED\x10\x07\x12\x18\n\x14WORKER_STATE_FAULTED\x10\x08*\xc7\x04\n\x13WorkerFaultCategory\x12%\n!WORKER_FAULT_CATEGORY_UNSPECIFIED\x10\x00\x12+\n\'WORKER_FAULT_CATEGORY_INVALID_ARGUMENTS\x10\x01\x12\x37\n3WORKER_FAULT_CATEGORY_GATEWAY_AUTHENTICATION_FAILED\x10\x02\x12+\n\'WORKER_FAULT_CATEGORY_PROTOCOL_MISMATCH\x10\x03\x12,\n(WORKER_FAULT_CATEGORY_PROTOCOL_VIOLATION\x10\x04\x12+\n\'WORKER_FAULT_CATEGORY_PIPE_DISCONNECTED\x10\x05\x12\x32\n.WORKER_FAULT_CATEGORY_MXACCESS_CREATION_FAILED\x10\x06\x12\x31\n-WORKER_FAULT_CATEGORY_MXACCESS_COMMAND_FAILED\x10\x07\x12:\n6WORKER_FAULT_CATEGORY_MXACCESS_EVENT_CONVERSION_FAILED\x10\x08\x12\"\n\x1eWORKER_FAULT_CATEGORY_STA_HUNG\x10\t\x12(\n$WORKER_FAULT_CATEGORY_QUEUE_OVERFLOW\x10\n\x12*\n&WORKER_FAULT_CATEGORY_SHUTDOWN_TIMEOUT\x10\x0b\x42\x1c\xaa\x02\x19MxGateway.Contracts.Protob\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'mxaccess_worker_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
_globals['DESCRIPTOR']._loaded_options = None
_globals['DESCRIPTOR']._serialized_options = b'\252\002\031MxGateway.Contracts.Proto'
_globals['_WORKERSTATE']._serialized_start=2316
_globals['_WORKERSTATE']._serialized_end=2595
_globals['_WORKERFAULTCATEGORY']._serialized_start=2598
_globals['_WORKERFAULTCATEGORY']._serialized_end=3181
_globals['_WORKERENVELOPE']._serialized_start=135
_globals['_WORKERENVELOPE']._serialized_end=924
_globals['_GATEWAYHELLO']._serialized_start=926
_globals['_GATEWAYHELLO']._serialized_end=1016
_globals['_WORKERHELLO']._serialized_start=1018
_globals['_WORKERHELLO']._serialized_end=1123
_globals['_WORKERREADY']._serialized_start=1126
_globals['_WORKERREADY']._serialized_end=1268
_globals['_WORKERCOMMAND']._serialized_start=1270
_globals['_WORKERCOMMAND']._serialized_end=1389
_globals['_WORKERCOMMANDREPLY']._serialized_start=1392
_globals['_WORKERCOMMANDREPLY']._serialized_end=1521
_globals['_WORKERCANCEL']._serialized_start=1523
_globals['_WORKERCANCEL']._serialized_end=1553
_globals['_WORKERSHUTDOWN']._serialized_start=1555
_globals['_WORKERSHUTDOWN']._serialized_end=1636
_globals['_WORKERSHUTDOWNACK']._serialized_start=1638
_globals['_WORKERSHUTDOWNACK']._serialized_end=1710
_globals['_WORKEREVENT']._serialized_start=1712
_globals['_WORKEREVENT']._serialized_end=1770
_globals['_WORKERHEARTBEAT']._serialized_start=1773
_globals['_WORKERHEARTBEAT']._serialized_end=2066
_globals['_WORKERFAULT']._serialized_start=2069
_globals['_WORKERFAULT']._serialized_end=2313
# @@protoc_insertion_point(module_scope)
@@ -0,0 +1,24 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
GRPC_GENERATED_VERSION = '1.80.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ ' but the generated code in mxaccess_worker_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
+3
View File
@@ -0,0 +1,3 @@
"""Package version information."""
__version__ = "0.1.0"
@@ -0,0 +1 @@
"""Command-line entry points for the MXAccess Gateway Python client."""
@@ -0,0 +1,6 @@
"""Module execution entry point for `python -m mxgateway_cli`."""
from .commands import main
if __name__ == "__main__":
main()
@@ -0,0 +1,29 @@
"""CLI scaffold for the MXAccess Gateway Python client."""
import json
import click
from mxgateway import __version__
@click.group()
def main() -> None:
"""MXAccess Gateway Python test CLI."""
@main.command()
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def version(output_json: bool) -> None:
"""Print client package version information."""
payload = {
"client": "mxgw-py",
"package": "mxaccess-gateway-client",
"version": __version__,
}
if output_json:
click.echo(json.dumps(payload, sort_keys=True))
return
click.echo(f"mxgw-py {__version__}")
+21
View File
@@ -0,0 +1,21 @@
"""Tests for the Python CLI scaffold."""
import json
from click.testing import CliRunner
from mxgateway import __version__
from mxgateway_cli.commands import main
def test_version_json_is_deterministic() -> None:
runner = CliRunner()
result = runner.invoke(main, ["version", "--json"])
assert result.exit_code == 0
assert json.loads(result.output) == {
"client": "mxgw-py",
"package": "mxaccess-gateway-client",
"version": __version__,
}
@@ -0,0 +1,30 @@
"""Tests for generated protobuf and gRPC module importability."""
from mxgateway.generated import mxaccess_gateway_pb2
from mxgateway.generated import mxaccess_gateway_pb2_grpc
from mxgateway.generated import mxaccess_worker_pb2
def test_gateway_messages_import() -> None:
request = mxaccess_gateway_pb2.OpenSessionRequest(
client_session_name="pytest",
client_correlation_id="test-correlation",
)
assert request.client_session_name == "pytest"
assert hasattr(mxaccess_gateway_pb2_grpc, "MxAccessGatewayStub")
def test_worker_messages_import_gateway_types() -> None:
envelope = mxaccess_worker_pb2.WorkerEnvelope(
protocol_version=1,
session_id="test-session",
worker_command=mxaccess_worker_pb2.WorkerCommand(
command=mxaccess_gateway_pb2.MxCommand(
kind=mxaccess_gateway_pb2.MX_COMMAND_KIND_PING,
ping=mxaccess_gateway_pb2.PingCommand(message="hello"),
),
),
)
assert envelope.worker_command.command.ping.message == "hello"
+6
View File
@@ -131,6 +131,12 @@ Python clients should use `grpc_tools.protoc` and write generated modules under
`clients/python/src/mxgateway/generated` so imports stay separate from `clients/python/src/mxgateway/generated` so imports stay separate from
handwritten async wrappers. handwritten async wrappers.
The Python scaffold provides a repo-local generation script:
```powershell
clients/python/generate-proto.ps1
```
Java clients should use the Gradle protobuf plugin and write generated sources Java clients should use the Gradle protobuf plugin and write generated sources
under `clients/java/src/main/generated`. The Java client scaffold owns the under `clients/java/src/main/generated`. The Java client scaffold owns the
Gradle plugin versions and source-set wiring. Gradle plugin versions and source-set wiring.