Compare commits

...

89 Commits

Author SHA1 Message Date
Joseph Doherty af42891d5a Issue #47: scaffold Java Gradle build 2026-04-26 20:36:27 -04:00
dohertj2 01a51df053 Merge pull request #95 from agent-2/issue-44-implement-rust-client-session-values-errors-and-cli
Issue #44: implement Rust client session values errors and CLI
2026-04-26 20:34:28 -04:00
Joseph Doherty 89a8fb876a Issue #44: implement Rust client session values errors and CLI 2026-04-26 20:30:04 -04:00
dohertj2 c58358fad9 Merge pull request #94 from agent-3/issue-45-scaffold-python-package
Issue #45: scaffold Python package
2026-04-26 20:28:13 -04:00
dohertj2 8d312a6d2e Merge pull request #93 from agent-1/issue-40-implement-dotnet-values-status-errors-and-cli
Issue #40: implement .NET values status errors and CLI
2026-04-26 20:22:58 -04:00
Joseph Doherty f861a8b3b8 Issue #45: scaffold Python package 2026-04-26 20:22:35 -04:00
Joseph Doherty 499708b2a2 Issue #40: implement .NET values status errors and CLI 2026-04-26 20:17:02 -04:00
dohertj2 191b724f95 Merge pull request #92 from agent-3/issue-42-implement-go-client-session-values-errors-and-cli
Issue #42: implement Go client session values errors and CLI
2026-04-26 20:14:56 -04:00
Joseph Doherty 8793011838 Issue #42: implement Go client session values errors and CLI 2026-04-26 20:09:58 -04:00
dohertj2 b275eedb44 Merge pull request #91 from agent-2/issue-34-worker-live-mxaccess-smoke-test
Issue #34: Worker Live MXAccess Smoke Test
2026-04-26 20:06:58 -04:00
Joseph Doherty a9ef6d10d4 Issue #34: handle worktree roots in live smoke tests 2026-04-26 20:03:21 -04:00
Joseph Doherty 0f17a1d1d9 Add live MXAccess worker smoke test 2026-04-26 19:58:33 -04:00
dohertj2 160343aff4 Merge pull request #90 from agent-3/issue-43-scaffold-rust-workspace
Issue #43: scaffold Rust workspace
2026-04-26 19:52:33 -04:00
dohertj2 8ef98b8beb Merge pull request #89 from agent-1/issue-39-implement-dotnet-gatewayclient-and-session
Issue #39: implement .NET GatewayClient and session
2026-04-26 19:50:37 -04:00
Joseph Doherty f049d3e603 Merge remote-tracking branch 'origin/main' into agent-3/issue-43-scaffold-rust-workspace 2026-04-26 19:49:30 -04:00
Joseph Doherty ee88f9d647 Scaffold Rust client workspace 2026-04-26 19:47:26 -04:00
Joseph Doherty 6e34efd1a5 Merge remote-tracking branch 'origin/main' into agent-1/issue-39-implement-dotnet-gatewayclient-and-session 2026-04-26 19:47:17 -04:00
Joseph Doherty 01d6c33156 Implement .NET gateway client sessions 2026-04-26 19:45:43 -04:00
dohertj2 ec4e2f687e Merge pull request #88 from agent-2/issue-33-implement-graceful-shutdown
Issue #33: implement graceful shutdown
2026-04-26 19:44:00 -04:00
Joseph Doherty f7929cc12f Merge remote-tracking branch 'origin/main' into agent-2/issue-33-implement-graceful-shutdown
# Conflicts:
#	src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs
#	src/MxGateway.Worker/Ipc/WorkerPipeClient.cs
#	src/MxGateway.Worker/Ipc/WorkerPipeSession.cs
2026-04-26 19:41:04 -04:00
Joseph Doherty d890eff862 Implement graceful worker shutdown 2026-04-26 19:36:22 -04:00
dohertj2 9dcd4baff2 Merge pull request #86 from agent-3/issue-41-scaffold-go-module
Issue #41: scaffold Go module
2026-04-26 19:33:44 -04:00
dohertj2 7a0743496f Merge pull request #87 from agent-1/issue-38-scaffold-dotnet-client-projects
Issue #38: scaffold .NET client projects
2026-04-26 19:31:28 -04:00
Joseph Doherty bcfbd1cfc8 Merge remote-tracking branch 'origin/main' into agent-3/issue-41-scaffold-go-module 2026-04-26 19:30:16 -04:00
Joseph Doherty 8e3b0c1c4a Scaffold Go client module 2026-04-26 19:27:27 -04:00
Joseph Doherty bd4be85f26 Merge remote-tracking branch 'origin/main' into agent-1/issue-38-scaffold-dotnet-client-projects 2026-04-26 19:25:15 -04:00
Joseph Doherty 7331c6157a Scaffold .NET client projects 2026-04-26 19:25:07 -04:00
dohertj2 cbc317e3e7 Merge pull request #85 from agent-3/issue-32-implement-heartbeat-and-watchdog
Issue #32: implement heartbeat and watchdog
2026-04-26 19:20:15 -04:00
Joseph Doherty 7242cf772b Merge remote-tracking branch 'origin/main' into agent-3/issue-32-implement-heartbeat-and-watchdog 2026-04-26 19:16:56 -04:00
Joseph Doherty 7d67313a7d Merge remote-tracking branch 'origin/main' into agent-3/issue-32-implement-heartbeat-and-watchdog
# Conflicts:
#	src/MxGateway.Worker/Ipc/WorkerPipeSession.cs
#	src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs
2026-04-26 19:16:42 -04:00
dohertj2 044b16c5db Merge pull request #84 from agent-1/issue-37-create-cross-language-client-behavior-fixtures
Issue #37: create cross-language client behavior fixtures
2026-04-26 19:15:40 -04:00
Joseph Doherty 1f92078777 Merge remote-tracking branch 'origin/main' into agent-1/issue-37-create-cross-language-client-behavior-fixtures 2026-04-26 19:12:19 -04:00
Joseph Doherty 4a3560c7ee Implement worker heartbeat watchdog 2026-04-26 19:12:06 -04:00
Joseph Doherty 108a3d3f8a Add client behavior fixtures 2026-04-26 19:11:04 -04:00
dohertj2 95e71cd819 Merge pull request #83 from agent-2/issue-29-implement-event-sink-and-event-queue
Issue #29: implement event sink and event queue
2026-04-26 19:08:52 -04:00
Joseph Doherty 647fe9a4b5 Merge remote-tracking branch 'origin/main' into agent-2/issue-29-implement-event-sink-and-event-queue 2026-04-26 19:05:31 -04:00
Joseph Doherty dd455089b4 Implement worker MXAccess event queue 2026-04-26 19:04:56 -04:00
dohertj2 d0bc4e3c01 Merge pull request #82 from agent-1/issue-36-publish-stable-client-proto-generation-inputs
Issue #36: publish stable client proto generation inputs
2026-04-26 18:56:37 -04:00
Joseph Doherty 6a40d26366 Publish stable client proto inputs 2026-04-26 18:52:39 -04:00
dohertj2 366f57198f Merge pull request #81 from agent-2/issue-28-implement-advise-unadvise-advisesupervisory
Issue #28: implement Advise, UnAdvise, AdviseSupervisory
2026-04-26 18:45:59 -04:00
dohertj2 aab41e04ab Merge pull request #80 from agent-1/issue-16-implement-blazor-server-dashboard
Issue #16: implement Blazor Server dashboard
2026-04-26 18:41:26 -04:00
Joseph Doherty 3be92a17bd Merge remote-tracking branch 'origin/main' into agent-2/issue-28-implement-advise-unadvise-advisesupervisory 2026-04-26 18:41:17 -04:00
Joseph Doherty a871f2f2e5 Implement worker advise commands 2026-04-26 18:41:10 -04:00
Joseph Doherty 7b86bab705 Merge remote-tracking branch 'origin/main' into agent-1/issue-16-implement-blazor-server-dashboard 2026-04-26 18:38:19 -04:00
Joseph Doherty 56886c3b4e Implement Blazor Server dashboard 2026-04-26 18:37:16 -04:00
dohertj2 a3ccd5c80b Merge pull request #79 from agent-3/issue-19-gateway-e2e-smoke-with-fake-worker
Issue #19: gateway end-to-end smoke with fake worker
2026-04-26 18:34:54 -04:00
dohertj2 0fd954d94c Merge pull request #78 from agent-2/issue-27-implement-additem-additem2-removeitem
Issue #27: implement AddItem, AddItem2, RemoveItem
2026-04-26 18:32:13 -04:00
Joseph Doherty 91f2d8dc14 Merge remote-tracking branch 'origin/main' into agent-3/issue-19-gateway-e2e-smoke-with-fake-worker 2026-04-26 18:31:49 -04:00
Joseph Doherty fb425da009 Add gateway fake worker end-to-end smoke 2026-04-26 18:30:11 -04:00
Joseph Doherty c7e4c4b614 Merge remote-tracking branch 'origin/main' into agent-2/issue-27-implement-additem-additem2-removeitem 2026-04-26 18:27:00 -04:00
Joseph Doherty 59c710d789 Implement worker AddItem commands 2026-04-26 18:26:44 -04:00
dohertj2 862f119b91 Merge pull request #77 from agent-3/issue-18-build-fake-worker-test-harness
Issue #18: build fake worker test harness
2026-04-26 18:25:00 -04:00
Joseph Doherty 35e4442c7b Build fake worker test harness 2026-04-26 18:20:45 -04:00
dohertj2 ed1018c3bb Merge pull request #76 from agent-1/issue-17-implement-dashboard-authentication
Issue #17: implement dashboard authentication
2026-04-26 18:18:43 -04:00
Joseph Doherty 2e4ba11a9f Merge remote-tracking branch 'origin/main' into agent-1/issue-17-implement-dashboard-authentication 2026-04-26 18:15:38 -04:00
Joseph Doherty ff86b3f0b0 Implement dashboard authentication 2026-04-26 18:15:22 -04:00
dohertj2 653f17c669 Merge pull request #75 from agent-2/issue-26-implement-register-and-unregister
Issue #26: implement Register and Unregister
2026-04-26 18:13:41 -04:00
Joseph Doherty 556c3bfa83 Implement worker register and unregister 2026-04-26 18:08:45 -04:00
dohertj2 9b3637257c Merge pull request #74 from agent-3/issue-14-implement-event-streaming-and-backpressure
Issue #14: implement event streaming and backpressure
2026-04-26 18:03:32 -04:00
Joseph Doherty 77eac95f33 Issue #14: implement event streaming and backpressure 2026-04-26 17:59:37 -04:00
dohertj2 015fa1f50d Merge pull request #73 from agent-1/issue-15-implement-dashboard-snapshot-service
Issue #15: implement dashboard snapshot service
2026-04-26 17:56:01 -04:00
dohertj2 dede407304 Merge pull request #72 from agent-2/issue-25-implement-sta-command-dispatcher
Issue #25: implement STA command dispatcher
2026-04-26 17:53:32 -04:00
Joseph Doherty 0d96963c99 Merge remote-tracking branch 'origin/main' into agent-1/issue-15-implement-dashboard-snapshot-service 2026-04-26 17:52:58 -04:00
Joseph Doherty 3661420f0a Issue #15: implement dashboard snapshot service 2026-04-26 17:49:59 -04:00
Joseph Doherty 14419853c7 Issue #25: implement sta command dispatcher 2026-04-26 17:49:01 -04:00
dohertj2 a20517f5ad Merge pull request #71 from agent-3/issue-13-implement-public-grpc-service
Issue #13: implement public grpc service
2026-04-26 17:48:35 -04:00
dohertj2 626e7762d9 Merge PR #70: Issue #31 implement MXSTATUS_PROXY and HRESULT conversion
Verified after merging current main with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:43:08 -04:00
Joseph Doherty 8d6d3f6188 Issue #13: implement public grpc service 2026-04-26 17:42:46 -04:00
Joseph Doherty 276288ad87 Merge remote-tracking branch 'origin/main' into agent-2/issue-31-implement-mxstatus-proxy-and-hresult-conversion 2026-04-26 17:39:48 -04:00
dohertj2 76bd3de5a2 Merge PR #69: Issue #24 create MXAccess COM object on STA
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:39:04 -04:00
Joseph Doherty 29455fc1f6 Issue #31: implement mxstatus proxy and hresult conversion 2026-04-26 17:35:30 -04:00
dohertj2 5511609880 Merge PR #68: Issue #12 implement session manager and registry
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:34:19 -04:00
Joseph Doherty 451dccf7e3 Issue #24: create mxaccess com object on sta 2026-04-26 17:34:12 -04:00
dohertj2 cde9c89386 Merge PR #67: Issue #30 implement value conversion
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:30:19 -04:00
Joseph Doherty d496f1fd75 Issue #12: implement session manager and registry 2026-04-26 17:29:47 -04:00
Joseph Doherty 6559672fc1 Issue #30: implement value conversion 2026-04-26 17:26:36 -04:00
dohertj2 97c30b9d00 Merge PR #66: Issue #23 implement STA runtime and message pump
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:23:02 -04:00
dohertj2 603aff7004 Merge PR #65: Issue #22 implement pipe client and frame protocol
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:20:28 -04:00
Joseph Doherty e81682e367 Issue #23: implement sta runtime and message pump 2026-04-26 17:19:00 -04:00
Joseph Doherty d5a982152b Issue #22: implement pipe client and frame protocol 2026-04-26 17:16:49 -04:00
dohertj2 0b0be7098e Merge PR #64: Issue #11 implement gateway WorkerClient
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:14:03 -04:00
Joseph Doherty fce9e99553 Issue #11: implement gateway workerclient 2026-04-26 17:09:51 -04:00
dohertj2 c8fb3e91a3 Merge PR #63: Issue #8 add gRPC authentication and scope authorization
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:06:23 -04:00
dohertj2 8ce327e6f4 Merge PR #62: Issue #7 implement local api key admin cli
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:02:09 -04:00
Joseph Doherty fad0ac9948 Issue #8: add grpc authentication and scope authorization 2026-04-26 17:01:59 -04:00
dohertj2 9cb2f1c5cd Merge PR #61: Issue #21 implement worker bootstrap and options
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:56:52 -04:00
Joseph Doherty da9ffe0e11 Issue #7: implement local api key admin cli 2026-04-26 16:56:12 -04:00
Joseph Doherty 0af1427859 Issue #21: implement worker bootstrap and options 2026-04-26 16:53:06 -04:00
dohertj2 e2b4dfcb32 Merge PR #60: Issue #10 implement worker process launcher
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:50:00 -04:00
323 changed files with 105883 additions and 364 deletions
@@ -0,0 +1,117 @@
using System.Globalization;
namespace MxGateway.Client.Cli;
internal sealed class CliArguments
{
private readonly Dictionary<string, string> _values = new(StringComparer.OrdinalIgnoreCase);
private readonly HashSet<string> _flags = new(StringComparer.OrdinalIgnoreCase);
public CliArguments(IEnumerable<string> args)
{
string? pendingName = null;
foreach (string arg in args)
{
if (arg.StartsWith("--", StringComparison.Ordinal))
{
if (pendingName is not null)
{
_flags.Add(pendingName);
}
pendingName = arg[2..];
continue;
}
if (pendingName is null)
{
throw new ArgumentException($"Unexpected argument '{arg}'.");
}
_values[pendingName] = arg;
pendingName = null;
}
if (pendingName is not null)
{
_flags.Add(pendingName);
}
}
public bool HasFlag(string name)
{
return _flags.Contains(name);
}
public string? GetOptional(string name)
{
return _values.TryGetValue(name, out string? value)
? value
: null;
}
public string GetRequired(string name)
{
string? value = GetOptional(name);
if (string.IsNullOrWhiteSpace(value))
{
throw new ArgumentException($"Missing required option --{name}.");
}
return value;
}
public int GetInt32(string name, int? defaultValue = null)
{
string? value = GetOptional(name);
if (string.IsNullOrWhiteSpace(value))
{
if (defaultValue.HasValue)
{
return defaultValue.Value;
}
throw new ArgumentException($"Missing required option --{name}.");
}
return int.Parse(value, CultureInfo.InvariantCulture);
}
public uint GetUInt32(string name, uint defaultValue)
{
string? value = GetOptional(name);
return string.IsNullOrWhiteSpace(value)
? defaultValue
: uint.Parse(value, CultureInfo.InvariantCulture);
}
public ulong GetUInt64(string name, ulong defaultValue)
{
string? value = GetOptional(name);
return string.IsNullOrWhiteSpace(value)
? defaultValue
: ulong.Parse(value, CultureInfo.InvariantCulture);
}
public TimeSpan GetDuration(string name, TimeSpan defaultValue)
{
string? value = GetOptional(name);
if (string.IsNullOrWhiteSpace(value))
{
return defaultValue;
}
if (value.EndsWith("ms", StringComparison.OrdinalIgnoreCase))
{
return TimeSpan.FromMilliseconds(double.Parse(value[..^2], CultureInfo.InvariantCulture));
}
if (value.EndsWith('s'))
{
return TimeSpan.FromSeconds(double.Parse(value[..^1], CultureInfo.InvariantCulture));
}
return TimeSpan.Parse(value, CultureInfo.InvariantCulture);
}
}
@@ -0,0 +1,22 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Cli;
public interface IMxGatewayCliClient : IAsyncDisposable
{
Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CancellationToken cancellationToken);
Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CancellationToken cancellationToken);
Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken);
IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken);
}
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\MxGateway.Client\MxGateway.Client.csproj" />
</ItemGroup>
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
</Project>
@@ -0,0 +1,40 @@
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Cli;
internal sealed class MxGatewayCliClientAdapter(MxGatewayClient client) : IMxGatewayCliClient
{
public Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CancellationToken cancellationToken)
{
return client.OpenSessionRawAsync(request, cancellationToken);
}
public Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CancellationToken cancellationToken)
{
return client.CloseSessionRawAsync(request, cancellationToken);
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken)
{
return client.InvokeAsync(request, cancellationToken);
}
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken)
{
return client.StreamEventsAsync(request, cancellationToken);
}
public ValueTask DisposeAsync()
{
return client.DisposeAsync();
}
}
@@ -0,0 +1,14 @@
namespace MxGateway.Client.Cli;
internal static class MxGatewayCliSecretRedactor
{
public static string Redact(string value, string? apiKey)
{
if (string.IsNullOrEmpty(value) || string.IsNullOrEmpty(apiKey))
{
return value;
}
return value.Replace(apiKey, "[redacted]", StringComparison.Ordinal);
}
}
@@ -0,0 +1,744 @@
using System.Globalization;
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Cli;
public static class MxGatewayClientCli
{
private static readonly JsonFormatter ProtobufJsonFormatter = JsonFormatter.Default;
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
public static int Run(
string[] args,
TextWriter standardOutput,
TextWriter standardError)
{
return RunAsync(args, standardOutput, standardError)
.GetAwaiter()
.GetResult();
}
public static Task<int> RunAsync(
string[] args,
TextWriter standardOutput,
TextWriter standardError,
Func<MxGatewayClientOptions, IMxGatewayCliClient>? clientFactory = null)
{
ArgumentNullException.ThrowIfNull(args);
ArgumentNullException.ThrowIfNull(standardOutput);
ArgumentNullException.ThrowIfNull(standardError);
return RunCoreAsync(
args,
standardOutput,
standardError,
clientFactory ?? CreateDefaultClient);
}
private static async Task<int> RunCoreAsync(
string[] args,
TextWriter standardOutput,
TextWriter standardError,
Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory)
{
if (args.Length is 0 || IsHelp(args[0]))
{
WriteUsage(standardOutput);
return 0;
}
string command = args[0].ToLowerInvariant();
CliArguments arguments = new(args.Skip(1));
try
{
if (command is "version")
{
WriteVersion(arguments, standardOutput);
return 0;
}
if (!IsKnownGatewayCommand(command))
{
return WriteUnknownCommand(command, standardError);
}
await using IMxGatewayCliClient client = clientFactory(CreateOptions(arguments));
using CancellationTokenSource cancellation = CreateCancellation(arguments);
return command switch
{
"open-session" => await OpenSessionAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"close-session" => await CloseSessionAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"ping" => await PingAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"register" => await RegisterAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"add-item" => await AddItemAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"advise" => await AdviseAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write2" => await Write2Async(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"smoke" => await SmokeAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
_ => WriteUnknownCommand(command, standardError),
};
}
catch (Exception exception) when (exception is not OperationCanceledException)
{
string? apiKey = arguments.GetOptional("api-key");
string message = MxGatewayCliSecretRedactor.Redact(exception.Message, apiKey);
if (arguments.HasFlag("json"))
{
standardError.WriteLine(JsonSerializer.Serialize(
new { error = message, type = exception.GetType().Name },
JsonOptions));
}
else
{
standardError.WriteLine(message);
}
return 1;
}
}
private static IMxGatewayCliClient CreateDefaultClient(MxGatewayClientOptions options)
{
return new MxGatewayCliClientAdapter(MxGatewayClient.Create(options));
}
private static MxGatewayClientOptions CreateOptions(CliArguments arguments)
{
string endpoint = arguments.GetOptional("endpoint")
?? Environment.GetEnvironmentVariable("MXGATEWAY_ENDPOINT")
?? "http://localhost:5000";
string apiKey = ResolveApiKey(arguments);
return new MxGatewayClientOptions
{
Endpoint = new Uri(endpoint, UriKind.Absolute),
ApiKey = apiKey,
UseTls = arguments.HasFlag("tls")
|| endpoint.StartsWith("https://", StringComparison.OrdinalIgnoreCase),
DefaultCallTimeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30)),
ConnectTimeout = arguments.GetDuration("connect-timeout", TimeSpan.FromSeconds(10)),
CaCertificatePath = arguments.GetOptional("ca-file"),
ServerNameOverride = arguments.GetOptional("server-name"),
};
}
private static string ResolveApiKey(CliArguments arguments)
{
string? apiKey = arguments.GetOptional("api-key");
if (!string.IsNullOrWhiteSpace(apiKey))
{
return apiKey;
}
string apiKeyEnvironmentName = arguments.GetOptional("api-key-env")
?? "MXGATEWAY_API_KEY";
apiKey = Environment.GetEnvironmentVariable(apiKeyEnvironmentName);
if (!string.IsNullOrWhiteSpace(apiKey))
{
return apiKey;
}
throw new ArgumentException(
$"Gateway API key is required. Pass --api-key or set {apiKeyEnvironmentName}.");
}
private static CancellationTokenSource CreateCancellation(CliArguments arguments)
{
var cancellation = new CancellationTokenSource();
TimeSpan timeout = arguments.GetDuration("timeout", TimeSpan.FromSeconds(30));
cancellation.CancelAfter(timeout);
return cancellation;
}
private static Task<int> OpenSessionAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return WriteReplyAsync(
client.OpenSessionAsync(
new OpenSessionRequest
{
ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli",
ClientCorrelationId = CreateCorrelationId(),
RequestedBackend = arguments.GetOptional("backend") ?? string.Empty,
},
cancellationToken),
arguments,
output);
}
private static Task<int> CloseSessionAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return WriteReplyAsync(
client.CloseSessionAsync(
new CloseSessionRequest
{
SessionId = arguments.GetRequired("session-id"),
ClientCorrelationId = CreateCorrelationId(),
},
cancellationToken),
arguments,
output);
}
private static Task<int> PingAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Ping,
Ping = new PingCommand { Message = arguments.GetOptional("message") ?? "ping" },
},
cancellationToken);
}
private static Task<int> RegisterAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-cli" },
},
cancellationToken);
}
private static Task<int> AddItemAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemDefinition = arguments.GetRequired("item"),
},
},
cancellationToken);
}
private static Task<int> AdviseAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemHandle = arguments.GetInt32("item-handle"),
},
},
cancellationToken);
}
private static Task<int> WriteAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemHandle = arguments.GetInt32("item-handle"),
UserId = arguments.GetInt32("user-id", 0),
Value = ParseValue(arguments),
},
},
cancellationToken);
}
private static Task<int> Write2Async(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Write2,
Write2 = new Write2Command
{
ServerHandle = arguments.GetInt32("server-handle"),
ItemHandle = arguments.GetInt32("item-handle"),
UserId = arguments.GetInt32("user-id", 0),
Value = ParseValue(arguments),
TimestampValue = ParseTimestampValue(arguments),
},
},
cancellationToken);
}
private static async Task<int> StreamEventsAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
var events = new List<MxEvent>();
uint maxEvents = arguments.GetUInt32("max-events", 0);
uint eventCount = 0;
var request = new StreamEventsRequest
{
SessionId = arguments.GetRequired("session-id"),
AfterWorkerSequence = arguments.GetUInt64("after-worker-sequence", 0),
};
await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(request, cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
if (arguments.HasFlag("json"))
{
events.Add(gatewayEvent);
}
else
{
output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent));
}
eventCount++;
if (maxEvents > 0 && eventCount >= maxEvents)
{
break;
}
}
if (arguments.HasFlag("json"))
{
output.WriteLine(JsonSerializer.Serialize(
new { events = events.Select(EventToJsonElement).ToArray() },
JsonOptions));
}
return 0;
}
private static async Task<int> SmokeAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
OpenSessionReply? openReply = null;
CloseSessionReply? closeReply = null;
var commandReplies = new List<MxCommandReply>();
var events = new List<MxEvent>();
try
{
openReply = await client.OpenSessionAsync(
new OpenSessionRequest
{
ClientSessionName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke",
ClientCorrelationId = CreateCorrelationId(),
},
cancellationToken)
.ConfigureAwait(false);
int serverHandle = await InvokeForHandleAsync(
arguments,
client,
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke" },
},
reply => reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value,
commandReplies,
cancellationToken)
.ConfigureAwait(false);
int itemHandle = await InvokeForHandleAsync(
arguments,
client,
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = arguments.GetRequired("item"),
},
},
reply => reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value,
commandReplies,
cancellationToken)
.ConfigureAwait(false);
commandReplies.Add(await InvokeAndEnsureAsync(
client,
CreateCommandRequest(
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
}),
cancellationToken)
.ConfigureAwait(false));
if (arguments.GetOptional("value") is not null)
{
commandReplies.Add(await InvokeAndEnsureAsync(
client,
CreateCommandRequest(
openReply.SessionId,
new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
UserId = arguments.GetInt32("user-id", 0),
Value = ParseValue(arguments),
},
}),
cancellationToken)
.ConfigureAwait(false));
}
using CancellationTokenSource streamCancellation = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
streamCancellation.CancelAfter(arguments.GetDuration(
"event-timeout",
TimeSpan.FromSeconds(2)));
try
{
await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(
new StreamEventsRequest { SessionId = openReply.SessionId },
streamCancellation.Token)
.WithCancellation(streamCancellation.Token)
.ConfigureAwait(false))
{
events.Add(gatewayEvent);
if (events.Count >= arguments.GetUInt32("max-events", 1))
{
break;
}
}
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
}
}
finally
{
if (openReply is not null)
{
closeReply = await client.CloseSessionAsync(
new CloseSessionRequest
{
SessionId = openReply.SessionId,
ClientCorrelationId = CreateCorrelationId(),
},
CancellationToken.None)
.ConfigureAwait(false);
}
}
WriteSmokeResult(arguments, output, openReply, closeReply, commandReplies, events);
return 0;
}
private static async Task<int> InvokeAndWriteAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
MxCommand command,
CancellationToken cancellationToken)
{
MxCommandReply reply = await InvokeAndEnsureAsync(
client,
CreateCommandRequest(arguments.GetRequired("session-id"), command),
cancellationToken)
.ConfigureAwait(false);
WriteMessage(arguments, output, reply);
return 0;
}
private static async Task<int> InvokeForHandleAsync(
CliArguments arguments,
IMxGatewayCliClient client,
string sessionId,
MxCommand command,
Func<MxCommandReply, int> handleSelector,
List<MxCommandReply> replies,
CancellationToken cancellationToken)
{
MxCommandReply reply = await InvokeAndEnsureAsync(
client,
CreateCommandRequest(sessionId, command),
cancellationToken)
.ConfigureAwait(false);
replies.Add(reply);
return handleSelector(reply);
}
private static async Task<MxCommandReply> InvokeAndEnsureAsync(
IMxGatewayCliClient client,
MxCommandRequest request,
CancellationToken cancellationToken)
{
MxCommandReply reply = await client.InvokeAsync(request, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply;
}
private static MxCommandRequest CreateCommandRequest(
string sessionId,
MxCommand command)
{
return new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = CreateCorrelationId(),
Command = command,
};
}
private static async Task<int> WriteReplyAsync<TReply>(
Task<TReply> replyTask,
CliArguments arguments,
TextWriter output)
where TReply : IMessage
{
TReply reply = await replyTask.ConfigureAwait(false);
WriteMessage(arguments, output, reply);
return 0;
}
private static void WriteVersion(CliArguments arguments, TextWriter output)
{
if (arguments.HasFlag("json"))
{
output.WriteLine(JsonSerializer.Serialize(
new
{
gatewayProtocolVersion = MxGatewayClientContractInfo.GatewayProtocolVersion,
workerProtocolVersion = MxGatewayClientContractInfo.WorkerProtocolVersion,
},
JsonOptions));
return;
}
output.WriteLine(
$"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}");
output.WriteLine(
$"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}");
}
private static void WriteMessage(
CliArguments arguments,
TextWriter output,
IMessage message)
{
output.WriteLine(arguments.HasFlag("json")
? ProtobufJsonFormatter.Format(message)
: message.ToString());
}
private static void WriteSmokeResult(
CliArguments arguments,
TextWriter output,
OpenSessionReply? openReply,
CloseSessionReply? closeReply,
IReadOnlyList<MxCommandReply> commandReplies,
IReadOnlyList<MxEvent> events)
{
if (!arguments.HasFlag("json"))
{
output.WriteLine($"session-id={openReply?.SessionId}");
output.WriteLine($"commands={commandReplies.Count}");
output.WriteLine($"events={events.Count}");
output.WriteLine($"closed={closeReply is not null}");
return;
}
output.WriteLine(JsonSerializer.Serialize(
new
{
sessionId = openReply?.SessionId,
closed = closeReply is not null,
commandReplies = commandReplies.Select(CommandReplyToJsonElement).ToArray(),
events = events.Select(EventToJsonElement).ToArray(),
},
JsonOptions));
}
private static JsonElement CommandReplyToJsonElement(MxCommandReply reply)
{
return JsonDocument.Parse(ProtobufJsonFormatter.Format(reply)).RootElement.Clone();
}
private static JsonElement EventToJsonElement(MxEvent gatewayEvent)
{
return JsonDocument.Parse(ProtobufJsonFormatter.Format(gatewayEvent)).RootElement.Clone();
}
private static MxValue ParseValue(CliArguments arguments)
{
string type = arguments.GetRequired("type").ToLowerInvariant();
string value = arguments.GetRequired("value");
string[] values = value.Split(',', StringSplitOptions.TrimEntries);
return type switch
{
"bool" or "boolean" => bool.Parse(value).ToMxValue(),
"bool-array" or "boolean-array" => values.Select(bool.Parse).ToArray().ToMxValue(),
"int32" or "integer" => int.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"int32-array" or "integer-array" => values.Select(item => int.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"int64" => long.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"int64-array" => values.Select(item => long.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"float" => float.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"float-array" => values.Select(item => float.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"double" => double.Parse(value, CultureInfo.InvariantCulture).ToMxValue(),
"double-array" => values.Select(item => double.Parse(item, CultureInfo.InvariantCulture)).ToArray().ToMxValue(),
"string" => value.ToMxValue(),
"string-array" => values.ToMxValue(),
"time" or "timestamp" => DateTimeOffset.Parse(value, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal).ToMxValue(),
"time-array" or "timestamp-array" => values
.Select(item => DateTimeOffset.Parse(item, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal))
.ToArray()
.ToMxValue(),
_ => throw new ArgumentException($"Unsupported MX value type '{type}'."),
};
}
private static MxValue ParseTimestampValue(CliArguments arguments)
{
string timestamp = arguments.GetOptional("timestamp")
?? DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture);
return DateTimeOffset.Parse(
timestamp,
CultureInfo.InvariantCulture,
DateTimeStyles.AssumeUniversal)
.ToMxValue();
}
private static int WriteUnknownCommand(string command, TextWriter standardError)
{
standardError.WriteLine($"Unknown command '{command}'.");
WriteUsage(standardError);
return 2;
}
private static bool IsHelp(string value)
{
return string.Equals(value, "-h", StringComparison.OrdinalIgnoreCase)
|| string.Equals(value, "--help", StringComparison.OrdinalIgnoreCase)
|| string.Equals(value, "help", StringComparison.OrdinalIgnoreCase);
}
private static bool IsKnownGatewayCommand(string command)
{
return command is "open-session"
or "close-session"
or "ping"
or "register"
or "add-item"
or "advise"
or "stream-events"
or "write"
or "write2"
or "smoke";
}
private static string CreateCorrelationId()
{
return Guid.NewGuid().ToString("N");
}
private static void WriteUsage(TextWriter writer)
{
writer.WriteLine("mxgw-dotnet version [--json]");
writer.WriteLine("mxgw-dotnet ping --session-id <id> [--json]");
writer.WriteLine("mxgw-dotnet open-session [--client-name <name>] [--json]");
writer.WriteLine("mxgw-dotnet close-session --session-id <id> [--json]");
writer.WriteLine("mxgw-dotnet register --session-id <id> --client-name <name> [--json]");
writer.WriteLine("mxgw-dotnet add-item --session-id <id> --server-handle <n> --item <ref> [--json]");
writer.WriteLine("mxgw-dotnet advise --session-id <id> --server-handle <n> --item-handle <n> [--json]");
writer.WriteLine("mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]");
writer.WriteLine("mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]");
writer.WriteLine("mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]");
writer.WriteLine("mxgw-dotnet smoke --item <ref> [--value <value> --type <type>] [--json]");
}
}
@@ -0,0 +1,3 @@
using MxGateway.Client.Cli;
return await MxGatewayClientCli.RunAsync(args, Console.Out, Console.Error);
@@ -0,0 +1,86 @@
using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMxGatewayClientTransport
{
private readonly Queue<MxCommandReply> _invokeReplies = new();
private readonly List<MxEvent> _events = [];
public MxGatewayClientOptions Options { get; } = options;
public MxAccessGateway.MxAccessGatewayClient? RawClient => null;
public List<(OpenSessionRequest Request, CallOptions CallOptions)> OpenSessionCalls { get; } = [];
public List<(CloseSessionRequest Request, CallOptions CallOptions)> CloseSessionCalls { get; } = [];
public List<(MxCommandRequest Request, CallOptions CallOptions)> InvokeCalls { get; } = [];
public List<(StreamEventsRequest Request, CallOptions CallOptions)> StreamEventsCalls { get; } = [];
public OpenSessionReply OpenSessionReply { get; set; } = new()
{
SessionId = "session-fixture",
BackendName = "mxaccess-worker",
GatewayProtocolVersion = 1,
WorkerProtocolVersion = 1,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
};
public CloseSessionReply CloseSessionReply { get; set; } = new()
{
SessionId = "session-fixture",
FinalState = SessionState.Closed,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
};
public Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CallOptions callOptions)
{
OpenSessionCalls.Add((request, callOptions));
return Task.FromResult(OpenSessionReply);
}
public Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CallOptions callOptions)
{
CloseSessionCalls.Add((request, callOptions));
return Task.FromResult(CloseSessionReply);
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CallOptions callOptions)
{
InvokeCalls.Add((request, callOptions));
return Task.FromResult(_invokeReplies.Dequeue());
}
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions)
{
StreamEventsCalls.Add((request, callOptions));
foreach (MxEvent gatewayEvent in _events)
{
callOptions.CancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
yield return gatewayEvent;
}
}
public void AddInvokeReply(MxCommandReply reply)
{
_invokeReplies.Enqueue(reply);
}
public void AddEvent(MxEvent gatewayEvent)
{
_events.Add(gatewayEvent);
}
}
@@ -0,0 +1,78 @@
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxCommandReplyExtensionsTests
{
[Fact]
public void EnsureSuccess_WithRegisterFixture_ReturnsReply()
{
MxCommandReply reply = ReadReplyFixture("register.ok.reply.json");
Assert.Same(reply, reply.EnsureProtocolSuccess());
Assert.Same(reply, reply.EnsureMxAccessSuccess());
}
[Fact]
public void EnsureMxAccessSuccess_WithFailureFixture_PreservesHResultAndStatuses()
{
MxCommandReply reply = ReadReplyFixture("write.mxaccess-failure.reply.json");
reply.EnsureProtocolSuccess();
MxAccessException exception = Assert.Throws<MxAccessException>(
reply.EnsureMxAccessSuccess);
Assert.Equal(-2147220992, exception.HResultCode);
Assert.Equal(reply.Statuses.Count, exception.Statuses.Count);
Assert.Equal(reply, exception.Reply);
Assert.Contains("0x80040200", exception.Message);
}
[Fact]
public void EnsureProtocolSuccess_WithSessionFailure_ThrowsSessionException()
{
MxCommandReply reply = new()
{
SessionId = "session-missing",
CorrelationId = "correlation",
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.SessionNotFound,
Message = "Session was not found.",
},
};
MxGatewaySessionException exception = Assert.Throws<MxGatewaySessionException>(
reply.EnsureProtocolSuccess);
Assert.Equal("session-missing", exception.SessionId);
Assert.Equal(ProtocolStatusCode.SessionNotFound, exception.ProtocolStatus?.Code);
}
private static MxCommandReply ReadReplyFixture(string fileName)
{
DirectoryInfo directory = new(AppContext.BaseDirectory);
while (directory is not null)
{
string path = Path.Combine(
directory.FullName,
"clients",
"proto",
"fixtures",
"behavior",
"command-replies",
fileName);
if (File.Exists(path))
{
return JsonParser.Default.Parse<MxCommandReply>(File.ReadAllText(path));
}
directory = directory.Parent!;
}
throw new FileNotFoundException(fileName);
}
}
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="coverlet.collector" Version="6.0.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageReference Include="xunit" Version="2.9.3" />
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.4" />
</ItemGroup>
<ItemGroup>
<Using Include="Xunit" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MxGateway.Client\MxGateway.Client.csproj" />
<ProjectReference Include="..\MxGateway.Client.Cli\MxGateway.Client.Cli.csproj" />
</ItemGroup>
</Project>
@@ -0,0 +1,241 @@
using MxGateway.Client.Cli;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxGatewayClientCliTests
{
[Fact]
public void Run_Version_PrintsCompiledProtocolVersions()
{
using var output = new StringWriter();
using var error = new StringWriter();
var exitCode = MxGatewayClientCli.Run(["version"], output, error);
Assert.Equal(0, exitCode);
Assert.Contains("gateway-protocol=1", output.ToString());
Assert.Contains("worker-protocol=1", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
[Fact]
public async Task RunAsync_VersionJson_PrintsJsonProtocolVersions()
{
using var output = new StringWriter();
using var error = new StringWriter();
int exitCode = await MxGatewayClientCli.RunAsync(["version", "--json"], output, error);
Assert.Equal(0, exitCode);
Assert.Contains("\"gatewayProtocolVersion\":1", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
[Fact]
public async Task RunAsync_Write_BuildsWriteCommandAndPrintsJsonReply()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.InvokeReplies.Enqueue(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Write,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
int exitCode = await MxGatewayClientCli.RunAsync(
[
"write",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--session-id",
"session-fixture",
"--server-handle",
"12",
"--item-handle",
"34",
"--type",
"int32",
"--value",
"123",
"--json",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
MxCommandRequest request = Assert.Single(fakeClient.InvokeRequests);
Assert.Equal(MxCommandKind.Write, request.Command.Kind);
Assert.Equal(123, request.Command.Write.Value.Int32Value);
Assert.Contains("MX_COMMAND_KIND_WRITE", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
[Fact]
public async Task RunAsync_ErrorOutput_RedactsApiKey()
{
using var output = new StringWriter();
using var error = new StringWriter();
int exitCode = await MxGatewayClientCli.RunAsync(
[
"open-session",
"--endpoint",
"http://localhost:5000",
"--api-key",
"secret-api-key",
],
output,
error,
_ => throw new InvalidOperationException("boom secret-api-key"));
Assert.Equal(1, exitCode);
Assert.DoesNotContain("secret-api-key", error.ToString());
Assert.Contains("[redacted]", error.ToString());
}
[Fact]
public async Task RunAsync_StreamEvents_WithMaxEventsStopsNonJsonOutput()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.Events.Add(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnDataChange,
WorkerSequence = 1,
});
fakeClient.Events.Add(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnWriteComplete,
WorkerSequence = 2,
});
int exitCode = await MxGatewayClientCli.RunAsync(
[
"stream-events",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--session-id",
"session-fixture",
"--max-events",
"1",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
Assert.Contains("workerSequence", output.ToString());
Assert.DoesNotContain("ON_WRITE_COMPLETE", output.ToString());
}
[Fact]
public async Task RunAsync_Smoke_WhenCommandFails_ClosesOpenedSession()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new()
{
InvokeFailure = new InvalidOperationException("register failed"),
};
int exitCode = await MxGatewayClientCli.RunAsync(
[
"smoke",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--item",
"Area001.Pump001.Speed",
"--json",
],
output,
error,
_ => fakeClient);
Assert.Equal(1, exitCode);
CloseSessionRequest closeRequest = Assert.Single(fakeClient.CloseSessionRequests);
Assert.Equal("session-fixture", closeRequest.SessionId);
}
private sealed class FakeCliClient : IMxGatewayCliClient
{
public Queue<MxCommandReply> InvokeReplies { get; } = new();
public List<MxCommandRequest> InvokeRequests { get; } = [];
public List<CloseSessionRequest> CloseSessionRequests { get; } = [];
public List<MxEvent> Events { get; } = [];
public Exception? InvokeFailure { get; init; }
public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
public Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CancellationToken cancellationToken)
{
return Task.FromResult(new OpenSessionReply
{
SessionId = "session-fixture",
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
GatewayProtocolVersion = 1,
WorkerProtocolVersion = 1,
});
}
public Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CancellationToken cancellationToken)
{
CloseSessionRequests.Add(request);
return Task.FromResult(new CloseSessionReply
{
SessionId = request.SessionId,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
FinalState = SessionState.Closed,
});
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken)
{
InvokeRequests.Add(request);
if (InvokeFailure is not null)
{
throw InvokeFailure;
}
return Task.FromResult(InvokeReplies.Dequeue());
}
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
foreach (MxEvent gatewayEvent in Events)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
yield return gatewayEvent;
}
}
}
}
@@ -0,0 +1,22 @@
using MxGateway.Contracts;
namespace MxGateway.Client.Tests;
public sealed class MxGatewayClientContractInfoTests
{
[Fact]
public void GatewayProtocolVersion_MatchesSharedContract()
{
Assert.Equal(
GatewayContractInfo.GatewayProtocolVersion,
MxGatewayClientContractInfo.GatewayProtocolVersion);
}
[Fact]
public void WorkerProtocolVersion_MatchesSharedContract()
{
Assert.Equal(
GatewayContractInfo.WorkerProtocolVersion,
MxGatewayClientContractInfo.WorkerProtocolVersion);
}
}
@@ -0,0 +1,28 @@
namespace MxGateway.Client.Tests;
public sealed class MxGatewayClientOptionsTests
{
[Fact]
public void Validate_WithAbsoluteEndpointAndApiKey_Succeeds()
{
var options = new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = "test-api-key",
};
options.Validate();
}
[Fact]
public void Validate_WithEmptyApiKey_Throws()
{
var options = new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = "",
};
Assert.Throws<ArgumentException>(options.Validate);
}
}
@@ -0,0 +1,217 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxGatewayClientSessionTests
{
[Fact]
public async Task OpenSessionRawAsync_AttachesApiKeyMetadataAndCancellation()
{
using CancellationTokenSource cancellation = new();
FakeGatewayTransport transport = CreateTransport();
await using MxGatewayClient client = CreateClient(transport);
await client.OpenSessionRawAsync(new OpenSessionRequest(), cancellation.Token);
var call = Assert.Single(transport.OpenSessionCalls);
Assert.Equal("Bearer test-api-key", call.CallOptions.Headers?.GetValue("authorization"));
Assert.Equal(cancellation.Token, call.CallOptions.CancellationToken);
}
[Fact]
public async Task OpenSessionAsync_ReturnsSessionWithRawOpenReply()
{
FakeGatewayTransport transport = CreateTransport();
transport.OpenSessionReply.WorkerProcessId = 1234;
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
Assert.Equal("session-fixture", session.SessionId);
Assert.Same(transport.OpenSessionReply, session.OpenSessionReply);
Assert.Equal(1234, session.OpenSessionReply.WorkerProcessId);
}
[Fact]
public async Task RegisterAsync_BuildsRegisterCommandAndReturnsServerHandle()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Register,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
Register = new RegisterReply { ServerHandle = 12 },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
int serverHandle = await session.RegisterAsync("fixture-client");
Assert.Equal(12, serverHandle);
var call = Assert.Single(transport.InvokeCalls);
Assert.Equal("session-fixture", call.Request.SessionId);
Assert.False(string.IsNullOrWhiteSpace(call.Request.ClientCorrelationId));
Assert.Equal(MxCommandKind.Register, call.Request.Command.Kind);
Assert.Equal("fixture-client", call.Request.Command.Register.ClientName);
}
[Fact]
public async Task AddItem2Async_BuildsAddItem2CommandWithContext()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.AddItem2,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
AddItem2 = new AddItem2Reply { ItemHandle = 34 },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
int itemHandle = await session.AddItem2Async(12, "Area001.Pump001.Speed", "runtime");
Assert.Equal(34, itemHandle);
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
Assert.Equal(MxCommandKind.AddItem2, request.Command.Kind);
Assert.Equal(12, request.Command.AddItem2.ServerHandle);
Assert.Equal("Area001.Pump001.Speed", request.Command.AddItem2.ItemDefinition);
Assert.Equal("runtime", request.Command.AddItem2.ItemContext);
}
[Fact]
public async Task WriteRawAsync_BuildsWriteCommandWithRawValue()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Write,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
MxValue value = new()
{
DataType = MxDataType.Integer,
VariantType = "VT_I4",
Int32Value = 123,
};
MxCommandReply reply = await session.WriteRawAsync(12, 34, value, 56);
Assert.Equal(MxCommandKind.Write, reply.Kind);
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
Assert.Equal(MxCommandKind.Write, request.Command.Kind);
Assert.Equal(12, request.Command.Write.ServerHandle);
Assert.Equal(34, request.Command.Write.ItemHandle);
Assert.Same(value, request.Command.Write.Value);
Assert.Equal(56, request.Command.Write.UserId);
}
[Fact]
public async Task Write2RawAsync_BuildsWrite2CommandWithValueAndTimestamp()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Write2,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
MxValue value = 123.ToMxValue();
MxValue timestampValue = DateTimeOffset.Parse("2026-01-01T00:00:00Z").ToMxValue();
MxCommandReply reply = await session.Write2RawAsync(12, 34, value, timestampValue, 56);
Assert.Equal(MxCommandKind.Write2, reply.Kind);
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
Assert.Equal(MxCommandKind.Write2, request.Command.Kind);
Assert.Equal(12, request.Command.Write2.ServerHandle);
Assert.Equal(34, request.Command.Write2.ItemHandle);
Assert.Same(value, request.Command.Write2.Value);
Assert.Same(timestampValue, request.Command.Write2.TimestampValue);
Assert.Equal(56, request.Command.Write2.UserId);
}
[Fact]
public async Task StreamEventsAsync_YieldsEventsInGatewayOrder()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddEvent(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnDataChange,
WorkerSequence = 1,
});
transport.AddEvent(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnWriteComplete,
WorkerSequence = 2,
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
List<ulong> sequences = [];
await foreach (MxEvent gatewayEvent in session.StreamEventsAsync(afterWorkerSequence: 0))
{
sequences.Add(gatewayEvent.WorkerSequence);
}
Assert.Equal([1UL, 2UL], sequences);
StreamEventsRequest request = Assert.Single(transport.StreamEventsCalls).Request;
Assert.Equal("session-fixture", request.SessionId);
}
[Fact]
public async Task CloseAsync_IsExplicitAndIdempotent()
{
FakeGatewayTransport transport = CreateTransport();
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
CloseSessionReply first = await session.CloseAsync();
CloseSessionReply second = await session.CloseAsync();
Assert.Same(first, second);
var call = Assert.Single(transport.CloseSessionCalls);
Assert.Equal("session-fixture", call.Request.SessionId);
}
[Fact]
public async Task InvokeHelpers_PassCancellationTokenToTransport()
{
using CancellationTokenSource cancellation = new();
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Advise,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
await session.AdviseAsync(12, 34, cancellation.Token);
Assert.Equal(cancellation.Token, Assert.Single(transport.InvokeCalls).CallOptions.CancellationToken);
}
private static MxGatewayClient CreateClient(FakeGatewayTransport transport)
{
return new MxGatewayClient(transport.Options, transport);
}
private static FakeGatewayTransport CreateTransport()
{
return new FakeGatewayTransport(new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = "test-api-key",
});
}
}
@@ -0,0 +1,18 @@
namespace MxGateway.Client.Tests;
public sealed class MxGatewayGeneratedContractTests
{
[Fact]
public async Task GeneratedGrpcClient_CanBeConstructedFromClientFactory()
{
var options = new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = "test-api-key",
};
await using var client = MxGatewayClient.Create(options);
Assert.NotNull(client.RawClient);
}
}
@@ -0,0 +1,57 @@
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxStatusProxyExtensionsTests
{
[Fact]
public void FixtureStatuses_ProjectSuccessAndPreserveRawFields()
{
using JsonDocument document = JsonDocument.Parse(ReadFixture(
"statuses",
"status-conversion-cases.json"));
foreach (JsonElement testCase in document.RootElement.GetProperty("cases").EnumerateArray())
{
MxStatusProxy status = JsonParser.Default.Parse<MxStatusProxy>(
testCase.GetProperty("status").GetRawText());
int success = testCase.GetProperty("status").GetProperty("success").GetInt32();
Assert.Equal(success != 0 && status.Category is MxStatusCategory.Ok, status.IsSuccess());
Assert.Equal(
testCase.GetProperty("status").GetProperty("rawCategory").GetInt32(),
status.RawCategory);
Assert.Equal(
testCase.GetProperty("status").GetProperty("rawDetectedBy").GetInt32(),
status.RawDetectedBy);
}
}
private static string ReadFixture(string category, string fileName)
{
DirectoryInfo directory = new(AppContext.BaseDirectory);
while (directory is not null)
{
string path = Path.Combine(
directory.FullName,
"clients",
"proto",
"fixtures",
"behavior",
category,
fileName);
if (File.Exists(path))
{
return File.ReadAllText(path);
}
directory = directory.Parent!;
}
throw new FileNotFoundException(fileName);
}
}
@@ -0,0 +1,79 @@
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxValueExtensionsTests
{
[Fact]
public void ToMxValue_WithScalarValues_CreatesTypedProtobufValues()
{
Assert.Equal(MxValue.KindOneofCase.BoolValue, true.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.Int32Value, 123.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.Int64Value, 123L.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.FloatValue, 1.25F.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.DoubleValue, 2.5D.ToMxValue().KindCase);
Assert.Equal(MxValue.KindOneofCase.StringValue, "alpha".ToMxValue().KindCase);
}
[Fact]
public void ToMxValue_WithArrays_CreatesTypedArrayProtobufValues()
{
MxValue value = new[] { "alpha", "beta" }.ToMxValue();
Assert.Equal(MxValue.KindOneofCase.ArrayValue, value.KindCase);
Assert.Equal(MxArray.ValuesOneofCase.StringValues, value.ArrayValue.ValuesCase);
Assert.Equal(["alpha", "beta"], value.ArrayValue.StringValues.Values);
Assert.Equal([2U], value.ArrayValue.Dimensions);
}
[Fact]
public void FixtureValues_ProjectExpectedKindsAndPreserveRawMetadata()
{
using JsonDocument document = JsonDocument.Parse(ReadFixture(
"values",
"value-conversion-cases.json"));
foreach (JsonElement testCase in document.RootElement.GetProperty("cases").EnumerateArray())
{
string expectedKind = testCase.GetProperty("expectedKind").GetString()!;
MxValue value = JsonParser.Default.Parse<MxValue>(
testCase.GetProperty("value").GetRawText());
Assert.Equal(expectedKind, value.GetProjectionKind());
if (testCase.GetProperty("id").GetString() is "raw-fallback.variant")
{
Assert.Equal(32767, value.RawDataType);
Assert.Equal([1, 2, 3, 4, 5], Assert.IsType<byte[]>(value.ToClrValue()));
}
}
}
private static string ReadFixture(string category, string fileName)
{
DirectoryInfo directory = new(AppContext.BaseDirectory);
while (directory is not null)
{
string path = Path.Combine(
directory.FullName,
"clients",
"proto",
"fixtures",
"behavior",
category,
fileName);
if (File.Exists(path))
{
return File.ReadAllText(path);
}
directory = directory.Parent!;
}
throw new FileNotFoundException(fileName);
}
}
+76
View File
@@ -0,0 +1,76 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31903.59
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Client", "MxGateway.Client\MxGateway.Client.csproj", "{7CF9ED88-1F32-4040-BEB1-D0902E304C70}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Contracts", "..\..\src\MxGateway.Contracts\MxGateway.Contracts.csproj", "{9AB807A8-0469-40F7-A000-D240F36B6E5D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Client.Cli", "MxGateway.Client.Cli\MxGateway.Client.Cli.csproj", "{EB061E77-2475-4322-9257-3F2456DD141C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Client.Tests", "MxGateway.Client.Tests\MxGateway.Client.Tests.csproj", "{B77B5A8E-0C53-4419-9BCD-227C9753A074}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Debug|x64 = Debug|x64
Debug|x86 = Debug|x86
Release|Any CPU = Release|Any CPU
Release|x64 = Release|x64
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x64.ActiveCfg = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x64.Build.0 = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x86.ActiveCfg = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x86.Build.0 = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|Any CPU.Build.0 = Release|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x64.ActiveCfg = Release|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x64.Build.0 = Release|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x86.ActiveCfg = Release|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x86.Build.0 = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x64.ActiveCfg = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x64.Build.0 = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x86.ActiveCfg = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x86.Build.0 = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|Any CPU.Build.0 = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x64.ActiveCfg = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x64.Build.0 = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x86.ActiveCfg = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x86.Build.0 = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x64.ActiveCfg = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x64.Build.0 = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x86.ActiveCfg = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x86.Build.0 = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|Any CPU.Build.0 = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x64.ActiveCfg = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x64.Build.0 = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x86.ActiveCfg = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x86.Build.0 = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x64.ActiveCfg = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x64.Build.0 = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x86.ActiveCfg = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x86.Build.0 = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|Any CPU.Build.0 = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x64.ActiveCfg = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x64.Build.0 = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x86.ActiveCfg = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal
@@ -0,0 +1,117 @@
using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
internal sealed class GrpcMxGatewayClientTransport(
MxGatewayClientOptions options,
MxAccessGateway.MxAccessGatewayClient rawClient) : IMxGatewayClientTransport
{
public MxGatewayClientOptions Options { get; } = options;
public MxAccessGateway.MxAccessGatewayClient RawClient { get; } = rawClient;
MxAccessGateway.MxAccessGatewayClient? IMxGatewayClientTransport.RawClient => RawClient;
public async Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CallOptions callOptions)
{
try
{
return await RawClient.OpenSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
}
public async Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CallOptions callOptions)
{
try
{
return await RawClient.CloseSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
}
public async Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CallOptions callOptions)
{
try
{
return await RawClient.InvokeAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
}
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
CancellationToken effectiveCancellationToken = cancellationToken.CanBeCanceled
? cancellationToken
: callOptions.CancellationToken;
using AsyncServerStreamingCall<MxEvent> call = RawClient.StreamEvents(request, callOptions);
IAsyncStreamReader<MxEvent> responseStream = call.ResponseStream;
while (true)
{
MxEvent? gatewayEvent;
try
{
if (!await responseStream.MoveNext(effectiveCancellationToken).ConfigureAwait(false))
{
break;
}
gatewayEvent = responseStream.Current;
}
catch (RpcException exception)
{
throw MapRpcException(exception);
}
yield return gatewayEvent;
}
}
IAsyncEnumerable<MxEvent> IMxGatewayClientTransport.StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions)
{
return StreamEventsAsync(request, callOptions);
}
private static MxGatewayException MapRpcException(RpcException exception)
{
return exception.StatusCode switch
{
StatusCode.Unauthenticated => new MxGatewayAuthenticationException(
exception.Status.Detail,
innerException: exception),
StatusCode.PermissionDenied => new MxGatewayAuthorizationException(
exception.Status.Detail,
innerException: exception),
_ => new MxGatewayException(exception.Status.Detail, exception),
};
}
}
@@ -0,0 +1,27 @@
using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
internal interface IMxGatewayClientTransport
{
MxGatewayClientOptions Options { get; }
MxAccessGateway.MxAccessGatewayClient? RawClient { get; }
Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CallOptions callOptions);
Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CallOptions callOptions);
Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CallOptions callOptions);
IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions);
}
@@ -0,0 +1,24 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxAccessException : MxGatewayCommandException
{
public MxAccessException(
string message,
MxCommandReply reply,
Exception? innerException = null)
: base(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
reply.HasHresult ? reply.Hresult : null,
reply.Statuses.ToArray(),
innerException)
{
Reply = reply;
}
public MxCommandReply Reply { get; }
}
@@ -0,0 +1,96 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public static class MxCommandReplyExtensions
{
public static MxCommandReply EnsureProtocolSuccess(this MxCommandReply reply)
{
ArgumentNullException.ThrowIfNull(reply);
ProtocolStatusCode code = reply.ProtocolStatus?.Code
?? ProtocolStatusCode.Unspecified;
if (code is ProtocolStatusCode.Ok or ProtocolStatusCode.MxaccessFailure)
{
return reply;
}
throw CreateProtocolException(reply, code);
}
public static MxCommandReply EnsureMxAccessSuccess(this MxCommandReply reply)
{
ArgumentNullException.ThrowIfNull(reply);
bool mxAccessFailure = reply.ProtocolStatus?.Code is ProtocolStatusCode.MxaccessFailure;
bool hResultFailure = reply.HasHresult && reply.Hresult != 0;
bool statusFailure = reply.Statuses.Any(status => !status.IsSuccess());
if (!mxAccessFailure && !hResultFailure && !statusFailure)
{
return reply;
}
throw new MxAccessException(CreateMxAccessMessage(reply), reply);
}
private static MxGatewayException CreateProtocolException(
MxCommandReply reply,
ProtocolStatusCode code)
{
string message = CreateProtocolMessage(reply);
int? hResult = reply.HasHresult ? reply.Hresult : null;
MxStatusProxy[] statuses = reply.Statuses.ToArray();
return code switch
{
ProtocolStatusCode.SessionNotFound or ProtocolStatusCode.SessionNotReady
=> new MxGatewaySessionException(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
hResult,
statuses),
ProtocolStatusCode.WorkerUnavailable
=> new MxGatewayWorkerException(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
hResult,
statuses),
_
=> new MxGatewayCommandException(
message,
reply.SessionId,
reply.CorrelationId,
reply.ProtocolStatus,
hResult,
statuses),
};
}
private static string CreateProtocolMessage(MxCommandReply reply)
{
string statusMessage = string.IsNullOrWhiteSpace(reply.ProtocolStatus?.Message)
? "Gateway protocol failure."
: reply.ProtocolStatus.Message;
return $"{statusMessage} code={reply.ProtocolStatus?.Code}; session={reply.SessionId}; correlation={reply.CorrelationId}";
}
private static string CreateMxAccessMessage(MxCommandReply reply)
{
string statusSummary = reply.Statuses.Count is 0
? "no MXSTATUS_PROXY entries"
: string.Join("; ", reply.Statuses.Select(status => status.ToDiagnosticSummary()));
string hResult = reply.HasHresult
? $"0x{reply.Hresult:X8}"
: "none";
return $"MXAccess command failed. kind={reply.Kind}; hresult={hResult}; statuses={statusSummary}";
}
}
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\..\..\src\MxGateway.Contracts\MxGateway.Contracts.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.76.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.7" />
</ItemGroup>
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
</Project>
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewayAuthenticationException : MxGatewayException
{
public MxGatewayAuthenticationException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewayAuthorizationException : MxGatewayException
{
public MxGatewayAuthorizationException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,143 @@
using Grpc.Core;
using Grpc.Net.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
/// <summary>
/// Provides the .NET client entry point for the public MXAccess Gateway gRPC API.
/// </summary>
public sealed class MxGatewayClient : IAsyncDisposable
{
private readonly GrpcChannel _channel;
private readonly IMxGatewayClientTransport _transport;
private bool _disposed;
internal MxGatewayClient(
MxGatewayClientOptions options,
IMxGatewayClientTransport transport)
{
ArgumentNullException.ThrowIfNull(options);
options.Validate();
Options = options;
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
_channel = null!;
}
private MxGatewayClient(
GrpcChannel channel,
IMxGatewayClientTransport transport)
{
_channel = channel;
_transport = transport;
Options = transport.Options;
}
public MxGatewayClientOptions Options { get; }
public MxAccessGateway.MxAccessGatewayClient RawClient =>
_transport.RawClient
?? throw new InvalidOperationException("The raw generated gRPC client is not available for this client instance.");
public static MxGatewayClient Create(MxGatewayClientOptions options)
{
ArgumentNullException.ThrowIfNull(options);
options.Validate();
var channel = GrpcChannel.ForAddress(
options.Endpoint,
new GrpcChannelOptions
{
LoggerFactory = options.LoggerFactory,
});
return new MxGatewayClient(
channel,
new GrpcMxGatewayClientTransport(
options,
new MxAccessGateway.MxAccessGatewayClient(channel)));
}
public async Task<MxGatewaySession> OpenSessionAsync(
OpenSessionRequest? request = null,
CancellationToken cancellationToken = default)
{
OpenSessionReply reply = await OpenSessionRawAsync(
request ?? new OpenSessionRequest(),
cancellationToken)
.ConfigureAwait(false);
return new MxGatewaySession(this, reply);
}
public Task<OpenSessionReply> OpenSessionRawAsync(
OpenSessionRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.OpenSessionAsync(request, CreateCallOptions(cancellationToken));
}
public Task<CloseSessionReply> CloseSessionRawAsync(
CloseSessionRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.CloseSessionAsync(request, CreateCallOptions(cancellationToken));
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.InvokeAsync(request, CreateCallOptions(cancellationToken));
}
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.StreamEventsAsync(request, CreateCallOptions(cancellationToken));
}
public ValueTask DisposeAsync()
{
if (_disposed)
{
return ValueTask.CompletedTask;
}
_disposed = true;
_channel?.Dispose();
return ValueTask.CompletedTask;
}
internal CallOptions CreateCallOptions(CancellationToken cancellationToken)
{
Metadata headers = new()
{
{ "authorization", $"Bearer {Options.ApiKey}" },
};
return new CallOptions(
headers,
DateTime.UtcNow.Add(Options.DefaultCallTimeout),
cancellationToken);
}
private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(_disposed, this);
}
}
@@ -0,0 +1,15 @@
using MxGateway.Contracts;
namespace MxGateway.Client;
/// <summary>
/// Exposes the protocol versions compiled into this client package.
/// </summary>
public static class MxGatewayClientContractInfo
{
public const uint GatewayProtocolVersion =
GatewayContractInfo.GatewayProtocolVersion;
public const uint WorkerProtocolVersion =
GatewayContractInfo.WorkerProtocolVersion;
}
@@ -0,0 +1,58 @@
using Microsoft.Extensions.Logging;
namespace MxGateway.Client;
/// <summary>
/// Configures the gRPC channel used by the .NET MXAccess Gateway client.
/// </summary>
public sealed class MxGatewayClientOptions
{
public required Uri Endpoint { get; init; }
public required string ApiKey { get; init; }
public bool UseTls { get; init; }
public string? CaCertificatePath { get; init; }
public string? ServerNameOverride { get; init; }
public TimeSpan ConnectTimeout { get; init; } = TimeSpan.FromSeconds(10);
public TimeSpan DefaultCallTimeout { get; init; } = TimeSpan.FromSeconds(30);
public ILoggerFactory? LoggerFactory { get; init; }
public void Validate()
{
ArgumentNullException.ThrowIfNull(Endpoint);
if (!Endpoint.IsAbsoluteUri)
{
throw new ArgumentException(
"The gateway endpoint must be an absolute URI.",
nameof(Endpoint));
}
if (string.IsNullOrWhiteSpace(ApiKey))
{
throw new ArgumentException(
"The gateway API key must not be empty.",
nameof(ApiKey));
}
if (ConnectTimeout <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(ConnectTimeout),
"The connect timeout must be greater than zero.");
}
if (DefaultCallTimeout <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(DefaultCallTimeout),
"The default call timeout must be greater than zero.");
}
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public class MxGatewayCommandException : MxGatewayException
{
public MxGatewayCommandException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,45 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public class MxGatewayException : Exception
{
public MxGatewayException(string message)
: base(message)
{
Statuses = [];
}
public MxGatewayException(string message, Exception? innerException)
: base(message, innerException)
{
Statuses = [];
}
public MxGatewayException(
string message,
string? sessionId,
string? correlationId,
ProtocolStatus? protocolStatus,
int? hResult,
IReadOnlyList<MxStatusProxy> statuses,
Exception? innerException = null)
: base(message, innerException)
{
SessionId = sessionId;
CorrelationId = correlationId;
ProtocolStatus = protocolStatus;
HResultCode = hResult;
Statuses = statuses;
}
public string? SessionId { get; }
public string? CorrelationId { get; }
public ProtocolStatus? ProtocolStatus { get; }
public int? HResultCode { get; }
public IReadOnlyList<MxStatusProxy> Statuses { get; }
}
@@ -0,0 +1,300 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
/// <summary>
/// Represents one gateway-backed MXAccess session.
/// </summary>
public sealed class MxGatewaySession : IAsyncDisposable
{
private readonly MxGatewayClient _client;
private readonly SemaphoreSlim _closeLock = new(1, 1);
private CloseSessionReply? _closeReply;
internal MxGatewaySession(
MxGatewayClient client,
OpenSessionReply openSessionReply)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
OpenSessionReply = openSessionReply ?? throw new ArgumentNullException(nameof(openSessionReply));
}
public string SessionId => OpenSessionReply.SessionId;
public OpenSessionReply OpenSessionReply { get; }
public async Task<CloseSessionReply> CloseAsync(CancellationToken cancellationToken = default)
{
if (_closeReply is not null)
{
return _closeReply;
}
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_closeReply is not null)
{
return _closeReply;
}
_closeReply = await _client.CloseSessionRawAsync(
new CloseSessionRequest { SessionId = SessionId },
cancellationToken)
.ConfigureAwait(false);
return _closeReply;
}
finally
{
_closeLock.Release();
}
}
public async Task<int> RegisterAsync(
string clientName,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value;
}
public Task<MxCommandReply> RegisterRawAsync(
string clientName,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(clientName);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = clientName },
},
cancellationToken);
}
public async Task<int> AddItemAsync(
int serverHandle,
string itemDefinition,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await AddItemRawAsync(
serverHandle,
itemDefinition,
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value;
}
public Task<MxCommandReply> AddItemRawAsync(
int serverHandle,
string itemDefinition,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = itemDefinition,
},
},
cancellationToken);
}
public async Task<int> AddItem2Async(
int serverHandle,
string itemDefinition,
string itemContext,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await AddItem2RawAsync(
serverHandle,
itemDefinition,
itemContext,
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AddItem2?.ItemHandle ?? reply.ReturnValue.Int32Value;
}
public Task<MxCommandReply> AddItem2RawAsync(
int serverHandle,
string itemDefinition,
string itemContext,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AddItem2,
AddItem2 = new AddItem2Command
{
ServerHandle = serverHandle,
ItemDefinition = itemDefinition,
ItemContext = itemContext ?? string.Empty,
},
},
cancellationToken);
}
public async Task AdviseAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await AdviseRawAsync(serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
public Task<MxCommandReply> AdviseRawAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
},
cancellationToken);
}
public async Task WriteAsync(
int serverHandle,
int itemHandle,
MxValue value,
int userId,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
public Task<MxCommandReply> WriteRawAsync(
int serverHandle,
int itemHandle,
MxValue value,
int userId,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(value);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
Value = value,
UserId = userId,
},
},
cancellationToken);
}
public async Task Write2Async(
int serverHandle,
int itemHandle,
MxValue value,
MxValue timestampValue,
int userId,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await Write2RawAsync(
serverHandle,
itemHandle,
value,
timestampValue,
userId,
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
public Task<MxCommandReply> Write2RawAsync(
int serverHandle,
int itemHandle,
MxValue value,
MxValue timestampValue,
int userId,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(value);
ArgumentNullException.ThrowIfNull(timestampValue);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Write2,
Write2 = new Write2Command
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
Value = value,
TimestampValue = timestampValue,
UserId = userId,
},
},
cancellationToken);
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
return _client.InvokeAsync(request, cancellationToken);
}
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
ulong afterWorkerSequence = 0,
CancellationToken cancellationToken = default)
{
return _client.StreamEventsAsync(
new StreamEventsRequest
{
SessionId = SessionId,
AfterWorkerSequence = afterWorkerSequence,
},
cancellationToken);
}
public async ValueTask DisposeAsync()
{
await CloseAsync().ConfigureAwait(false);
_closeLock.Dispose();
}
private Task<MxCommandReply> InvokeCommandAsync(
MxCommand command,
CancellationToken cancellationToken)
{
return _client.InvokeAsync(
new MxCommandRequest
{
SessionId = SessionId,
ClientCorrelationId = Guid.NewGuid().ToString("N"),
Command = command,
},
cancellationToken);
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewaySessionException : MxGatewayException
{
public MxGatewaySessionException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public sealed class MxGatewayWorkerException : MxGatewayException
{
public MxGatewayWorkerException(
string message,
string? sessionId = null,
string? correlationId = null,
ProtocolStatus? protocolStatus = null,
int? hResult = null,
IReadOnlyList<MxStatusProxy>? statuses = null,
Exception? innerException = null)
: base(
message,
sessionId,
correlationId,
protocolStatus,
hResult,
statuses ?? [],
innerException)
{
}
}
@@ -0,0 +1,25 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
public static class MxStatusProxyExtensions
{
public static bool IsSuccess(this MxStatusProxy status)
{
ArgumentNullException.ThrowIfNull(status);
return status.Success != 0
&& status.Category is MxStatusCategory.Ok;
}
public static string ToDiagnosticSummary(this MxStatusProxy status)
{
ArgumentNullException.ThrowIfNull(status);
string diagnosticText = string.IsNullOrWhiteSpace(status.DiagnosticText)
? "no diagnostic text"
: status.DiagnosticText;
return $"{status.Category} by {status.DetectedBy}; detail={status.Detail}; {diagnosticText}";
}
}
@@ -0,0 +1,284 @@
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
/// <summary>
/// Creates and projects gateway MXAccess values without hiding the raw
/// protobuf value carried by command replies and events.
/// </summary>
public static class MxValueExtensions
{
public static MxValue ToMxValue(this bool value)
{
return new MxValue
{
DataType = MxDataType.Boolean,
VariantType = "VT_BOOL",
BoolValue = value,
};
}
public static MxValue ToMxValue(this int value)
{
return new MxValue
{
DataType = MxDataType.Integer,
VariantType = "VT_I4",
Int32Value = value,
};
}
public static MxValue ToMxValue(this long value)
{
return new MxValue
{
DataType = MxDataType.Integer,
VariantType = "VT_I8",
Int64Value = value,
};
}
public static MxValue ToMxValue(this float value)
{
return new MxValue
{
DataType = MxDataType.Float,
VariantType = "VT_R4",
FloatValue = value,
};
}
public static MxValue ToMxValue(this double value)
{
return new MxValue
{
DataType = MxDataType.Double,
VariantType = "VT_R8",
DoubleValue = value,
};
}
public static MxValue ToMxValue(this string value)
{
ArgumentNullException.ThrowIfNull(value);
return new MxValue
{
DataType = MxDataType.String,
VariantType = "VT_BSTR",
StringValue = value,
};
}
public static MxValue ToMxValue(this DateTimeOffset value)
{
return new MxValue
{
DataType = MxDataType.Time,
VariantType = "VT_DATE",
TimestampValue = Timestamp.FromDateTimeOffset(value),
};
}
public static MxValue ToMxValue(this DateTime value)
{
return new DateTimeOffset(
value.Kind == DateTimeKind.Unspecified
? DateTime.SpecifyKind(value, DateTimeKind.Utc)
: value.ToUniversalTime())
.ToMxValue();
}
public static MxValue ToMxValue(this IReadOnlyList<bool> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new BoolArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Boolean, "VT_ARRAY|VT_BOOL", values.Count, new MxArray
{
ElementDataType = MxDataType.Boolean,
VariantType = "VT_ARRAY|VT_BOOL",
BoolValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<int> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new Int32Array();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Integer, "VT_ARRAY|VT_I4", values.Count, new MxArray
{
ElementDataType = MxDataType.Integer,
VariantType = "VT_ARRAY|VT_I4",
Int32Values = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<long> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new Int64Array();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Integer, "VT_ARRAY|VT_I8", values.Count, new MxArray
{
ElementDataType = MxDataType.Integer,
VariantType = "VT_ARRAY|VT_I8",
Int64Values = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<float> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new FloatArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Float, "VT_ARRAY|VT_R4", values.Count, new MxArray
{
ElementDataType = MxDataType.Float,
VariantType = "VT_ARRAY|VT_R4",
FloatValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<double> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new DoubleArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.Double, "VT_ARRAY|VT_R8", values.Count, new MxArray
{
ElementDataType = MxDataType.Double,
VariantType = "VT_ARRAY|VT_R8",
DoubleValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<string> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new StringArray();
array.Values.Add(values);
return CreateArrayValue(MxDataType.String, "VT_ARRAY|VT_BSTR", values.Count, new MxArray
{
ElementDataType = MxDataType.String,
VariantType = "VT_ARRAY|VT_BSTR",
StringValues = array,
});
}
public static MxValue ToMxValue(this IReadOnlyList<DateTimeOffset> values)
{
ArgumentNullException.ThrowIfNull(values);
var array = new TimestampArray();
array.Values.Add(values.Select(Timestamp.FromDateTimeOffset));
return CreateArrayValue(MxDataType.Time, "VT_ARRAY|VT_DATE", values.Count, new MxArray
{
ElementDataType = MxDataType.Time,
VariantType = "VT_ARRAY|VT_DATE",
TimestampValues = array,
});
}
public static string GetProjectionKind(this MxValue value)
{
ArgumentNullException.ThrowIfNull(value);
return value.KindCase switch
{
MxValue.KindOneofCase.BoolValue => "boolValue",
MxValue.KindOneofCase.Int32Value => "int32Value",
MxValue.KindOneofCase.Int64Value => "int64Value",
MxValue.KindOneofCase.FloatValue => "floatValue",
MxValue.KindOneofCase.DoubleValue => "doubleValue",
MxValue.KindOneofCase.StringValue => "stringValue",
MxValue.KindOneofCase.TimestampValue => "timestampValue",
MxValue.KindOneofCase.ArrayValue => "arrayValue",
MxValue.KindOneofCase.RawValue => "rawValue",
_ => value.IsNull ? "nullValue" : "unspecified",
};
}
public static object? ToClrValue(this MxValue value)
{
ArgumentNullException.ThrowIfNull(value);
return value.KindCase switch
{
MxValue.KindOneofCase.BoolValue => value.BoolValue,
MxValue.KindOneofCase.Int32Value => value.Int32Value,
MxValue.KindOneofCase.Int64Value => value.Int64Value,
MxValue.KindOneofCase.FloatValue => value.FloatValue,
MxValue.KindOneofCase.DoubleValue => value.DoubleValue,
MxValue.KindOneofCase.StringValue => value.StringValue,
MxValue.KindOneofCase.TimestampValue => value.TimestampValue.ToDateTimeOffset(),
MxValue.KindOneofCase.ArrayValue => value.ArrayValue.ToClrArrayValue(),
MxValue.KindOneofCase.RawValue => value.RawValue.ToByteArray(),
_ => value.IsNull ? null : value,
};
}
public static object? ToClrArrayValue(this MxArray array)
{
ArgumentNullException.ThrowIfNull(array);
return array.ValuesCase switch
{
MxArray.ValuesOneofCase.BoolValues => array.BoolValues.Values.ToArray(),
MxArray.ValuesOneofCase.Int32Values => array.Int32Values.Values.ToArray(),
MxArray.ValuesOneofCase.Int64Values => array.Int64Values.Values.ToArray(),
MxArray.ValuesOneofCase.FloatValues => array.FloatValues.Values.ToArray(),
MxArray.ValuesOneofCase.DoubleValues => array.DoubleValues.Values.ToArray(),
MxArray.ValuesOneofCase.StringValues => array.StringValues.Values.ToArray(),
MxArray.ValuesOneofCase.TimestampValues => array.TimestampValues.Values
.Select(timestamp => timestamp.ToDateTimeOffset())
.ToArray(),
MxArray.ValuesOneofCase.RawValues => array.RawValues.Values
.Select(value => value.ToByteArray())
.ToArray(),
_ => null,
};
}
public static MxValue ToRawMxValue(
byte[] value,
string variantType,
string rawDiagnostic,
int rawDataType = 0)
{
ArgumentNullException.ThrowIfNull(value);
return new MxValue
{
DataType = MxDataType.Unknown,
VariantType = variantType,
RawDiagnostic = rawDiagnostic,
RawDataType = rawDataType,
RawValue = ByteString.CopyFrom(value),
};
}
private static MxValue CreateArrayValue(
MxDataType dataType,
string variantType,
int length,
MxArray array)
{
array.Dimensions.Add((uint)length);
return new MxValue
{
DataType = dataType,
VariantType = variantType,
ArrayValue = array,
};
}
}
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("MxGateway.Client.Tests")]
+111
View File
@@ -0,0 +1,111 @@
# .NET Client Projects
The .NET client workspace contains the MXAccess Gateway client library, test
CLI, and unit tests.
## Projects
| Project | Purpose |
|---------|---------|
| `MxGateway.Client` | .NET 10 library entry point, raw gRPC calls, and session helpers. |
| `MxGateway.Client.Cli` | Test CLI for smoke and diagnostic commands. |
| `MxGateway.Client.Tests` | Unit tests for client options, generated contract wiring, auth metadata, session helpers, cancellation, and event streaming. |
The projects reference `src/MxGateway.Contracts/MxGateway.Contracts.csproj` so
the client compiles against the same generated protobuf and gRPC types as the
gateway. `clients/dotnet/generated` remains reserved for generator output if a
future client build switches to client-local `Grpc.Tools` generation.
## Build And Test
```powershell
dotnet build clients/dotnet/MxGateway.Client.sln
dotnet test clients/dotnet/MxGateway.Client.sln --no-build
```
## Client Usage
`MxGatewayClient` opens a gRPC channel to the gateway and attaches the API key
to every unary and streaming call as `authorization: Bearer <api-key>`.
Cancellation tokens passed to the public methods flow to the generated gRPC
call. Client-side cancellation stops waiting for the gateway response; it does
not abort an MXAccess COM call that is already executing inside a worker.
```csharp
await using MxGatewayClient client = MxGatewayClient.Create(
new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = apiKey,
});
MxGatewaySession session = await client.OpenSessionAsync();
try
{
int serverHandle = await session.RegisterAsync("sample-client");
int itemHandle = await session.AddItemAsync(
serverHandle,
"Area001.Pump001.Speed");
await session.AdviseAsync(serverHandle, itemHandle);
}
finally
{
await session.CloseAsync();
}
```
Use `OpenSessionRawAsync`, `CloseSessionRawAsync`, `InvokeAsync`, and
`StreamEventsAsync` when tests or parity tools need direct generated protobuf
messages. `MxGatewaySession.OpenSessionReply` keeps the raw session-open reply
available, and command helpers have `*RawAsync` variants when callers need the
complete `MxCommandReply`.
`MxGatewaySession.CloseAsync` is explicit and idempotent. Repeated calls return
the first `CloseSessionReply` instead of sending another close request.
## Values, Status, And Errors
The client provides extension helpers for generated protobuf values. Use
`ToMxValue()` on .NET scalar values and typed arrays to create `MxValue`
instances for `Write` and `Write2`. Use `ToClrValue()` and
`GetProjectionKind()` when test or diagnostic code needs to inspect generated
`MxValue` replies while preserving `rawDiagnostic`, raw data type fields, and
raw byte payloads.
`MxStatusProxy.IsSuccess()` and `ToDiagnosticSummary()` expose MXAccess status
arrays without collapsing them into a single gateway success flag. Command
reply helpers follow the same split:
```csharp
reply.EnsureProtocolSuccess();
reply.EnsureMxAccessSuccess();
```
`EnsureProtocolSuccess()` raises gateway, session, worker, or command
exceptions for gateway-level failures. It leaves
`PROTOCOL_STATUS_CODE_MXACCESS_FAILURE` to `EnsureMxAccessSuccess()` so callers
can keep the full `MxCommandReply`, HRESULT, and status array when MXAccess
itself rejects a command. `MxAccessException.Reply` contains the raw generated
reply.
## CLI Usage
The test CLI supports deterministic JSON output for automation:
```powershell
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- version --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- open-session --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- register --session-id <id> --client-name mxgw-dotnet-cli --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- add-item --session-id <id> --server-handle 1 --item Area001.Pump001.Speed --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- advise --session-id <id> --server-handle 1 --item-handle 1 --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- write --session-id <id> --server-handle 1 --item-handle 1 --type int32 --value 123 --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- write2 --session-id <id> --server-handle 1 --item-handle 1 --type int32 --value 123 --timestamp 2026-01-01T00:00:00Z --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- stream-events --session-id <id> --max-events 1 --json
dotnet run --project clients/dotnet/MxGateway.Client.Cli -- smoke --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --item Area001.Pump001.Speed --json
```
`smoke` opens a session, registers a client, adds one item, advises it,
optionally writes a value when `--type` and `--value` are supplied, reads a
bounded event stream, and closes the session in a `finally` block. CLI error
output redacts API keys supplied through `--api-key`.
+83
View File
@@ -0,0 +1,83 @@
# Go Client
The Go client module contains the generated MXAccess Gateway protobuf bindings,
a small handwritten `mxgateway` package, and the `mxgw-go` test CLI scaffold.
The module uses the shared proto inputs documented in
`../../docs/client-proto-generation.md` so gateway and client contracts stay in
sync.
## Layout
```text
clients/go/
go.mod
generate-proto.ps1
internal/generated/
mxgateway/
cmd/mxgw-go/
```
`internal/generated` contains code produced by `protoc`, `protoc-gen-go`, and
`protoc-gen-go-grpc`. Do not edit generated files by hand.
## Regenerating Protobuf Bindings
Run generation after the shared `.proto` files or the Go output path changes:
```powershell
./generate-proto.ps1
```
The script uses the tool paths recorded in `../../docs/toolchain-links.md`.
## Build And Test
Run the Go module checks from `clients/go`:
```powershell
go test ./...
go build ./...
go vet ./...
```
The tests parse the shared JSON fixtures, exercise value and status conversion,
use `bufconn` for fake gateway auth and streaming behavior, and cover CLI JSON
redaction.
## Client API
Use `mxgateway.Dial` with `mxgateway.Options` to configure plaintext or TLS
transport, API-key metadata, dial timeout, and per-call timeout:
```go
client, err := mxgateway.Dial(ctx, mxgateway.Options{
Endpoint: "localhost:5000",
APIKey: os.Getenv("MXGATEWAY_API_KEY"),
Plaintext: true,
})
```
`Client.OpenSession` returns a `Session` with helpers for `Register`,
`AddItem`, `AddItem2`, `Advise`, `Write`, `Events`, and `Close`. Raw protobuf
messages remain available through the `mxgateway` package aliases and the
`Raw` helper methods. Typed errors support `errors.As` for `GatewayError`,
`CommandError`, and `MxAccessError`; command errors preserve the raw reply.
## CLI
The `mxgw-go` CLI emits JSON with redacted API keys for commands that connect to
the gateway:
```powershell
go run ./cmd/mxgw-go version -json
go run ./cmd/mxgw-go open-session -endpoint localhost:5000 -plaintext -json
go run ./cmd/mxgw-go register -session-id <id> -client-name mxgw-go -plaintext -json
go run ./cmd/mxgw-go add-item -session-id <id> -server-handle 1 -item Area001.Tag.Value -plaintext -json
go run ./cmd/mxgw-go advise -session-id <id> -server-handle 1 -item-handle 1 -plaintext -json
go run ./cmd/mxgw-go write -session-id <id> -server-handle 1 -item-handle 1 -type int32 -value 123 -plaintext -json
go run ./cmd/mxgw-go stream-events -session-id <id> -plaintext -json
go run ./cmd/mxgw-go smoke -item Area001.Tag.Value -plaintext -json
```
Use `-api-key-env MXGATEWAY_API_KEY` or `-api-key <key>` when authentication is
enabled. CLI output redacts the key value and never writes the raw secret.
+530
View File
@@ -0,0 +1,530 @@
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"os"
"strconv"
"time"
"gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/mxgateway"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"
)
type versionOutput struct {
ClientVersion string `json:"clientVersion"`
GatewayProtocolVersion uint32 `json:"gatewayProtocolVersion"`
WorkerProtocolVersion uint32 `json:"workerProtocolVersion"`
}
type commonOptions struct {
Endpoint string `json:"endpoint"`
APIKey string `json:"apiKey"`
APIKeyEnv string `json:"apiKeyEnv,omitempty"`
Plaintext bool `json:"plaintext"`
CACertFile string `json:"caCertFile,omitempty"`
ServerName string `json:"serverNameOverride,omitempty"`
CallTimeout string `json:"callTimeout,omitempty"`
apiKeyValue string
timeout time.Duration
}
type openSessionOutput struct {
Command string `json:"command"`
Options commonOptions `json:"options"`
Reply json.RawMessage `json:"reply"`
}
type commandReplyOutput struct {
Command string `json:"command"`
Options commonOptions `json:"options"`
Reply json.RawMessage `json:"reply"`
}
func main() {
if err := runWithIO(context.Background(), os.Args[1:], os.Stdout, os.Stderr); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(2)
}
}
func run(args []string) error {
return runWithIO(context.Background(), args, os.Stdout, os.Stderr)
}
func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) error {
if len(args) == 0 {
writeUsage(stderr)
return errors.New("missing command")
}
switch args[0] {
case "version":
return runVersion(args[1:], stdout, stderr)
case "open-session":
return runOpenSession(ctx, args[1:], stdout, stderr)
case "close-session":
return runCloseSession(ctx, args[1:], stdout, stderr)
case "register":
return runRegister(ctx, args[1:], stdout, stderr)
case "add-item":
return runAddItem(ctx, args[1:], stdout, stderr)
case "advise":
return runAdvise(ctx, args[1:], stdout, stderr)
case "write":
return runWrite(ctx, args[1:], stdout, stderr)
case "stream-events":
return runStreamEvents(ctx, args[1:], stdout, stderr)
case "smoke":
return runSmoke(ctx, args[1:], stdout, stderr)
default:
writeUsage(stderr)
return fmt.Errorf("unknown command %q", args[0])
}
}
func runVersion(args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("version", flag.ContinueOnError)
flags.SetOutput(stderr)
jsonOutput := flags.Bool("json", false, "write JSON output")
if err := flags.Parse(args); err != nil {
return err
}
output := versionOutput{
ClientVersion: mxgateway.ClientVersion,
GatewayProtocolVersion: mxgateway.GatewayProtocolVersion,
WorkerProtocolVersion: mxgateway.WorkerProtocolVersion,
}
if *jsonOutput {
return writeJSON(stdout, output)
}
fmt.Fprintf(stdout, "mxgw-go %s\n", output.ClientVersion)
fmt.Fprintf(stdout, "gateway protocol %d\n", output.GatewayProtocolVersion)
fmt.Fprintf(stdout, "worker protocol %d\n", output.WorkerProtocolVersion)
return nil
}
func runOpenSession(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("open-session", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
clientName := flags.String("client-session-name", "", "client session name")
backend := flags.String("backend", "", "requested backend")
if err := flags.Parse(args); err != nil {
return err
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
reply, err := client.OpenSessionRaw(ctx, (&mxgateway.OpenSessionOptions{
RequestedBackend: *backend,
ClientSessionName: *clientName,
}).Request())
if err != nil {
return err
}
if *jsonOutput {
return writeJSON(stdout, openSessionOutput{
Command: "open-session",
Options: options,
Reply: mustMarshalProto(reply),
})
}
fmt.Fprintln(stdout, reply.GetSessionId())
return nil
}
func runCloseSession(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("close-session", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
sessionID := flags.String("session-id", "", "gateway session id")
if err := flags.Parse(args); err != nil {
return err
}
if *sessionID == "" {
return errors.New("session-id is required")
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
reply, err := client.CloseSessionRaw(ctx, &mxgateway.CloseSessionRequest{SessionId: *sessionID})
if err != nil {
return err
}
if *jsonOutput {
return writeJSON(stdout, commandReplyOutput{
Command: "close-session",
Options: options,
Reply: mustMarshalProto(reply),
})
}
fmt.Fprintln(stdout, reply.GetFinalState())
return nil
}
func runRegister(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("register", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
sessionID := flags.String("session-id", "", "gateway session id")
clientName := flags.String("client-name", "", "MXAccess client name")
if err := flags.Parse(args); err != nil {
return err
}
if *sessionID == "" || *clientName == "" {
return errors.New("session-id and client-name are required")
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
session := mxgateway.NewSessionForID(client, *sessionID)
reply, err := session.RegisterRaw(ctx, *clientName)
return writeCommandOutput(stdout, *jsonOutput, "register", options, reply, err)
}
func runAddItem(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("add-item", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
sessionID := flags.String("session-id", "", "gateway session id")
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
item := flags.String("item", "", "item definition")
if err := flags.Parse(args); err != nil {
return err
}
if *sessionID == "" || *item == "" {
return errors.New("session-id and item are required")
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
session := mxgateway.NewSessionForID(client, *sessionID)
reply, err := session.AddItemRaw(ctx, int32(*serverHandle), *item)
return writeCommandOutput(stdout, *jsonOutput, "add-item", options, reply, err)
}
func runAdvise(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("advise", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
sessionID := flags.String("session-id", "", "gateway session id")
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
itemHandle := flags.Int("item-handle", 0, "MXAccess item handle")
if err := flags.Parse(args); err != nil {
return err
}
if *sessionID == "" {
return errors.New("session-id is required")
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
session := mxgateway.NewSessionForID(client, *sessionID)
reply, err := session.AdviseRaw(ctx, int32(*serverHandle), int32(*itemHandle))
return writeCommandOutput(stdout, *jsonOutput, "advise", options, reply, err)
}
func runWrite(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("write", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
sessionID := flags.String("session-id", "", "gateway session id")
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
itemHandle := flags.Int("item-handle", 0, "MXAccess item handle")
valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string")
valueText := flags.String("value", "", "value text")
userID := flags.Int("user-id", 0, "MXAccess user id")
if err := flags.Parse(args); err != nil {
return err
}
if *sessionID == "" {
return errors.New("session-id is required")
}
value, err := parseValue(*valueType, *valueText)
if err != nil {
return err
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
session := mxgateway.NewSessionForID(client, *sessionID)
reply, err := session.WriteRaw(ctx, int32(*serverHandle), int32(*itemHandle), value, int32(*userID))
return writeCommandOutput(stdout, *jsonOutput, "write", options, reply, err)
}
func runStreamEvents(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("stream-events", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
sessionID := flags.String("session-id", "", "gateway session id")
after := flags.Uint64("after-worker-sequence", 0, "first worker sequence to read after")
limit := flags.Int("limit", 0, "maximum events to read; 0 means unbounded")
if err := flags.Parse(args); err != nil {
return err
}
if *sessionID == "" {
return errors.New("session-id is required")
}
client, _, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
session := mxgateway.NewSessionForID(client, *sessionID)
streamCtx, cancelStream := context.WithCancel(ctx)
defer cancelStream()
events, err := session.EventsAfter(streamCtx, *after)
if err != nil {
return err
}
count := 0
for result := range events {
if result.Err != nil {
return result.Err
}
if *jsonOutput {
fmt.Fprintln(stdout, string(mustMarshalProto(result.Event)))
} else {
fmt.Fprintf(stdout, "%d %s\n", result.Event.GetWorkerSequence(), result.Event.GetFamily())
}
count++
if *limit > 0 && count >= *limit {
cancelStream()
return nil
}
}
return nil
}
func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("smoke", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
clientName := flags.String("client-name", "mxgw-go-smoke", "MXAccess client name")
item := flags.String("item", "", "item definition")
if err := flags.Parse(args); err != nil {
return err
}
if *item == "" {
return errors.New("item is required")
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
session, err := client.OpenSession(ctx, mxgateway.OpenSessionOptions{ClientSessionName: *clientName})
if err != nil {
return err
}
defer session.Close(context.Background())
serverHandle, err := session.Register(ctx, *clientName)
if err != nil {
return err
}
itemHandle, err := session.AddItem(ctx, serverHandle, *item)
if err != nil {
return err
}
if err := session.Advise(ctx, serverHandle, itemHandle); err != nil {
return err
}
output := map[string]any{
"command": "smoke",
"options": options,
"sessionId": session.ID(),
"serverHandle": serverHandle,
"itemHandle": itemHandle,
}
if *jsonOutput {
return writeJSON(stdout, output)
}
fmt.Fprintf(stdout, "session=%s server=%d item=%d\n", session.ID(), serverHandle, itemHandle)
return nil
}
func bindCommonFlags(flags *flag.FlagSet) *commonOptions {
common := &commonOptions{}
flags.StringVar(&common.Endpoint, "endpoint", "localhost:5000", "gateway endpoint")
flags.StringVar(&common.APIKey, "api-key", "", "gateway API key")
flags.StringVar(&common.APIKeyEnv, "api-key-env", "MXGATEWAY_API_KEY", "environment variable containing the API key")
flags.BoolVar(&common.Plaintext, "plaintext", false, "use plaintext transport")
flags.StringVar(&common.CACertFile, "ca-cert", "", "CA certificate file")
flags.StringVar(&common.ServerName, "server-name-override", "", "TLS server name override")
flags.StringVar(&common.CallTimeout, "call-timeout", "30s", "per-call timeout")
return common
}
func dialForCommand(ctx context.Context, common *commonOptions) (*mxgateway.Client, commonOptions, error) {
options, err := common.resolved()
if err != nil {
return nil, options, err
}
client, err := mxgateway.Dial(ctx, mxgateway.Options{
Endpoint: options.Endpoint,
APIKey: options.apiKeyValue,
Plaintext: options.Plaintext,
CACertFile: options.CACertFile,
ServerNameOverride: options.ServerName,
CallTimeout: options.timeout,
})
return client, options, err
}
func (o *commonOptions) resolved() (commonOptions, error) {
resolved := *o
if resolved.APIKey == "" && resolved.APIKeyEnv != "" {
resolved.apiKeyValue = os.Getenv(resolved.APIKeyEnv)
} else {
resolved.apiKeyValue = resolved.APIKey
}
resolved.APIKey = mxgateway.RedactAPIKey(resolved.apiKeyValue)
if resolved.CallTimeout != "" {
timeout, err := time.ParseDuration(resolved.CallTimeout)
if err != nil {
return resolved, err
}
resolved.timeout = timeout
}
return resolved, nil
}
func parseValue(valueType, valueText string) (*mxgateway.MxValue, error) {
switch valueType {
case "bool":
value, err := strconv.ParseBool(valueText)
if err != nil {
return nil, err
}
return mxgateway.BoolValue(value), nil
case "int32":
value, err := strconv.ParseInt(valueText, 10, 32)
if err != nil {
return nil, err
}
return mxgateway.Int32Value(int32(value)), nil
case "int64":
value, err := strconv.ParseInt(valueText, 10, 64)
if err != nil {
return nil, err
}
return mxgateway.Int64Value(value), nil
case "float":
value, err := strconv.ParseFloat(valueText, 32)
if err != nil {
return nil, err
}
return mxgateway.FloatValue(float32(value)), nil
case "double":
value, err := strconv.ParseFloat(valueText, 64)
if err != nil {
return nil, err
}
return mxgateway.DoubleValue(value), nil
case "string":
return mxgateway.StringValue(valueText), nil
default:
return nil, fmt.Errorf("unsupported value type %q", valueType)
}
}
func writeCommandOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, reply *mxgateway.MxCommandReply, err error) error {
if err != nil {
return err
}
if jsonOutput {
return writeJSON(stdout, commandReplyOutput{
Command: command,
Options: options,
Reply: mustMarshalProto(reply),
})
}
fmt.Fprintln(stdout, reply.GetKind())
return nil
}
func writeJSON(writer io.Writer, value any) error {
encoder := json.NewEncoder(writer)
encoder.SetIndent("", " ")
return encoder.Encode(value)
}
func mustMarshalProto(message protojsonMessage) json.RawMessage {
data, err := protojson.MarshalOptions{UseProtoNames: false}.Marshal(message)
if err != nil {
panic(err)
}
return data
}
type protojsonMessage interface {
ProtoReflect() protoreflect.Message
}
func writeUsage(writer io.Writer) {
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|write|stream-events|smoke>")
}
+58
View File
@@ -0,0 +1,58 @@
package main
import (
"bytes"
"encoding/json"
"strings"
"testing"
)
func TestRunVersionJSON(t *testing.T) {
var stdout bytes.Buffer
var stderr bytes.Buffer
if err := runWithIO(t.Context(), []string{"version", "-json"}, &stdout, &stderr); err != nil {
t.Fatalf("runWithIO() error = %v; stderr = %s", err, stderr.String())
}
var output versionOutput
if err := json.Unmarshal(stdout.Bytes(), &output); err != nil {
t.Fatalf("parse JSON: %v", err)
}
if output.GatewayProtocolVersion == 0 || output.WorkerProtocolVersion == 0 {
t.Fatalf("protocol versions were not populated: %+v", output)
}
}
func TestCommonOptionsRedactsAPIKey(t *testing.T) {
options, err := (&commonOptions{
Endpoint: "localhost:5000",
APIKey: "mxgw_super_secret",
Plaintext: true,
CallTimeout: "2s",
}).resolved()
if err != nil {
t.Fatalf("resolved() error = %v", err)
}
data, err := json.Marshal(options)
if err != nil {
t.Fatalf("marshal options: %v", err)
}
if strings.Contains(string(data), "super_secret") {
t.Fatalf("redacted JSON leaked API key: %s", data)
}
if !strings.Contains(string(data), "mxgw") {
t.Fatalf("redacted JSON did not preserve key shape: %s", data)
}
}
func TestParseValueBuildsTypedValue(t *testing.T) {
value, err := parseValue("int32", "123")
if err != nil {
t.Fatalf("parseValue() error = %v", err)
}
if got := value.GetInt32Value(); got != 123 {
t.Fatalf("int32 value = %d, want 123", got)
}
}
+42
View File
@@ -0,0 +1,42 @@
Set-StrictMode -Version Latest
$ErrorActionPreference = 'Stop'
$repoRoot = Resolve-Path (Join-Path $PSScriptRoot '..\..')
$protoRoot = Join-Path $repoRoot 'src\MxGateway.Contracts\Protos'
$outputRoot = Join-Path $PSScriptRoot 'internal\generated'
$modulePath = 'gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated'
$protoc = 'C:\Users\dohertj2\AppData\Local\Microsoft\WinGet\Packages\Google.Protobuf_Microsoft.Winget.Source_8wekyb3d8bbwe\bin\protoc.exe'
$goPluginPath = 'C:\Users\dohertj2\go\bin'
if (-not (Test-Path $protoc)) {
throw "protoc was not found at $protoc. See docs/toolchain-links.md."
}
foreach ($pluginName in @('protoc-gen-go.exe', 'protoc-gen-go-grpc.exe')) {
$pluginPath = Join-Path $goPluginPath $pluginName
if (-not (Test-Path $pluginPath)) {
throw "$pluginName was not found at $pluginPath. See docs/toolchain-links.md."
}
}
New-Item -ItemType Directory -Path $outputRoot -Force | Out-Null
Get-ChildItem -Path $outputRoot -Filter '*.pb.go' -File | Remove-Item
$env:Path = "$goPluginPath;$env:Path"
& $protoc `
--proto_path=$protoRoot `
--go_out=$outputRoot `
--go_opt=paths=source_relative `
"--go_opt=Mmxaccess_gateway.proto=$modulePath;generated" `
"--go_opt=Mmxaccess_worker.proto=$modulePath;generated" `
mxaccess_gateway.proto `
mxaccess_worker.proto
& $protoc `
--proto_path=$protoRoot `
--go-grpc_out=$outputRoot `
--go-grpc_opt=paths=source_relative `
"--go-grpc_opt=Mmxaccess_gateway.proto=$modulePath;generated" `
mxaccess_gateway.proto
+15
View File
@@ -0,0 +1,15 @@
module gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go
go 1.26
require (
google.golang.org/grpc v1.80.0
google.golang.org/protobuf v1.36.11
)
require (
golang.org/x/net v0.49.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect
)
+38
View File
@@ -0,0 +1,38 @@
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
+1
View File
@@ -0,0 +1 @@
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,243 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.6.1
// - protoc v7.34.1
// source: mxaccess_gateway.proto
package generated
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
MxAccessGateway_OpenSession_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/OpenSession"
MxAccessGateway_CloseSession_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/CloseSession"
MxAccessGateway_Invoke_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/Invoke"
MxAccessGateway_StreamEvents_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/StreamEvents"
)
// MxAccessGatewayClient is the client API for MxAccessGateway service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// Public client API for MXAccess sessions hosted by the gateway.
type MxAccessGatewayClient interface {
OpenSession(ctx context.Context, in *OpenSessionRequest, opts ...grpc.CallOption) (*OpenSessionReply, error)
CloseSession(ctx context.Context, in *CloseSessionRequest, opts ...grpc.CallOption) (*CloseSessionReply, error)
Invoke(ctx context.Context, in *MxCommandRequest, opts ...grpc.CallOption) (*MxCommandReply, error)
StreamEvents(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[MxEvent], error)
}
type mxAccessGatewayClient struct {
cc grpc.ClientConnInterface
}
func NewMxAccessGatewayClient(cc grpc.ClientConnInterface) MxAccessGatewayClient {
return &mxAccessGatewayClient{cc}
}
func (c *mxAccessGatewayClient) OpenSession(ctx context.Context, in *OpenSessionRequest, opts ...grpc.CallOption) (*OpenSessionReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(OpenSessionReply)
err := c.cc.Invoke(ctx, MxAccessGateway_OpenSession_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mxAccessGatewayClient) CloseSession(ctx context.Context, in *CloseSessionRequest, opts ...grpc.CallOption) (*CloseSessionReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CloseSessionReply)
err := c.cc.Invoke(ctx, MxAccessGateway_CloseSession_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mxAccessGatewayClient) Invoke(ctx context.Context, in *MxCommandRequest, opts ...grpc.CallOption) (*MxCommandReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(MxCommandReply)
err := c.cc.Invoke(ctx, MxAccessGateway_Invoke_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mxAccessGatewayClient) StreamEvents(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[MxEvent], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &MxAccessGateway_ServiceDesc.Streams[0], MxAccessGateway_StreamEvents_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[StreamEventsRequest, MxEvent]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type MxAccessGateway_StreamEventsClient = grpc.ServerStreamingClient[MxEvent]
// MxAccessGatewayServer is the server API for MxAccessGateway service.
// All implementations must embed UnimplementedMxAccessGatewayServer
// for forward compatibility.
//
// Public client API for MXAccess sessions hosted by the gateway.
type MxAccessGatewayServer interface {
OpenSession(context.Context, *OpenSessionRequest) (*OpenSessionReply, error)
CloseSession(context.Context, *CloseSessionRequest) (*CloseSessionReply, error)
Invoke(context.Context, *MxCommandRequest) (*MxCommandReply, error)
StreamEvents(*StreamEventsRequest, grpc.ServerStreamingServer[MxEvent]) error
mustEmbedUnimplementedMxAccessGatewayServer()
}
// UnimplementedMxAccessGatewayServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedMxAccessGatewayServer struct{}
func (UnimplementedMxAccessGatewayServer) OpenSession(context.Context, *OpenSessionRequest) (*OpenSessionReply, error) {
return nil, status.Error(codes.Unimplemented, "method OpenSession not implemented")
}
func (UnimplementedMxAccessGatewayServer) CloseSession(context.Context, *CloseSessionRequest) (*CloseSessionReply, error) {
return nil, status.Error(codes.Unimplemented, "method CloseSession not implemented")
}
func (UnimplementedMxAccessGatewayServer) Invoke(context.Context, *MxCommandRequest) (*MxCommandReply, error) {
return nil, status.Error(codes.Unimplemented, "method Invoke not implemented")
}
func (UnimplementedMxAccessGatewayServer) StreamEvents(*StreamEventsRequest, grpc.ServerStreamingServer[MxEvent]) error {
return status.Error(codes.Unimplemented, "method StreamEvents not implemented")
}
func (UnimplementedMxAccessGatewayServer) mustEmbedUnimplementedMxAccessGatewayServer() {}
func (UnimplementedMxAccessGatewayServer) testEmbeddedByValue() {}
// UnsafeMxAccessGatewayServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to MxAccessGatewayServer will
// result in compilation errors.
type UnsafeMxAccessGatewayServer interface {
mustEmbedUnimplementedMxAccessGatewayServer()
}
func RegisterMxAccessGatewayServer(s grpc.ServiceRegistrar, srv MxAccessGatewayServer) {
// If the following call panics, it indicates UnimplementedMxAccessGatewayServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&MxAccessGateway_ServiceDesc, srv)
}
func _MxAccessGateway_OpenSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(OpenSessionRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MxAccessGatewayServer).OpenSession(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: MxAccessGateway_OpenSession_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MxAccessGatewayServer).OpenSession(ctx, req.(*OpenSessionRequest))
}
return interceptor(ctx, in, info, handler)
}
func _MxAccessGateway_CloseSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CloseSessionRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MxAccessGatewayServer).CloseSession(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: MxAccessGateway_CloseSession_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MxAccessGatewayServer).CloseSession(ctx, req.(*CloseSessionRequest))
}
return interceptor(ctx, in, info, handler)
}
func _MxAccessGateway_Invoke_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(MxCommandRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MxAccessGatewayServer).Invoke(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: MxAccessGateway_Invoke_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MxAccessGatewayServer).Invoke(ctx, req.(*MxCommandRequest))
}
return interceptor(ctx, in, info, handler)
}
func _MxAccessGateway_StreamEvents_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamEventsRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(MxAccessGatewayServer).StreamEvents(m, &grpc.GenericServerStream[StreamEventsRequest, MxEvent]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type MxAccessGateway_StreamEventsServer = grpc.ServerStreamingServer[MxEvent]
// MxAccessGateway_ServiceDesc is the grpc.ServiceDesc for MxAccessGateway service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var MxAccessGateway_ServiceDesc = grpc.ServiceDesc{
ServiceName: "mxaccess_gateway.v1.MxAccessGateway",
HandlerType: (*MxAccessGatewayServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "OpenSession",
Handler: _MxAccessGateway_OpenSession_Handler,
},
{
MethodName: "CloseSession",
Handler: _MxAccessGateway_CloseSession_Handler,
},
{
MethodName: "Invoke",
Handler: _MxAccessGateway_Invoke_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "StreamEvents",
Handler: _MxAccessGateway_StreamEvents_Handler,
ServerStreams: true,
},
},
Metadata: "mxaccess_gateway.proto",
}
File diff suppressed because it is too large Load Diff
+30
View File
@@ -0,0 +1,30 @@
package mxgateway
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
const authorizationHeader = "authorization"
func unaryAuthInterceptor(apiKey string) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(authContext(ctx, apiKey), method, req, reply, cc, opts...)
}
}
func streamAuthInterceptor(apiKey string) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return streamer(authContext(ctx, apiKey), desc, cc, method, opts...)
}
}
func authContext(ctx context.Context, apiKey string) context.Context {
if apiKey == "" {
return ctx
}
return metadata.AppendToOutgoingContext(ctx, authorizationHeader, "Bearer "+apiKey)
}
+236
View File
@@ -0,0 +1,236 @@
package mxgateway
import (
"context"
"crypto/tls"
"errors"
"time"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/durationpb"
)
const (
defaultDialTimeout = 10 * time.Second
defaultCallTimeout = 30 * time.Second
)
// Client owns a gateway gRPC connection and exposes session-oriented helpers.
type Client struct {
conn *grpc.ClientConn
raw pb.MxAccessGatewayClient
opts Options
}
// Dial opens a gRPC connection to the gateway and configures auth metadata,
// transport security, and blocking dial cancellation from ctx.
func Dial(ctx context.Context, opts Options) (*Client, error) {
if opts.Endpoint == "" {
return nil, errors.New("mxgateway: endpoint is required")
}
dialCtx := ctx
cancel := func() {}
if opts.DialTimeout > 0 {
dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout)
} else if _, ok := ctx.Deadline(); !ok {
dialCtx, cancel = context.WithTimeout(ctx, defaultDialTimeout)
}
defer cancel()
transportCredentials, err := resolveTransportCredentials(opts)
if err != nil {
return nil, err
}
dialOptions := []grpc.DialOption{
grpc.WithTransportCredentials(transportCredentials),
grpc.WithUnaryInterceptor(unaryAuthInterceptor(opts.APIKey)),
grpc.WithStreamInterceptor(streamAuthInterceptor(opts.APIKey)),
grpc.WithBlock(),
}
dialOptions = append(dialOptions, opts.DialOptions...)
conn, err := grpc.DialContext(dialCtx, opts.Endpoint, dialOptions...)
if err != nil {
return nil, &GatewayError{Op: "dial", Err: err}
}
return NewClient(conn, opts), nil
}
// NewClient wraps an existing gRPC connection. The caller owns closing conn
// unless it calls Close on the returned Client.
func NewClient(conn *grpc.ClientConn, opts Options) *Client {
return &Client{
conn: conn,
raw: pb.NewMxAccessGatewayClient(conn),
opts: opts,
}
}
// RawClient returns the generated gRPC client for command-specific parity tests.
func (c *Client) RawClient() RawGatewayClient {
return c.raw
}
// OpenSession creates a gateway-backed MXAccess session.
func (c *Client) OpenSession(ctx context.Context, opts OpenSessionOptions) (*Session, error) {
reply, err := c.OpenSessionRaw(ctx, opts.Request())
if err != nil {
return nil, err
}
return newSession(c, reply), nil
}
// OpenSessionRaw sends a raw OpenSession request and validates protocol status.
func (c *Client) OpenSessionRaw(ctx context.Context, req *OpenSessionRequest) (*OpenSessionReply, error) {
if req == nil {
return nil, errors.New("mxgateway: open session request is required")
}
callCtx, cancel := c.callContext(ctx)
defer cancel()
reply, err := c.raw.OpenSession(callCtx, req)
if err != nil {
return nil, &GatewayError{Op: "open session", Err: err}
}
if err := EnsureProtocolSuccess("open session", reply.GetProtocolStatus(), nil); err != nil {
return reply, err
}
return reply, nil
}
// Invoke sends a raw MXAccess command request and validates protocol and
// MXAccess status fields while preserving the raw reply on typed errors.
func (c *Client) Invoke(ctx context.Context, req *MxCommandRequest) (*MxCommandReply, error) {
if req == nil {
return nil, errors.New("mxgateway: command request is required")
}
callCtx, cancel := c.callContext(ctx)
defer cancel()
reply, err := c.raw.Invoke(callCtx, req)
if err != nil {
return nil, &GatewayError{Op: "invoke", Err: err}
}
if err := EnsureProtocolSuccess("invoke", reply.GetProtocolStatus(), reply); err != nil {
return reply, err
}
if err := EnsureMxAccessSuccess("invoke", reply); err != nil {
return reply, err
}
return reply, nil
}
// CloseSessionRaw sends a raw CloseSession request and validates protocol
// status.
func (c *Client) CloseSessionRaw(ctx context.Context, req *CloseSessionRequest) (*CloseSessionReply, error) {
if req == nil {
return nil, errors.New("mxgateway: close session request is required")
}
callCtx, cancel := c.callContext(ctx)
defer cancel()
reply, err := c.raw.CloseSession(callCtx, req)
if err != nil {
return nil, &GatewayError{Op: "close session", Err: err}
}
if err := EnsureProtocolSuccess("close session", reply.GetProtocolStatus(), nil); err != nil {
return reply, err
}
return reply, nil
}
// StreamEventsRaw starts the generated event stream for callers that need direct
// control over Recv.
func (c *Client) StreamEventsRaw(ctx context.Context, req *StreamEventsRequest) (RawEventStream, error) {
if req == nil {
return nil, errors.New("mxgateway: stream events request is required")
}
stream, err := c.raw.StreamEvents(ctx, req)
if err != nil {
return nil, &GatewayError{Op: "stream events", Err: err}
}
return stream, nil
}
// Close closes the underlying gRPC connection.
func (c *Client) Close() error {
if c == nil || c.conn == nil {
return nil
}
return c.conn.Close()
}
func (c *Client) callContext(ctx context.Context) (context.Context, context.CancelFunc) {
timeout := c.opts.CallTimeout
if timeout == 0 {
timeout = defaultCallTimeout
}
if timeout < 0 {
return ctx, func() {}
}
if _, ok := ctx.Deadline(); ok {
return ctx, func() {}
}
return context.WithTimeout(ctx, timeout)
}
func resolveTransportCredentials(opts Options) (credentials.TransportCredentials, error) {
if opts.TransportCredentials != nil {
return opts.TransportCredentials, nil
}
if opts.Plaintext {
return insecure.NewCredentials(), nil
}
if opts.CACertFile != "" {
return credentials.NewClientTLSFromFile(opts.CACertFile, opts.ServerNameOverride)
}
if opts.TLSConfig != nil {
cfg := opts.TLSConfig.Clone()
if opts.ServerNameOverride != "" {
cfg.ServerName = opts.ServerNameOverride
}
return credentials.NewTLS(cfg), nil
}
return credentials.NewTLS(&tls.Config{
MinVersion: tls.VersionTLS12,
ServerName: opts.ServerNameOverride,
}), nil
}
// OpenSessionOptions describes fields used to create an OpenSessionRequest.
type OpenSessionOptions struct {
RequestedBackend string
ClientSessionName string
ClientCorrelationID string
CommandTimeout time.Duration
}
// Request returns the raw protobuf OpenSessionRequest for these options.
func (o OpenSessionOptions) Request() *OpenSessionRequest {
req := &OpenSessionRequest{
RequestedBackend: o.RequestedBackend,
ClientSessionName: o.ClientSessionName,
ClientCorrelationId: o.ClientCorrelationID,
}
if o.CommandTimeout > 0 {
req.CommandTimeout = durationpb.New(o.CommandTimeout)
}
return req
}
+261
View File
@@ -0,0 +1,261 @@
package mxgateway
import (
"context"
"errors"
"io"
"net"
"testing"
"time"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/test/bufconn"
)
const bufSize = 1024 * 1024
func TestDialAttachesAuthMetadataToUnaryCalls(t *testing.T) {
fake := &fakeGatewayServer{
openReply: &pb.OpenSessionReply{
SessionId: "session-1",
GatewayProtocolVersion: GatewayProtocolVersion,
WorkerProtocolVersion: WorkerProtocolVersion,
ProtocolStatus: &pb.ProtocolStatus{
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
},
},
}
client, cleanup := newBufconnClient(t, fake)
defer cleanup()
_, err := client.OpenSession(context.Background(), OpenSessionOptions{ClientSessionName: "fixture"})
if err != nil {
t.Fatalf("OpenSession() error = %v", err)
}
if got := fake.openAuth; got != "Bearer test-api-key" {
t.Fatalf("authorization metadata = %q, want %q", got, "Bearer test-api-key")
}
}
func TestStreamEventsAttachesAuthMetadataAndClosesOnCancellation(t *testing.T) {
fake := &fakeGatewayServer{
streamStarted: make(chan struct{}),
}
client, cleanup := newBufconnClient(t, fake)
defer cleanup()
session := NewSessionForID(client, "session-1")
ctx, cancel := context.WithCancel(context.Background())
events, err := session.Events(ctx)
if err != nil {
t.Fatalf("Events() error = %v", err)
}
<-fake.streamStarted
first := <-events
if first.Err != nil {
t.Fatalf("first event error = %v", first.Err)
}
if first.Event.GetWorkerSequence() != 1 {
t.Fatalf("worker sequence = %d, want 1", first.Event.GetWorkerSequence())
}
if got := fake.streamAuth; got != "Bearer test-api-key" {
t.Fatalf("stream authorization metadata = %q, want %q", got, "Bearer test-api-key")
}
cancel()
select {
case _, ok := <-events:
if ok {
t.Fatal("events channel produced an extra item after cancellation")
}
case <-time.After(2 * time.Second):
t.Fatal("events channel did not close after cancellation")
}
}
func TestSessionHelpersBuildCommandsAndExposeRawReply(t *testing.T) {
fake := &fakeGatewayServer{
invokeReply: &pb.MxCommandReply{
SessionId: "session-1",
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM2,
ProtocolStatus: &pb.ProtocolStatus{
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
},
Payload: &pb.MxCommandReply_AddItem2{
AddItem2: &pb.AddItem2Reply{ItemHandle: 42},
},
},
}
client, cleanup := newBufconnClient(t, fake)
defer cleanup()
session := NewSessionForID(client, "session-1")
itemHandle, err := session.AddItem2(context.Background(), 12, "Area001.Pump001.Speed", "runtime")
if err != nil {
t.Fatalf("AddItem2() error = %v", err)
}
if itemHandle != 42 {
t.Fatalf("item handle = %d, want 42", itemHandle)
}
req := fake.invokeRequest
if req.GetSessionId() != "session-1" {
t.Fatalf("session id = %q, want session-1", req.GetSessionId())
}
if req.GetClientCorrelationId() == "" {
t.Fatal("client correlation id is empty")
}
if req.GetCommand().GetKind() != pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM2 {
t.Fatalf("command kind = %s", req.GetCommand().GetKind())
}
if req.GetCommand().GetAddItem2().GetItemContext() != "runtime" {
t.Fatalf("item context = %q, want runtime", req.GetCommand().GetAddItem2().GetItemContext())
}
}
func TestInvokeReturnsTypedMxAccessErrorWithRawReply(t *testing.T) {
hresult := int32(-2147467259)
fake := &fakeGatewayServer{
invokeReply: &pb.MxCommandReply{
SessionId: "session-1",
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADVISE,
Hresult: &hresult,
DiagnosticMessage: "native failure",
ProtocolStatus: &pb.ProtocolStatus{Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_MXACCESS_FAILURE, Message: "MXAccess failed"},
Statuses: []*pb.MxStatusProxy{{Success: 0, DiagnosticText: "failed"}},
},
}
client, cleanup := newBufconnClient(t, fake)
defer cleanup()
session := NewSessionForID(client, "session-1")
err := session.Advise(context.Background(), 12, 34)
var mxErr *MxAccessError
if !errors.As(err, &mxErr) {
t.Fatalf("error %T does not support errors.As(*MxAccessError)", err)
}
if mxErr.Reply.GetHresult() != hresult {
t.Fatalf("raw reply HRESULT = %d, want %d", mxErr.Reply.GetHresult(), hresult)
}
var commandErr *CommandError
if !errors.As(err, &commandErr) {
t.Fatalf("error %T does not support errors.As(*CommandError)", err)
}
if commandErr.Reply.GetDiagnosticMessage() != "native failure" {
t.Fatalf("raw diagnostic = %q", commandErr.Reply.GetDiagnosticMessage())
}
}
func newBufconnClient(t *testing.T, fake *fakeGatewayServer) (*Client, func()) {
t.Helper()
listener := bufconn.Listen(bufSize)
server := grpc.NewServer()
pb.RegisterMxAccessGatewayServer(server, fake)
go func() {
if err := server.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
t.Errorf("bufconn server failed: %v", err)
}
}()
dialer := func(ctx context.Context, _ string) (net.Conn, error) {
return listener.DialContext(ctx)
}
client, err := Dial(context.Background(), Options{
Endpoint: "bufnet",
APIKey: "test-api-key",
Plaintext: true,
DialOptions: []grpc.DialOption{
grpc.WithContextDialer(dialer),
},
})
if err != nil {
t.Fatalf("Dial() error = %v", err)
}
return client, func() {
client.Close()
server.Stop()
listener.Close()
}
}
type fakeGatewayServer struct {
pb.UnimplementedMxAccessGatewayServer
openReply *pb.OpenSessionReply
openAuth string
streamAuth string
streamStarted chan struct{}
invokeReply *pb.MxCommandReply
invokeRequest *pb.MxCommandRequest
}
func (s *fakeGatewayServer) OpenSession(ctx context.Context, req *pb.OpenSessionRequest) (*pb.OpenSessionReply, error) {
s.openAuth = authorizationFromContext(ctx)
if s.openReply != nil {
return s.openReply, nil
}
return &pb.OpenSessionReply{
SessionId: "session-1",
ProtocolStatus: &pb.ProtocolStatus{
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
},
}, nil
}
func (s *fakeGatewayServer) CloseSession(ctx context.Context, req *pb.CloseSessionRequest) (*pb.CloseSessionReply, error) {
return &pb.CloseSessionReply{
SessionId: req.GetSessionId(),
ProtocolStatus: &pb.ProtocolStatus{
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
},
}, nil
}
func (s *fakeGatewayServer) Invoke(ctx context.Context, req *pb.MxCommandRequest) (*pb.MxCommandReply, error) {
s.invokeRequest = req
if s.invokeReply != nil {
return s.invokeReply, nil
}
return &pb.MxCommandReply{
SessionId: req.GetSessionId(),
Kind: req.GetCommand().GetKind(),
ProtocolStatus: &pb.ProtocolStatus{
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
},
}, nil
}
func (s *fakeGatewayServer) StreamEvents(req *pb.StreamEventsRequest, stream grpc.ServerStreamingServer[pb.MxEvent]) error {
s.streamAuth = authorizationFromContext(stream.Context())
if s.streamStarted != nil {
close(s.streamStarted)
}
if err := stream.Send(&pb.MxEvent{
SessionId: req.GetSessionId(),
Family: pb.MxEventFamily_MX_EVENT_FAMILY_ON_DATA_CHANGE,
WorkerSequence: 1,
}); err != nil {
return err
}
<-stream.Context().Done()
return io.EOF
}
func authorizationFromContext(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ""
}
values := md.Get(authorizationHeader)
if len(values) == 0 {
return ""
}
return values[0]
}
+73
View File
@@ -0,0 +1,73 @@
package mxgateway
import (
"encoding/json"
"os"
"path/filepath"
"testing"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
"google.golang.org/protobuf/encoding/protojson"
)
func TestValueConversionFixtures(t *testing.T) {
data, err := os.ReadFile(filepath.Join("..", "..", "proto", "fixtures", "behavior", "values", "value-conversion-cases.json"))
if err != nil {
t.Fatalf("read fixture: %v", err)
}
var fixture struct {
Cases []struct {
ID string `json:"id"`
ExpectedKind string `json:"expectedKind"`
Value json.RawMessage `json:"value"`
} `json:"cases"`
}
if err := json.Unmarshal(data, &fixture); err != nil {
t.Fatalf("parse fixture manifest: %v", err)
}
for _, tc := range fixture.Cases {
t.Run(tc.ID, func(t *testing.T) {
var value pb.MxValue
if err := protojson.Unmarshal(tc.Value, &value); err != nil {
t.Fatalf("parse value: %v", err)
}
if _, err := NativeValue(&value); err != nil {
t.Fatalf("NativeValue() error = %v", err)
}
if got := value.ProtoReflect().WhichOneof(value.ProtoReflect().Descriptor().Oneofs().ByName("kind")).JSONName(); got != tc.ExpectedKind {
t.Fatalf("kind = %q, want %q", got, tc.ExpectedKind)
}
})
}
}
func TestStatusConversionFixtures(t *testing.T) {
data, err := os.ReadFile(filepath.Join("..", "..", "proto", "fixtures", "behavior", "statuses", "status-conversion-cases.json"))
if err != nil {
t.Fatalf("read fixture: %v", err)
}
var fixture struct {
Cases []struct {
ID string `json:"id"`
Status json.RawMessage `json:"status"`
} `json:"cases"`
}
if err := json.Unmarshal(data, &fixture); err != nil {
t.Fatalf("parse fixture manifest: %v", err)
}
for _, tc := range fixture.Cases {
t.Run(tc.ID, func(t *testing.T) {
var status pb.MxStatusProxy
if err := protojson.Unmarshal(tc.Status, &status); err != nil {
t.Fatalf("parse status: %v", err)
}
if got, want := StatusSucceeded(&status), status.GetSuccess() != 0; got != want {
t.Fatalf("StatusSucceeded() = %v, want %v", got, want)
}
})
}
}
+118
View File
@@ -0,0 +1,118 @@
package mxgateway
import (
"fmt"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
)
// GatewayError wraps transport-level gRPC failures.
type GatewayError struct {
Op string
Err error
}
func (e *GatewayError) Error() string {
if e == nil {
return ""
}
if e.Op == "" {
return fmt.Sprintf("mxgateway: %v", e.Err)
}
return fmt.Sprintf("mxgateway: %s failed: %v", e.Op, e.Err)
}
func (e *GatewayError) Unwrap() error {
if e == nil {
return nil
}
return e.Err
}
// CommandError reports a non-OK gateway protocol status and keeps the raw
// command reply when one exists.
type CommandError struct {
Op string
Status *ProtocolStatus
Reply *MxCommandReply
}
func (e *CommandError) Error() string {
if e == nil {
return ""
}
status := e.Status
if status == nil {
return fmt.Sprintf("mxgateway: %s failed with missing protocol status", e.Op)
}
if status.GetMessage() == "" {
return fmt.Sprintf("mxgateway: %s failed with protocol status %s", e.Op, status.GetCode())
}
return fmt.Sprintf("mxgateway: %s failed with protocol status %s: %s", e.Op, status.GetCode(), status.GetMessage())
}
// MxAccessError reports HRESULT or MXSTATUS_PROXY failures returned by MXAccess.
type MxAccessError struct {
Command *CommandError
Reply *MxCommandReply
}
func (e *MxAccessError) Error() string {
if e == nil {
return ""
}
if e.Command != nil && e.Command.Status != nil && e.Command.Status.GetMessage() != "" {
return e.Command.Error()
}
if e.Reply != nil && e.Reply.GetDiagnosticMessage() != "" {
return fmt.Sprintf("mxgateway: MXAccess command %s failed: %s", e.Reply.GetKind(), e.Reply.GetDiagnosticMessage())
}
if e.Reply != nil && e.Reply.Hresult != nil {
return fmt.Sprintf("mxgateway: MXAccess command %s failed with HRESULT 0x%08X", e.Reply.GetKind(), uint32(e.Reply.GetHresult()))
}
return "mxgateway: MXAccess command failed"
}
func (e *MxAccessError) Unwrap() error {
if e == nil {
return nil
}
return e.Command
}
// EnsureProtocolSuccess returns a typed CommandError when status is non-OK.
func EnsureProtocolSuccess(op string, status *ProtocolStatus, reply *MxCommandReply) error {
if status == nil || status.GetCode() == pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK {
return nil
}
commandError := &CommandError{
Op: op,
Status: status,
Reply: reply,
}
if status.GetCode() == pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_MXACCESS_FAILURE {
return &MxAccessError{
Command: commandError,
Reply: reply,
}
}
return commandError
}
// EnsureMxAccessSuccess returns a typed MxAccessError for failing HRESULTs or
// MXSTATUS_PROXY entries.
func EnsureMxAccessSuccess(op string, reply *MxCommandReply) error {
if reply == nil {
return nil
}
if reply.Hresult != nil && reply.GetHresult() != 0 {
return &MxAccessError{Reply: reply}
}
for _, status := range reply.GetStatuses() {
if !StatusSucceeded(status) {
return &MxAccessError{Reply: reply}
}
}
return nil
}
+45
View File
@@ -0,0 +1,45 @@
package mxgateway
import (
"crypto/tls"
"strings"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// Options configures gateway connections.
type Options struct {
Endpoint string
APIKey string
Plaintext bool
CACertFile string
ServerNameOverride string
DialTimeout time.Duration
CallTimeout time.Duration
TLSConfig *tls.Config
TransportCredentials credentials.TransportCredentials
DialOptions []grpc.DialOption
}
// RedactedAPIKey returns a display-safe representation of the configured API
// key for diagnostics and CLI output.
func (o Options) RedactedAPIKey() string {
return RedactAPIKey(o.APIKey)
}
// RedactAPIKey hides credential material while keeping enough shape for
// troubleshooting whether a key was supplied.
func RedactAPIKey(apiKey string) string {
if apiKey == "" {
return ""
}
if len(apiKey) <= 8 {
return "<redacted>"
}
prefix, suffix := apiKey[:4], apiKey[len(apiKey)-4:]
return prefix + strings.Repeat("*", len(apiKey)-8) + suffix
}
+23
View File
@@ -0,0 +1,23 @@
package mxgateway
import "testing"
func TestRedactAPIKey(t *testing.T) {
tests := []struct {
name string
apiKey string
want string
}{
{name: "empty", apiKey: "", want: ""},
{name: "short", apiKey: "mxgw_1", want: "<redacted>"},
{name: "long", apiKey: "mxgw_key_secret", want: "mxgw*******cret"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := RedactAPIKey(tt.apiKey); got != tt.want {
t.Fatalf("RedactAPIKey() = %q, want %q", got, tt.want)
}
})
}
}
@@ -0,0 +1,69 @@
package mxgateway
import (
"os"
"path/filepath"
"testing"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
func TestGeneratedGoldenFixturesParse(t *testing.T) {
tests := []struct {
name string
path string
msg proto.Message
}{
{
name: "open session reply",
path: filepath.Join("..", "..", "proto", "fixtures", "golden", "open-session-reply.ok.json"),
msg: &pb.OpenSessionReply{},
},
{
name: "register command request",
path: filepath.Join("..", "..", "proto", "fixtures", "golden", "register-command-request.json"),
msg: &pb.MxCommandRequest{},
},
{
name: "on data change event",
path: filepath.Join("..", "..", "proto", "fixtures", "golden", "on-data-change-event.json"),
msg: &pb.MxEvent{},
},
}
unmarshal := protojson.UnmarshalOptions{DiscardUnknown: false}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
data, err := os.ReadFile(tt.path)
if err != nil {
t.Fatalf("read fixture: %v", err)
}
if err := unmarshal.Unmarshal(data, tt.msg); err != nil {
t.Fatalf("parse fixture: %v", err)
}
})
}
}
func TestOpenSessionFixtureProtocolVersions(t *testing.T) {
data, err := os.ReadFile(filepath.Join("..", "..", "proto", "fixtures", "golden", "open-session-reply.ok.json"))
if err != nil {
t.Fatalf("read fixture: %v", err)
}
var reply pb.OpenSessionReply
if err := protojson.Unmarshal(data, &reply); err != nil {
t.Fatalf("parse fixture: %v", err)
}
if reply.GetGatewayProtocolVersion() != GatewayProtocolVersion {
t.Fatalf("gateway protocol = %d, want %d", reply.GetGatewayProtocolVersion(), GatewayProtocolVersion)
}
if reply.GetWorkerProtocolVersion() != WorkerProtocolVersion {
t.Fatalf("worker protocol = %d, want %d", reply.GetWorkerProtocolVersion(), WorkerProtocolVersion)
}
}
+260
View File
@@ -0,0 +1,260 @@
package mxgateway
import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"io"
"sync"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// EventResult carries either the next ordered event or a terminal stream error.
type EventResult struct {
Event *MxEvent
Err error
}
// Session represents one gateway-backed MXAccess session.
type Session struct {
client *Client
openReply *OpenSessionReply
closeMu sync.Mutex
closeReply *CloseSessionReply
}
func newSession(client *Client, openReply *OpenSessionReply) *Session {
return &Session{
client: client,
openReply: openReply,
}
}
// NewSessionForID creates a session wrapper for commands against an existing
// gateway session id.
func NewSessionForID(client *Client, sessionID string) *Session {
return newSession(client, &pb.OpenSessionReply{SessionId: sessionID})
}
// ID returns the gateway session identifier.
func (s *Session) ID() string {
return s.openReply.GetSessionId()
}
// OpenReply returns the raw OpenSession reply.
func (s *Session) OpenReply() *OpenSessionReply {
return s.openReply
}
// Close closes the gateway session once and returns the raw close reply.
func (s *Session) Close(ctx context.Context) (*CloseSessionReply, error) {
s.closeMu.Lock()
defer s.closeMu.Unlock()
if s.closeReply != nil {
return s.closeReply, nil
}
reply, err := s.client.CloseSessionRaw(ctx, &pb.CloseSessionRequest{SessionId: s.ID()})
if err != nil {
return reply, err
}
s.closeReply = reply
return reply, nil
}
// Register invokes MXAccess Register and returns the server handle.
func (s *Session) Register(ctx context.Context, clientName string) (int32, error) {
reply, err := s.RegisterRaw(ctx, clientName)
if err != nil {
return 0, err
}
if reply.GetRegister() != nil {
return reply.GetRegister().GetServerHandle(), nil
}
return reply.GetReturnValue().GetInt32Value(), nil
}
// RegisterRaw invokes MXAccess Register and returns the raw reply.
func (s *Session) RegisterRaw(ctx context.Context, clientName string) (*MxCommandReply, error) {
if clientName == "" {
return nil, errors.New("mxgateway: client name is required")
}
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_REGISTER,
Payload: &pb.MxCommand_Register{
Register: &pb.RegisterCommand{ClientName: clientName},
},
})
}
// Unregister invokes MXAccess Unregister.
func (s *Session) Unregister(ctx context.Context, serverHandle int32) error {
_, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_UNREGISTER,
Payload: &pb.MxCommand_Unregister{
Unregister: &pb.UnregisterCommand{ServerHandle: serverHandle},
},
})
return err
}
// AddItem invokes MXAccess AddItem and returns the item handle.
func (s *Session) AddItem(ctx context.Context, serverHandle int32, itemDefinition string) (int32, error) {
reply, err := s.AddItemRaw(ctx, serverHandle, itemDefinition)
if err != nil {
return 0, err
}
if reply.GetAddItem() != nil {
return reply.GetAddItem().GetItemHandle(), nil
}
return reply.GetReturnValue().GetInt32Value(), nil
}
// AddItemRaw invokes MXAccess AddItem and returns the raw reply.
func (s *Session) AddItemRaw(ctx context.Context, serverHandle int32, itemDefinition string) (*MxCommandReply, error) {
if itemDefinition == "" {
return nil, errors.New("mxgateway: item definition is required")
}
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM,
Payload: &pb.MxCommand_AddItem{
AddItem: &pb.AddItemCommand{
ServerHandle: serverHandle,
ItemDefinition: itemDefinition,
},
},
})
}
// AddItem2 invokes MXAccess AddItem2 and returns the item handle.
func (s *Session) AddItem2(ctx context.Context, serverHandle int32, itemDefinition, itemContext string) (int32, error) {
reply, err := s.AddItem2Raw(ctx, serverHandle, itemDefinition, itemContext)
if err != nil {
return 0, err
}
if reply.GetAddItem2() != nil {
return reply.GetAddItem2().GetItemHandle(), nil
}
return reply.GetReturnValue().GetInt32Value(), nil
}
// AddItem2Raw invokes MXAccess AddItem2 and returns the raw reply.
func (s *Session) AddItem2Raw(ctx context.Context, serverHandle int32, itemDefinition, itemContext string) (*MxCommandReply, error) {
if itemDefinition == "" {
return nil, errors.New("mxgateway: item definition is required")
}
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM2,
Payload: &pb.MxCommand_AddItem2{
AddItem2: &pb.AddItem2Command{
ServerHandle: serverHandle,
ItemDefinition: itemDefinition,
ItemContext: itemContext,
},
},
})
}
// Advise invokes MXAccess Advise.
func (s *Session) Advise(ctx context.Context, serverHandle, itemHandle int32) error {
_, err := s.AdviseRaw(ctx, serverHandle, itemHandle)
return err
}
// AdviseRaw invokes MXAccess Advise and returns the raw reply.
func (s *Session) AdviseRaw(ctx context.Context, serverHandle, itemHandle int32) (*MxCommandReply, error) {
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADVISE,
Payload: &pb.MxCommand_Advise{
Advise: &pb.AdviseCommand{
ServerHandle: serverHandle,
ItemHandle: itemHandle,
},
},
})
}
// Write invokes MXAccess Write.
func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) error {
_, err := s.WriteRaw(ctx, serverHandle, itemHandle, value, userID)
return err
}
// WriteRaw invokes MXAccess Write and returns the raw reply.
func (s *Session) WriteRaw(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) (*MxCommandReply, error) {
if value == nil {
return nil, errors.New("mxgateway: write value is required")
}
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE,
Payload: &pb.MxCommand_Write{
Write: &pb.WriteCommand{
ServerHandle: serverHandle,
ItemHandle: itemHandle,
Value: value,
UserId: userID,
},
},
})
}
// Events streams ordered session events until the server ends the stream,
// context cancellation stops Recv, or a terminal error is sent.
func (s *Session) Events(ctx context.Context) (<-chan EventResult, error) {
return s.EventsAfter(ctx, 0)
}
// EventsAfter streams ordered session events after the given worker sequence.
func (s *Session) EventsAfter(ctx context.Context, afterWorkerSequence uint64) (<-chan EventResult, error) {
stream, err := s.client.StreamEventsRaw(ctx, &pb.StreamEventsRequest{
SessionId: s.ID(),
AfterWorkerSequence: afterWorkerSequence,
})
if err != nil {
return nil, err
}
results := make(chan EventResult, 16)
go func() {
defer close(results)
for {
event, err := stream.Recv()
if err == nil {
results <- EventResult{Event: event}
continue
}
if err == io.EOF || status.Code(err) == codes.Canceled || ctx.Err() != nil {
return
}
results <- EventResult{Err: &GatewayError{Op: "stream events", Err: err}}
return
}
}()
return results, nil
}
func (s *Session) invokeCommand(ctx context.Context, command *MxCommand) (*MxCommandReply, error) {
return s.client.Invoke(ctx, &pb.MxCommandRequest{
SessionId: s.ID(),
ClientCorrelationId: newCorrelationID(),
Command: command,
})
}
func newCorrelationID() string {
var buffer [16]byte
if _, err := rand.Read(buffer[:]); err != nil {
return ""
}
return hex.EncodeToString(buffer[:])
}
+6
View File
@@ -0,0 +1,6 @@
package mxgateway
// StatusSucceeded reports whether an MXSTATUS_PROXY entry represents success.
func StatusSucceeded(status *MxStatusProxy) bool {
return status == nil || status.GetSuccess() != 0
}
+70
View File
@@ -0,0 +1,70 @@
package mxgateway
import pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
// RawGatewayClient is the generated gRPC client interface exposed for callers
// that need direct contract access.
type RawGatewayClient = pb.MxAccessGatewayClient
// RawEventStream is the generated StreamEvents client stream.
type RawEventStream = pb.MxAccessGateway_StreamEventsClient
// Generated protobuf aliases keep raw contract access available from the public
// mxgateway package while generated code remains under internal/generated.
type (
OpenSessionRequest = pb.OpenSessionRequest
OpenSessionReply = pb.OpenSessionReply
CloseSessionRequest = pb.CloseSessionRequest
CloseSessionReply = pb.CloseSessionReply
StreamEventsRequest = pb.StreamEventsRequest
MxCommandRequest = pb.MxCommandRequest
MxCommandReply = pb.MxCommandReply
MxCommand = pb.MxCommand
MxEvent = pb.MxEvent
MxValue = pb.MxValue
Value = pb.MxValue
MxArray = pb.MxArray
MxStatusProxy = pb.MxStatusProxy
ProtocolStatus = pb.ProtocolStatus
RegisterCommand = pb.RegisterCommand
UnregisterCommand = pb.UnregisterCommand
AddItemCommand = pb.AddItemCommand
AddItem2Command = pb.AddItem2Command
AdviseCommand = pb.AdviseCommand
WriteCommand = pb.WriteCommand
Write2Command = pb.Write2Command
RegisterReply = pb.RegisterReply
AddItemReply = pb.AddItemReply
AddItem2Reply = pb.AddItem2Reply
)
type (
MxCommandKind = pb.MxCommandKind
MxDataType = pb.MxDataType
MxEventFamily = pb.MxEventFamily
MxStatusCategory = pb.MxStatusCategory
MxStatusSource = pb.MxStatusSource
ProtocolStatusCode = pb.ProtocolStatusCode
SessionState = pb.SessionState
)
const (
CommandKindRegister = pb.MxCommandKind_MX_COMMAND_KIND_REGISTER
CommandKindUnregister = pb.MxCommandKind_MX_COMMAND_KIND_UNREGISTER
CommandKindAddItem = pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM
CommandKindAddItem2 = pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM2
CommandKindAdvise = pb.MxCommandKind_MX_COMMAND_KIND_ADVISE
CommandKindWrite = pb.MxCommandKind_MX_COMMAND_KIND_WRITE
CommandKindWrite2 = pb.MxCommandKind_MX_COMMAND_KIND_WRITE2
DataTypeUnknown = pb.MxDataType_MX_DATA_TYPE_UNKNOWN
DataTypeBoolean = pb.MxDataType_MX_DATA_TYPE_BOOLEAN
DataTypeInteger = pb.MxDataType_MX_DATA_TYPE_INTEGER
DataTypeFloat = pb.MxDataType_MX_DATA_TYPE_FLOAT
DataTypeDouble = pb.MxDataType_MX_DATA_TYPE_DOUBLE
DataTypeString = pb.MxDataType_MX_DATA_TYPE_STRING
DataTypeTime = pb.MxDataType_MX_DATA_TYPE_TIME
ProtocolStatusOK = pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK
ProtocolStatusMxAccessFailure = pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_MXACCESS_FAILURE
)
+148
View File
@@ -0,0 +1,148 @@
package mxgateway
import (
"fmt"
"time"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
"google.golang.org/protobuf/types/known/timestamppb"
)
// BoolValue builds an MXAccess Boolean value.
func BoolValue(value bool) *MxValue {
return &pb.MxValue{
DataType: pb.MxDataType_MX_DATA_TYPE_BOOLEAN,
VariantType: "VT_BOOL",
Kind: &pb.MxValue_BoolValue{BoolValue: value},
}
}
// Int32Value builds an MXAccess Int32 value.
func Int32Value(value int32) *MxValue {
return &pb.MxValue{
DataType: pb.MxDataType_MX_DATA_TYPE_INTEGER,
VariantType: "VT_I4",
Kind: &pb.MxValue_Int32Value{Int32Value: value},
}
}
// Int64Value builds an MXAccess Int64 value.
func Int64Value(value int64) *MxValue {
return &pb.MxValue{
DataType: pb.MxDataType_MX_DATA_TYPE_INTEGER,
VariantType: "VT_I8",
Kind: &pb.MxValue_Int64Value{Int64Value: value},
}
}
// FloatValue builds an MXAccess Float value.
func FloatValue(value float32) *MxValue {
return &pb.MxValue{
DataType: pb.MxDataType_MX_DATA_TYPE_FLOAT,
VariantType: "VT_R4",
Kind: &pb.MxValue_FloatValue{FloatValue: value},
}
}
// DoubleValue builds an MXAccess Double value.
func DoubleValue(value float64) *MxValue {
return &pb.MxValue{
DataType: pb.MxDataType_MX_DATA_TYPE_DOUBLE,
VariantType: "VT_R8",
Kind: &pb.MxValue_DoubleValue{DoubleValue: value},
}
}
// StringValue builds an MXAccess String value.
func StringValue(value string) *MxValue {
return &pb.MxValue{
DataType: pb.MxDataType_MX_DATA_TYPE_STRING,
VariantType: "VT_BSTR",
Kind: &pb.MxValue_StringValue{StringValue: value},
}
}
// TimestampValue builds an MXAccess timestamp value from a Go time.
func TimestampValue(value time.Time) *MxValue {
return &pb.MxValue{
DataType: pb.MxDataType_MX_DATA_TYPE_TIME,
VariantType: "VT_DATE",
Kind: &pb.MxValue_TimestampValue{TimestampValue: timestamppb.New(value)},
}
}
// NativeValue converts a protobuf MxValue to the closest Go representation
// without discarding raw fallback data.
func NativeValue(value *MxValue) (any, error) {
if value == nil || value.GetIsNull() {
return nil, nil
}
switch kind := value.GetKind().(type) {
case *pb.MxValue_BoolValue:
return kind.BoolValue, nil
case *pb.MxValue_Int32Value:
return kind.Int32Value, nil
case *pb.MxValue_Int64Value:
return kind.Int64Value, nil
case *pb.MxValue_FloatValue:
return kind.FloatValue, nil
case *pb.MxValue_DoubleValue:
return kind.DoubleValue, nil
case *pb.MxValue_StringValue:
return kind.StringValue, nil
case *pb.MxValue_TimestampValue:
if kind.TimestampValue == nil {
return nil, nil
}
return kind.TimestampValue.AsTime(), nil
case *pb.MxValue_ArrayValue:
return NativeArray(kind.ArrayValue)
case *pb.MxValue_RawValue:
return append([]byte(nil), kind.RawValue...), nil
default:
return nil, fmt.Errorf("mxgateway: unsupported value kind %T", kind)
}
}
// NativeArray converts a protobuf MxArray to the closest Go slice
// representation.
func NativeArray(array *MxArray) (any, error) {
if array == nil {
return nil, nil
}
switch values := array.GetValues().(type) {
case *pb.MxArray_BoolValues:
return append([]bool(nil), values.BoolValues.GetValues()...), nil
case *pb.MxArray_Int32Values:
return append([]int32(nil), values.Int32Values.GetValues()...), nil
case *pb.MxArray_Int64Values:
return append([]int64(nil), values.Int64Values.GetValues()...), nil
case *pb.MxArray_FloatValues:
return append([]float32(nil), values.FloatValues.GetValues()...), nil
case *pb.MxArray_DoubleValues:
return append([]float64(nil), values.DoubleValues.GetValues()...), nil
case *pb.MxArray_StringValues:
return append([]string(nil), values.StringValues.GetValues()...), nil
case *pb.MxArray_TimestampValues:
result := make([]time.Time, 0, len(values.TimestampValues.GetValues()))
for _, value := range values.TimestampValues.GetValues() {
if value == nil {
result = append(result, time.Time{})
continue
}
result = append(result, value.AsTime())
}
return result, nil
case *pb.MxArray_RawValues:
rawValues := values.RawValues.GetValues()
result := make([][]byte, 0, len(rawValues))
for _, value := range rawValues {
result = append(result, append([]byte(nil), value...))
}
return result, nil
default:
return nil, fmt.Errorf("mxgateway: unsupported array value kind %T", values)
}
}
+15
View File
@@ -0,0 +1,15 @@
package mxgateway
const (
// ClientVersion identifies this Go client scaffold before package releases
// assign semantic versions.
ClientVersion = "0.1.0-dev"
// GatewayProtocolVersion matches GatewayContractInfo.GatewayProtocolVersion
// in the shared .NET contracts.
GatewayProtocolVersion uint32 = 1
// WorkerProtocolVersion matches GatewayContractInfo.WorkerProtocolVersion
// and is exposed for fake-worker and parity tests.
WorkerProtocolVersion uint32 = 1
)
+41
View File
@@ -0,0 +1,41 @@
# Java Client
The Java client workspace contains the Gradle scaffold for the MXAccess Gateway
client library, generated protobuf/gRPC bindings, a test CLI project, and JUnit
tests.
## Layout
```text
clients/java/
settings.gradle
build.gradle
src/main/generated/
mxgateway-client/
mxgateway-cli/
```
`mxgateway-client` generates Java protobuf and gRPC sources from
`../../src/MxGateway.Contracts/Protos`. The Gradle protobuf plugin writes those
generated sources under `src/main/generated`, which matches the client proto
manifest in `../proto/proto-inputs.json`. Do not edit generated files by hand.
`mxgateway-cli` depends on `mxgateway-client` and provides the `mxgw-java`
application entry point used by later CLI implementation work.
## Build And Test
Run the Java checks from `clients/java`:
```powershell
gradle test
```
The build uses the Java 21 Gradle toolchain, compiles generated protobuf/gRPC
code, and runs JUnit 5 tests for the scaffold and CLI entry point.
## Related Documentation
- [Client Proto Generation](../../docs/client-proto-generation.md)
- [Java Client Detailed Design](../../docs/clients-java-design.md)
- [Java Style Guide](../../docs/style-guides/JavaStyleGuide.md)
+38
View File
@@ -0,0 +1,38 @@
plugins {
id 'base'
}
ext {
grpcVersion = '1.76.0'
junitVersion = '5.14.1'
picocliVersion = '4.7.7'
protobufVersion = '4.33.1'
}
subprojects {
group = 'com.dohertylan.mxgateway'
version = '0.1.0'
pluginManager.withPlugin('java') {
java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}
tasks.withType(JavaCompile).configureEach {
options.encoding = 'UTF-8'
options.release = 21
}
tasks.withType(Test).configureEach {
useJUnitPlatform()
}
dependencies {
testImplementation platform("org.junit:junit-bom:${junitVersion}")
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
}
}
+12
View File
@@ -0,0 +1,12 @@
plugins {
id 'application'
}
dependencies {
implementation project(':mxgateway-client')
implementation "info.picocli:picocli:${picocliVersion}"
}
application {
mainClass = 'com.dohertylan.mxgateway.cli.MxGatewayCli'
}
@@ -0,0 +1,53 @@
package com.dohertylan.mxgateway.cli;
import com.dohertylan.mxgateway.client.MxGatewayClientVersion;
import java.io.PrintWriter;
import java.util.concurrent.Callable;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Spec;
@Command(
name = "mxgw-java",
mixinStandardHelpOptions = true,
description = "MXAccess Gateway Java test CLI.",
subcommands = MxGatewayCli.VersionCommand.class)
public final class MxGatewayCli implements Callable<Integer> {
@Spec
private CommandSpec spec;
public static void main(String[] args) {
int exitCode = new CommandLine(new MxGatewayCli()).execute(args);
System.exit(exitCode);
}
public static int execute(PrintWriter out, PrintWriter err, String... args) {
CommandLine commandLine = new CommandLine(new MxGatewayCli());
commandLine.setOut(out);
commandLine.setErr(err);
return commandLine.execute(args);
}
@Override
public Integer call() {
spec.commandLine().usage(spec.commandLine().getOut());
return 0;
}
@Command(name = "version", description = "Prints the Java client scaffold version.")
public static final class VersionCommand implements Callable<Integer> {
@Spec
private CommandSpec spec;
@Override
public Integer call() {
spec.commandLine().getOut().printf(
"mxgateway-java %s gatewayProtocolVersion=%d workerProtocolVersion=%d%n",
MxGatewayClientVersion.clientVersion(),
MxGatewayClientVersion.gatewayProtocolVersion(),
MxGatewayClientVersion.workerProtocolVersion());
return 0;
}
}
}
@@ -0,0 +1,27 @@
package com.dohertylan.mxgateway.cli;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.PrintWriter;
import java.io.StringWriter;
import org.junit.jupiter.api.Test;
final class MxGatewayCliTests {
@Test
void versionCommandPrintsProtocolVersions() {
StringWriter output = new StringWriter();
StringWriter errors = new StringWriter();
int exitCode = MxGatewayCli.execute(
new PrintWriter(output, true),
new PrintWriter(errors, true),
"version");
assertEquals(0, exitCode);
assertEquals("", errors.toString());
assertTrue(output.toString().contains("mxgateway-java 0.1.0"));
assertTrue(output.toString().contains("gatewayProtocolVersion=1"));
assertTrue(output.toString().contains("workerProtocolVersion=1"));
}
}
@@ -0,0 +1,46 @@
plugins {
id 'java-library'
id 'com.google.protobuf'
}
dependencies {
api "com.google.protobuf:protobuf-java:${protobufVersion}"
api "io.grpc:grpc-protobuf:${grpcVersion}"
api "io.grpc:grpc-stub:${grpcVersion}"
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
compileOnly 'javax.annotation:javax.annotation-api:1.3.2'
}
sourceSets {
main {
proto {
srcDir rootProject.file('../../src/MxGateway.Contracts/Protos')
include 'mxaccess_gateway.proto'
include 'mxaccess_worker.proto'
}
}
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:${protobufVersion}"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
}
}
generatedFilesBaseDir = rootProject.file('src/main/generated').absolutePath
generateProtoTasks {
all().configureEach {
plugins {
grpc {}
}
}
}
}
@@ -0,0 +1,22 @@
package com.dohertylan.mxgateway.client;
public final class MxGatewayClientVersion {
private static final int GATEWAY_PROTOCOL_VERSION = 1;
private static final int WORKER_PROTOCOL_VERSION = 1;
private static final String CLIENT_VERSION = "0.1.0";
private MxGatewayClientVersion() {
}
public static String clientVersion() {
return CLIENT_VERSION;
}
public static int gatewayProtocolVersion() {
return GATEWAY_PROTOCOL_VERSION;
}
public static int workerProtocolVersion() {
return WORKER_PROTOCOL_VERSION;
}
}
@@ -0,0 +1,29 @@
package com.dohertylan.mxgateway.client;
import static org.junit.jupiter.api.Assertions.assertEquals;
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_worker.v1.MxaccessWorker.WorkerEnvelope;
import org.junit.jupiter.api.Test;
final class GeneratedContractSmokeTests {
@Test
void generatedGatewayAndWorkerContractsCompile() {
OpenSessionRequest request = OpenSessionRequest.newBuilder()
.setClientSessionName("junit")
.build();
WorkerEnvelope envelope = WorkerEnvelope.newBuilder()
.setProtocolVersion(MxGatewayClientVersion.workerProtocolVersion())
.build();
assertEquals("junit", request.getClientSessionName());
assertEquals("mxaccess_gateway.v1.MxAccessGateway", MxAccessGatewayGrpc.SERVICE_NAME);
assertEquals(1, envelope.getProtocolVersion());
}
@Test
void javaTwentyOneToolchainRunsTests() {
assertEquals(21, Runtime.version().feature());
}
}
+22
View File
@@ -0,0 +1,22 @@
pluginManagement {
repositories {
gradlePluginPortal()
mavenCentral()
}
plugins {
id 'com.google.protobuf' version '0.9.5'
}
}
dependencyResolutionManagement {
repositoriesMode.set(RepositoriesMode.FAIL_ON_PROJECT_REPOS)
repositories {
mavenCentral()
}
}
rootProject.name = 'mxaccessgw-java'
include 'mxgateway-client'
include 'mxgateway-cli'
+1
View File
@@ -0,0 +1 @@
@@ -0,0 +1,588 @@
package mxaccess_gateway.v1;
import static io.grpc.MethodDescriptor.generateFullMethodName;
/**
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
@io.grpc.stub.annotations.GrpcGenerated
public final class MxAccessGatewayGrpc {
private MxAccessGatewayGrpc() {}
public static final java.lang.String SERVICE_NAME = "mxaccess_gateway.v1.MxAccessGateway";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> getOpenSessionMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "OpenSession",
requestType = mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest.class,
responseType = mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> getOpenSessionMethod() {
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest, mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> getOpenSessionMethod;
if ((getOpenSessionMethod = MxAccessGatewayGrpc.getOpenSessionMethod) == null) {
synchronized (MxAccessGatewayGrpc.class) {
if ((getOpenSessionMethod = MxAccessGatewayGrpc.getOpenSessionMethod) == null) {
MxAccessGatewayGrpc.getOpenSessionMethod = getOpenSessionMethod =
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest, mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "OpenSession"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply.getDefaultInstance()))
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("OpenSession"))
.build();
}
}
}
return getOpenSessionMethod;
}
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> getCloseSessionMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "CloseSession",
requestType = mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest.class,
responseType = mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> getCloseSessionMethod() {
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest, mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> getCloseSessionMethod;
if ((getCloseSessionMethod = MxAccessGatewayGrpc.getCloseSessionMethod) == null) {
synchronized (MxAccessGatewayGrpc.class) {
if ((getCloseSessionMethod = MxAccessGatewayGrpc.getCloseSessionMethod) == null) {
MxAccessGatewayGrpc.getCloseSessionMethod = getCloseSessionMethod =
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest, mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "CloseSession"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply.getDefaultInstance()))
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("CloseSession"))
.build();
}
}
}
return getCloseSessionMethod;
}
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest,
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> getInvokeMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "Invoke",
requestType = mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest.class,
responseType = mxaccess_gateway.v1.MxaccessGateway.MxCommandReply.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest,
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> getInvokeMethod() {
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest, mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> getInvokeMethod;
if ((getInvokeMethod = MxAccessGatewayGrpc.getInvokeMethod) == null) {
synchronized (MxAccessGatewayGrpc.class) {
if ((getInvokeMethod = MxAccessGatewayGrpc.getInvokeMethod) == null) {
MxAccessGatewayGrpc.getInvokeMethod = getInvokeMethod =
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest, mxaccess_gateway.v1.MxaccessGateway.MxCommandReply>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "Invoke"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply.getDefaultInstance()))
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("Invoke"))
.build();
}
}
}
return getInvokeMethod;
}
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
mxaccess_gateway.v1.MxaccessGateway.MxEvent> getStreamEventsMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "StreamEvents",
requestType = mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest.class,
responseType = mxaccess_gateway.v1.MxaccessGateway.MxEvent.class,
methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
mxaccess_gateway.v1.MxaccessGateway.MxEvent> getStreamEventsMethod() {
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest, mxaccess_gateway.v1.MxaccessGateway.MxEvent> getStreamEventsMethod;
if ((getStreamEventsMethod = MxAccessGatewayGrpc.getStreamEventsMethod) == null) {
synchronized (MxAccessGatewayGrpc.class) {
if ((getStreamEventsMethod = MxAccessGatewayGrpc.getStreamEventsMethod) == null) {
MxAccessGatewayGrpc.getStreamEventsMethod = getStreamEventsMethod =
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest, mxaccess_gateway.v1.MxaccessGateway.MxEvent>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "StreamEvents"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.MxEvent.getDefaultInstance()))
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("StreamEvents"))
.build();
}
}
}
return getStreamEventsMethod;
}
/**
* Creates a new async stub that supports all call types for the service
*/
public static MxAccessGatewayStub newStub(io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayStub>() {
@java.lang.Override
public MxAccessGatewayStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayStub(channel, callOptions);
}
};
return MxAccessGatewayStub.newStub(factory, channel);
}
/**
* Creates a new blocking-style stub that supports all types of calls on the service
*/
public static MxAccessGatewayBlockingV2Stub newBlockingV2Stub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingV2Stub> factory =
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingV2Stub>() {
@java.lang.Override
public MxAccessGatewayBlockingV2Stub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayBlockingV2Stub(channel, callOptions);
}
};
return MxAccessGatewayBlockingV2Stub.newStub(factory, channel);
}
/**
* Creates a new blocking-style stub that supports unary and streaming output calls on the service
*/
public static MxAccessGatewayBlockingStub newBlockingStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingStub>() {
@java.lang.Override
public MxAccessGatewayBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayBlockingStub(channel, callOptions);
}
};
return MxAccessGatewayBlockingStub.newStub(factory, channel);
}
/**
* Creates a new ListenableFuture-style stub that supports unary calls on the service
*/
public static MxAccessGatewayFutureStub newFutureStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayFutureStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayFutureStub>() {
@java.lang.Override
public MxAccessGatewayFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayFutureStub(channel, callOptions);
}
};
return MxAccessGatewayFutureStub.newStub(factory, channel);
}
/**
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public interface AsyncService {
/**
*/
default void openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getOpenSessionMethod(), responseObserver);
}
/**
*/
default void closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getCloseSessionMethod(), responseObserver);
}
/**
*/
default void invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getInvokeMethod(), responseObserver);
}
/**
*/
default void streamEvents(mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxEvent> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getStreamEventsMethod(), responseObserver);
}
}
/**
* Base class for the server implementation of the service MxAccessGateway.
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public static abstract class MxAccessGatewayImplBase
implements io.grpc.BindableService, AsyncService {
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return MxAccessGatewayGrpc.bindService(this);
}
}
/**
* A stub to allow clients to do asynchronous rpc calls to service MxAccessGateway.
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public static final class MxAccessGatewayStub
extends io.grpc.stub.AbstractAsyncStub<MxAccessGatewayStub> {
private MxAccessGatewayStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected MxAccessGatewayStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayStub(channel, callOptions);
}
/**
*/
public void openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> responseObserver) {
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getOpenSessionMethod(), getCallOptions()), request, responseObserver);
}
/**
*/
public void closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> responseObserver) {
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getCloseSessionMethod(), getCallOptions()), request, responseObserver);
}
/**
*/
public void invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> responseObserver) {
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getInvokeMethod(), getCallOptions()), request, responseObserver);
}
/**
*/
public void streamEvents(mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxEvent> responseObserver) {
io.grpc.stub.ClientCalls.asyncServerStreamingCall(
getChannel().newCall(getStreamEventsMethod(), getCallOptions()), request, responseObserver);
}
}
/**
* A stub to allow clients to do synchronous rpc calls to service MxAccessGateway.
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public static final class MxAccessGatewayBlockingV2Stub
extends io.grpc.stub.AbstractBlockingStub<MxAccessGatewayBlockingV2Stub> {
private MxAccessGatewayBlockingV2Stub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected MxAccessGatewayBlockingV2Stub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayBlockingV2Stub(channel, callOptions);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request) throws io.grpc.StatusException {
return io.grpc.stub.ClientCalls.blockingV2UnaryCall(
getChannel(), getOpenSessionMethod(), getCallOptions(), request);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request) throws io.grpc.StatusException {
return io.grpc.stub.ClientCalls.blockingV2UnaryCall(
getChannel(), getCloseSessionMethod(), getCallOptions(), request);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.MxCommandReply invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request) throws io.grpc.StatusException {
return io.grpc.stub.ClientCalls.blockingV2UnaryCall(
getChannel(), getInvokeMethod(), getCallOptions(), request);
}
/**
*/
@io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
public io.grpc.stub.BlockingClientCall<?, mxaccess_gateway.v1.MxaccessGateway.MxEvent>
streamEvents(mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request) {
return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall(
getChannel(), getStreamEventsMethod(), getCallOptions(), request);
}
}
/**
* A stub to allow clients to do limited synchronous rpc calls to service MxAccessGateway.
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public static final class MxAccessGatewayBlockingStub
extends io.grpc.stub.AbstractBlockingStub<MxAccessGatewayBlockingStub> {
private MxAccessGatewayBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected MxAccessGatewayBlockingStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayBlockingStub(channel, callOptions);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getOpenSessionMethod(), getCallOptions(), request);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getCloseSessionMethod(), getCallOptions(), request);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.MxCommandReply invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getInvokeMethod(), getCallOptions(), request);
}
/**
*/
public java.util.Iterator<mxaccess_gateway.v1.MxaccessGateway.MxEvent> streamEvents(
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request) {
return io.grpc.stub.ClientCalls.blockingServerStreamingCall(
getChannel(), getStreamEventsMethod(), getCallOptions(), request);
}
}
/**
* A stub to allow clients to do ListenableFuture-style rpc calls to service MxAccessGateway.
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public static final class MxAccessGatewayFutureStub
extends io.grpc.stub.AbstractFutureStub<MxAccessGatewayFutureStub> {
private MxAccessGatewayFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected MxAccessGatewayFutureStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayFutureStub(channel, callOptions);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> openSession(
mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getOpenSessionMethod(), getCallOptions()), request);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> closeSession(
mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getCloseSessionMethod(), getCallOptions()), request);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> invoke(
mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getInvokeMethod(), getCallOptions()), request);
}
}
private static final int METHODID_OPEN_SESSION = 0;
private static final int METHODID_CLOSE_SESSION = 1;
private static final int METHODID_INVOKE = 2;
private static final int METHODID_STREAM_EVENTS = 3;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
private final AsyncService serviceImpl;
private final int methodId;
MethodHandlers(AsyncService serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_OPEN_SESSION:
serviceImpl.openSession((mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest) request,
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply>) responseObserver);
break;
case METHODID_CLOSE_SESSION:
serviceImpl.closeSession((mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest) request,
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply>) responseObserver);
break;
case METHODID_INVOKE:
serviceImpl.invoke((mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest) request,
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply>) responseObserver);
break;
case METHODID_STREAM_EVENTS:
serviceImpl.streamEvents((mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest) request,
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxEvent>) responseObserver);
break;
default:
throw new AssertionError();
}
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
default:
throw new AssertionError();
}
}
}
public static final io.grpc.ServerServiceDefinition bindService(AsyncService service) {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getOpenSessionMethod(),
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply>(
service, METHODID_OPEN_SESSION)))
.addMethod(
getCloseSessionMethod(),
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply>(
service, METHODID_CLOSE_SESSION)))
.addMethod(
getInvokeMethod(),
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest,
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply>(
service, METHODID_INVOKE)))
.addMethod(
getStreamEventsMethod(),
io.grpc.stub.ServerCalls.asyncServerStreamingCall(
new MethodHandlers<
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
mxaccess_gateway.v1.MxaccessGateway.MxEvent>(
service, METHODID_STREAM_EVENTS)))
.build();
}
private static abstract class MxAccessGatewayBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
MxAccessGatewayBaseDescriptorSupplier() {}
@java.lang.Override
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
return mxaccess_gateway.v1.MxaccessGateway.getDescriptor();
}
@java.lang.Override
public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
return getFileDescriptor().findServiceByName("MxAccessGateway");
}
}
private static final class MxAccessGatewayFileDescriptorSupplier
extends MxAccessGatewayBaseDescriptorSupplier {
MxAccessGatewayFileDescriptorSupplier() {}
}
private static final class MxAccessGatewayMethodDescriptorSupplier
extends MxAccessGatewayBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
private final java.lang.String methodName;
MxAccessGatewayMethodDescriptorSupplier(java.lang.String methodName) {
this.methodName = methodName;
}
@java.lang.Override
public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
return getServiceDescriptor().findMethodByName(methodName);
}
}
private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
io.grpc.ServiceDescriptor result = serviceDescriptor;
if (result == null) {
synchronized (MxAccessGatewayGrpc.class) {
result = serviceDescriptor;
if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new MxAccessGatewayFileDescriptorSupplier())
.addMethod(getOpenSessionMethod())
.addMethod(getCloseSessionMethod())
.addMethod(getInvokeMethod())
.addMethod(getStreamEventsMethod())
.build();
}
}
}
return result;
}
}
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,36 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "missing-api-key",
"grpcStatusCode": "UNAUTHENTICATED",
"clientErrorCategory": "AuthenticationError",
"inputMetadata": {
"authorization": ""
},
"expectedRedactedOutput": "authentication failed: missing bearer token",
"retryableWithoutCredentialChange": false
},
{
"id": "invalid-api-key",
"grpcStatusCode": "UNAUTHENTICATED",
"clientErrorCategory": "AuthenticationError",
"inputMetadata": {
"authorization": "Bearer <redacted>"
},
"expectedRedactedOutput": "authentication failed: invalid API key <redacted>",
"retryableWithoutCredentialChange": false
},
{
"id": "missing-write-scope",
"grpcStatusCode": "PERMISSION_DENIED",
"clientErrorCategory": "AuthorizationError",
"inputMetadata": {
"authorization": "Bearer <redacted>"
},
"requiredScope": "mxaccess.write",
"expectedRedactedOutput": "authorization failed: missing scope mxaccess.write",
"retryableWithoutCredentialChange": false
}
]
}
@@ -0,0 +1,30 @@
{
"sessionId": "session-fixture",
"correlationId": "gateway-correlation-register-1",
"kind": "MX_COMMAND_KIND_REGISTER",
"protocolStatus": {
"code": "PROTOCOL_STATUS_CODE_OK",
"message": "Register completed."
},
"hresult": 0,
"returnValue": {
"dataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_I4",
"int32Value": 12
},
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "OK"
}
],
"diagnosticMessage": "COM Register returned server handle 12.",
"register": {
"serverHandle": 12
}
}
@@ -0,0 +1,38 @@
{
"sessionId": "session-fixture",
"correlationId": "gateway-correlation-write-1",
"kind": "MX_COMMAND_KIND_WRITE",
"protocolStatus": {
"code": "PROTOCOL_STATUS_CODE_MXACCESS_FAILURE",
"message": "MXAccess rejected the write."
},
"hresult": -2147220992,
"returnValue": {
"dataType": "MX_DATA_TYPE_NO_DATA",
"variantType": "VT_EMPTY",
"isNull": true,
"rawDiagnostic": "MXAccess returned no value for the failed write.",
"rawDataType": 2
},
"statuses": [
{
"success": 0,
"category": "MX_STATUS_CATEGORY_SECURITY_ERROR",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 321,
"rawCategory": 8,
"rawDetectedBy": 3,
"diagnosticText": "Write denied by provider security."
},
{
"success": 0,
"category": "MX_STATUS_CATEGORY_OPERATIONAL_ERROR",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_NMX",
"detail": 902,
"rawCategory": 7,
"rawDetectedBy": 5,
"diagnosticText": "Provider rejected the item state."
}
],
"diagnosticMessage": "Fixture preserves a data-bearing MXAccess failure reply with HRESULT and status array."
}
@@ -0,0 +1,159 @@
{
"sessionId": "session-fixture",
"description": "Ordered event stream sample for one worker-backed session.",
"events": [
{
"family": "MX_EVENT_FAMILY_ON_DATA_CHANGE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_I4",
"int32Value": 123
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:00Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "OK"
}
],
"workerSequence": "1",
"workerTimestamp": "2026-01-01T00:00:00.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:00.015Z",
"onDataChange": {}
},
{
"family": "MX_EVENT_FAMILY_ON_WRITE_COMPLETE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_DOUBLE",
"variantType": "VT_R8",
"doubleValue": 45.5
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:01Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "Write complete."
}
],
"workerSequence": "2",
"workerTimestamp": "2026-01-01T00:00:01.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:01.015Z",
"hresult": 0,
"onWriteComplete": {}
},
{
"family": "MX_EVENT_FAMILY_OPERATION_COMPLETE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_STRING",
"variantType": "VT_BSTR",
"stringValue": "operation-complete"
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:02Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_NMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "Operation complete."
}
],
"workerSequence": "3",
"workerTimestamp": "2026-01-01T00:00:02.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:02.015Z",
"operationComplete": {}
},
{
"family": "MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_FLOAT",
"arrayValue": {
"elementDataType": "MX_DATA_TYPE_FLOAT",
"variantType": "VT_ARRAY|VT_R4",
"dimensions": [
2
],
"floatValues": {
"values": [
1.5,
2.5
]
}
}
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:03Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "Buffered data delivered."
}
],
"workerSequence": "4",
"workerTimestamp": "2026-01-01T00:00:03.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:03.015Z",
"onBufferedDataChange": {
"dataType": "MX_DATA_TYPE_FLOAT",
"qualityValues": {
"elementDataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_ARRAY|VT_I4",
"dimensions": [
2
],
"int32Values": {
"values": [
192,
192
]
}
},
"timestampValues": {
"elementDataType": "MX_DATA_TYPE_TIME",
"variantType": "VT_ARRAY|VT_DATE",
"dimensions": [
2
],
"timestampValues": {
"values": [
"2026-01-01T00:00:02Z",
"2026-01-01T00:00:03Z"
]
}
},
"rawDataType": 5
}
}
]
}
@@ -0,0 +1,59 @@
{
"schemaVersion": 1,
"fixtureSet": "mxaccess-gateway-client-behavior",
"contractName": "mxaccess-gateway",
"gatewayProtocolVersion": 1,
"workerProtocolVersion": 1,
"protoInputManifest": "clients/proto/proto-inputs.json",
"fixtures": [
{
"id": "command-reply.register.ok",
"category": "command_replies",
"messageType": "mxaccess_gateway.v1.MxCommandReply",
"path": "command-replies/register.ok.reply.json",
"expectation": "Successful command replies preserve protocol status, HRESULT, return value, status arrays, and method-specific output."
},
{
"id": "command-reply.write.mxaccess-failure",
"category": "command_replies",
"messageType": "mxaccess_gateway.v1.MxCommandReply",
"path": "command-replies/write.mxaccess-failure.reply.json",
"expectation": "MXAccess failures are data-bearing replies with HRESULT and status details, not transport failures."
},
{
"id": "event-stream.session-ordered",
"category": "event_streams",
"messageType": "mxaccess_gateway.v1.MxEvent",
"path": "event-streams/session-event-stream.json",
"expectation": "Clients preserve per-session event order and event family bodies exactly as emitted."
},
{
"id": "values.conversion-cases",
"category": "value_conversion",
"messageType": "mxaccess_gateway.v1.MxValue",
"path": "values/value-conversion-cases.json",
"expectation": "Clients expose typed projections and keep raw fallback metadata when conversion is incomplete."
},
{
"id": "statuses.conversion-cases",
"category": "status_conversion",
"messageType": "mxaccess_gateway.v1.MxStatusProxy",
"path": "statuses/status-conversion-cases.json",
"expectation": "Clients preserve every MXSTATUS_PROXY field, including raw category/source values."
},
{
"id": "auth.error-cases",
"category": "auth_errors",
"messageType": "client_behavior.v1.AuthErrorCase",
"path": "auth/auth-error-cases.json",
"expectation": "Clients map authentication and authorization failures distinctly and redact credentials."
},
{
"id": "timeout-cancel.expected-behavior",
"category": "timeout_cancel",
"messageType": "client_behavior.v1.TimeoutCancelCase",
"path": "timeout-cancel/timeout-cancel-cases.json",
"expectation": "Client cancellation stops waiting locally but does not imply an in-flight MXAccess COM call was aborted."
}
]
}
@@ -0,0 +1,41 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "ok.responding-lmx",
"status": {
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "OK"
}
},
{
"id": "security-error.requesting-lmx",
"status": {
"success": 0,
"category": "MX_STATUS_CATEGORY_SECURITY_ERROR",
"detectedBy": "MX_STATUS_SOURCE_REQUESTING_LMX",
"detail": 401,
"rawCategory": 8,
"rawDetectedBy": 2,
"diagnosticText": "Requesting LMX denied the secured operation."
}
},
{
"id": "raw-unknown-category",
"status": {
"success": 0,
"category": "MX_STATUS_CATEGORY_UNKNOWN",
"detectedBy": "MX_STATUS_SOURCE_UNKNOWN",
"detail": 65535,
"rawCategory": 99,
"rawDetectedBy": 77,
"diagnosticText": "Unknown native MXSTATUS_PROXY fields are preserved."
}
}
]
}
@@ -0,0 +1,27 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "unary-deadline-exceeded",
"operation": "Invoke",
"clientDeadline": "2s",
"grpcStatusCode": "DEADLINE_EXCEEDED",
"clientErrorCategory": "TimeoutError",
"gatewayWaitBehavior": "stops_waiting_for_reply",
"workerCommandBehavior": "continues_until_worker_reply_or_worker_fault",
"sessionExpectation": "session_state_is_unknown_until_follow_up_status_or_close",
"expectedClientAction": "issue GetSessionState or CloseSession before reusing handles"
},
{
"id": "stream-cancel",
"operation": "StreamEvents",
"clientDeadline": "5s",
"grpcStatusCode": "CANCELLED",
"clientErrorCategory": "CancelledError",
"gatewayWaitBehavior": "stops_streaming_to_that_call",
"workerCommandBehavior": "does_not_cancel_worker_session",
"sessionExpectation": "session_remains_ready_if_worker_stays_healthy",
"expectedClientAction": "open a new StreamEvents call with the last observed worker sequence"
}
]
}
@@ -0,0 +1,85 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "bool.true",
"expectedKind": "boolValue",
"value": {
"dataType": "MX_DATA_TYPE_BOOLEAN",
"variantType": "VT_BOOL",
"boolValue": true
}
},
{
"id": "int64.large",
"expectedKind": "int64Value",
"value": {
"dataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_I8",
"int64Value": "9223372036854770000"
}
},
{
"id": "timestamp.utc",
"expectedKind": "timestampValue",
"value": {
"dataType": "MX_DATA_TYPE_TIME",
"variantType": "VT_DATE",
"timestampValue": "2026-01-01T00:00:04Z"
}
},
{
"id": "string-array",
"expectedKind": "arrayValue",
"value": {
"dataType": "MX_DATA_TYPE_STRING",
"arrayValue": {
"elementDataType": "MX_DATA_TYPE_STRING",
"variantType": "VT_ARRAY|VT_BSTR",
"dimensions": [
2
],
"stringValues": {
"values": [
"alpha",
"beta"
]
}
}
}
},
{
"id": "raw-fallback.variant",
"expectedKind": "rawValue",
"value": {
"dataType": "MX_DATA_TYPE_UNKNOWN",
"variantType": "VT_RECORD",
"rawDiagnostic": "No lossless typed projection exists for this VARIANT.",
"rawDataType": 32767,
"rawValue": "AQIDBAU="
}
},
{
"id": "raw-array-fallback",
"expectedKind": "arrayValue",
"value": {
"dataType": "MX_DATA_TYPE_UNKNOWN",
"arrayValue": {
"elementDataType": "MX_DATA_TYPE_UNKNOWN",
"variantType": "VT_ARRAY|VT_VARIANT",
"dimensions": [
2
],
"rawDiagnostic": "Array elements contain mixed VARIANT types.",
"rawElementDataType": 32767,
"rawValues": {
"values": [
"AAE=",
"AgM="
]
}
}
}
}
]
}
@@ -0,0 +1,28 @@
{
"family": "MX_EVENT_FAMILY_ON_DATA_CHANGE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_I4",
"int32Value": 123
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:00Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "OK"
}
],
"workerSequence": "1",
"workerTimestamp": "2026-01-01T00:00:00.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:00.015Z",
"onDataChange": {}
}
@@ -0,0 +1,18 @@
{
"sessionId": "session-fixture",
"backendName": "mxaccess-worker",
"workerProcessId": 1234,
"workerProtocolVersion": 1,
"gatewayProtocolVersion": 1,
"capabilities": [
"unary-open-session",
"unary-close-session",
"unary-invoke",
"server-stream-events"
],
"defaultCommandTimeout": "30s",
"protocolStatus": {
"code": "PROTOCOL_STATUS_CODE_OK",
"message": "Session opened."
}
}
@@ -0,0 +1,10 @@
{
"sessionId": "session-fixture",
"clientCorrelationId": "fixture-register-1",
"command": {
"kind": "MX_COMMAND_KIND_REGISTER",
"register": {
"clientName": "fixture-client"
}
}
}
+27
View File
@@ -0,0 +1,27 @@
{
"schemaVersion": 1,
"contractName": "mxaccess-gateway",
"gatewayProtocolVersion": 1,
"workerProtocolVersion": 1,
"protoRoot": "src/MxGateway.Contracts/Protos",
"sourceFiles": [
{
"path": "mxaccess_gateway.proto",
"role": "public_gateway"
},
{
"path": "mxaccess_worker.proto",
"role": "gateway_worker_ipc"
}
],
"descriptorSet": "clients/proto/descriptors/mxaccessgw-client-v1.protoset",
"fixtureRoot": "clients/proto/fixtures/golden",
"behaviorFixtureRoot": "clients/proto/fixtures/behavior",
"generatedOutputs": {
"dotnet": "clients/dotnet/generated",
"go": "clients/go/internal/generated",
"rust": "clients/rust/src/generated",
"python": "clients/python/src/mxgateway/generated",
"java": "clients/java/src/main/generated"
}
}
+57
View File
@@ -0,0 +1,57 @@
# Python Client
The Python client package contains generated MXAccess Gateway protobuf
bindings, the `mxgateway` package scaffold, and the `mxgw-py` test CLI
scaffold. The package uses the shared proto inputs documented in
`../../docs/client-proto-generation.md` so gateway and client contracts stay in
sync.
## Layout
```text
clients/python/
pyproject.toml
generate-proto.ps1
src/mxgateway/
src/mxgateway/generated/
src/mxgateway_cli/
tests/
```
`src/mxgateway/generated` contains code produced by `grpc_tools.protoc`. Do not
edit generated files by hand.
## Regenerating Protobuf Bindings
Run generation after the shared `.proto` files or the Python output path
changes:
```powershell
./generate-proto.ps1
```
The script uses the Python tool path recorded in
`../../docs/toolchain-links.md`.
## Build And Test
Run the Python checks from `clients/python`:
```powershell
python -m pip install -e ".[dev]"
python -m pytest
python -m pip wheel . --no-deps --wheel-dir "$env:TEMP\mxgateway-python-wheel"
```
The scaffold tests import the generated gateway and worker stubs and exercise
the deterministic CLI version output.
## CLI
The scaffold CLI exposes version information:
```powershell
mxgw-py version --json
```
Additional commands are implemented with the async client/session wrapper work.
+22
View File
@@ -0,0 +1,22 @@
Set-StrictMode -Version Latest
$ErrorActionPreference = 'Stop'
$repoRoot = Resolve-Path (Join-Path $PSScriptRoot '..\..')
$protoRoot = Join-Path $repoRoot 'src\MxGateway.Contracts\Protos'
$outputRoot = Join-Path $PSScriptRoot 'src\mxgateway\generated'
$python = 'C:\Users\dohertj2\AppData\Local\Programs\Python\Python312\python.exe'
if (-not (Test-Path $python)) {
throw "Python was not found at $python. See docs/toolchain-links.md."
}
New-Item -ItemType Directory -Path $outputRoot -Force | Out-Null
Get-ChildItem -Path (Join-Path $outputRoot '*_pb2.py') -File | Remove-Item
Get-ChildItem -Path (Join-Path $outputRoot '*_pb2_grpc.py') -File | Remove-Item
& $python -m grpc_tools.protoc `
"-I$protoRoot" `
"--python_out=$outputRoot" `
"--grpc_python_out=$outputRoot" `
mxaccess_gateway.proto `
mxaccess_worker.proto
+33
View File
@@ -0,0 +1,33 @@
[build-system]
requires = ["setuptools>=69", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "mxaccess-gateway-client"
version = "0.1.0"
description = "Async Python client scaffold for MXAccess Gateway."
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"click>=8.3,<9",
"grpcio>=1.80,<2",
"protobuf>=6.33,<7",
]
[project.optional-dependencies]
dev = [
"grpcio-tools>=1.80,<2",
"pytest>=9,<10",
"pytest-asyncio>=1.3,<2",
]
[project.scripts]
mxgw-py = "mxgateway_cli.commands:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
addopts = "-ra"
pythonpath = ["src"]
testpaths = ["tests"]
+5
View File
@@ -0,0 +1,5 @@
"""MXAccess Gateway Python client package."""
from .version import __version__
__all__ = ["__version__"]
@@ -0,0 +1 @@
@@ -0,0 +1,29 @@
"""Generated protobuf and gRPC modules for MXAccess Gateway.
The Python protobuf generator emits absolute imports between generated modules.
This package initializer registers package-local aliases so callers can import
the generated stubs through `mxgateway.generated` without moving the modules to
the top-level import namespace.
"""
from importlib import import_module
import sys
mxaccess_gateway_pb2 = import_module(f"{__name__}.mxaccess_gateway_pb2")
sys.modules.setdefault("mxaccess_gateway_pb2", mxaccess_gateway_pb2)
mxaccess_gateway_pb2_grpc = import_module(f"{__name__}.mxaccess_gateway_pb2_grpc")
sys.modules.setdefault("mxaccess_gateway_pb2_grpc", mxaccess_gateway_pb2_grpc)
mxaccess_worker_pb2 = import_module(f"{__name__}.mxaccess_worker_pb2")
sys.modules.setdefault("mxaccess_worker_pb2", mxaccess_worker_pb2)
mxaccess_worker_pb2_grpc = import_module(f"{__name__}.mxaccess_worker_pb2_grpc")
sys.modules.setdefault("mxaccess_worker_pb2_grpc", mxaccess_worker_pb2_grpc)
__all__ = [
"mxaccess_gateway_pb2",
"mxaccess_gateway_pb2_grpc",
"mxaccess_worker_pb2",
"mxaccess_worker_pb2_grpc",
]
File diff suppressed because one or more lines are too long
@@ -0,0 +1,229 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
import mxaccess_gateway_pb2 as mxaccess__gateway__pb2
GRPC_GENERATED_VERSION = '1.80.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ ' but the generated code in mxaccess_gateway_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
class MxAccessGatewayStub(object):
"""Public client API for MXAccess sessions hosted by the gateway.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.OpenSession = channel.unary_unary(
'/mxaccess_gateway.v1.MxAccessGateway/OpenSession',
request_serializer=mxaccess__gateway__pb2.OpenSessionRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.OpenSessionReply.FromString,
_registered_method=True)
self.CloseSession = channel.unary_unary(
'/mxaccess_gateway.v1.MxAccessGateway/CloseSession',
request_serializer=mxaccess__gateway__pb2.CloseSessionRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.CloseSessionReply.FromString,
_registered_method=True)
self.Invoke = channel.unary_unary(
'/mxaccess_gateway.v1.MxAccessGateway/Invoke',
request_serializer=mxaccess__gateway__pb2.MxCommandRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.MxCommandReply.FromString,
_registered_method=True)
self.StreamEvents = channel.unary_stream(
'/mxaccess_gateway.v1.MxAccessGateway/StreamEvents',
request_serializer=mxaccess__gateway__pb2.StreamEventsRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.MxEvent.FromString,
_registered_method=True)
class MxAccessGatewayServicer(object):
"""Public client API for MXAccess sessions hosted by the gateway.
"""
def OpenSession(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def CloseSession(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Invoke(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def StreamEvents(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_MxAccessGatewayServicer_to_server(servicer, server):
rpc_method_handlers = {
'OpenSession': grpc.unary_unary_rpc_method_handler(
servicer.OpenSession,
request_deserializer=mxaccess__gateway__pb2.OpenSessionRequest.FromString,
response_serializer=mxaccess__gateway__pb2.OpenSessionReply.SerializeToString,
),
'CloseSession': grpc.unary_unary_rpc_method_handler(
servicer.CloseSession,
request_deserializer=mxaccess__gateway__pb2.CloseSessionRequest.FromString,
response_serializer=mxaccess__gateway__pb2.CloseSessionReply.SerializeToString,
),
'Invoke': grpc.unary_unary_rpc_method_handler(
servicer.Invoke,
request_deserializer=mxaccess__gateway__pb2.MxCommandRequest.FromString,
response_serializer=mxaccess__gateway__pb2.MxCommandReply.SerializeToString,
),
'StreamEvents': grpc.unary_stream_rpc_method_handler(
servicer.StreamEvents,
request_deserializer=mxaccess__gateway__pb2.StreamEventsRequest.FromString,
response_serializer=mxaccess__gateway__pb2.MxEvent.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'mxaccess_gateway.v1.MxAccessGateway', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('mxaccess_gateway.v1.MxAccessGateway', rpc_method_handlers)
# This class is part of an EXPERIMENTAL API.
class MxAccessGateway(object):
"""Public client API for MXAccess sessions hosted by the gateway.
"""
@staticmethod
def OpenSession(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/OpenSession',
mxaccess__gateway__pb2.OpenSessionRequest.SerializeToString,
mxaccess__gateway__pb2.OpenSessionReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def CloseSession(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/CloseSession',
mxaccess__gateway__pb2.CloseSessionRequest.SerializeToString,
mxaccess__gateway__pb2.CloseSessionReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def Invoke(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/Invoke',
mxaccess__gateway__pb2.MxCommandRequest.SerializeToString,
mxaccess__gateway__pb2.MxCommandReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def StreamEvents(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(
request,
target,
'/mxaccess_gateway.v1.MxAccessGateway/StreamEvents',
mxaccess__gateway__pb2.StreamEventsRequest.SerializeToString,
mxaccess__gateway__pb2.MxEvent.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: mxaccess_worker.proto
# Protobuf Python Version: 6.31.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
_runtime_version.Domain.PUBLIC,
6,
31,
1,
'',
'mxaccess_worker.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
import mxaccess_gateway_pb2 as mxaccess__gateway__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15mxaccess_worker.proto\x12\x12mxaccess_worker.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x16mxaccess_gateway.proto\"\x95\x06\n\x0eWorkerEnvelope\x12\x18\n\x10protocol_version\x18\x01 \x01(\r\x12\x12\n\nsession_id\x18\x02 \x01(\t\x12\x10\n\x08sequence\x18\x03 \x01(\x04\x12\x16\n\x0e\x63orrelation_id\x18\x04 \x01(\t\x12\x39\n\rgateway_hello\x18\n \x01(\x0b\x32 .mxaccess_worker.v1.GatewayHelloH\x00\x12\x37\n\x0cworker_hello\x18\x0b \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerHelloH\x00\x12\x37\n\x0cworker_ready\x18\x0c \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerReadyH\x00\x12;\n\x0eworker_command\x18\r \x01(\x0b\x32!.mxaccess_worker.v1.WorkerCommandH\x00\x12\x46\n\x14worker_command_reply\x18\x0e \x01(\x0b\x32&.mxaccess_worker.v1.WorkerCommandReplyH\x00\x12\x39\n\rworker_cancel\x18\x0f \x01(\x0b\x32 .mxaccess_worker.v1.WorkerCancelH\x00\x12=\n\x0fworker_shutdown\x18\x10 \x01(\x0b\x32\".mxaccess_worker.v1.WorkerShutdownH\x00\x12\x44\n\x13worker_shutdown_ack\x18\x11 \x01(\x0b\x32%.mxaccess_worker.v1.WorkerShutdownAckH\x00\x12\x37\n\x0cworker_event\x18\x12 \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerEventH\x00\x12?\n\x10worker_heartbeat\x18\x13 \x01(\x0b\x32#.mxaccess_worker.v1.WorkerHeartbeatH\x00\x12\x37\n\x0cworker_fault\x18\x14 \x01(\x0b\x32\x1f.mxaccess_worker.v1.WorkerFaultH\x00\x42\x06\n\x04\x62ody\"Z\n\x0cGatewayHello\x12\"\n\x1asupported_protocol_version\x18\x01 \x01(\r\x12\r\n\x05nonce\x18\x02 \x01(\t\x12\x17\n\x0fgateway_version\x18\x03 \x01(\t\"i\n\x0bWorkerHello\x12\x18\n\x10protocol_version\x18\x01 \x01(\r\x12\r\n\x05nonce\x18\x02 \x01(\t\x12\x19\n\x11worker_process_id\x18\x03 \x01(\x05\x12\x16\n\x0eworker_version\x18\x04 \x01(\t\"\x8e\x01\n\x0bWorkerReady\x12\x19\n\x11worker_process_id\x18\x01 \x01(\x05\x12\x17\n\x0fmxaccess_progid\x18\x02 \x01(\t\x12\x16\n\x0emxaccess_clsid\x18\x03 \x01(\t\x12\x33\n\x0fready_timestamp\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"w\n\rWorkerCommand\x12/\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x1e.mxaccess_gateway.v1.MxCommand\x12\x35\n\x11\x65nqueue_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x81\x01\n\x12WorkerCommandReply\x12\x32\n\x05reply\x18\x01 \x01(\x0b\x32#.mxaccess_gateway.v1.MxCommandReply\x12\x37\n\x13\x63ompleted_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x1e\n\x0cWorkerCancel\x12\x0e\n\x06reason\x18\x01 \x01(\t\"Q\n\x0eWorkerShutdown\x12/\n\x0cgrace_period\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x0e\n\x06reason\x18\x02 \x01(\t\"H\n\x11WorkerShutdownAck\x12\x33\n\x06status\x18\x01 \x01(\x0b\x32#.mxaccess_gateway.v1.ProtocolStatus\":\n\x0bWorkerEvent\x12+\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.mxaccess_gateway.v1.MxEvent\"\xa5\x02\n\x0fWorkerHeartbeat\x12\x19\n\x11worker_process_id\x18\x01 \x01(\x05\x12.\n\x05state\x18\x02 \x01(\x0e\x32\x1f.mxaccess_worker.v1.WorkerState\x12?\n\x1blast_sta_activity_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x1d\n\x15pending_command_count\x18\x04 \x01(\r\x12\"\n\x1aoutbound_event_queue_depth\x18\x05 \x01(\r\x12\x1b\n\x13last_event_sequence\x18\x06 \x01(\x04\x12&\n\x1e\x63urrent_command_correlation_id\x18\x07 \x01(\t\"\xf4\x01\n\x0bWorkerFault\x12\x39\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\'.mxaccess_worker.v1.WorkerFaultCategory\x12\x16\n\x0e\x63ommand_method\x18\x02 \x01(\t\x12\x14\n\x07hresult\x18\x03 \x01(\x05H\x00\x88\x01\x01\x12\x16\n\x0e\x65xception_type\x18\x04 \x01(\t\x12\x1a\n\x12\x64iagnostic_message\x18\x05 \x01(\t\x12<\n\x0fprotocol_status\x18\x06 \x01(\x0b\x32#.mxaccess_gateway.v1.ProtocolStatusB\n\n\x08_hresult*\x97\x02\n\x0bWorkerState\x12\x1c\n\x18WORKER_STATE_UNSPECIFIED\x10\x00\x12\x19\n\x15WORKER_STATE_STARTING\x10\x01\x12\x1c\n\x18WORKER_STATE_HANDSHAKING\x10\x02\x12!\n\x1dWORKER_STATE_INITIALIZING_STA\x10\x03\x12\x16\n\x12WORKER_STATE_READY\x10\x04\x12\"\n\x1eWORKER_STATE_EXECUTING_COMMAND\x10\x05\x12\x1e\n\x1aWORKER_STATE_SHUTTING_DOWN\x10\x06\x12\x18\n\x14WORKER_STATE_STOPPED\x10\x07\x12\x18\n\x14WORKER_STATE_FAULTED\x10\x08*\xc7\x04\n\x13WorkerFaultCategory\x12%\n!WORKER_FAULT_CATEGORY_UNSPECIFIED\x10\x00\x12+\n\'WORKER_FAULT_CATEGORY_INVALID_ARGUMENTS\x10\x01\x12\x37\n3WORKER_FAULT_CATEGORY_GATEWAY_AUTHENTICATION_FAILED\x10\x02\x12+\n\'WORKER_FAULT_CATEGORY_PROTOCOL_MISMATCH\x10\x03\x12,\n(WORKER_FAULT_CATEGORY_PROTOCOL_VIOLATION\x10\x04\x12+\n\'WORKER_FAULT_CATEGORY_PIPE_DISCONNECTED\x10\x05\x12\x32\n.WORKER_FAULT_CATEGORY_MXACCESS_CREATION_FAILED\x10\x06\x12\x31\n-WORKER_FAULT_CATEGORY_MXACCESS_COMMAND_FAILED\x10\x07\x12:\n6WORKER_FAULT_CATEGORY_MXACCESS_EVENT_CONVERSION_FAILED\x10\x08\x12\"\n\x1eWORKER_FAULT_CATEGORY_STA_HUNG\x10\t\x12(\n$WORKER_FAULT_CATEGORY_QUEUE_OVERFLOW\x10\n\x12*\n&WORKER_FAULT_CATEGORY_SHUTDOWN_TIMEOUT\x10\x0b\x42\x1c\xaa\x02\x19MxGateway.Contracts.Protob\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'mxaccess_worker_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
_globals['DESCRIPTOR']._loaded_options = None
_globals['DESCRIPTOR']._serialized_options = b'\252\002\031MxGateway.Contracts.Proto'
_globals['_WORKERSTATE']._serialized_start=2316
_globals['_WORKERSTATE']._serialized_end=2595
_globals['_WORKERFAULTCATEGORY']._serialized_start=2598
_globals['_WORKERFAULTCATEGORY']._serialized_end=3181
_globals['_WORKERENVELOPE']._serialized_start=135
_globals['_WORKERENVELOPE']._serialized_end=924
_globals['_GATEWAYHELLO']._serialized_start=926
_globals['_GATEWAYHELLO']._serialized_end=1016
_globals['_WORKERHELLO']._serialized_start=1018
_globals['_WORKERHELLO']._serialized_end=1123
_globals['_WORKERREADY']._serialized_start=1126
_globals['_WORKERREADY']._serialized_end=1268
_globals['_WORKERCOMMAND']._serialized_start=1270
_globals['_WORKERCOMMAND']._serialized_end=1389
_globals['_WORKERCOMMANDREPLY']._serialized_start=1392
_globals['_WORKERCOMMANDREPLY']._serialized_end=1521
_globals['_WORKERCANCEL']._serialized_start=1523
_globals['_WORKERCANCEL']._serialized_end=1553
_globals['_WORKERSHUTDOWN']._serialized_start=1555
_globals['_WORKERSHUTDOWN']._serialized_end=1636
_globals['_WORKERSHUTDOWNACK']._serialized_start=1638
_globals['_WORKERSHUTDOWNACK']._serialized_end=1710
_globals['_WORKEREVENT']._serialized_start=1712
_globals['_WORKEREVENT']._serialized_end=1770
_globals['_WORKERHEARTBEAT']._serialized_start=1773
_globals['_WORKERHEARTBEAT']._serialized_end=2066
_globals['_WORKERFAULT']._serialized_start=2069
_globals['_WORKERFAULT']._serialized_end=2313
# @@protoc_insertion_point(module_scope)
@@ -0,0 +1,24 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
GRPC_GENERATED_VERSION = '1.80.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ ' but the generated code in mxaccess_worker_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
+3
View File
@@ -0,0 +1,3 @@
"""Package version information."""
__version__ = "0.1.0"
@@ -0,0 +1 @@
"""Command-line entry points for the MXAccess Gateway Python client."""
@@ -0,0 +1,6 @@
"""Module execution entry point for `python -m mxgateway_cli`."""
from .commands import main
if __name__ == "__main__":
main()

Some files were not shown because too many files have changed in this diff Show More