Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bd4be85f26 | |||
| 7331c6157a | |||
| cbc317e3e7 | |||
| 7242cf772b | |||
| 7d67313a7d | |||
| 044b16c5db | |||
| 1f92078777 | |||
| 4a3560c7ee | |||
| 108a3d3f8a | |||
| 95e71cd819 |
@@ -0,0 +1,14 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\MxGateway.Client\MxGateway.Client.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
</Project>
|
||||
@@ -0,0 +1,48 @@
|
||||
using MxGateway.Client;
|
||||
|
||||
namespace MxGateway.Client.Cli;
|
||||
|
||||
public static class MxGatewayClientCli
|
||||
{
|
||||
public static int Run(
|
||||
string[] args,
|
||||
TextWriter standardOutput,
|
||||
TextWriter standardError)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(args);
|
||||
ArgumentNullException.ThrowIfNull(standardOutput);
|
||||
ArgumentNullException.ThrowIfNull(standardError);
|
||||
|
||||
if (args.Length is 0 || IsHelp(args[0]))
|
||||
{
|
||||
WriteUsage(standardOutput);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (string.Equals(args[0], "version", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
standardOutput.WriteLine(
|
||||
$"gateway-protocol={MxGatewayClientContractInfo.GatewayProtocolVersion}");
|
||||
standardOutput.WriteLine(
|
||||
$"worker-protocol={MxGatewayClientContractInfo.WorkerProtocolVersion}");
|
||||
return 0;
|
||||
}
|
||||
|
||||
standardError.WriteLine($"Unknown command '{args[0]}'.");
|
||||
WriteUsage(standardError);
|
||||
return 2;
|
||||
}
|
||||
|
||||
private static bool IsHelp(string value)
|
||||
{
|
||||
return string.Equals(value, "-h", StringComparison.OrdinalIgnoreCase)
|
||||
|| string.Equals(value, "--help", StringComparison.OrdinalIgnoreCase)
|
||||
|| string.Equals(value, "help", StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
private static void WriteUsage(TextWriter writer)
|
||||
{
|
||||
writer.WriteLine("mxgw-dotnet version");
|
||||
writer.WriteLine("mxgw-dotnet --help");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
using MxGateway.Client.Cli;
|
||||
|
||||
return MxGatewayClientCli.Run(args, Console.Out, Console.Error);
|
||||
@@ -0,0 +1,26 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<IsPackable>false</IsPackable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="coverlet.collector" Version="6.0.4" />
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
|
||||
<PackageReference Include="xunit" Version="2.9.3" />
|
||||
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.4" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Using Include="Xunit" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\MxGateway.Client\MxGateway.Client.csproj" />
|
||||
<ProjectReference Include="..\MxGateway.Client.Cli\MxGateway.Client.Cli.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -0,0 +1,20 @@
|
||||
using MxGateway.Client.Cli;
|
||||
|
||||
namespace MxGateway.Client.Tests;
|
||||
|
||||
public sealed class MxGatewayClientCliTests
|
||||
{
|
||||
[Fact]
|
||||
public void Run_Version_PrintsCompiledProtocolVersions()
|
||||
{
|
||||
using var output = new StringWriter();
|
||||
using var error = new StringWriter();
|
||||
|
||||
var exitCode = MxGatewayClientCli.Run(["version"], output, error);
|
||||
|
||||
Assert.Equal(0, exitCode);
|
||||
Assert.Contains("gateway-protocol=1", output.ToString());
|
||||
Assert.Contains("worker-protocol=1", output.ToString());
|
||||
Assert.Equal(string.Empty, error.ToString());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
using MxGateway.Contracts;
|
||||
|
||||
namespace MxGateway.Client.Tests;
|
||||
|
||||
public sealed class MxGatewayClientContractInfoTests
|
||||
{
|
||||
[Fact]
|
||||
public void GatewayProtocolVersion_MatchesSharedContract()
|
||||
{
|
||||
Assert.Equal(
|
||||
GatewayContractInfo.GatewayProtocolVersion,
|
||||
MxGatewayClientContractInfo.GatewayProtocolVersion);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void WorkerProtocolVersion_MatchesSharedContract()
|
||||
{
|
||||
Assert.Equal(
|
||||
GatewayContractInfo.WorkerProtocolVersion,
|
||||
MxGatewayClientContractInfo.WorkerProtocolVersion);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
namespace MxGateway.Client.Tests;
|
||||
|
||||
public sealed class MxGatewayClientOptionsTests
|
||||
{
|
||||
[Fact]
|
||||
public void Validate_WithAbsoluteEndpointAndApiKey_Succeeds()
|
||||
{
|
||||
var options = new MxGatewayClientOptions
|
||||
{
|
||||
Endpoint = new Uri("http://localhost:5000"),
|
||||
ApiKey = "test-api-key",
|
||||
};
|
||||
|
||||
options.Validate();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Validate_WithEmptyApiKey_Throws()
|
||||
{
|
||||
var options = new MxGatewayClientOptions
|
||||
{
|
||||
Endpoint = new Uri("http://localhost:5000"),
|
||||
ApiKey = "",
|
||||
};
|
||||
|
||||
Assert.Throws<ArgumentException>(options.Validate);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
namespace MxGateway.Client.Tests;
|
||||
|
||||
public sealed class MxGatewayGeneratedContractTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task GeneratedGrpcClient_CanBeConstructedFromClientFactory()
|
||||
{
|
||||
var options = new MxGatewayClientOptions
|
||||
{
|
||||
Endpoint = new Uri("http://localhost:5000"),
|
||||
ApiKey = "test-api-key",
|
||||
};
|
||||
|
||||
await using var client = MxGatewayClient.Create(options);
|
||||
|
||||
Assert.NotNull(client.RawClient);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
|
||||
Microsoft Visual Studio Solution File, Format Version 12.00
|
||||
# Visual Studio Version 17
|
||||
VisualStudioVersion = 17.0.31903.59
|
||||
MinimumVisualStudioVersion = 10.0.40219.1
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Client", "MxGateway.Client\MxGateway.Client.csproj", "{7CF9ED88-1F32-4040-BEB1-D0902E304C70}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Contracts", "..\..\src\MxGateway.Contracts\MxGateway.Contracts.csproj", "{9AB807A8-0469-40F7-A000-D240F36B6E5D}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Client.Cli", "MxGateway.Client.Cli\MxGateway.Client.Cli.csproj", "{EB061E77-2475-4322-9257-3F2456DD141C}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MxGateway.Client.Tests", "MxGateway.Client.Tests\MxGateway.Client.Tests.csproj", "{B77B5A8E-0C53-4419-9BCD-227C9753A074}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
Debug|x64 = Debug|x64
|
||||
Debug|x86 = Debug|x86
|
||||
Release|Any CPU = Release|Any CPU
|
||||
Release|x64 = Release|x64
|
||||
Release|x86 = Release|x86
|
||||
EndGlobalSection
|
||||
GlobalSection(ProjectConfigurationPlatforms) = postSolution
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x64.Build.0 = Debug|Any CPU
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Debug|x86.Build.0 = Debug|Any CPU
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x64.ActiveCfg = Release|Any CPU
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x64.Build.0 = Release|Any CPU
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x86.ActiveCfg = Release|Any CPU
|
||||
{7CF9ED88-1F32-4040-BEB1-D0902E304C70}.Release|x86.Build.0 = Release|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x64.Build.0 = Debug|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Debug|x86.Build.0 = Debug|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x64.ActiveCfg = Release|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x64.Build.0 = Release|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x86.ActiveCfg = Release|Any CPU
|
||||
{9AB807A8-0469-40F7-A000-D240F36B6E5D}.Release|x86.Build.0 = Release|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x64.Build.0 = Debug|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Debug|x86.Build.0 = Debug|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x64.ActiveCfg = Release|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x64.Build.0 = Release|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x86.ActiveCfg = Release|Any CPU
|
||||
{EB061E77-2475-4322-9257-3F2456DD141C}.Release|x86.Build.0 = Release|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x64.Build.0 = Debug|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Debug|x86.Build.0 = Debug|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x64.ActiveCfg = Release|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x64.Build.0 = Release|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x86.ActiveCfg = Release|Any CPU
|
||||
{B77B5A8E-0C53-4419-9BCD-227C9753A074}.Release|x86.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
EndGlobalSection
|
||||
EndGlobal
|
||||
@@ -0,0 +1,18 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\..\src\MxGateway.Contracts\MxGateway.Contracts.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Grpc.Net.Client" Version="2.76.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.7" />
|
||||
</ItemGroup>
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
</Project>
|
||||
@@ -0,0 +1,41 @@
|
||||
using Grpc.Net.Client;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client;
|
||||
|
||||
/// <summary>
|
||||
/// Provides the initial .NET client entry point and raw generated gRPC client.
|
||||
/// </summary>
|
||||
public sealed class MxGatewayClient : IAsyncDisposable
|
||||
{
|
||||
private readonly GrpcChannel _channel;
|
||||
|
||||
private MxGatewayClient(GrpcChannel channel)
|
||||
{
|
||||
_channel = channel;
|
||||
RawClient = new MxAccessGateway.MxAccessGatewayClient(channel);
|
||||
}
|
||||
|
||||
public MxAccessGateway.MxAccessGatewayClient RawClient { get; }
|
||||
|
||||
public static MxGatewayClient Create(MxGatewayClientOptions options)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
options.Validate();
|
||||
|
||||
var channel = GrpcChannel.ForAddress(
|
||||
options.Endpoint,
|
||||
new GrpcChannelOptions
|
||||
{
|
||||
LoggerFactory = options.LoggerFactory,
|
||||
});
|
||||
|
||||
return new MxGatewayClient(channel);
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
_channel.Dispose();
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
using MxGateway.Contracts;
|
||||
|
||||
namespace MxGateway.Client;
|
||||
|
||||
/// <summary>
|
||||
/// Exposes the protocol versions compiled into this client package.
|
||||
/// </summary>
|
||||
public static class MxGatewayClientContractInfo
|
||||
{
|
||||
public const uint GatewayProtocolVersion =
|
||||
GatewayContractInfo.GatewayProtocolVersion;
|
||||
|
||||
public const uint WorkerProtocolVersion =
|
||||
GatewayContractInfo.WorkerProtocolVersion;
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace MxGateway.Client;
|
||||
|
||||
/// <summary>
|
||||
/// Configures the gRPC channel used by the .NET MXAccess Gateway client.
|
||||
/// </summary>
|
||||
public sealed class MxGatewayClientOptions
|
||||
{
|
||||
public required Uri Endpoint { get; init; }
|
||||
|
||||
public required string ApiKey { get; init; }
|
||||
|
||||
public bool UseTls { get; init; }
|
||||
|
||||
public string? CaCertificatePath { get; init; }
|
||||
|
||||
public string? ServerNameOverride { get; init; }
|
||||
|
||||
public TimeSpan ConnectTimeout { get; init; } = TimeSpan.FromSeconds(10);
|
||||
|
||||
public TimeSpan DefaultCallTimeout { get; init; } = TimeSpan.FromSeconds(30);
|
||||
|
||||
public ILoggerFactory? LoggerFactory { get; init; }
|
||||
|
||||
public void Validate()
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(Endpoint);
|
||||
|
||||
if (!Endpoint.IsAbsoluteUri)
|
||||
{
|
||||
throw new ArgumentException(
|
||||
"The gateway endpoint must be an absolute URI.",
|
||||
nameof(Endpoint));
|
||||
}
|
||||
|
||||
if (string.IsNullOrWhiteSpace(ApiKey))
|
||||
{
|
||||
throw new ArgumentException(
|
||||
"The gateway API key must not be empty.",
|
||||
nameof(ApiKey));
|
||||
}
|
||||
|
||||
if (ConnectTimeout <= TimeSpan.Zero)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(
|
||||
nameof(ConnectTimeout),
|
||||
"The connect timeout must be greater than zero.");
|
||||
}
|
||||
|
||||
if (DefaultCallTimeout <= TimeSpan.Zero)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(
|
||||
nameof(DefaultCallTimeout),
|
||||
"The default call timeout must be greater than zero.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
# .NET Client Projects
|
||||
|
||||
The .NET client workspace contains the MXAccess Gateway client library, test
|
||||
CLI, and unit tests.
|
||||
|
||||
## Projects
|
||||
|
||||
| Project | Purpose |
|
||||
|---------|---------|
|
||||
| `MxGateway.Client` | .NET 10 library entry point and raw gRPC client access. |
|
||||
| `MxGateway.Client.Cli` | Test CLI for smoke and diagnostic commands. |
|
||||
| `MxGateway.Client.Tests` | Unit tests for the scaffold and generated contract wiring. |
|
||||
|
||||
The projects reference `src/MxGateway.Contracts/MxGateway.Contracts.csproj` so
|
||||
the client compiles against the same generated protobuf and gRPC types as the
|
||||
gateway. `clients/dotnet/generated` remains reserved for generator output if a
|
||||
future client build switches to client-local `Grpc.Tools` generation.
|
||||
|
||||
## Build And Test
|
||||
|
||||
```powershell
|
||||
dotnet build clients/dotnet/MxGateway.Client.sln
|
||||
dotnet test clients/dotnet/MxGateway.Client.sln --no-build
|
||||
```
|
||||
@@ -0,0 +1,36 @@
|
||||
{
|
||||
"schemaVersion": 1,
|
||||
"cases": [
|
||||
{
|
||||
"id": "missing-api-key",
|
||||
"grpcStatusCode": "UNAUTHENTICATED",
|
||||
"clientErrorCategory": "AuthenticationError",
|
||||
"inputMetadata": {
|
||||
"authorization": ""
|
||||
},
|
||||
"expectedRedactedOutput": "authentication failed: missing bearer token",
|
||||
"retryableWithoutCredentialChange": false
|
||||
},
|
||||
{
|
||||
"id": "invalid-api-key",
|
||||
"grpcStatusCode": "UNAUTHENTICATED",
|
||||
"clientErrorCategory": "AuthenticationError",
|
||||
"inputMetadata": {
|
||||
"authorization": "Bearer <redacted>"
|
||||
},
|
||||
"expectedRedactedOutput": "authentication failed: invalid API key <redacted>",
|
||||
"retryableWithoutCredentialChange": false
|
||||
},
|
||||
{
|
||||
"id": "missing-write-scope",
|
||||
"grpcStatusCode": "PERMISSION_DENIED",
|
||||
"clientErrorCategory": "AuthorizationError",
|
||||
"inputMetadata": {
|
||||
"authorization": "Bearer <redacted>"
|
||||
},
|
||||
"requiredScope": "mxaccess.write",
|
||||
"expectedRedactedOutput": "authorization failed: missing scope mxaccess.write",
|
||||
"retryableWithoutCredentialChange": false
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"sessionId": "session-fixture",
|
||||
"correlationId": "gateway-correlation-register-1",
|
||||
"kind": "MX_COMMAND_KIND_REGISTER",
|
||||
"protocolStatus": {
|
||||
"code": "PROTOCOL_STATUS_CODE_OK",
|
||||
"message": "Register completed."
|
||||
},
|
||||
"hresult": 0,
|
||||
"returnValue": {
|
||||
"dataType": "MX_DATA_TYPE_INTEGER",
|
||||
"variantType": "VT_I4",
|
||||
"int32Value": 12
|
||||
},
|
||||
"statuses": [
|
||||
{
|
||||
"success": 1,
|
||||
"category": "MX_STATUS_CATEGORY_OK",
|
||||
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
|
||||
"detail": 0,
|
||||
"rawCategory": 0,
|
||||
"rawDetectedBy": 0,
|
||||
"diagnosticText": "OK"
|
||||
}
|
||||
],
|
||||
"diagnosticMessage": "COM Register returned server handle 12.",
|
||||
"register": {
|
||||
"serverHandle": 12
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
{
|
||||
"sessionId": "session-fixture",
|
||||
"correlationId": "gateway-correlation-write-1",
|
||||
"kind": "MX_COMMAND_KIND_WRITE",
|
||||
"protocolStatus": {
|
||||
"code": "PROTOCOL_STATUS_CODE_MXACCESS_FAILURE",
|
||||
"message": "MXAccess rejected the write."
|
||||
},
|
||||
"hresult": -2147220992,
|
||||
"returnValue": {
|
||||
"dataType": "MX_DATA_TYPE_NO_DATA",
|
||||
"variantType": "VT_EMPTY",
|
||||
"isNull": true,
|
||||
"rawDiagnostic": "MXAccess returned no value for the failed write.",
|
||||
"rawDataType": 2
|
||||
},
|
||||
"statuses": [
|
||||
{
|
||||
"success": 0,
|
||||
"category": "MX_STATUS_CATEGORY_SECURITY_ERROR",
|
||||
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
|
||||
"detail": 321,
|
||||
"rawCategory": 8,
|
||||
"rawDetectedBy": 3,
|
||||
"diagnosticText": "Write denied by provider security."
|
||||
},
|
||||
{
|
||||
"success": 0,
|
||||
"category": "MX_STATUS_CATEGORY_OPERATIONAL_ERROR",
|
||||
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_NMX",
|
||||
"detail": 902,
|
||||
"rawCategory": 7,
|
||||
"rawDetectedBy": 5,
|
||||
"diagnosticText": "Provider rejected the item state."
|
||||
}
|
||||
],
|
||||
"diagnosticMessage": "Fixture preserves a data-bearing MXAccess failure reply with HRESULT and status array."
|
||||
}
|
||||
@@ -0,0 +1,159 @@
|
||||
{
|
||||
"sessionId": "session-fixture",
|
||||
"description": "Ordered event stream sample for one worker-backed session.",
|
||||
"events": [
|
||||
{
|
||||
"family": "MX_EVENT_FAMILY_ON_DATA_CHANGE",
|
||||
"sessionId": "session-fixture",
|
||||
"serverHandle": 12,
|
||||
"itemHandle": 34,
|
||||
"value": {
|
||||
"dataType": "MX_DATA_TYPE_INTEGER",
|
||||
"variantType": "VT_I4",
|
||||
"int32Value": 123
|
||||
},
|
||||
"quality": 192,
|
||||
"sourceTimestamp": "2026-01-01T00:00:00Z",
|
||||
"statuses": [
|
||||
{
|
||||
"success": 1,
|
||||
"category": "MX_STATUS_CATEGORY_OK",
|
||||
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
|
||||
"detail": 0,
|
||||
"rawCategory": 0,
|
||||
"rawDetectedBy": 0,
|
||||
"diagnosticText": "OK"
|
||||
}
|
||||
],
|
||||
"workerSequence": "1",
|
||||
"workerTimestamp": "2026-01-01T00:00:00.010Z",
|
||||
"gatewayReceiveTimestamp": "2026-01-01T00:00:00.015Z",
|
||||
"onDataChange": {}
|
||||
},
|
||||
{
|
||||
"family": "MX_EVENT_FAMILY_ON_WRITE_COMPLETE",
|
||||
"sessionId": "session-fixture",
|
||||
"serverHandle": 12,
|
||||
"itemHandle": 34,
|
||||
"value": {
|
||||
"dataType": "MX_DATA_TYPE_DOUBLE",
|
||||
"variantType": "VT_R8",
|
||||
"doubleValue": 45.5
|
||||
},
|
||||
"quality": 192,
|
||||
"sourceTimestamp": "2026-01-01T00:00:01Z",
|
||||
"statuses": [
|
||||
{
|
||||
"success": 1,
|
||||
"category": "MX_STATUS_CATEGORY_OK",
|
||||
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
|
||||
"detail": 0,
|
||||
"rawCategory": 0,
|
||||
"rawDetectedBy": 0,
|
||||
"diagnosticText": "Write complete."
|
||||
}
|
||||
],
|
||||
"workerSequence": "2",
|
||||
"workerTimestamp": "2026-01-01T00:00:01.010Z",
|
||||
"gatewayReceiveTimestamp": "2026-01-01T00:00:01.015Z",
|
||||
"hresult": 0,
|
||||
"onWriteComplete": {}
|
||||
},
|
||||
{
|
||||
"family": "MX_EVENT_FAMILY_OPERATION_COMPLETE",
|
||||
"sessionId": "session-fixture",
|
||||
"serverHandle": 12,
|
||||
"itemHandle": 34,
|
||||
"value": {
|
||||
"dataType": "MX_DATA_TYPE_STRING",
|
||||
"variantType": "VT_BSTR",
|
||||
"stringValue": "operation-complete"
|
||||
},
|
||||
"quality": 192,
|
||||
"sourceTimestamp": "2026-01-01T00:00:02Z",
|
||||
"statuses": [
|
||||
{
|
||||
"success": 1,
|
||||
"category": "MX_STATUS_CATEGORY_OK",
|
||||
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_NMX",
|
||||
"detail": 0,
|
||||
"rawCategory": 0,
|
||||
"rawDetectedBy": 0,
|
||||
"diagnosticText": "Operation complete."
|
||||
}
|
||||
],
|
||||
"workerSequence": "3",
|
||||
"workerTimestamp": "2026-01-01T00:00:02.010Z",
|
||||
"gatewayReceiveTimestamp": "2026-01-01T00:00:02.015Z",
|
||||
"operationComplete": {}
|
||||
},
|
||||
{
|
||||
"family": "MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE",
|
||||
"sessionId": "session-fixture",
|
||||
"serverHandle": 12,
|
||||
"itemHandle": 34,
|
||||
"value": {
|
||||
"dataType": "MX_DATA_TYPE_FLOAT",
|
||||
"arrayValue": {
|
||||
"elementDataType": "MX_DATA_TYPE_FLOAT",
|
||||
"variantType": "VT_ARRAY|VT_R4",
|
||||
"dimensions": [
|
||||
2
|
||||
],
|
||||
"floatValues": {
|
||||
"values": [
|
||||
1.5,
|
||||
2.5
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"quality": 192,
|
||||
"sourceTimestamp": "2026-01-01T00:00:03Z",
|
||||
"statuses": [
|
||||
{
|
||||
"success": 1,
|
||||
"category": "MX_STATUS_CATEGORY_OK",
|
||||
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
|
||||
"detail": 0,
|
||||
"rawCategory": 0,
|
||||
"rawDetectedBy": 0,
|
||||
"diagnosticText": "Buffered data delivered."
|
||||
}
|
||||
],
|
||||
"workerSequence": "4",
|
||||
"workerTimestamp": "2026-01-01T00:00:03.010Z",
|
||||
"gatewayReceiveTimestamp": "2026-01-01T00:00:03.015Z",
|
||||
"onBufferedDataChange": {
|
||||
"dataType": "MX_DATA_TYPE_FLOAT",
|
||||
"qualityValues": {
|
||||
"elementDataType": "MX_DATA_TYPE_INTEGER",
|
||||
"variantType": "VT_ARRAY|VT_I4",
|
||||
"dimensions": [
|
||||
2
|
||||
],
|
||||
"int32Values": {
|
||||
"values": [
|
||||
192,
|
||||
192
|
||||
]
|
||||
}
|
||||
},
|
||||
"timestampValues": {
|
||||
"elementDataType": "MX_DATA_TYPE_TIME",
|
||||
"variantType": "VT_ARRAY|VT_DATE",
|
||||
"dimensions": [
|
||||
2
|
||||
],
|
||||
"timestampValues": {
|
||||
"values": [
|
||||
"2026-01-01T00:00:02Z",
|
||||
"2026-01-01T00:00:03Z"
|
||||
]
|
||||
}
|
||||
},
|
||||
"rawDataType": 5
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,59 @@
|
||||
{
|
||||
"schemaVersion": 1,
|
||||
"fixtureSet": "mxaccess-gateway-client-behavior",
|
||||
"contractName": "mxaccess-gateway",
|
||||
"gatewayProtocolVersion": 1,
|
||||
"workerProtocolVersion": 1,
|
||||
"protoInputManifest": "clients/proto/proto-inputs.json",
|
||||
"fixtures": [
|
||||
{
|
||||
"id": "command-reply.register.ok",
|
||||
"category": "command_replies",
|
||||
"messageType": "mxaccess_gateway.v1.MxCommandReply",
|
||||
"path": "command-replies/register.ok.reply.json",
|
||||
"expectation": "Successful command replies preserve protocol status, HRESULT, return value, status arrays, and method-specific output."
|
||||
},
|
||||
{
|
||||
"id": "command-reply.write.mxaccess-failure",
|
||||
"category": "command_replies",
|
||||
"messageType": "mxaccess_gateway.v1.MxCommandReply",
|
||||
"path": "command-replies/write.mxaccess-failure.reply.json",
|
||||
"expectation": "MXAccess failures are data-bearing replies with HRESULT and status details, not transport failures."
|
||||
},
|
||||
{
|
||||
"id": "event-stream.session-ordered",
|
||||
"category": "event_streams",
|
||||
"messageType": "mxaccess_gateway.v1.MxEvent",
|
||||
"path": "event-streams/session-event-stream.json",
|
||||
"expectation": "Clients preserve per-session event order and event family bodies exactly as emitted."
|
||||
},
|
||||
{
|
||||
"id": "values.conversion-cases",
|
||||
"category": "value_conversion",
|
||||
"messageType": "mxaccess_gateway.v1.MxValue",
|
||||
"path": "values/value-conversion-cases.json",
|
||||
"expectation": "Clients expose typed projections and keep raw fallback metadata when conversion is incomplete."
|
||||
},
|
||||
{
|
||||
"id": "statuses.conversion-cases",
|
||||
"category": "status_conversion",
|
||||
"messageType": "mxaccess_gateway.v1.MxStatusProxy",
|
||||
"path": "statuses/status-conversion-cases.json",
|
||||
"expectation": "Clients preserve every MXSTATUS_PROXY field, including raw category/source values."
|
||||
},
|
||||
{
|
||||
"id": "auth.error-cases",
|
||||
"category": "auth_errors",
|
||||
"messageType": "client_behavior.v1.AuthErrorCase",
|
||||
"path": "auth/auth-error-cases.json",
|
||||
"expectation": "Clients map authentication and authorization failures distinctly and redact credentials."
|
||||
},
|
||||
{
|
||||
"id": "timeout-cancel.expected-behavior",
|
||||
"category": "timeout_cancel",
|
||||
"messageType": "client_behavior.v1.TimeoutCancelCase",
|
||||
"path": "timeout-cancel/timeout-cancel-cases.json",
|
||||
"expectation": "Client cancellation stops waiting locally but does not imply an in-flight MXAccess COM call was aborted."
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
{
|
||||
"schemaVersion": 1,
|
||||
"cases": [
|
||||
{
|
||||
"id": "ok.responding-lmx",
|
||||
"status": {
|
||||
"success": 1,
|
||||
"category": "MX_STATUS_CATEGORY_OK",
|
||||
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
|
||||
"detail": 0,
|
||||
"rawCategory": 0,
|
||||
"rawDetectedBy": 0,
|
||||
"diagnosticText": "OK"
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "security-error.requesting-lmx",
|
||||
"status": {
|
||||
"success": 0,
|
||||
"category": "MX_STATUS_CATEGORY_SECURITY_ERROR",
|
||||
"detectedBy": "MX_STATUS_SOURCE_REQUESTING_LMX",
|
||||
"detail": 401,
|
||||
"rawCategory": 8,
|
||||
"rawDetectedBy": 2,
|
||||
"diagnosticText": "Requesting LMX denied the secured operation."
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "raw-unknown-category",
|
||||
"status": {
|
||||
"success": 0,
|
||||
"category": "MX_STATUS_CATEGORY_UNKNOWN",
|
||||
"detectedBy": "MX_STATUS_SOURCE_UNKNOWN",
|
||||
"detail": 65535,
|
||||
"rawCategory": 99,
|
||||
"rawDetectedBy": 77,
|
||||
"diagnosticText": "Unknown native MXSTATUS_PROXY fields are preserved."
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
{
|
||||
"schemaVersion": 1,
|
||||
"cases": [
|
||||
{
|
||||
"id": "unary-deadline-exceeded",
|
||||
"operation": "Invoke",
|
||||
"clientDeadline": "2s",
|
||||
"grpcStatusCode": "DEADLINE_EXCEEDED",
|
||||
"clientErrorCategory": "TimeoutError",
|
||||
"gatewayWaitBehavior": "stops_waiting_for_reply",
|
||||
"workerCommandBehavior": "continues_until_worker_reply_or_worker_fault",
|
||||
"sessionExpectation": "session_state_is_unknown_until_follow_up_status_or_close",
|
||||
"expectedClientAction": "issue GetSessionState or CloseSession before reusing handles"
|
||||
},
|
||||
{
|
||||
"id": "stream-cancel",
|
||||
"operation": "StreamEvents",
|
||||
"clientDeadline": "5s",
|
||||
"grpcStatusCode": "CANCELLED",
|
||||
"clientErrorCategory": "CancelledError",
|
||||
"gatewayWaitBehavior": "stops_streaming_to_that_call",
|
||||
"workerCommandBehavior": "does_not_cancel_worker_session",
|
||||
"sessionExpectation": "session_remains_ready_if_worker_stays_healthy",
|
||||
"expectedClientAction": "open a new StreamEvents call with the last observed worker sequence"
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
{
|
||||
"schemaVersion": 1,
|
||||
"cases": [
|
||||
{
|
||||
"id": "bool.true",
|
||||
"expectedKind": "boolValue",
|
||||
"value": {
|
||||
"dataType": "MX_DATA_TYPE_BOOLEAN",
|
||||
"variantType": "VT_BOOL",
|
||||
"boolValue": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "int64.large",
|
||||
"expectedKind": "int64Value",
|
||||
"value": {
|
||||
"dataType": "MX_DATA_TYPE_INTEGER",
|
||||
"variantType": "VT_I8",
|
||||
"int64Value": "9223372036854770000"
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "timestamp.utc",
|
||||
"expectedKind": "timestampValue",
|
||||
"value": {
|
||||
"dataType": "MX_DATA_TYPE_TIME",
|
||||
"variantType": "VT_DATE",
|
||||
"timestampValue": "2026-01-01T00:00:04Z"
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "string-array",
|
||||
"expectedKind": "arrayValue",
|
||||
"value": {
|
||||
"dataType": "MX_DATA_TYPE_STRING",
|
||||
"arrayValue": {
|
||||
"elementDataType": "MX_DATA_TYPE_STRING",
|
||||
"variantType": "VT_ARRAY|VT_BSTR",
|
||||
"dimensions": [
|
||||
2
|
||||
],
|
||||
"stringValues": {
|
||||
"values": [
|
||||
"alpha",
|
||||
"beta"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "raw-fallback.variant",
|
||||
"expectedKind": "rawValue",
|
||||
"value": {
|
||||
"dataType": "MX_DATA_TYPE_UNKNOWN",
|
||||
"variantType": "VT_RECORD",
|
||||
"rawDiagnostic": "No lossless typed projection exists for this VARIANT.",
|
||||
"rawDataType": 32767,
|
||||
"rawValue": "AQIDBAU="
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "raw-array-fallback",
|
||||
"expectedKind": "arrayValue",
|
||||
"value": {
|
||||
"dataType": "MX_DATA_TYPE_UNKNOWN",
|
||||
"arrayValue": {
|
||||
"elementDataType": "MX_DATA_TYPE_UNKNOWN",
|
||||
"variantType": "VT_ARRAY|VT_VARIANT",
|
||||
"dimensions": [
|
||||
2
|
||||
],
|
||||
"rawDiagnostic": "Array elements contain mixed VARIANT types.",
|
||||
"rawElementDataType": 32767,
|
||||
"rawValues": {
|
||||
"values": [
|
||||
"AAE=",
|
||||
"AgM="
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -16,6 +16,7 @@
|
||||
],
|
||||
"descriptorSet": "clients/proto/descriptors/mxaccessgw-client-v1.protoset",
|
||||
"fixtureRoot": "clients/proto/fixtures/golden",
|
||||
"behaviorFixtureRoot": "clients/proto/fixtures/behavior",
|
||||
"generatedOutputs": {
|
||||
"dotnet": "clients/dotnet/generated",
|
||||
"go": "clients/go/internal/generated",
|
||||
|
||||
@@ -0,0 +1,106 @@
|
||||
# Client Behavior Fixtures
|
||||
|
||||
Client behavior fixtures define the shared expectations used by the official
|
||||
.NET, Go, Rust, Python, and Java clients. They keep wrapper behavior aligned
|
||||
while each language exposes idiomatic APIs over the same protobuf contract.
|
||||
|
||||
## Fixture Set
|
||||
|
||||
The fixture manifest is `clients/proto/fixtures/behavior/manifest.json`.
|
||||
`clients/proto/proto-inputs.json` references the fixture root through
|
||||
`behaviorFixtureRoot` so generators and client test projects can discover the
|
||||
same files they use for descriptor inputs.
|
||||
|
||||
The fixture set contains:
|
||||
|
||||
- command reply protobuf JSON,
|
||||
- ordered event stream protobuf JSON samples,
|
||||
- `MxValue` conversion case sets,
|
||||
- `MxStatusProxy` conversion case sets,
|
||||
- authentication and authorization error expectations,
|
||||
- timeout and cancellation behavior expectations.
|
||||
|
||||
Protobuf message fixtures use protobuf JSON field names and enum values. Files
|
||||
that describe client wrapper behavior use explicit JSON fields instead of a
|
||||
proto message because those expectations apply above the generated transport
|
||||
types.
|
||||
|
||||
## Command Replies
|
||||
|
||||
Command reply fixtures live in
|
||||
`clients/proto/fixtures/behavior/command-replies/`. They parse as
|
||||
`mxaccess_gateway.v1.MxCommandReply`.
|
||||
|
||||
Clients use these fixtures to verify that successful and failed MXAccess
|
||||
commands both carry the full reply details:
|
||||
|
||||
- `protocolStatus`,
|
||||
- `hresult`,
|
||||
- `returnValue`,
|
||||
- repeated `statuses`,
|
||||
- method-specific reply payloads when MXAccess returns out parameters.
|
||||
|
||||
MXAccess failures remain command replies when the gateway reached the worker and
|
||||
the worker captured HRESULT or `MXSTATUS_PROXY` details. Client wrappers should
|
||||
map those replies to rich command errors without discarding the raw reply.
|
||||
|
||||
## Event Streams
|
||||
|
||||
Event stream fixtures live in
|
||||
`clients/proto/fixtures/behavior/event-streams/`. Each file contains an ordered
|
||||
`events` array whose entries parse as `mxaccess_gateway.v1.MxEvent`.
|
||||
|
||||
Clients use these fixtures to verify that stream helpers preserve
|
||||
`workerSequence` order and expose each native event family:
|
||||
|
||||
- `OnDataChange`,
|
||||
- `OnWriteComplete`,
|
||||
- `OperationComplete`,
|
||||
- `OnBufferedDataChange`.
|
||||
|
||||
Wrappers must not reorder, coalesce, or drop events while reading the fixture.
|
||||
|
||||
## Value And Status Conversion
|
||||
|
||||
Value fixtures live in `clients/proto/fixtures/behavior/values/`. Each case
|
||||
contains a `value` object that parses as `mxaccess_gateway.v1.MxValue`.
|
||||
|
||||
Status fixtures live in `clients/proto/fixtures/behavior/statuses/`. Each case
|
||||
contains a `status` object that parses as
|
||||
`mxaccess_gateway.v1.MxStatusProxy`.
|
||||
|
||||
Clients use these fixtures to verify typed projections and raw fallback
|
||||
behavior. A language helper may expose native booleans, integers, strings,
|
||||
arrays, and timestamps, but it must keep `rawDiagnostic`, raw data type fields,
|
||||
and raw byte payloads accessible when conversion is incomplete.
|
||||
|
||||
## Auth, Timeout, And Cancel Behavior
|
||||
|
||||
Authentication fixtures live in `clients/proto/fixtures/behavior/auth/`. They
|
||||
separate `UNAUTHENTICATED` from `PERMISSION_DENIED` so clients map missing or
|
||||
invalid credentials differently from missing scopes. Expected output strings
|
||||
contain only redacted credentials.
|
||||
|
||||
Timeout and cancellation fixtures live in
|
||||
`clients/proto/fixtures/behavior/timeout-cancel/`. They document that canceling
|
||||
or timing out a client call stops the client from waiting, but it does not abort
|
||||
an in-flight MXAccess COM call on the worker STA. Clients should follow up with
|
||||
`GetSessionState` or `CloseSession` before reusing handles after an uncertain
|
||||
command timeout.
|
||||
|
||||
## Validation
|
||||
|
||||
Run the fixture validation tests after changing the behavior fixture set:
|
||||
|
||||
```bash
|
||||
powershell -ExecutionPolicy Bypass -File scripts/validate-client-behavior-fixtures.ps1
|
||||
```
|
||||
|
||||
The script runs the focused C# contract tests that parse all protobuf JSON
|
||||
fixtures and validate deterministic wrapper expectation files.
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- [Client Proto Generation](./client-proto-generation.md)
|
||||
- [Client Libraries Detailed Design](./client-libraries-design.md)
|
||||
- [Protobuf Contracts](./Contracts.md)
|
||||
@@ -18,6 +18,7 @@ starting `MxGateway.Worker.exe` or loading MXAccess COM. The harness scripts:
|
||||
- `WorkerHello` and `WorkerReady` startup,
|
||||
- command replies with matching correlation ids,
|
||||
- ordered `WorkerEvent` frames,
|
||||
- `WorkerHeartbeat` frames,
|
||||
- `WorkerFault` frames,
|
||||
- shutdown acknowledgements,
|
||||
- malformed protobuf payloads and oversized frame headers,
|
||||
@@ -43,6 +44,8 @@ event streaming behavior:
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~FakeWorkerHarnessTests
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~SessionWorkerClientFactoryFakeWorkerTests
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~GatewayEndToEndFakeWorkerSmokeTests
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~WorkerClientTests
|
||||
dotnet test src/MxGateway.Worker.Tests/MxGateway.Worker.Tests.csproj -p:Platform=x86 --filter FullyQualifiedName~WorkerPipeSessionTests
|
||||
```
|
||||
|
||||
Run the gateway test project after shared gateway test infrastructure changes:
|
||||
|
||||
@@ -29,6 +29,7 @@ Language-specific plans:
|
||||
Shared generation inputs:
|
||||
|
||||
- `docs/client-proto-generation.md`
|
||||
- `docs/ClientBehaviorFixtures.md`
|
||||
- `clients/proto/proto-inputs.json`
|
||||
|
||||
Language style guides:
|
||||
@@ -310,6 +311,11 @@ CLI output should support JSON for automated tests.
|
||||
Unit tests must run without a live gateway. Use fake gRPC services, mock
|
||||
transports, or generated test servers depending on language.
|
||||
|
||||
Shared behavior fixtures live in `clients/proto/fixtures/behavior`. Every
|
||||
client should include tests that load the fixture manifest and verify wrapper
|
||||
behavior against the common command reply, event stream, value conversion,
|
||||
status conversion, auth error, and timeout/cancel cases.
|
||||
|
||||
Required unit test areas:
|
||||
|
||||
- options parsing,
|
||||
|
||||
@@ -16,6 +16,7 @@ records:
|
||||
- the public and worker source files,
|
||||
- the descriptor set path,
|
||||
- golden fixture locations,
|
||||
- behavior fixture locations,
|
||||
- generated-code output directories for each planned client.
|
||||
|
||||
The source files listed by the manifest are:
|
||||
@@ -125,9 +126,30 @@ The fixtures use protobuf JSON field names and enum values. Contract tests parse
|
||||
them with the generated C# types so schema drift is caught before client
|
||||
generation work starts.
|
||||
|
||||
## Behavior Fixtures
|
||||
|
||||
Cross-language behavior fixtures live in
|
||||
`clients/proto/fixtures/behavior`. The manifest
|
||||
`clients/proto/fixtures/behavior/manifest.json` lists command replies, ordered
|
||||
event stream samples, value conversion cases, status conversion cases, auth
|
||||
error expectations, and timeout/cancel expectations.
|
||||
|
||||
The behavior fixtures let each generated client wrapper test the same
|
||||
expectations without a live gateway. Protobuf message fixtures parse with the
|
||||
generated types. Auth and timeout/cancel files describe wrapper behavior above
|
||||
the generated transport layer, including credential redaction and the rule that
|
||||
client cancellation does not abort an in-flight MXAccess COM call.
|
||||
|
||||
Run the focused validation script after changing these fixtures:
|
||||
|
||||
```powershell
|
||||
scripts/validate-client-behavior-fixtures.ps1
|
||||
```
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- [Protobuf Contracts](./Contracts.md)
|
||||
- [Client Libraries Detailed Design](./client-libraries-design.md)
|
||||
- [Client Behavior Fixtures](./ClientBehaviorFixtures.md)
|
||||
- [Client Libraries Implementation Plan](./implementation-plan-clients.md)
|
||||
- [Protobuf Style Guide](./style-guides/ProtobufStyleGuide.md)
|
||||
|
||||
@@ -16,6 +16,7 @@ Recommended layout:
|
||||
|
||||
```text
|
||||
clients/dotnet/
|
||||
MxGateway.Client.sln
|
||||
MxGateway.Client/
|
||||
MxGateway.Client.csproj
|
||||
GatewayClient.cs
|
||||
@@ -41,6 +42,12 @@ Target framework:
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
```
|
||||
|
||||
The scaffold uses a project reference to
|
||||
`src/MxGateway.Contracts/MxGateway.Contracts.csproj` for generated protobuf and
|
||||
gRPC types. `clients/dotnet/generated` remains reserved for client-local
|
||||
generator output if the .NET client later needs to decouple from the contracts
|
||||
project.
|
||||
|
||||
Expected packages:
|
||||
|
||||
- `Grpc.Net.Client`
|
||||
|
||||
@@ -618,13 +618,19 @@ Do not drop or coalesce events in v1.
|
||||
|
||||
## Heartbeat And Watchdog
|
||||
|
||||
The worker heartbeat should prove that:
|
||||
`WorkerPipeSession` starts the heartbeat loop after the gateway validates
|
||||
`WorkerHello` and receives `WorkerReady`. Heartbeats continue until
|
||||
`WorkerShutdown`, cancellation, or a pipe/protocol failure stops the session.
|
||||
The loop uses `WorkerPipeSessionOptions.HeartbeatInterval`; the default matches
|
||||
the gateway worker heartbeat interval.
|
||||
|
||||
The worker heartbeat proves that:
|
||||
|
||||
- pipe writer is alive,
|
||||
- worker host is alive,
|
||||
- STA has recently pumped or completed work.
|
||||
|
||||
Heartbeat payload should include:
|
||||
Heartbeat payload includes:
|
||||
|
||||
- worker process id,
|
||||
- session id,
|
||||
@@ -635,13 +641,19 @@ Heartbeat payload should include:
|
||||
- event sequence,
|
||||
- current command correlation id if any.
|
||||
|
||||
The STA watchdog should warn when:
|
||||
`MxAccessStaSession.CaptureHeartbeat()` reads `StaRuntime.LastActivityUtc` and
|
||||
`StaCommandDispatcher` queue state without touching the raw MXAccess COM object
|
||||
outside the STA. Event queue depth and event sequence are reported as zero until
|
||||
the event queue implementation owns those counters.
|
||||
|
||||
- one command exceeds its expected duration,
|
||||
- the STA has not pumped messages within the heartbeat grace period,
|
||||
- event queue depth remains high.
|
||||
|
||||
The worker can report the problem, but the gateway owns the final kill decision.
|
||||
The STA watchdog currently emits a `WorkerFault` with
|
||||
`WorkerFaultCategory.StaHung` when `LastStaActivityUtc` is older than
|
||||
`WorkerPipeSessionOptions.HeartbeatGrace`. The fault includes the current
|
||||
command correlation id when a command is active. Command duration and high event
|
||||
queue depth remain observable through heartbeat fields until dedicated
|
||||
thresholds own those warnings. The worker reports stale STA activity, but the
|
||||
gateway owns the final kill decision through its existing heartbeat and worker
|
||||
lifecycle policy.
|
||||
|
||||
## Shutdown
|
||||
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
[CmdletBinding()]
|
||||
param(
|
||||
[switch]$NoBuild
|
||||
)
|
||||
|
||||
Set-StrictMode -Version Latest
|
||||
$ErrorActionPreference = "Stop"
|
||||
|
||||
$repoRoot = Resolve-Path (Join-Path $PSScriptRoot "..")
|
||||
$testProject = Join-Path $repoRoot "src/MxGateway.Tests/MxGateway.Tests.csproj"
|
||||
$arguments = @(
|
||||
"test",
|
||||
$testProject,
|
||||
"--filter",
|
||||
"ClientBehaviorFixtureTests"
|
||||
)
|
||||
|
||||
if ($NoBuild) {
|
||||
$arguments += "--no-build"
|
||||
}
|
||||
|
||||
& dotnet @arguments
|
||||
|
||||
if ($LASTEXITCODE -ne 0) {
|
||||
throw "Client behavior fixture validation failed with exit code $LASTEXITCODE."
|
||||
}
|
||||
@@ -0,0 +1,379 @@
|
||||
using System.Text.Json;
|
||||
using Google.Protobuf;
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Tests.Contracts;
|
||||
|
||||
public sealed class ClientBehaviorFixtureTests
|
||||
{
|
||||
private static readonly JsonParser ProtobufJsonParser = new(JsonParser.Settings.Default);
|
||||
|
||||
[Fact]
|
||||
public void BehaviorManifest_DeclaresCurrentProtocolVersionsAndExistingFixtures()
|
||||
{
|
||||
using JsonDocument manifest = LoadBehaviorManifest();
|
||||
JsonElement root = manifest.RootElement;
|
||||
|
||||
Assert.Equal(1, root.GetProperty("schemaVersion").GetInt32());
|
||||
Assert.Equal("mxaccess-gateway-client-behavior", root.GetProperty("fixtureSet").GetString());
|
||||
Assert.Equal(GatewayContractInfo.GatewayProtocolVersion, root.GetProperty("gatewayProtocolVersion").GetUInt32());
|
||||
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, root.GetProperty("workerProtocolVersion").GetUInt32());
|
||||
|
||||
HashSet<string> fixtureIds = new(StringComparer.Ordinal);
|
||||
foreach (JsonElement fixture in root.GetProperty("fixtures").EnumerateArray())
|
||||
{
|
||||
string id = fixture.GetProperty("id").GetString()!;
|
||||
string path = fixture.GetProperty("path").GetString()!;
|
||||
string category = fixture.GetProperty("category").GetString()!;
|
||||
string messageType = fixture.GetProperty("messageType").GetString()!;
|
||||
|
||||
Assert.True(fixtureIds.Add(id), $"Duplicate behavior fixture id '{id}'.");
|
||||
Assert.Contains(category, KnownCategories);
|
||||
Assert.Contains(messageType, KnownMessageTypes);
|
||||
Assert.True(
|
||||
File.Exists(Path.Combine(GetBehaviorFixtureRoot().FullName, path)),
|
||||
$"Expected behavior fixture '{path}' to exist.");
|
||||
Assert.False(Path.IsPathRooted(path), $"Fixture path '{path}' must be relative.");
|
||||
Assert.NotEmpty(fixture.GetProperty("expectation").GetString()!);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProtoInputManifest_ReferencesBehaviorFixtureRoot()
|
||||
{
|
||||
DirectoryInfo repositoryRoot = FindRepositoryRoot();
|
||||
string manifestPath = Path.Combine(repositoryRoot.FullName, "clients", "proto", "proto-inputs.json");
|
||||
|
||||
using JsonDocument manifest = JsonDocument.Parse(File.ReadAllText(manifestPath));
|
||||
string fixtureRoot = manifest.RootElement.GetProperty("behaviorFixtureRoot").GetString()!;
|
||||
|
||||
Assert.Equal("clients/proto/fixtures/behavior", fixtureRoot);
|
||||
Assert.True(Directory.Exists(Path.Combine(repositoryRoot.FullName, fixtureRoot)));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CommandReplyFixtures_ParseWithCurrentContractAndPreserveMxAccessDetails()
|
||||
{
|
||||
IReadOnlyList<JsonElement> fixtures = LoadManifestFixtures("command_replies");
|
||||
Assert.NotEmpty(fixtures);
|
||||
|
||||
foreach (JsonElement fixture in fixtures)
|
||||
{
|
||||
MxCommandReply reply = ParseFixture<MxCommandReply>(
|
||||
fixture,
|
||||
MxCommandReply.Parser);
|
||||
|
||||
Assert.NotEqual(MxCommandKind.Unspecified, reply.Kind);
|
||||
Assert.NotEqual(ProtocolStatusCode.Unspecified, reply.ProtocolStatus.Code);
|
||||
Assert.True(reply.HasHresult, $"Fixture '{GetFixtureId(fixture)}' must carry an HRESULT.");
|
||||
Assert.NotEmpty(reply.Statuses);
|
||||
Assert.NotEqual(MxDataType.Unspecified, reply.ReturnValue.DataType);
|
||||
Assert.True(
|
||||
reply.ReturnValue.KindCase != MxValue.KindOneofCase.None || reply.ReturnValue.IsNull,
|
||||
$"Fixture '{GetFixtureId(fixture)}' must carry a typed value, raw value, or explicit null.");
|
||||
}
|
||||
|
||||
MxCommandReply failedWrite = ParseFixture<MxCommandReply>(
|
||||
Assert.Single(fixtures, fixture => GetFixtureId(fixture) == "command-reply.write.mxaccess-failure"),
|
||||
MxCommandReply.Parser);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.MxaccessFailure, failedWrite.ProtocolStatus.Code);
|
||||
Assert.Equal(-2147220992, failedWrite.Hresult);
|
||||
Assert.True(failedWrite.Statuses.Count > 1);
|
||||
Assert.All(failedWrite.Statuses, status => Assert.Equal(0, status.Success));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void EventStreamFixtures_ParseWithMonotonicSequencesAndExpectedFamilies()
|
||||
{
|
||||
IReadOnlyList<JsonElement> fixtures = LoadManifestFixtures("event_streams");
|
||||
Assert.NotEmpty(fixtures);
|
||||
|
||||
foreach (JsonElement fixture in fixtures)
|
||||
{
|
||||
using JsonDocument document = JsonDocument.Parse(File.ReadAllText(GetFixturePath(fixture)));
|
||||
ulong previousSequence = 0;
|
||||
List<MxEventFamily> families = [];
|
||||
|
||||
foreach (JsonElement eventElement in document.RootElement.GetProperty("events").EnumerateArray())
|
||||
{
|
||||
MxEvent gatewayEvent = ProtobufJsonParser.Parse<MxEvent>(eventElement.GetRawText());
|
||||
|
||||
Assert.True(gatewayEvent.WorkerSequence > previousSequence);
|
||||
Assert.Equal(document.RootElement.GetProperty("sessionId").GetString(), gatewayEvent.SessionId);
|
||||
Assert.NotEmpty(gatewayEvent.Statuses);
|
||||
AssertEventBodyMatchesFamily(gatewayEvent);
|
||||
|
||||
previousSequence = gatewayEvent.WorkerSequence;
|
||||
families.Add(gatewayEvent.Family);
|
||||
}
|
||||
|
||||
Assert.Contains(MxEventFamily.OnDataChange, families);
|
||||
Assert.Contains(MxEventFamily.OnWriteComplete, families);
|
||||
Assert.Contains(MxEventFamily.OperationComplete, families);
|
||||
Assert.Contains(MxEventFamily.OnBufferedDataChange, families);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ValueConversionFixtures_ParseTypedValuesAndRawFallbacks()
|
||||
{
|
||||
JsonElement cases = LoadCaseSet("value_conversion", "cases");
|
||||
bool sawRawFallback = false;
|
||||
bool sawRawArrayFallback = false;
|
||||
bool sawTypedArray = false;
|
||||
|
||||
foreach (JsonElement valueCase in cases.EnumerateArray())
|
||||
{
|
||||
MxValue value = ProtobufJsonParser.Parse<MxValue>(
|
||||
valueCase.GetProperty("value").GetRawText());
|
||||
string expectedKind = valueCase.GetProperty("expectedKind").GetString()!;
|
||||
|
||||
Assert.NotEqual(MxDataType.Unspecified, value.DataType);
|
||||
AssertJsonKindMatchesValueKind(expectedKind, value);
|
||||
|
||||
sawRawFallback |= value.KindCase == MxValue.KindOneofCase.RawValue
|
||||
&& !string.IsNullOrWhiteSpace(value.RawDiagnostic)
|
||||
&& value.RawDataType != 0;
|
||||
sawRawArrayFallback |= value.KindCase == MxValue.KindOneofCase.ArrayValue
|
||||
&& value.ArrayValue.ValuesCase == MxArray.ValuesOneofCase.RawValues
|
||||
&& !string.IsNullOrWhiteSpace(value.ArrayValue.RawDiagnostic)
|
||||
&& value.ArrayValue.RawElementDataType != 0;
|
||||
sawTypedArray |= value.KindCase == MxValue.KindOneofCase.ArrayValue
|
||||
&& value.ArrayValue.ValuesCase != MxArray.ValuesOneofCase.RawValues;
|
||||
}
|
||||
|
||||
Assert.True(sawRawFallback, "Expected at least one raw scalar fallback case.");
|
||||
Assert.True(sawRawArrayFallback, "Expected at least one raw array fallback case.");
|
||||
Assert.True(sawTypedArray, "Expected at least one typed array case.");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void StatusConversionFixtures_ParseStatusArraysAndRawFields()
|
||||
{
|
||||
JsonElement cases = LoadCaseSet("status_conversion", "cases");
|
||||
bool sawRawUnknown = false;
|
||||
|
||||
foreach (JsonElement statusCase in cases.EnumerateArray())
|
||||
{
|
||||
MxStatusProxy status = ProtobufJsonParser.Parse<MxStatusProxy>(
|
||||
statusCase.GetProperty("status").GetRawText());
|
||||
|
||||
Assert.NotEqual(MxStatusCategory.Unspecified, status.Category);
|
||||
Assert.NotEqual(MxStatusSource.Unspecified, status.DetectedBy);
|
||||
Assert.NotEmpty(status.DiagnosticText);
|
||||
|
||||
sawRawUnknown |= status.Category == MxStatusCategory.Unknown
|
||||
&& status.RawCategory != 0
|
||||
&& status.RawDetectedBy != 0;
|
||||
}
|
||||
|
||||
Assert.True(sawRawUnknown, "Expected a status case with unknown raw native fields.");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void AuthErrorFixtures_MapAuthenticationAuthorizationAndRedactCredentials()
|
||||
{
|
||||
JsonElement cases = LoadCaseSet("auth_errors", "cases");
|
||||
HashSet<string> statusCodes = new(StringComparer.Ordinal);
|
||||
|
||||
foreach (JsonElement authCase in cases.EnumerateArray())
|
||||
{
|
||||
string grpcStatusCode = authCase.GetProperty("grpcStatusCode").GetString()!;
|
||||
string category = authCase.GetProperty("clientErrorCategory").GetString()!;
|
||||
string redactedOutput = authCase.GetProperty("expectedRedactedOutput").GetString()!;
|
||||
string serialized = authCase.GetRawText();
|
||||
|
||||
Assert.Contains(grpcStatusCode, AuthGrpcStatusCodes);
|
||||
Assert.Contains(category, AuthClientErrorCategories);
|
||||
string authorization = authCase.GetProperty("inputMetadata").GetProperty("authorization").GetString()!;
|
||||
if (!string.IsNullOrEmpty(authorization))
|
||||
{
|
||||
Assert.Contains("<redacted>", serialized);
|
||||
}
|
||||
|
||||
Assert.DoesNotContain("mxgw_", serialized, StringComparison.Ordinal);
|
||||
Assert.DoesNotContain("secret", redactedOutput, StringComparison.OrdinalIgnoreCase);
|
||||
|
||||
statusCodes.Add(grpcStatusCode);
|
||||
}
|
||||
|
||||
Assert.Contains("UNAUTHENTICATED", statusCodes);
|
||||
Assert.Contains("PERMISSION_DENIED", statusCodes);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TimeoutCancelFixtures_DocumentClientWaitAndWorkerCommandBehavior()
|
||||
{
|
||||
JsonElement cases = LoadCaseSet("timeout_cancel", "cases");
|
||||
HashSet<string> statusCodes = new(StringComparer.Ordinal);
|
||||
|
||||
foreach (JsonElement timeoutCase in cases.EnumerateArray())
|
||||
{
|
||||
string grpcStatusCode = timeoutCase.GetProperty("grpcStatusCode").GetString()!;
|
||||
|
||||
Assert.Contains(grpcStatusCode, TimeoutGrpcStatusCodes);
|
||||
Assert.NotEmpty(timeoutCase.GetProperty("clientDeadline").GetString()!);
|
||||
Assert.NotEmpty(timeoutCase.GetProperty("gatewayWaitBehavior").GetString()!);
|
||||
Assert.NotEmpty(timeoutCase.GetProperty("workerCommandBehavior").GetString()!);
|
||||
Assert.NotEmpty(timeoutCase.GetProperty("expectedClientAction").GetString()!);
|
||||
|
||||
statusCodes.Add(grpcStatusCode);
|
||||
}
|
||||
|
||||
Assert.Contains("DEADLINE_EXCEEDED", statusCodes);
|
||||
Assert.Contains("CANCELLED", statusCodes);
|
||||
}
|
||||
|
||||
private static readonly string[] KnownCategories =
|
||||
[
|
||||
"command_replies",
|
||||
"event_streams",
|
||||
"value_conversion",
|
||||
"status_conversion",
|
||||
"auth_errors",
|
||||
"timeout_cancel",
|
||||
];
|
||||
|
||||
private static readonly string[] KnownMessageTypes =
|
||||
[
|
||||
"mxaccess_gateway.v1.MxCommandReply",
|
||||
"mxaccess_gateway.v1.MxEvent",
|
||||
"mxaccess_gateway.v1.MxValue",
|
||||
"mxaccess_gateway.v1.MxStatusProxy",
|
||||
"client_behavior.v1.AuthErrorCase",
|
||||
"client_behavior.v1.TimeoutCancelCase",
|
||||
];
|
||||
|
||||
private static readonly string[] AuthGrpcStatusCodes =
|
||||
[
|
||||
"UNAUTHENTICATED",
|
||||
"PERMISSION_DENIED",
|
||||
];
|
||||
|
||||
private static readonly string[] AuthClientErrorCategories =
|
||||
[
|
||||
"AuthenticationError",
|
||||
"AuthorizationError",
|
||||
];
|
||||
|
||||
private static readonly string[] TimeoutGrpcStatusCodes =
|
||||
[
|
||||
"DEADLINE_EXCEEDED",
|
||||
"CANCELLED",
|
||||
];
|
||||
|
||||
private static T ParseFixture<T>(
|
||||
JsonElement fixture,
|
||||
MessageParser<T> parser)
|
||||
where T : IMessage<T>
|
||||
{
|
||||
return parser.ParseJson(File.ReadAllText(GetFixturePath(fixture)));
|
||||
}
|
||||
|
||||
private static JsonElement LoadCaseSet(
|
||||
string category,
|
||||
string propertyName)
|
||||
{
|
||||
JsonElement fixture = Assert.Single(LoadManifestFixtures(category));
|
||||
using JsonDocument document = JsonDocument.Parse(File.ReadAllText(GetFixturePath(fixture)));
|
||||
|
||||
return document.RootElement.GetProperty(propertyName).Clone();
|
||||
}
|
||||
|
||||
private static IReadOnlyList<JsonElement> LoadManifestFixtures(string category)
|
||||
{
|
||||
using JsonDocument manifest = LoadBehaviorManifest();
|
||||
|
||||
return manifest.RootElement
|
||||
.GetProperty("fixtures")
|
||||
.EnumerateArray()
|
||||
.Where(fixture => fixture.GetProperty("category").GetString() == category)
|
||||
.Select(fixture => fixture.Clone())
|
||||
.ToArray();
|
||||
}
|
||||
|
||||
private static JsonDocument LoadBehaviorManifest()
|
||||
{
|
||||
return JsonDocument.Parse(File.ReadAllText(Path.Combine(GetBehaviorFixtureRoot().FullName, "manifest.json")));
|
||||
}
|
||||
|
||||
private static string GetFixturePath(JsonElement fixture)
|
||||
{
|
||||
return Path.Combine(GetBehaviorFixtureRoot().FullName, fixture.GetProperty("path").GetString()!);
|
||||
}
|
||||
|
||||
private static string GetFixtureId(JsonElement fixture)
|
||||
{
|
||||
return fixture.GetProperty("id").GetString()!;
|
||||
}
|
||||
|
||||
private static DirectoryInfo GetBehaviorFixtureRoot()
|
||||
{
|
||||
DirectoryInfo repositoryRoot = FindRepositoryRoot();
|
||||
|
||||
return new DirectoryInfo(Path.Combine(repositoryRoot.FullName, "clients", "proto", "fixtures", "behavior"));
|
||||
}
|
||||
|
||||
private static DirectoryInfo FindRepositoryRoot()
|
||||
{
|
||||
DirectoryInfo? current = new(AppContext.BaseDirectory);
|
||||
|
||||
while (current is not null)
|
||||
{
|
||||
if (File.Exists(Path.Combine(current.FullName, "AGENTS.md"))
|
||||
&& Directory.Exists(Path.Combine(current.FullName, "src"))
|
||||
&& Directory.Exists(Path.Combine(current.FullName, "clients")))
|
||||
{
|
||||
return current;
|
||||
}
|
||||
|
||||
current = current.Parent;
|
||||
}
|
||||
|
||||
throw new DirectoryNotFoundException("Could not locate the repository root from the test output directory.");
|
||||
}
|
||||
|
||||
private static void AssertEventBodyMatchesFamily(MxEvent gatewayEvent)
|
||||
{
|
||||
switch (gatewayEvent.Family)
|
||||
{
|
||||
case MxEventFamily.OnDataChange:
|
||||
Assert.Equal(MxEvent.BodyOneofCase.OnDataChange, gatewayEvent.BodyCase);
|
||||
break;
|
||||
case MxEventFamily.OnWriteComplete:
|
||||
Assert.Equal(MxEvent.BodyOneofCase.OnWriteComplete, gatewayEvent.BodyCase);
|
||||
break;
|
||||
case MxEventFamily.OperationComplete:
|
||||
Assert.Equal(MxEvent.BodyOneofCase.OperationComplete, gatewayEvent.BodyCase);
|
||||
break;
|
||||
case MxEventFamily.OnBufferedDataChange:
|
||||
Assert.Equal(MxEvent.BodyOneofCase.OnBufferedDataChange, gatewayEvent.BodyCase);
|
||||
break;
|
||||
default:
|
||||
throw new InvalidOperationException($"Unexpected event family '{gatewayEvent.Family}'.");
|
||||
}
|
||||
}
|
||||
|
||||
private static void AssertJsonKindMatchesValueKind(
|
||||
string expectedKind,
|
||||
MxValue value)
|
||||
{
|
||||
MxValue.KindOneofCase expected = expectedKind switch
|
||||
{
|
||||
"boolValue" => MxValue.KindOneofCase.BoolValue,
|
||||
"int32Value" => MxValue.KindOneofCase.Int32Value,
|
||||
"int64Value" => MxValue.KindOneofCase.Int64Value,
|
||||
"floatValue" => MxValue.KindOneofCase.FloatValue,
|
||||
"doubleValue" => MxValue.KindOneofCase.DoubleValue,
|
||||
"stringValue" => MxValue.KindOneofCase.StringValue,
|
||||
"timestampValue" => MxValue.KindOneofCase.TimestampValue,
|
||||
"arrayValue" => MxValue.KindOneofCase.ArrayValue,
|
||||
"rawValue" => MxValue.KindOneofCase.RawValue,
|
||||
_ => throw new InvalidOperationException($"Unexpected expected value kind '{expectedKind}'."),
|
||||
};
|
||||
|
||||
Assert.Equal(expected, value.KindCase);
|
||||
}
|
||||
}
|
||||
@@ -105,6 +105,25 @@ public sealed class FakeWorkerHarnessTests
|
||||
Assert.Equal(WorkerClientState.Faulted, client.State);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SendHeartbeatAsync_UpdatesClientHeartbeatState()
|
||||
{
|
||||
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
|
||||
await using WorkerClient client = fakeWorker.CreateClient();
|
||||
await StartClientAsync(fakeWorker, client);
|
||||
DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
|
||||
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(20));
|
||||
await fakeWorker.SendHeartbeatAsync(
|
||||
configureHeartbeat: heartbeat => heartbeat.WorkerProcessId = 2468);
|
||||
|
||||
await WaitUntilAsync(
|
||||
() => client.ProcessId == 2468 && client.LastHeartbeatAt > previousHeartbeat,
|
||||
TestTimeout);
|
||||
|
||||
Assert.Equal(WorkerClientState.Ready, client.State);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task InvokeAsync_WithHungWorker_TimesOutPendingCommand()
|
||||
{
|
||||
|
||||
@@ -284,6 +284,26 @@ public sealed class FakeWorkerHarness : IAsyncDisposable
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task SendHeartbeatAsync(
|
||||
WorkerState state = WorkerState.Ready,
|
||||
CancellationToken cancellationToken = default,
|
||||
Action<WorkerHeartbeat>? configureHeartbeat = null)
|
||||
{
|
||||
WorkerHeartbeat heartbeat = new()
|
||||
{
|
||||
WorkerProcessId = DefaultWorkerProcessId,
|
||||
State = state,
|
||||
LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
|
||||
};
|
||||
configureHeartbeat?.Invoke(heartbeat);
|
||||
|
||||
await _writer.WriteAsync(
|
||||
CreateEnvelope(
|
||||
correlationId: string.Empty,
|
||||
envelope => envelope.WorkerHeartbeat = heartbeat),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task SendShutdownAckAsync(
|
||||
ProtocolStatusCode statusCode = ProtocolStatusCode.Ok,
|
||||
CancellationToken cancellationToken = default)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using System.IO.Pipes;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Workers;
|
||||
@@ -151,6 +152,24 @@ public sealed class WorkerClientTests
|
||||
Assert.Equal(WorkerClientState.Faulted, client.State);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadLoop_WhenHeartbeatArrives_UpdatesLastHeartbeatAndWorkerProcess()
|
||||
{
|
||||
await using PipePair pipePair = await PipePair.CreateAsync();
|
||||
await using WorkerClient client = CreateClient(pipePair);
|
||||
await CompleteHandshakeAsync(client, pipePair);
|
||||
DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
|
||||
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(20));
|
||||
await pipePair.WorkerWriter.WriteAsync(CreateHeartbeatEnvelope(workerProcessId: 9876));
|
||||
|
||||
await WaitUntilAsync(
|
||||
() => client.ProcessId == 9876 && client.LastHeartbeatAt > previousHeartbeat,
|
||||
TestTimeout);
|
||||
|
||||
Assert.Equal(WorkerClientState.Ready, client.State);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task HeartbeatMonitor_WhenHeartbeatExpires_FaultsClient()
|
||||
{
|
||||
@@ -276,6 +295,21 @@ public sealed class WorkerClientTests
|
||||
});
|
||||
}
|
||||
|
||||
private static WorkerEnvelope CreateHeartbeatEnvelope(int workerProcessId)
|
||||
{
|
||||
return CreateWorkerEnvelope(
|
||||
correlationId: string.Empty,
|
||||
sequence: 20,
|
||||
envelope => envelope.WorkerHeartbeat = new WorkerHeartbeat
|
||||
{
|
||||
WorkerProcessId = workerProcessId,
|
||||
State = WorkerState.Ready,
|
||||
LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
|
||||
PendingCommandCount = 0,
|
||||
OutboundEventQueueDepth = 0,
|
||||
});
|
||||
}
|
||||
|
||||
private static WorkerEnvelope CreateWorkerEnvelope(
|
||||
string correlationId,
|
||||
ulong sequence,
|
||||
|
||||
@@ -30,7 +30,7 @@ public sealed class WorkerApplicationTests
|
||||
Assert.Equal("mxaccess-gateway-123-session-1", entry.Fields["pipe_name"]);
|
||||
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, entry.Fields["protocol_version"]);
|
||||
Assert.Equal("[redacted]", entry.Fields["nonce"]);
|
||||
Assert.Equal("WorkerPipeHandshakeSucceeded", logger.Entries[1].EventName);
|
||||
Assert.Equal("WorkerPipeSessionCompleted", logger.Entries[1].EventName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.IO.Pipes;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Worker.Bootstrap;
|
||||
using MxGateway.Worker.Ipc;
|
||||
using MxGateway.Worker.MxAccess;
|
||||
using MxGateway.Worker.Sta;
|
||||
|
||||
namespace MxGateway.Worker.Tests.Ipc;
|
||||
|
||||
@@ -28,7 +33,9 @@ public sealed class WorkerPipeClientTests
|
||||
PipeTransmissionMode.Byte,
|
||||
PipeOptions.Asynchronous);
|
||||
|
||||
WorkerPipeClient client = new(connectTimeoutMilliseconds: 5000);
|
||||
WorkerPipeClient client = new(
|
||||
connectTimeoutMilliseconds: 5000,
|
||||
(stream, options) => CreateSession(stream, options));
|
||||
Task clientTask = client.RunAsync(workerOptions);
|
||||
|
||||
await Task.Factory.FromAsync(server.BeginWaitForConnection, server.EndWaitForConnection, null);
|
||||
@@ -56,6 +63,87 @@ public sealed class WorkerPipeClientTests
|
||||
WorkerEnvelope ready = await reader.ReadAsync();
|
||||
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, ready.BodyCase);
|
||||
|
||||
await writer.WriteAsync(new WorkerEnvelope
|
||||
{
|
||||
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||
SessionId = "session-1",
|
||||
Sequence = 2,
|
||||
WorkerShutdown = new WorkerShutdown
|
||||
{
|
||||
GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
|
||||
Reason = "test-complete",
|
||||
},
|
||||
});
|
||||
|
||||
WorkerEnvelope shutdownAck = await reader.ReadAsync();
|
||||
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, shutdownAck.BodyCase);
|
||||
await clientTask;
|
||||
}
|
||||
|
||||
private static WorkerPipeSession CreateSession(
|
||||
Stream stream,
|
||||
WorkerFrameProtocolOptions options)
|
||||
{
|
||||
return new WorkerPipeSession(
|
||||
new WorkerFrameReader(stream, options),
|
||||
new WorkerFrameWriter(stream, options),
|
||||
options,
|
||||
() => 1234,
|
||||
new WorkerPipeSessionOptions
|
||||
{
|
||||
HeartbeatInterval = TimeSpan.FromSeconds(30),
|
||||
HeartbeatGrace = TimeSpan.FromSeconds(30),
|
||||
},
|
||||
() => new FakeRuntimeSession());
|
||||
}
|
||||
|
||||
private sealed class FakeRuntimeSession : IWorkerRuntimeSession
|
||||
{
|
||||
public Task<WorkerReady> StartAsync(
|
||||
string sessionId,
|
||||
int workerProcessId,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
return Task.FromResult(new WorkerReady
|
||||
{
|
||||
WorkerProcessId = workerProcessId,
|
||||
MxaccessProgid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.ProgId,
|
||||
MxaccessClsid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.Clsid,
|
||||
ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
|
||||
});
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> DispatchAsync(StaCommand command)
|
||||
{
|
||||
return Task.FromResult(new MxCommandReply
|
||||
{
|
||||
SessionId = command.SessionId,
|
||||
CorrelationId = command.CorrelationId,
|
||||
Kind = command.Kind,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.Ok,
|
||||
Message = "OK",
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
|
||||
{
|
||||
return new WorkerRuntimeHeartbeatSnapshot(
|
||||
DateTimeOffset.UtcNow,
|
||||
pendingCommandCount: 0,
|
||||
outboundEventQueueDepth: 0,
|
||||
lastEventSequence: 0,
|
||||
currentCommandCorrelationId: string.Empty);
|
||||
}
|
||||
|
||||
public void RequestShutdown()
|
||||
{
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,16 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.IO.Pipes;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Worker.Ipc;
|
||||
using MxGateway.Worker.MxAccess;
|
||||
using MxGateway.Worker.Sta;
|
||||
|
||||
namespace MxGateway.Worker.Tests.Ipc;
|
||||
|
||||
@@ -147,6 +152,127 @@ public sealed class WorkerPipeSessionTests
|
||||
Assert.Equal(ProtocolStatusCode.WorkerUnavailable, written[1].WorkerFault.ProtocolStatus.Code);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RunAsync_SendsHeartbeatPayloadFromRuntimeSnapshot()
|
||||
{
|
||||
using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5));
|
||||
using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token);
|
||||
FakeRuntimeSession runtime = new();
|
||||
runtime.SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
|
||||
DateTimeOffset.UtcNow,
|
||||
pendingCommandCount: 2,
|
||||
outboundEventQueueDepth: 3,
|
||||
lastEventSequence: 42,
|
||||
currentCommandCorrelationId: "current-command"));
|
||||
WorkerPipeSession session = CreatePipeSession(
|
||||
pipePair.WorkerStream,
|
||||
runtime,
|
||||
new WorkerPipeSessionOptions
|
||||
{
|
||||
HeartbeatInterval = TimeSpan.FromMilliseconds(20),
|
||||
HeartbeatGrace = TimeSpan.FromSeconds(5),
|
||||
});
|
||||
Task runTask = session.RunAsync(cancellation.Token);
|
||||
|
||||
await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token);
|
||||
await ThrowIfCompletedAsync(runTask);
|
||||
|
||||
WorkerEnvelope heartbeat = await ReadUntilAsync(
|
||||
pipePair.GatewayReader,
|
||||
WorkerEnvelope.BodyOneofCase.WorkerHeartbeat,
|
||||
cancellation.Token);
|
||||
|
||||
Assert.Equal(WorkerState.ExecutingCommand, heartbeat.WorkerHeartbeat.State);
|
||||
Assert.Equal(1234, heartbeat.WorkerHeartbeat.WorkerProcessId);
|
||||
Assert.Equal(2u, heartbeat.WorkerHeartbeat.PendingCommandCount);
|
||||
Assert.Equal(3u, heartbeat.WorkerHeartbeat.OutboundEventQueueDepth);
|
||||
Assert.Equal(42UL, heartbeat.WorkerHeartbeat.LastEventSequence);
|
||||
Assert.Equal("current-command", heartbeat.WorkerHeartbeat.CurrentCommandCorrelationId);
|
||||
|
||||
await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RunAsync_WhenCommandIsExecuting_HeartbeatReportsCurrentCorrelation()
|
||||
{
|
||||
using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5));
|
||||
using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token);
|
||||
FakeRuntimeSession runtime = new()
|
||||
{
|
||||
BlockDispatch = true,
|
||||
};
|
||||
WorkerPipeSession session = CreatePipeSession(
|
||||
pipePair.WorkerStream,
|
||||
runtime,
|
||||
new WorkerPipeSessionOptions
|
||||
{
|
||||
HeartbeatInterval = TimeSpan.FromMilliseconds(20),
|
||||
HeartbeatGrace = TimeSpan.FromSeconds(5),
|
||||
});
|
||||
Task runTask = session.RunAsync(cancellation.Token);
|
||||
await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token);
|
||||
|
||||
await pipePair.GatewayWriter.WriteAsync(
|
||||
CreateCommandEnvelope("command-1"),
|
||||
cancellation.Token);
|
||||
|
||||
Assert.True(runtime.DispatchStarted.Wait(TimeSpan.FromSeconds(2)));
|
||||
WorkerEnvelope heartbeat = await ReadUntilAsync(
|
||||
pipePair.GatewayReader,
|
||||
WorkerEnvelope.BodyOneofCase.WorkerHeartbeat,
|
||||
envelope => envelope.WorkerHeartbeat.CurrentCommandCorrelationId == "command-1",
|
||||
cancellation.Token);
|
||||
|
||||
Assert.Equal("command-1", heartbeat.WorkerHeartbeat.CurrentCommandCorrelationId);
|
||||
Assert.Equal(WorkerState.ExecutingCommand, heartbeat.WorkerHeartbeat.State);
|
||||
|
||||
runtime.ReleaseDispatch();
|
||||
WorkerEnvelope reply = await ReadUntilAsync(
|
||||
pipePair.GatewayReader,
|
||||
WorkerEnvelope.BodyOneofCase.WorkerCommandReply,
|
||||
cancellation.Token);
|
||||
|
||||
Assert.Equal("command-1", reply.CorrelationId);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, reply.WorkerCommandReply.Reply.ProtocolStatus.Code);
|
||||
|
||||
await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RunAsync_WhenStaActivityIsStale_WritesWatchdogFault()
|
||||
{
|
||||
using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5));
|
||||
using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token);
|
||||
FakeRuntimeSession runtime = new();
|
||||
runtime.SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
|
||||
DateTimeOffset.UtcNow - TimeSpan.FromSeconds(5),
|
||||
pendingCommandCount: 0,
|
||||
outboundEventQueueDepth: 0,
|
||||
lastEventSequence: 0,
|
||||
currentCommandCorrelationId: "stuck-command"));
|
||||
WorkerPipeSession session = CreatePipeSession(
|
||||
pipePair.WorkerStream,
|
||||
runtime,
|
||||
new WorkerPipeSessionOptions
|
||||
{
|
||||
HeartbeatInterval = TimeSpan.FromMilliseconds(20),
|
||||
HeartbeatGrace = TimeSpan.FromMilliseconds(50),
|
||||
});
|
||||
Task runTask = session.RunAsync(cancellation.Token);
|
||||
await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token);
|
||||
|
||||
WorkerEnvelope fault = await ReadUntilAsync(
|
||||
pipePair.GatewayReader,
|
||||
WorkerEnvelope.BodyOneofCase.WorkerFault,
|
||||
cancellation.Token);
|
||||
|
||||
Assert.Equal(WorkerFaultCategory.StaHung, fault.WorkerFault.Category);
|
||||
Assert.Equal("stuck-command", fault.WorkerFault.CommandMethod);
|
||||
Assert.Contains("STA activity is stale", fault.WorkerFault.DiagnosticMessage);
|
||||
|
||||
await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token);
|
||||
}
|
||||
|
||||
private static WorkerPipeSession CreateSession(
|
||||
Stream inbound,
|
||||
Stream outbound,
|
||||
@@ -159,6 +285,21 @@ public sealed class WorkerPipeSessionTests
|
||||
() => 1234);
|
||||
}
|
||||
|
||||
private static WorkerPipeSession CreatePipeSession(
|
||||
Stream stream,
|
||||
FakeRuntimeSession runtime,
|
||||
WorkerPipeSessionOptions sessionOptions)
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
return new WorkerPipeSession(
|
||||
new WorkerFrameReader(stream, options),
|
||||
new WorkerFrameWriter(stream, options),
|
||||
options,
|
||||
() => 1234,
|
||||
sessionOptions,
|
||||
() => runtime);
|
||||
}
|
||||
|
||||
private static WorkerFrameProtocolOptions CreateOptions()
|
||||
{
|
||||
return new WorkerFrameProtocolOptions(
|
||||
@@ -185,6 +326,119 @@ public sealed class WorkerPipeSessionTests
|
||||
};
|
||||
}
|
||||
|
||||
private static WorkerEnvelope CreateCommandEnvelope(string correlationId)
|
||||
{
|
||||
return new WorkerEnvelope
|
||||
{
|
||||
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||
SessionId = SessionId,
|
||||
Sequence = 2,
|
||||
CorrelationId = correlationId,
|
||||
WorkerCommand = new WorkerCommand
|
||||
{
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Ping,
|
||||
Ping = new PingCommand
|
||||
{
|
||||
Message = "ping",
|
||||
},
|
||||
},
|
||||
EnqueueTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private static WorkerEnvelope CreateShutdownEnvelope()
|
||||
{
|
||||
return new WorkerEnvelope
|
||||
{
|
||||
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||
SessionId = SessionId,
|
||||
Sequence = 3,
|
||||
WorkerShutdown = new WorkerShutdown
|
||||
{
|
||||
GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
|
||||
Reason = "test-complete",
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private static async Task CompleteGatewayHandshakeAsync(
|
||||
PipePair pipePair,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await pipePair.GatewayWriter
|
||||
.WriteAsync(CreateGatewayHelloEnvelope(), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
WorkerEnvelope hello = await pipePair.GatewayReader.ReadAsync(cancellationToken).ConfigureAwait(false);
|
||||
WorkerEnvelope ready = await pipePair.GatewayReader.ReadAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, hello.BodyCase);
|
||||
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, ready.BodyCase);
|
||||
}
|
||||
|
||||
private static async Task SendShutdownAndWaitAsync(
|
||||
PipePair pipePair,
|
||||
Task runTask,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await pipePair.GatewayWriter
|
||||
.WriteAsync(CreateShutdownEnvelope(), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
WorkerEnvelope shutdownAck = await ReadUntilAsync(
|
||||
pipePair.GatewayReader,
|
||||
WorkerEnvelope.BodyOneofCase.WorkerShutdownAck,
|
||||
cancellationToken);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.Ok, shutdownAck.WorkerShutdownAck.Status.Code);
|
||||
Task completedTask = await Task
|
||||
.WhenAny(runTask, Task.Delay(TimeSpan.FromSeconds(2), cancellationToken))
|
||||
.ConfigureAwait(false);
|
||||
|
||||
Assert.Same(runTask, completedTask);
|
||||
await runTask.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static async Task ThrowIfCompletedAsync(Task task)
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(100));
|
||||
if (task.IsCompleted)
|
||||
{
|
||||
await task;
|
||||
}
|
||||
}
|
||||
|
||||
private static Task<WorkerEnvelope> ReadUntilAsync(
|
||||
WorkerFrameReader reader,
|
||||
WorkerEnvelope.BodyOneofCase expectedBody,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return ReadUntilAsync(
|
||||
reader,
|
||||
expectedBody,
|
||||
_ => true,
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
private static async Task<WorkerEnvelope> ReadUntilAsync(
|
||||
WorkerFrameReader reader,
|
||||
WorkerEnvelope.BodyOneofCase expectedBody,
|
||||
Func<WorkerEnvelope, bool> predicate,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
WorkerEnvelope envelope = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (envelope.BodyCase == expectedBody && predicate(envelope))
|
||||
{
|
||||
return envelope;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static WorkerEnvelope[] ReadWrittenFrames(
|
||||
MemoryStream stream,
|
||||
WorkerFrameProtocolOptions options)
|
||||
@@ -219,4 +473,158 @@ public sealed class WorkerPipeSessionTests
|
||||
buffer[2] = (byte)(value >> 16);
|
||||
buffer[3] = (byte)(value >> 24);
|
||||
}
|
||||
|
||||
private sealed class FakeRuntimeSession : IWorkerRuntimeSession
|
||||
{
|
||||
private readonly ManualResetEventSlim releaseDispatch = new(false);
|
||||
private readonly object gate = new();
|
||||
private WorkerRuntimeHeartbeatSnapshot snapshot = new(
|
||||
DateTimeOffset.UtcNow,
|
||||
pendingCommandCount: 0,
|
||||
outboundEventQueueDepth: 0,
|
||||
lastEventSequence: 0,
|
||||
currentCommandCorrelationId: string.Empty);
|
||||
|
||||
public ManualResetEventSlim DispatchStarted { get; } = new(false);
|
||||
|
||||
public bool BlockDispatch { get; set; }
|
||||
|
||||
public Task<WorkerReady> StartAsync(
|
||||
string sessionId,
|
||||
int workerProcessId,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
return Task.FromResult(new WorkerReady
|
||||
{
|
||||
WorkerProcessId = workerProcessId,
|
||||
MxaccessProgid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.ProgId,
|
||||
MxaccessClsid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.Clsid,
|
||||
ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
|
||||
});
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> DispatchAsync(StaCommand command)
|
||||
{
|
||||
return Task.Run(
|
||||
() =>
|
||||
{
|
||||
SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
|
||||
DateTimeOffset.UtcNow,
|
||||
pendingCommandCount: 0,
|
||||
outboundEventQueueDepth: 0,
|
||||
lastEventSequence: 0,
|
||||
command.CorrelationId));
|
||||
DispatchStarted.Set();
|
||||
|
||||
if (BlockDispatch)
|
||||
{
|
||||
releaseDispatch.Wait(TimeSpan.FromSeconds(5));
|
||||
}
|
||||
|
||||
SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
|
||||
DateTimeOffset.UtcNow,
|
||||
pendingCommandCount: 0,
|
||||
outboundEventQueueDepth: 0,
|
||||
lastEventSequence: 0,
|
||||
currentCommandCorrelationId: string.Empty));
|
||||
|
||||
return new MxCommandReply
|
||||
{
|
||||
SessionId = command.SessionId,
|
||||
CorrelationId = command.CorrelationId,
|
||||
Kind = command.Kind,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.Ok,
|
||||
Message = "OK",
|
||||
},
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
|
||||
{
|
||||
lock (gate)
|
||||
{
|
||||
return snapshot;
|
||||
}
|
||||
}
|
||||
|
||||
public void RequestShutdown()
|
||||
{
|
||||
releaseDispatch.Set();
|
||||
}
|
||||
|
||||
public void ReleaseDispatch()
|
||||
{
|
||||
releaseDispatch.Set();
|
||||
}
|
||||
|
||||
public void SetSnapshot(WorkerRuntimeHeartbeatSnapshot value)
|
||||
{
|
||||
lock (gate)
|
||||
{
|
||||
snapshot = value;
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
releaseDispatch.Set();
|
||||
releaseDispatch.Dispose();
|
||||
DispatchStarted.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class PipePair : IDisposable
|
||||
{
|
||||
private readonly NamedPipeServerStream gatewayStream;
|
||||
|
||||
private PipePair(
|
||||
NamedPipeServerStream gatewayStream,
|
||||
NamedPipeClientStream workerStream)
|
||||
{
|
||||
this.gatewayStream = gatewayStream;
|
||||
WorkerStream = workerStream;
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
GatewayReader = new WorkerFrameReader(gatewayStream, options);
|
||||
GatewayWriter = new WorkerFrameWriter(gatewayStream, options);
|
||||
}
|
||||
|
||||
public Stream WorkerStream { get; }
|
||||
|
||||
public WorkerFrameReader GatewayReader { get; }
|
||||
|
||||
public WorkerFrameWriter GatewayWriter { get; }
|
||||
|
||||
public static async Task<PipePair> CreateAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
string pipeName = $"mxaccessgw-worker-session-tests-{Guid.NewGuid():N}";
|
||||
NamedPipeServerStream gatewayStream = new(
|
||||
pipeName,
|
||||
PipeDirection.InOut,
|
||||
maxNumberOfServerInstances: 1,
|
||||
PipeTransmissionMode.Byte,
|
||||
PipeOptions.Asynchronous);
|
||||
NamedPipeClientStream workerStream = new(
|
||||
".",
|
||||
pipeName,
|
||||
PipeDirection.InOut,
|
||||
PipeOptions.Asynchronous);
|
||||
|
||||
Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync();
|
||||
await Task
|
||||
.Run(() => workerStream.Connect(5000), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
await waitForConnectionTask.ConfigureAwait(false);
|
||||
|
||||
return new PipePair(gatewayStream, workerStream);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
WorkerStream.Dispose();
|
||||
gatewayStream.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.IO.Pipes;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
@@ -11,6 +12,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
|
||||
public const int DefaultConnectTimeoutMilliseconds = 30000;
|
||||
|
||||
private readonly int _connectTimeoutMilliseconds;
|
||||
private readonly Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> _sessionFactory;
|
||||
|
||||
public WorkerPipeClient()
|
||||
: this(DefaultConnectTimeoutMilliseconds)
|
||||
@@ -18,6 +20,15 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
|
||||
}
|
||||
|
||||
public WorkerPipeClient(int connectTimeoutMilliseconds)
|
||||
: this(
|
||||
connectTimeoutMilliseconds,
|
||||
(stream, frameOptions) => new WorkerPipeSession(stream, frameOptions))
|
||||
{
|
||||
}
|
||||
|
||||
public WorkerPipeClient(
|
||||
int connectTimeoutMilliseconds,
|
||||
Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> sessionFactory)
|
||||
{
|
||||
if (connectTimeoutMilliseconds <= 0)
|
||||
{
|
||||
@@ -26,6 +37,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
|
||||
"Worker pipe connect timeout must be greater than zero.");
|
||||
}
|
||||
|
||||
_sessionFactory = sessionFactory ?? throw new ArgumentNullException(nameof(sessionFactory));
|
||||
_connectTimeoutMilliseconds = connectTimeoutMilliseconds;
|
||||
}
|
||||
|
||||
@@ -48,8 +60,8 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
|
||||
|
||||
await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
WorkerPipeSession session = new(pipe, frameOptions);
|
||||
await session.CompleteStartupHandshakeAsync(cancellationToken).ConfigureAwait(false);
|
||||
WorkerPipeSession session = _sessionFactory(pipe, frameOptions);
|
||||
await session.RunAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private Task ConnectAsync(
|
||||
|
||||
@@ -6,6 +6,7 @@ using System.Threading.Tasks;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Worker.MxAccess;
|
||||
using MxGateway.Worker.Sta;
|
||||
|
||||
namespace MxGateway.Worker.Ipc;
|
||||
|
||||
@@ -13,10 +14,14 @@ public sealed class WorkerPipeSession
|
||||
{
|
||||
private readonly WorkerFrameProtocolOptions _options;
|
||||
private readonly Func<int> _processIdProvider;
|
||||
private readonly Func<IWorkerRuntimeSession> _runtimeSessionFactory;
|
||||
private readonly WorkerPipeSessionOptions _sessionOptions;
|
||||
private readonly WorkerFrameReader _reader;
|
||||
private readonly WorkerFrameWriter _writer;
|
||||
private MxAccessStaSession? _mxAccessStaSession;
|
||||
private IWorkerRuntimeSession? _runtimeSession;
|
||||
private long _nextSequence;
|
||||
private WorkerState _state = WorkerState.Starting;
|
||||
private bool _watchdogFaultSent;
|
||||
|
||||
public WorkerPipeSession(
|
||||
Stream stream,
|
||||
@@ -34,11 +39,49 @@ public sealed class WorkerPipeSession
|
||||
WorkerFrameWriter writer,
|
||||
WorkerFrameProtocolOptions options,
|
||||
Func<int> processIdProvider)
|
||||
: this(
|
||||
reader,
|
||||
writer,
|
||||
options,
|
||||
processIdProvider,
|
||||
new WorkerPipeSessionOptions(),
|
||||
() => new MxAccessStaSession())
|
||||
{
|
||||
}
|
||||
|
||||
public WorkerPipeSession(
|
||||
WorkerFrameReader reader,
|
||||
WorkerFrameWriter writer,
|
||||
WorkerFrameProtocolOptions options,
|
||||
Func<int> processIdProvider,
|
||||
WorkerPipeSessionOptions sessionOptions,
|
||||
Func<IWorkerRuntimeSession> runtimeSessionFactory)
|
||||
{
|
||||
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
|
||||
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
_processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider));
|
||||
_sessionOptions = sessionOptions ?? throw new ArgumentNullException(nameof(sessionOptions));
|
||||
_runtimeSessionFactory = runtimeSessionFactory ?? throw new ArgumentNullException(nameof(runtimeSessionFactory));
|
||||
_sessionOptions.Validate();
|
||||
}
|
||||
|
||||
public async Task RunAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
_runtimeSession = _runtimeSessionFactory();
|
||||
try
|
||||
{
|
||||
await CompleteStartupHandshakeAsync(
|
||||
token => _runtimeSession.StartAsync(_options.SessionId, _processIdProvider(), token),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
await RunMessageLoopAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_runtimeSession?.Dispose();
|
||||
_runtimeSession = null;
|
||||
_state = WorkerState.Stopped;
|
||||
}
|
||||
}
|
||||
|
||||
public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default)
|
||||
@@ -76,11 +119,14 @@ public sealed class WorkerPipeSession
|
||||
try
|
||||
{
|
||||
WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
|
||||
_state = WorkerState.Handshaking;
|
||||
ValidateGatewayHello(envelope);
|
||||
|
||||
await WriteWorkerHelloAsync(cancellationToken).ConfigureAwait(false);
|
||||
_state = WorkerState.InitializingSta;
|
||||
WorkerReady ready = await initializeMxAccessAsync(cancellationToken).ConfigureAwait(false);
|
||||
await WriteWorkerReadyAsync(ready, cancellationToken).ConfigureAwait(false);
|
||||
_state = WorkerState.Ready;
|
||||
}
|
||||
catch (WorkerFrameProtocolException exception)
|
||||
{
|
||||
@@ -140,6 +186,174 @@ public sealed class WorkerPipeSession
|
||||
return _writer.WriteAsync(CreateEnvelope(ready), cancellationToken);
|
||||
}
|
||||
|
||||
private async Task RunMessageLoopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
using CancellationTokenSource heartbeatCancellation = CancellationTokenSource
|
||||
.CreateLinkedTokenSource(cancellationToken);
|
||||
Task heartbeatTask = RunHeartbeatLoopAsync(heartbeatCancellation.Token);
|
||||
|
||||
try
|
||||
{
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
Task<WorkerEnvelope> readTask = _reader.ReadAsync(cancellationToken);
|
||||
Task completedTask = await Task.WhenAny(readTask, heartbeatTask).ConfigureAwait(false);
|
||||
if (completedTask == heartbeatTask)
|
||||
{
|
||||
await heartbeatTask.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
WorkerEnvelope envelope = await readTask.ConfigureAwait(false);
|
||||
bool keepReading = await DispatchGatewayEnvelopeAsync(envelope, cancellationToken).ConfigureAwait(false);
|
||||
if (!keepReading)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
heartbeatCancellation.Cancel();
|
||||
try
|
||||
{
|
||||
await heartbeatTask.ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<bool> DispatchGatewayEnvelopeAsync(
|
||||
WorkerEnvelope envelope,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
switch (envelope.BodyCase)
|
||||
{
|
||||
case WorkerEnvelope.BodyOneofCase.WorkerCommand:
|
||||
_ = ProcessCommandAsync(envelope, cancellationToken);
|
||||
return true;
|
||||
case WorkerEnvelope.BodyOneofCase.WorkerShutdown:
|
||||
await ShutdownAsync(envelope.WorkerShutdown, cancellationToken).ConfigureAwait(false);
|
||||
return false;
|
||||
case WorkerEnvelope.BodyOneofCase.WorkerCancel:
|
||||
return true;
|
||||
default:
|
||||
throw new WorkerFrameProtocolException(
|
||||
WorkerFrameProtocolErrorCode.UnexpectedEnvelopeBody,
|
||||
$"Worker received unexpected gateway envelope body {envelope.BodyCase}.");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessCommandAsync(
|
||||
WorkerEnvelope envelope,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
IWorkerRuntimeSession runtimeSession = _runtimeSession
|
||||
?? throw new InvalidOperationException("Worker runtime session has not been initialized.");
|
||||
WorkerCommand workerCommand = envelope.WorkerCommand;
|
||||
MxCommand command = workerCommand.Command;
|
||||
StaCommand staCommand = new(
|
||||
_options.SessionId,
|
||||
envelope.CorrelationId,
|
||||
command,
|
||||
workerCommand.EnqueueTimestamp,
|
||||
cancellationToken);
|
||||
|
||||
try
|
||||
{
|
||||
MxCommandReply reply = await runtimeSession.DispatchAsync(staCommand).ConfigureAwait(false);
|
||||
await _writer
|
||||
.WriteAsync(
|
||||
CreateEnvelope(new WorkerCommandReply
|
||||
{
|
||||
Reply = reply,
|
||||
CompletedTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
|
||||
}),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception exception) when (exception is not OperationCanceledException)
|
||||
{
|
||||
_state = WorkerState.Faulted;
|
||||
await TryWriteFaultAsync(
|
||||
CreateFault(
|
||||
WorkerFaultCategory.MxaccessCommandFailed,
|
||||
staCommand.MethodName,
|
||||
exception),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ShutdownAsync(
|
||||
WorkerShutdown shutdown,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
_state = WorkerState.ShuttingDown;
|
||||
_runtimeSession?.RequestShutdown();
|
||||
|
||||
await _writer
|
||||
.WriteAsync(
|
||||
CreateEnvelope(
|
||||
new WorkerShutdownAck
|
||||
{
|
||||
Status = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.Ok,
|
||||
Message = string.IsNullOrWhiteSpace(shutdown.Reason)
|
||||
? "Worker shutdown accepted."
|
||||
: $"Worker shutdown accepted: {shutdown.Reason}",
|
||||
},
|
||||
}),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task RunHeartbeatLoopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
await Task.Delay(_sessionOptions.HeartbeatInterval, cancellationToken).ConfigureAwait(false);
|
||||
IWorkerRuntimeSession? runtimeSession = _runtimeSession;
|
||||
if (runtimeSession is null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
WorkerRuntimeHeartbeatSnapshot snapshot = runtimeSession.CaptureHeartbeat();
|
||||
await _writer
|
||||
.WriteAsync(CreateEnvelope(CreateHeartbeat(snapshot)), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
await ReportWatchdogFaultIfNeededAsync(snapshot, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReportWatchdogFaultIfNeededAsync(
|
||||
WorkerRuntimeHeartbeatSnapshot snapshot,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
TimeSpan staleFor = DateTimeOffset.UtcNow - snapshot.LastStaActivityUtc;
|
||||
if (staleFor <= _sessionOptions.HeartbeatGrace)
|
||||
{
|
||||
_watchdogFaultSent = false;
|
||||
return;
|
||||
}
|
||||
|
||||
if (_watchdogFaultSent)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_watchdogFaultSent = true;
|
||||
await TryWriteFaultAsync(
|
||||
CreateFault(
|
||||
WorkerFaultCategory.StaHung,
|
||||
snapshot.CurrentCommandCorrelationId,
|
||||
$"STA activity is stale by {staleFor}."),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task TryWriteFaultAsync(
|
||||
WorkerFrameProtocolException exception,
|
||||
CancellationToken cancellationToken)
|
||||
@@ -178,6 +392,25 @@ public sealed class WorkerPipeSession
|
||||
}
|
||||
}
|
||||
|
||||
private async Task TryWriteFaultAsync(
|
||||
WorkerFault fault,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _writer
|
||||
.WriteAsync(CreateEnvelope(fault), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception faultWriteException) when (
|
||||
faultWriteException is IOException
|
||||
|| faultWriteException is ObjectDisposedException
|
||||
|| faultWriteException is WorkerFrameProtocolException)
|
||||
{
|
||||
// The runtime fault remains observable through worker exit or pipe closure.
|
||||
}
|
||||
}
|
||||
|
||||
private WorkerEnvelope CreateEnvelope(WorkerHello hello)
|
||||
{
|
||||
return CreateBaseEnvelope(hello);
|
||||
@@ -193,6 +426,21 @@ public sealed class WorkerPipeSession
|
||||
return CreateBaseEnvelope(fault);
|
||||
}
|
||||
|
||||
private WorkerEnvelope CreateEnvelope(WorkerCommandReply reply)
|
||||
{
|
||||
return CreateBaseEnvelope(reply);
|
||||
}
|
||||
|
||||
private WorkerEnvelope CreateEnvelope(WorkerShutdownAck shutdownAck)
|
||||
{
|
||||
return CreateBaseEnvelope(shutdownAck);
|
||||
}
|
||||
|
||||
private WorkerEnvelope CreateEnvelope(WorkerHeartbeat heartbeat)
|
||||
{
|
||||
return CreateBaseEnvelope(heartbeat);
|
||||
}
|
||||
|
||||
private WorkerEnvelope CreateBaseEnvelope(WorkerHello body)
|
||||
{
|
||||
WorkerEnvelope envelope = CreateBaseEnvelope();
|
||||
@@ -214,6 +462,28 @@ public sealed class WorkerPipeSession
|
||||
return envelope;
|
||||
}
|
||||
|
||||
private WorkerEnvelope CreateBaseEnvelope(WorkerCommandReply body)
|
||||
{
|
||||
WorkerEnvelope envelope = CreateBaseEnvelope();
|
||||
envelope.CorrelationId = body.Reply?.CorrelationId ?? string.Empty;
|
||||
envelope.WorkerCommandReply = body;
|
||||
return envelope;
|
||||
}
|
||||
|
||||
private WorkerEnvelope CreateBaseEnvelope(WorkerShutdownAck body)
|
||||
{
|
||||
WorkerEnvelope envelope = CreateBaseEnvelope();
|
||||
envelope.WorkerShutdownAck = body;
|
||||
return envelope;
|
||||
}
|
||||
|
||||
private WorkerEnvelope CreateBaseEnvelope(WorkerHeartbeat body)
|
||||
{
|
||||
WorkerEnvelope envelope = CreateBaseEnvelope();
|
||||
envelope.WorkerHeartbeat = body;
|
||||
return envelope;
|
||||
}
|
||||
|
||||
private WorkerEnvelope CreateBaseEnvelope()
|
||||
{
|
||||
return new WorkerEnvelope
|
||||
@@ -231,21 +501,39 @@ public sealed class WorkerPipeSession
|
||||
|
||||
private async Task<WorkerReady> InitializeMxAccessAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_mxAccessStaSession = new MxAccessStaSession();
|
||||
_runtimeSession = new MxAccessStaSession();
|
||||
try
|
||||
{
|
||||
return await _mxAccessStaSession
|
||||
return await _runtimeSession
|
||||
.StartAsync(_options.SessionId, _processIdProvider(), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
_mxAccessStaSession.Dispose();
|
||||
_mxAccessStaSession = null;
|
||||
_runtimeSession.Dispose();
|
||||
_runtimeSession = null;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private WorkerHeartbeat CreateHeartbeat(WorkerRuntimeHeartbeatSnapshot snapshot)
|
||||
{
|
||||
WorkerState state = string.IsNullOrWhiteSpace(snapshot.CurrentCommandCorrelationId)
|
||||
? _state
|
||||
: WorkerState.ExecutingCommand;
|
||||
|
||||
return new WorkerHeartbeat
|
||||
{
|
||||
WorkerProcessId = _processIdProvider(),
|
||||
State = state,
|
||||
LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(snapshot.LastStaActivityUtc),
|
||||
PendingCommandCount = snapshot.PendingCommandCount,
|
||||
OutboundEventQueueDepth = snapshot.OutboundEventQueueDepth,
|
||||
LastEventSequence = snapshot.LastEventSequence,
|
||||
CurrentCommandCorrelationId = snapshot.CurrentCommandCorrelationId,
|
||||
};
|
||||
}
|
||||
|
||||
private WorkerReady CreateWorkerReady()
|
||||
{
|
||||
return new WorkerReady
|
||||
@@ -295,6 +583,42 @@ public sealed class WorkerPipeSession
|
||||
return fault;
|
||||
}
|
||||
|
||||
private static WorkerFault CreateFault(
|
||||
WorkerFaultCategory category,
|
||||
string commandMethod,
|
||||
Exception exception)
|
||||
{
|
||||
WorkerFault fault = CreateFault(
|
||||
category,
|
||||
commandMethod,
|
||||
exception.Message);
|
||||
fault.ExceptionType = exception.GetType().FullName ?? string.Empty;
|
||||
fault.ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.WorkerUnavailable,
|
||||
Message = exception.Message,
|
||||
};
|
||||
return fault;
|
||||
}
|
||||
|
||||
private static WorkerFault CreateFault(
|
||||
WorkerFaultCategory category,
|
||||
string commandMethod,
|
||||
string diagnosticMessage)
|
||||
{
|
||||
return new WorkerFault
|
||||
{
|
||||
Category = category,
|
||||
CommandMethod = commandMethod ?? string.Empty,
|
||||
DiagnosticMessage = diagnosticMessage,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.WorkerUnavailable,
|
||||
Message = diagnosticMessage,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
|
||||
{
|
||||
return errorCode switch
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
using System;
|
||||
|
||||
namespace MxGateway.Worker.Ipc;
|
||||
|
||||
public sealed class WorkerPipeSessionOptions
|
||||
{
|
||||
public static readonly TimeSpan DefaultHeartbeatInterval = TimeSpan.FromSeconds(5);
|
||||
public static readonly TimeSpan DefaultHeartbeatGrace = TimeSpan.FromSeconds(15);
|
||||
|
||||
public WorkerPipeSessionOptions()
|
||||
{
|
||||
HeartbeatInterval = DefaultHeartbeatInterval;
|
||||
HeartbeatGrace = DefaultHeartbeatGrace;
|
||||
}
|
||||
|
||||
public TimeSpan HeartbeatInterval { get; set; }
|
||||
|
||||
public TimeSpan HeartbeatGrace { get; set; }
|
||||
|
||||
public void Validate()
|
||||
{
|
||||
if (HeartbeatInterval <= TimeSpan.Zero)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(
|
||||
nameof(HeartbeatInterval),
|
||||
"Worker heartbeat interval must be greater than zero.");
|
||||
}
|
||||
|
||||
if (HeartbeatGrace <= TimeSpan.Zero)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(
|
||||
nameof(HeartbeatGrace),
|
||||
"Worker heartbeat grace must be greater than zero.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Worker.Sta;
|
||||
|
||||
namespace MxGateway.Worker.MxAccess;
|
||||
|
||||
public interface IWorkerRuntimeSession : IDisposable
|
||||
{
|
||||
Task<WorkerReady> StartAsync(
|
||||
string sessionId,
|
||||
int workerProcessId,
|
||||
CancellationToken cancellationToken = default);
|
||||
|
||||
Task<MxCommandReply> DispatchAsync(StaCommand command);
|
||||
|
||||
WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat();
|
||||
|
||||
void RequestShutdown();
|
||||
}
|
||||
@@ -7,7 +7,7 @@ using MxGateway.Worker.Sta;
|
||||
|
||||
namespace MxGateway.Worker.MxAccess;
|
||||
|
||||
public sealed class MxAccessStaSession : IDisposable
|
||||
public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
||||
{
|
||||
private readonly IMxAccessComObjectFactory factory;
|
||||
private readonly IMxAccessEventSink eventSink;
|
||||
@@ -97,6 +97,30 @@ public sealed class MxAccessStaSession : IDisposable
|
||||
return commandDispatcher.DispatchAsync(command);
|
||||
}
|
||||
|
||||
public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
|
||||
{
|
||||
uint pendingCommandCount = 0;
|
||||
string currentCommandCorrelationId = string.Empty;
|
||||
|
||||
if (commandDispatcher is not null)
|
||||
{
|
||||
pendingCommandCount = (uint)commandDispatcher.PendingCommandCount;
|
||||
currentCommandCorrelationId = commandDispatcher.CurrentCommandCorrelationId;
|
||||
}
|
||||
|
||||
return new WorkerRuntimeHeartbeatSnapshot(
|
||||
staRuntime.LastActivityUtc,
|
||||
pendingCommandCount,
|
||||
(uint)eventQueue.Count,
|
||||
eventQueue.LastEventSequence,
|
||||
currentCommandCorrelationId);
|
||||
}
|
||||
|
||||
public void RequestShutdown()
|
||||
{
|
||||
commandDispatcher?.RequestShutdown();
|
||||
}
|
||||
|
||||
public IReadOnlyList<WorkerEvent> DrainEvents(uint maxEvents)
|
||||
{
|
||||
return eventQueue.Drain(maxEvents);
|
||||
@@ -148,7 +172,7 @@ public sealed class MxAccessStaSession : IDisposable
|
||||
return;
|
||||
}
|
||||
|
||||
commandDispatcher?.RequestShutdown();
|
||||
RequestShutdown();
|
||||
|
||||
if (session is not null)
|
||||
{
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
using System;
|
||||
|
||||
namespace MxGateway.Worker.MxAccess;
|
||||
|
||||
public sealed class WorkerRuntimeHeartbeatSnapshot
|
||||
{
|
||||
public WorkerRuntimeHeartbeatSnapshot(
|
||||
DateTimeOffset lastStaActivityUtc,
|
||||
uint pendingCommandCount,
|
||||
uint outboundEventQueueDepth,
|
||||
ulong lastEventSequence,
|
||||
string currentCommandCorrelationId)
|
||||
{
|
||||
LastStaActivityUtc = lastStaActivityUtc;
|
||||
PendingCommandCount = pendingCommandCount;
|
||||
OutboundEventQueueDepth = outboundEventQueueDepth;
|
||||
LastEventSequence = lastEventSequence;
|
||||
CurrentCommandCorrelationId = currentCommandCorrelationId ?? string.Empty;
|
||||
}
|
||||
|
||||
public DateTimeOffset LastStaActivityUtc { get; }
|
||||
|
||||
public uint PendingCommandCount { get; }
|
||||
|
||||
public uint OutboundEventQueueDepth { get; }
|
||||
|
||||
public ulong LastEventSequence { get; }
|
||||
|
||||
public string CurrentCommandCorrelationId { get; }
|
||||
}
|
||||
@@ -84,7 +84,7 @@ public static class WorkerApplication
|
||||
|
||||
pipeClient.RunAsync(options).GetAwaiter().GetResult();
|
||||
|
||||
logger.Information("WorkerPipeHandshakeSucceeded", new Dictionary<string, object?>
|
||||
logger.Information("WorkerPipeSessionCompleted", new Dictionary<string, object?>
|
||||
{
|
||||
["session_id"] = options.SessionId,
|
||||
["pipe_name"] = options.PipeName,
|
||||
|
||||
Reference in New Issue
Block a user