Compare commits

..

13 Commits

Author SHA1 Message Date
Joseph Doherty bcfbd1cfc8 Merge remote-tracking branch 'origin/main' into agent-3/issue-41-scaffold-go-module 2026-04-26 19:30:16 -04:00
dohertj2 7a0743496f Merge pull request #87 from agent-1/issue-38-scaffold-dotnet-client-projects
Issue #38: scaffold .NET client projects
2026-04-26 19:31:28 -04:00
Joseph Doherty 8e3b0c1c4a Scaffold Go client module 2026-04-26 19:27:27 -04:00
Joseph Doherty bd4be85f26 Merge remote-tracking branch 'origin/main' into agent-1/issue-38-scaffold-dotnet-client-projects 2026-04-26 19:25:15 -04:00
Joseph Doherty 7331c6157a Scaffold .NET client projects 2026-04-26 19:25:07 -04:00
dohertj2 cbc317e3e7 Merge pull request #85 from agent-3/issue-32-implement-heartbeat-and-watchdog
Issue #32: implement heartbeat and watchdog
2026-04-26 19:20:15 -04:00
Joseph Doherty 7242cf772b Merge remote-tracking branch 'origin/main' into agent-3/issue-32-implement-heartbeat-and-watchdog 2026-04-26 19:16:56 -04:00
Joseph Doherty 7d67313a7d Merge remote-tracking branch 'origin/main' into agent-3/issue-32-implement-heartbeat-and-watchdog
# Conflicts:
#	src/MxGateway.Worker/Ipc/WorkerPipeSession.cs
#	src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs
2026-04-26 19:16:42 -04:00
dohertj2 044b16c5db Merge pull request #84 from agent-1/issue-37-create-cross-language-client-behavior-fixtures
Issue #37: create cross-language client behavior fixtures
2026-04-26 19:15:40 -04:00
Joseph Doherty 1f92078777 Merge remote-tracking branch 'origin/main' into agent-1/issue-37-create-cross-language-client-behavior-fixtures 2026-04-26 19:12:19 -04:00
Joseph Doherty 4a3560c7ee Implement worker heartbeat watchdog 2026-04-26 19:12:06 -04:00
Joseph Doherty 108a3d3f8a Add client behavior fixtures 2026-04-26 19:11:04 -04:00
dohertj2 95e71cd819 Merge pull request #83 from agent-2/issue-29-implement-event-sink-and-event-queue
Issue #29: implement event sink and event queue
2026-04-26 19:08:52 -04:00
56 changed files with 9860 additions and 20 deletions
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\MxGateway.Client\MxGateway.Client.csproj" />
</ItemGroup>
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
</Project>
@@ -0,0 +1,48 @@
using MxGateway.Client;
namespace MxGateway.Client.Cli;
public static class MxGatewayClientCli
{
public static int Run(
string[] args,
TextWriter standardOutput,
TextWriter standardError)
{
ArgumentNullException.ThrowIfNull(args);
ArgumentNullException.ThrowIfNull(standardOutput);
ArgumentNullException.ThrowIfNull(standardError);
if (args.Length is 0 || IsHelp(args[0]))
{
WriteUsage(standardOutput);
return 0;
}
if (string.Equals(args[0], "version", StringComparison.OrdinalIgnoreCase))
{
standardOutput.WriteLine(
$"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}");
standardOutput.WriteLine(
$"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}");
return 0;
}
standardError.WriteLine($"Unknown command '{args[0]}'.");
WriteUsage(standardError);
return 2;
}
private static bool IsHelp(string value)
{
return string.Equals(value, "-h", StringComparison.OrdinalIgnoreCase)
|| string.Equals(value, "--help", StringComparison.OrdinalIgnoreCase)
|| string.Equals(value, "help", StringComparison.OrdinalIgnoreCase);
}
private static void WriteUsage(TextWriter writer)
{
writer.WriteLine("mxgw-dotnet version");
writer.WriteLine("mxgw-dotnet --help");
}
}
@@ -0,0 +1,3 @@
using MxGateway.Client.Cli;
return MxGatewayClientCli.Run(args, Console.Out, Console.Error);
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="coverlet.collector" Version="6.0.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageReference Include="xunit" Version="2.9.3" />
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.4" />
</ItemGroup>
<ItemGroup>
<Using Include="Xunit" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MxGateway.Client\MxGateway.Client.csproj" />
<ProjectReference Include="..\MxGateway.Client.Cli\MxGateway.Client.Cli.csproj" />
</ItemGroup>
</Project>
@@ -0,0 +1,20 @@
using MxGateway.Client.Cli;
namespace MxGateway.Client.Tests;
public sealed class MxGatewayClientCliTests
{
[Fact]
public void Run_Version_PrintsCompiledProtocolVersions()
{
using var output = new StringWriter();
using var error = new StringWriter();
var exitCode = MxGatewayClientCli.Run(["version"], output, error);
Assert.Equal(0, exitCode);
Assert.Contains("gateway-protocol=1", output.ToString());
Assert.Contains("worker-protocol=1", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
}
@@ -0,0 +1,22 @@
using MxGateway.Contracts;
namespace MxGateway.Client.Tests;
public sealed class MxGatewayClientContractInfoTests
{
[Fact]
public void GatewayProtocolVersion_MatchesSharedContract()
{
Assert.Equal(
GatewayContractInfo.GatewayProtocolVersion,
MxGatewayClientContractInfo.GatewayProtocolVersion);
}
[Fact]
public void WorkerProtocolVersion_MatchesSharedContract()
{
Assert.Equal(
GatewayContractInfo.WorkerProtocolVersion,
MxGatewayClientContractInfo.WorkerProtocolVersion);
}
}
@@ -0,0 +1,28 @@
namespace MxGateway.Client.Tests;
public sealed class MxGatewayClientOptionsTests
{
[Fact]
public void Validate_WithAbsoluteEndpointAndApiKey_Succeeds()
{
var options = new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = "test-api-key",
};
options.Validate();
}
[Fact]
public void Validate_WithEmptyApiKey_Throws()
{
var options = new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = "",
};
Assert.Throws<ArgumentException>(options.Validate);
}
}
@@ -0,0 +1,18 @@
namespace MxGateway.Client.Tests;
public sealed class MxGatewayGeneratedContractTests
{
[Fact]
public async Task GeneratedGrpcClient_CanBeConstructedFromClientFactory()
{
var options = new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = "test-api-key",
};
await using var client = MxGatewayClient.Create(options);
Assert.NotNull(client.RawClient);
}
}
+76
View File
@@ -0,0 +1,76 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31903.59
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Client", "MxGateway.Client\MxGateway.Client.csproj", "{7CF9ED88-1F32-4040-BEB1-D0902E304C70}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Contracts", "..\..\src\MxGateway.Contracts\MxGateway.Contracts.csproj", "{9AB807A8-0469-40F7-A000-D240F36B6E5D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Client.Cli", "MxGateway.Client.Cli\MxGateway.Client.Cli.csproj", "{EB061E77-2475-4322-9257-3F2456DD141C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Client.Tests", "MxGateway.Client.Tests\MxGateway.Client.Tests.csproj", "{B77B5A8E-0C53-4419-9BCD-227C9753A074}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Debug|x64 = Debug|x64
Debug|x86 = Debug|x86
Release|Any CPU = Release|Any CPU
Release|x64 = Release|x64
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x64.ActiveCfg = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x64.Build.0 = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x86.ActiveCfg = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x86.Build.0 = Debug|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|Any CPU.Build.0 = Release|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x64.ActiveCfg = Release|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x64.Build.0 = Release|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x86.ActiveCfg = Release|Any CPU
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x86.Build.0 = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x64.ActiveCfg = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x64.Build.0 = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x86.ActiveCfg = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x86.Build.0 = Debug|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|Any CPU.Build.0 = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x64.ActiveCfg = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x64.Build.0 = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x86.ActiveCfg = Release|Any CPU
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x86.Build.0 = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x64.ActiveCfg = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x64.Build.0 = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x86.ActiveCfg = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x86.Build.0 = Debug|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|Any CPU.Build.0 = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x64.ActiveCfg = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x64.Build.0 = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x86.ActiveCfg = Release|Any CPU
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x86.Build.0 = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x64.ActiveCfg = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x64.Build.0 = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x86.ActiveCfg = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x86.Build.0 = Debug|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|Any CPU.Build.0 = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x64.ActiveCfg = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x64.Build.0 = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x86.ActiveCfg = Release|Any CPU
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\..\..\src\MxGateway.Contracts\MxGateway.Contracts.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.76.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.7" />
</ItemGroup>
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
</Project>
@@ -0,0 +1,41 @@
using Grpc.Net.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
/// <summary>
/// Provides the initial .NET client entry point and raw generated gRPC client.
/// </summary>
public sealed class MxGatewayClient : IAsyncDisposable
{
private readonly GrpcChannel _channel;
private MxGatewayClient(GrpcChannel channel)
{
_channel = channel;
RawClient = new MxAccessGateway.MxAccessGatewayClient(channel);
}
public MxAccessGateway.MxAccessGatewayClient RawClient { get; }
public static MxGatewayClient Create(MxGatewayClientOptions options)
{
ArgumentNullException.ThrowIfNull(options);
options.Validate();
var channel = GrpcChannel.ForAddress(
options.Endpoint,
new GrpcChannelOptions
{
LoggerFactory = options.LoggerFactory,
});
return new MxGatewayClient(channel);
}
public ValueTask DisposeAsync()
{
_channel.Dispose();
return ValueTask.CompletedTask;
}
}
@@ -0,0 +1,15 @@
using MxGateway.Contracts;
namespace MxGateway.Client;
/// <summary>
/// Exposes the protocol versions compiled into this client package.
/// </summary>
public static class MxGatewayClientContractInfo
{
public const uint GatewayProtocolVersion =
GatewayContractInfo.GatewayProtocolVersion;
public const uint WorkerProtocolVersion =
GatewayContractInfo.WorkerProtocolVersion;
}
@@ -0,0 +1,58 @@
using Microsoft.Extensions.Logging;
namespace MxGateway.Client;
/// <summary>
/// Configures the gRPC channel used by the .NET MXAccess Gateway client.
/// </summary>
public sealed class MxGatewayClientOptions
{
public required Uri Endpoint { get; init; }
public required string ApiKey { get; init; }
public bool UseTls { get; init; }
public string? CaCertificatePath { get; init; }
public string? ServerNameOverride { get; init; }
public TimeSpan ConnectTimeout { get; init; } = TimeSpan.FromSeconds(10);
public TimeSpan DefaultCallTimeout { get; init; } = TimeSpan.FromSeconds(30);
public ILoggerFactory? LoggerFactory { get; init; }
public void Validate()
{
ArgumentNullException.ThrowIfNull(Endpoint);
if (!Endpoint.IsAbsoluteUri)
{
throw new ArgumentException(
"The gateway endpoint must be an absolute URI.",
nameof(Endpoint));
}
if (string.IsNullOrWhiteSpace(ApiKey))
{
throw new ArgumentException(
"The gateway API key must not be empty.",
nameof(ApiKey));
}
if (ConnectTimeout <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(ConnectTimeout),
"The connect timeout must be greater than zero.");
}
if (DefaultCallTimeout <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(DefaultCallTimeout),
"The default call timeout must be greater than zero.");
}
}
}
+24
View File
@@ -0,0 +1,24 @@
# .NET Client Projects
The .NET client workspace contains the MXAccess Gateway client library, test
CLI, and unit tests.
## Projects
| Project | Purpose |
|---------|---------|
| `MxGateway.Client` | .NET 10 library entry point and raw gRPC client access. |
| `MxGateway.Client.Cli` | Test CLI for smoke and diagnostic commands. |
| `MxGateway.Client.Tests` | Unit tests for the scaffold and generated contract wiring. |
The projects reference `src/MxGateway.Contracts/MxGateway.Contracts.csproj` so
the client compiles against the same generated protobuf and gRPC types as the
gateway. `clients/dotnet/generated` remains reserved for generator output if a
future client build switches to client-local `Grpc.Tools` generation.
## Build And Test
```powershell
dotnet build clients/dotnet/MxGateway.Client.sln
dotnet test clients/dotnet/MxGateway.Client.sln --no-build
```
+55
View File
@@ -0,0 +1,55 @@
# Go Client
The Go client module contains the generated MXAccess Gateway protobuf bindings,
a small handwritten `mxgateway` package, and the `mxgw-go` test CLI scaffold.
The module uses the shared proto inputs documented in
`../../docs/client-proto-generation.md` so gateway and client contracts stay in
sync.
## Layout
```text
clients/go/
go.mod
generate-proto.ps1
internal/generated/
mxgateway/
cmd/mxgw-go/
```
`internal/generated` contains code produced by `protoc`, `protoc-gen-go`, and
`protoc-gen-go-grpc`. Do not edit generated files by hand.
## Regenerating Protobuf Bindings
Run generation after the shared `.proto` files or the Go output path changes:
```powershell
./generate-proto.ps1
```
The script uses the tool paths recorded in `../../docs/toolchain-links.md`.
## Build And Test
Run the Go module checks from `clients/go`:
```powershell
go test ./...
go build ./...
```
The scaffold tests parse the shared golden JSON fixtures with the generated Go
types. Later client implementation tests add fake gRPC services, auth metadata,
streaming, value conversion, and CLI behavior.
## CLI
The scaffold CLI exposes version information:
```powershell
go run ./cmd/mxgw-go version -json
```
Additional commands are implemented with the client/session wrapper work.
+63
View File
@@ -0,0 +1,63 @@
package main
import (
"encoding/json"
"flag"
"fmt"
"os"
"gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/mxgateway"
)
type versionOutput struct {
ClientVersion string `json:"clientVersion"`
GatewayProtocolVersion uint32 `json:"gatewayProtocolVersion"`
WorkerProtocolVersion uint32 `json:"workerProtocolVersion"`
}
func main() {
if err := run(os.Args[1:]); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(2)
}
}
func run(args []string) error {
if len(args) == 0 {
return fmt.Errorf("usage: mxgw-go version [-json]")
}
switch args[0] {
case "version":
return runVersion(args[1:])
default:
return fmt.Errorf("unknown command %q", args[0])
}
}
func runVersion(args []string) error {
flags := flag.NewFlagSet("version", flag.ContinueOnError)
flags.SetOutput(os.Stderr)
jsonOutput := flags.Bool("json", false, "write JSON output")
if err := flags.Parse(args); err != nil {
return err
}
output := versionOutput{
ClientVersion: mxgateway.ClientVersion,
GatewayProtocolVersion: mxgateway.GatewayProtocolVersion,
WorkerProtocolVersion: mxgateway.WorkerProtocolVersion,
}
if *jsonOutput {
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
return encoder.Encode(output)
}
fmt.Fprintf(os.Stdout, "mxgw-go %s\n", output.ClientVersion)
fmt.Fprintf(os.Stdout, "gateway protocol %d\n", output.GatewayProtocolVersion)
fmt.Fprintf(os.Stdout, "worker protocol %d\n", output.WorkerProtocolVersion)
return nil
}
+42
View File
@@ -0,0 +1,42 @@
Set-StrictMode -Version Latest
$ErrorActionPreference = 'Stop'
$repoRoot = Resolve-Path (Join-Path $PSScriptRoot '..\..')
$protoRoot = Join-Path $repoRoot 'src\MxGateway.Contracts\Protos'
$outputRoot = Join-Path $PSScriptRoot 'internal\generated'
$modulePath = 'gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated'
$protoc = 'C:\Users\dohertj2\AppData\Local\Microsoft\WinGet\Packages\Google.Protobuf_Microsoft.Winget.Source_8wekyb3d8bbwe\bin\protoc.exe'
$goPluginPath = 'C:\Users\dohertj2\go\bin'
if (-not (Test-Path $protoc)) {
throw "protoc was not found at $protoc. See docs/toolchain-links.md."
}
foreach ($pluginName in @('protoc-gen-go.exe', 'protoc-gen-go-grpc.exe')) {
$pluginPath = Join-Path $goPluginPath $pluginName
if (-not (Test-Path $pluginPath)) {
throw "$pluginName was not found at $pluginPath. See docs/toolchain-links.md."
}
}
New-Item -ItemType Directory -Path $outputRoot -Force | Out-Null
Get-ChildItem -Path $outputRoot -Filter '*.pb.go' -File | Remove-Item
$env:Path = "$goPluginPath;$env:Path"
& $protoc `
--proto_path=$protoRoot `
--go_out=$outputRoot `
--go_opt=paths=source_relative `
"--go_opt=Mmxaccess_gateway.proto=$modulePath;generated" `
"--go_opt=Mmxaccess_worker.proto=$modulePath;generated" `
mxaccess_gateway.proto `
mxaccess_worker.proto
& $protoc `
--proto_path=$protoRoot `
--go-grpc_out=$outputRoot `
--go-grpc_opt=paths=source_relative `
"--go-grpc_opt=Mmxaccess_gateway.proto=$modulePath;generated" `
mxaccess_gateway.proto
+15
View File
@@ -0,0 +1,15 @@
module gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go
go 1.26
require (
google.golang.org/grpc v1.80.0
google.golang.org/protobuf v1.36.11
)
require (
golang.org/x/net v0.49.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect
)
+38
View File
@@ -0,0 +1,38 @@
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,243 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.6.1
// - protoc v7.34.1
// source: mxaccess_gateway.proto
package generated
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
MxAccessGateway_OpenSession_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/OpenSession"
MxAccessGateway_CloseSession_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/CloseSession"
MxAccessGateway_Invoke_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/Invoke"
MxAccessGateway_StreamEvents_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/StreamEvents"
)
// MxAccessGatewayClient is the client API for MxAccessGateway service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// Public client API for MXAccess sessions hosted by the gateway.
type MxAccessGatewayClient interface {
OpenSession(ctx context.Context, in *OpenSessionRequest, opts ...grpc.CallOption) (*OpenSessionReply, error)
CloseSession(ctx context.Context, in *CloseSessionRequest, opts ...grpc.CallOption) (*CloseSessionReply, error)
Invoke(ctx context.Context, in *MxCommandRequest, opts ...grpc.CallOption) (*MxCommandReply, error)
StreamEvents(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[MxEvent], error)
}
type mxAccessGatewayClient struct {
cc grpc.ClientConnInterface
}
func NewMxAccessGatewayClient(cc grpc.ClientConnInterface) MxAccessGatewayClient {
return &mxAccessGatewayClient{cc}
}
func (c *mxAccessGatewayClient) OpenSession(ctx context.Context, in *OpenSessionRequest, opts ...grpc.CallOption) (*OpenSessionReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(OpenSessionReply)
err := c.cc.Invoke(ctx, MxAccessGateway_OpenSession_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mxAccessGatewayClient) CloseSession(ctx context.Context, in *CloseSessionRequest, opts ...grpc.CallOption) (*CloseSessionReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CloseSessionReply)
err := c.cc.Invoke(ctx, MxAccessGateway_CloseSession_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mxAccessGatewayClient) Invoke(ctx context.Context, in *MxCommandRequest, opts ...grpc.CallOption) (*MxCommandReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(MxCommandReply)
err := c.cc.Invoke(ctx, MxAccessGateway_Invoke_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mxAccessGatewayClient) StreamEvents(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[MxEvent], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &MxAccessGateway_ServiceDesc.Streams[0], MxAccessGateway_StreamEvents_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[StreamEventsRequest, MxEvent]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type MxAccessGateway_StreamEventsClient = grpc.ServerStreamingClient[MxEvent]
// MxAccessGatewayServer is the server API for MxAccessGateway service.
// All implementations must embed UnimplementedMxAccessGatewayServer
// for forward compatibility.
//
// Public client API for MXAccess sessions hosted by the gateway.
type MxAccessGatewayServer interface {
OpenSession(context.Context, *OpenSessionRequest) (*OpenSessionReply, error)
CloseSession(context.Context, *CloseSessionRequest) (*CloseSessionReply, error)
Invoke(context.Context, *MxCommandRequest) (*MxCommandReply, error)
StreamEvents(*StreamEventsRequest, grpc.ServerStreamingServer[MxEvent]) error
mustEmbedUnimplementedMxAccessGatewayServer()
}
// UnimplementedMxAccessGatewayServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedMxAccessGatewayServer struct{}
func (UnimplementedMxAccessGatewayServer) OpenSession(context.Context, *OpenSessionRequest) (*OpenSessionReply, error) {
return nil, status.Error(codes.Unimplemented, "method OpenSession not implemented")
}
func (UnimplementedMxAccessGatewayServer) CloseSession(context.Context, *CloseSessionRequest) (*CloseSessionReply, error) {
return nil, status.Error(codes.Unimplemented, "method CloseSession not implemented")
}
func (UnimplementedMxAccessGatewayServer) Invoke(context.Context, *MxCommandRequest) (*MxCommandReply, error) {
return nil, status.Error(codes.Unimplemented, "method Invoke not implemented")
}
func (UnimplementedMxAccessGatewayServer) StreamEvents(*StreamEventsRequest, grpc.ServerStreamingServer[MxEvent]) error {
return status.Error(codes.Unimplemented, "method StreamEvents not implemented")
}
func (UnimplementedMxAccessGatewayServer) mustEmbedUnimplementedMxAccessGatewayServer() {}
func (UnimplementedMxAccessGatewayServer) testEmbeddedByValue() {}
// UnsafeMxAccessGatewayServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to MxAccessGatewayServer will
// result in compilation errors.
type UnsafeMxAccessGatewayServer interface {
mustEmbedUnimplementedMxAccessGatewayServer()
}
func RegisterMxAccessGatewayServer(s grpc.ServiceRegistrar, srv MxAccessGatewayServer) {
// If the following call panics, it indicates UnimplementedMxAccessGatewayServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&MxAccessGateway_ServiceDesc, srv)
}
func _MxAccessGateway_OpenSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(OpenSessionRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MxAccessGatewayServer).OpenSession(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: MxAccessGateway_OpenSession_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MxAccessGatewayServer).OpenSession(ctx, req.(*OpenSessionRequest))
}
return interceptor(ctx, in, info, handler)
}
func _MxAccessGateway_CloseSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CloseSessionRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MxAccessGatewayServer).CloseSession(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: MxAccessGateway_CloseSession_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MxAccessGatewayServer).CloseSession(ctx, req.(*CloseSessionRequest))
}
return interceptor(ctx, in, info, handler)
}
func _MxAccessGateway_Invoke_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(MxCommandRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MxAccessGatewayServer).Invoke(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: MxAccessGateway_Invoke_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MxAccessGatewayServer).Invoke(ctx, req.(*MxCommandRequest))
}
return interceptor(ctx, in, info, handler)
}
func _MxAccessGateway_StreamEvents_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamEventsRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(MxAccessGatewayServer).StreamEvents(m, &grpc.GenericServerStream[StreamEventsRequest, MxEvent]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type MxAccessGateway_StreamEventsServer = grpc.ServerStreamingServer[MxEvent]
// MxAccessGateway_ServiceDesc is the grpc.ServiceDesc for MxAccessGateway service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var MxAccessGateway_ServiceDesc = grpc.ServiceDesc{
ServiceName: "mxaccess_gateway.v1.MxAccessGateway",
HandlerType: (*MxAccessGatewayServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "OpenSession",
Handler: _MxAccessGateway_OpenSession_Handler,
},
{
MethodName: "CloseSession",
Handler: _MxAccessGateway_CloseSession_Handler,
},
{
MethodName: "Invoke",
Handler: _MxAccessGateway_Invoke_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "StreamEvents",
Handler: _MxAccessGateway_StreamEvents_Handler,
ServerStreams: true,
},
},
Metadata: "mxaccess_gateway.proto",
}
File diff suppressed because it is too large Load Diff
+33
View File
@@ -0,0 +1,33 @@
package mxgateway
import "strings"
// Options configures future gateway connections.
type Options struct {
Endpoint string
APIKey string
Plaintext bool
CACertFile string
ServerNameOverride string
}
// RedactedAPIKey returns a display-safe representation of the configured API
// key for diagnostics and CLI output.
func (o Options) RedactedAPIKey() string {
return RedactAPIKey(o.APIKey)
}
// RedactAPIKey hides credential material while keeping enough shape for
// troubleshooting whether a key was supplied.
func RedactAPIKey(apiKey string) string {
if apiKey == "" {
return ""
}
if len(apiKey) <= 8 {
return "<redacted>"
}
prefix, suffix := apiKey[:4], apiKey[len(apiKey)-4:]
return prefix + strings.Repeat("*", len(apiKey)-8) + suffix
}
+23
View File
@@ -0,0 +1,23 @@
package mxgateway
import "testing"
func TestRedactAPIKey(t *testing.T) {
tests := []struct {
name string
apiKey string
want string
}{
{name: "empty", apiKey: "", want: ""},
{name: "short", apiKey: "mxgw_1", want: "<redacted>"},
{name: "long", apiKey: "mxgw_key_secret", want: "mxgw*******cret"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := RedactAPIKey(tt.apiKey); got != tt.want {
t.Fatalf("RedactAPIKey() = %q, want %q", got, tt.want)
}
})
}
}
@@ -0,0 +1,69 @@
package mxgateway
import (
"os"
"path/filepath"
"testing"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
func TestGeneratedGoldenFixturesParse(t *testing.T) {
tests := []struct {
name string
path string
msg proto.Message
}{
{
name: "open session reply",
path: filepath.Join("..", "..", "proto", "fixtures", "golden", "open-session-reply.ok.json"),
msg: &pb.OpenSessionReply{},
},
{
name: "register command request",
path: filepath.Join("..", "..", "proto", "fixtures", "golden", "register-command-request.json"),
msg: &pb.MxCommandRequest{},
},
{
name: "on data change event",
path: filepath.Join("..", "..", "proto", "fixtures", "golden", "on-data-change-event.json"),
msg: &pb.MxEvent{},
},
}
unmarshal := protojson.UnmarshalOptions{DiscardUnknown: false}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
data, err := os.ReadFile(tt.path)
if err != nil {
t.Fatalf("read fixture: %v", err)
}
if err := unmarshal.Unmarshal(data, tt.msg); err != nil {
t.Fatalf("parse fixture: %v", err)
}
})
}
}
func TestOpenSessionFixtureProtocolVersions(t *testing.T) {
data, err := os.ReadFile(filepath.Join("..", "..", "proto", "fixtures", "golden", "open-session-reply.ok.json"))
if err != nil {
t.Fatalf("read fixture: %v", err)
}
var reply pb.OpenSessionReply
if err := protojson.Unmarshal(data, &reply); err != nil {
t.Fatalf("parse fixture: %v", err)
}
if reply.GetGatewayProtocolVersion() != GatewayProtocolVersion {
t.Fatalf("gateway protocol = %d, want %d", reply.GetGatewayProtocolVersion(), GatewayProtocolVersion)
}
if reply.GetWorkerProtocolVersion() != WorkerProtocolVersion {
t.Fatalf("worker protocol = %d, want %d", reply.GetWorkerProtocolVersion(), WorkerProtocolVersion)
}
}
+15
View File
@@ -0,0 +1,15 @@
package mxgateway
const (
// ClientVersion identifies this Go client scaffold before package releases
// assign semantic versions.
ClientVersion = "0.1.0-dev"
// GatewayProtocolVersion matches GatewayContractInfo.GatewayProtocolVersion
// in the shared .NET contracts.
GatewayProtocolVersion uint32 = 1
// WorkerProtocolVersion matches GatewayContractInfo.WorkerProtocolVersion
// and is exposed for fake-worker and parity tests.
WorkerProtocolVersion uint32 = 1
)
@@ -0,0 +1,36 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "missing-api-key",
"grpcStatusCode": "UNAUTHENTICATED",
"clientErrorCategory": "AuthenticationError",
"inputMetadata": {
"authorization": ""
},
"expectedRedactedOutput": "authentication failed: missing bearer token",
"retryableWithoutCredentialChange": false
},
{
"id": "invalid-api-key",
"grpcStatusCode": "UNAUTHENTICATED",
"clientErrorCategory": "AuthenticationError",
"inputMetadata": {
"authorization": "Bearer <redacted>"
},
"expectedRedactedOutput": "authentication failed: invalid API key <redacted>",
"retryableWithoutCredentialChange": false
},
{
"id": "missing-write-scope",
"grpcStatusCode": "PERMISSION_DENIED",
"clientErrorCategory": "AuthorizationError",
"inputMetadata": {
"authorization": "Bearer <redacted>"
},
"requiredScope": "mxaccess.write",
"expectedRedactedOutput": "authorization failed: missing scope mxaccess.write",
"retryableWithoutCredentialChange": false
}
]
}
@@ -0,0 +1,30 @@
{
"sessionId": "session-fixture",
"correlationId": "gateway-correlation-register-1",
"kind": "MX_COMMAND_KIND_REGISTER",
"protocolStatus": {
"code": "PROTOCOL_STATUS_CODE_OK",
"message": "Register completed."
},
"hresult": 0,
"returnValue": {
"dataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_I4",
"int32Value": 12
},
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "OK"
}
],
"diagnosticMessage": "COM Register returned server handle 12.",
"register": {
"serverHandle": 12
}
}
@@ -0,0 +1,38 @@
{
"sessionId": "session-fixture",
"correlationId": "gateway-correlation-write-1",
"kind": "MX_COMMAND_KIND_WRITE",
"protocolStatus": {
"code": "PROTOCOL_STATUS_CODE_MXACCESS_FAILURE",
"message": "MXAccess rejected the write."
},
"hresult": -2147220992,
"returnValue": {
"dataType": "MX_DATA_TYPE_NO_DATA",
"variantType": "VT_EMPTY",
"isNull": true,
"rawDiagnostic": "MXAccess returned no value for the failed write.",
"rawDataType": 2
},
"statuses": [
{
"success": 0,
"category": "MX_STATUS_CATEGORY_SECURITY_ERROR",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 321,
"rawCategory": 8,
"rawDetectedBy": 3,
"diagnosticText": "Write denied by provider security."
},
{
"success": 0,
"category": "MX_STATUS_CATEGORY_OPERATIONAL_ERROR",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_NMX",
"detail": 902,
"rawCategory": 7,
"rawDetectedBy": 5,
"diagnosticText": "Provider rejected the item state."
}
],
"diagnosticMessage": "Fixture preserves a data-bearing MXAccess failure reply with HRESULT and status array."
}
@@ -0,0 +1,159 @@
{
"sessionId": "session-fixture",
"description": "Ordered event stream sample for one worker-backed session.",
"events": [
{
"family": "MX_EVENT_FAMILY_ON_DATA_CHANGE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_I4",
"int32Value": 123
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:00Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "OK"
}
],
"workerSequence": "1",
"workerTimestamp": "2026-01-01T00:00:00.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:00.015Z",
"onDataChange": {}
},
{
"family": "MX_EVENT_FAMILY_ON_WRITE_COMPLETE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_DOUBLE",
"variantType": "VT_R8",
"doubleValue": 45.5
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:01Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "Write complete."
}
],
"workerSequence": "2",
"workerTimestamp": "2026-01-01T00:00:01.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:01.015Z",
"hresult": 0,
"onWriteComplete": {}
},
{
"family": "MX_EVENT_FAMILY_OPERATION_COMPLETE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_STRING",
"variantType": "VT_BSTR",
"stringValue": "operation-complete"
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:02Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_NMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "Operation complete."
}
],
"workerSequence": "3",
"workerTimestamp": "2026-01-01T00:00:02.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:02.015Z",
"operationComplete": {}
},
{
"family": "MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_FLOAT",
"arrayValue": {
"elementDataType": "MX_DATA_TYPE_FLOAT",
"variantType": "VT_ARRAY|VT_R4",
"dimensions": [
2
],
"floatValues": {
"values": [
1.5,
2.5
]
}
}
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:03Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "Buffered data delivered."
}
],
"workerSequence": "4",
"workerTimestamp": "2026-01-01T00:00:03.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:03.015Z",
"onBufferedDataChange": {
"dataType": "MX_DATA_TYPE_FLOAT",
"qualityValues": {
"elementDataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_ARRAY|VT_I4",
"dimensions": [
2
],
"int32Values": {
"values": [
192,
192
]
}
},
"timestampValues": {
"elementDataType": "MX_DATA_TYPE_TIME",
"variantType": "VT_ARRAY|VT_DATE",
"dimensions": [
2
],
"timestampValues": {
"values": [
"2026-01-01T00:00:02Z",
"2026-01-01T00:00:03Z"
]
}
},
"rawDataType": 5
}
}
]
}
@@ -0,0 +1,59 @@
{
"schemaVersion": 1,
"fixtureSet": "mxaccess-gateway-client-behavior",
"contractName": "mxaccess-gateway",
"gatewayProtocolVersion": 1,
"workerProtocolVersion": 1,
"protoInputManifest": "clients/proto/proto-inputs.json",
"fixtures": [
{
"id": "command-reply.register.ok",
"category": "command_replies",
"messageType": "mxaccess_gateway.v1.MxCommandReply",
"path": "command-replies/register.ok.reply.json",
"expectation": "Successful command replies preserve protocol status, HRESULT, return value, status arrays, and method-specific output."
},
{
"id": "command-reply.write.mxaccess-failure",
"category": "command_replies",
"messageType": "mxaccess_gateway.v1.MxCommandReply",
"path": "command-replies/write.mxaccess-failure.reply.json",
"expectation": "MXAccess failures are data-bearing replies with HRESULT and status details, not transport failures."
},
{
"id": "event-stream.session-ordered",
"category": "event_streams",
"messageType": "mxaccess_gateway.v1.MxEvent",
"path": "event-streams/session-event-stream.json",
"expectation": "Clients preserve per-session event order and event family bodies exactly as emitted."
},
{
"id": "values.conversion-cases",
"category": "value_conversion",
"messageType": "mxaccess_gateway.v1.MxValue",
"path": "values/value-conversion-cases.json",
"expectation": "Clients expose typed projections and keep raw fallback metadata when conversion is incomplete."
},
{
"id": "statuses.conversion-cases",
"category": "status_conversion",
"messageType": "mxaccess_gateway.v1.MxStatusProxy",
"path": "statuses/status-conversion-cases.json",
"expectation": "Clients preserve every MXSTATUS_PROXY field, including raw category/source values."
},
{
"id": "auth.error-cases",
"category": "auth_errors",
"messageType": "client_behavior.v1.AuthErrorCase",
"path": "auth/auth-error-cases.json",
"expectation": "Clients map authentication and authorization failures distinctly and redact credentials."
},
{
"id": "timeout-cancel.expected-behavior",
"category": "timeout_cancel",
"messageType": "client_behavior.v1.TimeoutCancelCase",
"path": "timeout-cancel/timeout-cancel-cases.json",
"expectation": "Client cancellation stops waiting locally but does not imply an in-flight MXAccess COM call was aborted."
}
]
}
@@ -0,0 +1,41 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "ok.responding-lmx",
"status": {
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "OK"
}
},
{
"id": "security-error.requesting-lmx",
"status": {
"success": 0,
"category": "MX_STATUS_CATEGORY_SECURITY_ERROR",
"detectedBy": "MX_STATUS_SOURCE_REQUESTING_LMX",
"detail": 401,
"rawCategory": 8,
"rawDetectedBy": 2,
"diagnosticText": "Requesting LMX denied the secured operation."
}
},
{
"id": "raw-unknown-category",
"status": {
"success": 0,
"category": "MX_STATUS_CATEGORY_UNKNOWN",
"detectedBy": "MX_STATUS_SOURCE_UNKNOWN",
"detail": 65535,
"rawCategory": 99,
"rawDetectedBy": 77,
"diagnosticText": "Unknown native MXSTATUS_PROXY fields are preserved."
}
}
]
}
@@ -0,0 +1,27 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "unary-deadline-exceeded",
"operation": "Invoke",
"clientDeadline": "2s",
"grpcStatusCode": "DEADLINE_EXCEEDED",
"clientErrorCategory": "TimeoutError",
"gatewayWaitBehavior": "stops_waiting_for_reply",
"workerCommandBehavior": "continues_until_worker_reply_or_worker_fault",
"sessionExpectation": "session_state_is_unknown_until_follow_up_status_or_close",
"expectedClientAction": "issue GetSessionState or CloseSession before reusing handles"
},
{
"id": "stream-cancel",
"operation": "StreamEvents",
"clientDeadline": "5s",
"grpcStatusCode": "CANCELLED",
"clientErrorCategory": "CancelledError",
"gatewayWaitBehavior": "stops_streaming_to_that_call",
"workerCommandBehavior": "does_not_cancel_worker_session",
"sessionExpectation": "session_remains_ready_if_worker_stays_healthy",
"expectedClientAction": "open a new StreamEvents call with the last observed worker sequence"
}
]
}
@@ -0,0 +1,85 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "bool.true",
"expectedKind": "boolValue",
"value": {
"dataType": "MX_DATA_TYPE_BOOLEAN",
"variantType": "VT_BOOL",
"boolValue": true
}
},
{
"id": "int64.large",
"expectedKind": "int64Value",
"value": {
"dataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_I8",
"int64Value": "9223372036854770000"
}
},
{
"id": "timestamp.utc",
"expectedKind": "timestampValue",
"value": {
"dataType": "MX_DATA_TYPE_TIME",
"variantType": "VT_DATE",
"timestampValue": "2026-01-01T00:00:04Z"
}
},
{
"id": "string-array",
"expectedKind": "arrayValue",
"value": {
"dataType": "MX_DATA_TYPE_STRING",
"arrayValue": {
"elementDataType": "MX_DATA_TYPE_STRING",
"variantType": "VT_ARRAY|VT_BSTR",
"dimensions": [
2
],
"stringValues": {
"values": [
"alpha",
"beta"
]
}
}
}
},
{
"id": "raw-fallback.variant",
"expectedKind": "rawValue",
"value": {
"dataType": "MX_DATA_TYPE_UNKNOWN",
"variantType": "VT_RECORD",
"rawDiagnostic": "No lossless typed projection exists for this VARIANT.",
"rawDataType": 32767,
"rawValue": "AQIDBAU="
}
},
{
"id": "raw-array-fallback",
"expectedKind": "arrayValue",
"value": {
"dataType": "MX_DATA_TYPE_UNKNOWN",
"arrayValue": {
"elementDataType": "MX_DATA_TYPE_UNKNOWN",
"variantType": "VT_ARRAY|VT_VARIANT",
"dimensions": [
2
],
"rawDiagnostic": "Array elements contain mixed VARIANT types.",
"rawElementDataType": 32767,
"rawValues": {
"values": [
"AAE=",
"AgM="
]
}
}
}
}
]
}
+1
View File
@@ -16,6 +16,7 @@
],
"descriptorSet": "clients/proto/descriptors/mxaccessgw-client-v1.protoset",
"fixtureRoot": "clients/proto/fixtures/golden",
"behaviorFixtureRoot": "clients/proto/fixtures/behavior",
"generatedOutputs": {
"dotnet": "clients/dotnet/generated",
"go": "clients/go/internal/generated",
+106
View File
@@ -0,0 +1,106 @@
# Client Behavior Fixtures
Client behavior fixtures define the shared expectations used by the official
.NET, Go, Rust, Python, and Java clients. They keep wrapper behavior aligned
while each language exposes idiomatic APIs over the same protobuf contract.
## Fixture Set
The fixture manifest is `clients/proto/fixtures/behavior/manifest.json`.
`clients/proto/proto-inputs.json` references the fixture root through
`behaviorFixtureRoot` so generators and client test projects can discover the
same files they use for descriptor inputs.
The fixture set contains:
- command reply protobuf JSON,
- ordered event stream protobuf JSON samples,
- `MxValue` conversion case sets,
- `MxStatusProxy` conversion case sets,
- authentication and authorization error expectations,
- timeout and cancellation behavior expectations.
Protobuf message fixtures use protobuf JSON field names and enum values. Files
that describe client wrapper behavior use explicit JSON fields instead of a
proto message because those expectations apply above the generated transport
types.
## Command Replies
Command reply fixtures live in
`clients/proto/fixtures/behavior/command-replies/`. They parse as
`mxaccess_gateway.v1.MxCommandReply`.
Clients use these fixtures to verify that successful and failed MXAccess
commands both carry the full reply details:
- `protocolStatus`,
- `hresult`,
- `returnValue`,
- repeated `statuses`,
- method-specific reply payloads when MXAccess returns out parameters.
MXAccess failures remain command replies when the gateway reached the worker and
the worker captured HRESULT or `MXSTATUS_PROXY` details. Client wrappers should
map those replies to rich command errors without discarding the raw reply.
## Event Streams
Event stream fixtures live in
`clients/proto/fixtures/behavior/event-streams/`. Each file contains an ordered
`events` array whose entries parse as `mxaccess_gateway.v1.MxEvent`.
Clients use these fixtures to verify that stream helpers preserve
`workerSequence` order and expose each native event family:
- `OnDataChange`,
- `OnWriteComplete`,
- `OperationComplete`,
- `OnBufferedDataChange`.
Wrappers must not reorder, coalesce, or drop events while reading the fixture.
## Value And Status Conversion
Value fixtures live in `clients/proto/fixtures/behavior/values/`. Each case
contains a `value` object that parses as `mxaccess_gateway.v1.MxValue`.
Status fixtures live in `clients/proto/fixtures/behavior/statuses/`. Each case
contains a `status` object that parses as
`mxaccess_gateway.v1.MxStatusProxy`.
Clients use these fixtures to verify typed projections and raw fallback
behavior. A language helper may expose native booleans, integers, strings,
arrays, and timestamps, but it must keep `rawDiagnostic`, raw data type fields,
and raw byte payloads accessible when conversion is incomplete.
## Auth, Timeout, And Cancel Behavior
Authentication fixtures live in `clients/proto/fixtures/behavior/auth/`. They
separate `UNAUTHENTICATED` from `PERMISSION_DENIED` so clients map missing or
invalid credentials differently from missing scopes. Expected output strings
contain only redacted credentials.
Timeout and cancellation fixtures live in
`clients/proto/fixtures/behavior/timeout-cancel/`. They document that canceling
or timing out a client call stops the client from waiting, but it does not abort
an in-flight MXAccess COM call on the worker STA. Clients should follow up with
`GetSessionState` or `CloseSession` before reusing handles after an uncertain
command timeout.
## Validation
Run the fixture validation tests after changing the behavior fixture set:
```bash
powershell -ExecutionPolicy Bypass -File scripts/validate-client-behavior-fixtures.ps1
```
The script runs the focused C# contract tests that parse all protobuf JSON
fixtures and validate deterministic wrapper expectation files.
## Related Documentation
- [Client Proto Generation](./client-proto-generation.md)
- [Client Libraries Detailed Design](./client-libraries-design.md)
- [Protobuf Contracts](./Contracts.md)
+3
View File
@@ -18,6 +18,7 @@ starting `MxGateway.Worker.exe` or loading MXAccess COM. The harness scripts:
- `WorkerHello` and `WorkerReady` startup,
- command replies with matching correlation ids,
- ordered `WorkerEvent` frames,
- `WorkerHeartbeat` frames,
- `WorkerFault` frames,
- shutdown acknowledgements,
- malformed protobuf payloads and oversized frame headers,
@@ -43,6 +44,8 @@ event streaming behavior:
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~FakeWorkerHarnessTests
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~SessionWorkerClientFactoryFakeWorkerTests
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~GatewayEndToEndFakeWorkerSmokeTests
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~WorkerClientTests
dotnet test src/MxGateway.Worker.Tests/MxGateway.Worker.Tests.csproj -p:Platform=x86 --filter FullyQualifiedName~WorkerPipeSessionTests
```
Run the gateway test project after shared gateway test infrastructure changes:
+6
View File
@@ -29,6 +29,7 @@ Language-specific plans:
Shared generation inputs:
- `docs/client-proto-generation.md`
- `docs/ClientBehaviorFixtures.md`
- `clients/proto/proto-inputs.json`
Language style guides:
@@ -310,6 +311,11 @@ CLI output should support JSON for automated tests.
Unit tests must run without a live gateway. Use fake gRPC services, mock
transports, or generated test servers depending on language.
Shared behavior fixtures live in `clients/proto/fixtures/behavior`. Every
client should include tests that load the fixture manifest and verify wrapper
behavior against the common command reply, event stream, value conversion,
status conversion, auth error, and timeout/cancel cases.
Required unit test areas:
- options parsing,
+33
View File
@@ -16,6 +16,7 @@ records:
- the public and worker source files,
- the descriptor set path,
- golden fixture locations,
- behavior fixture locations,
- generated-code output directories for each planned client.
The source files listed by the manifest are:
@@ -99,6 +100,17 @@ Go clients should generate `mxaccess_gateway.proto` and
`protoc-gen-go` and `protoc-gen-go-grpc`. Keep generated packages internal
unless the wrapper API intentionally exposes raw protobuf messages.
The Go scaffold provides a repo-local generation script:
```powershell
clients/go/generate-proto.ps1
```
The script maps both proto files into the internal Go package
`gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated` because
the source `.proto` files do not carry Go-specific `go_package` options. This
keeps language-specific packaging outside the public contract files.
Rust clients should use `tonic-build` or the selected protobuf generator from
the Rust client build script, with generated modules placed under
`clients/rust/src/generated` or included from the build output according to the
@@ -125,9 +137,30 @@ The fixtures use protobuf JSON field names and enum values. Contract tests parse
them with the generated C# types so schema drift is caught before client
generation work starts.
## Behavior Fixtures
Cross-language behavior fixtures live in
`clients/proto/fixtures/behavior`. The manifest
`clients/proto/fixtures/behavior/manifest.json` lists command replies, ordered
event stream samples, value conversion cases, status conversion cases, auth
error expectations, and timeout/cancel expectations.
The behavior fixtures let each generated client wrapper test the same
expectations without a live gateway. Protobuf message fixtures parse with the
generated types. Auth and timeout/cancel files describe wrapper behavior above
the generated transport layer, including credential redaction and the rule that
client cancellation does not abort an in-flight MXAccess COM call.
Run the focused validation script after changing these fixtures:
```powershell
scripts/validate-client-behavior-fixtures.ps1
```
## Related Documentation
- [Protobuf Contracts](./Contracts.md)
- [Client Libraries Detailed Design](./client-libraries-design.md)
- [Client Behavior Fixtures](./ClientBehaviorFixtures.md)
- [Client Libraries Implementation Plan](./implementation-plan-clients.md)
- [Protobuf Style Guide](./style-guides/ProtobufStyleGuide.md)
+7
View File
@@ -16,6 +16,7 @@ Recommended layout:
```text
clients/dotnet/
MxGateway.Client.sln
MxGateway.Client/
MxGateway.Client.csproj
GatewayClient.cs
@@ -41,6 +42,12 @@ Target framework:
<TargetFramework>net10.0</TargetFramework>
```
The scaffold uses a project reference to
`src/MxGateway.Contracts/MxGateway.Contracts.csproj` for generated protobuf and
gRPC types. `clients/dotnet/generated` remains reserved for client-local
generator output if the .NET client later needs to decouple from the contracts
project.
Expected packages:
- `Grpc.Net.Client`
+20 -8
View File
@@ -618,13 +618,19 @@ Do not drop or coalesce events in v1.
## Heartbeat And Watchdog
The worker heartbeat should prove that:
`WorkerPipeSession` starts the heartbeat loop after the gateway validates
`WorkerHello` and receives `WorkerReady`. Heartbeats continue until
`WorkerShutdown`, cancellation, or a pipe/protocol failure stops the session.
The loop uses `WorkerPipeSessionOptions.HeartbeatInterval`; the default matches
the gateway worker heartbeat interval.
The worker heartbeat proves that:
- pipe writer is alive,
- worker host is alive,
- STA has recently pumped or completed work.
Heartbeat payload should include:
Heartbeat payload includes:
- worker process id,
- session id,
@@ -635,13 +641,19 @@ Heartbeat payload should include:
- event sequence,
- current command correlation id if any.
The STA watchdog should warn when:
`MxAccessStaSession.CaptureHeartbeat()` reads `StaRuntime.LastActivityUtc` and
`StaCommandDispatcher` queue state without touching the raw MXAccess COM object
outside the STA. Event queue depth and event sequence are reported as zero until
the event queue implementation owns those counters.
- one command exceeds its expected duration,
- the STA has not pumped messages within the heartbeat grace period,
- event queue depth remains high.
The worker can report the problem, but the gateway owns the final kill decision.
The STA watchdog currently emits a `WorkerFault` with
`WorkerFaultCategory.StaHung` when `LastStaActivityUtc` is older than
`WorkerPipeSessionOptions.HeartbeatGrace`. The fault includes the current
command correlation id when a command is active. Command duration and high event
queue depth remain observable through heartbeat fields until dedicated
thresholds own those warnings. The worker reports stale STA activity, but the
gateway owns the final kill decision through its existing heartbeat and worker
lifecycle policy.
## Shutdown
@@ -0,0 +1,26 @@
[CmdletBinding()]
param(
[switch]$NoBuild
)
Set-StrictMode -Version Latest
$ErrorActionPreference = "Stop"
$repoRoot = Resolve-Path (Join-Path $PSScriptRoot "..")
$testProject = Join-Path $repoRoot "src/MxGateway.Tests/MxGateway.Tests.csproj"
$arguments = @(
"test",
$testProject,
"--filter",
"ClientBehaviorFixtureTests"
)
if ($NoBuild) {
$arguments += "--no-build"
}
& dotnet @arguments
if ($LASTEXITCODE -ne 0) {
throw "Client behavior fixture validation failed with exit code $LASTEXITCODE."
}
@@ -0,0 +1,379 @@
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
namespace MxGateway.Tests.Contracts;
public sealed class ClientBehaviorFixtureTests
{
private static readonly JsonParser ProtobufJsonParser = new(JsonParser.Settings.Default);
[Fact]
public void BehaviorManifest_DeclaresCurrentProtocolVersionsAndExistingFixtures()
{
using JsonDocument manifest = LoadBehaviorManifest();
JsonElement root = manifest.RootElement;
Assert.Equal(1, root.GetProperty("schemaVersion").GetInt32());
Assert.Equal("mxaccess-gateway-client-behavior", root.GetProperty("fixtureSet").GetString());
Assert.Equal(GatewayContractInfo.GatewayProtocolVersion, root.GetProperty("gatewayProtocolVersion").GetUInt32());
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, root.GetProperty("workerProtocolVersion").GetUInt32());
HashSet<string> fixtureIds = new(StringComparer.Ordinal);
foreach (JsonElement fixture in root.GetProperty("fixtures").EnumerateArray())
{
string id = fixture.GetProperty("id").GetString()!;
string path = fixture.GetProperty("path").GetString()!;
string category = fixture.GetProperty("category").GetString()!;
string messageType = fixture.GetProperty("messageType").GetString()!;
Assert.True(fixtureIds.Add(id), $"Duplicate behavior fixture id '{id}'.");
Assert.Contains(category, KnownCategories);
Assert.Contains(messageType, KnownMessageTypes);
Assert.True(
File.Exists(Path.Combine(GetBehaviorFixtureRoot().FullName, path)),
$"Expected behavior fixture '{path}' to exist.");
Assert.False(Path.IsPathRooted(path), $"Fixture path '{path}' must be relative.");
Assert.NotEmpty(fixture.GetProperty("expectation").GetString()!);
}
}
[Fact]
public void ProtoInputManifest_ReferencesBehaviorFixtureRoot()
{
DirectoryInfo repositoryRoot = FindRepositoryRoot();
string manifestPath = Path.Combine(repositoryRoot.FullName, "clients", "proto", "proto-inputs.json");
using JsonDocument manifest = JsonDocument.Parse(File.ReadAllText(manifestPath));
string fixtureRoot = manifest.RootElement.GetProperty("behaviorFixtureRoot").GetString()!;
Assert.Equal("clients/proto/fixtures/behavior", fixtureRoot);
Assert.True(Directory.Exists(Path.Combine(repositoryRoot.FullName, fixtureRoot)));
}
[Fact]
public void CommandReplyFixtures_ParseWithCurrentContractAndPreserveMxAccessDetails()
{
IReadOnlyList<JsonElement> fixtures = LoadManifestFixtures("command_replies");
Assert.NotEmpty(fixtures);
foreach (JsonElement fixture in fixtures)
{
MxCommandReply reply = ParseFixture<MxCommandReply>(
fixture,
MxCommandReply.Parser);
Assert.NotEqual(MxCommandKind.Unspecified, reply.Kind);
Assert.NotEqual(ProtocolStatusCode.Unspecified, reply.ProtocolStatus.Code);
Assert.True(reply.HasHresult, $"Fixture '{GetFixtureId(fixture)}' must carry an HRESULT.");
Assert.NotEmpty(reply.Statuses);
Assert.NotEqual(MxDataType.Unspecified, reply.ReturnValue.DataType);
Assert.True(
reply.ReturnValue.KindCase != MxValue.KindOneofCase.None || reply.ReturnValue.IsNull,
$"Fixture '{GetFixtureId(fixture)}' must carry a typed value, raw value, or explicit null.");
}
MxCommandReply failedWrite = ParseFixture<MxCommandReply>(
Assert.Single(fixtures, fixture => GetFixtureId(fixture) == "command-reply.write.mxaccess-failure"),
MxCommandReply.Parser);
Assert.Equal(ProtocolStatusCode.MxaccessFailure, failedWrite.ProtocolStatus.Code);
Assert.Equal(-2147220992, failedWrite.Hresult);
Assert.True(failedWrite.Statuses.Count > 1);
Assert.All(failedWrite.Statuses, status => Assert.Equal(0, status.Success));
}
[Fact]
public void EventStreamFixtures_ParseWithMonotonicSequencesAndExpectedFamilies()
{
IReadOnlyList<JsonElement> fixtures = LoadManifestFixtures("event_streams");
Assert.NotEmpty(fixtures);
foreach (JsonElement fixture in fixtures)
{
using JsonDocument document = JsonDocument.Parse(File.ReadAllText(GetFixturePath(fixture)));
ulong previousSequence = 0;
List<MxEventFamily> families = [];
foreach (JsonElement eventElement in document.RootElement.GetProperty("events").EnumerateArray())
{
MxEvent gatewayEvent = ProtobufJsonParser.Parse<MxEvent>(eventElement.GetRawText());
Assert.True(gatewayEvent.WorkerSequence > previousSequence);
Assert.Equal(document.RootElement.GetProperty("sessionId").GetString(), gatewayEvent.SessionId);
Assert.NotEmpty(gatewayEvent.Statuses);
AssertEventBodyMatchesFamily(gatewayEvent);
previousSequence = gatewayEvent.WorkerSequence;
families.Add(gatewayEvent.Family);
}
Assert.Contains(MxEventFamily.OnDataChange, families);
Assert.Contains(MxEventFamily.OnWriteComplete, families);
Assert.Contains(MxEventFamily.OperationComplete, families);
Assert.Contains(MxEventFamily.OnBufferedDataChange, families);
}
}
[Fact]
public void ValueConversionFixtures_ParseTypedValuesAndRawFallbacks()
{
JsonElement cases = LoadCaseSet("value_conversion", "cases");
bool sawRawFallback = false;
bool sawRawArrayFallback = false;
bool sawTypedArray = false;
foreach (JsonElement valueCase in cases.EnumerateArray())
{
MxValue value = ProtobufJsonParser.Parse<MxValue>(
valueCase.GetProperty("value").GetRawText());
string expectedKind = valueCase.GetProperty("expectedKind").GetString()!;
Assert.NotEqual(MxDataType.Unspecified, value.DataType);
AssertJsonKindMatchesValueKind(expectedKind, value);
sawRawFallback |= value.KindCase == MxValue.KindOneofCase.RawValue
&& !string.IsNullOrWhiteSpace(value.RawDiagnostic)
&& value.RawDataType != 0;
sawRawArrayFallback |= value.KindCase == MxValue.KindOneofCase.ArrayValue
&& value.ArrayValue.ValuesCase == MxArray.ValuesOneofCase.RawValues
&& !string.IsNullOrWhiteSpace(value.ArrayValue.RawDiagnostic)
&& value.ArrayValue.RawElementDataType != 0;
sawTypedArray |= value.KindCase == MxValue.KindOneofCase.ArrayValue
&& value.ArrayValue.ValuesCase != MxArray.ValuesOneofCase.RawValues;
}
Assert.True(sawRawFallback, "Expected at least one raw scalar fallback case.");
Assert.True(sawRawArrayFallback, "Expected at least one raw array fallback case.");
Assert.True(sawTypedArray, "Expected at least one typed array case.");
}
[Fact]
public void StatusConversionFixtures_ParseStatusArraysAndRawFields()
{
JsonElement cases = LoadCaseSet("status_conversion", "cases");
bool sawRawUnknown = false;
foreach (JsonElement statusCase in cases.EnumerateArray())
{
MxStatusProxy status = ProtobufJsonParser.Parse<MxStatusProxy>(
statusCase.GetProperty("status").GetRawText());
Assert.NotEqual(MxStatusCategory.Unspecified, status.Category);
Assert.NotEqual(MxStatusSource.Unspecified, status.DetectedBy);
Assert.NotEmpty(status.DiagnosticText);
sawRawUnknown |= status.Category == MxStatusCategory.Unknown
&& status.RawCategory != 0
&& status.RawDetectedBy != 0;
}
Assert.True(sawRawUnknown, "Expected a status case with unknown raw native fields.");
}
[Fact]
public void AuthErrorFixtures_MapAuthenticationAuthorizationAndRedactCredentials()
{
JsonElement cases = LoadCaseSet("auth_errors", "cases");
HashSet<string> statusCodes = new(StringComparer.Ordinal);
foreach (JsonElement authCase in cases.EnumerateArray())
{
string grpcStatusCode = authCase.GetProperty("grpcStatusCode").GetString()!;
string category = authCase.GetProperty("clientErrorCategory").GetString()!;
string redactedOutput = authCase.GetProperty("expectedRedactedOutput").GetString()!;
string serialized = authCase.GetRawText();
Assert.Contains(grpcStatusCode, AuthGrpcStatusCodes);
Assert.Contains(category, AuthClientErrorCategories);
string authorization = authCase.GetProperty("inputMetadata").GetProperty("authorization").GetString()!;
if (!string.IsNullOrEmpty(authorization))
{
Assert.Contains("<redacted>", serialized);
}
Assert.DoesNotContain("mxgw_", serialized, StringComparison.Ordinal);
Assert.DoesNotContain("secret", redactedOutput, StringComparison.OrdinalIgnoreCase);
statusCodes.Add(grpcStatusCode);
}
Assert.Contains("UNAUTHENTICATED", statusCodes);
Assert.Contains("PERMISSION_DENIED", statusCodes);
}
[Fact]
public void TimeoutCancelFixtures_DocumentClientWaitAndWorkerCommandBehavior()
{
JsonElement cases = LoadCaseSet("timeout_cancel", "cases");
HashSet<string> statusCodes = new(StringComparer.Ordinal);
foreach (JsonElement timeoutCase in cases.EnumerateArray())
{
string grpcStatusCode = timeoutCase.GetProperty("grpcStatusCode").GetString()!;
Assert.Contains(grpcStatusCode, TimeoutGrpcStatusCodes);
Assert.NotEmpty(timeoutCase.GetProperty("clientDeadline").GetString()!);
Assert.NotEmpty(timeoutCase.GetProperty("gatewayWaitBehavior").GetString()!);
Assert.NotEmpty(timeoutCase.GetProperty("workerCommandBehavior").GetString()!);
Assert.NotEmpty(timeoutCase.GetProperty("expectedClientAction").GetString()!);
statusCodes.Add(grpcStatusCode);
}
Assert.Contains("DEADLINE_EXCEEDED", statusCodes);
Assert.Contains("CANCELLED", statusCodes);
}
private static readonly string[] KnownCategories =
[
"command_replies",
"event_streams",
"value_conversion",
"status_conversion",
"auth_errors",
"timeout_cancel",
];
private static readonly string[] KnownMessageTypes =
[
"mxaccess_gateway.v1.MxCommandReply",
"mxaccess_gateway.v1.MxEvent",
"mxaccess_gateway.v1.MxValue",
"mxaccess_gateway.v1.MxStatusProxy",
"client_behavior.v1.AuthErrorCase",
"client_behavior.v1.TimeoutCancelCase",
];
private static readonly string[] AuthGrpcStatusCodes =
[
"UNAUTHENTICATED",
"PERMISSION_DENIED",
];
private static readonly string[] AuthClientErrorCategories =
[
"AuthenticationError",
"AuthorizationError",
];
private static readonly string[] TimeoutGrpcStatusCodes =
[
"DEADLINE_EXCEEDED",
"CANCELLED",
];
private static T ParseFixture<T>(
JsonElement fixture,
MessageParser<T> parser)
where T : IMessage<T>
{
return parser.ParseJson(File.ReadAllText(GetFixturePath(fixture)));
}
private static JsonElement LoadCaseSet(
string category,
string propertyName)
{
JsonElement fixture = Assert.Single(LoadManifestFixtures(category));
using JsonDocument document = JsonDocument.Parse(File.ReadAllText(GetFixturePath(fixture)));
return document.RootElement.GetProperty(propertyName).Clone();
}
private static IReadOnlyList<JsonElement> LoadManifestFixtures(string category)
{
using JsonDocument manifest = LoadBehaviorManifest();
return manifest.RootElement
.GetProperty("fixtures")
.EnumerateArray()
.Where(fixture => fixture.GetProperty("category").GetString() == category)
.Select(fixture => fixture.Clone())
.ToArray();
}
private static JsonDocument LoadBehaviorManifest()
{
return JsonDocument.Parse(File.ReadAllText(Path.Combine(GetBehaviorFixtureRoot().FullName, "manifest.json")));
}
private static string GetFixturePath(JsonElement fixture)
{
return Path.Combine(GetBehaviorFixtureRoot().FullName, fixture.GetProperty("path").GetString()!);
}
private static string GetFixtureId(JsonElement fixture)
{
return fixture.GetProperty("id").GetString()!;
}
private static DirectoryInfo GetBehaviorFixtureRoot()
{
DirectoryInfo repositoryRoot = FindRepositoryRoot();
return new DirectoryInfo(Path.Combine(repositoryRoot.FullName, "clients", "proto", "fixtures", "behavior"));
}
private static DirectoryInfo FindRepositoryRoot()
{
DirectoryInfo? current = new(AppContext.BaseDirectory);
while (current is not null)
{
if (File.Exists(Path.Combine(current.FullName, "AGENTS.md"))
&& Directory.Exists(Path.Combine(current.FullName, "src"))
&& Directory.Exists(Path.Combine(current.FullName, "clients")))
{
return current;
}
current = current.Parent;
}
throw new DirectoryNotFoundException("Could not locate the repository root from the test output directory.");
}
private static void AssertEventBodyMatchesFamily(MxEvent gatewayEvent)
{
switch (gatewayEvent.Family)
{
case MxEventFamily.OnDataChange:
Assert.Equal(MxEvent.BodyOneofCase.OnDataChange, gatewayEvent.BodyCase);
break;
case MxEventFamily.OnWriteComplete:
Assert.Equal(MxEvent.BodyOneofCase.OnWriteComplete, gatewayEvent.BodyCase);
break;
case MxEventFamily.OperationComplete:
Assert.Equal(MxEvent.BodyOneofCase.OperationComplete, gatewayEvent.BodyCase);
break;
case MxEventFamily.OnBufferedDataChange:
Assert.Equal(MxEvent.BodyOneofCase.OnBufferedDataChange, gatewayEvent.BodyCase);
break;
default:
throw new InvalidOperationException($"Unexpected event family '{gatewayEvent.Family}'.");
}
}
private static void AssertJsonKindMatchesValueKind(
string expectedKind,
MxValue value)
{
MxValue.KindOneofCase expected = expectedKind switch
{
"boolValue" => MxValue.KindOneofCase.BoolValue,
"int32Value" => MxValue.KindOneofCase.Int32Value,
"int64Value" => MxValue.KindOneofCase.Int64Value,
"floatValue" => MxValue.KindOneofCase.FloatValue,
"doubleValue" => MxValue.KindOneofCase.DoubleValue,
"stringValue" => MxValue.KindOneofCase.StringValue,
"timestampValue" => MxValue.KindOneofCase.TimestampValue,
"arrayValue" => MxValue.KindOneofCase.ArrayValue,
"rawValue" => MxValue.KindOneofCase.RawValue,
_ => throw new InvalidOperationException($"Unexpected expected value kind '{expectedKind}'."),
};
Assert.Equal(expected, value.KindCase);
}
}
@@ -105,6 +105,25 @@ public sealed class FakeWorkerHarnessTests
Assert.Equal(WorkerClientState.Faulted, client.State);
}
[Fact]
public async Task SendHeartbeatAsync_UpdatesClientHeartbeatState()
{
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
await using WorkerClient client = fakeWorker.CreateClient();
await StartClientAsync(fakeWorker, client);
DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
await Task.Delay(TimeSpan.FromMilliseconds(20));
await fakeWorker.SendHeartbeatAsync(
configureHeartbeat: heartbeat => heartbeat.WorkerProcessId = 2468);
await WaitUntilAsync(
() => client.ProcessId == 2468 && client.LastHeartbeatAt > previousHeartbeat,
TestTimeout);
Assert.Equal(WorkerClientState.Ready, client.State);
}
[Fact]
public async Task InvokeAsync_WithHungWorker_TimesOutPendingCommand()
{
@@ -284,6 +284,26 @@ public sealed class FakeWorkerHarness : IAsyncDisposable
cancellationToken).ConfigureAwait(false);
}
public async Task SendHeartbeatAsync(
WorkerState state = WorkerState.Ready,
CancellationToken cancellationToken = default,
Action<WorkerHeartbeat>? configureHeartbeat = null)
{
WorkerHeartbeat heartbeat = new()
{
WorkerProcessId = DefaultWorkerProcessId,
State = state,
LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
};
configureHeartbeat?.Invoke(heartbeat);
await _writer.WriteAsync(
CreateEnvelope(
correlationId: string.Empty,
envelope => envelope.WorkerHeartbeat = heartbeat),
cancellationToken).ConfigureAwait(false);
}
public async Task SendShutdownAckAsync(
ProtocolStatusCode statusCode = ProtocolStatusCode.Ok,
CancellationToken cancellationToken = default)
@@ -1,4 +1,5 @@
using System.IO.Pipes;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
@@ -151,6 +152,24 @@ public sealed class WorkerClientTests
Assert.Equal(WorkerClientState.Faulted, client.State);
}
[Fact]
public async Task ReadLoop_WhenHeartbeatArrives_UpdatesLastHeartbeatAndWorkerProcess()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(pipePair);
await CompleteHandshakeAsync(client, pipePair);
DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
await Task.Delay(TimeSpan.FromMilliseconds(20));
await pipePair.WorkerWriter.WriteAsync(CreateHeartbeatEnvelope(workerProcessId: 9876));
await WaitUntilAsync(
() => client.ProcessId == 9876 && client.LastHeartbeatAt > previousHeartbeat,
TestTimeout);
Assert.Equal(WorkerClientState.Ready, client.State);
}
[Fact]
public async Task HeartbeatMonitor_WhenHeartbeatExpires_FaultsClient()
{
@@ -276,6 +295,21 @@ public sealed class WorkerClientTests
});
}
private static WorkerEnvelope CreateHeartbeatEnvelope(int workerProcessId)
{
return CreateWorkerEnvelope(
correlationId: string.Empty,
sequence: 20,
envelope => envelope.WorkerHeartbeat = new WorkerHeartbeat
{
WorkerProcessId = workerProcessId,
State = WorkerState.Ready,
LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
PendingCommandCount = 0,
OutboundEventQueueDepth = 0,
});
}
private static WorkerEnvelope CreateWorkerEnvelope(
string correlationId,
ulong sequence,
@@ -30,7 +30,7 @@ public sealed class WorkerApplicationTests
Assert.Equal("mxaccess-gateway-123-session-1", entry.Fields["pipe_name"]);
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, entry.Fields["protocol_version"]);
Assert.Equal("[redacted]", entry.Fields["nonce"]);
Assert.Equal("WorkerPipeHandshakeSucceeded", logger.Entries[1].EventName);
Assert.Equal("WorkerPipeSessionCompleted", logger.Entries[1].EventName);
}
[Fact]
@@ -1,10 +1,15 @@
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Bootstrap;
using MxGateway.Worker.Ipc;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Tests.Ipc;
@@ -28,7 +33,9 @@ public sealed class WorkerPipeClientTests
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
WorkerPipeClient client = new(connectTimeoutMilliseconds: 5000);
WorkerPipeClient client = new(
connectTimeoutMilliseconds: 5000,
(stream, options) => CreateSession(stream, options));
Task clientTask = client.RunAsync(workerOptions);
await Task.Factory.FromAsync(server.BeginWaitForConnection, server.EndWaitForConnection, null);
@@ -56,6 +63,87 @@ public sealed class WorkerPipeClientTests
WorkerEnvelope ready = await reader.ReadAsync();
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, ready.BodyCase);
await writer.WriteAsync(new WorkerEnvelope
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = "session-1",
Sequence = 2,
WorkerShutdown = new WorkerShutdown
{
GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
Reason = "test-complete",
},
});
WorkerEnvelope shutdownAck = await reader.ReadAsync();
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, shutdownAck.BodyCase);
await clientTask;
}
private static WorkerPipeSession CreateSession(
Stream stream,
WorkerFrameProtocolOptions options)
{
return new WorkerPipeSession(
new WorkerFrameReader(stream, options),
new WorkerFrameWriter(stream, options),
options,
() => 1234,
new WorkerPipeSessionOptions
{
HeartbeatInterval = TimeSpan.FromSeconds(30),
HeartbeatGrace = TimeSpan.FromSeconds(30),
},
() => new FakeRuntimeSession());
}
private sealed class FakeRuntimeSession : IWorkerRuntimeSession
{
public Task<WorkerReady> StartAsync(
string sessionId,
int workerProcessId,
CancellationToken cancellationToken = default)
{
return Task.FromResult(new WorkerReady
{
WorkerProcessId = workerProcessId,
MxaccessProgid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.ProgId,
MxaccessClsid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
});
}
public Task<MxCommandReply> DispatchAsync(StaCommand command)
{
return Task.FromResult(new MxCommandReply
{
SessionId = command.SessionId,
CorrelationId = command.CorrelationId,
Kind = command.Kind,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = "OK",
},
});
}
public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
{
return new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: string.Empty);
}
public void RequestShutdown()
{
}
public void Dispose()
{
}
}
}
@@ -1,11 +1,16 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Ipc;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Tests.Ipc;
@@ -147,6 +152,127 @@ public sealed class WorkerPipeSessionTests
Assert.Equal(ProtocolStatusCode.WorkerUnavailable, written[1].WorkerFault.ProtocolStatus.Code);
}
[Fact]
public async Task RunAsync_SendsHeartbeatPayloadFromRuntimeSnapshot()
{
using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5));
using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token);
FakeRuntimeSession runtime = new();
runtime.SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 2,
outboundEventQueueDepth: 3,
lastEventSequence: 42,
currentCommandCorrelationId: "current-command"));
WorkerPipeSession session = CreatePipeSession(
pipePair.WorkerStream,
runtime,
new WorkerPipeSessionOptions
{
HeartbeatInterval = TimeSpan.FromMilliseconds(20),
HeartbeatGrace = TimeSpan.FromSeconds(5),
});
Task runTask = session.RunAsync(cancellation.Token);
await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token);
await ThrowIfCompletedAsync(runTask);
WorkerEnvelope heartbeat = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerHeartbeat,
cancellation.Token);
Assert.Equal(WorkerState.ExecutingCommand, heartbeat.WorkerHeartbeat.State);
Assert.Equal(1234, heartbeat.WorkerHeartbeat.WorkerProcessId);
Assert.Equal(2u, heartbeat.WorkerHeartbeat.PendingCommandCount);
Assert.Equal(3u, heartbeat.WorkerHeartbeat.OutboundEventQueueDepth);
Assert.Equal(42UL, heartbeat.WorkerHeartbeat.LastEventSequence);
Assert.Equal("current-command", heartbeat.WorkerHeartbeat.CurrentCommandCorrelationId);
await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token);
}
[Fact]
public async Task RunAsync_WhenCommandIsExecuting_HeartbeatReportsCurrentCorrelation()
{
using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5));
using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token);
FakeRuntimeSession runtime = new()
{
BlockDispatch = true,
};
WorkerPipeSession session = CreatePipeSession(
pipePair.WorkerStream,
runtime,
new WorkerPipeSessionOptions
{
HeartbeatInterval = TimeSpan.FromMilliseconds(20),
HeartbeatGrace = TimeSpan.FromSeconds(5),
});
Task runTask = session.RunAsync(cancellation.Token);
await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token);
await pipePair.GatewayWriter.WriteAsync(
CreateCommandEnvelope("command-1"),
cancellation.Token);
Assert.True(runtime.DispatchStarted.Wait(TimeSpan.FromSeconds(2)));
WorkerEnvelope heartbeat = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerHeartbeat,
envelope => envelope.WorkerHeartbeat.CurrentCommandCorrelationId == "command-1",
cancellation.Token);
Assert.Equal("command-1", heartbeat.WorkerHeartbeat.CurrentCommandCorrelationId);
Assert.Equal(WorkerState.ExecutingCommand, heartbeat.WorkerHeartbeat.State);
runtime.ReleaseDispatch();
WorkerEnvelope reply = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerCommandReply,
cancellation.Token);
Assert.Equal("command-1", reply.CorrelationId);
Assert.Equal(ProtocolStatusCode.Ok, reply.WorkerCommandReply.Reply.ProtocolStatus.Code);
await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token);
}
[Fact]
public async Task RunAsync_WhenStaActivityIsStale_WritesWatchdogFault()
{
using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5));
using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token);
FakeRuntimeSession runtime = new();
runtime.SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow - TimeSpan.FromSeconds(5),
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: "stuck-command"));
WorkerPipeSession session = CreatePipeSession(
pipePair.WorkerStream,
runtime,
new WorkerPipeSessionOptions
{
HeartbeatInterval = TimeSpan.FromMilliseconds(20),
HeartbeatGrace = TimeSpan.FromMilliseconds(50),
});
Task runTask = session.RunAsync(cancellation.Token);
await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token);
WorkerEnvelope fault = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerFault,
cancellation.Token);
Assert.Equal(WorkerFaultCategory.StaHung, fault.WorkerFault.Category);
Assert.Equal("stuck-command", fault.WorkerFault.CommandMethod);
Assert.Contains("STA activity is stale", fault.WorkerFault.DiagnosticMessage);
await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token);
}
private static WorkerPipeSession CreateSession(
Stream inbound,
Stream outbound,
@@ -159,6 +285,21 @@ public sealed class WorkerPipeSessionTests
() => 1234);
}
private static WorkerPipeSession CreatePipeSession(
Stream stream,
FakeRuntimeSession runtime,
WorkerPipeSessionOptions sessionOptions)
{
WorkerFrameProtocolOptions options = CreateOptions();
return new WorkerPipeSession(
new WorkerFrameReader(stream, options),
new WorkerFrameWriter(stream, options),
options,
() => 1234,
sessionOptions,
() => runtime);
}
private static WorkerFrameProtocolOptions CreateOptions()
{
return new WorkerFrameProtocolOptions(
@@ -185,6 +326,119 @@ public sealed class WorkerPipeSessionTests
};
}
private static WorkerEnvelope CreateCommandEnvelope(string correlationId)
{
return new WorkerEnvelope
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = SessionId,
Sequence = 2,
CorrelationId = correlationId,
WorkerCommand = new WorkerCommand
{
Command = new MxCommand
{
Kind = MxCommandKind.Ping,
Ping = new PingCommand
{
Message = "ping",
},
},
EnqueueTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
},
};
}
private static WorkerEnvelope CreateShutdownEnvelope()
{
return new WorkerEnvelope
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = SessionId,
Sequence = 3,
WorkerShutdown = new WorkerShutdown
{
GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
Reason = "test-complete",
},
};
}
private static async Task CompleteGatewayHandshakeAsync(
PipePair pipePair,
CancellationToken cancellationToken)
{
await pipePair.GatewayWriter
.WriteAsync(CreateGatewayHelloEnvelope(), cancellationToken)
.ConfigureAwait(false);
WorkerEnvelope hello = await pipePair.GatewayReader.ReadAsync(cancellationToken).ConfigureAwait(false);
WorkerEnvelope ready = await pipePair.GatewayReader.ReadAsync(cancellationToken).ConfigureAwait(false);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, hello.BodyCase);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, ready.BodyCase);
}
private static async Task SendShutdownAndWaitAsync(
PipePair pipePair,
Task runTask,
CancellationToken cancellationToken)
{
await pipePair.GatewayWriter
.WriteAsync(CreateShutdownEnvelope(), cancellationToken)
.ConfigureAwait(false);
WorkerEnvelope shutdownAck = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerShutdownAck,
cancellationToken);
Assert.Equal(ProtocolStatusCode.Ok, shutdownAck.WorkerShutdownAck.Status.Code);
Task completedTask = await Task
.WhenAny(runTask, Task.Delay(TimeSpan.FromSeconds(2), cancellationToken))
.ConfigureAwait(false);
Assert.Same(runTask, completedTask);
await runTask.ConfigureAwait(false);
}
private static async Task ThrowIfCompletedAsync(Task task)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
if (task.IsCompleted)
{
await task;
}
}
private static Task<WorkerEnvelope> ReadUntilAsync(
WorkerFrameReader reader,
WorkerEnvelope.BodyOneofCase expectedBody,
CancellationToken cancellationToken)
{
return ReadUntilAsync(
reader,
expectedBody,
_ => true,
cancellationToken);
}
private static async Task<WorkerEnvelope> ReadUntilAsync(
WorkerFrameReader reader,
WorkerEnvelope.BodyOneofCase expectedBody,
Func<WorkerEnvelope, bool> predicate,
CancellationToken cancellationToken)
{
while (true)
{
WorkerEnvelope envelope = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
if (envelope.BodyCase == expectedBody && predicate(envelope))
{
return envelope;
}
}
}
private static WorkerEnvelope[] ReadWrittenFrames(
MemoryStream stream,
WorkerFrameProtocolOptions options)
@@ -219,4 +473,158 @@ public sealed class WorkerPipeSessionTests
buffer[2] = (byte)(value >> 16);
buffer[3] = (byte)(value >> 24);
}
private sealed class FakeRuntimeSession : IWorkerRuntimeSession
{
private readonly ManualResetEventSlim releaseDispatch = new(false);
private readonly object gate = new();
private WorkerRuntimeHeartbeatSnapshot snapshot = new(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: string.Empty);
public ManualResetEventSlim DispatchStarted { get; } = new(false);
public bool BlockDispatch { get; set; }
public Task<WorkerReady> StartAsync(
string sessionId,
int workerProcessId,
CancellationToken cancellationToken = default)
{
return Task.FromResult(new WorkerReady
{
WorkerProcessId = workerProcessId,
MxaccessProgid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.ProgId,
MxaccessClsid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
});
}
public Task<MxCommandReply> DispatchAsync(StaCommand command)
{
return Task.Run(
() =>
{
SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
command.CorrelationId));
DispatchStarted.Set();
if (BlockDispatch)
{
releaseDispatch.Wait(TimeSpan.FromSeconds(5));
}
SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: string.Empty));
return new MxCommandReply
{
SessionId = command.SessionId,
CorrelationId = command.CorrelationId,
Kind = command.Kind,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = "OK",
},
};
});
}
public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
{
lock (gate)
{
return snapshot;
}
}
public void RequestShutdown()
{
releaseDispatch.Set();
}
public void ReleaseDispatch()
{
releaseDispatch.Set();
}
public void SetSnapshot(WorkerRuntimeHeartbeatSnapshot value)
{
lock (gate)
{
snapshot = value;
}
}
public void Dispose()
{
releaseDispatch.Set();
releaseDispatch.Dispose();
DispatchStarted.Dispose();
}
}
private sealed class PipePair : IDisposable
{
private readonly NamedPipeServerStream gatewayStream;
private PipePair(
NamedPipeServerStream gatewayStream,
NamedPipeClientStream workerStream)
{
this.gatewayStream = gatewayStream;
WorkerStream = workerStream;
WorkerFrameProtocolOptions options = CreateOptions();
GatewayReader = new WorkerFrameReader(gatewayStream, options);
GatewayWriter = new WorkerFrameWriter(gatewayStream, options);
}
public Stream WorkerStream { get; }
public WorkerFrameReader GatewayReader { get; }
public WorkerFrameWriter GatewayWriter { get; }
public static async Task<PipePair> CreateAsync(CancellationToken cancellationToken)
{
string pipeName = $"mxaccessgw-worker-session-tests-{Guid.NewGuid():N}";
NamedPipeServerStream gatewayStream = new(
pipeName,
PipeDirection.InOut,
maxNumberOfServerInstances: 1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
NamedPipeClientStream workerStream = new(
".",
pipeName,
PipeDirection.InOut,
PipeOptions.Asynchronous);
Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync();
await Task
.Run(() => workerStream.Connect(5000), cancellationToken)
.ConfigureAwait(false);
await waitForConnectionTask.ConfigureAwait(false);
return new PipePair(gatewayStream, workerStream);
}
public void Dispose()
{
WorkerStream.Dispose();
gatewayStream.Dispose();
}
}
}
+14 -2
View File
@@ -1,4 +1,5 @@
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
@@ -11,6 +12,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
public const int DefaultConnectTimeoutMilliseconds = 30000;
private readonly int _connectTimeoutMilliseconds;
private readonly Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> _sessionFactory;
public WorkerPipeClient()
: this(DefaultConnectTimeoutMilliseconds)
@@ -18,6 +20,15 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
}
public WorkerPipeClient(int connectTimeoutMilliseconds)
: this(
connectTimeoutMilliseconds,
(stream, frameOptions) => new WorkerPipeSession(stream, frameOptions))
{
}
public WorkerPipeClient(
int connectTimeoutMilliseconds,
Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> sessionFactory)
{
if (connectTimeoutMilliseconds <= 0)
{
@@ -26,6 +37,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
"Worker pipe connect timeout must be greater than zero.");
}
_sessionFactory = sessionFactory ?? throw new ArgumentNullException(nameof(sessionFactory));
_connectTimeoutMilliseconds = connectTimeoutMilliseconds;
}
@@ -48,8 +60,8 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false);
WorkerPipeSession session = new(pipe, frameOptions);
await session.CompleteStartupHandshakeAsync(cancellationToken).ConfigureAwait(false);
WorkerPipeSession session = _sessionFactory(pipe, frameOptions);
await session.RunAsync(cancellationToken).ConfigureAwait(false);
}
private Task ConnectAsync(
+329 -5
View File
@@ -6,6 +6,7 @@ using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Ipc;
@@ -13,10 +14,14 @@ public sealed class WorkerPipeSession
{
private readonly WorkerFrameProtocolOptions _options;
private readonly Func<int> _processIdProvider;
private readonly Func<IWorkerRuntimeSession> _runtimeSessionFactory;
private readonly WorkerPipeSessionOptions _sessionOptions;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private MxAccessStaSession? _mxAccessStaSession;
private IWorkerRuntimeSession? _runtimeSession;
private long _nextSequence;
private WorkerState _state = WorkerState.Starting;
private bool _watchdogFaultSent;
public WorkerPipeSession(
Stream stream,
@@ -34,11 +39,49 @@ public sealed class WorkerPipeSession
WorkerFrameWriter writer,
WorkerFrameProtocolOptions options,
Func<int> processIdProvider)
: this(
reader,
writer,
options,
processIdProvider,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession())
{
}
public WorkerPipeSession(
WorkerFrameReader reader,
WorkerFrameWriter writer,
WorkerFrameProtocolOptions options,
Func<int> processIdProvider,
WorkerPipeSessionOptions sessionOptions,
Func<IWorkerRuntimeSession> runtimeSessionFactory)
{
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
_options = options ?? throw new ArgumentNullException(nameof(options));
_processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider));
_sessionOptions = sessionOptions ?? throw new ArgumentNullException(nameof(sessionOptions));
_runtimeSessionFactory = runtimeSessionFactory ?? throw new ArgumentNullException(nameof(runtimeSessionFactory));
_sessionOptions.Validate();
}
public async Task RunAsync(CancellationToken cancellationToken = default)
{
_runtimeSession = _runtimeSessionFactory();
try
{
await CompleteStartupHandshakeAsync(
token => _runtimeSession.StartAsync(_options.SessionId, _processIdProvider(), token),
cancellationToken).ConfigureAwait(false);
await RunMessageLoopAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
_runtimeSession?.Dispose();
_runtimeSession = null;
_state = WorkerState.Stopped;
}
}
public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default)
@@ -76,11 +119,14 @@ public sealed class WorkerPipeSession
try
{
WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
_state = WorkerState.Handshaking;
ValidateGatewayHello(envelope);
await WriteWorkerHelloAsync(cancellationToken).ConfigureAwait(false);
_state = WorkerState.InitializingSta;
WorkerReady ready = await initializeMxAccessAsync(cancellationToken).ConfigureAwait(false);
await WriteWorkerReadyAsync(ready, cancellationToken).ConfigureAwait(false);
_state = WorkerState.Ready;
}
catch (WorkerFrameProtocolException exception)
{
@@ -140,6 +186,174 @@ public sealed class WorkerPipeSession
return _writer.WriteAsync(CreateEnvelope(ready), cancellationToken);
}
private async Task RunMessageLoopAsync(CancellationToken cancellationToken)
{
using CancellationTokenSource heartbeatCancellation = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
Task heartbeatTask = RunHeartbeatLoopAsync(heartbeatCancellation.Token);
try
{
while (!cancellationToken.IsCancellationRequested)
{
Task<WorkerEnvelope> readTask = _reader.ReadAsync(cancellationToken);
Task completedTask = await Task.WhenAny(readTask, heartbeatTask).ConfigureAwait(false);
if (completedTask == heartbeatTask)
{
await heartbeatTask.ConfigureAwait(false);
}
WorkerEnvelope envelope = await readTask.ConfigureAwait(false);
bool keepReading = await DispatchGatewayEnvelopeAsync(envelope, cancellationToken).ConfigureAwait(false);
if (!keepReading)
{
return;
}
}
}
finally
{
heartbeatCancellation.Cancel();
try
{
await heartbeatTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
}
}
private async Task<bool> DispatchGatewayEnvelopeAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
switch (envelope.BodyCase)
{
case WorkerEnvelope.BodyOneofCase.WorkerCommand:
_ = ProcessCommandAsync(envelope, cancellationToken);
return true;
case WorkerEnvelope.BodyOneofCase.WorkerShutdown:
await ShutdownAsync(envelope.WorkerShutdown, cancellationToken).ConfigureAwait(false);
return false;
case WorkerEnvelope.BodyOneofCase.WorkerCancel:
return true;
default:
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.UnexpectedEnvelopeBody,
$"Worker received unexpected gateway envelope body {envelope.BodyCase}.");
}
}
private async Task ProcessCommandAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
IWorkerRuntimeSession runtimeSession = _runtimeSession
?? throw new InvalidOperationException("Worker runtime session has not been initialized.");
WorkerCommand workerCommand = envelope.WorkerCommand;
MxCommand command = workerCommand.Command;
StaCommand staCommand = new(
_options.SessionId,
envelope.CorrelationId,
command,
workerCommand.EnqueueTimestamp,
cancellationToken);
try
{
MxCommandReply reply = await runtimeSession.DispatchAsync(staCommand).ConfigureAwait(false);
await _writer
.WriteAsync(
CreateEnvelope(new WorkerCommandReply
{
Reply = reply,
CompletedTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
}),
cancellationToken)
.ConfigureAwait(false);
}
catch (Exception exception) when (exception is not OperationCanceledException)
{
_state = WorkerState.Faulted;
await TryWriteFaultAsync(
CreateFault(
WorkerFaultCategory.MxaccessCommandFailed,
staCommand.MethodName,
exception),
cancellationToken).ConfigureAwait(false);
}
}
private async Task ShutdownAsync(
WorkerShutdown shutdown,
CancellationToken cancellationToken)
{
_state = WorkerState.ShuttingDown;
_runtimeSession?.RequestShutdown();
await _writer
.WriteAsync(
CreateEnvelope(
new WorkerShutdownAck
{
Status = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = string.IsNullOrWhiteSpace(shutdown.Reason)
? "Worker shutdown accepted."
: $"Worker shutdown accepted: {shutdown.Reason}",
},
}),
cancellationToken)
.ConfigureAwait(false);
}
private async Task RunHeartbeatLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(_sessionOptions.HeartbeatInterval, cancellationToken).ConfigureAwait(false);
IWorkerRuntimeSession? runtimeSession = _runtimeSession;
if (runtimeSession is null)
{
continue;
}
WorkerRuntimeHeartbeatSnapshot snapshot = runtimeSession.CaptureHeartbeat();
await _writer
.WriteAsync(CreateEnvelope(CreateHeartbeat(snapshot)), cancellationToken)
.ConfigureAwait(false);
await ReportWatchdogFaultIfNeededAsync(snapshot, cancellationToken).ConfigureAwait(false);
}
}
private async Task ReportWatchdogFaultIfNeededAsync(
WorkerRuntimeHeartbeatSnapshot snapshot,
CancellationToken cancellationToken)
{
TimeSpan staleFor = DateTimeOffset.UtcNow - snapshot.LastStaActivityUtc;
if (staleFor <= _sessionOptions.HeartbeatGrace)
{
_watchdogFaultSent = false;
return;
}
if (_watchdogFaultSent)
{
return;
}
_watchdogFaultSent = true;
await TryWriteFaultAsync(
CreateFault(
WorkerFaultCategory.StaHung,
snapshot.CurrentCommandCorrelationId,
$"STA activity is stale by {staleFor}."),
cancellationToken).ConfigureAwait(false);
}
private async Task TryWriteFaultAsync(
WorkerFrameProtocolException exception,
CancellationToken cancellationToken)
@@ -178,6 +392,25 @@ public sealed class WorkerPipeSession
}
}
private async Task TryWriteFaultAsync(
WorkerFault fault,
CancellationToken cancellationToken)
{
try
{
await _writer
.WriteAsync(CreateEnvelope(fault), cancellationToken)
.ConfigureAwait(false);
}
catch (Exception faultWriteException) when (
faultWriteException is IOException
|| faultWriteException is ObjectDisposedException
|| faultWriteException is WorkerFrameProtocolException)
{
// The runtime fault remains observable through worker exit or pipe closure.
}
}
private WorkerEnvelope CreateEnvelope(WorkerHello hello)
{
return CreateBaseEnvelope(hello);
@@ -193,6 +426,21 @@ public sealed class WorkerPipeSession
return CreateBaseEnvelope(fault);
}
private WorkerEnvelope CreateEnvelope(WorkerCommandReply reply)
{
return CreateBaseEnvelope(reply);
}
private WorkerEnvelope CreateEnvelope(WorkerShutdownAck shutdownAck)
{
return CreateBaseEnvelope(shutdownAck);
}
private WorkerEnvelope CreateEnvelope(WorkerHeartbeat heartbeat)
{
return CreateBaseEnvelope(heartbeat);
}
private WorkerEnvelope CreateBaseEnvelope(WorkerHello body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
@@ -214,6 +462,28 @@ public sealed class WorkerPipeSession
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerCommandReply body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.CorrelationId = body.Reply?.CorrelationId ?? string.Empty;
envelope.WorkerCommandReply = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerShutdownAck body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerShutdownAck = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerHeartbeat body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerHeartbeat = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope()
{
return new WorkerEnvelope
@@ -231,21 +501,39 @@ public sealed class WorkerPipeSession
private async Task<WorkerReady> InitializeMxAccessAsync(CancellationToken cancellationToken)
{
_mxAccessStaSession = new MxAccessStaSession();
_runtimeSession = new MxAccessStaSession();
try
{
return await _mxAccessStaSession
return await _runtimeSession
.StartAsync(_options.SessionId, _processIdProvider(), cancellationToken)
.ConfigureAwait(false);
}
catch
{
_mxAccessStaSession.Dispose();
_mxAccessStaSession = null;
_runtimeSession.Dispose();
_runtimeSession = null;
throw;
}
}
private WorkerHeartbeat CreateHeartbeat(WorkerRuntimeHeartbeatSnapshot snapshot)
{
WorkerState state = string.IsNullOrWhiteSpace(snapshot.CurrentCommandCorrelationId)
? _state
: WorkerState.ExecutingCommand;
return new WorkerHeartbeat
{
WorkerProcessId = _processIdProvider(),
State = state,
LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(snapshot.LastStaActivityUtc),
PendingCommandCount = snapshot.PendingCommandCount,
OutboundEventQueueDepth = snapshot.OutboundEventQueueDepth,
LastEventSequence = snapshot.LastEventSequence,
CurrentCommandCorrelationId = snapshot.CurrentCommandCorrelationId,
};
}
private WorkerReady CreateWorkerReady()
{
return new WorkerReady
@@ -295,6 +583,42 @@ public sealed class WorkerPipeSession
return fault;
}
private static WorkerFault CreateFault(
WorkerFaultCategory category,
string commandMethod,
Exception exception)
{
WorkerFault fault = CreateFault(
category,
commandMethod,
exception.Message);
fault.ExceptionType = exception.GetType().FullName ?? string.Empty;
fault.ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = exception.Message,
};
return fault;
}
private static WorkerFault CreateFault(
WorkerFaultCategory category,
string commandMethod,
string diagnosticMessage)
{
return new WorkerFault
{
Category = category,
CommandMethod = commandMethod ?? string.Empty,
DiagnosticMessage = diagnosticMessage,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = diagnosticMessage,
},
};
}
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
{
return errorCode switch
@@ -0,0 +1,36 @@
using System;
namespace MxGateway.Worker.Ipc;
public sealed class WorkerPipeSessionOptions
{
public static readonly TimeSpan DefaultHeartbeatInterval = TimeSpan.FromSeconds(5);
public static readonly TimeSpan DefaultHeartbeatGrace = TimeSpan.FromSeconds(15);
public WorkerPipeSessionOptions()
{
HeartbeatInterval = DefaultHeartbeatInterval;
HeartbeatGrace = DefaultHeartbeatGrace;
}
public TimeSpan HeartbeatInterval { get; set; }
public TimeSpan HeartbeatGrace { get; set; }
public void Validate()
{
if (HeartbeatInterval <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(HeartbeatInterval),
"Worker heartbeat interval must be greater than zero.");
}
if (HeartbeatGrace <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(HeartbeatGrace),
"Worker heartbeat grace must be greater than zero.");
}
}
}
@@ -0,0 +1,21 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.MxAccess;
public interface IWorkerRuntimeSession : IDisposable
{
Task<WorkerReady> StartAsync(
string sessionId,
int workerProcessId,
CancellationToken cancellationToken = default);
Task<MxCommandReply> DispatchAsync(StaCommand command);
WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat();
void RequestShutdown();
}
@@ -7,7 +7,7 @@ using MxGateway.Worker.Sta;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessStaSession : IDisposable
public sealed class MxAccessStaSession : IWorkerRuntimeSession
{
private readonly IMxAccessComObjectFactory factory;
private readonly IMxAccessEventSink eventSink;
@@ -97,6 +97,30 @@ public sealed class MxAccessStaSession : IDisposable
return commandDispatcher.DispatchAsync(command);
}
public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
{
uint pendingCommandCount = 0;
string currentCommandCorrelationId = string.Empty;
if (commandDispatcher is not null)
{
pendingCommandCount = (uint)commandDispatcher.PendingCommandCount;
currentCommandCorrelationId = commandDispatcher.CurrentCommandCorrelationId;
}
return new WorkerRuntimeHeartbeatSnapshot(
staRuntime.LastActivityUtc,
pendingCommandCount,
(uint)eventQueue.Count,
eventQueue.LastEventSequence,
currentCommandCorrelationId);
}
public void RequestShutdown()
{
commandDispatcher?.RequestShutdown();
}
public IReadOnlyList<WorkerEvent> DrainEvents(uint maxEvents)
{
return eventQueue.Drain(maxEvents);
@@ -148,7 +172,7 @@ public sealed class MxAccessStaSession : IDisposable
return;
}
commandDispatcher?.RequestShutdown();
RequestShutdown();
if (session is not null)
{
@@ -0,0 +1,30 @@
using System;
namespace MxGateway.Worker.MxAccess;
public sealed class WorkerRuntimeHeartbeatSnapshot
{
public WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset lastStaActivityUtc,
uint pendingCommandCount,
uint outboundEventQueueDepth,
ulong lastEventSequence,
string currentCommandCorrelationId)
{
LastStaActivityUtc = lastStaActivityUtc;
PendingCommandCount = pendingCommandCount;
OutboundEventQueueDepth = outboundEventQueueDepth;
LastEventSequence = lastEventSequence;
CurrentCommandCorrelationId = currentCommandCorrelationId ?? string.Empty;
}
public DateTimeOffset LastStaActivityUtc { get; }
public uint PendingCommandCount { get; }
public uint OutboundEventQueueDepth { get; }
public ulong LastEventSequence { get; }
public string CurrentCommandCorrelationId { get; }
}
+1 -1
View File
@@ -84,7 +84,7 @@ public static class WorkerApplication
pipeClient.RunAsync(options).GetAwaiter().GetResult();
logger.Information("WorkerPipeHandshakeSucceeded", new Dictionary<string, object?>
logger.Information("WorkerPipeSessionCompleted", new Dictionary<string, object?>
{
["session_id"] = options.SessionId,
["pipe_name"] = options.PipeName,