Compare commits

...

5 Commits

Author SHA1 Message Date
Joseph Doherty c7e4c4b614 Merge remote-tracking branch 'origin/main' into agent-2/issue-27-implement-additem-additem2-removeitem 2026-04-26 18:27:00 -04:00
Joseph Doherty 59c710d789 Implement worker AddItem commands 2026-04-26 18:26:44 -04:00
dohertj2 862f119b91 Merge pull request #77 from agent-3/issue-18-build-fake-worker-test-harness
Issue #18: build fake worker test harness
2026-04-26 18:25:00 -04:00
Joseph Doherty 35e4442c7b Build fake worker test harness 2026-04-26 18:20:45 -04:00
dohertj2 ed1018c3bb Merge pull request #76 from agent-1/issue-17-implement-dashboard-authentication
Issue #17: implement dashboard authentication
2026-04-26 18:18:43 -04:00
16 changed files with 1643 additions and 2 deletions
+51
View File
@@ -0,0 +1,51 @@
# Gateway Testing
Gateway tests run without installed MXAccess by using fake workers, fake
transports, and in-process gRPC service fakes. Live MXAccess verification belongs
in opt-in integration tests because it depends on installed COM components and
provider state.
## Fake Worker Harness
`FakeWorkerHarness` in `src/MxGateway.Tests/Gateway/Workers/Fakes/` provides an
in-process worker side for named-pipe IPC tests. It uses the same
`WorkerFrameReader`, `WorkerFrameWriter`, and `WorkerEnvelope` contract as the
gateway so tests exercise real frame validation and worker-client state changes.
Use the harness when a gateway or session test needs worker behavior without
starting `MxGateway.Worker.exe` or loading MXAccess COM. The harness scripts:
- `WorkerHello` and `WorkerReady` startup,
- command replies with matching correlation ids,
- ordered `WorkerEvent` frames,
- `WorkerFault` frames,
- shutdown acknowledgements,
- malformed protobuf payloads and oversized frame headers,
- slow or hung workers by withholding a reply.
Session-level tests can connect the harness to the pipe created by
`SessionWorkerClientFactory` with `ConnectToGatewayPipeAsync`. Lower-level
`WorkerClient` tests can use `CreateConnectedPairAsync` to create both pipe ends
inside the test.
## Focused Commands
Run the fake worker tests after changing gateway worker IPC, session startup, or
event streaming behavior:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~FakeWorkerHarnessTests
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~SessionWorkerClientFactoryFakeWorkerTests
```
Run the gateway test project after shared gateway test infrastructure changes:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj
```
## Related Documentation
- [Gateway Process Design](./gateway-process-design.md)
- [Worker Frame Protocol](./WorkerFrameProtocol.md)
- [MXAccess Worker Instance Detailed Design](./mxaccess-worker-instance-design.md)
+5
View File
@@ -891,6 +891,11 @@ behavior unless an explicit non-parity backend is designed.
Gateway tests should be able to run without installed MXAccess by using fake Gateway tests should be able to run without installed MXAccess by using fake
workers and fake transports. workers and fake transports.
Use `FakeWorkerHarness` for tests that need real gateway-to-worker framing,
handshake, command, event, fault, or malformed-protocol behavior without loading
MXAccess COM. See [Gateway Testing](./GatewayTesting.md) for the harness scope
and focused test commands.
Focused tests: Focused tests:
- session state transitions, - session state transitions,
@@ -218,6 +218,8 @@ Live tests:
Labels: `area:worker`, `type:feature`, `priority:p0` Labels: `area:worker`, `type:feature`, `priority:p0`
Status: implemented.
Deliverables: Deliverables:
- `AddItem`, - `AddItem`,
+25
View File
@@ -432,6 +432,25 @@ HRESULT and converts the reply to `ProtocolStatusCode.MxaccessFailure`.
`MxAccessStaSession.GetRegisteredServerHandlesAsync` returns an STA-read `MxAccessStaSession.GetRegisteredServerHandlesAsync` returns an STA-read
snapshot of tracked server handles for diagnostics and future cleanup logic. snapshot of tracked server handles for diagnostics and future cleanup logic.
`MxAccessCommandExecutor` also implements the item lifecycle commands:
- `AddItem` calls `LMXProxyServerClass.AddItem` with the requested server
handle and item definition. It preserves the returned item handle in both
`ReturnValue` and `AddItemReply.ItemHandle`.
- `AddItem2` calls `LMXProxyServerClass.AddItem2` with the requested server
handle, item definition, and context string. The context string is passed to
MXAccess exactly as received.
- `RemoveItem` calls `LMXProxyServerClass.RemoveItem` with the requested server
handle and item handle. The reply has no method-specific payload because the
public MXAccess method returns `void`.
The worker records item handles only after `AddItem` or `AddItem2` returns
normally, and removes item handles only after `RemoveItem` returns normally.
The registry does not prevalidate server or item handles, so invalid and
cross-server handle behavior remains owned by MXAccess. COM exceptions continue
through `StaCommandDispatcher`, which preserves the HRESULT and leaves
diagnostic registry state unchanged for failed cleanup calls.
## Handle Registry ## Handle Registry
The worker should track MXAccess state for diagnostics and cleanup, while still The worker should track MXAccess state for diagnostics and cleanup, while still
@@ -454,6 +473,8 @@ Rules:
- Do not rewrite handles returned by MXAccess. - Do not rewrite handles returned by MXAccess.
- Record server handles only after `Register` succeeds. - Record server handles only after `Register` succeeds.
- Remove server handles only after `Unregister` succeeds. - Remove server handles only after `Unregister` succeeds.
- Record item handles only after `AddItem` or `AddItem2` succeeds.
- Remove item handles only after `RemoveItem` succeeds.
- Preserve invalid-handle behavior from MXAccess. - Preserve invalid-handle behavior from MXAccess.
- Preserve cross-server handle behavior from MXAccess. - Preserve cross-server handle behavior from MXAccess.
- Use registry state for cleanup and diagnostics, not semantic correction. - Use registry state for cleanup and diagnostics, not semantic correction.
@@ -697,6 +718,10 @@ Live MXAccess tests:
Live tests should be opt-in and clearly marked because they depend on installed Live tests should be opt-in and clearly marked because they depend on installed
MXAccess COM and provider state. MXAccess COM and provider state.
The worker test suite uses `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1` for these
tests. `AddItem` uses `TestChildObject.TestInt` by default and accepts an
override through `MXGATEWAY_LIVE_MXACCESS_ITEM`; `AddItem2` uses the captured
parity fixture shape `AddItem2("TestInt", "TestChildObject")`.
## Initial Implementation Slice ## Initial Implementation Slice
@@ -0,0 +1,216 @@
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
using MxGateway.Tests.Gateway.Workers.Fakes;
namespace MxGateway.Tests.Gateway.Sessions;
public sealed class SessionWorkerClientFactoryFakeWorkerTests
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
[Fact]
public async Task CreateAsync_WithScriptedFakeWorker_ReturnsReadyClient()
{
ScriptedFakeWorkerProcessLauncher launcher = new();
using GatewayMetrics metrics = new();
SessionWorkerClientFactory factory = new(
launcher,
Options.Create(CreateOptions()),
metrics,
NullLoggerFactory.Instance);
GatewaySession session = CreateSession();
await using IWorkerClient workerClient = await factory.CreateAsync(
session,
CancellationToken.None);
Assert.Equal(WorkerClientState.Ready, workerClient.State);
Assert.Equal(ScriptedFakeWorkerProcessLauncher.ProcessId, workerClient.ProcessId);
Assert.NotNull(launcher.Harness);
Task<WorkerCommandReply> invokeTask = workerClient.InvokeAsync(
CreateCommand(MxCommandKind.Ping),
TestTimeout,
CancellationToken.None);
WorkerEnvelope commandEnvelope = await launcher.Harness.ReadCommandAsync();
await launcher.Harness.ReplyToCommandAsync(commandEnvelope);
WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout);
Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code);
}
[Fact]
public async Task CreateAsync_WhenFakeWorkerStartupFails_ThrowsWorkerClientException()
{
FailingStartupWorkerProcessLauncher launcher = new();
using GatewayMetrics metrics = new();
SessionWorkerClientFactory factory = new(
launcher,
Options.Create(CreateOptions()),
metrics,
NullLoggerFactory.Instance);
GatewaySession session = CreateSession();
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
async () => await factory.CreateAsync(session, CancellationToken.None).WaitAsync(TestTimeout));
Assert.Equal(WorkerClientErrorCode.ProtocolViolation, exception.ErrorCode);
Assert.True(launcher.Process.IsDisposed);
}
private static GatewayOptions CreateOptions()
{
return new GatewayOptions
{
Worker = new WorkerOptions
{
StartupTimeoutSeconds = 5,
ShutdownTimeoutSeconds = 5,
HeartbeatIntervalSeconds = 30,
HeartbeatGraceSeconds = 30,
MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
},
Events = new EventOptions
{
QueueCapacity = 16,
},
};
}
private static GatewaySession CreateSession()
{
return new GatewaySession(
FakeWorkerHarness.DefaultSessionId,
GatewayContractInfo.DefaultBackendName,
$"mxaccessgw-session-fake-worker-{Guid.NewGuid():N}",
FakeWorkerHarness.DefaultNonce,
"test-client",
"fake-worker-session-test",
"client-correlation-1",
TestTimeout,
TestTimeout,
TestTimeout,
DateTimeOffset.UtcNow);
}
private static WorkerCommand CreateCommand(MxCommandKind kind)
{
return new WorkerCommand
{
Command = new MxCommand
{
Kind = kind,
},
};
}
private sealed class ScriptedFakeWorkerProcessLauncher : IWorkerProcessLauncher
{
public const int ProcessId = 2468;
private readonly FakeWorkerProcess _process = new(ProcessId);
public FakeWorkerHarness? Harness { get; private set; }
public Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default)
{
_ = RunWorkerAsync(request, cancellationToken);
return Task.FromResult(CreateHandle(_process));
}
private async Task RunWorkerAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
Harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync(
request.SessionId,
request.Nonce,
request.PipeName,
request.ProtocolVersion,
cancellationToken: cancellationToken).ConfigureAwait(false);
await Harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
private sealed class FailingStartupWorkerProcessLauncher : IWorkerProcessLauncher
{
public FakeWorkerProcess Process { get; } = new(processId: 3579);
public Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default)
{
_ = RunWorkerAsync(request, cancellationToken);
return Task.FromResult(CreateHandle(Process));
}
private async Task RunWorkerAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync(
request.SessionId,
request.Nonce,
request.PipeName,
request.ProtocolVersion,
cancellationToken: cancellationToken).ConfigureAwait(false);
_ = await harness.ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
await harness.SendWorkerHelloAsync(
workerProcessId: Process.Id,
workerProtocolVersion: request.ProtocolVersion + 1,
cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
private static WorkerProcessHandle CreateHandle(IWorkerProcess process)
{
return new WorkerProcessHandle(
process,
new WorkerProcessCommandLine("fake-worker.exe", []),
DateTimeOffset.UtcNow);
}
private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
{
private bool _disposed;
public int Id { get; } = processId;
public bool HasExited { get; private set; }
public int? ExitCode { get; private set; }
public int KillCount { get; private set; }
public ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
HasExited = true;
ExitCode = 0;
return ValueTask.CompletedTask;
}
public void Kill(bool entireProcessTree)
{
KillCount++;
HasExited = true;
ExitCode = -1;
}
public void Dispose()
{
_disposed = true;
}
public bool IsDisposed => _disposed;
}
}
@@ -0,0 +1,190 @@
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
using MxGateway.Tests.Gateway.Workers.Fakes;
namespace MxGateway.Tests.Gateway.Workers;
public sealed class FakeWorkerHarnessTests
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
[Fact]
public async Task CompleteStartupAsync_WithHelloAndReady_TransitionsClientToReady()
{
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
await using WorkerClient client = fakeWorker.CreateClient();
Task startTask = client.StartAsync(CancellationToken.None);
WorkerEnvelope gatewayHello = await fakeWorker.CompleteStartupAsync();
await startTask.WaitAsync(TestTimeout);
Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, gatewayHello.BodyCase);
Assert.Equal(FakeWorkerHarness.DefaultNonce, gatewayHello.GatewayHello.Nonce);
Assert.Equal(WorkerClientState.Ready, client.State);
Assert.Equal(FakeWorkerHarness.DefaultWorkerProcessId, client.ProcessId);
}
[Fact]
public async Task StartAsync_WithProtocolMismatch_FailsStartup()
{
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
await using WorkerClient client = fakeWorker.CreateClient();
Task startTask = client.StartAsync(CancellationToken.None);
WorkerEnvelope gatewayHello = await fakeWorker.ReadGatewayEnvelopeAsync();
Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, gatewayHello.BodyCase);
await fakeWorker.SendWorkerHelloAsync(
workerProtocolVersion: GatewayContractInfo.WorkerProtocolVersion + 1);
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
async () => await startTask.WaitAsync(TestTimeout));
Assert.Equal(WorkerClientErrorCode.ProtocolViolation, exception.ErrorCode);
}
[Fact]
public async Task InvokeAsync_WithScriptedReply_CompletesCommand()
{
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
await using WorkerClient client = fakeWorker.CreateClient();
await StartClientAsync(fakeWorker, client);
Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
CreateCommand(MxCommandKind.Ping),
TestTimeout,
CancellationToken.None);
WorkerEnvelope commandEnvelope = await fakeWorker.ReadCommandAsync();
await fakeWorker.ReplyToCommandAsync(commandEnvelope);
WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout);
Assert.Equal(commandEnvelope.CorrelationId, reply.Reply.CorrelationId);
Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code);
}
[Fact]
public async Task ReadEventsAsync_WithScriptedEvents_YieldsOrderedEvents()
{
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
await using WorkerClient client = fakeWorker.CreateClient();
await StartClientAsync(fakeWorker, client);
using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
await using IAsyncEnumerator<WorkerEvent> events =
client.ReadEventsAsync(cancellationTokenSource.Token).GetAsyncEnumerator(cancellationTokenSource.Token);
await fakeWorker.EmitEventAsync(MxEventFamily.OnDataChange, cancellationTokenSource.Token);
await fakeWorker.EmitEventAsync(MxEventFamily.OperationComplete, cancellationTokenSource.Token);
Assert.True(await events.MoveNextAsync());
Assert.Equal((ulong)3, events.Current.Event.WorkerSequence);
Assert.Equal(MxEventFamily.OnDataChange, events.Current.Event.Family);
Assert.True(await events.MoveNextAsync());
Assert.Equal((ulong)4, events.Current.Event.WorkerSequence);
Assert.Equal(MxEventFamily.OperationComplete, events.Current.Event.Family);
}
[Fact]
public async Task ReadLoop_WithScriptedFault_FaultsClient()
{
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
await using WorkerClient client = fakeWorker.CreateClient();
await StartClientAsync(fakeWorker, client);
await fakeWorker.EmitFaultAsync(
WorkerFaultCategory.MxaccessCommandFailed,
"scripted MXAccess command fault");
await WaitUntilAsync(
() => client.State == WorkerClientState.Faulted,
TestTimeout);
Assert.Equal(WorkerClientState.Faulted, client.State);
}
[Fact]
public async Task InvokeAsync_WithHungWorker_TimesOutPendingCommand()
{
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
await using WorkerClient client = fakeWorker.CreateClient();
await StartClientAsync(fakeWorker, client);
Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
CreateCommand(MxCommandKind.Ping),
TimeSpan.FromMilliseconds(50),
CancellationToken.None);
WorkerEnvelope commandEnvelope = await fakeWorker.ReadCommandAsync();
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
async () => await invokeTask.WaitAsync(TestTimeout));
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
Assert.Equal(WorkerClientErrorCode.CommandTimeout, exception.ErrorCode);
}
[Fact]
public async Task ReadLoop_WithMalformedFrame_FaultsClient()
{
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
await using WorkerClient client = fakeWorker.CreateClient();
await StartClientAsync(fakeWorker, client);
await fakeWorker.WriteMalformedPayloadAsync(new byte[] { 0x08, 0x96, 0x01 });
await WaitUntilAsync(
() => client.State == WorkerClientState.Faulted,
TestTimeout);
Assert.Equal(WorkerClientState.Faulted, client.State);
}
[Fact]
public async Task ShutdownAsync_WithShutdownAck_ClosesClient()
{
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
await using WorkerClient client = fakeWorker.CreateClient();
await StartClientAsync(fakeWorker, client);
Task shutdownTask = client.ShutdownAsync(TestTimeout, CancellationToken.None);
WorkerEnvelope shutdownEnvelope = await fakeWorker.ReadShutdownAsync();
await fakeWorker.SendShutdownAckAsync();
await shutdownTask.WaitAsync(TestTimeout);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdown, shutdownEnvelope.BodyCase);
Assert.Equal(WorkerClientState.Closed, client.State);
}
private static async Task StartClientAsync(
FakeWorkerHarness fakeWorker,
WorkerClient client)
{
Task startTask = client.StartAsync(CancellationToken.None);
await fakeWorker.CompleteStartupAsync().ConfigureAwait(false);
await startTask.WaitAsync(TestTimeout).ConfigureAwait(false);
}
private static WorkerCommand CreateCommand(MxCommandKind kind)
{
return new WorkerCommand
{
Command = new MxCommand
{
Kind = kind,
},
};
}
private static async Task WaitUntilAsync(
Func<bool> predicate,
TimeSpan timeout)
{
using CancellationTokenSource cancellationTokenSource = new(timeout);
while (!predicate())
{
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
}
}
}
@@ -0,0 +1,378 @@
using System.Buffers.Binary;
using System.IO.Pipes;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Workers.Fakes;
public sealed class FakeWorkerHarness : IAsyncDisposable
{
public const string DefaultSessionId = "session-fake-worker";
public const string DefaultNonce = "nonce-fake-worker";
public const int DefaultWorkerProcessId = 9321;
private readonly NamedPipeServerStream? _gatewayStream;
private readonly NamedPipeClientStream _workerStream;
private readonly WorkerFrameProtocolOptions _frameOptions;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private bool _workerSideDisposed;
private FakeWorkerHarness(
string sessionId,
string nonce,
NamedPipeServerStream? gatewayStream,
NamedPipeClientStream workerStream,
WorkerFrameProtocolOptions frameOptions)
{
SessionId = sessionId;
Nonce = nonce;
_gatewayStream = gatewayStream;
_workerStream = workerStream;
_frameOptions = frameOptions;
_reader = new WorkerFrameReader(_workerStream, frameOptions);
_writer = new WorkerFrameWriter(_workerStream, frameOptions);
}
public string SessionId { get; }
public string Nonce { get; }
public ulong NextWorkerSequence { get; private set; }
public static async Task<FakeWorkerHarness> CreateConnectedPairAsync(
string sessionId = DefaultSessionId,
string nonce = DefaultNonce,
uint protocolVersion = GatewayContractInfo.WorkerProtocolVersion,
int maxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
CancellationToken cancellationToken = default)
{
string pipeName = $"mxaccessgw-fake-worker-{Guid.NewGuid():N}";
NamedPipeServerStream gatewayStream = new(
pipeName,
PipeDirection.InOut,
maxNumberOfServerInstances: 1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
NamedPipeClientStream workerStream = CreateWorkerStream(pipeName);
Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync(cancellationToken);
await workerStream.ConnectAsync(cancellationToken).ConfigureAwait(false);
await waitForConnectionTask.ConfigureAwait(false);
return new FakeWorkerHarness(
sessionId,
nonce,
gatewayStream,
workerStream,
new WorkerFrameProtocolOptions(sessionId, protocolVersion, maxMessageBytes));
}
public static async Task<FakeWorkerHarness> ConnectToGatewayPipeAsync(
string sessionId,
string nonce,
string pipeName,
uint protocolVersion = GatewayContractInfo.WorkerProtocolVersion,
int maxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
CancellationToken cancellationToken = default)
{
NamedPipeClientStream workerStream = CreateWorkerStream(pipeName);
await workerStream.ConnectAsync(cancellationToken).ConfigureAwait(false);
return new FakeWorkerHarness(
sessionId,
nonce,
gatewayStream: null,
workerStream,
new WorkerFrameProtocolOptions(sessionId, protocolVersion, maxMessageBytes));
}
public WorkerClient CreateClient(
WorkerClientOptions? options = null,
GatewayMetrics? metrics = null,
TimeProvider? timeProvider = null)
{
if (_gatewayStream is null)
{
throw new InvalidOperationException("This fake worker is connected to a gateway-owned pipe.");
}
WorkerClientConnection connection = new(
SessionId,
Nonce,
_gatewayStream,
_frameOptions);
return new WorkerClient(connection, options, metrics, timeProvider);
}
public async Task<WorkerEnvelope> CompleteStartupAsync(
int workerProcessId = DefaultWorkerProcessId,
string workerVersion = "fake-worker",
string mxaccessProgid = "LMXProxy.LMXProxyServer.1",
string mxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}",
CancellationToken cancellationToken = default)
{
WorkerEnvelope gatewayHello = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
if (gatewayHello.BodyCase != WorkerEnvelope.BodyOneofCase.GatewayHello)
{
throw new InvalidOperationException($"Expected GatewayHello but received {gatewayHello.BodyCase}.");
}
await SendWorkerHelloAsync(
workerProcessId,
workerVersion,
cancellationToken: cancellationToken).ConfigureAwait(false);
await SendWorkerReadyAsync(
workerProcessId,
mxaccessProgid,
mxaccessClsid,
cancellationToken).ConfigureAwait(false);
return gatewayHello;
}
public async Task<WorkerEnvelope> ReadGatewayEnvelopeAsync(CancellationToken cancellationToken = default)
{
return await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
}
public async Task<WorkerEnvelope> ReadCommandAsync(CancellationToken cancellationToken = default)
{
WorkerEnvelope envelope = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand)
{
throw new InvalidOperationException($"Expected WorkerCommand but received {envelope.BodyCase}.");
}
return envelope;
}
public async Task<WorkerEnvelope> ReadShutdownAsync(CancellationToken cancellationToken = default)
{
WorkerEnvelope envelope = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerShutdown)
{
throw new InvalidOperationException($"Expected WorkerShutdown but received {envelope.BodyCase}.");
}
return envelope;
}
public async Task SendWorkerHelloAsync(
int workerProcessId = DefaultWorkerProcessId,
string workerVersion = "fake-worker",
uint? workerProtocolVersion = null,
string? nonce = null,
CancellationToken cancellationToken = default)
{
await _writer.WriteAsync(
CreateEnvelope(
correlationId: string.Empty,
envelope => envelope.WorkerHello = new WorkerHello
{
ProtocolVersion = workerProtocolVersion ?? _frameOptions.ProtocolVersion,
Nonce = nonce ?? Nonce,
WorkerProcessId = workerProcessId,
WorkerVersion = workerVersion,
}),
cancellationToken).ConfigureAwait(false);
}
public async Task SendWorkerReadyAsync(
int workerProcessId = DefaultWorkerProcessId,
string mxaccessProgid = "LMXProxy.LMXProxyServer.1",
string mxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}",
CancellationToken cancellationToken = default)
{
await _writer.WriteAsync(
CreateEnvelope(
correlationId: string.Empty,
envelope => envelope.WorkerReady = new WorkerReady
{
WorkerProcessId = workerProcessId,
MxaccessProgid = mxaccessProgid,
MxaccessClsid = mxaccessClsid,
ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
}),
cancellationToken).ConfigureAwait(false);
}
public async Task ReplyToCommandAsync(
WorkerEnvelope commandEnvelope,
ProtocolStatusCode statusCode = ProtocolStatusCode.Ok,
string statusMessage = "OK",
CancellationToken cancellationToken = default)
{
if (commandEnvelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand)
{
throw new ArgumentException("Command envelope must contain WorkerCommand.", nameof(commandEnvelope));
}
MxCommandKind kind = commandEnvelope.WorkerCommand.Command?.Kind ?? MxCommandKind.Unspecified;
await _writer.WriteAsync(
CreateEnvelope(
commandEnvelope.CorrelationId,
envelope => envelope.WorkerCommandReply = new WorkerCommandReply
{
Reply = new MxCommandReply
{
SessionId = SessionId,
CorrelationId = commandEnvelope.CorrelationId,
Kind = kind,
ProtocolStatus = new ProtocolStatus
{
Code = statusCode,
Message = statusMessage,
},
},
CompletedTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
}),
cancellationToken).ConfigureAwait(false);
}
public async Task EmitEventAsync(
MxEventFamily family,
CancellationToken cancellationToken = default)
{
ulong sequence = NextWorkerSequence + 1;
await _writer.WriteAsync(
CreateEnvelope(
correlationId: string.Empty,
envelope => envelope.WorkerEvent = new WorkerEvent
{
Event = new MxEvent
{
SessionId = SessionId,
Family = family,
WorkerSequence = sequence,
WorkerTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
},
}),
cancellationToken).ConfigureAwait(false);
}
public async Task EmitFaultAsync(
WorkerFaultCategory category,
string diagnosticMessage,
CancellationToken cancellationToken = default)
{
await _writer.WriteAsync(
CreateEnvelope(
correlationId: string.Empty,
envelope => envelope.WorkerFault = new WorkerFault
{
Category = category,
DiagnosticMessage = diagnosticMessage,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = diagnosticMessage,
},
}),
cancellationToken).ConfigureAwait(false);
}
public async Task SendShutdownAckAsync(
ProtocolStatusCode statusCode = ProtocolStatusCode.Ok,
CancellationToken cancellationToken = default)
{
await _writer.WriteAsync(
CreateEnvelope(
correlationId: string.Empty,
envelope => envelope.WorkerShutdownAck = new WorkerShutdownAck
{
Status = new ProtocolStatus
{
Code = statusCode,
Message = statusCode.ToString(),
},
}),
cancellationToken).ConfigureAwait(false);
}
public async Task WriteMalformedPayloadAsync(
ReadOnlyMemory<byte> payload,
CancellationToken cancellationToken = default)
{
if (payload.IsEmpty)
{
throw new ArgumentException("Malformed payload must include at least one byte.", nameof(payload));
}
byte[] lengthPrefix = new byte[sizeof(uint)];
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payload.Length);
await _workerStream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
await _workerStream.WriteAsync(payload, cancellationToken).ConfigureAwait(false);
}
public async Task WriteOversizedFrameHeaderAsync(
uint payloadLength,
CancellationToken cancellationToken = default)
{
if (payloadLength <= _frameOptions.MaxMessageBytes)
{
throw new ArgumentOutOfRangeException(
nameof(payloadLength),
payloadLength,
"Payload length must exceed the configured maximum.");
}
byte[] lengthPrefix = new byte[sizeof(uint)];
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, payloadLength);
await _workerStream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
}
public async ValueTask DisposeWorkerSideAsync()
{
if (_workerSideDisposed)
{
return;
}
await _workerStream.DisposeAsync().ConfigureAwait(false);
_workerSideDisposed = true;
}
public async ValueTask DisposeAsync()
{
await DisposeWorkerSideAsync().ConfigureAwait(false);
if (_gatewayStream is not null)
{
await _gatewayStream.DisposeAsync().ConfigureAwait(false);
}
}
private WorkerEnvelope CreateEnvelope(
string correlationId,
Action<WorkerEnvelope> setBody)
{
WorkerEnvelope envelope = new()
{
ProtocolVersion = _frameOptions.ProtocolVersion,
SessionId = SessionId,
Sequence = AdvanceSequence(),
CorrelationId = correlationId,
};
setBody(envelope);
return envelope;
}
private ulong AdvanceSequence()
{
return ++NextWorkerSequence;
}
private static NamedPipeClientStream CreateWorkerStream(string pipeName)
{
return new NamedPipeClientStream(
".",
pipeName,
PipeDirection.InOut,
PipeOptions.Asynchronous);
}
}
@@ -78,6 +78,166 @@ public sealed class MxAccessCommandExecutorTests
Assert.Equal(44, registeredServerHandle.ServerHandle); Assert.Equal(44, registeredServerHandle.ServerHandle);
} }
[Fact]
public async Task DispatchAsync_AddItem_CallsMxAccessOnStaAndTracksItemHandle()
{
FakeMxAccessComObject fakeComObject = new(
registerHandle: 46,
addItemHandle: 501);
FakeMxAccessComObjectFactory factory = new(fakeComObject);
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
await session.DispatchAsync(CreateRegisterCommand("register-before-add", "client-a"));
MxCommandReply reply = await session.DispatchAsync(CreateAddItemCommand(
"add-item",
46,
"Galaxy.Tag.Value"));
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
Assert.True(reply.HasHresult);
Assert.Equal(0, reply.Hresult);
Assert.Equal(501, reply.AddItem.ItemHandle);
Assert.Equal(MxDataType.Integer, reply.ReturnValue.DataType);
Assert.Equal(501, reply.ReturnValue.Int32Value);
Assert.Equal(46, fakeComObject.AddItemServerHandle);
Assert.Equal("Galaxy.Tag.Value", fakeComObject.AddItemDefinition);
Assert.Equal(runtime.StaThreadId, fakeComObject.AddItemThreadId);
RegisteredItemHandle registeredItemHandle = Assert.Single(
await session.GetRegisteredItemHandlesAsync());
Assert.Equal(46, registeredItemHandle.ServerHandle);
Assert.Equal(501, registeredItemHandle.ItemHandle);
Assert.Equal("Galaxy.Tag.Value", registeredItemHandle.ItemDefinition);
Assert.Equal(string.Empty, registeredItemHandle.ItemContext);
Assert.False(registeredItemHandle.HasItemContext);
}
[Fact]
public async Task DispatchAsync_AddItem2_PassesContextExactlyAndTracksItemHandle()
{
FakeMxAccessComObject fakeComObject = new(
registerHandle: 47,
addItem2Handle: 502);
FakeMxAccessComObjectFactory factory = new(fakeComObject);
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
await session.DispatchAsync(CreateRegisterCommand("register-before-add2", "client-a"));
MxCommandReply reply = await session.DispatchAsync(CreateAddItem2Command(
"add-item2",
47,
"TestInt",
"TestChildObject"));
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
Assert.Equal(502, reply.AddItem2.ItemHandle);
Assert.Equal(MxDataType.Integer, reply.ReturnValue.DataType);
Assert.Equal(502, reply.ReturnValue.Int32Value);
Assert.Equal(47, fakeComObject.AddItem2ServerHandle);
Assert.Equal("TestInt", fakeComObject.AddItem2Definition);
Assert.Equal("TestChildObject", fakeComObject.AddItem2Context);
Assert.Equal(runtime.StaThreadId, fakeComObject.AddItem2ThreadId);
RegisteredItemHandle registeredItemHandle = Assert.Single(
await session.GetRegisteredItemHandlesAsync());
Assert.Equal(47, registeredItemHandle.ServerHandle);
Assert.Equal(502, registeredItemHandle.ItemHandle);
Assert.Equal("TestInt", registeredItemHandle.ItemDefinition);
Assert.Equal("TestChildObject", registeredItemHandle.ItemContext);
Assert.True(registeredItemHandle.HasItemContext);
}
[Fact]
public async Task DispatchAsync_RemoveItem_CallsMxAccessOnStaAndRemovesTrackedItemHandle()
{
FakeMxAccessComObject fakeComObject = new(
registerHandle: 48,
addItemHandle: 503);
FakeMxAccessComObjectFactory factory = new(fakeComObject);
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
await session.DispatchAsync(CreateRegisterCommand("register-before-remove", "client-a"));
await session.DispatchAsync(CreateAddItemCommand("add-before-remove", 48, "Galaxy.Tag.Value"));
MxCommandReply reply = await session.DispatchAsync(CreateRemoveItemCommand(
"remove-item",
48,
503));
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
Assert.True(reply.HasHresult);
Assert.Equal(0, reply.Hresult);
Assert.Equal(48, fakeComObject.RemoveItemServerHandle);
Assert.Equal(503, fakeComObject.RemovedItemHandle);
Assert.Equal(runtime.StaThreadId, fakeComObject.RemoveItemThreadId);
Assert.Empty(await session.GetRegisteredItemHandlesAsync());
}
[Fact]
public async Task DispatchAsync_RemoveItemWithCrossServerHandle_PreservesHResultAndKeepsTrackedItemHandle()
{
const int hresult = unchecked((int)0x80070057);
FakeMxAccessComObject fakeComObject = new(
registerHandle: 49,
addItemHandle: 504,
removeItemException: new COMException("Invalid item handle.", hresult));
FakeMxAccessComObjectFactory factory = new(fakeComObject);
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
await session.DispatchAsync(CreateRegisterCommand("register-before-remove-failure", "client-a"));
await session.DispatchAsync(CreateAddItemCommand("add-before-remove-failure", 49, "Galaxy.Tag.Value"));
MxCommandReply reply = await session.DispatchAsync(CreateRemoveItemCommand(
"remove-item-failure",
999,
504));
Assert.Equal(ProtocolStatusCode.MxaccessFailure, reply.ProtocolStatus.Code);
Assert.True(reply.HasHresult);
Assert.Equal(hresult, reply.Hresult);
Assert.Contains("0x80070057", reply.DiagnosticMessage);
Assert.Equal(999, fakeComObject.RemoveItemServerHandle);
Assert.Equal(504, fakeComObject.RemovedItemHandle);
RegisteredItemHandle registeredItemHandle = Assert.Single(
await session.GetRegisteredItemHandlesAsync());
Assert.Equal(49, registeredItemHandle.ServerHandle);
Assert.Equal(504, registeredItemHandle.ItemHandle);
}
[Fact]
public async Task DispatchAsync_AddItem2WhenMxAccessThrows_PreservesHResultAndDoesNotTrackItemHandle()
{
const int hresult = unchecked((int)0x80070057);
FakeMxAccessComObject fakeComObject = new(
registerHandle: 50,
addItem2Exception: new COMException("Invalid server handle.", hresult));
FakeMxAccessComObjectFactory factory = new(fakeComObject);
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
MxCommandReply reply = await session.DispatchAsync(CreateAddItem2Command(
"add-item2-failure",
9001,
"TestInt",
"TestChildObject"));
Assert.Equal(ProtocolStatusCode.MxaccessFailure, reply.ProtocolStatus.Code);
Assert.True(reply.HasHresult);
Assert.Equal(hresult, reply.Hresult);
Assert.Contains("0x80070057", reply.DiagnosticMessage);
Assert.Equal(9001, fakeComObject.AddItem2ServerHandle);
Assert.Equal("TestInt", fakeComObject.AddItem2Definition);
Assert.Equal("TestChildObject", fakeComObject.AddItem2Context);
Assert.Empty(await session.GetRegisteredItemHandlesAsync());
}
[Fact] [Fact]
public async Task DispatchAsync_RegisterWithoutPayload_ReturnsInvalidRequest() public async Task DispatchAsync_RegisterWithoutPayload_ReturnsInvalidRequest()
{ {
@@ -98,6 +258,26 @@ public sealed class MxAccessCommandExecutorTests
Assert.Null(factory.FakeComObject.RegisteredClientName); Assert.Null(factory.FakeComObject.RegisteredClientName);
} }
[Fact]
public async Task DispatchAsync_AddItemWithoutPayload_ReturnsInvalidRequest()
{
FakeMxAccessComObjectFactory factory = new(new FakeMxAccessComObject(registerHandle: 51));
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
MxCommandReply reply = await session.DispatchAsync(new StaCommand(
"session-1",
"missing-add-payload",
new MxCommand
{
Kind = MxCommandKind.AddItem,
}));
Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code);
Assert.Null(factory.FakeComObject.AddItemDefinition);
}
private static StaCommand CreateRegisterCommand( private static StaCommand CreateRegisterCommand(
string correlationId, string correlationId,
string clientName) string clientName)
@@ -132,6 +312,65 @@ public sealed class MxAccessCommandExecutorTests
}); });
} }
private static StaCommand CreateAddItemCommand(
string correlationId,
int serverHandle,
string itemDefinition)
{
return new StaCommand(
"session-1",
correlationId,
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = itemDefinition,
},
});
}
private static StaCommand CreateAddItem2Command(
string correlationId,
int serverHandle,
string itemDefinition,
string itemContext)
{
return new StaCommand(
"session-1",
correlationId,
new MxCommand
{
Kind = MxCommandKind.AddItem2,
AddItem2 = new AddItem2Command
{
ServerHandle = serverHandle,
ItemDefinition = itemDefinition,
ItemContext = itemContext,
},
});
}
private static StaCommand CreateRemoveItemCommand(
string correlationId,
int serverHandle,
int itemHandle)
{
return new StaCommand(
"session-1",
correlationId,
new MxCommand
{
Kind = MxCommandKind.RemoveItem,
RemoveItem = new RemoveItemCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
});
}
private static StaRuntime CreateRuntime() private static StaRuntime CreateRuntime()
{ {
return new StaRuntime( return new StaRuntime(
@@ -143,14 +382,29 @@ public sealed class MxAccessCommandExecutorTests
private sealed class FakeMxAccessComObject private sealed class FakeMxAccessComObject
{ {
private readonly int registerHandle; private readonly int registerHandle;
private readonly int addItemHandle;
private readonly int addItem2Handle;
private readonly Exception? unregisterException; private readonly Exception? unregisterException;
private readonly Exception? addItemException;
private readonly Exception? addItem2Exception;
private readonly Exception? removeItemException;
public FakeMxAccessComObject( public FakeMxAccessComObject(
int registerHandle, int registerHandle,
Exception? unregisterException = null) int addItemHandle = 0,
int addItem2Handle = 0,
Exception? unregisterException = null,
Exception? addItemException = null,
Exception? addItem2Exception = null,
Exception? removeItemException = null)
{ {
this.registerHandle = registerHandle; this.registerHandle = registerHandle;
this.addItemHandle = addItemHandle;
this.addItem2Handle = addItem2Handle;
this.unregisterException = unregisterException; this.unregisterException = unregisterException;
this.addItemException = addItemException;
this.addItem2Exception = addItem2Exception;
this.removeItemException = removeItemException;
} }
public string? RegisteredClientName { get; private set; } public string? RegisteredClientName { get; private set; }
@@ -161,6 +415,26 @@ public sealed class MxAccessCommandExecutorTests
public int? UnregisterThreadId { get; private set; } public int? UnregisterThreadId { get; private set; }
public int? AddItemServerHandle { get; private set; }
public string? AddItemDefinition { get; private set; }
public int? AddItemThreadId { get; private set; }
public int? AddItem2ServerHandle { get; private set; }
public string? AddItem2Definition { get; private set; }
public string? AddItem2Context { get; private set; }
public int? AddItem2ThreadId { get; private set; }
public int? RemoveItemServerHandle { get; private set; }
public int? RemovedItemHandle { get; private set; }
public int? RemoveItemThreadId { get; private set; }
public int Register(string clientName) public int Register(string clientName)
{ {
RegisteredClientName = clientName; RegisteredClientName = clientName;
@@ -179,6 +453,54 @@ public sealed class MxAccessCommandExecutorTests
throw unregisterException; throw unregisterException;
} }
} }
public int AddItem(
int serverHandle,
string itemDefinition)
{
AddItemServerHandle = serverHandle;
AddItemDefinition = itemDefinition;
AddItemThreadId = Environment.CurrentManagedThreadId;
if (addItemException is not null)
{
throw addItemException;
}
return addItemHandle;
}
public int AddItem2(
int serverHandle,
string itemDefinition,
string itemContext)
{
AddItem2ServerHandle = serverHandle;
AddItem2Definition = itemDefinition;
AddItem2Context = itemContext;
AddItem2ThreadId = Environment.CurrentManagedThreadId;
if (addItem2Exception is not null)
{
throw addItem2Exception;
}
return addItem2Handle;
}
public void RemoveItem(
int serverHandle,
int itemHandle)
{
RemoveItemServerHandle = serverHandle;
RemovedItemHandle = itemHandle;
RemoveItemThreadId = Environment.CurrentManagedThreadId;
if (removeItemException is not null)
{
throw removeItemException;
}
}
} }
private sealed class FakeMxAccessComObjectFactory : IMxAccessComObjectFactory private sealed class FakeMxAccessComObjectFactory : IMxAccessComObjectFactory
@@ -8,6 +8,11 @@ namespace MxGateway.Worker.Tests.MxAccess;
public sealed class MxAccessLiveComCreationTests public sealed class MxAccessLiveComCreationTests
{ {
private const string LiveClientName = "MxGateway.Worker.Tests";
private const string DefaultLiveAddItemReference = "TestChildObject.TestInt";
private const string DefaultLiveAddItem2Definition = "TestInt";
private const string DefaultLiveAddItem2Context = "TestChildObject";
[Fact] [Fact]
public async Task StartAsync_WhenOptedIn_CreatesInstalledMxAccessComObjectOnSta() public async Task StartAsync_WhenOptedIn_CreatesInstalledMxAccessComObjectOnSta()
{ {
@@ -43,7 +48,7 @@ public sealed class MxAccessLiveComCreationTests
Kind = MxCommandKind.Register, Kind = MxCommandKind.Register,
Register = new RegisterCommand Register = new RegisterCommand
{ {
ClientName = "MxGateway.Worker.Tests", ClientName = LiveClientName,
}, },
})); }));
@@ -65,6 +70,151 @@ public sealed class MxAccessLiveComCreationTests
Assert.Equal(ProtocolStatusCode.Ok, unregisterReply.ProtocolStatus.Code); Assert.Equal(ProtocolStatusCode.Ok, unregisterReply.ProtocolStatus.Code);
} }
[Fact]
public async Task AddItemAndRemoveItem_WhenOptedIn_RoundTripsInstalledMxAccessItemHandle()
{
if (!RunLiveMxAccessTests())
{
return;
}
using MxAccessStaSession session = new();
await session.StartAsync(workerProcessId: 1234);
MxCommandReply registerReply = await RegisterLiveSessionAsync(session, "live-add-register");
int serverHandle = registerReply.Register.ServerHandle;
int itemHandle = 0;
try
{
MxCommandReply addItemReply = await session.DispatchAsync(new StaCommand(
"session-1",
"live-add-item",
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = GetLiveAddItemReference(),
},
}));
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
Assert.True(addItemReply.AddItem.ItemHandle > 0);
itemHandle = addItemReply.AddItem.ItemHandle;
MxCommandReply removeItemReply = await session.DispatchAsync(new StaCommand(
"session-1",
"live-remove-item",
new MxCommand
{
Kind = MxCommandKind.RemoveItem,
RemoveItem = new RemoveItemCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
}));
Assert.Equal(ProtocolStatusCode.Ok, removeItemReply.ProtocolStatus.Code);
itemHandle = 0;
}
finally
{
if (itemHandle > 0)
{
await session.DispatchAsync(new StaCommand(
"session-1",
"live-remove-item-cleanup",
new MxCommand
{
Kind = MxCommandKind.RemoveItem,
RemoveItem = new RemoveItemCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
}));
}
await UnregisterLiveSessionAsync(session, serverHandle, "live-add-unregister");
}
}
[Fact]
public async Task AddItem2AndRemoveItem_WhenOptedIn_PreservesContextForInstalledMxAccess()
{
if (!RunLiveMxAccessTests())
{
return;
}
using MxAccessStaSession session = new();
await session.StartAsync(workerProcessId: 1234);
MxCommandReply registerReply = await RegisterLiveSessionAsync(session, "live-add2-register");
int serverHandle = registerReply.Register.ServerHandle;
int itemHandle = 0;
try
{
MxCommandReply addItem2Reply = await session.DispatchAsync(new StaCommand(
"session-1",
"live-add-item2",
new MxCommand
{
Kind = MxCommandKind.AddItem2,
AddItem2 = new AddItem2Command
{
ServerHandle = serverHandle,
ItemDefinition = DefaultLiveAddItem2Definition,
ItemContext = DefaultLiveAddItem2Context,
},
}));
Assert.Equal(ProtocolStatusCode.Ok, addItem2Reply.ProtocolStatus.Code);
Assert.True(addItem2Reply.AddItem2.ItemHandle > 0);
itemHandle = addItem2Reply.AddItem2.ItemHandle;
MxCommandReply removeItemReply = await session.DispatchAsync(new StaCommand(
"session-1",
"live-remove-item2",
new MxCommand
{
Kind = MxCommandKind.RemoveItem,
RemoveItem = new RemoveItemCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
}));
Assert.Equal(ProtocolStatusCode.Ok, removeItemReply.ProtocolStatus.Code);
itemHandle = 0;
}
finally
{
if (itemHandle > 0)
{
await session.DispatchAsync(new StaCommand(
"session-1",
"live-remove-item2-cleanup",
new MxCommand
{
Kind = MxCommandKind.RemoveItem,
RemoveItem = new RemoveItemCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
}));
}
await UnregisterLiveSessionAsync(session, serverHandle, "live-add2-unregister");
}
}
private static bool RunLiveMxAccessTests() private static bool RunLiveMxAccessTests()
{ {
return string.Equals( return string.Equals(
@@ -72,4 +222,55 @@ public sealed class MxAccessLiveComCreationTests
"1", "1",
StringComparison.Ordinal); StringComparison.Ordinal);
} }
private static string GetLiveAddItemReference()
{
string itemReference = Environment.GetEnvironmentVariable("MXGATEWAY_LIVE_MXACCESS_ITEM");
return string.IsNullOrWhiteSpace(itemReference)
? DefaultLiveAddItemReference
: itemReference;
}
private static async Task<MxCommandReply> RegisterLiveSessionAsync(
MxAccessStaSession session,
string correlationId)
{
MxCommandReply reply = await session.DispatchAsync(new StaCommand(
"session-1",
correlationId,
new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand
{
ClientName = LiveClientName,
},
}));
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
Assert.True(reply.Register.ServerHandle > 0);
return reply;
}
private static async Task UnregisterLiveSessionAsync(
MxAccessStaSession session,
int serverHandle,
string correlationId)
{
MxCommandReply unregisterReply = await session.DispatchAsync(new StaCommand(
"session-1",
correlationId,
new MxCommand
{
Kind = MxCommandKind.Unregister,
Unregister = new UnregisterCommand
{
ServerHandle = serverHandle,
},
}));
Assert.Equal(ProtocolStatusCode.Ok, unregisterReply.ProtocolStatus.Code);
}
} }
@@ -5,4 +5,17 @@ public interface IMxAccessServer
int Register(string clientName); int Register(string clientName);
void Unregister(int serverHandle); void Unregister(int serverHandle);
int AddItem(
int serverHandle,
string itemDefinition);
int AddItem2(
int serverHandle,
string itemDefinition,
string itemContext);
void RemoveItem(
int serverHandle,
int itemHandle);
} }
@@ -35,6 +35,44 @@ public sealed class MxAccessComServer : IMxAccessServer
Invoke(nameof(Unregister), serverHandle); Invoke(nameof(Unregister), serverHandle);
} }
public int AddItem(
int serverHandle,
string itemDefinition)
{
if (mxAccessComObject is ILMXProxyServer mxAccessServer)
{
return mxAccessServer.AddItem(serverHandle, itemDefinition);
}
return (int)Invoke(nameof(AddItem), serverHandle, itemDefinition);
}
public int AddItem2(
int serverHandle,
string itemDefinition,
string itemContext)
{
if (mxAccessComObject is ILMXProxyServer3 mxAccessServer)
{
return mxAccessServer.AddItem2(serverHandle, itemDefinition, itemContext);
}
return (int)Invoke(nameof(AddItem2), serverHandle, itemDefinition, itemContext);
}
public void RemoveItem(
int serverHandle,
int itemHandle)
{
if (mxAccessComObject is ILMXProxyServer mxAccessServer)
{
mxAccessServer.RemoveItem(serverHandle, itemHandle);
return;
}
Invoke(nameof(RemoveItem), serverHandle, itemHandle);
}
private object Invoke( private object Invoke(
string methodName, string methodName,
params object[] arguments) params object[] arguments)
@@ -34,6 +34,9 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
{ {
MxCommandKind.Register => ExecuteRegister(command), MxCommandKind.Register => ExecuteRegister(command),
MxCommandKind.Unregister => ExecuteUnregister(command), MxCommandKind.Unregister => ExecuteUnregister(command),
MxCommandKind.AddItem => ExecuteAddItem(command),
MxCommandKind.AddItem2 => ExecuteAddItem2(command),
MxCommandKind.RemoveItem => ExecuteRemoveItem(command),
_ => CreateInvalidRequestReply(command, $"Unsupported MXAccess command kind {command.Kind}."), _ => CreateInvalidRequestReply(command, $"Unsupported MXAccess command kind {command.Kind}."),
}; };
} }
@@ -67,6 +70,66 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
return CreateOkReply(command); return CreateOkReply(command);
} }
private MxCommandReply ExecuteAddItem(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.AddItem)
{
return CreateInvalidRequestReply(command, "AddItem command payload is required.");
}
AddItemCommand addItemCommand = command.Command.AddItem;
int itemHandle = session.AddItem(
addItemCommand.ServerHandle,
addItemCommand.ItemDefinition);
MxCommandReply reply = CreateOkReply(command);
reply.ReturnValue = variantConverter.Convert(itemHandle);
reply.AddItem = new AddItemReply
{
ItemHandle = itemHandle,
};
return reply;
}
private MxCommandReply ExecuteAddItem2(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.AddItem2)
{
return CreateInvalidRequestReply(command, "AddItem2 command payload is required.");
}
AddItem2Command addItem2Command = command.Command.AddItem2;
int itemHandle = session.AddItem2(
addItem2Command.ServerHandle,
addItem2Command.ItemDefinition,
addItem2Command.ItemContext);
MxCommandReply reply = CreateOkReply(command);
reply.ReturnValue = variantConverter.Convert(itemHandle);
reply.AddItem2 = new AddItem2Reply
{
ItemHandle = itemHandle,
};
return reply;
}
private MxCommandReply ExecuteRemoveItem(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.RemoveItem)
{
return CreateInvalidRequestReply(command, "RemoveItem command payload is required.");
}
RemoveItemCommand removeItemCommand = command.Command.RemoveItem;
session.RemoveItem(
removeItemCommand.ServerHandle,
removeItemCommand.ItemHandle);
return CreateOkReply(command);
}
private static MxCommandReply CreateOkReply(StaCommand command) private static MxCommandReply CreateOkReply(StaCommand command)
{ {
return new MxCommandReply return new MxCommandReply
@@ -6,12 +6,19 @@ namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessHandleRegistry public sealed class MxAccessHandleRegistry
{ {
private readonly Dictionary<int, RegisteredServerHandle> serverHandles = new(); private readonly Dictionary<int, RegisteredServerHandle> serverHandles = new();
private readonly Dictionary<long, RegisteredItemHandle> itemHandles = new();
public IReadOnlyList<RegisteredServerHandle> ServerHandles => serverHandles public IReadOnlyList<RegisteredServerHandle> ServerHandles => serverHandles
.Values .Values
.OrderBy(handle => handle.ServerHandle) .OrderBy(handle => handle.ServerHandle)
.ToArray(); .ToArray();
public IReadOnlyList<RegisteredItemHandle> ItemHandles => itemHandles
.Values
.OrderBy(handle => handle.ServerHandle)
.ThenBy(handle => handle.ItemHandle)
.ToArray();
public void RegisterServerHandle( public void RegisterServerHandle(
int serverHandle, int serverHandle,
string clientName) string clientName)
@@ -22,10 +29,54 @@ public sealed class MxAccessHandleRegistry
public void UnregisterServerHandle(int serverHandle) public void UnregisterServerHandle(int serverHandle)
{ {
serverHandles.Remove(serverHandle); serverHandles.Remove(serverHandle);
foreach (long key in itemHandles
.Where(pair => pair.Value.ServerHandle == serverHandle)
.Select(pair => pair.Key)
.ToArray())
{
itemHandles.Remove(key);
}
} }
public bool ContainsServerHandle(int serverHandle) public bool ContainsServerHandle(int serverHandle)
{ {
return serverHandles.ContainsKey(serverHandle); return serverHandles.ContainsKey(serverHandle);
} }
public void RegisterItemHandle(
int serverHandle,
int itemHandle,
string itemDefinition,
string itemContext,
bool hasItemContext)
{
itemHandles[CreateItemKey(serverHandle, itemHandle)] = new RegisteredItemHandle(
serverHandle,
itemHandle,
itemDefinition,
itemContext,
hasItemContext);
}
public void RemoveItemHandle(
int serverHandle,
int itemHandle)
{
itemHandles.Remove(CreateItemKey(serverHandle, itemHandle));
}
public bool ContainsItemHandle(
int serverHandle,
int itemHandle)
{
return itemHandles.ContainsKey(CreateItemKey(serverHandle, itemHandle));
}
private static long CreateItemKey(
int serverHandle,
int itemHandle)
{
return ((long)serverHandle << 32) | (uint)itemHandle;
}
} }
@@ -106,6 +106,51 @@ public sealed class MxAccessSession : IDisposable
handleRegistry.UnregisterServerHandle(serverHandle); handleRegistry.UnregisterServerHandle(serverHandle);
} }
public int AddItem(
int serverHandle,
string itemDefinition)
{
ThrowIfDisposed();
int itemHandle = mxAccessServer.AddItem(serverHandle, itemDefinition);
handleRegistry.RegisterItemHandle(
serverHandle,
itemHandle,
itemDefinition,
string.Empty,
hasItemContext: false);
return itemHandle;
}
public int AddItem2(
int serverHandle,
string itemDefinition,
string itemContext)
{
ThrowIfDisposed();
int itemHandle = mxAccessServer.AddItem2(serverHandle, itemDefinition, itemContext);
handleRegistry.RegisterItemHandle(
serverHandle,
itemHandle,
itemDefinition,
itemContext,
hasItemContext: true);
return itemHandle;
}
public void RemoveItem(
int serverHandle,
int itemHandle)
{
ThrowIfDisposed();
mxAccessServer.RemoveItem(serverHandle, itemHandle);
handleRegistry.RemoveItemHandle(serverHandle, itemHandle);
}
public void Dispose() public void Dispose()
{ {
if (disposed) if (disposed)
@@ -81,6 +81,19 @@ public sealed class MxAccessStaSession : IDisposable
cancellationToken); cancellationToken);
} }
public Task<IReadOnlyList<RegisteredItemHandle>> GetRegisteredItemHandlesAsync(
CancellationToken cancellationToken = default)
{
if (session is null)
{
throw new InvalidOperationException("MXAccess COM session has not been started.");
}
return staRuntime.InvokeAsync(
() => session.HandleRegistry.ItemHandles,
cancellationToken);
}
public void Dispose() public void Dispose()
{ {
if (disposed) if (disposed)
@@ -0,0 +1,28 @@
namespace MxGateway.Worker.MxAccess;
public sealed class RegisteredItemHandle
{
public RegisteredItemHandle(
int serverHandle,
int itemHandle,
string itemDefinition,
string itemContext,
bool hasItemContext)
{
ServerHandle = serverHandle;
ItemHandle = itemHandle;
ItemDefinition = itemDefinition;
ItemContext = itemContext;
HasItemContext = hasItemContext;
}
public int ServerHandle { get; }
public int ItemHandle { get; }
public string ItemDefinition { get; }
public string ItemContext { get; }
public bool HasItemContext { get; }
}