Compare commits

..

6 Commits

Author SHA1 Message Date
Joseph Doherty 89a8fb876a Issue #44: implement Rust client session values errors and CLI 2026-04-26 20:30:04 -04:00
dohertj2 c58358fad9 Merge pull request #94 from agent-3/issue-45-scaffold-python-package
Issue #45: scaffold Python package
2026-04-26 20:28:13 -04:00
dohertj2 8d312a6d2e Merge pull request #93 from agent-1/issue-40-implement-dotnet-values-status-errors-and-cli
Issue #40: implement .NET values status errors and CLI
2026-04-26 20:22:58 -04:00
Joseph Doherty f861a8b3b8 Issue #45: scaffold Python package 2026-04-26 20:22:35 -04:00
Joseph Doherty 499708b2a2 Issue #40: implement .NET values status errors and CLI 2026-04-26 20:17:02 -04:00
dohertj2 191b724f95 Merge pull request #92 from agent-3/issue-42-implement-go-client-session-values-errors-and-cli
Issue #42: implement Go client session values errors and CLI
2026-04-26 20:14:56 -04:00
54 changed files with 4795 additions and 68 deletions
@@ -0,0 +1,117 @@
using System.Globalization;
namespace MxGateway.Client.Cli;
internal sealed class CliArguments
{
private readonly Dictionary<string, string> _values = new(StringComparer.OrdinalIgnoreCase);
private readonly HashSet<string> _flags = new(StringComparer.OrdinalIgnoreCase);
public CliArguments(IEnumerable<string> args)
{
string? pendingName = null;
foreach (string arg in args)
{
if (arg.StartsWith("--", StringComparison.Ordinal))
{
if (pendingName is not null)
{
_flags.Add(pendingName);
}
pendingName = arg[2..];
continue;
}
if (pendingName is null)
{
throw new ArgumentException($"Unexpected argument '{arg}'.");
}
_values[pendingName] = arg;
pendingName = null;
}
if (pendingName is not null)
{
_flags.Add(pendingName);
}
}
public bool HasFlag(string name)
{
return _flags.Contains(name);
}
public string? GetOptional(string name)
{
return _values.TryGetValue(name, out string? value)
? value
: null;
}
public string GetRequired(string name)
{
string? value = GetOptional(name);
if (string.IsNullOrWhiteSpace(value))
{
throw new ArgumentException($"Missing required option --{name}.");
}
return value;
}
public int GetInt32(string name, int? defaultValue = null)
{
string? value = GetOptional(name);
if (string.IsNullOrWhiteSpace(value))
{
if (defaultValue.HasValue)
{
return defaultValue.Value;
}
throw new ArgumentException($"Missing required option --{name}.");
}
return int.Parse(value, CultureInfo.InvariantCulture);
}
public uint GetUInt32(string name, uint defaultValue)
{
string? value = GetOptional(name);
return string.IsNullOrWhiteSpace(value)
? defaultValue
: uint.Parse(value, CultureInfo.InvariantCulture);
}
public ulong GetUInt64(string name, ulong defaultValue)
{
string? value = GetOptional(name);
return string.IsNullOrWhiteSpace(value)
? defaultValue
: ulong.Parse(value, CultureInfo.InvariantCulture);
}
public TimeSpan GetDuration(string name, TimeSpan defaultValue)
{
string? value = GetOptional(name);
if (string.IsNullOrWhiteSpace(value))
{
return defaultValue;
}
if (value.EndsWith("ms", StringComparison.OrdinalIgnoreCase))
{
return TimeSpan.FromMilliseconds(double.Parse(value[..^2], CultureInfo.InvariantCulture));
}
if (value.EndsWith('s'))
{
return TimeSpan.FromSeconds(double.Parse(value[..^1], CultureInfo.InvariantCulture));
}
return TimeSpan.Parse(value, CultureInfo.InvariantCulture);
}
}
@@ -0,0 +1,22 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Cli;
public interface IMxGatewayCliClient : IAsyncDisposable
{
Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CancellationToken cancellationToken);
Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CancellationToken cancellationToken);
Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken);
IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken);
}
@@ -0,0 +1,40 @@
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Cli;
internal sealed class MxGatewayCliClientAdapter(MxGatewayClient client) : IMxGatewayCliClient
{
public Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CancellationToken cancellationToken)
{
return client.OpenSessionRawAsync(request, cancellationToken);
}
public Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CancellationToken cancellationToken)
{
return client.CloseSessionRawAsync(request, cancellationToken);
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken)
{
return client.InvokeAsync(request, cancellationToken);
}
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken)
{
return client.StreamEventsAsync(request, cancellationToken);
}
public ValueTask DisposeAsync()
{
return client.DisposeAsync();
}
}
@@ -0,0 +1,14 @@
namespace MxGateway.Client.Cli;
internal static class MxGatewayCliSecretRedactor
{
public static string Redact(string value, string? apiKey)
{
if (string.IsNullOrEmpty(value) || string.IsNullOrEmpty(apiKey))
{
return value;
}
return value.Replace(apiKey, "[redacted]", StringComparison.Ordinal);
}
}
@@ -1,34 +1,702 @@
using System.Globalization;
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Cli;
public static class MxGatewayClientCli
{
private static readonly JsonFormatter ProtobufJsonFormatter = JsonFormatter.Default;
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
public static int Run(
string[] args,
TextWriter standardOutput,
TextWriter standardError)
{
return RunAsync(args, standardOutput, standardError)
.GetAwaiter()
.GetResult();
}
public static Task<int> RunAsync(
string[] args,
TextWriter standardOutput,
TextWriter standardError,
Func<MxGatewayClientOptions, IMxGatewayCliClient>? clientFactory = null)
{
ArgumentNullException.ThrowIfNull(args);
ArgumentNullException.ThrowIfNull(standardOutput);
ArgumentNullException.ThrowIfNull(standardError);
return RunCoreAsync(
args,
standardOutput,
standardError,
clientFactory ?? CreateDefaultClient);
}
private static async Task<int> RunCoreAsync(
string[] args,
TextWriter standardOutput,
TextWriter standardError,
Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory)
{
if (args.Length is 0 || IsHelp(args[0]))
{
WriteUsage(standardOutput);
return 0;
}
if (string.Equals(args[0], "version", StringComparison.OrdinalIgnoreCase))
string command = args[0].ToLowerInvariant();
CliArguments arguments = new(args.Skip(1));
try
{
standardOutput.WriteLine(
$"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}");
standardOutput.WriteLine(
$"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}");
return 0;
if (command is "version")
{
WriteVersion(arguments, standardOutput);
return 0;
}
if (!IsKnownGatewayCommand(command))
{
return WriteUnknownCommand(command, standardError);
}
await using IMxGatewayCliClient client = clientFactory(CreateOptions(arguments));
using CancellationTokenSource cancellation = CreateCancellation(arguments);
return command switch
{
"open-session" => await OpenSessionAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"close-session" => await CloseSessionAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"ping" => await PingAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"register" => await RegisterAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"add-item" => await AddItemAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"advise" => await AdviseAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write2" => await Write2Async(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"smoke" => await SmokeAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
_ => WriteUnknownCommand(command, standardError),
};
}
catch (Exception exception) when (exception is not OperationCanceledException)
{
string? apiKey = arguments.GetOptional("api-key");
string message = MxGatewayCliSecretRedactor.Redact(exception.Message, apiKey);
if (arguments.HasFlag("json"))
{
standardError.WriteLine(JsonSerializer.Serialize(
new { error = message, type = exception.GetType().Name },
JsonOptions));
}
else
{
standardError.WriteLine(message);
}
return 1;
}
}
private static IMxGatewayCliClient CreateDefaultClient(MxGatewayClientOptions options)
{
return new MxGatewayCliClientAdapter(MxGatewayClient.Create(options));
}
private static MxGatewayClientOptions CreateOptions(CliArguments arguments)
{
string endpoint = arguments.GetOptional("endpoint")
?? Environment.GetEnvironmentVariable("MXGATEWAY_ENDPOINT")
?? "http://localhost:5000";
string apiKey = ResolveApiKey(arguments);
return new MxGatewayClientOptions
{
Endpoint = new Uri(endpoint, UriKind.Absolute),
ApiKey = apiKey,
UseTls = arguments.HasFlag("tls")
|| endpoint.StartsWith("https://", StringComparison.OrdinalIgnoreCase),
DefaultCallTimeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30)),
ConnectTimeout = arguments.GetDuration("connect-timeout", TimeSpan.FromSeconds(10)),
CaCertificatePath = arguments.GetOptional("ca-file"),
ServerNameOverride = arguments.GetOptional("server-name"),
};
}
private static string ResolveApiKey(CliArguments arguments)
{
string? apiKey = arguments.GetOptional("api-key");
if (!string.IsNullOrWhiteSpace(apiKey))
{
return apiKey;
}
standardError.WriteLine($"Unknown command '{args[0]}'.");
string apiKeyEnvironmentName = arguments.GetOptional("api-key-env")
?? "MXGATEWAY_API_KEY";
apiKey = Environment.GetEnvironmentVariable(apiKeyEnvironmentName);
if (!string.IsNullOrWhiteSpace(apiKey))
{
return apiKey;
}
throw new ArgumentException(
$"Gateway API key is required. Pass --api-key or set {apiKeyEnvironmentName}.");
}
private static CancellationTokenSource CreateCancellation(CliArguments arguments)
{
var cancellation = new CancellationTokenSource();
TimeSpan timeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30));
cancellation.CancelAfter(timeout);
return cancellation;
}
private static Task<int> OpenSessionAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return WriteReplyAsync(
client.OpenSessionAsync(
new OpenSessionRequest
{
ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli",
ClientCorrelationId = CreateCorrelationId(),
RequestedBackend = arguments.GetOptional("backend") ?? string.Empty,
},
cancellationToken),
arguments,
output);
}
private static Task<int> CloseSessionAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return WriteReplyAsync(
client.CloseSessionAsync(
new CloseSessionRequest
{
SessionId = arguments.GetRequired("session-id"),
ClientCorrelationId = CreateCorrelationId(),
},
cancellationToken),
arguments,
output);
}
private static Task<int> PingAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Ping,
Ping = new PingCommand { Message = arguments.GetOptional("message") ?? "ping" },
},
cancellationToken);
}
private static Task<int> RegisterAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli" },
},
cancellationToken);
}
private static Task<int> AddItemAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemDefinition = arguments.GetRequired("item"),
},
},
cancellationToken);
}
private static Task<int> AdviseAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemHandle = arguments.GetInt32("item-handle"),
},
},
cancellationToken);
}
private static Task<int> WriteAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemHandle = arguments.GetInt32("item-handle"),
UserId = arguments.GetInt32("user-id", 0),
Value = ParseValue(arguments),
},
},
cancellationToken);
}
private static Task<int> Write2Async(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Write2,
Write2 = new Write2Command
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemHandle = arguments.GetInt32("item-handle"),
UserId = arguments.GetInt32("user-id", 0),
Value = ParseValue(arguments),
TimestampValue = ParseTimestampValue(arguments),
},
},
cancellationToken);
}
private static async Task<int> StreamEventsAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
var events = new List<MxEvent>();
uint maxEvents = arguments.GetUInt32("max-events", 0);
uint eventCount = 0;
var request = new StreamEventsRequest
{
SessionId = arguments.GetRequired("session-id"),
AfterWorkerSequence = arguments.GetUInt64("after-worker-sequence", 0),
};
await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(request, cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
if (arguments.HasFlag("json"))
{
events.Add(gatewayEvent);
}
else
{
output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent));
}
eventCount++;
if (maxEvents > 0 && eventCount >= maxEvents)
{
break;
}
}
if (arguments.HasFlag("json"))
{
output.WriteLine(JsonSerializer.Serialize(
new { events = events.Select(EventToJsonElement).ToArray() },
JsonOptions));
}
return 0;
}
private static async Task<int> SmokeAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
OpenSessionReply? openReply = null;
CloseSessionReply? closeReply = null;
var commandReplies = new List<MxCommandReply>();
var events = new List<MxEvent>();
try
{
openReply = await client.OpenSessionAsync(
new OpenSessionRequest
{
ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke",
ClientCorrelationId = CreateCorrelationId(),
},
cancellationToken)
.ConfigureAwait(false);
int serverHandle = await InvokeForHandleAsync(
arguments,
client,
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke" },
},
reply => reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value,
commandReplies,
cancellationToken)
.ConfigureAwait(false);
int itemHandle = await InvokeForHandleAsync(
arguments,
client,
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = arguments.GetRequired("item"),
},
},
reply => reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value,
commandReplies,
cancellationToken)
.ConfigureAwait(false);
commandReplies.Add(await InvokeAndEnsureAsync(
client,
CreateCommandRequest(
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
}),
cancellationToken)
.ConfigureAwait(false));
if (arguments.GetOptional("value") is not null)
{
commandReplies.Add(await InvokeAndEnsureAsync(
client,
CreateCommandRequest(
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
UserId = arguments.GetInt32("user-id", 0),
Value = ParseValue(arguments),
},
}),
cancellationToken)
.ConfigureAwait(false));
}
using CancellationTokenSource streamCancellation = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
streamCancellation.CancelAfter(arguments.GetDuration(
"event-timeout",
TimeSpan.FromSeconds(2)));
try
{
await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(
new StreamEventsRequest { SessionId = openReply.SessionId },
streamCancellation.Token)
.WithCancellation(streamCancellation.Token)
.ConfigureAwait(false))
{
events.Add(gatewayEvent);
if (events.Count >= arguments.GetUInt32("max-events", 1))
{
break;
}
}
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
}
}
finally
{
if (openReply is not null)
{
closeReply = await client.CloseSessionAsync(
new CloseSessionRequest
{
SessionId = openReply.SessionId,
ClientCorrelationId = CreateCorrelationId(),
},
CancellationToken.None)
.ConfigureAwait(false);
}
}
WriteSmokeResult(arguments, output, openReply, closeReply, commandReplies, events);
return 0;
}
private static async Task<int> InvokeAndWriteAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
MxCommand command,
CancellationToken cancellationToken)
{
MxCommandReply reply = await InvokeAndEnsureAsync(
client,
CreateCommandRequest(arguments.GetRequired("session-id"), command),
cancellationToken)
.ConfigureAwait(false);
WriteMessage(arguments, output, reply);
return 0;
}
private static async Task<int> InvokeForHandleAsync(
CliArguments arguments,
IMxGatewayCliClient client,
string sessionId,
MxCommand command,
Func<MxCommandReply, int> handleSelector,
List<MxCommandReply> replies,
CancellationToken cancellationToken)
{
MxCommandReply reply = await InvokeAndEnsureAsync(
client,
CreateCommandRequest(sessionId, command),
cancellationToken)
.ConfigureAwait(false);
replies.Add(reply);
return handleSelector(reply);
}
private static async Task<MxCommandReply> InvokeAndEnsureAsync(
IMxGatewayCliClient client,
MxCommandRequest request,
CancellationToken cancellationToken)
{
MxCommandReply reply = await client.InvokeAsync(request, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply;
}
private static MxCommandRequest CreateCommandRequest(
string sessionId,
MxCommand command)
{
return new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = CreateCorrelationId(),
Command = command,
};
}
private static async Task<int> WriteReplyAsync<TReply>(
Task<TReply> replyTask,
CliArguments arguments,
TextWriter output)
where TReply : IMessage
{
TReply reply = await replyTask.ConfigureAwait(false);
WriteMessage(arguments, output, reply);
return 0;
}
private static void WriteVersion(CliArguments arguments, TextWriter output)
{
if (arguments.HasFlag("json"))
{
output.WriteLine(JsonSerializer.Serialize(
new
{
gatewayProtocolVersion = MxGatewayClientContractInfo.GatewayProtocolVersion,
workerProtocolVersion = MxGatewayClientContractInfo.WorkerProtocolVersion,
},
JsonOptions));
return;
}
output.WriteLine(
$"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}");
output.WriteLine(
$"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}");
}
private static void WriteMessage(
CliArguments arguments,
TextWriter output,
IMessage message)
{
output.WriteLine(arguments.HasFlag("json")
? ProtobufJsonFormatter.Format(message)
: message.ToString());
}
private static void WriteSmokeResult(
CliArguments arguments,
TextWriter output,
OpenSessionReply? openReply,
CloseSessionReply? closeReply,
IReadOnlyList<MxCommandReply> commandReplies,
IReadOnlyList<MxEvent> events)
{
if (!arguments.HasFlag("json"))
{
output.WriteLine($"session-id={openReply?.SessionId}");
output.WriteLine($"commands={commandReplies.Count}");
output.WriteLine($"events={events.Count}");
output.WriteLine($"closed={closeReply is not null}");
return;
}
output.WriteLine(JsonSerializer.Serialize(
new
{
sessionId = openReply?.SessionId,
closed = closeReply is not null,
commandReplies = commandReplies.Select(CommandReplyToJsonElement).ToArray(),
events = events.Select(EventToJsonElement).ToArray(),
},
JsonOptions));
}
private static JsonElement CommandReplyToJsonElement(MxCommandReply reply)
{
return JsonDocument.Parse(ProtobufJsonFormatter.Format(reply)).RootElement.Clone();
}
private static JsonElement EventToJsonElement(MxEvent gatewayEvent)
{
return JsonDocument.Parse(ProtobufJsonFormatter.Format(gatewayEvent)).RootElement.Clone();
}
private static MxValue ParseValue(CliArguments arguments)
{
string type = arguments.GetRequired("type").ToLowerInvariant();
string value = arguments.GetRequired("value");
string[] values = value.Split(',', StringSplitOptions.TrimEntries);
return type switch
{
"bool" or "boolean" => bool.Parse(value).ToMxValue(),
"bool-array" or "boolean-array" => values.Select(bool.Parse).ToArray().ToMxValue(),
"int32" or "integer" => int.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"int32-array" or "integer-array" => values.Select(item => int.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"int64" => long.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"int64-array" => values.Select(item => long.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"float" => float.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"float-array" => values.Select(item => float.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"double" => double.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"double-array" => values.Select(item => double.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"string" => value.ToMxValue(),
"string-array" => values.ToMxValue(),
"time" or "timestamp" => DateTimeOffset.Parse(value, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal).ToMxValue(),
"time-array" or "timestamp-array" => values
.Select(item => DateTimeOffset.Parse(item, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal))
.ToArray()
.ToMxValue(),
_ => throw new ArgumentException($"Unsupported MX value type '{type}'."),
};
}
private static MxValue ParseTimestampValue(CliArguments arguments)
{
string timestamp = arguments.GetOptional("timestamp")
?? DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture);
return DateTimeOffset.Parse(
timestamp,
CultureInfo.InvariantCulture,
DateTimeStyles.AssumeUniversal)
.ToMxValue();
}
private static int WriteUnknownCommand(string command, TextWriter standardError)
{
standardError.WriteLine($"Unknown command '{command}'.");
WriteUsage(standardError);
return 2;
}
@@ -40,9 +708,37 @@ public static class MxGatewayClientCli
|| string.Equals(value, "help", StringComparison.OrdinalIgnoreCase);
}
private static bool IsKnownGatewayCommand(string command)
{
return command is "open-session"
or "close-session"
or "ping"
or "register"
or "add-item"
or "advise"
or "stream-events"
or "write"
or "write2"
or "smoke";
}
private static string CreateCorrelationId()
{
return Guid.NewGuid().ToString("N");
}
private static void WriteUsage(TextWriter writer)
{
writer.WriteLine("mxgw-dotnet version");
writer.WriteLine("mxgw-dotnet --help");
writer.WriteLine("mxgw-dotnet version [--json]");
writer.WriteLine("mxgw-dotnet ping --session-id <id> [--json]");
writer.WriteLine("mxgw-dotnet open-session [--client-name <name>] [--json]");
writer.WriteLine("mxgw-dotnet close-session --session-id <id> [--json]");
writer.WriteLine("mxgw-dotnet register --session-id <id> --client-name <name> [--json]");
writer.WriteLine("mxgw-dotnet add-item --session-id <id> --server-handle <n> --item <ref> [--json]");
writer.WriteLine("mxgw-dotnet advise --session-id <id> --server-handle <n> --item-handle <n> [--json]");
writer.WriteLine("mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]");
writer.WriteLine("mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]");
writer.WriteLine("mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]");
writer.WriteLine("mxgw-dotnet smoke --item <ref> [--value <value> --type <type>] [--json]");
}
}
@@ -1,3 +1,3 @@
using MxGateway.Client.Cli;
return MxGatewayClientCli.Run(args, Console.Out, Console.Error);
return await MxGatewayClientCli.RunAsync(args, Console.Out, Console.Error);
@@ -0,0 +1,78 @@
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxCommandReplyExtensionsTests
{
[Fact]
public void EnsureSuccess_WithRegisterFixture_ReturnsReply()
{
MxCommandReply reply = ReadReplyFixture("register.ok.reply.json");
Assert.Same(reply, reply.EnsureProtocolSuccess());
Assert.Same(reply, reply.EnsureMxAccessSuccess());
}
[Fact]
public void EnsureMxAccessSuccess_WithFailureFixture_PreservesHResultAndStatuses()
{
MxCommandReply reply = ReadReplyFixture("write.mxaccess-failure.reply.json");
reply.EnsureProtocolSuccess();
MxAccessException exception = Assert.Throws<MxAccessException>(
reply.EnsureMxAccessSuccess);
Assert.Equal(-2147220992, exception.HResultCode);
Assert.Equal(reply.Statuses.Count, exception.Statuses.Count);
Assert.Equal(reply, exception.Reply);
Assert.Contains("0x80040200", exception.Message);
}
[Fact]
public void EnsureProtocolSuccess_WithSessionFailure_ThrowsSessionException()
{
MxCommandReply reply = new()
{
SessionId = "session-missing",
CorrelationId = "correlation",
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.SessionNotFound,
Message = "Session was not found.",
},
};
MxGatewaySessionException exception = Assert.Throws<MxGatewaySessionException>(
reply.EnsureProtocolSuccess);
Assert.Equal("session-missing", exception.SessionId);
Assert.Equal(ProtocolStatusCode.SessionNotFound, exception.ProtocolStatus?.Code);
}
private static MxCommandReply ReadReplyFixture(string fileName)
{
DirectoryInfo directory = new(AppContext.BaseDirectory);
while (directory is not null)
{
string path = Path.Combine(
directory.FullName,
"clients",
"proto",
"fixtures",
"behavior",
"command-replies",
fileName);
if (File.Exists(path))
{
return JsonParser.Default.Parse<MxCommandReply>(File.ReadAllText(path));
}
directory = directory.Parent!;
}
throw new FileNotFoundException(fileName);
}
}
@@ -1,4 +1,5 @@
using MxGateway.Client.Cli;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
@@ -17,4 +18,224 @@ public sealed class MxGatewayClientCliTests
Assert.Contains("worker-protocol=1", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
[Fact]
public async Task RunAsync_VersionJson_PrintsJsonProtocolVersions()
{
using var output = new StringWriter();
using var error = new StringWriter();
int exitCode = await MxGatewayClientCli.RunAsync(["version", "--json"], output, error);
Assert.Equal(0, exitCode);
Assert.Contains("\"gatewayProtocolVersion\":1", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
[Fact]
public async Task RunAsync_Write_BuildsWriteCommandAndPrintsJsonReply()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.InvokeReplies.Enqueue(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Write,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
int exitCode = await MxGatewayClientCli.RunAsync(
[
"write",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--session-id",
"session-fixture",
"--server-handle",
"12",
"--item-handle",
"34",
"--type",
"int32",
"--value",
"123",
"--json",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
MxCommandRequest request = Assert.Single(fakeClient.InvokeRequests);
Assert.Equal(MxCommandKind.Write, request.Command.Kind);
Assert.Equal(123, request.Command.Write.Value.Int32Value);
Assert.Contains("MX_COMMAND_KIND_WRITE", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
[Fact]
public async Task RunAsync_ErrorOutput_RedactsApiKey()
{
using var output = new StringWriter();
using var error = new StringWriter();
int exitCode = await MxGatewayClientCli.RunAsync(
[
"open-session",
"--endpoint",
"http://localhost:5000",
"--api-key",
"secret-api-key",
],
output,
error,
_ => throw new InvalidOperationException("boom secret-api-key"));
Assert.Equal(1, exitCode);
Assert.DoesNotContain("secret-api-key", error.ToString());
Assert.Contains("[redacted]", error.ToString());
}
[Fact]
public async Task RunAsync_StreamEvents_WithMaxEventsStopsNonJsonOutput()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.Events.Add(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnDataChange,
WorkerSequence = 1,
});
fakeClient.Events.Add(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnWriteComplete,
WorkerSequence = 2,
});
int exitCode = await MxGatewayClientCli.RunAsync(
[
"stream-events",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--session-id",
"session-fixture",
"--max-events",
"1",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
Assert.Contains("workerSequence", output.ToString());
Assert.DoesNotContain("ON_WRITE_COMPLETE", output.ToString());
}
[Fact]
public async Task RunAsync_Smoke_WhenCommandFails_ClosesOpenedSession()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new()
{
InvokeFailure = new InvalidOperationException("register failed"),
};
int exitCode = await MxGatewayClientCli.RunAsync(
[
"smoke",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--item",
"Area001.Pump001.Speed",
"--json",
],
output,
error,
_ => fakeClient);
Assert.Equal(1, exitCode);
CloseSessionRequest closeRequest = Assert.Single(fakeClient.CloseSessionRequests);
Assert.Equal("session-fixture", closeRequest.SessionId);
}
private sealed class FakeCliClient : IMxGatewayCliClient
{
public Queue<MxCommandReply> InvokeReplies { get; } = new();
public List<MxCommandRequest> InvokeRequests { get; } = [];
public List<CloseSessionRequest> CloseSessionRequests { get; } = [];
public List<MxEvent> Events { get; } = [];
public Exception? InvokeFailure { get; init; }
public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
public Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CancellationToken cancellationToken)
{
return Task.FromResult(new OpenSessionReply
{
SessionId = "session-fixture",
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
GatewayProtocolVersion = 1,
WorkerProtocolVersion = 1,
});
}
public Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CancellationToken cancellationToken)
{
CloseSessionRequests.Add(request);
return Task.FromResult(new CloseSessionReply
{
SessionId = request.SessionId,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
FinalState = SessionState.Closed,
});
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken)
{
InvokeRequests.Add(request);
if (InvokeFailure is not null)
{
throw InvokeFailure;
}
return Task.FromResult(InvokeReplies.Dequeue());
}
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
foreach (MxEvent gatewayEvent in Events)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
yield return gatewayEvent;
}
}
}
}
@@ -110,6 +110,33 @@ public sealed class MxGatewayClientSessionTests
Assert.Equal(56, request.Command.Write.UserId);
}
[Fact]
public async Task Write2RawAsync_BuildsWrite2CommandWithValueAndTimestamp()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Write2,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
MxValue value = 123.ToMxValue();
MxValue timestampValue = DateTimeOffset.Parse("2026-01-01T00:00:00Z").ToMxValue();
MxCommandReply reply = await session.Write2RawAsync(12, 34, value, timestampValue, 56);
Assert.Equal(MxCommandKind.Write2, reply.Kind);
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
Assert.Equal(MxCommandKind.Write2, request.Command.Kind);
Assert.Equal(12, request.Command.Write2.ServerHandle);
Assert.Equal(34, request.Command.Write2.ItemHandle);
Assert.Same(value, request.Command.Write2.Value);
Assert.Same(timestampValue, request.Command.Write2.TimestampValue);
Assert.Equal(56, request.Command.Write2.UserId);
}
[Fact]
public async Task StreamEventsAsync_YieldsEventsInGatewayOrder()
{
@@ -0,0 +1,57 @@
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxStatusProxyExtensionsTests
{
[Fact]
public void FixtureStatuses_ProjectSuccessAndPreserveRawFields()
{
using JsonDocument document = JsonDocument.Parse(ReadFixture(
"statuses",
"status-conversion-cases.json"));
foreach (JsonElement testCase in document.RootElement.GetProperty("cases").EnumerateArray())
{
MxStatusProxy status = JsonParser.Default.Parse<MxStatusProxy>(
testCase.GetProperty("status").GetRawText());
int success = testCase.GetProperty("status").GetProperty("success").GetInt32();
Assert.Equal(success != 0 && status.Category is MxStatusCategory.Ok, status.IsSuccess());
Assert.Equal(
testCase.GetProperty("status").GetProperty("rawCategory").GetInt32(),
status.RawCategory);
Assert.Equal(
testCase.GetProperty("status").GetProperty("rawDetectedBy").GetInt32(),
status.RawDetectedBy);
}
}
private static string ReadFixture(string category, string fileName)
{
DirectoryInfo directory = new(AppContext.BaseDirectory);
while (directory is not null)
{
string path = Path.Combine(
directory.FullName,
"clients",
"proto",
"fixtures",
"behavior",
category,
fileName);
if (File.Exists(path))
{
return File.ReadAllText(path);
}
directory = directory.Parent!;
}
throw new FileNotFoundException(fileName);
}
}
@@ -0,0 +1,79 @@
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxValueExtensionsTests
{
[Fact]
public void ToMxValue_WithScalarValues_CreatesTypedProtobufValues()
{
Assert.Equal(MxValue.KindOneofCase.BoolValue, true.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.Int32Value, 123.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.Int64Value, 123L.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.FloatValue, 1.25F.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.DoubleValue, 2.5D.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.StringValue, "alpha".ToMxValue().KindCase);
}
[Fact]
public void ToMxValue_WithArrays_CreatesTypedArrayProtobufValues()
{
MxValue value = new[] { "alpha", "beta" }.ToMxValue();
Assert.Equal(MxValue.KindOneofCase.ArrayValue, value.KindCase);
Assert.Equal(MxArray.ValuesOneofCase.StringValues, value.ArrayValue.ValuesCase);
Assert.Equal(["alpha", "beta"], value.ArrayValue.StringValues.Values);
Assert.Equal([2U], value.ArrayValue.Dimensions);
}
[Fact]
public void FixtureValues_ProjectExpectedKindsAndPreserveRawMetadata()
{
using JsonDocument document = JsonDocument.Parse(ReadFixture(
"values",
"value-conversion-cases.json"));
foreach (JsonElement testCase in document.RootElement.GetProperty("cases").EnumerateArray())
{
string expectedKind = testCase.GetProperty("expectedKind").GetString()!;
MxValue value = JsonParser.Default.Parse<MxValue>(
testCase.GetProperty("value").GetRawText());
Assert.Equal(expectedKind, value.GetProjectionKind());
if (testCase.GetProperty("id").GetString() is "raw-fallback.variant")
{
Assert.Equal(32767, value.RawDataType);
Assert.Equal([1, 2, 3, 4, 5], Assert.IsType<byte[]>(value.ToClrValue()));
}
}
}
private static string ReadFixture(string category, string fileName)
{
DirectoryInfo directory = new(AppContext.BaseDirectory);
while (directory is not null)
{
string path = Path.Combine(
directory.FullName,
"clients",
"proto",
"fixtures",
"behavior",
category,
fileName);
if (File.Exists(path))
{
return File.ReadAllText(path);
}
directory = directory.Parent!;
}
throw new FileNotFoundException(fileName);
}
}
@@ -17,27 +17,48 @@ internal sealed class GrpcMxGatewayClientTransport(
OpenSessionRequest request,
CallOptions callOptions)
{
return await RawClient.OpenSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
try
{
return await RawClient.OpenSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
}
public async Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CallOptions callOptions)
{
return await RawClient.CloseSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
try
{
return await RawClient.CloseSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
}
public async Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CallOptions callOptions)
{
return await RawClient.InvokeAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
try
{
return await RawClient.InvokeAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
}
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
@@ -51,10 +72,24 @@ internal sealed class GrpcMxGatewayClientTransport(
using AsyncServerStreamingCall<MxEvent> call = RawClient.StreamEvents(request, callOptions);
await foreach (MxEvent gatewayEvent in call.ResponseStream
.ReadAllAsync(effectiveCancellationToken)
.ConfigureAwait(false))
IAsyncStreamReader<MxEvent> responseStream = call.ResponseStream;
while (true)
{
MxEvent? gatewayEvent;
try
{
if (!await responseStream.MoveNext(effectiveCancellationToken).ConfigureAwait(false))
{
break;
}
gatewayEvent = responseStream.Current;
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
yield return gatewayEvent;
}
}
@@ -65,4 +100,18 @@ internal sealed class GrpcMxGatewayClientTransport(
{
return StreamEventsAsync(request, callOptions);
}
private static MxGatewayException MapRpcException(RpcException exception)
{
return exception.StatusCode switch
{
StatusCode.Unauthenticated => new MxGatewayAuthenticationException(
exception.Status.Detail,
innerException: exception),
StatusCode.PermissionDenied => new MxGatewayAuthorizationException(
exception.Status.Detail,
innerException: exception),
_ => new MxGatewayException(exception.Status.Detail, exception),
};
}
}
@@ -0,0 +1,24 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxAccessException : MxGatewayCommandException
{
public MxAccessException(
string message,
MxCommandReply reply,
Exception? innerException = null)
: base(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
reply.HasHresult ? reply.Hresult : null,
reply.Statuses.ToArray(),
innerException)
{
Reply = reply;
}
public MxCommandReply Reply { get; }
}
@@ -0,0 +1,96 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public static class MxCommandReplyExtensions
{
public static MxCommandReply EnsureProtocolSuccess(this MxCommandReply reply)
{
ArgumentNullException.ThrowIfNull(reply);
ProtocolStatusCode code = reply.ProtocolStatus?.Code
?? ProtocolStatusCode.Unspecified;
if (code is ProtocolStatusCode.Ok or ProtocolStatusCode.MxaccessFailure)
{
return reply;
}
throw CreateProtocolException(reply, code);
}
public static MxCommandReply EnsureMxAccessSuccess(this MxCommandReply reply)
{
ArgumentNullException.ThrowIfNull(reply);
bool mxAccessFailure = reply.ProtocolStatus?.Code is ProtocolStatusCode.MxaccessFailure;
bool hResultFailure = reply.HasHresult && reply.Hresult != 0;
bool statusFailure = reply.Statuses.Any(status => !status.IsSuccess());
if (!mxAccessFailure && !hResultFailure && !statusFailure)
{
return reply;
}
throw new MxAccessException(CreateMxAccessMessage(reply), reply);
}
private static MxGatewayException CreateProtocolException(
MxCommandReply reply,
ProtocolStatusCode code)
{
string message = CreateProtocolMessage(reply);
int? hResult = reply.HasHresult ? reply.Hresult : null;
MxStatusProxy[] statuses = reply.Statuses.ToArray();
return code switch
{
ProtocolStatusCode.SessionNotFound or ProtocolStatusCode.SessionNotReady
=> new MxGatewaySessionException(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
hResult,
statuses),
ProtocolStatusCode.WorkerUnavailable
=> new MxGatewayWorkerException(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
hResult,
statuses),
_
=> new MxGatewayCommandException(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
hResult,
statuses),
};
}
private static string CreateProtocolMessage(MxCommandReply reply)
{
string statusMessage = string.IsNullOrWhiteSpace(reply.ProtocolStatus?.Message)
? "Gateway protocol failure."
: reply.ProtocolStatus.Message;
return $"{statusMessage} code={reply.ProtocolStatus?.Code}; session={reply.SessionId}; correlation={reply.CorrelationId}";
}
private static string CreateMxAccessMessage(MxCommandReply reply)
{
string statusSummary = reply.Statuses.Count is 0
? "no MXSTATUS_PROXY entries"
: string.Join("; ", reply.Statuses.Select(status => status.ToDiagnosticSummary()));
string hResult = reply.HasHresult
? $"0x{reply.Hresult:X8}"
: "none";
return $"MXAccess command failed. kind={reply.Kind}; hresult={hResult}; statuses={statusSummary}";
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewayAuthenticationException : MxGatewayException
{
public MxGatewayAuthenticationException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewayAuthorizationException : MxGatewayException
{
public MxGatewayAuthorizationException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public class MxGatewayCommandException : MxGatewayException
{
public MxGatewayCommandException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,45 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public class MxGatewayException : Exception
{
public MxGatewayException(string message)
: base(message)
{
Statuses = [];
}
public MxGatewayException(string message, Exception? innerException)
: base(message, innerException)
{
Statuses = [];
}
public MxGatewayException(
string message,
string? sessionId,
string? correlationId,
ProtocolStatus? protocolStatus,
int? hResult,
IReadOnlyList<MxStatusProxy> statuses,
Exception? innerException = null)
: base(message, innerException)
{
SessionId = sessionId;
CorrelationId = correlationId;
ProtocolStatus = protocolStatus;
HResultCode = hResult;
Statuses = statuses;
}
public string? SessionId { get; }
public string? CorrelationId { get; }
public ProtocolStatus? ProtocolStatus { get; }
public int? HResultCode { get; }
public IReadOnlyList<MxStatusProxy> Statuses { get; }
}
@@ -56,6 +56,7 @@ public sealed class MxGatewaySession : IAsyncDisposable
{
MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value;
}
@@ -84,6 +85,7 @@ public sealed class MxGatewaySession : IAsyncDisposable
itemDefinition,
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value;
}
@@ -119,6 +121,7 @@ public sealed class MxGatewaySession : IAsyncDisposable
itemContext,
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AddItem2?.ItemHandle ?? reply.ReturnValue.Int32Value;
}
@@ -149,8 +152,9 @@ public sealed class MxGatewaySession : IAsyncDisposable
int itemHandle,
CancellationToken cancellationToken = default)
{
await AdviseRawAsync(serverHandle, itemHandle, cancellationToken)
MxCommandReply reply = await AdviseRawAsync(serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
public Task<MxCommandReply> AdviseRawAsync(
@@ -178,8 +182,9 @@ public sealed class MxGatewaySession : IAsyncDisposable
int userId,
CancellationToken cancellationToken = default)
{
await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken)
MxCommandReply reply = await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
public Task<MxCommandReply> WriteRawAsync(
@@ -206,6 +211,52 @@ public sealed class MxGatewaySession : IAsyncDisposable
cancellationToken);
}
public async Task Write2Async(
int serverHandle,
int itemHandle,
MxValue value,
MxValue timestampValue,
int userId,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await Write2RawAsync(
serverHandle,
itemHandle,
value,
timestampValue,
userId,
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
public Task<MxCommandReply> Write2RawAsync(
int serverHandle,
int itemHandle,
MxValue value,
MxValue timestampValue,
int userId,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(value);
ArgumentNullException.ThrowIfNull(timestampValue);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Write2,
Write2 = new Write2Command
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
Value = value,
TimestampValue = timestampValue,
UserId = userId,
},
},
cancellationToken);
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken = default)
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewaySessionException : MxGatewayException
{
public MxGatewaySessionException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewayWorkerException : MxGatewayException
{
public MxGatewayWorkerException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public static class MxStatusProxyExtensions
{
public static bool IsSuccess(this MxStatusProxy status)
{
ArgumentNullException.ThrowIfNull(status);
return status.Success != 0
&& status.Category is MxStatusCategory.Ok;
}
public static string ToDiagnosticSummary(this MxStatusProxy status)
{
ArgumentNullException.ThrowIfNull(status);
string diagnosticText = string.IsNullOrWhiteSpace(status.DiagnosticText)
? "no diagnostic text"
: status.DiagnosticText;
return $"{status.Category} by {status.DetectedBy}; detail={status.Detail}; {diagnosticText}";
}
}
@@ -0,0 +1,284 @@
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
/// <summary>
/// Creates and projects gateway MXAccess values without hiding the raw
/// protobuf value carried by command replies and events.
/// </summary>
public static class MxValueExtensions
{
public static MxValue ToMxValue(this bool value)
{
return new MxValue
{
DataType = MxDataType.Boolean,
VariantType = "VT_BOOL",
BoolValue = value,
};
}
public static MxValue ToMxValue(this int value)
{
return new MxValue
{
DataType = MxDataType.Integer,
VariantType = "VT_I4",
Int32Value = value,
};
}
public static MxValue ToMxValue(this long value)
{
return new MxValue
{
DataType = MxDataType.Integer,
VariantType = "VT_I8",
Int64Value = value,
};
}
public static MxValue ToMxValue(this float value)
{
return new MxValue
{
DataType = MxDataType.Float,
VariantType = "VT_R4",
FloatValue = value,
};
}
public static MxValue ToMxValue(this double value)
{
return new MxValue
{
DataType = MxDataType.Double,
VariantType = "VT_R8",
DoubleValue = value,
};
}
public static MxValue ToMxValue(this string value)
{
ArgumentNullException.ThrowIfNull(value);
return new MxValue
{
DataType = MxDataType.String,
VariantType = "VT_BSTR",
StringValue = value,
};
}
public static MxValue ToMxValue(this DateTimeOffset value)
{
return new MxValue
{
DataType = MxDataType.Time,
VariantType = "VT_DATE",
TimestampValue = Timestamp.FromDateTimeOffset(value),
};
}
public static MxValue ToMxValue(this DateTime value)
{
return new DateTimeOffset(
value.Kind == DateTimeKind.Unspecified
? DateTime.SpecifyKind(value, DateTimeKind.Utc)
: value.ToUniversalTime())
.ToMxValue();
}
public static MxValue ToMxValue(this IReadOnlyList<bool> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new BoolArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Boolean, "VT_ARRAY|VT_BOOL", values.Count, new MxArray
{
ElementDataType = MxDataType.Boolean,
VariantType = "VT_ARRAY|VT_BOOL",
BoolValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<int> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new Int32Array();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Integer, "VT_ARRAY|VT_I4", values.Count, new MxArray
{
ElementDataType = MxDataType.Integer,
VariantType = "VT_ARRAY|VT_I4",
Int32Values = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<long> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new Int64Array();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Integer, "VT_ARRAY|VT_I8", values.Count, new MxArray
{
ElementDataType = MxDataType.Integer,
VariantType = "VT_ARRAY|VT_I8",
Int64Values = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<float> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new FloatArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Float, "VT_ARRAY|VT_R4", values.Count, new MxArray
{
ElementDataType = MxDataType.Float,
VariantType = "VT_ARRAY|VT_R4",
FloatValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<double> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new DoubleArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Double, "VT_ARRAY|VT_R8", values.Count, new MxArray
{
ElementDataType = MxDataType.Double,
VariantType = "VT_ARRAY|VT_R8",
DoubleValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<string> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new StringArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.String, "VT_ARRAY|VT_BSTR", values.Count, new MxArray
{
ElementDataType = MxDataType.String,
VariantType = "VT_ARRAY|VT_BSTR",
StringValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<DateTimeOffset> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new TimestampArray();
array.Values.Add(values.Select(Timestamp.FromDateTimeOffset));
return CreateArrayValue(MxDataType.Time, "VT_ARRAY|VT_DATE", values.Count, new MxArray
{
ElementDataType = MxDataType.Time,
VariantType = "VT_ARRAY|VT_DATE",
TimestampValues = array,
});
}
public static string GetProjectionKind(this MxValue value)
{
ArgumentNullException.ThrowIfNull(value);
return value.KindCase switch
{
MxValue.KindOneofCase.BoolValue => "boolValue",
MxValue.KindOneofCase.Int32Value => "int32Value",
MxValue.KindOneofCase.Int64Value => "int64Value",
MxValue.KindOneofCase.FloatValue => "floatValue",
MxValue.KindOneofCase.DoubleValue => "doubleValue",
MxValue.KindOneofCase.StringValue => "stringValue",
MxValue.KindOneofCase.TimestampValue => "timestampValue",
MxValue.KindOneofCase.ArrayValue => "arrayValue",
MxValue.KindOneofCase.RawValue => "rawValue",
_ => value.IsNull ? "nullValue" : "unspecified",
};
}
public static object? ToClrValue(this MxValue value)
{
ArgumentNullException.ThrowIfNull(value);
return value.KindCase switch
{
MxValue.KindOneofCase.BoolValue => value.BoolValue,
MxValue.KindOneofCase.Int32Value => value.Int32Value,
MxValue.KindOneofCase.Int64Value => value.Int64Value,
MxValue.KindOneofCase.FloatValue => value.FloatValue,
MxValue.KindOneofCase.DoubleValue => value.DoubleValue,
MxValue.KindOneofCase.StringValue => value.StringValue,
MxValue.KindOneofCase.TimestampValue => value.TimestampValue.ToDateTimeOffset(),
MxValue.KindOneofCase.ArrayValue => value.ArrayValue.ToClrArrayValue(),
MxValue.KindOneofCase.RawValue => value.RawValue.ToByteArray(),
_ => value.IsNull ? null : value,
};
}
public static object? ToClrArrayValue(this MxArray array)
{
ArgumentNullException.ThrowIfNull(array);
return array.ValuesCase switch
{
MxArray.ValuesOneofCase.BoolValues => array.BoolValues.Values.ToArray(),
MxArray.ValuesOneofCase.Int32Values => array.Int32Values.Values.ToArray(),
MxArray.ValuesOneofCase.Int64Values => array.Int64Values.Values.ToArray(),
MxArray.ValuesOneofCase.FloatValues => array.FloatValues.Values.ToArray(),
MxArray.ValuesOneofCase.DoubleValues => array.DoubleValues.Values.ToArray(),
MxArray.ValuesOneofCase.StringValues => array.StringValues.Values.ToArray(),
MxArray.ValuesOneofCase.TimestampValues => array.TimestampValues.Values
.Select(timestamp => timestamp.ToDateTimeOffset())
.ToArray(),
MxArray.ValuesOneofCase.RawValues => array.RawValues.Values
.Select(value => value.ToByteArray())
.ToArray(),
_ => null,
};
}
public static MxValue ToRawMxValue(
byte[] value,
string variantType,
string rawDiagnostic,
int rawDataType = 0)
{
ArgumentNullException.ThrowIfNull(value);
return new MxValue
{
DataType = MxDataType.Unknown,
VariantType = variantType,
RawDiagnostic = rawDiagnostic,
RawDataType = rawDataType,
RawValue = ByteString.CopyFrom(value),
};
}
private static MxValue CreateArrayValue(
MxDataType dataType,
string variantType,
int length,
MxArray array)
{
array.Dimensions.Add((uint)length);
return new MxValue
{
DataType = dataType,
VariantType = variantType,
ArrayValue = array,
};
}
}
+46
View File
@@ -63,3 +63,49 @@ complete `MxCommandReply`.
`MxGatewaySession.CloseAsync` is explicit and idempotent. Repeated calls return
the first `CloseSessionReply` instead of sending another close request.
## Values, Status, And Errors
The client provides extension helpers for generated protobuf values. Use
`ToMxValue()` on .NET scalar values and typed arrays to create `MxValue`
instances for `Write` and `Write2`. Use `ToClrValue()` and
`GetProjectionKind()` when test or diagnostic code needs to inspect generated
`MxValue` replies while preserving `rawDiagnostic`, raw data type fields, and
raw byte payloads.
`MxStatusProxy.IsSuccess()` and `ToDiagnosticSummary()` expose MXAccess status
arrays without collapsing them into a single gateway success flag. Command
reply helpers follow the same split:
```csharp
reply.EnsureProtocolSuccess();
reply.EnsureMxAccessSuccess();
```
`EnsureProtocolSuccess()` raises gateway, session, worker, or command
exceptions for gateway-level failures. It leaves
`PROTOCOL_STATUS_CODE_MXACCESS_FAILURE` to `EnsureMxAccessSuccess()` so callers
can keep the full `MxCommandReply`, HRESULT, and status array when MXAccess
itself rejects a command. `MxAccessException.Reply` contains the raw generated
reply.
## CLI Usage
The test CLI supports deterministic JSON output for automation:
```powershell
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- version --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- open-session --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- register --session-id <id> --client-name mxgw-dotnet-cli --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- add-item --session-id <id> --server-handle 1 --item Area001.Pump001.Speed --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- advise --session-id <id> --server-handle 1 --item-handle 1 --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- write --session-id <id> --server-handle 1 --item-handle 1 --type int32 --value 123 --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- write2 --session-id <id> --server-handle 1 --item-handle 1 --type int32 --value 123 --timestamp 2026-01-01T00:00:00Z --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- stream-events --session-id <id> --max-events 1 --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- smoke --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --item Area001.Pump001.Speed --json
```
`smoke` opens a session, registers a client, adds one item, advises it,
optionally writes a value when `--type` and `--value` are supplied, reads a
bounded event stream, and closes the session in a `finally` block. CLI error
output redacts API keys supplied through `--api-key`.
+57
View File
@@ -0,0 +1,57 @@
# Python Client
The Python client package contains generated MXAccess Gateway protobuf
bindings, the `mxgateway` package scaffold, and the `mxgw-py` test CLI
scaffold. The package uses the shared proto inputs documented in
`../../docs/client-proto-generation.md` so gateway and client contracts stay in
sync.
## Layout
```text
clients/python/
pyproject.toml
generate-proto.ps1
src/mxgateway/
src/mxgateway/generated/
src/mxgateway_cli/
tests/
```
`src/mxgateway/generated` contains code produced by `grpc_tools.protoc`. Do not
edit generated files by hand.
## Regenerating Protobuf Bindings
Run generation after the shared `.proto` files or the Python output path
changes:
```powershell
./generate-proto.ps1
```
The script uses the Python tool path recorded in
`../../docs/toolchain-links.md`.
## Build And Test
Run the Python checks from `clients/python`:
```powershell
python -m pip install -e ".[dev]"
python -m pytest
python -m pip wheel . --no-deps --wheel-dir "$env:TEMP\mxgateway-python-wheel"
```
The scaffold tests import the generated gateway and worker stubs and exercise
the deterministic CLI version output.
## CLI
The scaffold CLI exposes version information:
```powershell
mxgw-py version --json
```
Additional commands are implemented with the async client/session wrapper work.
+22
View File
@@ -0,0 +1,22 @@
Set-StrictMode -Version Latest
$ErrorActionPreference = 'Stop'
$repoRoot = Resolve-Path (Join-Path $PSScriptRoot '..\..')
$protoRoot = Join-Path $repoRoot 'src\MxGateway.Contracts\Protos'
$outputRoot = Join-Path $PSScriptRoot 'src\mxgateway\generated'
$python = 'C:\Users\dohertj2\AppData\Local\Programs\Python\Python312\python.exe'
if (-not (Test-Path $python)) {
throw "Python was not found at $python. See docs/toolchain-links.md."
}
New-Item -ItemType Directory -Path $outputRoot -Force | Out-Null
Get-ChildItem -Path (Join-Path $outputRoot '*_pb2.py') -File | Remove-Item
Get-ChildItem -Path (Join-Path $outputRoot '*_pb2_grpc.py') -File | Remove-Item
& $python -m grpc_tools.protoc `
"-I$protoRoot" `
"--python_out=$outputRoot" `
"--grpc_python_out=$outputRoot" `
mxaccess_gateway.proto `
mxaccess_worker.proto
+33
View File
@@ -0,0 +1,33 @@
[build-system]
requires = ["setuptools>=69", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "mxaccess-gateway-client"
version = "0.1.0"
description = "Async Python client scaffold for MXAccess Gateway."
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"click>=8.3,<9",
"grpcio>=1.80,<2",
"protobuf>=6.33,<7",
]
[project.optional-dependencies]
dev = [
"grpcio-tools>=1.80,<2",
"pytest>=9,<10",
"pytest-asyncio>=1.3,<2",
]
[project.scripts]
mxgw-py = "mxgateway_cli.commands:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
addopts = "-ra"
pythonpath = ["src"]
testpaths = ["tests"]
+5
View File
@@ -0,0 +1,5 @@
"""MXAccess Gateway Python client package."""
from .version import __version__
__all__ = ["__version__"]
@@ -0,0 +1,29 @@
"""Generated protobuf and gRPC modules for MXAccess Gateway.
The Python protobuf generator emits absolute imports between generated modules.
This package initializer registers package-local aliases so callers can import
the generated stubs through `mxgateway.generated` without moving the modules to
the top-level import namespace.
"""
from importlib import import_module
import sys
mxaccess_gateway_pb2 = import_module(f"{__name__}.mxaccess_gateway_pb2")
sys.modules.setdefault("mxaccess_gateway_pb2", mxaccess_gateway_pb2)
mxaccess_gateway_pb2_grpc = import_module(f"{__name__}.mxaccess_gateway_pb2_grpc")
sys.modules.setdefault("mxaccess_gateway_pb2_grpc", mxaccess_gateway_pb2_grpc)
mxaccess_worker_pb2 = import_module(f"{__name__}.mxaccess_worker_pb2")
sys.modules.setdefault("mxaccess_worker_pb2", mxaccess_worker_pb2)
mxaccess_worker_pb2_grpc = import_module(f"{__name__}.mxaccess_worker_pb2_grpc")
sys.modules.setdefault("mxaccess_worker_pb2_grpc", mxaccess_worker_pb2_grpc)
__all__ = [
"mxaccess_gateway_pb2",
"mxaccess_gateway_pb2_grpc",
"mxaccess_worker_pb2",
"mxaccess_worker_pb2_grpc",
]
File diff suppressed because one or more lines are too long
@@ -0,0 +1,229 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
import mxaccess_gateway_pb2 as mxaccess__gateway__pb2
GRPC_GENERATED_VERSION = '1.80.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ ' but the generated code in mxaccess_gateway_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
class MxAccessGatewayStub(object):
"""Public client API for MXAccess sessions hosted by the gateway.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.OpenSession = channel.unary_unary(
'/mxaccess_gateway.v1.MxAccessGateway/OpenSession',
request_serializer=mxaccess__gateway__pb2.OpenSessionRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.OpenSessionReply.FromString,
_registered_method=True)
self.CloseSession = channel.unary_unary(
'/mxaccess_gateway.v1.MxAccessGateway/CloseSession',
request_serializer=mxaccess__gateway__pb2.CloseSessionRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.CloseSessionReply.FromString,
_registered_method=True)
self.Invoke = channel.unary_unary(
'/mxaccess_gateway.v1.MxAccessGateway/Invoke',
request_serializer=mxaccess__gateway__pb2.MxCommandRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.MxCommandReply.FromString,
_registered_method=True)
self.StreamEvents = channel.unary_stream(
'/mxaccess_gateway.v1.MxAccessGateway/StreamEvents',
request_serializer=mxaccess__gateway__pb2.StreamEventsRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.MxEvent.FromString,
_registered_method=True)
class MxAccessGatewayServicer(object):
"""Public client API for MXAccess sessions hosted by the gateway.
"""
def OpenSession(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def CloseSession(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Invoke(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def StreamEvents(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_MxAccessGatewayServicer_to_server(servicer, server):
rpc_method_handlers = {
'OpenSession': grpc.unary_unary_rpc_method_handler(
servicer.OpenSession,
request_deserializer=mxaccess__gateway__pb2.OpenSessionRequest.FromString,
response_serializer=mxaccess__gateway__pb2.OpenSessionReply.SerializeToString,
),
'CloseSession': grpc.unary_unary_rpc_method_handler(
servicer.CloseSession,
request_deserializer=mxaccess__gateway__pb2.CloseSessionRequest.FromString,
response_serializer=mxaccess__gateway__pb2.CloseSessionReply.SerializeToString,
),
'Invoke': grpc.unary_unary_rpc_method_handler(
servicer.Invoke,
request_deserializer=mxaccess__gateway__pb2.MxCommandRequest.FromString,
response_serializer=mxaccess__gateway__pb2.MxCommandReply.SerializeToString,
),
'StreamEvents': grpc.unary_stream_rpc_method_handler(
servicer.StreamEvents,
request_deserializer=mxaccess__gateway__pb2.StreamEventsRequest.FromString,
response_serializer=mxaccess__gateway__pb2.MxEvent.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'mxaccess_gateway.v1.MxAccessGateway', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('mxaccess_gateway.v1.MxAccessGateway', rpc_method_handlers)
# This class is part of an EXPERIMENTAL API.
class MxAccessGateway(object):
"""Public client API for MXAccess sessions hosted by the gateway.
"""
@staticmethod
def OpenSession(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/OpenSession',
mxaccess__gateway__pb2.OpenSessionRequest.SerializeToString,
mxaccess__gateway__pb2.OpenSessionReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def CloseSession(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/CloseSession',
mxaccess__gateway__pb2.CloseSessionRequest.SerializeToString,
mxaccess__gateway__pb2.CloseSessionReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def Invoke(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/Invoke',
mxaccess__gateway__pb2.MxCommandRequest.SerializeToString,
mxaccess__gateway__pb2.MxCommandReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def StreamEvents(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/StreamEvents',
mxaccess__gateway__pb2.StreamEventsRequest.SerializeToString,
mxaccess__gateway__pb2.MxEvent.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: mxaccess_worker.proto
# Protobuf Python Version: 6.31.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
_runtime_version.Domain.PUBLIC,
6,
31,
1,
'',
'mxaccess_worker.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
import mxaccess_gateway_pb2 as mxaccess__gateway__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15mxaccess_worker.proto\x12\x12mxaccess_worker.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x16mxaccess_gateway.proto\"\x95\x06\n\x0eWorkerEnvelope\x12\x18\n\x10protocol_version\x18\x01 \x01(\r\x12\x12\n\nsession_id\x18\x02 \x01(\t\x12\x10\n\x08sequence\x18\x03 \x01(\x04\x12\x16\n\x0e\x63orrelation_id\x18\x04 \x01(\t\x12\x39\n\rgateway_hello\x18\n \x01(\x0b\x32 .mxaccess_worker.v1.GatewayHelloH\x00\x12\x37\n\x0cworker_hello\x18\x0b \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerHelloH\x00\x12\x37\n\x0cworker_ready\x18\x0c \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerReadyH\x00\x12;\n\x0eworker_command\x18\r \x01(\x0b\x32!.mxaccess_worker.v1.WorkerCommandH\x00\x12\x46\n\x14worker_command_reply\x18\x0e \x01(\x0b\x32&.mxaccess_worker.v1.WorkerCommandReplyH\x00\x12\x39\n\rworker_cancel\x18\x0f \x01(\x0b\x32 .mxaccess_worker.v1.WorkerCancelH\x00\x12=\n\x0fworker_shutdown\x18\x10 \x01(\x0b\x32\".mxaccess_worker.v1.WorkerShutdownH\x00\x12\x44\n\x13worker_shutdown_ack\x18\x11 \x01(\x0b\x32%.mxaccess_worker.v1.WorkerShutdownAckH\x00\x12\x37\n\x0cworker_event\x18\x12 \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerEventH\x00\x12?\n\x10worker_heartbeat\x18\x13 \x01(\x0b\x32#.mxaccess_worker.v1.WorkerHeartbeatH\x00\x12\x37\n\x0cworker_fault\x18\x14 \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerFaultH\x00\x42\x06\n\x04\x62ody\"Z\n\x0cGatewayHello\x12\"\n\x1asupported_protocol_version\x18\x01 \x01(\r\x12\r\n\x05nonce\x18\x02 \x01(\t\x12\x17\n\x0fgateway_version\x18\x03 \x01(\t\"i\n\x0bWorkerHello\x12\x18\n\x10protocol_version\x18\x01 \x01(\r\x12\r\n\x05nonce\x18\x02 \x01(\t\x12\x19\n\x11worker_process_id\x18\x03 \x01(\x05\x12\x16\n\x0eworker_version\x18\x04 \x01(\t\"\x8e\x01\n\x0bWorkerReady\x12\x19\n\x11worker_process_id\x18\x01 \x01(\x05\x12\x17\n\x0fmxaccess_progid\x18\x02 \x01(\t\x12\x16\n\x0emxaccess_clsid\x18\x03 \x01(\t\x12\x33\n\x0fready_timestamp\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"w\n\rWorkerCommand\x12/\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x1e.mxaccess_gateway.v1.MxCommand\x12\x35\n\x11\x65nqueue_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x81\x01\n\x12WorkerCommandReply\x12\x32\n\x05reply\x18\x01 \x01(\x0b\x32#.mxaccess_gateway.v1.MxCommandReply\x12\x37\n\x13\x63ompleted_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x1e\n\x0cWorkerCancel\x12\x0e\n\x06reason\x18\x01 \x01(\t\"Q\n\x0eWorkerShutdown\x12/\n\x0cgrace_period\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x0e\n\x06reason\x18\x02 \x01(\t\"H\n\x11WorkerShutdownAck\x12\x33\n\x06status\x18\x01 \x01(\x0b\x32#.mxaccess_gateway.v1.ProtocolStatus\":\n\x0bWorkerEvent\x12+\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.mxaccess_gateway.v1.MxEvent\"\xa5\x02\n\x0fWorkerHeartbeat\x12\x19\n\x11worker_process_id\x18\x01 \x01(\x05\x12.\n\x05state\x18\x02 \x01(\x0e\x32\x1f.mxaccess_worker.v1.WorkerState\x12?\n\x1blast_sta_activity_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x1d\n\x15pending_command_count\x18\x04 \x01(\r\x12\"\n\x1aoutbound_event_queue_depth\x18\x05 \x01(\r\x12\x1b\n\x13last_event_sequence\x18\x06 \x01(\x04\x12&\n\x1e\x63urrent_command_correlation_id\x18\x07 \x01(\t\"\xf4\x01\n\x0bWorkerFault\x12\x39\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\'.mxaccess_worker.v1.WorkerFaultCategory\x12\x16\n\x0e\x63ommand_method\x18\x02 \x01(\t\x12\x14\n\x07hresult\x18\x03 \x01(\x05H\x00\x88\x01\x01\x12\x16\n\x0e\x65xception_type\x18\x04 \x01(\t\x12\x1a\n\x12\x64iagnostic_message\x18\x05 \x01(\t\x12<\n\x0fprotocol_status\x18\x06 \x01(\x0b\x32#.mxaccess_gateway.v1.ProtocolStatusB\n\n\x08_hresult*\x97\x02\n\x0bWorkerState\x12\x1c\n\x18WORKER_STATE_UNSPECIFIED\x10\x00\x12\x19\n\x15WORKER_STATE_STARTING\x10\x01\x12\x1c\n\x18WORKER_STATE_HANDSHAKING\x10\x02\x12!\n\x1dWORKER_STATE_INITIALIZING_STA\x10\x03\x12\x16\n\x12WORKER_STATE_READY\x10\x04\x12\"\n\x1eWORKER_STATE_EXECUTING_COMMAND\x10\x05\x12\x1e\n\x1aWORKER_STATE_SHUTTING_DOWN\x10\x06\x12\x18\n\x14WORKER_STATE_STOPPED\x10\x07\x12\x18\n\x14WORKER_STATE_FAULTED\x10\x08*\xc7\x04\n\x13WorkerFaultCategory\x12%\n!WORKER_FAULT_CATEGORY_UNSPECIFIED\x10\x00\x12+\n\'WORKER_FAULT_CATEGORY_INVALID_ARGUMENTS\x10\x01\x12\x37\n3WORKER_FAULT_CATEGORY_GATEWAY_AUTHENTICATION_FAILED\x10\x02\x12+\n\'WORKER_FAULT_CATEGORY_PROTOCOL_MISMATCH\x10\x03\x12,\n(WORKER_FAULT_CATEGORY_PROTOCOL_VIOLATION\x10\x04\x12+\n\'WORKER_FAULT_CATEGORY_PIPE_DISCONNECTED\x10\x05\x12\x32\n.WORKER_FAULT_CATEGORY_MXACCESS_CREATION_FAILED\x10\x06\x12\x31\n-WORKER_FAULT_CATEGORY_MXACCESS_COMMAND_FAILED\x10\x07\x12:\n6WORKER_FAULT_CATEGORY_MXACCESS_EVENT_CONVERSION_FAILED\x10\x08\x12\"\n\x1eWORKER_FAULT_CATEGORY_STA_HUNG\x10\t\x12(\n$WORKER_FAULT_CATEGORY_QUEUE_OVERFLOW\x10\n\x12*\n&WORKER_FAULT_CATEGORY_SHUTDOWN_TIMEOUT\x10\x0b\x42\x1c\xaa\x02\x19MxGateway.Contracts.Protob\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'mxaccess_worker_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
_globals['DESCRIPTOR']._loaded_options = None
_globals['DESCRIPTOR']._serialized_options = b'\252\002\031MxGateway.Contracts.Proto'
_globals['_WORKERSTATE']._serialized_start=2316
_globals['_WORKERSTATE']._serialized_end=2595
_globals['_WORKERFAULTCATEGORY']._serialized_start=2598
_globals['_WORKERFAULTCATEGORY']._serialized_end=3181
_globals['_WORKERENVELOPE']._serialized_start=135
_globals['_WORKERENVELOPE']._serialized_end=924
_globals['_GATEWAYHELLO']._serialized_start=926
_globals['_GATEWAYHELLO']._serialized_end=1016
_globals['_WORKERHELLO']._serialized_start=1018
_globals['_WORKERHELLO']._serialized_end=1123
_globals['_WORKERREADY']._serialized_start=1126
_globals['_WORKERREADY']._serialized_end=1268
_globals['_WORKERCOMMAND']._serialized_start=1270
_globals['_WORKERCOMMAND']._serialized_end=1389
_globals['_WORKERCOMMANDREPLY']._serialized_start=1392
_globals['_WORKERCOMMANDREPLY']._serialized_end=1521
_globals['_WORKERCANCEL']._serialized_start=1523
_globals['_WORKERCANCEL']._serialized_end=1553
_globals['_WORKERSHUTDOWN']._serialized_start=1555
_globals['_WORKERSHUTDOWN']._serialized_end=1636
_globals['_WORKERSHUTDOWNACK']._serialized_start=1638
_globals['_WORKERSHUTDOWNACK']._serialized_end=1710
_globals['_WORKEREVENT']._serialized_start=1712
_globals['_WORKEREVENT']._serialized_end=1770
_globals['_WORKERHEARTBEAT']._serialized_start=1773
_globals['_WORKERHEARTBEAT']._serialized_end=2066
_globals['_WORKERFAULT']._serialized_start=2069
_globals['_WORKERFAULT']._serialized_end=2313
# @@protoc_insertion_point(module_scope)
@@ -0,0 +1,24 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
GRPC_GENERATED_VERSION = '1.80.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ ' but the generated code in mxaccess_worker_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
+3
View File
@@ -0,0 +1,3 @@
"""Package version information."""
__version__ = "0.1.0"
@@ -0,0 +1 @@
"""Command-line entry points for the MXAccess Gateway Python client."""
@@ -0,0 +1,6 @@
"""Module execution entry point for `python -m mxgateway_cli`."""
from .commands import main
if __name__ == "__main__":
main()
@@ -0,0 +1,29 @@
"""CLI scaffold for the MXAccess Gateway Python client."""
import json
import click
from mxgateway import __version__
@click.group()
def main() -> None:
"""MXAccess Gateway Python test CLI."""
@main.command()
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def version(output_json: bool) -> None:
"""Print client package version information."""
payload = {
"client": "mxgw-py",
"package": "mxaccess-gateway-client",
"version": __version__,
}
if output_json:
click.echo(json.dumps(payload, sort_keys=True))
return
click.echo(f"mxgw-py {__version__}")
+21
View File
@@ -0,0 +1,21 @@
"""Tests for the Python CLI scaffold."""
import json
from click.testing import CliRunner
from mxgateway import __version__
from mxgateway_cli.commands import main
def test_version_json_is_deterministic() -> None:
runner = CliRunner()
result = runner.invoke(main, ["version", "--json"])
assert result.exit_code == 0
assert json.loads(result.output) == {
"client": "mxgw-py",
"package": "mxaccess-gateway-client",
"version": __version__,
}
@@ -0,0 +1,30 @@
"""Tests for generated protobuf and gRPC module importability."""
from mxgateway.generated import mxaccess_gateway_pb2
from mxgateway.generated import mxaccess_gateway_pb2_grpc
from mxgateway.generated import mxaccess_worker_pb2
def test_gateway_messages_import() -> None:
request = mxaccess_gateway_pb2.OpenSessionRequest(
client_session_name="pytest",
client_correlation_id="test-correlation",
)
assert request.client_session_name == "pytest"
assert hasattr(mxaccess_gateway_pb2_grpc, "MxAccessGatewayStub")
def test_worker_messages_import_gateway_types() -> None:
envelope = mxaccess_worker_pb2.WorkerEnvelope(
protocol_version=1,
session_id="test-session",
worker_command=mxaccess_worker_pb2.WorkerCommand(
command=mxaccess_gateway_pb2.MxCommand(
kind=mxaccess_gateway_pb2.MX_COMMAND_KIND_PING,
ping=mxaccess_gateway_pb2.PingCommand(message="hello"),
),
),
)
assert envelope.worker_command.command.ping.message == "hello"
+131 -1
View File
@@ -145,6 +145,16 @@ version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
[[package]]
name = "cc"
version = "1.2.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16d90359e986641506914ba71350897565610e87ce0ad9e6f28569db3dd5c6d"
dependencies = [
"find-msvc-tools",
"shlex",
]
[[package]]
name = "cfg-if"
version = "1.0.4"
@@ -225,6 +235,12 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
[[package]]
name = "find-msvc-tools"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
[[package]]
name = "fixedbitset"
version = "0.5.7"
@@ -258,6 +274,17 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]]
name = "futures-macro"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.32"
@@ -277,11 +304,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [
"futures-core",
"futures-macro",
"futures-task",
"pin-project-lite",
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "getrandom"
version = "0.4.2"
@@ -537,11 +576,14 @@ checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
name = "mxgateway-client"
version = "0.1.0"
dependencies = [
"futures-core",
"futures-util",
"prost",
"prost-types",
"serde_json",
"thiserror",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
]
@@ -551,8 +593,11 @@ name = "mxgw-cli"
version = "0.1.0"
dependencies = [
"clap",
"futures-util",
"mxgateway-client",
"serde",
"serde_json",
"tokio",
]
[[package]]
@@ -724,6 +769,20 @@ version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
[[package]]
name = "ring"
version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if",
"getrandom 0.2.17",
"libc",
"untrusted",
"windows-sys 0.52.0",
]
[[package]]
name = "rustix"
version = "1.1.4"
@@ -737,6 +796,41 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "rustls"
version = "0.23.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c2c118cb077cca2822033836dfb1b975355dfb784b5e8da48f7b6c5db74e60e"
dependencies = [
"log",
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-pki-types"
version = "1.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9"
dependencies = [
"zeroize",
]
[[package]]
name = "rustls-webpki"
version = "0.103.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e"
dependencies = [
"ring",
"rustls-pki-types",
"untrusted",
]
[[package]]
name = "semver"
version = "1.0.28"
@@ -750,6 +844,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
"serde_derive",
]
[[package]]
@@ -785,6 +880,12 @@ dependencies = [
"zmij",
]
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "slab"
version = "0.4.12"
@@ -823,6 +924,12 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "subtle"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.117"
@@ -847,7 +954,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd"
dependencies = [
"fastrand",
"getrandom",
"getrandom 0.4.2",
"once_cell",
"rustix",
"windows-sys 0.61.2",
@@ -899,6 +1006,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-rustls"
version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61"
dependencies = [
"rustls",
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.18"
@@ -945,6 +1062,7 @@ dependencies = [
"prost",
"socket2 0.5.10",
"tokio",
"tokio-rustls",
"tokio-stream",
"tower",
"tower-layer",
@@ -1046,6 +1164,12 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
[[package]]
name = "untrusted"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "utf8parse"
version = "0.2.2"
@@ -1301,6 +1425,12 @@ dependencies = [
"wasmparser",
]
[[package]]
name = "zeroize"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0"
[[package]]
name = "zmij"
version = "1.0.21"
+9 -2
View File
@@ -16,24 +16,31 @@ publish = false
[workspace.dependencies]
clap = { version = "4.5.53", features = ["derive"] }
futures-core = "0.3.31"
futures-util = "0.3.31"
prost = "0.13.5"
prost-types = "0.13.5"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
thiserror = "2.0.17"
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread"] }
tonic = { version = "0.13.1", features = ["transport"] }
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "sync", "time"] }
tokio-stream = { version = "0.1.17", features = ["net"] }
tonic = { version = "0.13.1", features = ["transport", "tls-ring"] }
tonic-build = "0.13.1"
[dependencies]
futures-core = { workspace = true }
futures-util = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
[dev-dependencies]
serde_json = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
[build-dependencies]
tonic-build = { workspace = true }
+40 -3
View File
@@ -1,7 +1,8 @@
# Rust Client Workspace
The Rust client workspace contains the MXAccess Gateway client library, a
test CLI, and scaffold tests for generated contract wiring. The library uses
test CLI, and tests for generated contract wiring plus wrapper behavior. The
library uses
the shared protobuf inputs documented in
`../../docs/client-proto-generation.md` so the Rust bindings compile against
the same public gateway and worker contracts as the server.
@@ -31,6 +32,7 @@ Run the Rust workspace checks from `clients/rust`:
cargo fmt --all --check
cargo test --workspace
cargo check --workspace
cargo clippy --workspace --all-targets -- -D warnings
```
The build script uses `protoc` from `PATH` or the Windows path recorded in
@@ -38,13 +40,48 @@ The build script uses `protoc` from `PATH` or the Windows path recorded in
## CLI
The scaffold CLI exposes version information:
The CLI exposes version, session, command, event stream, write, and smoke
commands over the same client wrapper used by tests:
```powershell
cargo run -p mxgw-cli -- version --json
cargo run -p mxgw-cli -- open-session --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --json
cargo run -p mxgw-cli -- register --session-id <session-id> --client-name mxgw-rust-cli --json
cargo run -p mxgw-cli -- add-item --session-id <session-id> --server-handle 1 --item TestChildObject.TestInt --json
cargo run -p mxgw-cli -- advise --session-id <session-id> --server-handle 1 --item-handle 1 --json
cargo run -p mxgw-cli -- stream-events --session-id <session-id> --max-events 1 --json
cargo run -p mxgw-cli -- write --session-id <session-id> --server-handle 1 --item-handle 1 --value-type int32 --value 123 --json
```
Additional commands are implemented with the client/session wrapper work.
Use `--tls`, `--ca-file`, and `--server-name-override` for TLS endpoints. The
CLI reads the API key from `--api-key` or from `--api-key-env`, which defaults
to `MXGATEWAY_API_KEY`. API keys are redacted by the library option and secret
types.
## Library Surface
`ClientOptions` configures endpoint, API key, plaintext or TLS transport,
timeouts, custom CA files, and server name override. `GatewayClient::connect`
creates an authenticated `tonic` client and attaches `authorization: Bearer
<api-key>` metadata to unary and streaming calls.
`GatewayClient` exposes raw generated calls through `open_session_raw`,
`close_session_raw`, `invoke_raw`, `stream_events`, and `raw_client`. The
session helpers keep MXAccess handles visible:
```rust
let session = client.open_session(request).await?;
let server_handle = session.register("mxgw-rust").await?;
let item_handle = session.add_item(server_handle, "TestChildObject.TestInt").await?;
session.advise(server_handle, item_handle).await?;
let mut events = session.events().await?;
session.close().await?;
```
`MxValue`, `MxArrayValue`, and `MxStatus` wrap generated protobuf messages while
preserving the raw message for parity diagnostics. Command replies whose
protocol status is not `PROTOCOL_STATUS_CODE_OK` become `Error::Command` and
retain the raw `MxCommandReply`.
## Related Documentation
+1 -1
View File
@@ -19,7 +19,7 @@ fn main() -> Result<(), Box<dyn Error>> {
println!("cargo:rerun-if-changed={}", worker_proto.display());
tonic_build::configure()
.build_server(false)
.build_server(true)
.build_client(true)
.file_descriptor_set_path(descriptor_path)
.compile_protos(
+3
View File
@@ -10,5 +10,8 @@ path = "src/main.rs"
[dependencies]
clap = { workspace = true }
futures-util = { workspace = true }
mxgateway-client = { path = "../.." }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
+518 -14
View File
@@ -1,8 +1,20 @@
use std::env;
use std::path::PathBuf;
use std::process::ExitCode;
use std::time::Duration;
use clap::{Parser, Subcommand};
use mxgateway_client::{CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
use clap::{Args, Parser, Subcommand, ValueEnum};
use futures_util::StreamExt;
use mxgateway_client::generated::mxaccess_gateway::v1::{
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, OpenSessionRequest,
PingCommand, StreamEventsRequest,
};
use mxgateway_client::{
ApiKey, ClientOptions, Error, GatewayClient, MxValue, CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION,
WORKER_PROTOCOL_VERSION,
};
use serde_json::json;
use serde_json::Value;
#[derive(Debug, Parser)]
#[command(name = "mxgw")]
@@ -18,30 +30,428 @@ enum Command {
#[arg(long)]
json: bool,
},
Ping {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long, default_value = "ping")]
message: String,
#[arg(long)]
json: bool,
},
OpenSession {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long, default_value = "mxgw-rust-cli")]
client_name: String,
#[arg(long)]
json: bool,
},
CloseSession {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
json: bool,
},
Register {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long, default_value = "mxgw-rust-cli")]
client_name: String,
#[arg(long)]
json: bool,
},
AddItem {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long)]
item: String,
#[arg(long)]
json: bool,
},
Advise {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long)]
item_handle: i32,
#[arg(long)]
json: bool,
},
StreamEvents {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long, default_value_t = 0)]
after_worker_sequence: u64,
#[arg(long, default_value_t = 1)]
max_events: usize,
#[arg(long)]
json: bool,
},
Write {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long)]
item_handle: i32,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long)]
value: String,
#[arg(long, default_value_t = 0)]
user_id: i32,
#[arg(long)]
json: bool,
},
Write2 {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long)]
item_handle: i32,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long)]
value: String,
#[arg(long)]
timestamp: String,
#[arg(long, default_value_t = 0)]
user_id: i32,
#[arg(long)]
json: bool,
},
Smoke {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
item: String,
#[arg(long, default_value = "mxgw-rust-smoke")]
client_name: String,
#[arg(long)]
json: bool,
},
}
fn main() -> ExitCode {
#[derive(Debug, Args, Clone)]
struct ConnectionArgs {
#[arg(long, default_value = "http://127.0.0.1:5000")]
endpoint: String,
#[arg(long)]
api_key: Option<String>,
#[arg(long, default_value = "MXGATEWAY_API_KEY")]
api_key_env: String,
#[arg(long)]
plaintext: bool,
#[arg(long)]
tls: bool,
#[arg(long)]
ca_file: Option<PathBuf>,
#[arg(long)]
server_name_override: Option<String>,
#[arg(long, default_value_t = 10)]
connect_timeout_seconds: u64,
#[arg(long, default_value_t = 30)]
call_timeout_seconds: u64,
}
impl ConnectionArgs {
fn options(&self) -> ClientOptions {
let mut options = ClientOptions::new(self.endpoint.clone())
.with_plaintext(!self.tls || self.plaintext)
.with_connect_timeout(Duration::from_secs(self.connect_timeout_seconds))
.with_call_timeout(Duration::from_secs(self.call_timeout_seconds));
if let Some(api_key) = self
.api_key
.clone()
.or_else(|| env::var(&self.api_key_env).ok())
.filter(|value| !value.is_empty())
{
options = options.with_api_key(ApiKey::new(api_key));
}
if let Some(ca_file) = &self.ca_file {
options = options.with_ca_file(ca_file);
}
if let Some(server_name_override) = &self.server_name_override {
options = options.with_server_name_override(server_name_override);
}
options
}
}
#[derive(Clone, Copy, Debug, ValueEnum)]
enum CliValueType {
Bool,
Int32,
Int64,
Float,
Double,
String,
}
#[tokio::main]
async fn main() -> ExitCode {
let cli = Cli::parse();
run(cli);
ExitCode::SUCCESS
match run(cli).await {
Ok(()) => ExitCode::SUCCESS,
Err(error) => {
eprintln!("{error}");
ExitCode::FAILURE
}
}
}
fn run(cli: Cli) {
async fn run(cli: Cli) -> Result<(), Error> {
match cli.command {
Command::Version { json } => print_version(json),
Command::Ping {
connection,
message,
json,
} => {
let client = connect(connection).await?;
let reply = client
.invoke(MxCommandRequest {
client_correlation_id: "rust-cli-ping".to_owned(),
command: Some(MxCommand {
kind: MxCommandKind::Ping as i32,
payload: Some(mxgateway_client::generated::mxaccess_gateway::v1::mx_command::Payload::Ping(
PingCommand { message },
)),
}),
..MxCommandRequest::default()
})
.await?;
print_command_reply("ping", &reply, json);
}
Command::OpenSession {
connection,
client_name,
json,
} => {
let client = connect(connection).await?;
let reply = client
.open_session_raw(OpenSessionRequest {
client_session_name: client_name,
..OpenSessionRequest::default()
})
.await?;
if json {
println!(
"{}",
json!({
"sessionId": reply.session_id,
"backendName": reply.backend_name,
"gatewayProtocolVersion": reply.gateway_protocol_version,
"workerProtocolVersion": reply.worker_protocol_version,
})
);
} else {
println!("{}", reply.session_id);
}
}
Command::CloseSession {
connection,
session_id,
json,
} => {
let client = connect(connection).await?;
let reply = client
.close_session_raw(CloseSessionRequest {
session_id,
client_correlation_id: "rust-cli-close-session".to_owned(),
})
.await?;
if json {
println!("{}", json!({ "sessionId": reply.session_id }));
} else {
println!("closed {}", reply.session_id);
}
}
Command::Register {
connection,
session_id,
client_name,
json,
} => {
let session = session_for(connection, session_id).await?;
let server_handle = session.register(&client_name).await?;
print_handle("serverHandle", server_handle, json);
}
Command::AddItem {
connection,
session_id,
server_handle,
item,
json,
} => {
let session = session_for(connection, session_id).await?;
let item_handle = session.add_item(server_handle, &item).await?;
print_handle("itemHandle", item_handle, json);
}
Command::Advise {
connection,
session_id,
server_handle,
item_handle,
json,
} => {
let session = session_for(connection, session_id).await?;
session.advise(server_handle, item_handle).await?;
print_ok("advise", json);
}
Command::StreamEvents {
connection,
session_id,
after_worker_sequence,
max_events,
json,
} => {
let client = connect(connection).await?;
let mut stream = client
.stream_events(StreamEventsRequest {
session_id,
after_worker_sequence,
})
.await?;
let mut events = Vec::new();
while events.len() < max_events {
let Some(event) = stream.next().await else {
break;
};
events.push(event?);
}
if json {
println!("{}", json!({ "eventCount": events.len() }));
} else {
for event in events {
println!("{} {}", event.worker_sequence, event.family);
}
}
}
Command::Write {
connection,
session_id,
server_handle,
item_handle,
value_type,
value,
user_id,
json,
} => {
let session = session_for(connection, session_id).await?;
session
.write(
server_handle,
item_handle,
parse_value(value_type, &value)?,
user_id,
)
.await?;
print_ok("write", json);
}
Command::Write2 {
connection,
session_id,
server_handle,
item_handle,
value_type,
value,
timestamp,
user_id,
json,
} => {
let session = session_for(connection, session_id).await?;
session
.write2(
server_handle,
item_handle,
parse_value(value_type, &value)?,
MxValue::string(timestamp),
user_id,
)
.await?;
print_ok("write2", json);
}
Command::Smoke {
connection,
item,
client_name,
json,
} => {
let client = connect(connection).await?;
let session = client
.open_session(OpenSessionRequest {
client_session_name: client_name.clone(),
..OpenSessionRequest::default()
})
.await?;
let result = async {
let server_handle = session.register(&client_name).await?;
let item_handle = session.add_item(server_handle, &item).await?;
session.advise(server_handle, item_handle).await?;
Ok::<_, Error>((server_handle, item_handle))
}
.await;
let close_result = session.close().await;
let (server_handle, item_handle) = result?;
close_result?;
if json {
println!(
"{}",
json!({
"sessionId": session.id(),
"serverHandle": server_handle,
"itemHandle": item_handle,
"closed": true,
})
);
} else {
println!(
"session {} registered server {server_handle}, item {item_handle}, closed",
session.id()
);
}
}
}
Ok(())
}
async fn connect(connection: ConnectionArgs) -> Result<GatewayClient, Error> {
GatewayClient::connect(connection.options()).await
}
async fn session_for(
connection: ConnectionArgs,
session_id: String,
) -> Result<mxgateway_client::Session, Error> {
let client = connect(connection).await?;
Ok(client.session(session_id))
}
fn print_version(use_json: bool) {
if use_json {
println!(
"{}",
json!({
"clientVersion": CLIENT_VERSION,
"gatewayProtocolVersion": GATEWAY_PROTOCOL_VERSION,
"workerProtocolVersion": WORKER_PROTOCOL_VERSION,
})
);
println!("{}", version_json());
return;
}
@@ -50,6 +460,73 @@ fn print_version(use_json: bool) {
println!("worker protocol {WORKER_PROTOCOL_VERSION}");
}
fn version_json() -> Value {
json!({
"clientVersion": CLIENT_VERSION,
"gatewayProtocolVersion": GATEWAY_PROTOCOL_VERSION,
"workerProtocolVersion": WORKER_PROTOCOL_VERSION,
})
}
fn print_command_reply(
operation: &str,
reply: &mxgateway_client::generated::mxaccess_gateway::v1::MxCommandReply,
use_json: bool,
) {
if use_json {
println!(
"{}",
json!({
"operation": operation,
"sessionId": reply.session_id,
"correlationId": reply.correlation_id,
"kind": reply.kind,
})
);
} else {
println!("{operation} completed");
}
}
fn print_handle(name: &str, handle: i32, use_json: bool) {
if use_json {
println!("{}", json!({ name: handle }));
} else {
println!("{handle}");
}
}
fn print_ok(operation: &str, use_json: bool) {
if use_json {
println!("{}", json!({ "operation": operation, "ok": true }));
} else {
println!("{operation} completed");
}
}
fn parse_value(value_type: CliValueType, value: &str) -> Result<MxValue, Error> {
let parsed = match value_type {
CliValueType::Bool => MxValue::bool(parse_cli_value(value)?),
CliValueType::Int32 => MxValue::int32(parse_cli_value(value)?),
CliValueType::Int64 => MxValue::int64(parse_cli_value(value)?),
CliValueType::Float => MxValue::float(parse_cli_value(value)?),
CliValueType::Double => MxValue::double(parse_cli_value(value)?),
CliValueType::String => MxValue::string(value),
};
Ok(parsed)
}
fn parse_cli_value<T>(value: &str) -> Result<T, Error>
where
T: std::str::FromStr,
T::Err: std::fmt::Display,
{
value.parse::<T>().map_err(|source| Error::InvalidArgument {
name: "value".to_owned(),
detail: source.to_string(),
})
}
#[cfg(test)]
mod tests {
use clap::Parser;
@@ -61,4 +538,31 @@ mod tests {
let parsed = Cli::try_parse_from(["mxgw", "version", "--json"]);
assert!(parsed.is_ok());
}
#[test]
fn parses_write_command() {
let parsed = Cli::try_parse_from([
"mxgw",
"write",
"--session-id",
"session-1",
"--server-handle",
"12",
"--item-handle",
"34",
"--value-type",
"int32",
"--value",
"123",
]);
assert!(parsed.is_ok());
}
#[test]
fn version_json_output_has_protocol_versions() {
let value = super::version_json();
assert_eq!(value["gatewayProtocolVersion"], 1);
assert_eq!(value["workerProtocolVersion"], 1);
}
}
+57
View File
@@ -1,5 +1,9 @@
use std::fmt;
use tonic::metadata::MetadataValue;
use tonic::service::Interceptor;
use tonic::{Request, Status};
/// API key wrapper that avoids exposing raw credentials in formatted output.
#[derive(Clone, Eq, PartialEq)]
pub struct ApiKey(String);
@@ -28,3 +32,56 @@ impl fmt::Display for ApiKey {
formatter.write_str("<redacted>")
}
}
/// `tonic` interceptor that attaches gateway API key metadata.
#[derive(Clone, Debug, Default)]
pub struct AuthInterceptor {
api_key: Option<ApiKey>,
}
impl AuthInterceptor {
pub fn new(api_key: Option<ApiKey>) -> Self {
Self { api_key }
}
}
impl Interceptor for AuthInterceptor {
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
if let Some(api_key) = &self.api_key {
let header_value = format!("Bearer {}", api_key.expose_secret())
.parse::<MetadataValue<_>>()
.map_err(|_| Status::unauthenticated("invalid API key metadata"))?;
request.metadata_mut().insert("authorization", header_value);
}
Ok(request)
}
}
#[cfg(test)]
mod tests {
use tonic::service::Interceptor;
use tonic::Request;
use super::{ApiKey, AuthInterceptor};
#[test]
fn api_key_debug_is_redacted() {
let key = ApiKey::new("mxgw_visible_secret");
assert_eq!(format!("{key:?}"), "ApiKey(\"<redacted>\")");
assert!(!format!("{key:?}").contains("visible_secret"));
assert_eq!(key.to_string(), "<redacted>");
}
#[test]
fn interceptor_attaches_bearer_metadata() {
let mut interceptor = AuthInterceptor::new(Some(ApiKey::new("mxgw_fixture_secret")));
let request = interceptor.call(Request::new(())).unwrap();
assert_eq!(
request.metadata().get("authorization").unwrap(),
"Bearer mxgw_fixture_secret"
);
}
}
+103 -10
View File
@@ -1,30 +1,123 @@
use tonic::transport::Channel;
use std::fs;
use crate::error::Error;
use tonic::codegen::InterceptedService;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::Request;
use crate::auth::AuthInterceptor;
use crate::error::{ensure_command_success, Error};
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
use crate::generated::mxaccess_gateway::v1::{
CloseSessionReply, CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent,
OpenSessionReply, OpenSessionRequest, StreamEventsRequest,
};
use crate::options::ClientOptions;
use crate::session::Session;
pub type RawGatewayClient = MxAccessGatewayClient<InterceptedService<Channel, AuthInterceptor>>;
pub type EventStream =
std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<MxEvent, Error>> + Send + 'static>>;
/// Thin owner for the generated gateway client.
#[derive(Clone)]
pub struct GatewayClient {
inner: MxAccessGatewayClient<Channel>,
inner: RawGatewayClient,
call_timeout: std::time::Duration,
}
impl GatewayClient {
pub async fn connect(options: ClientOptions) -> Result<Self, Error> {
let endpoint = Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: source.to_string(),
let mut endpoint =
Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: source.to_string(),
}
})?;
endpoint = endpoint.connect_timeout(options.connect_timeout());
if !options.plaintext() {
let mut tls = ClientTlsConfig::new();
if let Some(server_name) = options.server_name_override() {
tls = tls.domain_name(server_name.to_owned());
}
})?;
if let Some(ca_file) = options.ca_file() {
let certificate = fs::read(ca_file).map_err(|source| Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: format!("failed to read CA file {}: {source}", ca_file.display()),
})?;
tls = tls.ca_certificate(Certificate::from_pem(certificate));
}
endpoint = endpoint.tls_config(tls)?;
}
let channel = endpoint.connect().await?;
let interceptor = AuthInterceptor::new(options.api_key().cloned());
Ok(Self {
inner: MxAccessGatewayClient::new(channel),
inner: MxAccessGatewayClient::with_interceptor(channel, interceptor),
call_timeout: options.call_timeout(),
})
}
pub fn into_inner(self) -> MxAccessGatewayClient<Channel> {
pub fn raw_client(&mut self) -> &mut RawGatewayClient {
&mut self.inner
}
pub fn into_inner(self) -> RawGatewayClient {
self.inner
}
pub fn session(&self, session_id: impl Into<String>) -> Session {
Session::new(session_id, self.clone())
}
pub async fn open_session_raw(
&self,
request: OpenSessionRequest,
) -> Result<OpenSessionReply, Error> {
let mut client = self.inner.clone();
let response = client.open_session(self.unary_request(request)).await?;
Ok(response.into_inner())
}
pub async fn open_session(&self, request: OpenSessionRequest) -> Result<Session, Error> {
let reply = self.open_session_raw(request).await?;
Ok(Session::new(reply.session_id, self.clone()))
}
pub async fn close_session_raw(
&self,
request: CloseSessionRequest,
) -> Result<CloseSessionReply, Error> {
let mut client = self.inner.clone();
let response = client.close_session(self.unary_request(request)).await?;
Ok(response.into_inner())
}
pub async fn invoke_raw(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error> {
let mut client = self.inner.clone();
let response = client.invoke(self.unary_request(request)).await?;
Ok(response.into_inner())
}
pub async fn invoke(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error> {
ensure_command_success(self.invoke_raw(request).await?)
}
pub async fn stream_events(&self, request: StreamEventsRequest) -> Result<EventStream, Error> {
let mut client = self.inner.clone();
let response = client.stream_events(self.unary_request(request)).await?;
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
result.map_err(Error::from)
});
Ok(Box::pin(stream))
}
fn unary_request<T>(&self, message: T) -> Request<T> {
let mut request = Request::new(message);
request.set_timeout(self.call_timeout);
request
}
}
+149 -1
View File
@@ -1,13 +1,161 @@
use thiserror::Error as ThisError;
use tonic::Code;
use crate::generated::mxaccess_gateway::v1::{MxCommandReply, ProtocolStatusCode};
#[derive(Debug, ThisError)]
pub enum Error {
#[error("invalid gateway endpoint `{endpoint}`: {detail}")]
InvalidEndpoint { endpoint: String, detail: String },
#[error("invalid argument `{name}`: {detail}")]
InvalidArgument { name: String, detail: String },
#[error("gateway transport error: {0}")]
Transport(#[from] tonic::transport::Error),
#[error("authentication failed: {message}")]
Authentication {
message: String,
#[source]
status: Box<tonic::Status>,
},
#[error("authorization failed: {message}")]
Authorization {
message: String,
#[source]
status: Box<tonic::Status>,
},
#[error("gateway call timed out: {message}")]
Timeout {
message: String,
#[source]
status: Box<tonic::Status>,
},
#[error("gateway call cancelled: {message}")]
Cancelled {
message: String,
#[source]
status: Box<tonic::Status>,
},
#[error("gateway status error: {0}")]
Status(#[from] tonic::Status),
Status(Box<tonic::Status>),
#[error("gateway command failed: {0}")]
Command(#[from] Box<CommandError>),
}
#[derive(Clone, Debug)]
pub struct CommandError {
reply: MxCommandReply,
}
impl CommandError {
pub fn new(reply: MxCommandReply) -> Self {
Self { reply }
}
pub fn reply(&self) -> &MxCommandReply {
&self.reply
}
pub fn into_reply(self) -> MxCommandReply {
self.reply
}
}
impl std::fmt::Display for CommandError {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let status = self.reply.protocol_status.as_ref();
let code = status
.and_then(|status| ProtocolStatusCode::try_from(status.code).ok())
.unwrap_or(ProtocolStatusCode::Unspecified);
let message = status.map(|status| status.message.as_str()).unwrap_or("");
if message.is_empty() {
write!(formatter, "{code:?}")
} else {
write!(formatter, "{code:?}: {message}")
}
}
}
impl std::error::Error for CommandError {}
impl From<tonic::Status> for Error {
fn from(status: tonic::Status) -> Self {
let message = redact_credentials(status.message());
match status.code() {
Code::Unauthenticated => Self::Authentication {
message,
status: Box::new(status),
},
Code::PermissionDenied => Self::Authorization {
message,
status: Box::new(status),
},
Code::DeadlineExceeded => Self::Timeout {
message,
status: Box::new(status),
},
Code::Cancelled => Self::Cancelled {
message,
status: Box::new(status),
},
_ => Self::Status(Box::new(status)),
}
}
}
pub fn ensure_command_success(reply: MxCommandReply) -> Result<MxCommandReply, Error> {
let code = reply
.protocol_status
.as_ref()
.and_then(|status| ProtocolStatusCode::try_from(status.code).ok())
.unwrap_or(ProtocolStatusCode::Unspecified);
if code == ProtocolStatusCode::Ok {
Ok(reply)
} else {
Err(Box::new(CommandError::new(reply)).into())
}
}
fn redact_credentials(message: &str) -> String {
message
.split_whitespace()
.map(|part| {
if part.starts_with("mxgw_") || part.eq_ignore_ascii_case("bearer") {
"<redacted>"
} else {
part
}
})
.collect::<Vec<_>>()
.join(" ")
}
#[cfg(test)]
mod tests {
use tonic::{Code, Status};
use super::Error;
#[test]
fn classifies_authentication_status() {
let error = Error::from(Status::new(
Code::Unauthenticated,
"invalid API key mxgw_visible_secret",
));
let message = error.to_string();
assert!(matches!(error, Error::Authentication { .. }));
assert!(message.contains("<redacted>"));
assert!(!message.contains("visible_secret"));
}
}
+4 -3
View File
@@ -13,9 +13,10 @@ pub mod session;
pub mod value;
pub mod version;
pub use auth::ApiKey;
pub use client::GatewayClient;
pub use error::Error;
pub use auth::{ApiKey, AuthInterceptor};
pub use client::{EventStream, GatewayClient};
pub use error::{CommandError, Error};
pub use options::ClientOptions;
pub use session::Session;
pub use value::{MxArrayProjection, MxArrayValue, MxStatus, MxValue, MxValueProjection};
pub use version::{CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
+72
View File
@@ -1,4 +1,6 @@
use std::fmt;
use std::path::PathBuf;
use std::time::Duration;
use crate::auth::ApiKey;
@@ -7,6 +9,10 @@ pub struct ClientOptions {
endpoint: String,
api_key: Option<ApiKey>,
plaintext: bool,
ca_file: Option<PathBuf>,
server_name_override: Option<String>,
connect_timeout: Duration,
call_timeout: Duration,
}
impl ClientOptions {
@@ -15,6 +21,10 @@ impl ClientOptions {
endpoint: endpoint.into(),
api_key: None,
plaintext: true,
ca_file: None,
server_name_override: None,
connect_timeout: Duration::from_secs(10),
call_timeout: Duration::from_secs(30),
}
}
@@ -23,6 +33,31 @@ impl ClientOptions {
self
}
pub fn with_plaintext(mut self, plaintext: bool) -> Self {
self.plaintext = plaintext;
self
}
pub fn with_ca_file(mut self, ca_file: impl Into<PathBuf>) -> Self {
self.ca_file = Some(ca_file.into());
self
}
pub fn with_server_name_override(mut self, server_name_override: impl Into<String>) -> Self {
self.server_name_override = Some(server_name_override.into());
self
}
pub fn with_connect_timeout(mut self, connect_timeout: Duration) -> Self {
self.connect_timeout = connect_timeout;
self
}
pub fn with_call_timeout(mut self, call_timeout: Duration) -> Self {
self.call_timeout = call_timeout;
self
}
pub fn endpoint(&self) -> &str {
&self.endpoint
}
@@ -34,6 +69,22 @@ impl ClientOptions {
pub fn plaintext(&self) -> bool {
self.plaintext
}
pub fn ca_file(&self) -> Option<&PathBuf> {
self.ca_file.as_ref()
}
pub fn server_name_override(&self) -> Option<&str> {
self.server_name_override.as_deref()
}
pub fn connect_timeout(&self) -> Duration {
self.connect_timeout
}
pub fn call_timeout(&self) -> Duration {
self.call_timeout
}
}
impl Default for ClientOptions {
@@ -49,6 +100,27 @@ impl fmt::Debug for ClientOptions {
.field("endpoint", &self.endpoint)
.field("api_key", &self.api_key.as_ref().map(|_| "<redacted>"))
.field("plaintext", &self.plaintext)
.field("ca_file", &self.ca_file)
.field("server_name_override", &self.server_name_override)
.field("connect_timeout", &self.connect_timeout)
.field("call_timeout", &self.call_timeout)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::ClientOptions;
use crate::auth::ApiKey;
#[test]
fn debug_redacts_api_key() {
let options =
ClientOptions::new("http://localhost:5000").with_api_key(ApiKey::new("mxgw_secret"));
let debug = format!("{options:?}");
assert!(debug.contains("<redacted>"));
assert!(!debug.contains("mxgw_secret"));
}
}
+222 -3
View File
@@ -1,15 +1,234 @@
use crate::client::{EventStream, GatewayClient};
use crate::error::Error;
use crate::generated::mxaccess_gateway::v1::mx_command::Payload;
use crate::generated::mxaccess_gateway::v1::mx_command_reply;
use crate::generated::mxaccess_gateway::v1::{
AddItem2Command, AddItemCommand, AdviseCommand, CloseSessionRequest, MxCommand, MxCommandKind,
MxCommandReply, MxCommandRequest, MxValue as ProtoMxValue, OpenSessionRequest, RegisterCommand,
StreamEventsRequest, Write2Command, WriteCommand,
};
use crate::value::MxValue;
/// Session identifier returned by the gateway.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone)]
pub struct Session {
id: String,
client: GatewayClient,
}
impl Session {
pub fn new(id: impl Into<String>) -> Self {
Self { id: id.into() }
pub(crate) fn new(id: impl Into<String>, client: GatewayClient) -> Self {
Self {
id: id.into(),
client,
}
}
pub fn id(&self) -> &str {
&self.id
}
pub async fn open(client: GatewayClient, client_session_name: &str) -> Result<Self, Error> {
client
.open_session(OpenSessionRequest {
client_session_name: client_session_name.to_owned(),
..OpenSessionRequest::default()
})
.await
}
pub async fn close(&self) -> Result<(), Error> {
self.client
.close_session_raw(CloseSessionRequest {
session_id: self.id.clone(),
client_correlation_id: "rust-client-close-session".to_owned(),
})
.await?;
Ok(())
}
pub async fn register(&self, client_name: &str) -> Result<i32, Error> {
let reply = self
.invoke(
MxCommandKind::Register,
Payload::Register(RegisterCommand {
client_name: client_name.to_owned(),
}),
)
.await?;
Ok(register_server_handle(&reply))
}
pub async fn add_item(&self, server_handle: i32, item_definition: &str) -> Result<i32, Error> {
let reply = self
.invoke(
MxCommandKind::AddItem,
Payload::AddItem(AddItemCommand {
server_handle,
item_definition: item_definition.to_owned(),
}),
)
.await?;
Ok(add_item_handle(&reply))
}
pub async fn add_item2(
&self,
server_handle: i32,
item_definition: &str,
item_context: &str,
) -> Result<i32, Error> {
let reply = self
.invoke(
MxCommandKind::AddItem2,
Payload::AddItem2(AddItem2Command {
server_handle,
item_definition: item_definition.to_owned(),
item_context: item_context.to_owned(),
}),
)
.await?;
Ok(add_item2_handle(&reply))
}
pub async fn advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> {
self.invoke(
MxCommandKind::Advise,
Payload::Advise(AdviseCommand {
server_handle,
item_handle,
}),
)
.await?;
Ok(())
}
pub async fn write(
&self,
server_handle: i32,
item_handle: i32,
value: MxValue,
user_id: i32,
) -> Result<(), Error> {
self.invoke(
MxCommandKind::Write,
Payload::Write(WriteCommand {
server_handle,
item_handle,
value: Some(value.into_proto()),
user_id,
}),
)
.await?;
Ok(())
}
pub async fn write2(
&self,
server_handle: i32,
item_handle: i32,
value: MxValue,
timestamp_value: MxValue,
user_id: i32,
) -> Result<(), Error> {
self.invoke(
MxCommandKind::Write2,
Payload::Write2(Write2Command {
server_handle,
item_handle,
value: Some(value.into_proto()),
timestamp_value: Some(timestamp_value.into_proto()),
user_id,
}),
)
.await?;
Ok(())
}
pub async fn events(&self) -> Result<EventStream, Error> {
self.events_after(0).await
}
pub async fn events_after(&self, after_worker_sequence: u64) -> Result<EventStream, Error> {
self.client
.stream_events(StreamEventsRequest {
session_id: self.id.clone(),
after_worker_sequence,
})
.await
}
pub async fn invoke_raw(
&self,
kind: MxCommandKind,
payload: Payload,
) -> Result<MxCommandReply, Error> {
self.client
.invoke_raw(self.command_request(kind, payload))
.await
}
pub async fn invoke(
&self,
kind: MxCommandKind,
payload: Payload,
) -> Result<MxCommandReply, Error> {
self.client
.invoke(self.command_request(kind, payload))
.await
}
fn command_request(&self, kind: MxCommandKind, payload: Payload) -> MxCommandRequest {
MxCommandRequest {
session_id: self.id.clone(),
client_correlation_id: format!("rust-client-{}", kind.as_str_name()),
command: Some(MxCommand {
kind: kind as i32,
payload: Some(payload),
}),
}
}
}
fn register_server_handle(reply: &MxCommandReply) -> i32 {
match reply.payload.as_ref() {
Some(mx_command_reply::Payload::Register(register)) => register.server_handle,
_ => reply
.return_value
.as_ref()
.and_then(int32_reply_value)
.unwrap_or_default(),
}
}
fn add_item_handle(reply: &MxCommandReply) -> i32 {
match reply.payload.as_ref() {
Some(mx_command_reply::Payload::AddItem(add_item)) => add_item.item_handle,
_ => reply
.return_value
.as_ref()
.and_then(int32_reply_value)
.unwrap_or_default(),
}
}
fn add_item2_handle(reply: &MxCommandReply) -> i32 {
match reply.payload.as_ref() {
Some(mx_command_reply::Payload::AddItem2(add_item)) => add_item.item_handle,
_ => reply
.return_value
.as_ref()
.and_then(int32_reply_value)
.unwrap_or_default(),
}
}
fn int32_reply_value(value: &ProtoMxValue) -> Option<i32> {
match value.kind.as_ref()? {
crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value) => Some(*value),
_ => None,
}
}
+236 -6
View File
@@ -1,9 +1,239 @@
use crate::generated::mxaccess_gateway::v1::MxValue;
use crate::generated::mxaccess_gateway::v1::mx_array::Values;
use crate::generated::mxaccess_gateway::v1::mx_value::Kind;
use crate::generated::mxaccess_gateway::v1::{
BoolArray, DoubleArray, FloatArray, Int32Array, Int64Array, MxArray, MxDataType,
MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue as ProtoMxValue, RawArray,
StringArray, TimestampArray,
};
pub fn int32_value(value: i32) -> MxValue {
MxValue {
data_type: crate::generated::mxaccess_gateway::v1::MxDataType::Integer as i32,
kind: Some(crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value)),
..MxValue::default()
#[derive(Clone, Debug, PartialEq)]
pub struct MxValue {
raw: ProtoMxValue,
projection: MxValueProjection,
}
impl MxValue {
pub fn from_proto(raw: ProtoMxValue) -> Self {
let projection = MxValueProjection::from_proto(&raw);
Self { raw, projection }
}
pub fn bool(value: bool) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::Boolean as i32,
variant_type: "VT_BOOL".to_owned(),
kind: Some(Kind::BoolValue(value)),
..ProtoMxValue::default()
})
}
pub fn int32(value: i32) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::Integer as i32,
variant_type: "VT_I4".to_owned(),
kind: Some(Kind::Int32Value(value)),
..ProtoMxValue::default()
})
}
pub fn int64(value: i64) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::Integer as i32,
variant_type: "VT_I8".to_owned(),
kind: Some(Kind::Int64Value(value)),
..ProtoMxValue::default()
})
}
pub fn float(value: f32) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::Float as i32,
variant_type: "VT_R4".to_owned(),
kind: Some(Kind::FloatValue(value)),
..ProtoMxValue::default()
})
}
pub fn double(value: f64) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::Double as i32,
variant_type: "VT_R8".to_owned(),
kind: Some(Kind::DoubleValue(value)),
..ProtoMxValue::default()
})
}
pub fn string(value: impl Into<String>) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::String as i32,
variant_type: "VT_BSTR".to_owned(),
kind: Some(Kind::StringValue(value.into())),
..ProtoMxValue::default()
})
}
pub fn raw(&self) -> &ProtoMxValue {
&self.raw
}
pub fn projection(&self) -> &MxValueProjection {
&self.projection
}
pub fn into_proto(self) -> ProtoMxValue {
self.raw
}
}
impl From<MxValue> for ProtoMxValue {
fn from(value: MxValue) -> Self {
value.into_proto()
}
}
impl From<ProtoMxValue> for MxValue {
fn from(value: ProtoMxValue) -> Self {
Self::from_proto(value)
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum MxValueProjection {
Unset,
Null,
Bool(bool),
Int32(i32),
Int64(i64),
Float(f32),
Double(f64),
String(String),
Timestamp(prost_types::Timestamp),
Array(MxArrayValue),
Raw(Vec<u8>),
}
impl MxValueProjection {
fn from_proto(value: &ProtoMxValue) -> Self {
if value.is_null {
return Self::Null;
}
match value.kind.as_ref() {
Some(Kind::BoolValue(value)) => Self::Bool(*value),
Some(Kind::Int32Value(value)) => Self::Int32(*value),
Some(Kind::Int64Value(value)) => Self::Int64(*value),
Some(Kind::FloatValue(value)) => Self::Float(*value),
Some(Kind::DoubleValue(value)) => Self::Double(*value),
Some(Kind::StringValue(value)) => Self::String(value.clone()),
Some(Kind::TimestampValue(value)) => Self::Timestamp(*value),
Some(Kind::ArrayValue(value)) => Self::Array(MxArrayValue::from_proto(value.clone())),
Some(Kind::RawValue(value)) => Self::Raw(value.clone()),
None => Self::Unset,
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct MxArrayValue {
raw: MxArray,
projection: MxArrayProjection,
}
impl MxArrayValue {
pub fn from_proto(raw: MxArray) -> Self {
let projection = MxArrayProjection::from_proto(&raw);
Self { raw, projection }
}
pub fn string(values: Vec<String>) -> Self {
Self::from_proto(MxArray {
element_data_type: MxDataType::String as i32,
variant_type: "VT_ARRAY|VT_BSTR".to_owned(),
dimensions: vec![values.len() as u32],
values: Some(Values::StringValues(StringArray { values })),
..MxArray::default()
})
}
pub fn raw(&self) -> &MxArray {
&self.raw
}
pub fn projection(&self) -> &MxArrayProjection {
&self.projection
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum MxArrayProjection {
Unset,
Bool(Vec<bool>),
Int32(Vec<i32>),
Int64(Vec<i64>),
Float(Vec<f32>),
Double(Vec<f64>),
String(Vec<String>),
Timestamp(Vec<prost_types::Timestamp>),
Raw(Vec<Vec<u8>>),
}
impl MxArrayProjection {
fn from_proto(array: &MxArray) -> Self {
match array.values.as_ref() {
Some(Values::BoolValues(BoolArray { values })) => Self::Bool(values.clone()),
Some(Values::Int32Values(Int32Array { values })) => Self::Int32(values.clone()),
Some(Values::Int64Values(Int64Array { values })) => Self::Int64(values.clone()),
Some(Values::FloatValues(FloatArray { values })) => Self::Float(values.clone()),
Some(Values::DoubleValues(DoubleArray { values })) => Self::Double(values.clone()),
Some(Values::StringValues(StringArray { values })) => Self::String(values.clone()),
Some(Values::TimestampValues(TimestampArray { values })) => {
Self::Timestamp(values.clone())
}
Some(Values::RawValues(RawArray { values })) => Self::Raw(values.clone()),
None => Self::Unset,
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct MxStatus {
raw: MxStatusProxy,
}
impl MxStatus {
pub fn from_proto(raw: MxStatusProxy) -> Self {
Self { raw }
}
pub fn raw(&self) -> &MxStatusProxy {
&self.raw
}
pub fn success(&self) -> i32 {
self.raw.success
}
pub fn category(&self) -> Option<MxStatusCategory> {
MxStatusCategory::try_from(self.raw.category).ok()
}
pub fn detected_by(&self) -> Option<MxStatusSource> {
MxStatusSource::try_from(self.raw.detected_by).ok()
}
pub fn detail(&self) -> i32 {
self.raw.detail
}
pub fn raw_category(&self) -> i32 {
self.raw.raw_category
}
pub fn raw_detected_by(&self) -> i32 {
self.raw.raw_detected_by
}
pub fn diagnostic_text(&self) -> &str {
&self.raw.diagnostic_text
}
}
+398
View File
@@ -0,0 +1,398 @@
use std::pin::Pin;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::task::{Context, Poll};
use std::time::Duration;
use futures_core::Stream;
use futures_util::StreamExt;
use mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server::{
MxAccessGateway, MxAccessGatewayServer,
};
use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply;
use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind;
use mxgateway_client::generated::mxaccess_gateway::v1::{
AddItemReply, CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply,
MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue,
OpenSessionReply, OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, SessionState,
StreamEventsRequest,
};
use mxgateway_client::{
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
MxValueProjection,
};
use serde_json::Value;
use tokio::net::TcpListener;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tonic::transport::Server;
use tonic::{Request, Response, Status};
#[tokio::test]
async fn fake_server_receives_bearer_metadata_and_raw_client_is_reachable() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let mut client = GatewayClient::connect(
ClientOptions::new(endpoint).with_api_key(ApiKey::new("mxgw_fixture_secret")),
)
.await
.unwrap();
let _raw = client.raw_client();
let session = client
.open_session(OpenSessionRequest {
client_session_name: "rust-test".to_owned(),
..OpenSessionRequest::default()
})
.await
.unwrap();
assert_eq!(session.id(), "session-fixture");
assert_eq!(
state.authorization.lock().await.as_deref(),
Some("Bearer mxgw_fixture_secret")
);
}
#[tokio::test]
async fn session_helpers_build_commands_and_preserve_command_errors() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let item_handle = session.add_item(12, "Plant.Area.Tag").await.unwrap();
assert_eq!(item_handle, 34);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::AddItem as i32));
drop(last_command);
let error = session
.write(12, 34, ClientMxValue::int32(123), 0)
.await
.unwrap_err();
let Error::Command(error) = error else {
panic!("write failure should preserve the raw command reply: {error:?}");
};
assert_eq!(
error.reply().protocol_status.as_ref().unwrap().code,
ProtocolStatusCode::MxaccessFailure as i32
);
assert_eq!(error.reply().hresult, Some(-2147220992));
assert_eq!(error.reply().statuses.len(), 2);
}
#[tokio::test]
async fn event_stream_preserves_order_and_drop_cancels_server_stream() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let mut stream = client
.stream_events(StreamEventsRequest {
session_id: "session-fixture".to_owned(),
after_worker_sequence: 0,
})
.await
.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 1);
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 2);
drop(stream);
for _ in 0..20 {
if state.stream_dropped.load(Ordering::SeqCst) {
return;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
assert!(state.stream_dropped.load(Ordering::SeqCst));
}
#[test]
fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() {
let fixture = behavior_fixture("values/value-conversion-cases.json");
let cases = fixture["cases"].as_array().unwrap();
let int64_case = case_by_id(cases, "int64.large");
let int64_value = ClientMxValue::from_proto(MxValue {
data_type: MxDataType::Integer as i32,
variant_type: "VT_I8".to_owned(),
kind: Some(Kind::Int64Value(
int64_case["value"]["int64Value"]
.as_str()
.unwrap()
.parse()
.unwrap(),
)),
..MxValue::default()
});
assert_eq!(
int64_value.projection(),
&MxValueProjection::Int64(9_223_372_036_854_770_000)
);
let raw_case = case_by_id(cases, "raw-fallback.variant");
let raw_value = ClientMxValue::from_proto(MxValue {
data_type: MxDataType::Unknown as i32,
variant_type: "VT_RECORD".to_owned(),
raw_diagnostic: raw_case["value"]["rawDiagnostic"]
.as_str()
.unwrap()
.to_owned(),
raw_data_type: raw_case["value"]["rawDataType"].as_i64().unwrap() as i32,
kind: Some(Kind::RawValue(vec![1, 2, 3, 4, 5])),
..MxValue::default()
});
assert_eq!(
raw_value.projection(),
&MxValueProjection::Raw(vec![1, 2, 3, 4, 5])
);
assert_eq!(raw_value.raw().raw_data_type, 32767);
assert!(raw_value.raw().raw_diagnostic.contains("No lossless"));
}
#[test]
fn status_conversion_fixtures_preserve_raw_fields() {
let fixture = behavior_fixture("statuses/status-conversion-cases.json");
let cases = fixture["cases"].as_array().unwrap();
let raw_case = case_by_id(cases, "raw-unknown-category");
let status = MxStatus::from_proto(MxStatusProxy {
success: raw_case["status"]["success"].as_i64().unwrap() as i32,
category: MxStatusCategory::Unknown as i32,
detected_by: MxStatusSource::Unknown as i32,
detail: raw_case["status"]["detail"].as_i64().unwrap() as i32,
raw_category: raw_case["status"]["rawCategory"].as_i64().unwrap() as i32,
raw_detected_by: raw_case["status"]["rawDetectedBy"].as_i64().unwrap() as i32,
diagnostic_text: raw_case["status"]["diagnosticText"]
.as_str()
.unwrap()
.to_owned(),
});
assert_eq!(status.success(), 0);
assert_eq!(status.category(), Some(MxStatusCategory::Unknown));
assert_eq!(status.raw_category(), 99);
assert_eq!(status.raw_detected_by(), 77);
assert!(status.diagnostic_text().contains("preserved"));
}
#[test]
fn authentication_and_authorization_statuses_are_distinct_and_redacted() {
let auth = Error::from(Status::unauthenticated(
"invalid API key mxgw_visible_secret",
));
let denied = Error::from(Status::permission_denied("missing scope mxaccess.write"));
assert!(matches!(auth, Error::Authentication { .. }));
assert!(matches!(denied, Error::Authorization { .. }));
assert!(!auth.to_string().contains("visible_secret"));
}
#[test]
fn command_error_display_keeps_raw_reply_accessible() {
let reply = mxaccess_failure_reply();
let error = CommandError::new(reply.clone());
assert_eq!(error.reply().hresult, Some(-2147220992));
assert!(error.to_string().contains("MxaccessFailure"));
}
#[derive(Default)]
struct FakeState {
authorization: Mutex<Option<String>>,
last_command_kind: Mutex<Option<i32>>,
stream_dropped: Arc<AtomicBool>,
}
#[derive(Clone)]
struct FakeGateway {
state: Arc<FakeState>,
}
#[tonic::async_trait]
impl MxAccessGateway for FakeGateway {
async fn open_session(
&self,
request: Request<OpenSessionRequest>,
) -> Result<Response<OpenSessionReply>, Status> {
*self.state.authorization.lock().await = request
.metadata()
.get("authorization")
.and_then(|value| value.to_str().ok())
.map(str::to_owned);
Ok(Response::new(OpenSessionReply {
session_id: "session-fixture".to_owned(),
backend_name: "fake".to_owned(),
worker_process_id: 1234,
worker_protocol_version: 1,
gateway_protocol_version: 1,
protocol_status: Some(ok_status("opened")),
..OpenSessionReply::default()
}))
}
async fn close_session(
&self,
request: Request<CloseSessionRequest>,
) -> Result<Response<CloseSessionReply>, Status> {
Ok(Response::new(CloseSessionReply {
session_id: request.into_inner().session_id,
final_state: SessionState::Closed as i32,
protocol_status: Some(ok_status("closed")),
}))
}
async fn invoke(
&self,
request: Request<mxgateway_client::generated::mxaccess_gateway::v1::MxCommandRequest>,
) -> Result<Response<MxCommandReply>, Status> {
let request = request.into_inner();
let kind = request
.command
.as_ref()
.map(|command| command.kind)
.unwrap_or_default();
*self.state.last_command_kind.lock().await = Some(kind);
if kind == MxCommandKind::Write as i32 {
return Ok(Response::new(mxaccess_failure_reply()));
}
Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
item_handle: 34,
})),
..MxCommandReply::default()
}))
}
type StreamEventsStream = DropAwareStream;
async fn stream_events(
&self,
_request: Request<StreamEventsRequest>,
) -> Result<Response<Self::StreamEventsStream>, Status> {
let (sender, receiver) = mpsc::channel(4);
sender.send(Ok(event(1))).await.unwrap();
sender.send(Ok(event(2))).await.unwrap();
Ok(Response::new(DropAwareStream {
inner: ReceiverStream::new(receiver),
dropped: self.state.stream_dropped.clone(),
}))
}
}
struct DropAwareStream {
inner: ReceiverStream<Result<MxEvent, Status>>,
dropped: Arc<AtomicBool>,
}
impl Stream for DropAwareStream {
type Item = Result<MxEvent, Status>;
fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(context)
}
}
impl Drop for DropAwareStream {
fn drop(&mut self) {
self.dropped.store(true, Ordering::SeqCst);
}
}
async fn spawn_fake_gateway(state: Arc<FakeState>) -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let address = listener.local_addr().unwrap();
let incoming = TcpListenerStream::new(listener);
let service = MxAccessGatewayServer::new(FakeGateway { state });
tokio::spawn(async move {
Server::builder()
.add_service(service)
.serve_with_incoming(incoming)
.await
.unwrap();
});
format!("http://{address}")
}
fn ok_status(message: &str) -> ProtocolStatus {
ProtocolStatus {
code: ProtocolStatusCode::Ok as i32,
message: message.to_owned(),
}
}
fn mxaccess_failure_reply() -> MxCommandReply {
MxCommandReply {
session_id: "session-fixture".to_owned(),
correlation_id: "gateway-correlation-write-1".to_owned(),
kind: MxCommandKind::Write as i32,
protocol_status: Some(ProtocolStatus {
code: ProtocolStatusCode::MxaccessFailure as i32,
message: "MXAccess rejected the write.".to_owned(),
}),
hresult: Some(-2147220992),
statuses: vec![
MxStatusProxy {
success: 0,
category: MxStatusCategory::SecurityError as i32,
detected_by: MxStatusSource::RespondingLmx as i32,
detail: 321,
raw_category: 8,
raw_detected_by: 3,
diagnostic_text: "Write denied by provider security.".to_owned(),
},
MxStatusProxy {
success: 0,
category: MxStatusCategory::OperationalError as i32,
detected_by: MxStatusSource::RespondingNmx as i32,
detail: 902,
raw_category: 7,
raw_detected_by: 5,
diagnostic_text: "Provider rejected the item state.".to_owned(),
},
],
..MxCommandReply::default()
}
}
fn event(sequence: u64) -> MxEvent {
MxEvent {
family: MxEventFamily::OnDataChange as i32,
session_id: "session-fixture".to_owned(),
worker_sequence: sequence,
..MxEvent::default()
}
}
fn behavior_fixture(path: &str) -> Value {
let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../proto/fixtures/behavior")
.join(path);
let data = std::fs::read_to_string(&path).unwrap();
serde_json::from_str(&data).unwrap()
}
fn case_by_id<'a>(cases: &'a [Value], id: &str) -> &'a Value {
cases
.iter()
.find(|case| case["id"].as_str() == Some(id))
.unwrap_or_else(|| panic!("missing fixture case {id}"))
}
+6
View File
@@ -131,6 +131,12 @@ Python clients should use `grpc_tools.protoc` and write generated modules under
`clients/python/src/mxgateway/generated` so imports stay separate from
handwritten async wrappers.
The Python scaffold provides a repo-local generation script:
```powershell
clients/python/generate-proto.ps1
```
Java clients should use the Gradle protobuf plugin and write generated sources
under `clients/java/src/main/generated`. The Java client scaffold owns the
Gradle plugin versions and source-set wiring.