Compare commits

...

6 Commits

Author SHA1 Message Date
Joseph Doherty 1f92078777 Merge remote-tracking branch 'origin/main' into agent-1/issue-37-create-cross-language-client-behavior-fixtures 2026-04-26 19:12:19 -04:00
Joseph Doherty 108a3d3f8a Add client behavior fixtures 2026-04-26 19:11:04 -04:00
dohertj2 95e71cd819 Merge pull request #83 from agent-2/issue-29-implement-event-sink-and-event-queue
Issue #29: implement event sink and event queue
2026-04-26 19:08:52 -04:00
Joseph Doherty 647fe9a4b5 Merge remote-tracking branch 'origin/main' into agent-2/issue-29-implement-event-sink-and-event-queue 2026-04-26 19:05:31 -04:00
Joseph Doherty dd455089b4 Implement worker MXAccess event queue 2026-04-26 19:04:56 -04:00
dohertj2 d0bc4e3c01 Merge pull request #82 from agent-1/issue-36-publish-stable-client-proto-generation-inputs
Issue #36: publish stable client proto generation inputs
2026-04-26 18:56:37 -04:00
28 changed files with 1821 additions and 17 deletions
@@ -0,0 +1,36 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "missing-api-key",
"grpcStatusCode": "UNAUTHENTICATED",
"clientErrorCategory": "AuthenticationError",
"inputMetadata": {
"authorization": ""
},
"expectedRedactedOutput": "authentication failed: missing bearer token",
"retryableWithoutCredentialChange": false
},
{
"id": "invalid-api-key",
"grpcStatusCode": "UNAUTHENTICATED",
"clientErrorCategory": "AuthenticationError",
"inputMetadata": {
"authorization": "Bearer <redacted>"
},
"expectedRedactedOutput": "authentication failed: invalid API key <redacted>",
"retryableWithoutCredentialChange": false
},
{
"id": "missing-write-scope",
"grpcStatusCode": "PERMISSION_DENIED",
"clientErrorCategory": "AuthorizationError",
"inputMetadata": {
"authorization": "Bearer <redacted>"
},
"requiredScope": "mxaccess.write",
"expectedRedactedOutput": "authorization failed: missing scope mxaccess.write",
"retryableWithoutCredentialChange": false
}
]
}
@@ -0,0 +1,30 @@
{
"sessionId": "session-fixture",
"correlationId": "gateway-correlation-register-1",
"kind": "MX_COMMAND_KIND_REGISTER",
"protocolStatus": {
"code": "PROTOCOL_STATUS_CODE_OK",
"message": "Register completed."
},
"hresult": 0,
"returnValue": {
"dataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_I4",
"int32Value": 12
},
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "OK"
}
],
"diagnosticMessage": "COM Register returned server handle 12.",
"register": {
"serverHandle": 12
}
}
@@ -0,0 +1,38 @@
{
"sessionId": "session-fixture",
"correlationId": "gateway-correlation-write-1",
"kind": "MX_COMMAND_KIND_WRITE",
"protocolStatus": {
"code": "PROTOCOL_STATUS_CODE_MXACCESS_FAILURE",
"message": "MXAccess rejected the write."
},
"hresult": -2147220992,
"returnValue": {
"dataType": "MX_DATA_TYPE_NO_DATA",
"variantType": "VT_EMPTY",
"isNull": true,
"rawDiagnostic": "MXAccess returned no value for the failed write.",
"rawDataType": 2
},
"statuses": [
{
"success": 0,
"category": "MX_STATUS_CATEGORY_SECURITY_ERROR",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 321,
"rawCategory": 8,
"rawDetectedBy": 3,
"diagnosticText": "Write denied by provider security."
},
{
"success": 0,
"category": "MX_STATUS_CATEGORY_OPERATIONAL_ERROR",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_NMX",
"detail": 902,
"rawCategory": 7,
"rawDetectedBy": 5,
"diagnosticText": "Provider rejected the item state."
}
],
"diagnosticMessage": "Fixture preserves a data-bearing MXAccess failure reply with HRESULT and status array."
}
@@ -0,0 +1,159 @@
{
"sessionId": "session-fixture",
"description": "Ordered event stream sample for one worker-backed session.",
"events": [
{
"family": "MX_EVENT_FAMILY_ON_DATA_CHANGE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_I4",
"int32Value": 123
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:00Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "OK"
}
],
"workerSequence": "1",
"workerTimestamp": "2026-01-01T00:00:00.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:00.015Z",
"onDataChange": {}
},
{
"family": "MX_EVENT_FAMILY_ON_WRITE_COMPLETE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_DOUBLE",
"variantType": "VT_R8",
"doubleValue": 45.5
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:01Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "Write complete."
}
],
"workerSequence": "2",
"workerTimestamp": "2026-01-01T00:00:01.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:01.015Z",
"hresult": 0,
"onWriteComplete": {}
},
{
"family": "MX_EVENT_FAMILY_OPERATION_COMPLETE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_STRING",
"variantType": "VT_BSTR",
"stringValue": "operation-complete"
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:02Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_NMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "Operation complete."
}
],
"workerSequence": "3",
"workerTimestamp": "2026-01-01T00:00:02.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:02.015Z",
"operationComplete": {}
},
{
"family": "MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE",
"sessionId": "session-fixture",
"serverHandle": 12,
"itemHandle": 34,
"value": {
"dataType": "MX_DATA_TYPE_FLOAT",
"arrayValue": {
"elementDataType": "MX_DATA_TYPE_FLOAT",
"variantType": "VT_ARRAY|VT_R4",
"dimensions": [
2
],
"floatValues": {
"values": [
1.5,
2.5
]
}
}
},
"quality": 192,
"sourceTimestamp": "2026-01-01T00:00:03Z",
"statuses": [
{
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "Buffered data delivered."
}
],
"workerSequence": "4",
"workerTimestamp": "2026-01-01T00:00:03.010Z",
"gatewayReceiveTimestamp": "2026-01-01T00:00:03.015Z",
"onBufferedDataChange": {
"dataType": "MX_DATA_TYPE_FLOAT",
"qualityValues": {
"elementDataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_ARRAY|VT_I4",
"dimensions": [
2
],
"int32Values": {
"values": [
192,
192
]
}
},
"timestampValues": {
"elementDataType": "MX_DATA_TYPE_TIME",
"variantType": "VT_ARRAY|VT_DATE",
"dimensions": [
2
],
"timestampValues": {
"values": [
"2026-01-01T00:00:02Z",
"2026-01-01T00:00:03Z"
]
}
},
"rawDataType": 5
}
}
]
}
@@ -0,0 +1,59 @@
{
"schemaVersion": 1,
"fixtureSet": "mxaccess-gateway-client-behavior",
"contractName": "mxaccess-gateway",
"gatewayProtocolVersion": 1,
"workerProtocolVersion": 1,
"protoInputManifest": "clients/proto/proto-inputs.json",
"fixtures": [
{
"id": "command-reply.register.ok",
"category": "command_replies",
"messageType": "mxaccess_gateway.v1.MxCommandReply",
"path": "command-replies/register.ok.reply.json",
"expectation": "Successful command replies preserve protocol status, HRESULT, return value, status arrays, and method-specific output."
},
{
"id": "command-reply.write.mxaccess-failure",
"category": "command_replies",
"messageType": "mxaccess_gateway.v1.MxCommandReply",
"path": "command-replies/write.mxaccess-failure.reply.json",
"expectation": "MXAccess failures are data-bearing replies with HRESULT and status details, not transport failures."
},
{
"id": "event-stream.session-ordered",
"category": "event_streams",
"messageType": "mxaccess_gateway.v1.MxEvent",
"path": "event-streams/session-event-stream.json",
"expectation": "Clients preserve per-session event order and event family bodies exactly as emitted."
},
{
"id": "values.conversion-cases",
"category": "value_conversion",
"messageType": "mxaccess_gateway.v1.MxValue",
"path": "values/value-conversion-cases.json",
"expectation": "Clients expose typed projections and keep raw fallback metadata when conversion is incomplete."
},
{
"id": "statuses.conversion-cases",
"category": "status_conversion",
"messageType": "mxaccess_gateway.v1.MxStatusProxy",
"path": "statuses/status-conversion-cases.json",
"expectation": "Clients preserve every MXSTATUS_PROXY field, including raw category/source values."
},
{
"id": "auth.error-cases",
"category": "auth_errors",
"messageType": "client_behavior.v1.AuthErrorCase",
"path": "auth/auth-error-cases.json",
"expectation": "Clients map authentication and authorization failures distinctly and redact credentials."
},
{
"id": "timeout-cancel.expected-behavior",
"category": "timeout_cancel",
"messageType": "client_behavior.v1.TimeoutCancelCase",
"path": "timeout-cancel/timeout-cancel-cases.json",
"expectation": "Client cancellation stops waiting locally but does not imply an in-flight MXAccess COM call was aborted."
}
]
}
@@ -0,0 +1,41 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "ok.responding-lmx",
"status": {
"success": 1,
"category": "MX_STATUS_CATEGORY_OK",
"detectedBy": "MX_STATUS_SOURCE_RESPONDING_LMX",
"detail": 0,
"rawCategory": 0,
"rawDetectedBy": 0,
"diagnosticText": "OK"
}
},
{
"id": "security-error.requesting-lmx",
"status": {
"success": 0,
"category": "MX_STATUS_CATEGORY_SECURITY_ERROR",
"detectedBy": "MX_STATUS_SOURCE_REQUESTING_LMX",
"detail": 401,
"rawCategory": 8,
"rawDetectedBy": 2,
"diagnosticText": "Requesting LMX denied the secured operation."
}
},
{
"id": "raw-unknown-category",
"status": {
"success": 0,
"category": "MX_STATUS_CATEGORY_UNKNOWN",
"detectedBy": "MX_STATUS_SOURCE_UNKNOWN",
"detail": 65535,
"rawCategory": 99,
"rawDetectedBy": 77,
"diagnosticText": "Unknown native MXSTATUS_PROXY fields are preserved."
}
}
]
}
@@ -0,0 +1,27 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "unary-deadline-exceeded",
"operation": "Invoke",
"clientDeadline": "2s",
"grpcStatusCode": "DEADLINE_EXCEEDED",
"clientErrorCategory": "TimeoutError",
"gatewayWaitBehavior": "stops_waiting_for_reply",
"workerCommandBehavior": "continues_until_worker_reply_or_worker_fault",
"sessionExpectation": "session_state_is_unknown_until_follow_up_status_or_close",
"expectedClientAction": "issue GetSessionState or CloseSession before reusing handles"
},
{
"id": "stream-cancel",
"operation": "StreamEvents",
"clientDeadline": "5s",
"grpcStatusCode": "CANCELLED",
"clientErrorCategory": "CancelledError",
"gatewayWaitBehavior": "stops_streaming_to_that_call",
"workerCommandBehavior": "does_not_cancel_worker_session",
"sessionExpectation": "session_remains_ready_if_worker_stays_healthy",
"expectedClientAction": "open a new StreamEvents call with the last observed worker sequence"
}
]
}
@@ -0,0 +1,85 @@
{
"schemaVersion": 1,
"cases": [
{
"id": "bool.true",
"expectedKind": "boolValue",
"value": {
"dataType": "MX_DATA_TYPE_BOOLEAN",
"variantType": "VT_BOOL",
"boolValue": true
}
},
{
"id": "int64.large",
"expectedKind": "int64Value",
"value": {
"dataType": "MX_DATA_TYPE_INTEGER",
"variantType": "VT_I8",
"int64Value": "9223372036854770000"
}
},
{
"id": "timestamp.utc",
"expectedKind": "timestampValue",
"value": {
"dataType": "MX_DATA_TYPE_TIME",
"variantType": "VT_DATE",
"timestampValue": "2026-01-01T00:00:04Z"
}
},
{
"id": "string-array",
"expectedKind": "arrayValue",
"value": {
"dataType": "MX_DATA_TYPE_STRING",
"arrayValue": {
"elementDataType": "MX_DATA_TYPE_STRING",
"variantType": "VT_ARRAY|VT_BSTR",
"dimensions": [
2
],
"stringValues": {
"values": [
"alpha",
"beta"
]
}
}
}
},
{
"id": "raw-fallback.variant",
"expectedKind": "rawValue",
"value": {
"dataType": "MX_DATA_TYPE_UNKNOWN",
"variantType": "VT_RECORD",
"rawDiagnostic": "No lossless typed projection exists for this VARIANT.",
"rawDataType": 32767,
"rawValue": "AQIDBAU="
}
},
{
"id": "raw-array-fallback",
"expectedKind": "arrayValue",
"value": {
"dataType": "MX_DATA_TYPE_UNKNOWN",
"arrayValue": {
"elementDataType": "MX_DATA_TYPE_UNKNOWN",
"variantType": "VT_ARRAY|VT_VARIANT",
"dimensions": [
2
],
"rawDiagnostic": "Array elements contain mixed VARIANT types.",
"rawElementDataType": 32767,
"rawValues": {
"values": [
"AAE=",
"AgM="
]
}
}
}
}
]
}
+1
View File
@@ -16,6 +16,7 @@
],
"descriptorSet": "clients/proto/descriptors/mxaccessgw-client-v1.protoset",
"fixtureRoot": "clients/proto/fixtures/golden",
"behaviorFixtureRoot": "clients/proto/fixtures/behavior",
"generatedOutputs": {
"dotnet": "clients/dotnet/generated",
"go": "clients/go/internal/generated",
+106
View File
@@ -0,0 +1,106 @@
# Client Behavior Fixtures
Client behavior fixtures define the shared expectations used by the official
.NET, Go, Rust, Python, and Java clients. They keep wrapper behavior aligned
while each language exposes idiomatic APIs over the same protobuf contract.
## Fixture Set
The fixture manifest is `clients/proto/fixtures/behavior/manifest.json`.
`clients/proto/proto-inputs.json` references the fixture root through
`behaviorFixtureRoot` so generators and client test projects can discover the
same files they use for descriptor inputs.
The fixture set contains:
- command reply protobuf JSON,
- ordered event stream protobuf JSON samples,
- `MxValue` conversion case sets,
- `MxStatusProxy` conversion case sets,
- authentication and authorization error expectations,
- timeout and cancellation behavior expectations.
Protobuf message fixtures use protobuf JSON field names and enum values. Files
that describe client wrapper behavior use explicit JSON fields instead of a
proto message because those expectations apply above the generated transport
types.
## Command Replies
Command reply fixtures live in
`clients/proto/fixtures/behavior/command-replies/`. They parse as
`mxaccess_gateway.v1.MxCommandReply`.
Clients use these fixtures to verify that successful and failed MXAccess
commands both carry the full reply details:
- `protocolStatus`,
- `hresult`,
- `returnValue`,
- repeated `statuses`,
- method-specific reply payloads when MXAccess returns out parameters.
MXAccess failures remain command replies when the gateway reached the worker and
the worker captured HRESULT or `MXSTATUS_PROXY` details. Client wrappers should
map those replies to rich command errors without discarding the raw reply.
## Event Streams
Event stream fixtures live in
`clients/proto/fixtures/behavior/event-streams/`. Each file contains an ordered
`events` array whose entries parse as `mxaccess_gateway.v1.MxEvent`.
Clients use these fixtures to verify that stream helpers preserve
`workerSequence` order and expose each native event family:
- `OnDataChange`,
- `OnWriteComplete`,
- `OperationComplete`,
- `OnBufferedDataChange`.
Wrappers must not reorder, coalesce, or drop events while reading the fixture.
## Value And Status Conversion
Value fixtures live in `clients/proto/fixtures/behavior/values/`. Each case
contains a `value` object that parses as `mxaccess_gateway.v1.MxValue`.
Status fixtures live in `clients/proto/fixtures/behavior/statuses/`. Each case
contains a `status` object that parses as
`mxaccess_gateway.v1.MxStatusProxy`.
Clients use these fixtures to verify typed projections and raw fallback
behavior. A language helper may expose native booleans, integers, strings,
arrays, and timestamps, but it must keep `rawDiagnostic`, raw data type fields,
and raw byte payloads accessible when conversion is incomplete.
## Auth, Timeout, And Cancel Behavior
Authentication fixtures live in `clients/proto/fixtures/behavior/auth/`. They
separate `UNAUTHENTICATED` from `PERMISSION_DENIED` so clients map missing or
invalid credentials differently from missing scopes. Expected output strings
contain only redacted credentials.
Timeout and cancellation fixtures live in
`clients/proto/fixtures/behavior/timeout-cancel/`. They document that canceling
or timing out a client call stops the client from waiting, but it does not abort
an in-flight MXAccess COM call on the worker STA. Clients should follow up with
`GetSessionState` or `CloseSession` before reusing handles after an uncertain
command timeout.
## Validation
Run the fixture validation tests after changing the behavior fixture set:
```bash
powershell -ExecutionPolicy Bypass -File scripts/validate-client-behavior-fixtures.ps1
```
The script runs the focused C# contract tests that parse all protobuf JSON
fixtures and validate deterministic wrapper expectation files.
## Related Documentation
- [Client Proto Generation](./client-proto-generation.md)
- [Client Libraries Detailed Design](./client-libraries-design.md)
- [Protobuf Contracts](./Contracts.md)
+6
View File
@@ -29,6 +29,7 @@ Language-specific plans:
Shared generation inputs:
- `docs/client-proto-generation.md`
- `docs/ClientBehaviorFixtures.md`
- `clients/proto/proto-inputs.json`
Language style guides:
@@ -310,6 +311,11 @@ CLI output should support JSON for automated tests.
Unit tests must run without a live gateway. Use fake gRPC services, mock
transports, or generated test servers depending on language.
Shared behavior fixtures live in `clients/proto/fixtures/behavior`. Every
client should include tests that load the fixture manifest and verify wrapper
behavior against the common command reply, event stream, value conversion,
status conversion, auth error, and timeout/cancel cases.
Required unit test areas:
- options parsing,
+22
View File
@@ -16,6 +16,7 @@ records:
- the public and worker source files,
- the descriptor set path,
- golden fixture locations,
- behavior fixture locations,
- generated-code output directories for each planned client.
The source files listed by the manifest are:
@@ -125,9 +126,30 @@ The fixtures use protobuf JSON field names and enum values. Contract tests parse
them with the generated C# types so schema drift is caught before client
generation work starts.
## Behavior Fixtures
Cross-language behavior fixtures live in
`clients/proto/fixtures/behavior`. The manifest
`clients/proto/fixtures/behavior/manifest.json` lists command replies, ordered
event stream samples, value conversion cases, status conversion cases, auth
error expectations, and timeout/cancel expectations.
The behavior fixtures let each generated client wrapper test the same
expectations without a live gateway. Protobuf message fixtures parse with the
generated types. Auth and timeout/cancel files describe wrapper behavior above
the generated transport layer, including credential redaction and the rule that
client cancellation does not abort an in-flight MXAccess COM call.
Run the focused validation script after changing these fixtures:
```powershell
scripts/validate-client-behavior-fixtures.ps1
```
## Related Documentation
- [Protobuf Contracts](./Contracts.md)
- [Client Libraries Detailed Design](./client-libraries-design.md)
- [Client Behavior Fixtures](./ClientBehaviorFixtures.md)
- [Client Libraries Implementation Plan](./implementation-plan-clients.md)
- [Protobuf Style Guide](./style-guides/ProtobufStyleGuide.md)
@@ -277,6 +277,8 @@ Live tests:
Labels: `area:worker`, `type:feature`, `priority:p0`
Status: implemented.
Deliverables:
- handlers for `OnDataChange`,
+22 -3
View File
@@ -348,9 +348,28 @@ Event handling rules:
- Enqueue to the outbound event queue.
- Return quickly to preserve message pumping.
If event conversion throws, catch it inside the event handler, enqueue a
structured `WorkerFault` or diagnostic event, and keep the worker alive only if
the fault policy allows it.
`MxAccessBaseEventSink` implements the COM connection-point handlers and keeps
the handlers limited to event argument conversion plus enqueue. It uses
`MxAccessEventMapper` to create `MxEvent` DTOs for `OnDataChange`,
`OnWriteComplete`, `OperationComplete`, and `OnBufferedDataChange`. The mapper
converts scalar and array values through `VariantConverter`, converts
`MXSTATUS_PROXY[]` through `MxStatusProxyConverter`, and maps installed
`MxDataType` values to the public protobuf enum while preserving the raw data
type on buffered events. `OperationComplete` is only emitted from the native
`OperationComplete` handler; write completion does not synthesize it.
`MxAccessEventQueue` is the bounded outbound event queue for one worker
session. It assigns the monotonic `WorkerSequence` and `WorkerTimestamp` when an
event is accepted, preserving the order in which MXAccess handlers enqueue
events. The default capacity is `10000`. When the queue reaches capacity it
records a `WorkerFaultCategory.QueueOverflow` fault and rejects further events.
The event handler catches conversion and enqueue failures, records the first
fault on the queue, and returns to the STA message pump instead of writing to
the pipe.
If event conversion throws, catch it inside the event handler, record a
structured `WorkerFault`, and keep the worker alive only if the fault policy
allows it.
## Command Queue
@@ -0,0 +1,26 @@
[CmdletBinding()]
param(
[switch]$NoBuild
)
Set-StrictMode -Version Latest
$ErrorActionPreference = "Stop"
$repoRoot = Resolve-Path (Join-Path $PSScriptRoot "..")
$testProject = Join-Path $repoRoot "src/MxGateway.Tests/MxGateway.Tests.csproj"
$arguments = @(
"test",
$testProject,
"--filter",
"ClientBehaviorFixtureTests"
)
if ($NoBuild) {
$arguments += "--no-build"
}
& dotnet @arguments
if ($LASTEXITCODE -ne 0) {
throw "Client behavior fixture validation failed with exit code $LASTEXITCODE."
}
@@ -0,0 +1,379 @@
using System.Text.Json;
using Google.Protobuf;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
namespace MxGateway.Tests.Contracts;
public sealed class ClientBehaviorFixtureTests
{
private static readonly JsonParser ProtobufJsonParser = new(JsonParser.Settings.Default);
[Fact]
public void BehaviorManifest_DeclaresCurrentProtocolVersionsAndExistingFixtures()
{
using JsonDocument manifest = LoadBehaviorManifest();
JsonElement root = manifest.RootElement;
Assert.Equal(1, root.GetProperty("schemaVersion").GetInt32());
Assert.Equal("mxaccess-gateway-client-behavior", root.GetProperty("fixtureSet").GetString());
Assert.Equal(GatewayContractInfo.GatewayProtocolVersion, root.GetProperty("gatewayProtocolVersion").GetUInt32());
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, root.GetProperty("workerProtocolVersion").GetUInt32());
HashSet<string> fixtureIds = new(StringComparer.Ordinal);
foreach (JsonElement fixture in root.GetProperty("fixtures").EnumerateArray())
{
string id = fixture.GetProperty("id").GetString()!;
string path = fixture.GetProperty("path").GetString()!;
string category = fixture.GetProperty("category").GetString()!;
string messageType = fixture.GetProperty("messageType").GetString()!;
Assert.True(fixtureIds.Add(id), $"Duplicate behavior fixture id '{id}'.");
Assert.Contains(category, KnownCategories);
Assert.Contains(messageType, KnownMessageTypes);
Assert.True(
File.Exists(Path.Combine(GetBehaviorFixtureRoot().FullName, path)),
$"Expected behavior fixture '{path}' to exist.");
Assert.False(Path.IsPathRooted(path), $"Fixture path '{path}' must be relative.");
Assert.NotEmpty(fixture.GetProperty("expectation").GetString()!);
}
}
[Fact]
public void ProtoInputManifest_ReferencesBehaviorFixtureRoot()
{
DirectoryInfo repositoryRoot = FindRepositoryRoot();
string manifestPath = Path.Combine(repositoryRoot.FullName, "clients", "proto", "proto-inputs.json");
using JsonDocument manifest = JsonDocument.Parse(File.ReadAllText(manifestPath));
string fixtureRoot = manifest.RootElement.GetProperty("behaviorFixtureRoot").GetString()!;
Assert.Equal("clients/proto/fixtures/behavior", fixtureRoot);
Assert.True(Directory.Exists(Path.Combine(repositoryRoot.FullName, fixtureRoot)));
}
[Fact]
public void CommandReplyFixtures_ParseWithCurrentContractAndPreserveMxAccessDetails()
{
IReadOnlyList<JsonElement> fixtures = LoadManifestFixtures("command_replies");
Assert.NotEmpty(fixtures);
foreach (JsonElement fixture in fixtures)
{
MxCommandReply reply = ParseFixture<MxCommandReply>(
fixture,
MxCommandReply.Parser);
Assert.NotEqual(MxCommandKind.Unspecified, reply.Kind);
Assert.NotEqual(ProtocolStatusCode.Unspecified, reply.ProtocolStatus.Code);
Assert.True(reply.HasHresult, $"Fixture '{GetFixtureId(fixture)}' must carry an HRESULT.");
Assert.NotEmpty(reply.Statuses);
Assert.NotEqual(MxDataType.Unspecified, reply.ReturnValue.DataType);
Assert.True(
reply.ReturnValue.KindCase != MxValue.KindOneofCase.None || reply.ReturnValue.IsNull,
$"Fixture '{GetFixtureId(fixture)}' must carry a typed value, raw value, or explicit null.");
}
MxCommandReply failedWrite = ParseFixture<MxCommandReply>(
Assert.Single(fixtures, fixture => GetFixtureId(fixture) == "command-reply.write.mxaccess-failure"),
MxCommandReply.Parser);
Assert.Equal(ProtocolStatusCode.MxaccessFailure, failedWrite.ProtocolStatus.Code);
Assert.Equal(-2147220992, failedWrite.Hresult);
Assert.True(failedWrite.Statuses.Count > 1);
Assert.All(failedWrite.Statuses, status => Assert.Equal(0, status.Success));
}
[Fact]
public void EventStreamFixtures_ParseWithMonotonicSequencesAndExpectedFamilies()
{
IReadOnlyList<JsonElement> fixtures = LoadManifestFixtures("event_streams");
Assert.NotEmpty(fixtures);
foreach (JsonElement fixture in fixtures)
{
using JsonDocument document = JsonDocument.Parse(File.ReadAllText(GetFixturePath(fixture)));
ulong previousSequence = 0;
List<MxEventFamily> families = [];
foreach (JsonElement eventElement in document.RootElement.GetProperty("events").EnumerateArray())
{
MxEvent gatewayEvent = ProtobufJsonParser.Parse<MxEvent>(eventElement.GetRawText());
Assert.True(gatewayEvent.WorkerSequence > previousSequence);
Assert.Equal(document.RootElement.GetProperty("sessionId").GetString(), gatewayEvent.SessionId);
Assert.NotEmpty(gatewayEvent.Statuses);
AssertEventBodyMatchesFamily(gatewayEvent);
previousSequence = gatewayEvent.WorkerSequence;
families.Add(gatewayEvent.Family);
}
Assert.Contains(MxEventFamily.OnDataChange, families);
Assert.Contains(MxEventFamily.OnWriteComplete, families);
Assert.Contains(MxEventFamily.OperationComplete, families);
Assert.Contains(MxEventFamily.OnBufferedDataChange, families);
}
}
[Fact]
public void ValueConversionFixtures_ParseTypedValuesAndRawFallbacks()
{
JsonElement cases = LoadCaseSet("value_conversion", "cases");
bool sawRawFallback = false;
bool sawRawArrayFallback = false;
bool sawTypedArray = false;
foreach (JsonElement valueCase in cases.EnumerateArray())
{
MxValue value = ProtobufJsonParser.Parse<MxValue>(
valueCase.GetProperty("value").GetRawText());
string expectedKind = valueCase.GetProperty("expectedKind").GetString()!;
Assert.NotEqual(MxDataType.Unspecified, value.DataType);
AssertJsonKindMatchesValueKind(expectedKind, value);
sawRawFallback |= value.KindCase == MxValue.KindOneofCase.RawValue
&& !string.IsNullOrWhiteSpace(value.RawDiagnostic)
&& value.RawDataType != 0;
sawRawArrayFallback |= value.KindCase == MxValue.KindOneofCase.ArrayValue
&& value.ArrayValue.ValuesCase == MxArray.ValuesOneofCase.RawValues
&& !string.IsNullOrWhiteSpace(value.ArrayValue.RawDiagnostic)
&& value.ArrayValue.RawElementDataType != 0;
sawTypedArray |= value.KindCase == MxValue.KindOneofCase.ArrayValue
&& value.ArrayValue.ValuesCase != MxArray.ValuesOneofCase.RawValues;
}
Assert.True(sawRawFallback, "Expected at least one raw scalar fallback case.");
Assert.True(sawRawArrayFallback, "Expected at least one raw array fallback case.");
Assert.True(sawTypedArray, "Expected at least one typed array case.");
}
[Fact]
public void StatusConversionFixtures_ParseStatusArraysAndRawFields()
{
JsonElement cases = LoadCaseSet("status_conversion", "cases");
bool sawRawUnknown = false;
foreach (JsonElement statusCase in cases.EnumerateArray())
{
MxStatusProxy status = ProtobufJsonParser.Parse<MxStatusProxy>(
statusCase.GetProperty("status").GetRawText());
Assert.NotEqual(MxStatusCategory.Unspecified, status.Category);
Assert.NotEqual(MxStatusSource.Unspecified, status.DetectedBy);
Assert.NotEmpty(status.DiagnosticText);
sawRawUnknown |= status.Category == MxStatusCategory.Unknown
&& status.RawCategory != 0
&& status.RawDetectedBy != 0;
}
Assert.True(sawRawUnknown, "Expected a status case with unknown raw native fields.");
}
[Fact]
public void AuthErrorFixtures_MapAuthenticationAuthorizationAndRedactCredentials()
{
JsonElement cases = LoadCaseSet("auth_errors", "cases");
HashSet<string> statusCodes = new(StringComparer.Ordinal);
foreach (JsonElement authCase in cases.EnumerateArray())
{
string grpcStatusCode = authCase.GetProperty("grpcStatusCode").GetString()!;
string category = authCase.GetProperty("clientErrorCategory").GetString()!;
string redactedOutput = authCase.GetProperty("expectedRedactedOutput").GetString()!;
string serialized = authCase.GetRawText();
Assert.Contains(grpcStatusCode, AuthGrpcStatusCodes);
Assert.Contains(category, AuthClientErrorCategories);
string authorization = authCase.GetProperty("inputMetadata").GetProperty("authorization").GetString()!;
if (!string.IsNullOrEmpty(authorization))
{
Assert.Contains("<redacted>", serialized);
}
Assert.DoesNotContain("mxgw_", serialized, StringComparison.Ordinal);
Assert.DoesNotContain("secret", redactedOutput, StringComparison.OrdinalIgnoreCase);
statusCodes.Add(grpcStatusCode);
}
Assert.Contains("UNAUTHENTICATED", statusCodes);
Assert.Contains("PERMISSION_DENIED", statusCodes);
}
[Fact]
public void TimeoutCancelFixtures_DocumentClientWaitAndWorkerCommandBehavior()
{
JsonElement cases = LoadCaseSet("timeout_cancel", "cases");
HashSet<string> statusCodes = new(StringComparer.Ordinal);
foreach (JsonElement timeoutCase in cases.EnumerateArray())
{
string grpcStatusCode = timeoutCase.GetProperty("grpcStatusCode").GetString()!;
Assert.Contains(grpcStatusCode, TimeoutGrpcStatusCodes);
Assert.NotEmpty(timeoutCase.GetProperty("clientDeadline").GetString()!);
Assert.NotEmpty(timeoutCase.GetProperty("gatewayWaitBehavior").GetString()!);
Assert.NotEmpty(timeoutCase.GetProperty("workerCommandBehavior").GetString()!);
Assert.NotEmpty(timeoutCase.GetProperty("expectedClientAction").GetString()!);
statusCodes.Add(grpcStatusCode);
}
Assert.Contains("DEADLINE_EXCEEDED", statusCodes);
Assert.Contains("CANCELLED", statusCodes);
}
private static readonly string[] KnownCategories =
[
"command_replies",
"event_streams",
"value_conversion",
"status_conversion",
"auth_errors",
"timeout_cancel",
];
private static readonly string[] KnownMessageTypes =
[
"mxaccess_gateway.v1.MxCommandReply",
"mxaccess_gateway.v1.MxEvent",
"mxaccess_gateway.v1.MxValue",
"mxaccess_gateway.v1.MxStatusProxy",
"client_behavior.v1.AuthErrorCase",
"client_behavior.v1.TimeoutCancelCase",
];
private static readonly string[] AuthGrpcStatusCodes =
[
"UNAUTHENTICATED",
"PERMISSION_DENIED",
];
private static readonly string[] AuthClientErrorCategories =
[
"AuthenticationError",
"AuthorizationError",
];
private static readonly string[] TimeoutGrpcStatusCodes =
[
"DEADLINE_EXCEEDED",
"CANCELLED",
];
private static T ParseFixture<T>(
JsonElement fixture,
MessageParser<T> parser)
where T : IMessage<T>
{
return parser.ParseJson(File.ReadAllText(GetFixturePath(fixture)));
}
private static JsonElement LoadCaseSet(
string category,
string propertyName)
{
JsonElement fixture = Assert.Single(LoadManifestFixtures(category));
using JsonDocument document = JsonDocument.Parse(File.ReadAllText(GetFixturePath(fixture)));
return document.RootElement.GetProperty(propertyName).Clone();
}
private static IReadOnlyList<JsonElement> LoadManifestFixtures(string category)
{
using JsonDocument manifest = LoadBehaviorManifest();
return manifest.RootElement
.GetProperty("fixtures")
.EnumerateArray()
.Where(fixture => fixture.GetProperty("category").GetString() == category)
.Select(fixture => fixture.Clone())
.ToArray();
}
private static JsonDocument LoadBehaviorManifest()
{
return JsonDocument.Parse(File.ReadAllText(Path.Combine(GetBehaviorFixtureRoot().FullName, "manifest.json")));
}
private static string GetFixturePath(JsonElement fixture)
{
return Path.Combine(GetBehaviorFixtureRoot().FullName, fixture.GetProperty("path").GetString()!);
}
private static string GetFixtureId(JsonElement fixture)
{
return fixture.GetProperty("id").GetString()!;
}
private static DirectoryInfo GetBehaviorFixtureRoot()
{
DirectoryInfo repositoryRoot = FindRepositoryRoot();
return new DirectoryInfo(Path.Combine(repositoryRoot.FullName, "clients", "proto", "fixtures", "behavior"));
}
private static DirectoryInfo FindRepositoryRoot()
{
DirectoryInfo? current = new(AppContext.BaseDirectory);
while (current is not null)
{
if (File.Exists(Path.Combine(current.FullName, "AGENTS.md"))
&& Directory.Exists(Path.Combine(current.FullName, "src"))
&& Directory.Exists(Path.Combine(current.FullName, "clients")))
{
return current;
}
current = current.Parent;
}
throw new DirectoryNotFoundException("Could not locate the repository root from the test output directory.");
}
private static void AssertEventBodyMatchesFamily(MxEvent gatewayEvent)
{
switch (gatewayEvent.Family)
{
case MxEventFamily.OnDataChange:
Assert.Equal(MxEvent.BodyOneofCase.OnDataChange, gatewayEvent.BodyCase);
break;
case MxEventFamily.OnWriteComplete:
Assert.Equal(MxEvent.BodyOneofCase.OnWriteComplete, gatewayEvent.BodyCase);
break;
case MxEventFamily.OperationComplete:
Assert.Equal(MxEvent.BodyOneofCase.OperationComplete, gatewayEvent.BodyCase);
break;
case MxEventFamily.OnBufferedDataChange:
Assert.Equal(MxEvent.BodyOneofCase.OnBufferedDataChange, gatewayEvent.BodyCase);
break;
default:
throw new InvalidOperationException($"Unexpected event family '{gatewayEvent.Family}'.");
}
}
private static void AssertJsonKindMatchesValueKind(
string expectedKind,
MxValue value)
{
MxValue.KindOneofCase expected = expectedKind switch
{
"boolValue" => MxValue.KindOneofCase.BoolValue,
"int32Value" => MxValue.KindOneofCase.Int32Value,
"int64Value" => MxValue.KindOneofCase.Int64Value,
"floatValue" => MxValue.KindOneofCase.FloatValue,
"doubleValue" => MxValue.KindOneofCase.DoubleValue,
"stringValue" => MxValue.KindOneofCase.StringValue,
"timestampValue" => MxValue.KindOneofCase.TimestampValue,
"arrayValue" => MxValue.KindOneofCase.ArrayValue,
"rawValue" => MxValue.KindOneofCase.RawValue,
_ => throw new InvalidOperationException($"Unexpected expected value kind '{expectedKind}'."),
};
Assert.Equal(expected, value.KindCase);
}
}
@@ -842,7 +842,9 @@ public sealed class MxAccessCommandExecutorTests
private sealed class NoopEventSink : IMxAccessEventSink
{
public void Attach(object mxAccessComObject)
public void Attach(
object mxAccessComObject,
string sessionId)
{
}
@@ -0,0 +1,117 @@
using System;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.MxAccess;
namespace MxGateway.Worker.Tests.MxAccess;
public sealed class MxAccessEventMapperTests
{
private readonly MxAccessEventMapper mapper = new();
[Fact]
public void CreateOnDataChange_ConvertsValueTimestampQualityAndStatuses()
{
DateTime timestamp = new(2026, 4, 26, 12, 30, 0, DateTimeKind.Utc);
FakeStatus[] statuses =
{
new()
{
success = -1,
category = 0,
detectedBy = 5,
detail = 0,
},
};
MxEvent mxEvent = mapper.CreateOnDataChange(
"session-1",
serverHandle: 12,
itemHandle: 34,
value: 42,
quality: 192,
timestamp: timestamp,
statuses: statuses);
Assert.Equal(MxEventFamily.OnDataChange, mxEvent.Family);
Assert.Equal("session-1", mxEvent.SessionId);
Assert.Equal(12, mxEvent.ServerHandle);
Assert.Equal(34, mxEvent.ItemHandle);
Assert.Equal(42, mxEvent.Value.Int32Value);
Assert.Equal(192, mxEvent.Quality);
Assert.Equal(timestamp, mxEvent.SourceTimestamp.ToDateTime());
Assert.Equal(MxEvent.BodyOneofCase.OnDataChange, mxEvent.BodyCase);
MxStatusProxy status = Assert.Single(mxEvent.Statuses);
Assert.Equal(-1, status.Success);
Assert.Equal(MxStatusCategory.Ok, status.Category);
Assert.Equal(MxStatusSource.RespondingAutomationObject, status.DetectedBy);
}
[Fact]
public void CreateOnWriteCompleteAndOperationComplete_PreservesDistinctFamilies()
{
MxEvent writeComplete = mapper.CreateOnWriteComplete(
"session-1",
serverHandle: 1,
itemHandle: 2,
statuses: Array.Empty<FakeStatus>());
MxEvent operationComplete = mapper.CreateOperationComplete(
"session-1",
serverHandle: 1,
itemHandle: 2,
statuses: Array.Empty<FakeStatus>());
Assert.Equal(MxEventFamily.OnWriteComplete, writeComplete.Family);
Assert.Equal(MxEvent.BodyOneofCase.OnWriteComplete, writeComplete.BodyCase);
Assert.Equal(MxEventFamily.OperationComplete, operationComplete.Family);
Assert.Equal(MxEvent.BodyOneofCase.OperationComplete, operationComplete.BodyCase);
}
[Fact]
public void CreateOnBufferedDataChange_PreservesRawDataTypeAndArrayMetadata()
{
DateTime firstTimestamp = new(2026, 4, 26, 13, 0, 0, DateTimeKind.Utc);
DateTime secondTimestamp = new(2026, 4, 26, 13, 1, 0, DateTimeKind.Utc);
MxEvent mxEvent = mapper.CreateOnBufferedDataChange(
"session-1",
serverHandle: 10,
itemHandle: 20,
rawDataType: 2,
value: new[] { 7, 8 },
quality: new[] { 192, 0 },
timestamp: new[] { firstTimestamp, secondTimestamp },
statuses: null);
Assert.Equal(MxEventFamily.OnBufferedDataChange, mxEvent.Family);
Assert.Equal(MxDataType.Integer, mxEvent.OnBufferedDataChange.DataType);
Assert.Equal(2, mxEvent.OnBufferedDataChange.RawDataType);
Assert.Equal(MxDataType.Integer, mxEvent.Value.ArrayValue.ElementDataType);
Assert.Equal(new[] { 7, 8 }, mxEvent.Value.ArrayValue.Int32Values.Values);
Assert.Equal(new[] { 192, 0 }, mxEvent.OnBufferedDataChange.QualityValues.Int32Values.Values);
Assert.Equal(2, mxEvent.OnBufferedDataChange.TimestampValues.TimestampValues.Values.Count);
}
[Theory]
[InlineData(-1, MxDataType.Unknown)]
[InlineData(0, MxDataType.NoData)]
[InlineData(1, MxDataType.Boolean)]
[InlineData(2, MxDataType.Integer)]
[InlineData(6, MxDataType.Time)]
[InlineData(15, MxDataType.InternationalizedString)]
[InlineData(999, MxDataType.Unknown)]
public void MapMxDataType_MapsInstalledMxAccessValues(
int rawDataType,
MxDataType expectedDataType)
{
Assert.Equal(expectedDataType, MxAccessEventMapper.MapMxDataType(rawDataType));
}
private sealed class FakeStatus
{
public int success;
public int category;
public int detectedBy;
public int detail;
}
}
@@ -0,0 +1,106 @@
using System;
using System.Collections.Generic;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.MxAccess;
namespace MxGateway.Worker.Tests.MxAccess;
public sealed class MxAccessEventQueueTests
{
[Fact]
public void Enqueue_AssignsMonotonicWorkerSequencesAndPreservesOrder()
{
MxAccessEventQueue queue = new(capacity: 4);
WorkerEvent first = queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10));
WorkerEvent second = queue.Enqueue(CreateEvent(MxEventFamily.OnWriteComplete, itemHandle: 11));
Assert.Equal(1UL, first.Event.WorkerSequence);
Assert.Equal(2UL, second.Event.WorkerSequence);
Assert.NotNull(first.Event.WorkerTimestamp);
Assert.Equal(2, queue.Count);
Assert.Equal(2UL, queue.LastEventSequence);
Assert.True(queue.TryDequeue(out WorkerEvent? dequeuedFirst));
Assert.True(queue.TryDequeue(out WorkerEvent? dequeuedSecond));
Assert.Equal(10, dequeuedFirst?.Event.ItemHandle);
Assert.Equal(11, dequeuedSecond?.Event.ItemHandle);
Assert.False(queue.TryDequeue(out _));
}
[Fact]
public void Drain_RemovesAtMostRequestedEvents()
{
MxAccessEventQueue queue = new(capacity: 4);
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10));
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 11));
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 12));
IReadOnlyList<WorkerEvent> drained = queue.Drain(maxEvents: 2);
Assert.Equal(2, drained.Count);
Assert.Equal(10, drained[0].Event.ItemHandle);
Assert.Equal(11, drained[1].Event.ItemHandle);
Assert.Equal(1, queue.Count);
}
[Fact]
public void Enqueue_WhenCapacityIsExceeded_RecordsOverflowFaultAndRejectsNewEvents()
{
MxAccessEventQueue queue = new(capacity: 1);
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10));
MxAccessEventQueueOverflowException overflow = Assert.Throws<MxAccessEventQueueOverflowException>(
() => queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 11)));
Assert.Equal(1, overflow.Capacity);
Assert.True(queue.IsFaulted);
Assert.Equal(WorkerFaultCategory.QueueOverflow, queue.Fault?.Category);
Assert.Equal(ProtocolStatusCode.WorkerUnavailable, queue.Fault?.ProtocolStatus.Code);
Assert.Throws<InvalidOperationException>(
() => queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 12)));
}
[Fact]
public void RecordFault_KeepsFirstFault()
{
MxAccessEventQueue queue = new(capacity: 1);
queue.RecordFault(new WorkerFault
{
Category = WorkerFaultCategory.MxaccessEventConversionFailed,
});
queue.RecordFault(new WorkerFault
{
Category = WorkerFaultCategory.QueueOverflow,
});
Assert.True(queue.IsFaulted);
Assert.Equal(WorkerFaultCategory.MxaccessEventConversionFailed, queue.Fault?.Category);
}
private static MxEvent CreateEvent(
MxEventFamily family,
int itemHandle)
{
MxEvent mxEvent = new()
{
Family = family,
SessionId = "session-1",
ServerHandle = 1,
ItemHandle = itemHandle,
};
switch (family)
{
case MxEventFamily.OnWriteComplete:
mxEvent.OnWriteComplete = new OnWriteCompleteEvent();
break;
default:
mxEvent.OnDataChange = new OnDataChangeEvent();
break;
}
return mxEvent;
}
}
@@ -18,7 +18,7 @@ public sealed class MxAccessStaSessionTests
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, eventSink);
WorkerReady ready = await session.StartAsync(workerProcessId: 1234);
WorkerReady ready = await session.StartAsync("session-1", workerProcessId: 1234);
Assert.Equal(1234, ready.WorkerProcessId);
Assert.Equal(MxAccessInteropInfo.ProgId, ready.MxaccessProgid);
@@ -28,6 +28,7 @@ public sealed class MxAccessStaSessionTests
Assert.Equal(runtime.StaThreadId, eventSink.AttachThreadId);
Assert.Equal(ApartmentState.STA, factory.CreateApartmentState);
Assert.Same(factory.CreatedObject, eventSink.AttachedObject);
Assert.Equal("session-1", eventSink.SessionId);
}
[Fact]
@@ -107,10 +108,15 @@ public sealed class MxAccessStaSessionTests
public int? DetachThreadId { get; private set; }
public void Attach(object mxAccessComObject)
public string? SessionId { get; private set; }
public void Attach(
object mxAccessComObject,
string sessionId)
{
AttachedObject = mxAccessComObject;
AttachThreadId = Thread.CurrentThread.ManagedThreadId;
SessionId = sessionId;
}
public void Detach()
@@ -235,7 +235,7 @@ public sealed class WorkerPipeSession
try
{
return await _mxAccessStaSession
.StartAsync(_processIdProvider(), cancellationToken)
.StartAsync(_options.SessionId, _processIdProvider(), cancellationToken)
.ConfigureAwait(false);
}
catch
@@ -2,7 +2,9 @@ namespace MxGateway.Worker.MxAccess;
public interface IMxAccessEventSink
{
void Attach(object mxAccessComObject);
void Attach(
object mxAccessComObject,
string sessionId);
void Detach();
}
@@ -1,13 +1,39 @@
using System;
using ArchestrA.MxAccess;
using Proto = MxGateway.Contracts.Proto;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessBaseEventSink : IMxAccessEventSink
{
private readonly MxAccessEventMapper eventMapper;
private readonly MxAccessEventQueue eventQueue;
private LMXProxyServerClass? server;
private string sessionId = string.Empty;
public void Attach(object mxAccessComObject)
public MxAccessBaseEventSink()
: this(new MxAccessEventQueue())
{
}
public MxAccessBaseEventSink(MxAccessEventQueue eventQueue)
: this(eventQueue, new MxAccessEventMapper())
{
}
public MxAccessBaseEventSink(
MxAccessEventQueue eventQueue,
MxAccessEventMapper eventMapper)
{
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
this.eventMapper = eventMapper ?? throw new ArgumentNullException(nameof(eventMapper));
}
public void Attach(
object mxAccessComObject,
string sessionId)
{
this.sessionId = sessionId ?? string.Empty;
server = (LMXProxyServerClass)mxAccessComObject;
server.OnDataChange += OnDataChange;
server.OnWriteComplete += OnWriteComplete;
@@ -27,9 +53,10 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
server.OperationComplete -= OperationComplete;
server.OnBufferedDataChange -= OnBufferedDataChange;
server = null;
sessionId = string.Empty;
}
private static void OnDataChange(
private void OnDataChange(
int hLMXServerHandle,
int phItemHandle,
object pvItemValue,
@@ -37,23 +64,44 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
object pftItemTimeStamp,
ref MXSTATUS_PROXY[] pVars)
{
MXSTATUS_PROXY[] statuses = pVars;
EnqueueEvent(() => eventMapper.CreateOnDataChange(
sessionId,
hLMXServerHandle,
phItemHandle,
pvItemValue,
pwItemQuality,
pftItemTimeStamp,
statuses));
}
private static void OnWriteComplete(
private void OnWriteComplete(
int hLMXServerHandle,
int phItemHandle,
ref MXSTATUS_PROXY[] pVars)
{
MXSTATUS_PROXY[] statuses = pVars;
EnqueueEvent(() => eventMapper.CreateOnWriteComplete(
sessionId,
hLMXServerHandle,
phItemHandle,
statuses));
}
private static void OperationComplete(
private void OperationComplete(
int hLMXServerHandle,
int phItemHandle,
ref MXSTATUS_PROXY[] pVars)
{
MXSTATUS_PROXY[] statuses = pVars;
EnqueueEvent(() => eventMapper.CreateOperationComplete(
sessionId,
hLMXServerHandle,
phItemHandle,
statuses));
}
private static void OnBufferedDataChange(
private void OnBufferedDataChange(
int hLMXServerHandle,
int phItemHandle,
MxDataType dtDataType,
@@ -62,5 +110,42 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
object pftItemTimeStamp,
ref MXSTATUS_PROXY[] pVars)
{
MXSTATUS_PROXY[] statuses = pVars;
EnqueueEvent(() => eventMapper.CreateOnBufferedDataChange(
sessionId,
hLMXServerHandle,
phItemHandle,
(int)dtDataType,
pvItemValue,
pwItemQuality,
pftItemTimeStamp,
statuses));
}
private void EnqueueEvent(Func<Proto.MxEvent> createEvent)
{
try
{
eventQueue.Enqueue(createEvent());
}
catch (Exception exception)
{
eventQueue.RecordFault(CreateEventConversionFault(exception));
}
}
private Proto.WorkerFault CreateEventConversionFault(Exception exception)
{
return new Proto.WorkerFault
{
Category = Proto.WorkerFaultCategory.MxaccessEventConversionFailed,
ExceptionType = exception.GetType().FullName ?? string.Empty,
DiagnosticMessage = $"{exception.GetType().FullName}: HRESULT 0x{unchecked((uint)exception.HResult):X8}",
ProtocolStatus = new Proto.ProtocolStatus
{
Code = Proto.ProtocolStatusCode.MxaccessFailure,
Message = "MXAccess event conversion failed.",
},
};
}
}
@@ -0,0 +1,221 @@
using System;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Conversion;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessEventMapper
{
private readonly VariantConverter variantConverter;
private readonly MxStatusProxyConverter statusProxyConverter;
public MxAccessEventMapper()
: this(new VariantConverter(), new MxStatusProxyConverter())
{
}
public MxAccessEventMapper(
VariantConverter variantConverter,
MxStatusProxyConverter statusProxyConverter)
{
this.variantConverter = variantConverter ?? throw new ArgumentNullException(nameof(variantConverter));
this.statusProxyConverter = statusProxyConverter ?? throw new ArgumentNullException(nameof(statusProxyConverter));
}
public MxEvent CreateOnDataChange(
string sessionId,
int serverHandle,
int itemHandle,
object? value,
int quality,
object? timestamp,
Array? statuses)
{
MxEvent mxEvent = CreateBaseEvent(
MxEventFamily.OnDataChange,
sessionId,
serverHandle,
itemHandle,
statuses);
mxEvent.Value = variantConverter.Convert(value);
mxEvent.Quality = quality;
ApplySourceTimestamp(mxEvent, timestamp);
mxEvent.OnDataChange = new OnDataChangeEvent();
return mxEvent;
}
public MxEvent CreateOnWriteComplete(
string sessionId,
int serverHandle,
int itemHandle,
Array? statuses)
{
MxEvent mxEvent = CreateBaseEvent(
MxEventFamily.OnWriteComplete,
sessionId,
serverHandle,
itemHandle,
statuses);
mxEvent.OnWriteComplete = new OnWriteCompleteEvent();
return mxEvent;
}
public MxEvent CreateOperationComplete(
string sessionId,
int serverHandle,
int itemHandle,
Array? statuses)
{
MxEvent mxEvent = CreateBaseEvent(
MxEventFamily.OperationComplete,
sessionId,
serverHandle,
itemHandle,
statuses);
mxEvent.OperationComplete = new OperationCompleteEvent();
return mxEvent;
}
public MxEvent CreateOnBufferedDataChange(
string sessionId,
int serverHandle,
int itemHandle,
int rawDataType,
object? value,
object? quality,
object? timestamp,
Array? statuses)
{
MxDataType dataType = MapMxDataType(rawDataType);
MxEvent mxEvent = CreateBaseEvent(
MxEventFamily.OnBufferedDataChange,
sessionId,
serverHandle,
itemHandle,
statuses);
mxEvent.Value = variantConverter.Convert(value, dataType);
mxEvent.OnBufferedDataChange = new OnBufferedDataChangeEvent
{
DataType = dataType,
RawDataType = rawDataType,
QualityValues = ConvertBufferedArray(quality, MxDataType.Integer),
TimestampValues = ConvertBufferedArray(timestamp, MxDataType.Time),
};
return mxEvent;
}
public static MxDataType MapMxDataType(int rawDataType)
{
return rawDataType switch
{
-1 => MxDataType.Unknown,
0 => MxDataType.NoData,
1 => MxDataType.Boolean,
2 => MxDataType.Integer,
3 => MxDataType.Float,
4 => MxDataType.Double,
5 => MxDataType.String,
6 => MxDataType.Time,
7 => MxDataType.ElapsedTime,
8 => MxDataType.ReferenceType,
9 => MxDataType.StatusType,
10 => MxDataType.Enum,
11 => MxDataType.SecurityClassificationEnum,
12 => MxDataType.DataQualityType,
13 => MxDataType.QualifiedEnum,
14 => MxDataType.QualifiedStruct,
15 => MxDataType.InternationalizedString,
16 => MxDataType.BigString,
17 => MxDataType.End,
_ => MxDataType.Unknown,
};
}
private MxEvent CreateBaseEvent(
MxEventFamily family,
string sessionId,
int serverHandle,
int itemHandle,
Array? statuses)
{
MxEvent mxEvent = new()
{
Family = family,
SessionId = sessionId ?? string.Empty,
ServerHandle = serverHandle,
ItemHandle = itemHandle,
};
mxEvent.Statuses.Add(statusProxyConverter.ConvertMany(statuses));
return mxEvent;
}
private void ApplySourceTimestamp(
MxEvent mxEvent,
object? timestamp)
{
MxValue convertedTimestamp = variantConverter.Convert(timestamp, MxDataType.Time);
if (convertedTimestamp.KindCase == MxValue.KindOneofCase.TimestampValue)
{
mxEvent.SourceTimestamp = convertedTimestamp.TimestampValue;
return;
}
if (!string.IsNullOrWhiteSpace(convertedTimestamp.RawDiagnostic))
{
mxEvent.RawStatus = string.IsNullOrWhiteSpace(mxEvent.RawStatus)
? convertedTimestamp.RawDiagnostic
: $"{mxEvent.RawStatus}; {convertedTimestamp.RawDiagnostic}";
}
}
private MxArray ConvertBufferedArray(
object? value,
MxDataType expectedElementDataType)
{
if (value is Array array)
{
return variantConverter.ConvertArray(array, expectedElementDataType);
}
MxValue converted = variantConverter.Convert(value, expectedElementDataType);
if (converted.KindCase == MxValue.KindOneofCase.ArrayValue)
{
return converted.ArrayValue;
}
MxArray mxArray = new()
{
ElementDataType = converted.DataType,
VariantType = converted.VariantType,
RawElementDataType = converted.RawDataType,
RawDiagnostic = string.IsNullOrWhiteSpace(converted.RawDiagnostic)
? "Buffered MXAccess event argument was not a SAFEARRAY."
: converted.RawDiagnostic,
};
switch (converted.KindCase)
{
case MxValue.KindOneofCase.Int32Value:
mxArray.Int32Values = new Int32Array();
mxArray.Int32Values.Values.Add(converted.Int32Value);
break;
case MxValue.KindOneofCase.Int64Value:
mxArray.Int64Values = new Int64Array();
mxArray.Int64Values.Values.Add(converted.Int64Value);
break;
case MxValue.KindOneofCase.TimestampValue:
mxArray.TimestampValues = new TimestampArray();
mxArray.TimestampValues.Values.Add(converted.TimestampValue);
break;
}
return mxArray;
}
}
@@ -0,0 +1,180 @@
using System;
using System.Collections.Generic;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessEventQueue
{
public const int DefaultCapacity = 10000;
private readonly int capacity;
private readonly Queue<WorkerEvent> events;
private readonly object syncRoot = new();
private ulong lastEventSequence;
private WorkerFault? fault;
public MxAccessEventQueue()
: this(DefaultCapacity)
{
}
public MxAccessEventQueue(int capacity)
{
if (capacity <= 0)
{
throw new ArgumentOutOfRangeException(
nameof(capacity),
"MXAccess event queue capacity must be greater than zero.");
}
this.capacity = capacity;
events = new Queue<WorkerEvent>(capacity);
}
public int Capacity => capacity;
public int Count
{
get
{
lock (syncRoot)
{
return events.Count;
}
}
}
public ulong LastEventSequence
{
get
{
lock (syncRoot)
{
return lastEventSequence;
}
}
}
public bool IsFaulted
{
get
{
lock (syncRoot)
{
return fault is not null;
}
}
}
public WorkerFault? Fault
{
get
{
lock (syncRoot)
{
return fault?.Clone();
}
}
}
public WorkerEvent Enqueue(MxEvent mxEvent)
{
if (mxEvent is null)
{
throw new ArgumentNullException(nameof(mxEvent));
}
lock (syncRoot)
{
if (fault is not null)
{
throw new InvalidOperationException("MXAccess outbound event queue is faulted.");
}
if (events.Count >= capacity)
{
fault = CreateOverflowFault();
throw new MxAccessEventQueueOverflowException(capacity);
}
MxEvent queuedEvent = mxEvent.Clone();
queuedEvent.WorkerSequence = ++lastEventSequence;
queuedEvent.WorkerTimestamp = Timestamp.FromDateTime(DateTime.UtcNow);
WorkerEvent workerEvent = new()
{
Event = queuedEvent,
};
events.Enqueue(workerEvent);
return workerEvent.Clone();
}
}
public bool TryDequeue(out WorkerEvent? workerEvent)
{
lock (syncRoot)
{
if (events.Count == 0)
{
workerEvent = null;
return false;
}
workerEvent = events.Dequeue().Clone();
return true;
}
}
public IReadOnlyList<WorkerEvent> Drain(uint maxEvents)
{
lock (syncRoot)
{
int drainCount = maxEvents == 0
? events.Count
: Math.Min(events.Count, checked((int)Math.Min(maxEvents, int.MaxValue)));
if (drainCount == 0)
{
return Array.Empty<WorkerEvent>();
}
List<WorkerEvent> drained = new(drainCount);
for (int index = 0; index < drainCount; index++)
{
drained.Add(events.Dequeue().Clone());
}
return drained;
}
}
public void RecordFault(WorkerFault workerFault)
{
if (workerFault is null)
{
throw new ArgumentNullException(nameof(workerFault));
}
lock (syncRoot)
{
fault ??= workerFault.Clone();
}
}
private WorkerFault CreateOverflowFault()
{
string message = $"MXAccess outbound event queue reached capacity {capacity}.";
return new WorkerFault
{
Category = WorkerFaultCategory.QueueOverflow,
DiagnosticMessage = message,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = message,
},
};
}
}
@@ -0,0 +1,14 @@
using System;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessEventQueueOverflowException : Exception
{
public MxAccessEventQueueOverflowException(int capacity)
: base($"MXAccess outbound event queue reached its configured capacity of {capacity}.")
{
Capacity = capacity;
}
public int Capacity { get; }
}
@@ -44,7 +44,8 @@ public sealed class MxAccessSession : IDisposable
public static MxAccessSession Create(
IMxAccessComObjectFactory factory,
IMxAccessEventSink eventSink)
IMxAccessEventSink eventSink,
string sessionId)
{
if (factory is null)
{
@@ -66,7 +67,7 @@ public sealed class MxAccessSession : IDisposable
throw new InvalidOperationException("MXAccess COM factory returned null.");
}
eventSink.Attach(mxAccessComObject);
eventSink.Attach(mxAccessComObject, sessionId);
return new MxAccessSession(
mxAccessComObject,
@@ -11,6 +11,7 @@ public sealed class MxAccessStaSession : IDisposable
{
private readonly IMxAccessComObjectFactory factory;
private readonly IMxAccessEventSink eventSink;
private readonly MxAccessEventQueue eventQueue;
private readonly StaRuntime staRuntime;
private StaCommandDispatcher? commandDispatcher;
private MxAccessSession? session;
@@ -20,7 +21,7 @@ public sealed class MxAccessStaSession : IDisposable
: this(
new StaRuntime(),
new MxAccessComObjectFactory(),
new MxAccessBaseEventSink())
new MxAccessEventQueue())
{
}
@@ -28,13 +29,41 @@ public sealed class MxAccessStaSession : IDisposable
StaRuntime staRuntime,
IMxAccessComObjectFactory factory,
IMxAccessEventSink eventSink)
: this(staRuntime, factory, eventSink, new MxAccessEventQueue())
{
}
public MxAccessStaSession(
StaRuntime staRuntime,
IMxAccessComObjectFactory factory,
MxAccessEventQueue eventQueue)
: this(staRuntime, factory, new MxAccessBaseEventSink(eventQueue), eventQueue)
{
}
public MxAccessStaSession(
StaRuntime staRuntime,
IMxAccessComObjectFactory factory,
IMxAccessEventSink eventSink,
MxAccessEventQueue eventQueue)
{
this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime));
this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
}
public MxAccessEventQueue EventQueue => eventQueue;
public Task<WorkerReady> StartAsync(
int workerProcessId,
CancellationToken cancellationToken = default)
{
return StartAsync(string.Empty, workerProcessId, cancellationToken);
}
public Task<WorkerReady> StartAsync(
string sessionId,
int workerProcessId,
CancellationToken cancellationToken = default)
{
@@ -48,7 +77,7 @@ public sealed class MxAccessStaSession : IDisposable
throw new InvalidOperationException("MXAccess COM session has already been created.");
}
session = MxAccessSession.Create(factory, eventSink);
session = MxAccessSession.Create(factory, eventSink, sessionId);
commandDispatcher = new StaCommandDispatcher(
staRuntime,
new MxAccessCommandExecutor(session));
@@ -68,6 +97,11 @@ public sealed class MxAccessStaSession : IDisposable
return commandDispatcher.DispatchAsync(command);
}
public IReadOnlyList<WorkerEvent> DrainEvents(uint maxEvents)
{
return eventQueue.Drain(maxEvents);
}
public Task<IReadOnlyList<RegisteredServerHandle>> GetRegisteredServerHandlesAsync(
CancellationToken cancellationToken = default)
{