From d890eff86270d5c0837fbfc6d1b2824ccb82aa0e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 19:36:22 -0400 Subject: [PATCH 1/2] Implement graceful worker shutdown --- docs/gateway-process-design.md | 11 + docs/mxaccess-worker-instance-design.md | 30 +++ src/MxGateway.Server/Workers/WorkerClient.cs | 12 + .../Gateway/Sessions/SessionManagerTests.cs | 27 ++ .../Ipc/WorkerPipeSessionTests.cs | 39 +++ .../MxAccess/MxAccessCommandExecutorTests.cs | 64 +++++ .../Sta/StaCommandDispatcherTests.cs | 21 ++ src/MxGateway.Worker/Ipc/WorkerPipeClient.cs | 19 +- src/MxGateway.Worker/Ipc/WorkerPipeSession.cs | 238 +++++++++++++++++- .../MxAccess/MxAccessSession.cs | 125 ++++++++- .../MxAccess/MxAccessShutdownFailure.cs | 34 +++ .../MxAccess/MxAccessShutdownResult.cs | 16 ++ .../MxAccess/MxAccessStaSession.cs | 56 +++++ .../Sta/StaCommandDispatcher.cs | 8 + src/MxGateway.Worker/WorkerApplication.cs | 5 +- 15 files changed, 694 insertions(+), 11 deletions(-) create mode 100644 src/MxGateway.Worker/MxAccess/MxAccessShutdownFailure.cs create mode 100644 src/MxGateway.Worker/MxAccess/MxAccessShutdownResult.cs diff --git a/docs/gateway-process-design.md b/docs/gateway-process-design.md index 8bcaef8..ec69253 100644 --- a/docs/gateway-process-design.md +++ b/docs/gateway-process-design.md @@ -175,6 +175,12 @@ Behavior: `CloseSession` should be idempotent. Closing an already closed session should return a successful close result with the final known state. +`WorkerClient.ShutdownAsync` sends `WorkerShutdown`, waits for the worker read, +write, and heartbeat loops to stop, and waits for the launched worker process to +exit within the same shutdown timeout. If the pipe loops or process exit exceed +the timeout, the close operation fails with `ShutdownTimeout`; `GatewaySession` +then kills the worker process tree before surfacing the close failure. + ### Invoke `Invoke` forwards one MXAccess command to the worker that owns the session. @@ -515,6 +521,11 @@ It handles: The write loop should fail the session if a pipe write fails outside normal shutdown. +During shutdown the worker client treats `WorkerShutdownAck` as the protocol +close signal, but the process handle remains authoritative for process lifetime. +The client waits for both the protocol close and process exit before reporting a +clean shutdown to `GatewaySession`. + ## Command Correlation Each command gets: diff --git a/docs/mxaccess-worker-instance-design.md b/docs/mxaccess-worker-instance-design.md index 12c9322..09e92d4 100644 --- a/docs/mxaccess-worker-instance-design.md +++ b/docs/mxaccess-worker-instance-design.md @@ -321,6 +321,13 @@ If COM creation fails, the worker should send a structured fault with: when the exception exposes one, and does not send `WorkerReady` after a failed COM creation attempt. +After `WorkerReady`, `WorkerPipeSession` continues reading gateway frames for +the lifetime of the process. `WorkerCommand` frames are dispatched to +`MxAccessStaSession`, replies are written as `WorkerCommandReply`, and queued +worker events are drained after command replies. `WorkerShutdown` starts the +graceful shutdown path and returns `WorkerShutdownAck` only after the STA +cleanup path completes. + ## Event Sink The worker must subscribe to every public MXAccess event family: @@ -663,6 +670,29 @@ Graceful shutdown sequence: If shutdown wedges, the gateway kills the process. The worker should be written so process kill does not corrupt other sessions. +`MxAccessStaSession.ShutdownGracefullyAsync` implements the current cleanup +path. It first calls `StaCommandDispatcher.RequestShutdown()` so new commands +are rejected and queued commands that have not started receive +`ProtocolStatusCode.WorkerUnavailable`. The command already executing on the +STA is allowed to finish until the shutdown grace period expires. + +After command dispatch is closed, cleanup runs on the STA in MXAccess handle +order: + +1. one `UnAdvise` call per advised server/item pair, +2. `RemoveItem` for active item handles, +3. `Unregister` for active server handles, +4. event sink detach, +5. COM release. + +Each cleanup call is best effort. A failed cleanup operation is recorded as an +`MxAccessShutdownFailure`, logged by `WorkerPipeSession`, and does not prevent +later cleanup calls from running. A shutdown with cleanup failures still returns +`WorkerShutdownAck` with `ProtocolStatusCode.Ok` because the worker reached the +controlled release path. If the grace period expires before cleanup can run or +finish, the worker reports `WorkerFaultCategory.ShutdownTimeout` when possible +and relies on the gateway to kill the process. + ## Fault Handling Worker fault categories: diff --git a/src/MxGateway.Server/Workers/WorkerClient.cs b/src/MxGateway.Server/Workers/WorkerClient.cs index 18b8cb6..697eb05 100644 --- a/src/MxGateway.Server/Workers/WorkerClient.cs +++ b/src/MxGateway.Server/Workers/WorkerClient.cs @@ -227,6 +227,7 @@ public sealed class WorkerClient : IWorkerClient try { await WaitForBackgroundTasksAsync(timeoutCts.Token).ConfigureAwait(false); + await WaitForProcessExitAsync(timeoutCts.Token).ConfigureAwait(false); MarkClosed("shutdown"); } catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) @@ -717,6 +718,17 @@ public sealed class WorkerClient : IWorkerClient await Task.WhenAll(tasks).WaitAsync(cancellationToken).ConfigureAwait(false); } + private async Task WaitForProcessExitAsync(CancellationToken cancellationToken) + { + WorkerProcessHandle? processHandle = _connection.ProcessHandle; + if (processHandle is null || processHandle.Process.HasExited) + { + return; + } + + await processHandle.Process.WaitForExitAsync(cancellationToken).ConfigureAwait(false); + } + private void ThrowIfDisposed() { ObjectDisposedException.ThrowIf(_disposed, this); diff --git a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs index 9fe3611..eece91a 100644 --- a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs +++ b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs @@ -86,6 +86,26 @@ public sealed class SessionManagerTests Assert.Equal(0, metrics.GetSnapshot().OpenSessions); } + [Fact] + public async Task CloseSessionAsync_WhenWorkerShutdownFails_KillsWorker() + { + FakeWorkerClient workerClient = new() + { + ShutdownException = new WorkerClientException( + WorkerClientErrorCode.ShutdownTimeout, + "Worker shutdown timed out."), + }; + SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); + GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + + SessionManagerException exception = await Assert.ThrowsAsync( + async () => await manager.CloseSessionAsync(session.SessionId, CancellationToken.None)); + + Assert.Equal(SessionManagerErrorCode.CloseFailed, exception.ErrorCode); + Assert.Equal(1, workerClient.ShutdownCount); + Assert.Equal(1, workerClient.KillCount); + } + [Fact] public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry() { @@ -266,6 +286,8 @@ public sealed class SessionManagerTests public int KillCount { get; private set; } + public Exception? ShutdownException { get; init; } + public Task StartAsync(CancellationToken cancellationToken) { return Task.CompletedTask; @@ -302,6 +324,11 @@ public sealed class SessionManagerTests CancellationToken cancellationToken) { ShutdownCount++; + if (ShutdownException is not null) + { + throw ShutdownException; + } + State = WorkerClientState.Closed; return Task.CompletedTask; } diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs index e1670d7..1e5ee5a 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs @@ -1,3 +1,4 @@ +using System; using System.Collections.Generic; using System.IO; using System.Runtime.InteropServices; @@ -147,6 +148,29 @@ public sealed class WorkerPipeSessionTests Assert.Equal(ProtocolStatusCode.WorkerUnavailable, written[1].WorkerFault.ProtocolStatus.Code); } + [Fact] + public async Task RunAsync_WithWorkerShutdown_WritesShutdownAckAndReturns() + { + WorkerFrameProtocolOptions options = CreateOptions(); + MemoryStream inbound = new(); + WorkerFrameWriter inboundWriter = new(inbound, options); + await inboundWriter.WriteAsync(CreateGatewayHelloEnvelope()); + await inboundWriter.WriteAsync(CreateWorkerShutdownEnvelope()); + inbound.Position = 0; + MemoryStream outbound = new(); + WorkerPipeSession session = CreateSession(inbound, outbound, options); + + await session.CompleteStartupHandshakeAsync(_ => Task.CompletedTask); + await session.RunAsync(); + + WorkerEnvelope[] written = ReadWrittenFrames(outbound, options); + Assert.Equal(3, written.Length); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, written[0].BodyCase); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, written[1].BodyCase); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, written[2].BodyCase); + Assert.Equal(ProtocolStatusCode.Ok, written[2].WorkerShutdownAck.Status.Code); + } + private static WorkerPipeSession CreateSession( Stream inbound, Stream outbound, @@ -185,6 +209,21 @@ public sealed class WorkerPipeSessionTests }; } + private static WorkerEnvelope CreateWorkerShutdownEnvelope() + { + return new WorkerEnvelope + { + ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + SessionId = SessionId, + Sequence = 2, + WorkerShutdown = new WorkerShutdown + { + GracePeriod = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(1)), + Reason = "test-shutdown", + }, + }; + } + private static WorkerEnvelope[] ReadWrittenFrames( MemoryStream stream, WorkerFrameProtocolOptions options) diff --git a/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs b/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs index 61bdb7e..4e8b355 100644 --- a/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs +++ b/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Runtime.InteropServices; using System.Threading.Tasks; using MxGateway.Contracts.Proto; @@ -414,6 +416,57 @@ public sealed class MxAccessCommandExecutorTests Assert.Equal(MxAccessAdviceKind.Plain, adviceHandle.AdviceKind); } + [Fact] + public async Task ShutdownGracefullyAsync_CleansHandlesInAdviceItemServerOrder() + { + FakeMxAccessComObject fakeComObject = new( + registerHandle: 58, + addItemHandle: 510); + FakeMxAccessComObjectFactory factory = new(fakeComObject); + using StaRuntime runtime = CreateRuntime(); + using MxAccessStaSession session = new(runtime, factory, new NoopEventSink()); + await session.StartAsync(workerProcessId: 1234); + await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown", "client-a")); + await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown", 58, "Galaxy.Tag.Value")); + await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown", 58, 510)); + await session.DispatchAsync(CreateAdviseSupervisoryCommand("supervisory-before-shutdown", 58, 510)); + + MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2)); + + Assert.True(result.Succeeded); + Assert.Equal( + new[] { "UnAdvise:58:510", "RemoveItem:58:510", "Unregister:58" }, + fakeComObject.OperationNames.Where(name => name.StartsWith("Un", StringComparison.Ordinal) + || name.StartsWith("Remove", StringComparison.Ordinal))); + } + + [Fact] + public async Task ShutdownGracefullyAsync_RecordsCleanupFailuresAndContinues() + { + const int hresult = unchecked((int)0x80070057); + COMException cleanupException = new("Invalid handle.", hresult); + FakeMxAccessComObject fakeComObject = new( + registerHandle: 59, + addItemHandle: 511, + unregisterException: cleanupException, + removeItemException: cleanupException, + unAdviseException: cleanupException); + FakeMxAccessComObjectFactory factory = new(fakeComObject); + using StaRuntime runtime = CreateRuntime(); + using MxAccessStaSession session = new(runtime, factory, new NoopEventSink()); + await session.StartAsync(workerProcessId: 1234); + await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown-failure", "client-a")); + await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown-failure", 59, "Galaxy.Tag.Value")); + await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown-failure", 59, 511)); + + MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2)); + + Assert.False(result.Succeeded); + Assert.Equal(new[] { "UnAdvise", "RemoveItem", "Unregister" }, result.Failures.Select(failure => failure.Operation)); + Assert.All(result.Failures, failure => Assert.Equal(hresult, failure.HResult)); + Assert.Contains("Unregister:59", fakeComObject.OperationNames); + } + [Fact] public async Task DispatchAsync_RegisterWithoutPayload_ReturnsInvalidRequest() { @@ -644,6 +697,7 @@ public sealed class MxAccessCommandExecutorTests private readonly Exception? adviseException; private readonly Exception? unAdviseException; private readonly Exception? adviseSupervisoryException; + private readonly List operationNames = new(); public FakeMxAccessComObject( int registerHandle, @@ -715,8 +769,11 @@ public sealed class MxAccessCommandExecutorTests public int? AdviseSupervisoryThreadId { get; private set; } + public IReadOnlyList OperationNames => operationNames.ToArray(); + public int Register(string clientName) { + operationNames.Add($"Register:{clientName}"); RegisteredClientName = clientName; RegisterThreadId = Environment.CurrentManagedThreadId; @@ -725,6 +782,7 @@ public sealed class MxAccessCommandExecutorTests public void Unregister(int serverHandle) { + operationNames.Add($"Unregister:{serverHandle}"); UnregisteredServerHandle = serverHandle; UnregisterThreadId = Environment.CurrentManagedThreadId; @@ -738,6 +796,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, string itemDefinition) { + operationNames.Add($"AddItem:{serverHandle}:{itemDefinition}"); AddItemServerHandle = serverHandle; AddItemDefinition = itemDefinition; AddItemThreadId = Environment.CurrentManagedThreadId; @@ -755,6 +814,7 @@ public sealed class MxAccessCommandExecutorTests string itemDefinition, string itemContext) { + operationNames.Add($"AddItem2:{serverHandle}:{itemDefinition}:{itemContext}"); AddItem2ServerHandle = serverHandle; AddItem2Definition = itemDefinition; AddItem2Context = itemContext; @@ -772,6 +832,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"RemoveItem:{serverHandle}:{itemHandle}"); RemoveItemServerHandle = serverHandle; RemovedItemHandle = itemHandle; RemoveItemThreadId = Environment.CurrentManagedThreadId; @@ -786,6 +847,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"Advise:{serverHandle}:{itemHandle}"); AdviseServerHandle = serverHandle; AdvisedItemHandle = itemHandle; AdviseThreadId = Environment.CurrentManagedThreadId; @@ -800,6 +862,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"UnAdvise:{serverHandle}:{itemHandle}"); UnAdviseServerHandle = serverHandle; UnAdvisedItemHandle = itemHandle; UnAdviseThreadId = Environment.CurrentManagedThreadId; @@ -814,6 +877,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"AdviseSupervisory:{serverHandle}:{itemHandle}"); AdviseSupervisoryServerHandle = serverHandle; AdviseSupervisoryItemHandle = itemHandle; AdviseSupervisoryThreadId = Environment.CurrentManagedThreadId; diff --git a/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs b/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs index f4ee5a5..35032ce 100644 --- a/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs +++ b/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs @@ -110,6 +110,27 @@ public sealed class StaCommandDispatcherTests Assert.Equal("correlation-1", reply.CorrelationId); } + [Fact] + public async Task RequestShutdown_RejectsQueuedCommandButLetsCurrentCommandFinish() + { + using StaRuntime runtime = CreateRuntime(); + runtime.Start(); + BlockingCommandExecutor executor = new(); + StaCommandDispatcher dispatcher = new(runtime, executor); + Task current = dispatcher.DispatchAsync(CreateCommand("current", MxCommandKind.Register)); + Assert.True(executor.Started.Wait(TimeSpan.FromSeconds(2))); + Task pending = dispatcher.DispatchAsync(CreateCommand("pending", MxCommandKind.AddItem)); + + dispatcher.RequestShutdown(); + MxCommandReply pendingReply = await pending; + executor.Release(); + MxCommandReply currentReply = await current; + + Assert.Equal(ProtocolStatusCode.WorkerUnavailable, pendingReply.ProtocolStatus.Code); + Assert.Equal(ProtocolStatusCode.Ok, currentReply.ProtocolStatus.Code); + Assert.Equal(new[] { "current" }, executor.CorrelationIds); + } + [Fact] public async Task PopulateHeartbeat_ReportsCurrentCorrelationAndPendingCount() { diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs index e9e408f..2a7dcbf 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs @@ -11,13 +11,26 @@ public sealed class WorkerPipeClient : IWorkerPipeClient public const int DefaultConnectTimeoutMilliseconds = 30000; private readonly int _connectTimeoutMilliseconds; + private readonly IWorkerLogger? _logger; public WorkerPipeClient() - : this(DefaultConnectTimeoutMilliseconds) + : this(null, DefaultConnectTimeoutMilliseconds) + { + } + + public WorkerPipeClient(IWorkerLogger? logger) + : this(logger, DefaultConnectTimeoutMilliseconds) { } public WorkerPipeClient(int connectTimeoutMilliseconds) + : this(null, connectTimeoutMilliseconds) + { + } + + public WorkerPipeClient( + IWorkerLogger? logger, + int connectTimeoutMilliseconds) { if (connectTimeoutMilliseconds <= 0) { @@ -27,6 +40,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient } _connectTimeoutMilliseconds = connectTimeoutMilliseconds; + _logger = logger; } public async Task RunAsync( @@ -48,8 +62,9 @@ public sealed class WorkerPipeClient : IWorkerPipeClient await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false); - WorkerPipeSession session = new(pipe, frameOptions); + WorkerPipeSession session = new(pipe, frameOptions, _logger); await session.CompleteStartupHandshakeAsync(cancellationToken).ConfigureAwait(false); + await session.RunAsync(cancellationToken).ConfigureAwait(false); } private Task ConnectAsync( diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs index d628faf..075262a 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs @@ -1,11 +1,14 @@ using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Threading; using System.Threading.Tasks; using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts.Proto; +using MxGateway.Worker.Bootstrap; using MxGateway.Worker.MxAccess; +using MxGateway.Worker.Sta; namespace MxGateway.Worker.Ipc; @@ -13,19 +16,24 @@ public sealed class WorkerPipeSession { private readonly WorkerFrameProtocolOptions _options; private readonly Func _processIdProvider; + private readonly IWorkerLogger? _logger; private readonly WorkerFrameReader _reader; private readonly WorkerFrameWriter _writer; private MxAccessStaSession? _mxAccessStaSession; private long _nextSequence; + private bool _shutdownCompleted; + private bool _shutdownTimedOut; public WorkerPipeSession( Stream stream, - WorkerFrameProtocolOptions options) + WorkerFrameProtocolOptions options, + IWorkerLogger? logger = null) : this( new WorkerFrameReader(stream, options), new WorkerFrameWriter(stream, options), options, - () => Process.GetCurrentProcess().Id) + () => Process.GetCurrentProcess().Id, + logger) { } @@ -33,12 +41,14 @@ public sealed class WorkerPipeSession WorkerFrameReader reader, WorkerFrameWriter writer, WorkerFrameProtocolOptions options, - Func processIdProvider) + Func processIdProvider, + IWorkerLogger? logger = null) { _reader = reader ?? throw new ArgumentNullException(nameof(reader)); _writer = writer ?? throw new ArgumentNullException(nameof(writer)); _options = options ?? throw new ArgumentNullException(nameof(options)); _processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider)); + _logger = logger; } public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default) @@ -95,6 +105,44 @@ public sealed class WorkerPipeSession } } + public async Task RunAsync(CancellationToken cancellationToken = default) + { + try + { + while (true) + { + WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false); + switch (envelope.BodyCase) + { + case WorkerEnvelope.BodyOneofCase.WorkerCommand: + await HandleCommandAsync(envelope, cancellationToken).ConfigureAwait(false); + break; + case WorkerEnvelope.BodyOneofCase.WorkerShutdown: + await HandleShutdownAsync(envelope.WorkerShutdown, cancellationToken).ConfigureAwait(false); + return; + case WorkerEnvelope.BodyOneofCase.WorkerCancel: + break; + default: + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.UnexpectedEnvelopeBody, + $"Worker received unexpected gateway envelope body {envelope.BodyCase} after startup."); + } + } + } + catch (WorkerFrameProtocolException exception) + { + await TryWriteFaultAsync(exception, cancellationToken).ConfigureAwait(false); + throw; + } + finally + { + if (!_shutdownCompleted && !_shutdownTimedOut) + { + _mxAccessStaSession?.Dispose(); + } + } + } + private void ValidateGatewayHello(WorkerEnvelope envelope) { if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.GatewayHello) @@ -178,6 +226,25 @@ public sealed class WorkerPipeSession } } + private async Task TryWriteFaultAsync( + WorkerFault fault, + CancellationToken cancellationToken) + { + try + { + await _writer + .WriteAsync(CreateEnvelope(fault), cancellationToken) + .ConfigureAwait(false); + } + catch (Exception faultWriteException) when ( + faultWriteException is IOException + || faultWriteException is ObjectDisposedException + || faultWriteException is WorkerFrameProtocolException) + { + // The shutdown timeout is the actionable error. + } + } + private WorkerEnvelope CreateEnvelope(WorkerHello hello) { return CreateBaseEnvelope(hello); @@ -193,6 +260,21 @@ public sealed class WorkerPipeSession return CreateBaseEnvelope(fault); } + private WorkerEnvelope CreateEnvelope(WorkerCommandReply reply) + { + return CreateBaseEnvelope(reply); + } + + private WorkerEnvelope CreateEnvelope(WorkerEvent workerEvent) + { + return CreateBaseEnvelope(workerEvent); + } + + private WorkerEnvelope CreateEnvelope(WorkerShutdownAck shutdownAck) + { + return CreateBaseEnvelope(shutdownAck); + } + private WorkerEnvelope CreateBaseEnvelope(WorkerHello body) { WorkerEnvelope envelope = CreateBaseEnvelope(); @@ -214,6 +296,28 @@ public sealed class WorkerPipeSession return envelope; } + private WorkerEnvelope CreateBaseEnvelope(WorkerCommandReply body) + { + WorkerEnvelope envelope = CreateBaseEnvelope(); + envelope.CorrelationId = body.Reply?.CorrelationId ?? string.Empty; + envelope.WorkerCommandReply = body; + return envelope; + } + + private WorkerEnvelope CreateBaseEnvelope(WorkerEvent body) + { + WorkerEnvelope envelope = CreateBaseEnvelope(); + envelope.WorkerEvent = body.Clone(); + return envelope; + } + + private WorkerEnvelope CreateBaseEnvelope(WorkerShutdownAck body) + { + WorkerEnvelope envelope = CreateBaseEnvelope(); + envelope.WorkerShutdownAck = body; + return envelope; + } + private WorkerEnvelope CreateBaseEnvelope() { return new WorkerEnvelope @@ -246,6 +350,75 @@ public sealed class WorkerPipeSession } } + private async Task HandleCommandAsync( + WorkerEnvelope envelope, + CancellationToken cancellationToken) + { + if (_mxAccessStaSession is null) + { + throw new InvalidOperationException("MXAccess STA session is not initialized."); + } + + StaCommand command = new( + _options.SessionId, + envelope.CorrelationId, + envelope.WorkerCommand.Command, + envelope.WorkerCommand.EnqueueTimestamp, + cancellationToken); + + MxCommandReply mxReply = await _mxAccessStaSession + .DispatchAsync(command) + .ConfigureAwait(false); + WorkerCommandReply reply = new() + { + Reply = mxReply, + CompletedTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), + }; + + await _writer.WriteAsync(CreateEnvelope(reply), cancellationToken).ConfigureAwait(false); + await DrainEventsAsync(cancellationToken).ConfigureAwait(false); + } + + private async Task HandleShutdownAsync( + WorkerShutdown shutdown, + CancellationToken cancellationToken) + { + TimeSpan gracePeriod = ResolveGracePeriod(shutdown); + try + { + MxAccessShutdownResult result = _mxAccessStaSession is null + ? new MxAccessShutdownResult(Array.Empty()) + : await _mxAccessStaSession + .ShutdownGracefullyAsync(gracePeriod, cancellationToken) + .ConfigureAwait(false); + + LogShutdownFailures(result.Failures); + await _writer + .WriteAsync(CreateEnvelope(CreateShutdownAck(result)), cancellationToken) + .ConfigureAwait(false); + _shutdownCompleted = true; + } + catch (TimeoutException exception) + { + _shutdownTimedOut = true; + await TryWriteFaultAsync(CreateShutdownTimeoutFault(exception), cancellationToken).ConfigureAwait(false); + throw; + } + } + + private async Task DrainEventsAsync(CancellationToken cancellationToken) + { + if (_mxAccessStaSession is null) + { + return; + } + + foreach (WorkerEvent workerEvent in _mxAccessStaSession.DrainEvents(maxEvents: 0)) + { + await _writer.WriteAsync(CreateEnvelope(workerEvent), cancellationToken).ConfigureAwait(false); + } + } + private WorkerReady CreateWorkerReady() { return new WorkerReady @@ -257,6 +430,49 @@ public sealed class WorkerPipeSession }; } + private static TimeSpan ResolveGracePeriod(WorkerShutdown shutdown) + { + if (shutdown.GracePeriod is null) + { + return TimeSpan.FromSeconds(10); + } + + TimeSpan gracePeriod = shutdown.GracePeriod.ToTimeSpan(); + return gracePeriod <= TimeSpan.Zero + ? TimeSpan.FromSeconds(10) + : gracePeriod; + } + + private static WorkerShutdownAck CreateShutdownAck(MxAccessShutdownResult result) + { + return new WorkerShutdownAck + { + Status = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = result.Succeeded + ? "Graceful shutdown completed." + : $"Graceful shutdown completed with {result.Failures.Count} cleanup failure(s).", + }, + }; + } + + private void LogShutdownFailures(IReadOnlyList failures) + { + foreach (MxAccessShutdownFailure failure in failures) + { + _logger?.Error("WorkerGracefulShutdownCleanupFailed", new Dictionary + { + ["session_id"] = _options.SessionId, + ["operation"] = failure.Operation, + ["server_handle"] = failure.ServerHandle, + ["item_handle"] = failure.ItemHandle, + ["exception_type"] = failure.ExceptionType, + ["hresult"] = failure.HResult, + }); + } + } + private static WorkerFault CreateFault(WorkerFrameProtocolException exception) { return new WorkerFault @@ -295,6 +511,22 @@ public sealed class WorkerPipeSession return fault; } + private static WorkerFault CreateShutdownTimeoutFault(TimeoutException exception) + { + string message = exception.Message; + return new WorkerFault + { + Category = WorkerFaultCategory.ShutdownTimeout, + ExceptionType = exception.GetType().FullName ?? string.Empty, + DiagnosticMessage = message, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.WorkerUnavailable, + Message = message, + }, + }; + } + private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode) { return errorCode switch diff --git a/src/MxGateway.Worker/MxAccess/MxAccessSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessSession.cs index b96bd6f..5b7ab53 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessSession.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Runtime.InteropServices; using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts.Proto; @@ -188,6 +189,23 @@ public sealed class MxAccessSession : IDisposable MxAccessAdviceKind.Supervisory); } + public MxAccessShutdownResult ShutdownGracefully() + { + if (disposed) + { + return new MxAccessShutdownResult(Array.Empty()); + } + + List failures = new(); + + CleanupAdviceHandles(failures); + CleanupItemHandles(failures); + CleanupServerHandles(failures); + DisposeCore(failures); + + return new MxAccessShutdownResult(failures); + } + public void Dispose() { if (disposed) @@ -195,11 +213,112 @@ public sealed class MxAccessSession : IDisposable return; } - eventSink.Detach(); + DisposeCore(failures: null); + } - if (Marshal.IsComObject(mxAccessComObject)) + private void CleanupAdviceHandles(ICollection failures) + { + HashSet cleanedPairs = new(); + foreach (RegisteredAdviceHandle adviceHandle in handleRegistry.AdviceHandles) { - Marshal.FinalReleaseComObject(mxAccessComObject); + long key = CreateItemKey(adviceHandle.ServerHandle, adviceHandle.ItemHandle); + if (!cleanedPairs.Add(key)) + { + continue; + } + + try + { + mxAccessServer.UnAdvise(adviceHandle.ServerHandle, adviceHandle.ItemHandle); + handleRegistry.RemoveAdviceHandles(adviceHandle.ServerHandle, adviceHandle.ItemHandle); + } + catch (Exception exception) + { + failures.Add(new MxAccessShutdownFailure( + nameof(UnAdvise), + adviceHandle.ServerHandle, + adviceHandle.ItemHandle, + exception)); + } + } + } + + private void CleanupItemHandles(ICollection failures) + { + foreach (RegisteredItemHandle itemHandle in handleRegistry.ItemHandles) + { + try + { + mxAccessServer.RemoveItem(itemHandle.ServerHandle, itemHandle.ItemHandle); + handleRegistry.RemoveItemHandle(itemHandle.ServerHandle, itemHandle.ItemHandle); + } + catch (Exception exception) + { + failures.Add(new MxAccessShutdownFailure( + nameof(RemoveItem), + itemHandle.ServerHandle, + itemHandle.ItemHandle, + exception)); + } + } + } + + private void CleanupServerHandles(ICollection failures) + { + foreach (RegisteredServerHandle serverHandle in handleRegistry.ServerHandles) + { + try + { + mxAccessServer.Unregister(serverHandle.ServerHandle); + handleRegistry.UnregisterServerHandle(serverHandle.ServerHandle); + } + catch (Exception exception) + { + failures.Add(new MxAccessShutdownFailure( + nameof(Unregister), + serverHandle.ServerHandle, + itemHandle: null, + exception)); + } + } + } + + private static long CreateItemKey( + int serverHandle, + int itemHandle) + { + return ((long)serverHandle << 32) | (uint)itemHandle; + } + + private void DisposeCore(ICollection? failures) + { + try + { + eventSink.Detach(); + } + catch (Exception exception) when (failures is not null) + { + failures.Add(new MxAccessShutdownFailure( + "DetachEvents", + serverHandle: null, + itemHandle: null, + exception)); + } + + try + { + if (Marshal.IsComObject(mxAccessComObject)) + { + Marshal.FinalReleaseComObject(mxAccessComObject); + } + } + catch (Exception exception) when (failures is not null) + { + failures.Add(new MxAccessShutdownFailure( + "ReleaseComObject", + serverHandle: null, + itemHandle: null, + exception)); } disposed = true; diff --git a/src/MxGateway.Worker/MxAccess/MxAccessShutdownFailure.cs b/src/MxGateway.Worker/MxAccess/MxAccessShutdownFailure.cs new file mode 100644 index 0000000..f4891f7 --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/MxAccessShutdownFailure.cs @@ -0,0 +1,34 @@ +using System; + +namespace MxGateway.Worker.MxAccess; + +public sealed class MxAccessShutdownFailure +{ + public MxAccessShutdownFailure( + string operation, + int? serverHandle, + int? itemHandle, + Exception exception) + { + if (string.IsNullOrWhiteSpace(operation)) + { + throw new ArgumentException("Shutdown failure operation is required.", nameof(operation)); + } + + Operation = operation; + ServerHandle = serverHandle; + ItemHandle = itemHandle; + ExceptionType = exception?.GetType().FullName ?? string.Empty; + HResult = exception?.HResult; + } + + public string Operation { get; } + + public int? ServerHandle { get; } + + public int? ItemHandle { get; } + + public string ExceptionType { get; } + + public int? HResult { get; } +} diff --git a/src/MxGateway.Worker/MxAccess/MxAccessShutdownResult.cs b/src/MxGateway.Worker/MxAccess/MxAccessShutdownResult.cs new file mode 100644 index 0000000..683e65f --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/MxAccessShutdownResult.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; + +namespace MxGateway.Worker.MxAccess; + +public sealed class MxAccessShutdownResult +{ + public MxAccessShutdownResult(IReadOnlyList failures) + { + Failures = failures ?? throw new ArgumentNullException(nameof(failures)); + } + + public IReadOnlyList Failures { get; } + + public bool Succeeded => Failures.Count == 0; +} diff --git a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs index f81857b..2b6ed9c 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using MxGateway.Contracts.Proto; @@ -141,6 +142,61 @@ public sealed class MxAccessStaSession : IDisposable cancellationToken); } + public async Task ShutdownGracefullyAsync( + TimeSpan timeout, + CancellationToken cancellationToken = default) + { + if (timeout <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException( + nameof(timeout), + "MXAccess graceful shutdown timeout must be greater than zero."); + } + + if (disposed) + { + return new MxAccessShutdownResult(Array.Empty()); + } + + commandDispatcher?.RequestShutdown(); + + Stopwatch stopwatch = Stopwatch.StartNew(); + MxAccessShutdownResult result; + if (session is null) + { + result = new MxAccessShutdownResult(Array.Empty()); + } + else + { + using CancellationTokenSource shutdownCancellation = + CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + shutdownCancellation.CancelAfter(timeout); + + Task cleanupTask = staRuntime.InvokeAsync( + () => session.ShutdownGracefully(), + shutdownCancellation.Token); + Task delayTask = Task.Delay(timeout, cancellationToken); + Task completedTask = await Task.WhenAny(cleanupTask, delayTask).ConfigureAwait(false); + if (completedTask != cleanupTask) + { + cancellationToken.ThrowIfCancellationRequested(); + throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}."); + } + + result = await cleanupTask.ConfigureAwait(false); + } + + TimeSpan remaining = timeout - stopwatch.Elapsed; + if (remaining <= TimeSpan.Zero || !staRuntime.Shutdown(remaining)) + { + throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}."); + } + + staRuntime.Dispose(); + disposed = true; + return result; + } + public void Dispose() { if (disposed) diff --git a/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs b/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs index 3df663e..361cccb 100644 --- a/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs +++ b/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs @@ -91,6 +91,14 @@ public sealed class StaCommandDispatcher lock (gate) { shutdownRequested = true; + while (commandQueue.Count > 0) + { + QueuedStaCommand queuedCommand = commandQueue.Dequeue(); + queuedCommand.Complete(CreateRejectedReply( + queuedCommand.Command, + ProtocolStatusCode.WorkerUnavailable, + "The STA command dispatcher is shutting down.")); + } } } diff --git a/src/MxGateway.Worker/WorkerApplication.cs b/src/MxGateway.Worker/WorkerApplication.cs index 3bacc28..ec73acb 100644 --- a/src/MxGateway.Worker/WorkerApplication.cs +++ b/src/MxGateway.Worker/WorkerApplication.cs @@ -13,8 +13,7 @@ public static class WorkerApplication return Run( args, new EnvironmentVariableWorkerEnvironment(), - new WorkerConsoleLogger(Console.Error), - new WorkerPipeClient()); + new WorkerConsoleLogger(Console.Error)); } public static int Run( @@ -26,7 +25,7 @@ public static class WorkerApplication args, environment, logger, - new WorkerPipeClient()); + new WorkerPipeClient(logger)); } public static int Run( From 01d6c33156a610ee77d9d8b22666ed1f514601e9 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 19:45:43 -0400 Subject: [PATCH 2/2] Implement .NET gateway client sessions --- .../FakeGatewayTransport.cs | 86 ++++++ .../MxGatewayClientSessionTests.cs | 190 +++++++++++++ .../GrpcMxGatewayClientTransport.cs | 68 +++++ .../IMxGatewayClientTransport.cs | 27 ++ .../MxGateway.Client/MxGatewayClient.cs | 116 +++++++- .../MxGateway.Client/MxGatewaySession.cs | 249 ++++++++++++++++++ .../Properties/AssemblyInfo.cs | 3 + clients/dotnet/README.md | 45 +++- 8 files changed, 775 insertions(+), 9 deletions(-) create mode 100644 clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs create mode 100644 clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs create mode 100644 clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs create mode 100644 clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs create mode 100644 clients/dotnet/MxGateway.Client/MxGatewaySession.cs create mode 100644 clients/dotnet/MxGateway.Client/Properties/AssemblyInfo.cs diff --git a/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs b/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs new file mode 100644 index 0000000..2128fda --- /dev/null +++ b/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs @@ -0,0 +1,86 @@ +using Grpc.Core; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Client.Tests; + +internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMxGatewayClientTransport +{ + private readonly Queue _invokeReplies = new(); + private readonly List _events = []; + + public MxGatewayClientOptions Options { get; } = options; + + public MxAccessGateway.MxAccessGatewayClient? RawClient => null; + + public List<(OpenSessionRequest Request, CallOptions CallOptions)> OpenSessionCalls { get; } = []; + + public List<(CloseSessionRequest Request, CallOptions CallOptions)> CloseSessionCalls { get; } = []; + + public List<(MxCommandRequest Request, CallOptions CallOptions)> InvokeCalls { get; } = []; + + public List<(StreamEventsRequest Request, CallOptions CallOptions)> StreamEventsCalls { get; } = []; + + public OpenSessionReply OpenSessionReply { get; set; } = new() + { + SessionId = "session-fixture", + BackendName = "mxaccess-worker", + GatewayProtocolVersion = 1, + WorkerProtocolVersion = 1, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + }; + + public CloseSessionReply CloseSessionReply { get; set; } = new() + { + SessionId = "session-fixture", + FinalState = SessionState.Closed, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + }; + + public Task OpenSessionAsync( + OpenSessionRequest request, + CallOptions callOptions) + { + OpenSessionCalls.Add((request, callOptions)); + return Task.FromResult(OpenSessionReply); + } + + public Task CloseSessionAsync( + CloseSessionRequest request, + CallOptions callOptions) + { + CloseSessionCalls.Add((request, callOptions)); + return Task.FromResult(CloseSessionReply); + } + + public Task InvokeAsync( + MxCommandRequest request, + CallOptions callOptions) + { + InvokeCalls.Add((request, callOptions)); + return Task.FromResult(_invokeReplies.Dequeue()); + } + + public async IAsyncEnumerable StreamEventsAsync( + StreamEventsRequest request, + CallOptions callOptions) + { + StreamEventsCalls.Add((request, callOptions)); + + foreach (MxEvent gatewayEvent in _events) + { + callOptions.CancellationToken.ThrowIfCancellationRequested(); + await Task.Yield(); + yield return gatewayEvent; + } + } + + public void AddInvokeReply(MxCommandReply reply) + { + _invokeReplies.Enqueue(reply); + } + + public void AddEvent(MxEvent gatewayEvent) + { + _events.Add(gatewayEvent); + } +} diff --git a/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs new file mode 100644 index 0000000..3248c51 --- /dev/null +++ b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs @@ -0,0 +1,190 @@ +using MxGateway.Contracts.Proto; + +namespace MxGateway.Client.Tests; + +public sealed class MxGatewayClientSessionTests +{ + [Fact] + public async Task OpenSessionRawAsync_AttachesApiKeyMetadataAndCancellation() + { + using CancellationTokenSource cancellation = new(); + FakeGatewayTransport transport = CreateTransport(); + await using MxGatewayClient client = CreateClient(transport); + + await client.OpenSessionRawAsync(new OpenSessionRequest(), cancellation.Token); + + var call = Assert.Single(transport.OpenSessionCalls); + Assert.Equal("Bearer test-api-key", call.CallOptions.Headers?.GetValue("authorization")); + Assert.Equal(cancellation.Token, call.CallOptions.CancellationToken); + } + + [Fact] + public async Task OpenSessionAsync_ReturnsSessionWithRawOpenReply() + { + FakeGatewayTransport transport = CreateTransport(); + transport.OpenSessionReply.WorkerProcessId = 1234; + await using MxGatewayClient client = CreateClient(transport); + + MxGatewaySession session = await client.OpenSessionAsync(); + + Assert.Equal("session-fixture", session.SessionId); + Assert.Same(transport.OpenSessionReply, session.OpenSessionReply); + Assert.Equal(1234, session.OpenSessionReply.WorkerProcessId); + } + + [Fact] + public async Task RegisterAsync_BuildsRegisterCommandAndReturnsServerHandle() + { + FakeGatewayTransport transport = CreateTransport(); + transport.AddInvokeReply(new MxCommandReply + { + SessionId = "session-fixture", + Kind = MxCommandKind.Register, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + Register = new RegisterReply { ServerHandle = 12 }, + }); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + + int serverHandle = await session.RegisterAsync("fixture-client"); + + Assert.Equal(12, serverHandle); + var call = Assert.Single(transport.InvokeCalls); + Assert.Equal("session-fixture", call.Request.SessionId); + Assert.False(string.IsNullOrWhiteSpace(call.Request.ClientCorrelationId)); + Assert.Equal(MxCommandKind.Register, call.Request.Command.Kind); + Assert.Equal("fixture-client", call.Request.Command.Register.ClientName); + } + + [Fact] + public async Task AddItem2Async_BuildsAddItem2CommandWithContext() + { + FakeGatewayTransport transport = CreateTransport(); + transport.AddInvokeReply(new MxCommandReply + { + SessionId = "session-fixture", + Kind = MxCommandKind.AddItem2, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + AddItem2 = new AddItem2Reply { ItemHandle = 34 }, + }); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + + int itemHandle = await session.AddItem2Async(12, "Area001.Pump001.Speed", "runtime"); + + Assert.Equal(34, itemHandle); + MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request; + Assert.Equal(MxCommandKind.AddItem2, request.Command.Kind); + Assert.Equal(12, request.Command.AddItem2.ServerHandle); + Assert.Equal("Area001.Pump001.Speed", request.Command.AddItem2.ItemDefinition); + Assert.Equal("runtime", request.Command.AddItem2.ItemContext); + } + + [Fact] + public async Task WriteRawAsync_BuildsWriteCommandWithRawValue() + { + FakeGatewayTransport transport = CreateTransport(); + transport.AddInvokeReply(new MxCommandReply + { + SessionId = "session-fixture", + Kind = MxCommandKind.Write, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + }); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + MxValue value = new() + { + DataType = MxDataType.Integer, + VariantType = "VT_I4", + Int32Value = 123, + }; + + MxCommandReply reply = await session.WriteRawAsync(12, 34, value, 56); + + Assert.Equal(MxCommandKind.Write, reply.Kind); + MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request; + Assert.Equal(MxCommandKind.Write, request.Command.Kind); + Assert.Equal(12, request.Command.Write.ServerHandle); + Assert.Equal(34, request.Command.Write.ItemHandle); + Assert.Same(value, request.Command.Write.Value); + Assert.Equal(56, request.Command.Write.UserId); + } + + [Fact] + public async Task StreamEventsAsync_YieldsEventsInGatewayOrder() + { + FakeGatewayTransport transport = CreateTransport(); + transport.AddEvent(new MxEvent + { + SessionId = "session-fixture", + Family = MxEventFamily.OnDataChange, + WorkerSequence = 1, + }); + transport.AddEvent(new MxEvent + { + SessionId = "session-fixture", + Family = MxEventFamily.OnWriteComplete, + WorkerSequence = 2, + }); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + List sequences = []; + + await foreach (MxEvent gatewayEvent in session.StreamEventsAsync(afterWorkerSequence: 0)) + { + sequences.Add(gatewayEvent.WorkerSequence); + } + + Assert.Equal([1UL, 2UL], sequences); + StreamEventsRequest request = Assert.Single(transport.StreamEventsCalls).Request; + Assert.Equal("session-fixture", request.SessionId); + } + + [Fact] + public async Task CloseAsync_IsExplicitAndIdempotent() + { + FakeGatewayTransport transport = CreateTransport(); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + + CloseSessionReply first = await session.CloseAsync(); + CloseSessionReply second = await session.CloseAsync(); + + Assert.Same(first, second); + var call = Assert.Single(transport.CloseSessionCalls); + Assert.Equal("session-fixture", call.Request.SessionId); + } + + [Fact] + public async Task InvokeHelpers_PassCancellationTokenToTransport() + { + using CancellationTokenSource cancellation = new(); + FakeGatewayTransport transport = CreateTransport(); + transport.AddInvokeReply(new MxCommandReply + { + SessionId = "session-fixture", + Kind = MxCommandKind.Advise, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + }); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + + await session.AdviseAsync(12, 34, cancellation.Token); + + Assert.Equal(cancellation.Token, Assert.Single(transport.InvokeCalls).CallOptions.CancellationToken); + } + + private static MxGatewayClient CreateClient(FakeGatewayTransport transport) + { + return new MxGatewayClient(transport.Options, transport); + } + + private static FakeGatewayTransport CreateTransport() + { + return new FakeGatewayTransport(new MxGatewayClientOptions + { + Endpoint = new Uri("http://localhost:5000"), + ApiKey = "test-api-key", + }); + } +} diff --git a/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs b/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs new file mode 100644 index 0000000..3bc9e85 --- /dev/null +++ b/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs @@ -0,0 +1,68 @@ +using Grpc.Core; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Client; + +internal sealed class GrpcMxGatewayClientTransport( + MxGatewayClientOptions options, + MxAccessGateway.MxAccessGatewayClient rawClient) : IMxGatewayClientTransport +{ + public MxGatewayClientOptions Options { get; } = options; + + public MxAccessGateway.MxAccessGatewayClient RawClient { get; } = rawClient; + + MxAccessGateway.MxAccessGatewayClient? IMxGatewayClientTransport.RawClient => RawClient; + + public async Task OpenSessionAsync( + OpenSessionRequest request, + CallOptions callOptions) + { + return await RawClient.OpenSessionAsync(request, callOptions) + .ResponseAsync + .ConfigureAwait(false); + } + + public async Task CloseSessionAsync( + CloseSessionRequest request, + CallOptions callOptions) + { + return await RawClient.CloseSessionAsync(request, callOptions) + .ResponseAsync + .ConfigureAwait(false); + } + + public async Task InvokeAsync( + MxCommandRequest request, + CallOptions callOptions) + { + return await RawClient.InvokeAsync(request, callOptions) + .ResponseAsync + .ConfigureAwait(false); + } + + public async IAsyncEnumerable StreamEventsAsync( + StreamEventsRequest request, + CallOptions callOptions, + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) + { + CancellationToken effectiveCancellationToken = cancellationToken.CanBeCanceled + ? cancellationToken + : callOptions.CancellationToken; + + using AsyncServerStreamingCall call = RawClient.StreamEvents(request, callOptions); + + await foreach (MxEvent gatewayEvent in call.ResponseStream + .ReadAllAsync(effectiveCancellationToken) + .ConfigureAwait(false)) + { + yield return gatewayEvent; + } + } + + IAsyncEnumerable IMxGatewayClientTransport.StreamEventsAsync( + StreamEventsRequest request, + CallOptions callOptions) + { + return StreamEventsAsync(request, callOptions); + } +} diff --git a/clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs b/clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs new file mode 100644 index 0000000..77586c6 --- /dev/null +++ b/clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs @@ -0,0 +1,27 @@ +using Grpc.Core; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Client; + +internal interface IMxGatewayClientTransport +{ + MxGatewayClientOptions Options { get; } + + MxAccessGateway.MxAccessGatewayClient? RawClient { get; } + + Task OpenSessionAsync( + OpenSessionRequest request, + CallOptions callOptions); + + Task CloseSessionAsync( + CloseSessionRequest request, + CallOptions callOptions); + + Task InvokeAsync( + MxCommandRequest request, + CallOptions callOptions); + + IAsyncEnumerable StreamEventsAsync( + StreamEventsRequest request, + CallOptions callOptions); +} diff --git a/clients/dotnet/MxGateway.Client/MxGatewayClient.cs b/clients/dotnet/MxGateway.Client/MxGatewayClient.cs index efa8ce8..d87b40c 100644 --- a/clients/dotnet/MxGateway.Client/MxGatewayClient.cs +++ b/clients/dotnet/MxGateway.Client/MxGatewayClient.cs @@ -1,22 +1,44 @@ +using Grpc.Core; using Grpc.Net.Client; using MxGateway.Contracts.Proto; namespace MxGateway.Client; /// -/// Provides the initial .NET client entry point and raw generated gRPC client. +/// Provides the .NET client entry point for the public MXAccess Gateway gRPC API. /// public sealed class MxGatewayClient : IAsyncDisposable { private readonly GrpcChannel _channel; + private readonly IMxGatewayClientTransport _transport; + private bool _disposed; - private MxGatewayClient(GrpcChannel channel) + internal MxGatewayClient( + MxGatewayClientOptions options, + IMxGatewayClientTransport transport) { - _channel = channel; - RawClient = new MxAccessGateway.MxAccessGatewayClient(channel); + ArgumentNullException.ThrowIfNull(options); + options.Validate(); + + Options = options; + _transport = transport ?? throw new ArgumentNullException(nameof(transport)); + _channel = null!; } - public MxAccessGateway.MxAccessGatewayClient RawClient { get; } + private MxGatewayClient( + GrpcChannel channel, + IMxGatewayClientTransport transport) + { + _channel = channel; + _transport = transport; + Options = transport.Options; + } + + public MxGatewayClientOptions Options { get; } + + public MxAccessGateway.MxAccessGatewayClient RawClient => + _transport.RawClient + ?? throw new InvalidOperationException("The raw generated gRPC client is not available for this client instance."); public static MxGatewayClient Create(MxGatewayClientOptions options) { @@ -30,12 +52,92 @@ public sealed class MxGatewayClient : IAsyncDisposable LoggerFactory = options.LoggerFactory, }); - return new MxGatewayClient(channel); + return new MxGatewayClient( + channel, + new GrpcMxGatewayClientTransport( + options, + new MxAccessGateway.MxAccessGatewayClient(channel))); + } + + public async Task OpenSessionAsync( + OpenSessionRequest? request = null, + CancellationToken cancellationToken = default) + { + OpenSessionReply reply = await OpenSessionRawAsync( + request ?? new OpenSessionRequest(), + cancellationToken) + .ConfigureAwait(false); + + return new MxGatewaySession(this, reply); + } + + public Task OpenSessionRawAsync( + OpenSessionRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + ThrowIfDisposed(); + + return _transport.OpenSessionAsync(request, CreateCallOptions(cancellationToken)); + } + + public Task CloseSessionRawAsync( + CloseSessionRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + ThrowIfDisposed(); + + return _transport.CloseSessionAsync(request, CreateCallOptions(cancellationToken)); + } + + public Task InvokeAsync( + MxCommandRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + ThrowIfDisposed(); + + return _transport.InvokeAsync(request, CreateCallOptions(cancellationToken)); + } + + public IAsyncEnumerable StreamEventsAsync( + StreamEventsRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + ThrowIfDisposed(); + + return _transport.StreamEventsAsync(request, CreateCallOptions(cancellationToken)); } public ValueTask DisposeAsync() { - _channel.Dispose(); + if (_disposed) + { + return ValueTask.CompletedTask; + } + + _disposed = true; + _channel?.Dispose(); return ValueTask.CompletedTask; } + + internal CallOptions CreateCallOptions(CancellationToken cancellationToken) + { + Metadata headers = new() + { + { "authorization", $"Bearer {Options.ApiKey}" }, + }; + + return new CallOptions( + headers, + DateTime.UtcNow.Add(Options.DefaultCallTimeout), + cancellationToken); + } + + private void ThrowIfDisposed() + { + ObjectDisposedException.ThrowIf(_disposed, this); + } } diff --git a/clients/dotnet/MxGateway.Client/MxGatewaySession.cs b/clients/dotnet/MxGateway.Client/MxGatewaySession.cs new file mode 100644 index 0000000..2081593 --- /dev/null +++ b/clients/dotnet/MxGateway.Client/MxGatewaySession.cs @@ -0,0 +1,249 @@ +using MxGateway.Contracts.Proto; + +namespace MxGateway.Client; + +/// +/// Represents one gateway-backed MXAccess session. +/// +public sealed class MxGatewaySession : IAsyncDisposable +{ + private readonly MxGatewayClient _client; + private readonly SemaphoreSlim _closeLock = new(1, 1); + private CloseSessionReply? _closeReply; + + internal MxGatewaySession( + MxGatewayClient client, + OpenSessionReply openSessionReply) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + OpenSessionReply = openSessionReply ?? throw new ArgumentNullException(nameof(openSessionReply)); + } + + public string SessionId => OpenSessionReply.SessionId; + + public OpenSessionReply OpenSessionReply { get; } + + public async Task CloseAsync(CancellationToken cancellationToken = default) + { + if (_closeReply is not null) + { + return _closeReply; + } + + await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + if (_closeReply is not null) + { + return _closeReply; + } + + _closeReply = await _client.CloseSessionRawAsync( + new CloseSessionRequest { SessionId = SessionId }, + cancellationToken) + .ConfigureAwait(false); + return _closeReply; + } + finally + { + _closeLock.Release(); + } + } + + public async Task RegisterAsync( + string clientName, + CancellationToken cancellationToken = default) + { + MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken) + .ConfigureAwait(false); + return reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value; + } + + public Task RegisterRawAsync( + string clientName, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(clientName); + + return InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.Register, + Register = new RegisterCommand { ClientName = clientName }, + }, + cancellationToken); + } + + public async Task AddItemAsync( + int serverHandle, + string itemDefinition, + CancellationToken cancellationToken = default) + { + MxCommandReply reply = await AddItemRawAsync( + serverHandle, + itemDefinition, + cancellationToken) + .ConfigureAwait(false); + return reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value; + } + + public Task AddItemRawAsync( + int serverHandle, + string itemDefinition, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition); + + return InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.AddItem, + AddItem = new AddItemCommand + { + ServerHandle = serverHandle, + ItemDefinition = itemDefinition, + }, + }, + cancellationToken); + } + + public async Task AddItem2Async( + int serverHandle, + string itemDefinition, + string itemContext, + CancellationToken cancellationToken = default) + { + MxCommandReply reply = await AddItem2RawAsync( + serverHandle, + itemDefinition, + itemContext, + cancellationToken) + .ConfigureAwait(false); + return reply.AddItem2?.ItemHandle ?? reply.ReturnValue.Int32Value; + } + + public Task AddItem2RawAsync( + int serverHandle, + string itemDefinition, + string itemContext, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition); + + return InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.AddItem2, + AddItem2 = new AddItem2Command + { + ServerHandle = serverHandle, + ItemDefinition = itemDefinition, + ItemContext = itemContext ?? string.Empty, + }, + }, + cancellationToken); + } + + public async Task AdviseAsync( + int serverHandle, + int itemHandle, + CancellationToken cancellationToken = default) + { + await AdviseRawAsync(serverHandle, itemHandle, cancellationToken) + .ConfigureAwait(false); + } + + public Task AdviseRawAsync( + int serverHandle, + int itemHandle, + CancellationToken cancellationToken = default) + { + return InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.Advise, + Advise = new AdviseCommand + { + ServerHandle = serverHandle, + ItemHandle = itemHandle, + }, + }, + cancellationToken); + } + + public async Task WriteAsync( + int serverHandle, + int itemHandle, + MxValue value, + int userId, + CancellationToken cancellationToken = default) + { + await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken) + .ConfigureAwait(false); + } + + public Task WriteRawAsync( + int serverHandle, + int itemHandle, + MxValue value, + int userId, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(value); + + return InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.Write, + Write = new WriteCommand + { + ServerHandle = serverHandle, + ItemHandle = itemHandle, + Value = value, + UserId = userId, + }, + }, + cancellationToken); + } + + public Task InvokeAsync( + MxCommandRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + return _client.InvokeAsync(request, cancellationToken); + } + + public IAsyncEnumerable StreamEventsAsync( + ulong afterWorkerSequence = 0, + CancellationToken cancellationToken = default) + { + return _client.StreamEventsAsync( + new StreamEventsRequest + { + SessionId = SessionId, + AfterWorkerSequence = afterWorkerSequence, + }, + cancellationToken); + } + + public async ValueTask DisposeAsync() + { + await CloseAsync().ConfigureAwait(false); + _closeLock.Dispose(); + } + + private Task InvokeCommandAsync( + MxCommand command, + CancellationToken cancellationToken) + { + return _client.InvokeAsync( + new MxCommandRequest + { + SessionId = SessionId, + ClientCorrelationId = Guid.NewGuid().ToString("N"), + Command = command, + }, + cancellationToken); + } +} diff --git a/clients/dotnet/MxGateway.Client/Properties/AssemblyInfo.cs b/clients/dotnet/MxGateway.Client/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..3be2a8b --- /dev/null +++ b/clients/dotnet/MxGateway.Client/Properties/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("MxGateway.Client.Tests")] diff --git a/clients/dotnet/README.md b/clients/dotnet/README.md index a606185..d77e0c6 100644 --- a/clients/dotnet/README.md +++ b/clients/dotnet/README.md @@ -7,9 +7,9 @@ CLI, and unit tests. | Project | Purpose | |---------|---------| -| `MxGateway.Client` | .NET 10 library entry point and raw gRPC client access. | +| `MxGateway.Client` | .NET 10 library entry point, raw gRPC calls, and session helpers. | | `MxGateway.Client.Cli` | Test CLI for smoke and diagnostic commands. | -| `MxGateway.Client.Tests` | Unit tests for the scaffold and generated contract wiring. | +| `MxGateway.Client.Tests` | Unit tests for client options, generated contract wiring, auth metadata, session helpers, cancellation, and event streaming. | The projects reference `src/MxGateway.Contracts/MxGateway.Contracts.csproj` so the client compiles against the same generated protobuf and gRPC types as the @@ -22,3 +22,44 @@ future client build switches to client-local `Grpc.Tools` generation. dotnet build clients/dotnet/MxGateway.Client.sln dotnet test clients/dotnet/MxGateway.Client.sln --no-build ``` + +## Client Usage + +`MxGatewayClient` opens a gRPC channel to the gateway and attaches the API key +to every unary and streaming call as `authorization: Bearer `. +Cancellation tokens passed to the public methods flow to the generated gRPC +call. Client-side cancellation stops waiting for the gateway response; it does +not abort an MXAccess COM call that is already executing inside a worker. + +```csharp +await using MxGatewayClient client = MxGatewayClient.Create( + new MxGatewayClientOptions + { + Endpoint = new Uri("http://localhost:5000"), + ApiKey = apiKey, + }); + +MxGatewaySession session = await client.OpenSessionAsync(); +try +{ + int serverHandle = await session.RegisterAsync("sample-client"); + int itemHandle = await session.AddItemAsync( + serverHandle, + "Area001.Pump001.Speed"); + + await session.AdviseAsync(serverHandle, itemHandle); +} +finally +{ + await session.CloseAsync(); +} +``` + +Use `OpenSessionRawAsync`, `CloseSessionRawAsync`, `InvokeAsync`, and +`StreamEventsAsync` when tests or parity tools need direct generated protobuf +messages. `MxGatewaySession.OpenSessionReply` keeps the raw session-open reply +available, and command helpers have `*RawAsync` variants when callers need the +complete `MxCommandReply`. + +`MxGatewaySession.CloseAsync` is explicit and idempotent. Repeated calls return +the first `CloseSessionReply` instead of sending another close request.