diff --git a/docs/gateway-process-design.md b/docs/gateway-process-design.md index 8bcaef8..ec69253 100644 --- a/docs/gateway-process-design.md +++ b/docs/gateway-process-design.md @@ -175,6 +175,12 @@ Behavior: `CloseSession` should be idempotent. Closing an already closed session should return a successful close result with the final known state. +`WorkerClient.ShutdownAsync` sends `WorkerShutdown`, waits for the worker read, +write, and heartbeat loops to stop, and waits for the launched worker process to +exit within the same shutdown timeout. If the pipe loops or process exit exceed +the timeout, the close operation fails with `ShutdownTimeout`; `GatewaySession` +then kills the worker process tree before surfacing the close failure. + ### Invoke `Invoke` forwards one MXAccess command to the worker that owns the session. @@ -515,6 +521,11 @@ It handles: The write loop should fail the session if a pipe write fails outside normal shutdown. +During shutdown the worker client treats `WorkerShutdownAck` as the protocol +close signal, but the process handle remains authoritative for process lifetime. +The client waits for both the protocol close and process exit before reporting a +clean shutdown to `GatewaySession`. + ## Command Correlation Each command gets: diff --git a/docs/mxaccess-worker-instance-design.md b/docs/mxaccess-worker-instance-design.md index 8d4b88c..f0343b6 100644 --- a/docs/mxaccess-worker-instance-design.md +++ b/docs/mxaccess-worker-instance-design.md @@ -321,6 +321,13 @@ If COM creation fails, the worker should send a structured fault with: when the exception exposes one, and does not send `WorkerReady` after a failed COM creation attempt. +After `WorkerReady`, `WorkerPipeSession` continues reading gateway frames for +the lifetime of the process. `WorkerCommand` frames are dispatched to +`MxAccessStaSession`, replies are written as `WorkerCommandReply`, and queued +worker events are drained after command replies. `WorkerShutdown` starts the +graceful shutdown path and returns `WorkerShutdownAck` only after the STA +cleanup path completes. + ## Event Sink The worker must subscribe to every public MXAccess event family: @@ -675,6 +682,29 @@ Graceful shutdown sequence: If shutdown wedges, the gateway kills the process. The worker should be written so process kill does not corrupt other sessions. +`MxAccessStaSession.ShutdownGracefullyAsync` implements the current cleanup +path. It first calls `StaCommandDispatcher.RequestShutdown()` so new commands +are rejected and queued commands that have not started receive +`ProtocolStatusCode.WorkerUnavailable`. The command already executing on the +STA is allowed to finish until the shutdown grace period expires. + +After command dispatch is closed, cleanup runs on the STA in MXAccess handle +order: + +1. one `UnAdvise` call per advised server/item pair, +2. `RemoveItem` for active item handles, +3. `Unregister` for active server handles, +4. event sink detach, +5. COM release. + +Each cleanup call is best effort. A failed cleanup operation is recorded as an +`MxAccessShutdownFailure`, logged by `WorkerPipeSession`, and does not prevent +later cleanup calls from running. A shutdown with cleanup failures still returns +`WorkerShutdownAck` with `ProtocolStatusCode.Ok` because the worker reached the +controlled release path. If the grace period expires before cleanup can run or +finish, the worker reports `WorkerFaultCategory.ShutdownTimeout` when possible +and relies on the gateway to kill the process. + ## Fault Handling Worker fault categories: diff --git a/src/MxGateway.Server/Workers/WorkerClient.cs b/src/MxGateway.Server/Workers/WorkerClient.cs index 18b8cb6..697eb05 100644 --- a/src/MxGateway.Server/Workers/WorkerClient.cs +++ b/src/MxGateway.Server/Workers/WorkerClient.cs @@ -227,6 +227,7 @@ public sealed class WorkerClient : IWorkerClient try { await WaitForBackgroundTasksAsync(timeoutCts.Token).ConfigureAwait(false); + await WaitForProcessExitAsync(timeoutCts.Token).ConfigureAwait(false); MarkClosed("shutdown"); } catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) @@ -717,6 +718,17 @@ public sealed class WorkerClient : IWorkerClient await Task.WhenAll(tasks).WaitAsync(cancellationToken).ConfigureAwait(false); } + private async Task WaitForProcessExitAsync(CancellationToken cancellationToken) + { + WorkerProcessHandle? processHandle = _connection.ProcessHandle; + if (processHandle is null || processHandle.Process.HasExited) + { + return; + } + + await processHandle.Process.WaitForExitAsync(cancellationToken).ConfigureAwait(false); + } + private void ThrowIfDisposed() { ObjectDisposedException.ThrowIf(_disposed, this); diff --git a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs index 9fe3611..eece91a 100644 --- a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs +++ b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs @@ -86,6 +86,26 @@ public sealed class SessionManagerTests Assert.Equal(0, metrics.GetSnapshot().OpenSessions); } + [Fact] + public async Task CloseSessionAsync_WhenWorkerShutdownFails_KillsWorker() + { + FakeWorkerClient workerClient = new() + { + ShutdownException = new WorkerClientException( + WorkerClientErrorCode.ShutdownTimeout, + "Worker shutdown timed out."), + }; + SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); + GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + + SessionManagerException exception = await Assert.ThrowsAsync( + async () => await manager.CloseSessionAsync(session.SessionId, CancellationToken.None)); + + Assert.Equal(SessionManagerErrorCode.CloseFailed, exception.ErrorCode); + Assert.Equal(1, workerClient.ShutdownCount); + Assert.Equal(1, workerClient.KillCount); + } + [Fact] public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry() { @@ -266,6 +286,8 @@ public sealed class SessionManagerTests public int KillCount { get; private set; } + public Exception? ShutdownException { get; init; } + public Task StartAsync(CancellationToken cancellationToken) { return Task.CompletedTask; @@ -302,6 +324,11 @@ public sealed class SessionManagerTests CancellationToken cancellationToken) { ShutdownCount++; + if (ShutdownException is not null) + { + throw ShutdownException; + } + State = WorkerClientState.Closed; return Task.CompletedTask; } diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs index 5fba6ea..af97da5 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs @@ -142,6 +142,13 @@ public sealed class WorkerPipeClientTests { } + public Task ShutdownGracefullyAsync( + TimeSpan timeout, + CancellationToken cancellationToken = default) + { + return Task.FromResult(new MxAccessShutdownResult(Array.Empty())); + } + public void Dispose() { } diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs index d1a11f9..4de1fd2 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs @@ -555,6 +555,14 @@ public sealed class WorkerPipeSessionTests releaseDispatch.Set(); } + public Task ShutdownGracefullyAsync( + TimeSpan timeout, + CancellationToken cancellationToken = default) + { + releaseDispatch.Set(); + return Task.FromResult(new MxAccessShutdownResult(Array.Empty())); + } + public void ReleaseDispatch() { releaseDispatch.Set(); diff --git a/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs b/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs index 61bdb7e..4e8b355 100644 --- a/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs +++ b/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Runtime.InteropServices; using System.Threading.Tasks; using MxGateway.Contracts.Proto; @@ -414,6 +416,57 @@ public sealed class MxAccessCommandExecutorTests Assert.Equal(MxAccessAdviceKind.Plain, adviceHandle.AdviceKind); } + [Fact] + public async Task ShutdownGracefullyAsync_CleansHandlesInAdviceItemServerOrder() + { + FakeMxAccessComObject fakeComObject = new( + registerHandle: 58, + addItemHandle: 510); + FakeMxAccessComObjectFactory factory = new(fakeComObject); + using StaRuntime runtime = CreateRuntime(); + using MxAccessStaSession session = new(runtime, factory, new NoopEventSink()); + await session.StartAsync(workerProcessId: 1234); + await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown", "client-a")); + await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown", 58, "Galaxy.Tag.Value")); + await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown", 58, 510)); + await session.DispatchAsync(CreateAdviseSupervisoryCommand("supervisory-before-shutdown", 58, 510)); + + MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2)); + + Assert.True(result.Succeeded); + Assert.Equal( + new[] { "UnAdvise:58:510", "RemoveItem:58:510", "Unregister:58" }, + fakeComObject.OperationNames.Where(name => name.StartsWith("Un", StringComparison.Ordinal) + || name.StartsWith("Remove", StringComparison.Ordinal))); + } + + [Fact] + public async Task ShutdownGracefullyAsync_RecordsCleanupFailuresAndContinues() + { + const int hresult = unchecked((int)0x80070057); + COMException cleanupException = new("Invalid handle.", hresult); + FakeMxAccessComObject fakeComObject = new( + registerHandle: 59, + addItemHandle: 511, + unregisterException: cleanupException, + removeItemException: cleanupException, + unAdviseException: cleanupException); + FakeMxAccessComObjectFactory factory = new(fakeComObject); + using StaRuntime runtime = CreateRuntime(); + using MxAccessStaSession session = new(runtime, factory, new NoopEventSink()); + await session.StartAsync(workerProcessId: 1234); + await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown-failure", "client-a")); + await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown-failure", 59, "Galaxy.Tag.Value")); + await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown-failure", 59, 511)); + + MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2)); + + Assert.False(result.Succeeded); + Assert.Equal(new[] { "UnAdvise", "RemoveItem", "Unregister" }, result.Failures.Select(failure => failure.Operation)); + Assert.All(result.Failures, failure => Assert.Equal(hresult, failure.HResult)); + Assert.Contains("Unregister:59", fakeComObject.OperationNames); + } + [Fact] public async Task DispatchAsync_RegisterWithoutPayload_ReturnsInvalidRequest() { @@ -644,6 +697,7 @@ public sealed class MxAccessCommandExecutorTests private readonly Exception? adviseException; private readonly Exception? unAdviseException; private readonly Exception? adviseSupervisoryException; + private readonly List operationNames = new(); public FakeMxAccessComObject( int registerHandle, @@ -715,8 +769,11 @@ public sealed class MxAccessCommandExecutorTests public int? AdviseSupervisoryThreadId { get; private set; } + public IReadOnlyList OperationNames => operationNames.ToArray(); + public int Register(string clientName) { + operationNames.Add($"Register:{clientName}"); RegisteredClientName = clientName; RegisterThreadId = Environment.CurrentManagedThreadId; @@ -725,6 +782,7 @@ public sealed class MxAccessCommandExecutorTests public void Unregister(int serverHandle) { + operationNames.Add($"Unregister:{serverHandle}"); UnregisteredServerHandle = serverHandle; UnregisterThreadId = Environment.CurrentManagedThreadId; @@ -738,6 +796,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, string itemDefinition) { + operationNames.Add($"AddItem:{serverHandle}:{itemDefinition}"); AddItemServerHandle = serverHandle; AddItemDefinition = itemDefinition; AddItemThreadId = Environment.CurrentManagedThreadId; @@ -755,6 +814,7 @@ public sealed class MxAccessCommandExecutorTests string itemDefinition, string itemContext) { + operationNames.Add($"AddItem2:{serverHandle}:{itemDefinition}:{itemContext}"); AddItem2ServerHandle = serverHandle; AddItem2Definition = itemDefinition; AddItem2Context = itemContext; @@ -772,6 +832,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"RemoveItem:{serverHandle}:{itemHandle}"); RemoveItemServerHandle = serverHandle; RemovedItemHandle = itemHandle; RemoveItemThreadId = Environment.CurrentManagedThreadId; @@ -786,6 +847,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"Advise:{serverHandle}:{itemHandle}"); AdviseServerHandle = serverHandle; AdvisedItemHandle = itemHandle; AdviseThreadId = Environment.CurrentManagedThreadId; @@ -800,6 +862,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"UnAdvise:{serverHandle}:{itemHandle}"); UnAdviseServerHandle = serverHandle; UnAdvisedItemHandle = itemHandle; UnAdviseThreadId = Environment.CurrentManagedThreadId; @@ -814,6 +877,7 @@ public sealed class MxAccessCommandExecutorTests int serverHandle, int itemHandle) { + operationNames.Add($"AdviseSupervisory:{serverHandle}:{itemHandle}"); AdviseSupervisoryServerHandle = serverHandle; AdviseSupervisoryItemHandle = itemHandle; AdviseSupervisoryThreadId = Environment.CurrentManagedThreadId; diff --git a/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs b/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs index f4ee5a5..35032ce 100644 --- a/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs +++ b/src/MxGateway.Worker.Tests/Sta/StaCommandDispatcherTests.cs @@ -110,6 +110,27 @@ public sealed class StaCommandDispatcherTests Assert.Equal("correlation-1", reply.CorrelationId); } + [Fact] + public async Task RequestShutdown_RejectsQueuedCommandButLetsCurrentCommandFinish() + { + using StaRuntime runtime = CreateRuntime(); + runtime.Start(); + BlockingCommandExecutor executor = new(); + StaCommandDispatcher dispatcher = new(runtime, executor); + Task current = dispatcher.DispatchAsync(CreateCommand("current", MxCommandKind.Register)); + Assert.True(executor.Started.Wait(TimeSpan.FromSeconds(2))); + Task pending = dispatcher.DispatchAsync(CreateCommand("pending", MxCommandKind.AddItem)); + + dispatcher.RequestShutdown(); + MxCommandReply pendingReply = await pending; + executor.Release(); + MxCommandReply currentReply = await current; + + Assert.Equal(ProtocolStatusCode.WorkerUnavailable, pendingReply.ProtocolStatus.Code); + Assert.Equal(ProtocolStatusCode.Ok, currentReply.ProtocolStatus.Code); + Assert.Equal(new[] { "current" }, executor.CorrelationIds); + } + [Fact] public async Task PopulateHeartbeat_ReportsCurrentCorrelationAndPendingCount() { diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs index d60ce04..5fd0c5a 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs @@ -12,23 +12,48 @@ public sealed class WorkerPipeClient : IWorkerPipeClient public const int DefaultConnectTimeoutMilliseconds = 30000; private readonly int _connectTimeoutMilliseconds; - private readonly Func _sessionFactory; + private readonly Func _sessionFactory; + private readonly IWorkerLogger? _logger; public WorkerPipeClient() - : this(DefaultConnectTimeoutMilliseconds) + : this(null, DefaultConnectTimeoutMilliseconds) + { + } + + public WorkerPipeClient(IWorkerLogger? logger) + : this(logger, DefaultConnectTimeoutMilliseconds) { } public WorkerPipeClient(int connectTimeoutMilliseconds) - : this( - connectTimeoutMilliseconds, - (stream, frameOptions) => new WorkerPipeSession(stream, frameOptions)) + : this(null, connectTimeoutMilliseconds) { } public WorkerPipeClient( int connectTimeoutMilliseconds, Func sessionFactory) + : this( + null, + connectTimeoutMilliseconds, + (stream, frameOptions, _) => sessionFactory(stream, frameOptions)) + { + } + + public WorkerPipeClient( + IWorkerLogger? logger, + int connectTimeoutMilliseconds) + : this( + logger, + connectTimeoutMilliseconds, + (stream, frameOptions, workerLogger) => new WorkerPipeSession(stream, frameOptions, workerLogger)) + { + } + + public WorkerPipeClient( + IWorkerLogger? logger, + int connectTimeoutMilliseconds, + Func sessionFactory) { if (connectTimeoutMilliseconds <= 0) { @@ -37,6 +62,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient "Worker pipe connect timeout must be greater than zero."); } + _logger = logger; _sessionFactory = sessionFactory ?? throw new ArgumentNullException(nameof(sessionFactory)); _connectTimeoutMilliseconds = connectTimeoutMilliseconds; } @@ -60,7 +86,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false); - WorkerPipeSession session = _sessionFactory(pipe, frameOptions); + WorkerPipeSession session = _sessionFactory(pipe, frameOptions, _logger); await session.RunAsync(cancellationToken).ConfigureAwait(false); } diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs index 2be7cdc..3bf0ffd 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs @@ -1,10 +1,12 @@ using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Threading; using System.Threading.Tasks; using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts.Proto; +using MxGateway.Worker.Bootstrap; using MxGateway.Worker.MxAccess; using MxGateway.Worker.Sta; @@ -16,21 +18,27 @@ public sealed class WorkerPipeSession private readonly Func _processIdProvider; private readonly Func _runtimeSessionFactory; private readonly WorkerPipeSessionOptions _sessionOptions; + private readonly IWorkerLogger? _logger; private readonly WorkerFrameReader _reader; private readonly WorkerFrameWriter _writer; private IWorkerRuntimeSession? _runtimeSession; private long _nextSequence; private WorkerState _state = WorkerState.Starting; private bool _watchdogFaultSent; + private bool _shutdownTimedOut; public WorkerPipeSession( Stream stream, - WorkerFrameProtocolOptions options) + WorkerFrameProtocolOptions options, + IWorkerLogger? logger = null) : this( new WorkerFrameReader(stream, options), new WorkerFrameWriter(stream, options), options, - () => Process.GetCurrentProcess().Id) + () => Process.GetCurrentProcess().Id, + new WorkerPipeSessionOptions(), + () => new MxAccessStaSession(), + logger) { } @@ -45,7 +53,8 @@ public sealed class WorkerPipeSession options, processIdProvider, new WorkerPipeSessionOptions(), - () => new MxAccessStaSession()) + () => new MxAccessStaSession(), + logger: null) { } @@ -55,7 +64,8 @@ public sealed class WorkerPipeSession WorkerFrameProtocolOptions options, Func processIdProvider, WorkerPipeSessionOptions sessionOptions, - Func runtimeSessionFactory) + Func runtimeSessionFactory, + IWorkerLogger? logger = null) { _reader = reader ?? throw new ArgumentNullException(nameof(reader)); _writer = writer ?? throw new ArgumentNullException(nameof(writer)); @@ -63,6 +73,7 @@ public sealed class WorkerPipeSession _processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider)); _sessionOptions = sessionOptions ?? throw new ArgumentNullException(nameof(sessionOptions)); _runtimeSessionFactory = runtimeSessionFactory ?? throw new ArgumentNullException(nameof(runtimeSessionFactory)); + _logger = logger; _sessionOptions.Validate(); } @@ -78,7 +89,11 @@ public sealed class WorkerPipeSession } finally { - _runtimeSession?.Dispose(); + if (!_shutdownTimedOut) + { + _runtimeSession?.Dispose(); + } + _runtimeSession = null; _state = WorkerState.Stopped; } @@ -290,23 +305,38 @@ public sealed class WorkerPipeSession CancellationToken cancellationToken) { _state = WorkerState.ShuttingDown; - _runtimeSession?.RequestShutdown(); + IWorkerRuntimeSession? runtimeSession = _runtimeSession; + if (runtimeSession is null) + { + await WriteShutdownAckAsync( + CreateShutdownAck(new MxAccessShutdownResult(Array.Empty()), shutdown), + cancellationToken).ConfigureAwait(false); + return; + } - await _writer - .WriteAsync( - CreateEnvelope( - new WorkerShutdownAck - { - Status = new ProtocolStatus - { - Code = ProtocolStatusCode.Ok, - Message = string.IsNullOrWhiteSpace(shutdown.Reason) - ? "Worker shutdown accepted." - : $"Worker shutdown accepted: {shutdown.Reason}", - }, - }), - cancellationToken) - .ConfigureAwait(false); + TimeSpan gracePeriod = ResolveGracePeriod(shutdown); + try + { + MxAccessShutdownResult result = await runtimeSession + .ShutdownGracefullyAsync(gracePeriod, cancellationToken) + .ConfigureAwait(false); + LogShutdownFailures(result.Failures); + await WriteShutdownAckAsync(CreateShutdownAck(result, shutdown), cancellationToken).ConfigureAwait(false); + } + catch (TimeoutException exception) + { + _shutdownTimedOut = true; + _state = WorkerState.Faulted; + await TryWriteFaultAsync(CreateShutdownTimeoutFault(exception), cancellationToken).ConfigureAwait(false); + throw; + } + } + + private Task WriteShutdownAckAsync( + WorkerShutdownAck shutdownAck, + CancellationToken cancellationToken) + { + return _writer.WriteAsync(CreateEnvelope(shutdownAck), cancellationToken); } private async Task RunHeartbeatLoopAsync(CancellationToken cancellationToken) @@ -545,6 +575,57 @@ public sealed class WorkerPipeSession }; } + private static TimeSpan ResolveGracePeriod(WorkerShutdown shutdown) + { + if (shutdown.GracePeriod is null) + { + return TimeSpan.FromSeconds(10); + } + + TimeSpan gracePeriod = shutdown.GracePeriod.ToTimeSpan(); + return gracePeriod <= TimeSpan.Zero + ? TimeSpan.FromSeconds(10) + : gracePeriod; + } + + private static WorkerShutdownAck CreateShutdownAck( + MxAccessShutdownResult result, + WorkerShutdown shutdown) + { + string message = result.Succeeded + ? "Graceful shutdown completed." + : $"Graceful shutdown completed with {result.Failures.Count} cleanup failure(s)."; + if (!string.IsNullOrWhiteSpace(shutdown.Reason)) + { + message = $"{message} Reason: {shutdown.Reason}"; + } + + return new WorkerShutdownAck + { + Status = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = message, + }, + }; + } + + private void LogShutdownFailures(IReadOnlyList failures) + { + foreach (MxAccessShutdownFailure failure in failures) + { + _logger?.Error("WorkerGracefulShutdownCleanupFailed", new Dictionary + { + ["session_id"] = _options.SessionId, + ["operation"] = failure.Operation, + ["server_handle"] = failure.ServerHandle, + ["item_handle"] = failure.ItemHandle, + ["exception_type"] = failure.ExceptionType, + ["hresult"] = failure.HResult, + }); + } + } + private static WorkerFault CreateFault(WorkerFrameProtocolException exception) { return new WorkerFault @@ -619,6 +700,14 @@ public sealed class WorkerPipeSession }; } + private static WorkerFault CreateShutdownTimeoutFault(TimeoutException exception) + { + return CreateFault( + WorkerFaultCategory.ShutdownTimeout, + commandMethod: string.Empty, + exception); + } + private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode) { return errorCode switch diff --git a/src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs b/src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs index 3369b3c..7be0f3f 100644 --- a/src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs +++ b/src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs @@ -18,4 +18,8 @@ public interface IWorkerRuntimeSession : IDisposable WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat(); void RequestShutdown(); + + Task ShutdownGracefullyAsync( + TimeSpan timeout, + CancellationToken cancellationToken = default); } diff --git a/src/MxGateway.Worker/MxAccess/MxAccessSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessSession.cs index b96bd6f..5b7ab53 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessSession.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Runtime.InteropServices; using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts.Proto; @@ -188,6 +189,23 @@ public sealed class MxAccessSession : IDisposable MxAccessAdviceKind.Supervisory); } + public MxAccessShutdownResult ShutdownGracefully() + { + if (disposed) + { + return new MxAccessShutdownResult(Array.Empty()); + } + + List failures = new(); + + CleanupAdviceHandles(failures); + CleanupItemHandles(failures); + CleanupServerHandles(failures); + DisposeCore(failures); + + return new MxAccessShutdownResult(failures); + } + public void Dispose() { if (disposed) @@ -195,11 +213,112 @@ public sealed class MxAccessSession : IDisposable return; } - eventSink.Detach(); + DisposeCore(failures: null); + } - if (Marshal.IsComObject(mxAccessComObject)) + private void CleanupAdviceHandles(ICollection failures) + { + HashSet cleanedPairs = new(); + foreach (RegisteredAdviceHandle adviceHandle in handleRegistry.AdviceHandles) { - Marshal.FinalReleaseComObject(mxAccessComObject); + long key = CreateItemKey(adviceHandle.ServerHandle, adviceHandle.ItemHandle); + if (!cleanedPairs.Add(key)) + { + continue; + } + + try + { + mxAccessServer.UnAdvise(adviceHandle.ServerHandle, adviceHandle.ItemHandle); + handleRegistry.RemoveAdviceHandles(adviceHandle.ServerHandle, adviceHandle.ItemHandle); + } + catch (Exception exception) + { + failures.Add(new MxAccessShutdownFailure( + nameof(UnAdvise), + adviceHandle.ServerHandle, + adviceHandle.ItemHandle, + exception)); + } + } + } + + private void CleanupItemHandles(ICollection failures) + { + foreach (RegisteredItemHandle itemHandle in handleRegistry.ItemHandles) + { + try + { + mxAccessServer.RemoveItem(itemHandle.ServerHandle, itemHandle.ItemHandle); + handleRegistry.RemoveItemHandle(itemHandle.ServerHandle, itemHandle.ItemHandle); + } + catch (Exception exception) + { + failures.Add(new MxAccessShutdownFailure( + nameof(RemoveItem), + itemHandle.ServerHandle, + itemHandle.ItemHandle, + exception)); + } + } + } + + private void CleanupServerHandles(ICollection failures) + { + foreach (RegisteredServerHandle serverHandle in handleRegistry.ServerHandles) + { + try + { + mxAccessServer.Unregister(serverHandle.ServerHandle); + handleRegistry.UnregisterServerHandle(serverHandle.ServerHandle); + } + catch (Exception exception) + { + failures.Add(new MxAccessShutdownFailure( + nameof(Unregister), + serverHandle.ServerHandle, + itemHandle: null, + exception)); + } + } + } + + private static long CreateItemKey( + int serverHandle, + int itemHandle) + { + return ((long)serverHandle << 32) | (uint)itemHandle; + } + + private void DisposeCore(ICollection? failures) + { + try + { + eventSink.Detach(); + } + catch (Exception exception) when (failures is not null) + { + failures.Add(new MxAccessShutdownFailure( + "DetachEvents", + serverHandle: null, + itemHandle: null, + exception)); + } + + try + { + if (Marshal.IsComObject(mxAccessComObject)) + { + Marshal.FinalReleaseComObject(mxAccessComObject); + } + } + catch (Exception exception) when (failures is not null) + { + failures.Add(new MxAccessShutdownFailure( + "ReleaseComObject", + serverHandle: null, + itemHandle: null, + exception)); } disposed = true; diff --git a/src/MxGateway.Worker/MxAccess/MxAccessShutdownFailure.cs b/src/MxGateway.Worker/MxAccess/MxAccessShutdownFailure.cs new file mode 100644 index 0000000..f4891f7 --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/MxAccessShutdownFailure.cs @@ -0,0 +1,34 @@ +using System; + +namespace MxGateway.Worker.MxAccess; + +public sealed class MxAccessShutdownFailure +{ + public MxAccessShutdownFailure( + string operation, + int? serverHandle, + int? itemHandle, + Exception exception) + { + if (string.IsNullOrWhiteSpace(operation)) + { + throw new ArgumentException("Shutdown failure operation is required.", nameof(operation)); + } + + Operation = operation; + ServerHandle = serverHandle; + ItemHandle = itemHandle; + ExceptionType = exception?.GetType().FullName ?? string.Empty; + HResult = exception?.HResult; + } + + public string Operation { get; } + + public int? ServerHandle { get; } + + public int? ItemHandle { get; } + + public string ExceptionType { get; } + + public int? HResult { get; } +} diff --git a/src/MxGateway.Worker/MxAccess/MxAccessShutdownResult.cs b/src/MxGateway.Worker/MxAccess/MxAccessShutdownResult.cs new file mode 100644 index 0000000..683e65f --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/MxAccessShutdownResult.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; + +namespace MxGateway.Worker.MxAccess; + +public sealed class MxAccessShutdownResult +{ + public MxAccessShutdownResult(IReadOnlyList failures) + { + Failures = failures ?? throw new ArgumentNullException(nameof(failures)); + } + + public IReadOnlyList Failures { get; } + + public bool Succeeded => Failures.Count == 0; +} diff --git a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs index 25a5bf6..1e2ad89 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using MxGateway.Contracts.Proto; @@ -165,6 +166,61 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession cancellationToken); } + public async Task ShutdownGracefullyAsync( + TimeSpan timeout, + CancellationToken cancellationToken = default) + { + if (timeout <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException( + nameof(timeout), + "MXAccess graceful shutdown timeout must be greater than zero."); + } + + if (disposed) + { + return new MxAccessShutdownResult(Array.Empty()); + } + + commandDispatcher?.RequestShutdown(); + + Stopwatch stopwatch = Stopwatch.StartNew(); + MxAccessShutdownResult result; + if (session is null) + { + result = new MxAccessShutdownResult(Array.Empty()); + } + else + { + using CancellationTokenSource shutdownCancellation = + CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + shutdownCancellation.CancelAfter(timeout); + + Task cleanupTask = staRuntime.InvokeAsync( + () => session.ShutdownGracefully(), + shutdownCancellation.Token); + Task delayTask = Task.Delay(timeout, cancellationToken); + Task completedTask = await Task.WhenAny(cleanupTask, delayTask).ConfigureAwait(false); + if (completedTask != cleanupTask) + { + cancellationToken.ThrowIfCancellationRequested(); + throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}."); + } + + result = await cleanupTask.ConfigureAwait(false); + } + + TimeSpan remaining = timeout - stopwatch.Elapsed; + if (remaining <= TimeSpan.Zero || !staRuntime.Shutdown(remaining)) + { + throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}."); + } + + staRuntime.Dispose(); + disposed = true; + return result; + } + public void Dispose() { if (disposed) diff --git a/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs b/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs index 3df663e..361cccb 100644 --- a/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs +++ b/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs @@ -91,6 +91,14 @@ public sealed class StaCommandDispatcher lock (gate) { shutdownRequested = true; + while (commandQueue.Count > 0) + { + QueuedStaCommand queuedCommand = commandQueue.Dequeue(); + queuedCommand.Complete(CreateRejectedReply( + queuedCommand.Command, + ProtocolStatusCode.WorkerUnavailable, + "The STA command dispatcher is shutting down.")); + } } } diff --git a/src/MxGateway.Worker/WorkerApplication.cs b/src/MxGateway.Worker/WorkerApplication.cs index e1be4aa..db4bc4b 100644 --- a/src/MxGateway.Worker/WorkerApplication.cs +++ b/src/MxGateway.Worker/WorkerApplication.cs @@ -13,8 +13,7 @@ public static class WorkerApplication return Run( args, new EnvironmentVariableWorkerEnvironment(), - new WorkerConsoleLogger(Console.Error), - new WorkerPipeClient()); + new WorkerConsoleLogger(Console.Error)); } public static int Run( @@ -26,7 +25,7 @@ public static class WorkerApplication args, environment, logger, - new WorkerPipeClient()); + new WorkerPipeClient(logger)); } public static int Run(