From d890eff86270d5c0837fbfc6d1b2824ccb82aa0e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 19:36:22 -0400 Subject: [PATCH] Implement graceful worker shutdown --- docs/gateway-process-design.md | 11 + docs/mxaccess-worker-instance-design.md | 30 +++ src/MxGateway.Server/Workers/WorkerClient.cs | 12 + .../Gateway/Sessions/SessionManagerTests.cs | 27 ++ .../Ipc/WorkerPipeSessionTests.cs | 39 +++ .../MxAccess/MxAccessCommandExecutorTests.cs | 64 +++++ .../Sta/StaCommandDispatcherTests.cs | 21 ++ src/MxGateway.Worker/Ipc/WorkerPipeClient.cs | 19 +- src/MxGateway.Worker/Ipc/WorkerPipeSession.cs | 238 +++++++++++++++++- .../MxAccess/MxAccessSession.cs | 125 ++++++++- .../MxAccess/MxAccessShutdownFailure.cs | 34 +++ .../MxAccess/MxAccessShutdownResult.cs | 16 ++ .../MxAccess/MxAccessStaSession.cs | 56 +++++ .../Sta/StaCommandDispatcher.cs | 8 + src/MxGateway.Worker/WorkerApplication.cs | 5 +- 15 files changed, 694 insertions(+), 11 deletions(-) create mode 100644 src/MxGateway.Worker/MxAccess/MxAccessShutdownFailure.cs create mode 100644 src/MxGateway.Worker/MxAccess/MxAccessShutdownResult.cs diff --git a/docs/gateway-process-design.md b/docs/gateway-process-design.md index 8bcaef8..ec69253 100644 --- a/docs/gateway-process-design.md +++ b/docs/gateway-process-design.md @@ -175,6 +175,12 @@ Behavior: `CloseSession` should be idempotent. Closing an already closed session should return a successful close result with the final known state. +`WorkerClient.ShutdownAsync` sends `WorkerShutdown`, waits for the worker read, +write, and heartbeat loops to stop, and waits for the launched worker process to +exit within the same shutdown timeout. If the pipe loops or process exit exceed +the timeout, the close operation fails with `ShutdownTimeout`; `GatewaySession` +then kills the worker process tree before surfacing the close failure. + ### Invoke `Invoke` forwards one MXAccess command to the worker that owns the session. @@ -515,6 +521,11 @@ It handles: The write loop should fail the session if a pipe write fails outside normal shutdown. +During shutdown the worker client treats `WorkerShutdownAck` as the protocol +close signal, but the process handle remains authoritative for process lifetime. +The client waits for both the protocol close and process exit before reporting a +clean shutdown to `GatewaySession`. + ## Command Correlation Each command gets: diff --git a/docs/mxaccess-worker-instance-design.md b/docs/mxaccess-worker-instance-design.md index 12c9322..09e92d4 100644 --- a/docs/mxaccess-worker-instance-design.md +++ b/docs/mxaccess-worker-instance-design.md @@ -321,6 +321,13 @@ If COM creation fails, the worker should send a structured fault with: when the exception exposes one, and does not send `WorkerReady` after a failed COM creation attempt. +After `WorkerReady`, `WorkerPipeSession` continues reading gateway frames for +the lifetime of the process. `WorkerCommand` frames are dispatched to +`MxAccessStaSession`, replies are written as `WorkerCommandReply`, and queued +worker events are drained after command replies. `WorkerShutdown` starts the +graceful shutdown path and returns `WorkerShutdownAck` only after the STA +cleanup path completes. + ## Event Sink The worker must subscribe to every public MXAccess event family: @@ -663,6 +670,29 @@ Graceful shutdown sequence: If shutdown wedges, the gateway kills the process. The worker should be written so process kill does not corrupt other sessions. +`MxAccessStaSession.ShutdownGracefullyAsync` implements the current cleanup +path. It first calls `StaCommandDispatcher.RequestShutdown()` so new commands +are rejected and queued commands that have not started receive +`ProtocolStatusCode.WorkerUnavailable`. The command already executing on the +STA is allowed to finish until the shutdown grace period expires. + +After command dispatch is closed, cleanup runs on the STA in MXAccess handle +order: + +1. one `UnAdvise` call per advised server/item pair, +2. `RemoveItem` for active item handles, +3. `Unregister` for active server handles, +4. event sink detach, +5. COM release. + +Each cleanup call is best effort. A failed cleanup operation is recorded as an +`MxAccessShutdownFailure`, logged by `WorkerPipeSession`, and does not prevent +later cleanup calls from running. A shutdown with cleanup failures still returns +`WorkerShutdownAck` with `ProtocolStatusCode.Ok` because the worker reached the +controlled release path. If the grace period expires before cleanup can run or +finish, the worker reports `WorkerFaultCategory.ShutdownTimeout` when possible +and relies on the gateway to kill the process. + ## Fault Handling Worker fault categories: diff --git a/src/MxGateway.Server/Workers/WorkerClient.cs b/src/MxGateway.Server/Workers/WorkerClient.cs index 18b8cb6..697eb05 100644 --- a/src/MxGateway.Server/Workers/WorkerClient.cs +++ b/src/MxGateway.Server/Workers/WorkerClient.cs @@ -227,6 +227,7 @@ public sealed class WorkerClient : IWorkerClient try { await WaitForBackgroundTasksAsync(timeoutCts.Token).ConfigureAwait(false); + await WaitForProcessExitAsync(timeoutCts.Token).ConfigureAwait(false); MarkClosed("shutdown"); } catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) @@ -717,6 +718,17 @@ public sealed class WorkerClient : IWorkerClient await Task.WhenAll(tasks).WaitAsync(cancellationToken).ConfigureAwait(false); } + private async Task WaitForProcessExitAsync(CancellationToken cancellationToken) + { + WorkerProcessHandle? processHandle = _connection.ProcessHandle; + if (processHandle is null || processHandle.Process.HasExited) + { + return; + } + + await processHandle.Process.WaitForExitAsync(cancellationToken).ConfigureAwait(false); + } + private void ThrowIfDisposed() { ObjectDisposedException.ThrowIf(_disposed, this); diff --git a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs index 9fe3611..eece91a 100644 --- a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs +++ b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs @@ -86,6 +86,26 @@ public sealed class SessionManagerTests Assert.Equal(0, metrics.GetSnapshot().OpenSessions); } + [Fact] + public async Task CloseSessionAsync_WhenWorkerShutdownFails_KillsWorker() + { + FakeWorkerClient workerClient = new() + { + ShutdownException = new WorkerClientException( + WorkerClientErrorCode.ShutdownTimeout, + "Worker shutdown timed out."), + }; + SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); + GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + + SessionManagerException exception = await Assert.ThrowsAsync( + async () => await manager.CloseSessionAsync(session.SessionId, CancellationToken.None)); + + Assert.Equal(SessionManagerErrorCode.CloseFailed, exception.ErrorCode); + Assert.Equal(1, workerClient.ShutdownCount); + Assert.Equal(1, workerClient.KillCount); + } + [Fact] public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry() { @@ -266,6 +286,8 @@ public sealed class SessionManagerTests public int KillCount { get; private set; } + public Exception? ShutdownException { get; init; } + public Task StartAsync(CancellationToken cancellationToken) { return Task.CompletedTask; @@ -302,6 +324,11 @@ public sealed class SessionManagerTests CancellationToken cancellationToken) { ShutdownCount++; + if (ShutdownException is not null) + { + throw ShutdownException; + } + State = WorkerClientState.Closed; return Task.CompletedTask; } diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs index e1670d7..1e5ee5a 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs @@ -1,3 +1,4 @@ +using System; using System.Collections.Generic; using System.IO; using System.Runtime.InteropServices; @@ -147,6 +148,29 @@ public sealed class WorkerPipeSessionTests Assert.Equal(ProtocolStatusCode.WorkerUnavailable, written[1].WorkerFault.ProtocolStatus.Code); } + [Fact] + public async Task RunAsync_WithWorkerShutdown_WritesShutdownAckAndReturns() + { + WorkerFrameProtocolOptions options = CreateOptions(); + MemoryStream inbound = new(); + WorkerFrameWriter inboundWriter = new(inbound, options); + await inboundWriter.WriteAsync(CreateGatewayHelloEnvelope()); + await inboundWriter.WriteAsync(CreateWorkerShutdownEnvelope()); + inbound.Position = 0; + MemoryStream outbound = new(); + WorkerPipeSession session = CreateSession(inbound, outbound, options); + + await session.CompleteStartupHandshakeAsync(_ => Task.CompletedTask); + await session.RunAsync(); + + WorkerEnvelope[] written = ReadWrittenFrames(outbound, options); + Assert.Equal(3, written.Length); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, written[0].BodyCase); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, written[1].BodyCase); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, written[2].BodyCase); + Assert.Equal(ProtocolStatusCode.Ok, written[2].WorkerShutdownAck.Status.Code); + } + private static WorkerPipeSession CreateSession( Stream inbound, Stream outbound, @@ -185,6 +209,21 @@ public sealed class WorkerPipeSessionTests }; } + private static WorkerEnvelope CreateWorkerShutdownEnvelope() + { + return new WorkerEnvelope + { + ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + SessionId = SessionId, + Sequence = 2, + WorkerShutdown = new WorkerShutdown + { + GracePeriod = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(1)), + Reason = "test-shutdown", + }, + }; + } + private static WorkerEnvelope[] ReadWrittenFrames( MemoryStream stream, WorkerFrameProtocolOptions options) diff --git a/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs b/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs index 61bdb7e..4e8b355 100644 --- a/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs +++ b/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Runtime.InteropServices; using System.Threading.Tasks; using MxGateway.Contracts.Proto; @@ -414,6 +416,57 @@ public sealed class MxAccessCommandExecutorTests Assert.Equal(MxAccessAdviceKind.Plain, adviceHandle.AdviceKind); } + [Fact] + public async Task ShutdownGracefullyAsync_CleansHandlesInAdviceItemServerOrder() + { + FakeMxAccessComObject fakeComObject = new( + registerHandle: 58, + addItemHandle: 510); + FakeMxAccessComObjectFactory factory = new(fakeComObject); + using StaRuntime runtime = CreateRuntime(); + using MxAccessStaSession session = new(runtime, factory, new NoopEventSink()); + await session.StartAsync(workerProcessId: 1234); + await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown", "client-a")); + await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown", 58, "Galaxy.Tag.Value")); + await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown", 58, 510)); + await session.DispatchAsync(CreateAdviseSupervisoryCommand("supervisory-before-shutdown", 58, 510)); + + MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2)); + + Assert.True(result.Succeeded); + Assert.Equal( + new[] { "UnAdvise:58:510", "RemoveItem:58:510", "Unregister:58" }, + fakeComObject.OperationNames.Where(name => name.StartsWith("Un", StringComparison.Ordinal) + || name.StartsWith("Remove", StringComparison.Ordinal))); + } + + [Fact] + public async Task ShutdownGracefullyAsync_RecordsCleanupFailuresAndContinues() + { + const int hresult = unchecked((int)0x80070057); + COMException cleanupException = new("Invalid handle.", hresult); + FakeMxAccessComObject fakeComObject = new( + registerHandle: 59, + addItemHandle: 511, + unregisterException: cleanupException, + removeItemException: cleanupException, + unAdviseException: cleanupException); + FakeMxAccessComObjectFactory factory = new(fakeComObject); + using StaRuntime runtime = CreateRuntime(); + using MxAccessStaSession session = new(runtime, factory, new NoopEventSink()); + await session.StartAsync(workerProcessId: 1234); + await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown-failure", "client-a")); + await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown-failure", 59, "Galaxy.Tag.Value")); + await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown-failure", 59, 511)); + + MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2)); + + Assert.False(result.Succeeded); + Assert.Equal(new[] { "UnAdvise", "RemoveItem", "Unregister" }, result.Failures.Select(failure => failure.Operation)); + Assert.All(result.Failures, failure => Assert.Equal(hresult, failure.HResult)); + Assert.Contains("Unregister:59", fakeComObject.OperationNames); + } + [Fact] public async Task DispatchAsync_RegisterWithoutPayload_ReturnsInvalidRequest() { @@ -644,6 +697,7 @@ public sealed class MxAccessCommandExecutorTests private readonly Exception? adviseException; private readonly Exception? unAdviseException; private readonly Exception? adviseSupervisoryException; + private readonly List operationNames = new(); public FakeMxAccessComObject( int registerHandle, @@ -715,8 +769,11 @@ public sealed class MxAccessCommandExecutorTests public int? AdviseSupervisoryThreadId { get; private set; } + public IReadOnlyList OperationNames => operationNames.ToArray(); + public int Register(string clientName) { + operationNames.Add($"Register:{clientName}"); RegisteredClientName = clientName; RegisterThreadId = Environment.CurrentManagedThreadId; @@ -725,6 +782,7 @@ public sealed class MxAccessCommandExecutorTests public void Unregister(int serverHandle) { + operationNames.Add($"Unregister:{serverHandle}"); UnregisteredServerHandle = serverHandle; UnregisterThreadId = Environment.CurrentManagedThreadId; @@ -738,6 +796,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, string itemDefinition) { + operationNames.Add($"AddItem:{serverHandle}:{itemDefinition}"); AddItemServerHandle = serverHandle; AddItemDefinition = itemDefinition; AddItemThreadId = Environment.CurrentManagedThreadId; @@ -755,6 +814,7 @@ public sealed class MxAccessCommandExecutorTests string itemDefinition, string itemContext) { + operationNames.Add($"AddItem2:{serverHandle}:{itemDefinition}:{itemContext}"); AddItem2ServerHandle = serverHandle; AddItem2Definition = itemDefinition; AddItem2Context = itemContext; @@ -772,6 +832,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"RemoveItem:{serverHandle}:{itemHandle}"); RemoveItemServerHandle = serverHandle; RemovedItemHandle = itemHandle; RemoveItemThreadId = Environment.CurrentManagedThreadId; @@ -786,6 +847,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"Advise:{serverHandle}:{itemHandle}"); AdviseServerHandle = serverHandle; AdvisedItemHandle = itemHandle; AdviseThreadId = Environment.CurrentManagedThreadId; @@ -800,6 +862,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"UnAdvise:{serverHandle}:{itemHandle}"); UnAdviseServerHandle = serverHandle; UnAdvisedItemHandle = itemHandle; UnAdviseThreadId = Environment.CurrentManagedThreadId; @@ -814,6 +877,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"AdviseSupervisory:{serverHandle}:{itemHandle}"); AdviseSupervisoryServerHandle = serverHandle; AdviseSupervisoryItemHandle = itemHandle; AdviseSupervisoryThreadId = Environment.CurrentManagedThreadId; diff --git a/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs b/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs index f4ee5a5..35032ce 100644 --- a/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs +++ b/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs @@ -110,6 +110,27 @@ public sealed class StaCommandDispatcherTests Assert.Equal("correlation-1", reply.CorrelationId); } + [Fact] + public async Task RequestShutdown_RejectsQueuedCommandButLetsCurrentCommandFinish() + { + using StaRuntime runtime = CreateRuntime(); + runtime.Start(); + BlockingCommandExecutor executor = new(); + StaCommandDispatcher dispatcher = new(runtime, executor); + Task current = dispatcher.DispatchAsync(CreateCommand("current", MxCommandKind.Register)); + Assert.True(executor.Started.Wait(TimeSpan.FromSeconds(2))); + Task pending = dispatcher.DispatchAsync(CreateCommand("pending", MxCommandKind.AddItem)); + + dispatcher.RequestShutdown(); + MxCommandReply pendingReply = await pending; + executor.Release(); + MxCommandReply currentReply = await current; + + Assert.Equal(ProtocolStatusCode.WorkerUnavailable, pendingReply.ProtocolStatus.Code); + Assert.Equal(ProtocolStatusCode.Ok, currentReply.ProtocolStatus.Code); + Assert.Equal(new[] { "current" }, executor.CorrelationIds); + } + [Fact] public async Task PopulateHeartbeat_ReportsCurrentCorrelationAndPendingCount() { diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs index e9e408f..2a7dcbf 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs @@ -11,13 +11,26 @@ public sealed class WorkerPipeClient : IWorkerPipeClient public const int DefaultConnectTimeoutMilliseconds = 30000; private readonly int _connectTimeoutMilliseconds; + private readonly IWorkerLogger? _logger; public WorkerPipeClient() - : this(DefaultConnectTimeoutMilliseconds) + : this(null, DefaultConnectTimeoutMilliseconds) + { + } + + public WorkerPipeClient(IWorkerLogger? logger) + : this(logger, DefaultConnectTimeoutMilliseconds) { } public WorkerPipeClient(int connectTimeoutMilliseconds) + : this(null, connectTimeoutMilliseconds) + { + } + + public WorkerPipeClient( + IWorkerLogger? logger, + int connectTimeoutMilliseconds) { if (connectTimeoutMilliseconds <= 0) { @@ -27,6 +40,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient } _connectTimeoutMilliseconds = connectTimeoutMilliseconds; + _logger = logger; } public async Task RunAsync( @@ -48,8 +62,9 @@ public sealed class WorkerPipeClient : IWorkerPipeClient await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false); - WorkerPipeSession session = new(pipe, frameOptions); + WorkerPipeSession session = new(pipe, frameOptions, _logger); await session.CompleteStartupHandshakeAsync(cancellationToken).ConfigureAwait(false); + await session.RunAsync(cancellationToken).ConfigureAwait(false); } private Task ConnectAsync( diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs index d628faf..075262a 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs @@ -1,11 +1,14 @@ using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Threading; using System.Threading.Tasks; using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts.Proto; +using MxGateway.Worker.Bootstrap; using MxGateway.Worker.MxAccess; +using MxGateway.Worker.Sta; namespace MxGateway.Worker.Ipc; @@ -13,19 +16,24 @@ public sealed class WorkerPipeSession { private readonly WorkerFrameProtocolOptions _options; private readonly Func _processIdProvider; + private readonly IWorkerLogger? _logger; private readonly WorkerFrameReader _reader; private readonly WorkerFrameWriter _writer; private MxAccessStaSession? _mxAccessStaSession; private long _nextSequence; + private bool _shutdownCompleted; + private bool _shutdownTimedOut; public WorkerPipeSession( Stream stream, - WorkerFrameProtocolOptions options) + WorkerFrameProtocolOptions options, + IWorkerLogger? logger = null) : this( new WorkerFrameReader(stream, options), new WorkerFrameWriter(stream, options), options, - () => Process.GetCurrentProcess().Id) + () => Process.GetCurrentProcess().Id, + logger) { } @@ -33,12 +41,14 @@ public sealed class WorkerPipeSession WorkerFrameReader reader, WorkerFrameWriter writer, WorkerFrameProtocolOptions options, - Func processIdProvider) + Func processIdProvider, + IWorkerLogger? logger = null) { _reader = reader ?? throw new ArgumentNullException(nameof(reader)); _writer = writer ?? throw new ArgumentNullException(nameof(writer)); _options = options ?? throw new ArgumentNullException(nameof(options)); _processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider)); + _logger = logger; } public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default) @@ -95,6 +105,44 @@ public sealed class WorkerPipeSession } } + public async Task RunAsync(CancellationToken cancellationToken = default) + { + try + { + while (true) + { + WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false); + switch (envelope.BodyCase) + { + case WorkerEnvelope.BodyOneofCase.WorkerCommand: + await HandleCommandAsync(envelope, cancellationToken).ConfigureAwait(false); + break; + case WorkerEnvelope.BodyOneofCase.WorkerShutdown: + await HandleShutdownAsync(envelope.WorkerShutdown, cancellationToken).ConfigureAwait(false); + return; + case WorkerEnvelope.BodyOneofCase.WorkerCancel: + break; + default: + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.UnexpectedEnvelopeBody, + $"Worker received unexpected gateway envelope body {envelope.BodyCase} after startup."); + } + } + } + catch (WorkerFrameProtocolException exception) + { + await TryWriteFaultAsync(exception, cancellationToken).ConfigureAwait(false); + throw; + } + finally + { + if (!_shutdownCompleted && !_shutdownTimedOut) + { + _mxAccessStaSession?.Dispose(); + } + } + } + private void ValidateGatewayHello(WorkerEnvelope envelope) { if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.GatewayHello) @@ -178,6 +226,25 @@ public sealed class WorkerPipeSession } } + private async Task TryWriteFaultAsync( + WorkerFault fault, + CancellationToken cancellationToken) + { + try + { + await _writer + .WriteAsync(CreateEnvelope(fault), cancellationToken) + .ConfigureAwait(false); + } + catch (Exception faultWriteException) when ( + faultWriteException is IOException + || faultWriteException is ObjectDisposedException + || faultWriteException is WorkerFrameProtocolException) + { + // The shutdown timeout is the actionable error. + } + } + private WorkerEnvelope CreateEnvelope(WorkerHello hello) { return CreateBaseEnvelope(hello); @@ -193,6 +260,21 @@ public sealed class WorkerPipeSession return CreateBaseEnvelope(fault); } + private WorkerEnvelope CreateEnvelope(WorkerCommandReply reply) + { + return CreateBaseEnvelope(reply); + } + + private WorkerEnvelope CreateEnvelope(WorkerEvent workerEvent) + { + return CreateBaseEnvelope(workerEvent); + } + + private WorkerEnvelope CreateEnvelope(WorkerShutdownAck shutdownAck) + { + return CreateBaseEnvelope(shutdownAck); + } + private WorkerEnvelope CreateBaseEnvelope(WorkerHello body) { WorkerEnvelope envelope = CreateBaseEnvelope(); @@ -214,6 +296,28 @@ public sealed class WorkerPipeSession return envelope; } + private WorkerEnvelope CreateBaseEnvelope(WorkerCommandReply body) + { + WorkerEnvelope envelope = CreateBaseEnvelope(); + envelope.CorrelationId = body.Reply?.CorrelationId ?? string.Empty; + envelope.WorkerCommandReply = body; + return envelope; + } + + private WorkerEnvelope CreateBaseEnvelope(WorkerEvent body) + { + WorkerEnvelope envelope = CreateBaseEnvelope(); + envelope.WorkerEvent = body.Clone(); + return envelope; + } + + private WorkerEnvelope CreateBaseEnvelope(WorkerShutdownAck body) + { + WorkerEnvelope envelope = CreateBaseEnvelope(); + envelope.WorkerShutdownAck = body; + return envelope; + } + private WorkerEnvelope CreateBaseEnvelope() { return new WorkerEnvelope @@ -246,6 +350,75 @@ public sealed class WorkerPipeSession } } + private async Task HandleCommandAsync( + WorkerEnvelope envelope, + CancellationToken cancellationToken) + { + if (_mxAccessStaSession is null) + { + throw new InvalidOperationException("MXAccess STA session is not initialized."); + } + + StaCommand command = new( + _options.SessionId, + envelope.CorrelationId, + envelope.WorkerCommand.Command, + envelope.WorkerCommand.EnqueueTimestamp, + cancellationToken); + + MxCommandReply mxReply = await _mxAccessStaSession + .DispatchAsync(command) + .ConfigureAwait(false); + WorkerCommandReply reply = new() + { + Reply = mxReply, + CompletedTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), + }; + + await _writer.WriteAsync(CreateEnvelope(reply), cancellationToken).ConfigureAwait(false); + await DrainEventsAsync(cancellationToken).ConfigureAwait(false); + } + + private async Task HandleShutdownAsync( + WorkerShutdown shutdown, + CancellationToken cancellationToken) + { + TimeSpan gracePeriod = ResolveGracePeriod(shutdown); + try + { + MxAccessShutdownResult result = _mxAccessStaSession is null + ? new MxAccessShutdownResult(Array.Empty()) + : await _mxAccessStaSession + .ShutdownGracefullyAsync(gracePeriod, cancellationToken) + .ConfigureAwait(false); + + LogShutdownFailures(result.Failures); + await _writer + .WriteAsync(CreateEnvelope(CreateShutdownAck(result)), cancellationToken) + .ConfigureAwait(false); + _shutdownCompleted = true; + } + catch (TimeoutException exception) + { + _shutdownTimedOut = true; + await TryWriteFaultAsync(CreateShutdownTimeoutFault(exception), cancellationToken).ConfigureAwait(false); + throw; + } + } + + private async Task DrainEventsAsync(CancellationToken cancellationToken) + { + if (_mxAccessStaSession is null) + { + return; + } + + foreach (WorkerEvent workerEvent in _mxAccessStaSession.DrainEvents(maxEvents: 0)) + { + await _writer.WriteAsync(CreateEnvelope(workerEvent), cancellationToken).ConfigureAwait(false); + } + } + private WorkerReady CreateWorkerReady() { return new WorkerReady @@ -257,6 +430,49 @@ public sealed class WorkerPipeSession }; } + private static TimeSpan ResolveGracePeriod(WorkerShutdown shutdown) + { + if (shutdown.GracePeriod is null) + { + return TimeSpan.FromSeconds(10); + } + + TimeSpan gracePeriod = shutdown.GracePeriod.ToTimeSpan(); + return gracePeriod <= TimeSpan.Zero + ? TimeSpan.FromSeconds(10) + : gracePeriod; + } + + private static WorkerShutdownAck CreateShutdownAck(MxAccessShutdownResult result) + { + return new WorkerShutdownAck + { + Status = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = result.Succeeded + ? "Graceful shutdown completed." + : $"Graceful shutdown completed with {result.Failures.Count} cleanup failure(s).", + }, + }; + } + + private void LogShutdownFailures(IReadOnlyList failures) + { + foreach (MxAccessShutdownFailure failure in failures) + { + _logger?.Error("WorkerGracefulShutdownCleanupFailed", new Dictionary + { + ["session_id"] = _options.SessionId, + ["operation"] = failure.Operation, + ["server_handle"] = failure.ServerHandle, + ["item_handle"] = failure.ItemHandle, + ["exception_type"] = failure.ExceptionType, + ["hresult"] = failure.HResult, + }); + } + } + private static WorkerFault CreateFault(WorkerFrameProtocolException exception) { return new WorkerFault @@ -295,6 +511,22 @@ public sealed class WorkerPipeSession return fault; } + private static WorkerFault CreateShutdownTimeoutFault(TimeoutException exception) + { + string message = exception.Message; + return new WorkerFault + { + Category = WorkerFaultCategory.ShutdownTimeout, + ExceptionType = exception.GetType().FullName ?? string.Empty, + DiagnosticMessage = message, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.WorkerUnavailable, + Message = message, + }, + }; + } + private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode) { return errorCode switch diff --git a/src/MxGateway.Worker/MxAccess/MxAccessSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessSession.cs index b96bd6f..5b7ab53 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessSession.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Runtime.InteropServices; using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts.Proto; @@ -188,6 +189,23 @@ public sealed class MxAccessSession : IDisposable MxAccessAdviceKind.Supervisory); } + public MxAccessShutdownResult ShutdownGracefully() + { + if (disposed) + { + return new MxAccessShutdownResult(Array.Empty()); + } + + List failures = new(); + + CleanupAdviceHandles(failures); + CleanupItemHandles(failures); + CleanupServerHandles(failures); + DisposeCore(failures); + + return new MxAccessShutdownResult(failures); + } + public void Dispose() { if (disposed) @@ -195,11 +213,112 @@ public sealed class MxAccessSession : IDisposable return; } - eventSink.Detach(); + DisposeCore(failures: null); + } - if (Marshal.IsComObject(mxAccessComObject)) + private void CleanupAdviceHandles(ICollection failures) + { + HashSet cleanedPairs = new(); + foreach (RegisteredAdviceHandle adviceHandle in handleRegistry.AdviceHandles) { - Marshal.FinalReleaseComObject(mxAccessComObject); + long key = CreateItemKey(adviceHandle.ServerHandle, adviceHandle.ItemHandle); + if (!cleanedPairs.Add(key)) + { + continue; + } + + try + { + mxAccessServer.UnAdvise(adviceHandle.ServerHandle, adviceHandle.ItemHandle); + handleRegistry.RemoveAdviceHandles(adviceHandle.ServerHandle, adviceHandle.ItemHandle); + } + catch (Exception exception) + { + failures.Add(new MxAccessShutdownFailure( + nameof(UnAdvise), + adviceHandle.ServerHandle, + adviceHandle.ItemHandle, + exception)); + } + } + } + + private void CleanupItemHandles(ICollection failures) + { + foreach (RegisteredItemHandle itemHandle in handleRegistry.ItemHandles) + { + try + { + mxAccessServer.RemoveItem(itemHandle.ServerHandle, itemHandle.ItemHandle); + handleRegistry.RemoveItemHandle(itemHandle.ServerHandle, itemHandle.ItemHandle); + } + catch (Exception exception) + { + failures.Add(new MxAccessShutdownFailure( + nameof(RemoveItem), + itemHandle.ServerHandle, + itemHandle.ItemHandle, + exception)); + } + } + } + + private void CleanupServerHandles(ICollection failures) + { + foreach (RegisteredServerHandle serverHandle in handleRegistry.ServerHandles) + { + try + { + mxAccessServer.Unregister(serverHandle.ServerHandle); + handleRegistry.UnregisterServerHandle(serverHandle.ServerHandle); + } + catch (Exception exception) + { + failures.Add(new MxAccessShutdownFailure( + nameof(Unregister), + serverHandle.ServerHandle, + itemHandle: null, + exception)); + } + } + } + + private static long CreateItemKey( + int serverHandle, + int itemHandle) + { + return ((long)serverHandle << 32) | (uint)itemHandle; + } + + private void DisposeCore(ICollection? failures) + { + try + { + eventSink.Detach(); + } + catch (Exception exception) when (failures is not null) + { + failures.Add(new MxAccessShutdownFailure( + "DetachEvents", + serverHandle: null, + itemHandle: null, + exception)); + } + + try + { + if (Marshal.IsComObject(mxAccessComObject)) + { + Marshal.FinalReleaseComObject(mxAccessComObject); + } + } + catch (Exception exception) when (failures is not null) + { + failures.Add(new MxAccessShutdownFailure( + "ReleaseComObject", + serverHandle: null, + itemHandle: null, + exception)); } disposed = true; diff --git a/src/MxGateway.Worker/MxAccess/MxAccessShutdownFailure.cs b/src/MxGateway.Worker/MxAccess/MxAccessShutdownFailure.cs new file mode 100644 index 0000000..f4891f7 --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/MxAccessShutdownFailure.cs @@ -0,0 +1,34 @@ +using System; + +namespace MxGateway.Worker.MxAccess; + +public sealed class MxAccessShutdownFailure +{ + public MxAccessShutdownFailure( + string operation, + int? serverHandle, + int? itemHandle, + Exception exception) + { + if (string.IsNullOrWhiteSpace(operation)) + { + throw new ArgumentException("Shutdown failure operation is required.", nameof(operation)); + } + + Operation = operation; + ServerHandle = serverHandle; + ItemHandle = itemHandle; + ExceptionType = exception?.GetType().FullName ?? string.Empty; + HResult = exception?.HResult; + } + + public string Operation { get; } + + public int? ServerHandle { get; } + + public int? ItemHandle { get; } + + public string ExceptionType { get; } + + public int? HResult { get; } +} diff --git a/src/MxGateway.Worker/MxAccess/MxAccessShutdownResult.cs b/src/MxGateway.Worker/MxAccess/MxAccessShutdownResult.cs new file mode 100644 index 0000000..683e65f --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/MxAccessShutdownResult.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; + +namespace MxGateway.Worker.MxAccess; + +public sealed class MxAccessShutdownResult +{ + public MxAccessShutdownResult(IReadOnlyList failures) + { + Failures = failures ?? throw new ArgumentNullException(nameof(failures)); + } + + public IReadOnlyList Failures { get; } + + public bool Succeeded => Failures.Count == 0; +} diff --git a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs index f81857b..2b6ed9c 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using MxGateway.Contracts.Proto; @@ -141,6 +142,61 @@ public sealed class MxAccessStaSession : IDisposable cancellationToken); } + public async Task ShutdownGracefullyAsync( + TimeSpan timeout, + CancellationToken cancellationToken = default) + { + if (timeout <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException( + nameof(timeout), + "MXAccess graceful shutdown timeout must be greater than zero."); + } + + if (disposed) + { + return new MxAccessShutdownResult(Array.Empty()); + } + + commandDispatcher?.RequestShutdown(); + + Stopwatch stopwatch = Stopwatch.StartNew(); + MxAccessShutdownResult result; + if (session is null) + { + result = new MxAccessShutdownResult(Array.Empty()); + } + else + { + using CancellationTokenSource shutdownCancellation = + CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + shutdownCancellation.CancelAfter(timeout); + + Task cleanupTask = staRuntime.InvokeAsync( + () => session.ShutdownGracefully(), + shutdownCancellation.Token); + Task delayTask = Task.Delay(timeout, cancellationToken); + Task completedTask = await Task.WhenAny(cleanupTask, delayTask).ConfigureAwait(false); + if (completedTask != cleanupTask) + { + cancellationToken.ThrowIfCancellationRequested(); + throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}."); + } + + result = await cleanupTask.ConfigureAwait(false); + } + + TimeSpan remaining = timeout - stopwatch.Elapsed; + if (remaining <= TimeSpan.Zero || !staRuntime.Shutdown(remaining)) + { + throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}."); + } + + staRuntime.Dispose(); + disposed = true; + return result; + } + public void Dispose() { if (disposed) diff --git a/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs b/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs index 3df663e..361cccb 100644 --- a/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs +++ b/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs @@ -91,6 +91,14 @@ public sealed class StaCommandDispatcher lock (gate) { shutdownRequested = true; + while (commandQueue.Count > 0) + { + QueuedStaCommand queuedCommand = commandQueue.Dequeue(); + queuedCommand.Complete(CreateRejectedReply( + queuedCommand.Command, + ProtocolStatusCode.WorkerUnavailable, + "The STA command dispatcher is shutting down.")); + } } } diff --git a/src/MxGateway.Worker/WorkerApplication.cs b/src/MxGateway.Worker/WorkerApplication.cs index 3bacc28..ec73acb 100644 --- a/src/MxGateway.Worker/WorkerApplication.cs +++ b/src/MxGateway.Worker/WorkerApplication.cs @@ -13,8 +13,7 @@ public static class WorkerApplication return Run( args, new EnvironmentVariableWorkerEnvironment(), - new WorkerConsoleLogger(Console.Error), - new WorkerPipeClient()); + new WorkerConsoleLogger(Console.Error)); } public static int Run( @@ -26,7 +25,7 @@ public static class WorkerApplication args, environment, logger, - new WorkerPipeClient()); + new WorkerPipeClient(logger)); } public static int Run(