Implement graceful worker shutdown

This commit is contained in:
Joseph Doherty
2026-04-26 19:36:22 -04:00
parent 95e71cd819
commit d890eff862
15 changed files with 694 additions and 11 deletions
+17 -2
View File
@@ -11,13 +11,26 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
public const int DefaultConnectTimeoutMilliseconds = 30000;
private readonly int _connectTimeoutMilliseconds;
private readonly IWorkerLogger? _logger;
public WorkerPipeClient()
: this(DefaultConnectTimeoutMilliseconds)
: this(null, DefaultConnectTimeoutMilliseconds)
{
}
/// <summary>
/// Creates a pipe client with the supplied logger and the default connect timeout
/// (<see cref="DefaultConnectTimeoutMilliseconds"/>).
/// </summary>
/// <param name="logger">Optional logger; may be <see langword="null"/>.</param>
public WorkerPipeClient(IWorkerLogger? logger)
    : this(
        logger: logger,
        connectTimeoutMilliseconds: DefaultConnectTimeoutMilliseconds)
{
}
/// <summary>
/// Creates a pipe client without a logger, using the supplied connect timeout.
/// </summary>
/// <param name="connectTimeoutMilliseconds">Connect timeout in milliseconds.</param>
public WorkerPipeClient(int connectTimeoutMilliseconds)
    : this(
        logger: null,
        connectTimeoutMilliseconds: connectTimeoutMilliseconds)
{
}
public WorkerPipeClient(
IWorkerLogger? logger,
int connectTimeoutMilliseconds)
{
if (connectTimeoutMilliseconds <= 0)
{
@@ -27,6 +40,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
}
_connectTimeoutMilliseconds = connectTimeoutMilliseconds;
_logger = logger;
}
public async Task RunAsync(
@@ -48,8 +62,9 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false);
WorkerPipeSession session = new(pipe, frameOptions);
WorkerPipeSession session = new(pipe, frameOptions, _logger);
await session.CompleteStartupHandshakeAsync(cancellationToken).ConfigureAwait(false);
await session.RunAsync(cancellationToken).ConfigureAwait(false);
}
private Task ConnectAsync(
+235 -3
View File
@@ -1,11 +1,14 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Bootstrap;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Ipc;
@@ -13,19 +16,24 @@ public sealed class WorkerPipeSession
{
private readonly WorkerFrameProtocolOptions _options;
private readonly Func<int> _processIdProvider;
private readonly IWorkerLogger? _logger;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private MxAccessStaSession? _mxAccessStaSession;
private long _nextSequence;
private bool _shutdownCompleted;
private bool _shutdownTimedOut;
public WorkerPipeSession(
Stream stream,
WorkerFrameProtocolOptions options)
WorkerFrameProtocolOptions options,
IWorkerLogger? logger = null)
: this(
new WorkerFrameReader(stream, options),
new WorkerFrameWriter(stream, options),
options,
() => Process.GetCurrentProcess().Id)
() => Process.GetCurrentProcess().Id,
logger)
{
}
@@ -33,12 +41,14 @@ public sealed class WorkerPipeSession
WorkerFrameReader reader,
WorkerFrameWriter writer,
WorkerFrameProtocolOptions options,
Func<int> processIdProvider)
Func<int> processIdProvider,
IWorkerLogger? logger = null)
{
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
_options = options ?? throw new ArgumentNullException(nameof(options));
_processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider));
_logger = logger;
}
public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default)
@@ -95,6 +105,44 @@ public sealed class WorkerPipeSession
}
}
/// <summary>
/// Reads gateway envelopes in a loop and dispatches each one by body type until a
/// shutdown envelope has been handled.
/// </summary>
/// <param name="cancellationToken">Token passed to every read, handler, and fault write.</param>
/// <returns>A task that completes once a <c>WorkerShutdown</c> envelope has been handled.</returns>
/// <exception cref="WorkerFrameProtocolException">
/// Rethrown after a fault write has been attempted; also raised here for any envelope body
/// other than command, shutdown, or cancel.
/// </exception>
public async Task RunAsync(CancellationToken cancellationToken = default)
{
    try
    {
        for (; ; )
        {
            WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
            WorkerEnvelope.BodyOneofCase body = envelope.BodyCase;

            if (body == WorkerEnvelope.BodyOneofCase.WorkerCommand)
            {
                await HandleCommandAsync(envelope, cancellationToken).ConfigureAwait(false);
                continue;
            }

            if (body == WorkerEnvelope.BodyOneofCase.WorkerShutdown)
            {
                await HandleShutdownAsync(envelope.WorkerShutdown, cancellationToken).ConfigureAwait(false);
                return;
            }

            if (body == WorkerEnvelope.BodyOneofCase.WorkerCancel)
            {
                // Cancel envelopes are accepted but nothing is done with them here.
                continue;
            }

            throw new WorkerFrameProtocolException(
                WorkerFrameProtocolErrorCode.UnexpectedEnvelopeBody,
                $"Worker received unexpected gateway envelope body {envelope.BodyCase} after startup.");
        }
    }
    catch (WorkerFrameProtocolException protocolFailure)
    {
        await TryWriteFaultAsync(protocolFailure, cancellationToken).ConfigureAwait(false);
        throw;
    }
    finally
    {
        // Dispose the STA session only when the graceful path neither finished nor timed out.
        // NOTE(review): presumably Dispose is skipped after a timeout so a stuck STA thread
        // cannot block this path as well - confirm.
        bool gracefulPathRan = _shutdownCompleted || _shutdownTimedOut;
        if (!gracefulPathRan)
        {
            _mxAccessStaSession?.Dispose();
        }
    }
}
private void ValidateGatewayHello(WorkerEnvelope envelope)
{
if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.GatewayHello)
@@ -178,6 +226,25 @@ public sealed class WorkerPipeSession
}
}
/// <summary>
/// Best-effort write of a fault envelope. I/O, disposed-object, and frame-protocol
/// failures raised while writing are swallowed; any other exception propagates.
/// </summary>
/// <param name="fault">Fault to wrap in an envelope and send.</param>
/// <param name="cancellationToken">Token passed to the frame writer.</param>
private async Task TryWriteFaultAsync(
    WorkerFault fault,
    CancellationToken cancellationToken)
{
    try
    {
        Task pendingWrite = _writer.WriteAsync(CreateEnvelope(fault), cancellationToken);
        await pendingWrite.ConfigureAwait(false);
    }
    catch (Exception writeFailure) when (
        writeFailure is IOException or ObjectDisposedException or WorkerFrameProtocolException)
    {
        // Deliberately ignored: the caller rethrows the shutdown timeout, which is the
        // error worth acting on.
    }
}
private WorkerEnvelope CreateEnvelope(WorkerHello hello)
{
return CreateBaseEnvelope(hello);
@@ -193,6 +260,21 @@ public sealed class WorkerPipeSession
return CreateBaseEnvelope(fault);
}
/// <summary>Wraps a command reply in an outgoing envelope.</summary>
/// <param name="reply">Reply to send.</param>
/// <returns>The envelope produced by <c>CreateBaseEnvelope</c> for this reply.</returns>
private WorkerEnvelope CreateEnvelope(WorkerCommandReply reply)
    => CreateBaseEnvelope(reply);
/// <summary>Wraps a worker event in an outgoing envelope.</summary>
/// <param name="workerEvent">Event to send.</param>
/// <returns>The envelope produced by <c>CreateBaseEnvelope</c> for this event.</returns>
private WorkerEnvelope CreateEnvelope(WorkerEvent workerEvent)
    => CreateBaseEnvelope(workerEvent);
/// <summary>Wraps a shutdown acknowledgement in an outgoing envelope.</summary>
/// <param name="shutdownAck">Acknowledgement to send.</param>
/// <returns>The envelope produced by <c>CreateBaseEnvelope</c> for this acknowledgement.</returns>
private WorkerEnvelope CreateEnvelope(WorkerShutdownAck shutdownAck)
    => CreateBaseEnvelope(shutdownAck);
private WorkerEnvelope CreateBaseEnvelope(WorkerHello body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
@@ -214,6 +296,28 @@ public sealed class WorkerPipeSession
return envelope;
}
/// <summary>
/// Builds an envelope carrying a command reply. The envelope's correlation id is copied
/// from the inner reply, or set to the empty string when no inner reply or id is present.
/// </summary>
/// <param name="body">Reply to attach; stored by reference, not cloned.</param>
/// <returns>The populated envelope.</returns>
private WorkerEnvelope CreateBaseEnvelope(WorkerCommandReply body)
{
    WorkerEnvelope replyEnvelope = CreateBaseEnvelope();

    string? replyCorrelationId = body.Reply?.CorrelationId;
    replyEnvelope.CorrelationId = replyCorrelationId ?? string.Empty;
    replyEnvelope.WorkerCommandReply = body;

    return replyEnvelope;
}
/// <summary>Builds an envelope carrying a clone of the given worker event.</summary>
/// <param name="body">Event to clone into the envelope.</param>
/// <returns>The populated envelope.</returns>
private WorkerEnvelope CreateBaseEnvelope(WorkerEvent body)
{
    WorkerEnvelope eventEnvelope = CreateBaseEnvelope();

    // The envelope receives its own copy of the event rather than the caller's instance.
    WorkerEvent eventCopy = body.Clone();
    eventEnvelope.WorkerEvent = eventCopy;

    return eventEnvelope;
}
/// <summary>Builds an envelope carrying a shutdown acknowledgement.</summary>
/// <param name="body">Acknowledgement to attach; stored by reference, not cloned.</param>
/// <returns>The populated envelope.</returns>
private WorkerEnvelope CreateBaseEnvelope(WorkerShutdownAck body)
{
    WorkerEnvelope ackEnvelope = CreateBaseEnvelope();
    ackEnvelope.WorkerShutdownAck = body;

    return ackEnvelope;
}
private WorkerEnvelope CreateBaseEnvelope()
{
return new WorkerEnvelope
@@ -246,6 +350,75 @@ public sealed class WorkerPipeSession
}
}
/// <summary>
/// Dispatches one gateway command to the MXAccess STA session, writes the reply envelope,
/// and then writes any events the session has queued.
/// </summary>
/// <param name="envelope">Envelope whose <c>WorkerCommand</c> body is dispatched.</param>
/// <param name="cancellationToken">Token attached to the command and passed to the writes.</param>
/// <exception cref="InvalidOperationException">The STA session has not been initialized.</exception>
private async Task HandleCommandAsync(
    WorkerEnvelope envelope,
    CancellationToken cancellationToken)
{
    MxAccessStaSession staSession = _mxAccessStaSession
        ?? throw new InvalidOperationException("MXAccess STA session is not initialized.");

    var staCommand = new StaCommand(
        _options.SessionId,
        envelope.CorrelationId,
        envelope.WorkerCommand.Command,
        envelope.WorkerCommand.EnqueueTimestamp,
        cancellationToken);

    MxCommandReply dispatched = await staSession.DispatchAsync(staCommand).ConfigureAwait(false);

    // The completion timestamp is taken after dispatch returns, in UTC.
    var commandReply = new WorkerCommandReply
    {
        Reply = dispatched,
        CompletedTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
    };

    WorkerEnvelope replyEnvelope = CreateEnvelope(commandReply);
    await _writer.WriteAsync(replyEnvelope, cancellationToken).ConfigureAwait(false);

    // Events are written only after the reply has been written.
    await DrainEventsAsync(cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Runs graceful shutdown of the MXAccess STA session (if any), logs cleanup failures,
/// and acknowledges the shutdown to the gateway.
/// </summary>
/// <param name="shutdown">Shutdown request; supplies the grace period.</param>
/// <param name="cancellationToken">Token passed to the shutdown call and the writes.</param>
/// <exception cref="TimeoutException">
/// Rethrown after the timed-out flag is set and a shutdown-timeout fault write has been attempted.
/// </exception>
private async Task HandleShutdownAsync(
    WorkerShutdown shutdown,
    CancellationToken cancellationToken)
{
    TimeSpan gracePeriod = ResolveGracePeriod(shutdown);

    try
    {
        MxAccessShutdownResult outcome;
        if (_mxAccessStaSession is null)
        {
            // Nothing to clean up: report an empty failure list.
            outcome = new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
        }
        else
        {
            outcome = await _mxAccessStaSession
                .ShutdownGracefullyAsync(gracePeriod, cancellationToken)
                .ConfigureAwait(false);
        }

        LogShutdownFailures(outcome.Failures);

        WorkerEnvelope ackEnvelope = CreateEnvelope(CreateShutdownAck(outcome));
        await _writer.WriteAsync(ackEnvelope, cancellationToken).ConfigureAwait(false);

        // Marked complete only once the acknowledgement has been written.
        _shutdownCompleted = true;
    }
    catch (TimeoutException timedOut)
    {
        _shutdownTimedOut = true;

        WorkerFault timeoutFault = CreateShutdownTimeoutFault(timedOut);
        await TryWriteFaultAsync(timeoutFault, cancellationToken).ConfigureAwait(false);
        throw;
    }
}
/// <summary>
/// Writes every event currently returned by the STA session's <c>DrainEvents</c> call
/// to the gateway, one envelope per event. Does nothing when no STA session exists.
/// </summary>
/// <param name="cancellationToken">Token passed to each write.</param>
private async Task DrainEventsAsync(CancellationToken cancellationToken)
{
    MxAccessStaSession? staSession = _mxAccessStaSession;
    if (staSession is not null)
    {
        // NOTE(review): maxEvents: 0 presumably means "no limit" - confirm against DrainEvents.
        foreach (WorkerEvent pendingEvent in staSession.DrainEvents(maxEvents: 0))
        {
            WorkerEnvelope eventEnvelope = CreateEnvelope(pendingEvent);
            await _writer.WriteAsync(eventEnvelope, cancellationToken).ConfigureAwait(false);
        }
    }
}
private WorkerReady CreateWorkerReady()
{
return new WorkerReady
@@ -257,6 +430,49 @@ public sealed class WorkerPipeSession
};
}
/// <summary>
/// Picks the grace period for a shutdown request: the requested value when it is
/// present and positive, otherwise ten seconds.
/// </summary>
/// <param name="shutdown">Shutdown request that may carry a grace period.</param>
/// <returns>A strictly positive grace period.</returns>
private static TimeSpan ResolveGracePeriod(WorkerShutdown shutdown)
{
    if (shutdown.GracePeriod is not null)
    {
        TimeSpan requested = shutdown.GracePeriod.ToTimeSpan();
        if (requested > TimeSpan.Zero)
        {
            return requested;
        }
    }

    // Missing, zero, or negative grace periods fall back to the default.
    return TimeSpan.FromSeconds(10);
}
/// <summary>
/// Builds the shutdown acknowledgement. The status code is always <c>Ok</c>; only the
/// message changes to report how many cleanup failures occurred.
/// </summary>
/// <param name="result">Outcome of the graceful shutdown.</param>
/// <returns>The acknowledgement to send to the gateway.</returns>
private static WorkerShutdownAck CreateShutdownAck(MxAccessShutdownResult result)
{
    string statusMessage;
    if (result.Succeeded)
    {
        statusMessage = "Graceful shutdown completed.";
    }
    else
    {
        statusMessage = $"Graceful shutdown completed with {result.Failures.Count} cleanup failure(s).";
    }

    var status = new ProtocolStatus
    {
        Code = ProtocolStatusCode.Ok,
        Message = statusMessage,
    };

    return new WorkerShutdownAck { Status = status };
}
/// <summary>
/// Emits one error log entry per cleanup failure, tagged with the session id and the
/// failure's operation, handles, exception type, and HRESULT. Logs nothing when no
/// logger was supplied.
/// </summary>
/// <param name="failures">Failures collected during graceful shutdown.</param>
private void LogShutdownFailures(IReadOnlyList<MxAccessShutdownFailure> failures)
{
    foreach (MxAccessShutdownFailure cleanupFailure in failures)
    {
        _logger?.Error(
            "WorkerGracefulShutdownCleanupFailed",
            new Dictionary<string, object?>
            {
                { "session_id", _options.SessionId },
                { "operation", cleanupFailure.Operation },
                { "server_handle", cleanupFailure.ServerHandle },
                { "item_handle", cleanupFailure.ItemHandle },
                { "exception_type", cleanupFailure.ExceptionType },
                { "hresult", cleanupFailure.HResult },
            });
    }
}
private static WorkerFault CreateFault(WorkerFrameProtocolException exception)
{
return new WorkerFault
@@ -295,6 +511,22 @@ public sealed class WorkerPipeSession
return fault;
}
/// <summary>
/// Converts a shutdown timeout into a worker fault in the <c>ShutdownTimeout</c> category
/// with a <c>WorkerUnavailable</c> protocol status. The exception message is used both as
/// the diagnostic message and as the status message.
/// </summary>
/// <param name="exception">The timeout that ended graceful shutdown.</param>
/// <returns>The fault to report to the gateway.</returns>
private static WorkerFault CreateShutdownTimeoutFault(TimeoutException exception)
{
    string timeoutText = exception.Message;
    string exceptionTypeName = exception.GetType().FullName ?? string.Empty;

    var status = new ProtocolStatus
    {
        Code = ProtocolStatusCode.WorkerUnavailable,
        Message = timeoutText,
    };

    var fault = new WorkerFault
    {
        Category = WorkerFaultCategory.ShutdownTimeout,
        ExceptionType = exceptionTypeName,
        DiagnosticMessage = timeoutText,
        ProtocolStatus = status,
    };

    return fault;
}
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
{
return errorCode switch
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
@@ -188,6 +189,23 @@ public sealed class MxAccessSession : IDisposable
MxAccessAdviceKind.Supervisory);
}
/// <summary>
/// Tears the session down in order - advice handles, item handles, server handles, then
/// core disposal - collecting failures instead of throwing them.
/// </summary>
/// <returns>
/// The failures recorded during cleanup; an empty result when the session was already disposed.
/// </returns>
public MxAccessShutdownResult ShutdownGracefully()
{
    var collected = new List<MxAccessShutdownFailure>();

    if (!disposed)
    {
        CleanupAdviceHandles(collected);
        CleanupItemHandles(collected);
        CleanupServerHandles(collected);
        DisposeCore(collected);

        return new MxAccessShutdownResult(collected);
    }

    return new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
}
public void Dispose()
{
if (disposed)
@@ -195,11 +213,112 @@ public sealed class MxAccessSession : IDisposable
return;
}
eventSink.Detach();
DisposeCore(failures: null);
}
if (Marshal.IsComObject(mxAccessComObject))
/// <summary>
/// Calls <c>UnAdvise</c> once per distinct (server handle, item handle) pair found among
/// the registry's advice handles. On success the pair's advice handles are removed from
/// the registry; on failure an entry is added to <paramref name="failures"/>.
/// </summary>
/// <param name="failures">Receives one entry per pair whose cleanup threw.</param>
private void CleanupAdviceHandles(ICollection<MxAccessShutdownFailure> failures)
{
// Pairs already processed, keyed by the packed server/item handle value.
HashSet<long> cleanedPairs = new();
// NOTE(review): the loop body removes entries from handleRegistry while AdviceHandles is
// being enumerated; this is only safe if AdviceHandles returns a snapshot - confirm.
foreach (RegisteredAdviceHandle adviceHandle in handleRegistry.AdviceHandles)
{
Marshal.FinalReleaseComObject(mxAccessComObject);
long key = CreateItemKey(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
// Add returns false when the pair was seen before, so each pair is un-advised once.
if (!cleanedPairs.Add(key))
{
continue;
}
try
{
mxAccessServer.UnAdvise(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
handleRegistry.RemoveAdviceHandles(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
}
catch (Exception exception)
{
// Record the failure and keep going so one bad handle does not stop the cleanup.
failures.Add(new MxAccessShutdownFailure(
nameof(UnAdvise),
adviceHandle.ServerHandle,
adviceHandle.ItemHandle,
exception));
}
}
}
/// <summary>
/// Calls <c>RemoveItem</c> for every registered item handle. On success the handle is
/// removed from the registry; on failure an entry is added to <paramref name="failures"/>
/// and cleanup continues with the next handle.
/// </summary>
/// <param name="failures">Receives one entry per item handle whose removal threw.</param>
private void CleanupItemHandles(ICollection<MxAccessShutdownFailure> failures)
{
    // Iterate over a copy. The loop body removes entries from the registry, and if
    // ItemHandles exposes the live collection, mutating it mid-enumeration would throw
    // InvalidOperationException from the enumerator - outside the try below - and abort
    // the rest of the graceful shutdown.
    List<RegisteredItemHandle> snapshot = new(handleRegistry.ItemHandles);

    foreach (RegisteredItemHandle itemHandle in snapshot)
    {
        try
        {
            mxAccessServer.RemoveItem(itemHandle.ServerHandle, itemHandle.ItemHandle);
            handleRegistry.RemoveItemHandle(itemHandle.ServerHandle, itemHandle.ItemHandle);
        }
        catch (Exception exception)
        {
            // Record the failure and keep going so one bad handle does not stop the cleanup.
            failures.Add(new MxAccessShutdownFailure(
                nameof(RemoveItem),
                itemHandle.ServerHandle,
                itemHandle.ItemHandle,
                exception));
        }
    }
}
/// <summary>
/// Calls <c>Unregister</c> for every registered server handle. On success the handle is
/// removed from the registry; on failure an entry is added to <paramref name="failures"/>
/// and cleanup continues with the next handle.
/// </summary>
/// <param name="failures">Receives one entry per server handle whose unregistration threw.</param>
private void CleanupServerHandles(ICollection<MxAccessShutdownFailure> failures)
{
    // Iterate over a copy. The loop body removes entries from the registry, and if
    // ServerHandles exposes the live collection, mutating it mid-enumeration would throw
    // InvalidOperationException from the enumerator - outside the try below - and abort
    // the rest of the graceful shutdown.
    List<RegisteredServerHandle> snapshot = new(handleRegistry.ServerHandles);

    foreach (RegisteredServerHandle serverHandle in snapshot)
    {
        try
        {
            mxAccessServer.Unregister(serverHandle.ServerHandle);
            handleRegistry.UnregisterServerHandle(serverHandle.ServerHandle);
        }
        catch (Exception exception)
        {
            // Record the failure and keep going so one bad handle does not stop the cleanup.
            failures.Add(new MxAccessShutdownFailure(
                nameof(Unregister),
                serverHandle.ServerHandle,
                itemHandle: null,
                exception));
        }
    }
}
/// <summary>
/// Packs a server handle and an item handle into one 64-bit key: the server handle
/// occupies the upper 32 bits and the item handle's bit pattern the lower 32 bits.
/// </summary>
/// <param name="serverHandle">Value placed in the upper 32 bits.</param>
/// <param name="itemHandle">Value whose 32-bit pattern is placed in the lower 32 bits.</param>
/// <returns>The combined key.</returns>
private static long CreateItemKey(
    int serverHandle,
    int itemHandle)
{
    long upperBits = (long)serverHandle << 32;

    // Going through uint keeps a negative item handle from sign-extending into the upper half.
    long lowerBits = (uint)itemHandle;

    return upperBits | lowerBits;
}
private void DisposeCore(ICollection<MxAccessShutdownFailure>? failures)
{
try
{
eventSink.Detach();
}
catch (Exception exception) when (failures is not null)
{
failures.Add(new MxAccessShutdownFailure(
"DetachEvents",
serverHandle: null,
itemHandle: null,
exception));
}
try
{
if (Marshal.IsComObject(mxAccessComObject))
{
Marshal.FinalReleaseComObject(mxAccessComObject);
}
}
catch (Exception exception) when (failures is not null)
{
failures.Add(new MxAccessShutdownFailure(
"ReleaseComObject",
serverHandle: null,
itemHandle: null,
exception));
}
disposed = true;
@@ -0,0 +1,34 @@
using System;
namespace MxGateway.Worker.MxAccess;
/// <summary>
/// Immutable record of one cleanup step that threw during graceful shutdown.
/// </summary>
public sealed class MxAccessShutdownFailure
{
    /// <summary>
    /// Captures the failed operation, the handles involved, and the type name and HRESULT
    /// of the exception. The exception object itself is not retained.
    /// </summary>
    /// <param name="operation">Name of the failed operation; must not be null or whitespace.</param>
    /// <param name="serverHandle">Server handle involved, if any.</param>
    /// <param name="itemHandle">Item handle involved, if any.</param>
    /// <param name="exception">The exception thrown; a null value is tolerated.</param>
    /// <exception cref="ArgumentException"><paramref name="operation"/> is null, empty, or whitespace.</exception>
    public MxAccessShutdownFailure(
        string operation,
        int? serverHandle,
        int? itemHandle,
        Exception exception)
    {
        bool operationMissing = string.IsNullOrWhiteSpace(operation);
        if (operationMissing)
        {
            throw new ArgumentException("Shutdown failure operation is required.", nameof(operation));
        }

        Operation = operation;
        ServerHandle = serverHandle;
        ItemHandle = itemHandle;

        if (exception is null)
        {
            // No exception supplied: empty type name and no HRESULT.
            ExceptionType = string.Empty;
            HResult = null;
        }
        else
        {
            ExceptionType = exception.GetType().FullName ?? string.Empty;
            HResult = exception.HResult;
        }
    }

    /// <summary>Name of the operation that failed.</summary>
    public string Operation { get; }

    /// <summary>Server handle involved in the failure, or null.</summary>
    public int? ServerHandle { get; }

    /// <summary>Item handle involved in the failure, or null.</summary>
    public int? ItemHandle { get; }

    /// <summary>Full type name of the exception, or the empty string.</summary>
    public string ExceptionType { get; }

    /// <summary>HRESULT of the exception, or null when no exception was supplied.</summary>
    public int? HResult { get; }
}
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
namespace MxGateway.Worker.MxAccess;
/// <summary>
/// Outcome of a graceful shutdown: the list of cleanup failures that were recorded.
/// </summary>
public sealed class MxAccessShutdownResult
{
    /// <summary>Creates a result wrapping the supplied failure list.</summary>
    /// <param name="failures">Failures recorded during cleanup; may be empty but not null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="failures"/> is null.</exception>
    public MxAccessShutdownResult(IReadOnlyList<MxAccessShutdownFailure> failures)
    {
        if (failures is null)
        {
            throw new ArgumentNullException(nameof(failures));
        }

        Failures = failures;
    }

    /// <summary>Failures recorded during cleanup.</summary>
    public IReadOnlyList<MxAccessShutdownFailure> Failures { get; }

    /// <summary>True when no failures were recorded.</summary>
    public bool Succeeded
    {
        get { return Failures.Count == 0; }
    }
}
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
@@ -141,6 +142,61 @@ public sealed class MxAccessStaSession : IDisposable
cancellationToken);
}
/// <summary>
/// Requests dispatcher shutdown, runs the session's graceful cleanup through the STA
/// runtime, and then shuts the STA runtime down, all within a single time budget.
/// </summary>
/// <param name="timeout">Total budget for cleanup plus runtime shutdown; must be positive.</param>
/// <param name="cancellationToken">Cancels the wait for the cleanup work.</param>
/// <returns>
/// The failures reported by the session cleanup; an empty result when this instance is
/// already disposed or has no session.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is zero or negative.</exception>
/// <exception cref="TimeoutException">Cleanup or runtime shutdown did not finish within <paramref name="timeout"/>.</exception>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled while waiting for cleanup.</exception>
public async Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
    TimeSpan timeout,
    CancellationToken cancellationToken = default)
{
    if (timeout <= TimeSpan.Zero)
    {
        throw new ArgumentOutOfRangeException(
            nameof(timeout),
            "MXAccess graceful shutdown timeout must be greater than zero.");
    }

    if (disposed)
    {
        return new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
    }

    commandDispatcher?.RequestShutdown();

    // One stopwatch covers both phases so the runtime shutdown only gets what cleanup left over.
    Stopwatch stopwatch = Stopwatch.StartNew();
    MxAccessShutdownResult result;
    if (session is null)
    {
        result = new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
    }
    else
    {
        using CancellationTokenSource shutdownCancellation =
            CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        shutdownCancellation.CancelAfter(timeout);

        // Separate source for the delay so its timer can be released as soon as cleanup wins.
        using CancellationTokenSource delayCancellation =
            CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        Task<MxAccessShutdownResult> cleanupTask = staRuntime.InvokeAsync(
            () => session.ShutdownGracefully(),
            shutdownCancellation.Token);
        Task delayTask = Task.Delay(timeout, delayCancellation.Token);
        Task completedTask = await Task.WhenAny(cleanupTask, delayTask).ConfigureAwait(false);
        if (completedTask != cleanupTask)
        {
            cancellationToken.ThrowIfCancellationRequested();
            throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.");
        }

        delayCancellation.Cancel();

        try
        {
            result = await cleanupTask.ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (
            shutdownCancellation.IsCancellationRequested
            && !cancellationToken.IsCancellationRequested)
        {
            // CancelAfter and Task.Delay fire at about the same moment. If the linked token
            // cancelled the cleanup first, the caller must still see a timeout, not a cancellation.
            throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.");
        }
    }

    TimeSpan remaining = timeout - stopwatch.Elapsed;
    if (remaining <= TimeSpan.Zero || !staRuntime.Shutdown(remaining))
    {
        throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.");
    }

    staRuntime.Dispose();
    disposed = true;
    return result;
}
public void Dispose()
{
if (disposed)
@@ -91,6 +91,14 @@ public sealed class StaCommandDispatcher
lock (gate)
{
shutdownRequested = true;
while (commandQueue.Count > 0)
{
QueuedStaCommand queuedCommand = commandQueue.Dequeue();
queuedCommand.Complete(CreateRejectedReply(
queuedCommand.Command,
ProtocolStatusCode.WorkerUnavailable,
"The STA command dispatcher is shutting down."));
}
}
}
+2 -3
View File
@@ -13,8 +13,7 @@ public static class WorkerApplication
return Run(
args,
new EnvironmentVariableWorkerEnvironment(),
new WorkerConsoleLogger(Console.Error),
new WorkerPipeClient());
new WorkerConsoleLogger(Console.Error));
}
public static int Run(
@@ -26,7 +25,7 @@ public static class WorkerApplication
args,
environment,
logger,
new WorkerPipeClient());
new WorkerPipeClient(logger));
}
public static int Run(