Files
mxaccessgw/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs
T
Joseph Doherty bf72cd8961 feat(worker): implement Ping/GetSessionState/GetWorkerInfo/DrainEvents/ShutdownWorker control commands
Answer the five worker control/lifecycle commands at the WorkerPipeSession
message-loop layer instead of the STA-bound MxAccessCommandExecutor. These
replies are built from process-level state (worker pid, assembly version,
worker lifecycle, the runtime session's event queue) the executor cannot see,
and ShutdownWorker must emit its OK reply before the graceful shutdown joins
the STA thread - dispatching it onto the STA would deadlock.

- Ping: OK reply, echoes message into diagnostic_message.
- GetSessionState: maps WorkerState to proto SessionState.
- GetWorkerInfo: pid, worker version, MXAccess ProgID/CLSID.
- DrainEvents: drains the runtime event queue into DrainEventsReply.
- ShutdownWorker: OK reply, then graceful shutdown, then stops the loop.

Tests added in WorkerPipeSessionTests; FakeRuntimeSession gains a
batch-size drain suppressor so DrainEvents does not race the background
drain loop.
2026-06-15 10:20:51 -04:00

1243 lines
48 KiB
C#

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Worker.Bootstrap;
using ZB.MOM.WW.MxGateway.Worker.MxAccess;
using ZB.MOM.WW.MxGateway.Worker.Sta;
namespace ZB.MOM.WW.MxGateway.Worker.Ipc;
public sealed class WorkerPipeSession
{
private static readonly TimeSpan EventDrainInterval = TimeSpan.FromMilliseconds(25);
private static readonly TimeSpan BackgroundTaskStopTimeout = TimeSpan.FromSeconds(1);
private const uint EventDrainBatchSize = 128;
private readonly WorkerFrameProtocolOptions _options;
private readonly Func<int> _processIdProvider;
private readonly Func<IWorkerRuntimeSession> _runtimeSessionFactory;
private readonly WorkerPipeSessionOptions _sessionOptions;
private readonly IWorkerLogger? _logger;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private readonly object _commandTaskGate = new();
private readonly HashSet<Task> _activeCommandTasks = new();
private IWorkerRuntimeSession? _runtimeSession;
private long _nextSequence;
// Mutated from the message loop, command tasks, the heartbeat loop and the
// shutdown path; volatile so cross-thread reads observe the latest state
// without tearing (WorkerState is an int-backed protobuf enum).
private volatile WorkerState _state = WorkerState.Starting;
private bool _acceptingCommands = true;
private bool _watchdogFaultSent;
/// <summary>Initializes a new worker pipe session over the provided stream.</summary>
/// <param name="stream">Network stream for reading and writing frames.</param>
/// <param name="options">Frame protocol configuration.</param>
/// <param name="logger">Optional logger for diagnostic output.</param>
public WorkerPipeSession(
Stream stream,
WorkerFrameProtocolOptions options,
IWorkerLogger? logger = null)
: this(
new WorkerFrameReader(stream, options),
new WorkerFrameWriter(stream, options),
options,
() => Process.GetCurrentProcess().Id,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession((eq, affinity, comFactory) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity, comFactory, standbyFactory: null)),
logger)
{
}
/// <summary>Initializes a new worker pipe session with custom frame reader and writer.</summary>
/// <param name="reader">Frame reader for incoming messages.</param>
/// <param name="writer">Frame writer for outgoing messages.</param>
/// <param name="options">Frame protocol configuration.</param>
/// <param name="processIdProvider">Function returning the current worker process ID.</param>
public WorkerPipeSession(
WorkerFrameReader reader,
WorkerFrameWriter writer,
WorkerFrameProtocolOptions options,
Func<int> processIdProvider)
: this(
reader,
writer,
options,
processIdProvider,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession((eq, affinity, comFactory) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity, comFactory, standbyFactory: null)),
logger: null)
{
}
/// <summary>Initializes a new worker pipe session with full configuration and dependencies.</summary>
/// <param name="reader">Frame reader for incoming messages.</param>
/// <param name="writer">Frame writer for outgoing messages.</param>
/// <param name="options">Frame protocol configuration.</param>
/// <param name="processIdProvider">Function returning the current worker process ID.</param>
/// <param name="sessionOptions">Session-specific options.</param>
/// <param name="runtimeSessionFactory">Factory creating the MXAccess runtime session.</param>
/// <param name="logger">Optional logger for diagnostic output.</param>
public WorkerPipeSession(
WorkerFrameReader reader,
WorkerFrameWriter writer,
WorkerFrameProtocolOptions options,
Func<int> processIdProvider,
WorkerPipeSessionOptions sessionOptions,
Func<IWorkerRuntimeSession> runtimeSessionFactory,
IWorkerLogger? logger = null)
{
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
_options = options ?? throw new ArgumentNullException(nameof(options));
_processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider));
_sessionOptions = sessionOptions ?? throw new ArgumentNullException(nameof(sessionOptions));
_runtimeSessionFactory = runtimeSessionFactory ?? throw new ArgumentNullException(nameof(runtimeSessionFactory));
_logger = logger;
_sessionOptions.Validate();
}
/// <summary>Runs the worker session, completing the handshake and processing messages until cancellation.</summary>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public async Task RunAsync(CancellationToken cancellationToken = default)
{
// Worker-025: the factory delegate itself is null-checked in the
// constructor, but its return value is not — a factory that returned
// null would NRE on the StartAsync lambda below. Throw a diagnostic
// exception instead so the failure is unambiguous (and so the
// finally block's _runtimeSession?.Dispose() can't silently no-op
// on a torn half-initialized session). Mirrors the same pattern
// AlarmCommandHandler.Subscribe uses for its consumerFactory().
_runtimeSession = _runtimeSessionFactory()
?? throw new InvalidOperationException(
"Worker runtime session factory returned null.");
try
{
await CompleteStartupHandshakeAsync(
token => _runtimeSession.StartAsync(_options.SessionId, _processIdProvider(), token),
cancellationToken).ConfigureAwait(false);
await RunMessageLoopAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
// Always dispose the runtime session, including after a
// shutdown timeout. MxAccessStaSession.Dispose is idempotent and
// bounded (each STA join is capped at 2s), so re-entering it on
// the normal path is a harmless no-op, while on the timed-out
// path it is the only thing that reclaims the STA thread and
// releases the MXAccess COM object — skipping it leaked both and
// left cleanup to rely solely on process exit.
_runtimeSession?.Dispose();
_runtimeSession = null;
_state = WorkerState.Stopped;
}
}
/// <summary>Completes the gateway startup handshake using default MXAccess initialization.</summary>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default)
{
return CompleteStartupHandshakeAsync(InitializeMxAccessAsync, cancellationToken);
}
/// <summary>Completes the gateway startup handshake with custom MXAccess initialization that returns void.</summary>
/// <param name="initializeMxAccessAsync">Async function to initialize MXAccess.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public async Task CompleteStartupHandshakeAsync(
Func<CancellationToken, Task> initializeMxAccessAsync,
CancellationToken cancellationToken = default)
{
if (initializeMxAccessAsync is null)
{
throw new ArgumentNullException(nameof(initializeMxAccessAsync));
}
await CompleteStartupHandshakeAsync(
async innerCancellationToken =>
{
await initializeMxAccessAsync(innerCancellationToken).ConfigureAwait(false);
return CreateWorkerReady();
},
cancellationToken).ConfigureAwait(false);
}
/// <summary>Completes the gateway startup handshake with custom MXAccess initialization that returns WorkerReady.</summary>
/// <param name="initializeMxAccessAsync">Async function to initialize MXAccess and return ready state.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public async Task CompleteStartupHandshakeAsync(
Func<CancellationToken, Task<WorkerReady>> initializeMxAccessAsync,
CancellationToken cancellationToken = default)
{
if (initializeMxAccessAsync is null)
{
throw new ArgumentNullException(nameof(initializeMxAccessAsync));
}
try
{
WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
_state = WorkerState.Handshaking;
ValidateGatewayHello(envelope);
await WriteWorkerHelloAsync(cancellationToken).ConfigureAwait(false);
_state = WorkerState.InitializingSta;
WorkerReady ready = await initializeMxAccessAsync(cancellationToken).ConfigureAwait(false);
await WriteWorkerReadyAsync(ready, cancellationToken).ConfigureAwait(false);
_state = WorkerState.Ready;
}
catch (WorkerFrameProtocolException exception)
{
await TryWriteFaultAsync(exception, cancellationToken).ConfigureAwait(false);
throw;
}
catch (Exception exception) when (exception is not OperationCanceledException)
{
await TryWriteFaultAsync(MxAccessCreationException.From(exception), cancellationToken)
.ConfigureAwait(false);
throw;
}
}
private void ValidateGatewayHello(WorkerEnvelope envelope)
{
if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.GatewayHello)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.UnexpectedEnvelopeBody,
"Worker expected GatewayHello during startup handshake.");
}
GatewayHello gatewayHello = envelope.GatewayHello;
if (gatewayHello.SupportedProtocolVersion != _options.ProtocolVersion)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
$"GatewayHello supported protocol version {gatewayHello.SupportedProtocolVersion} does not match expected version {_options.ProtocolVersion}.");
}
if (!string.Equals(gatewayHello.Nonce, _options.Nonce, StringComparison.Ordinal))
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.NonceMismatch,
"GatewayHello nonce does not match the worker launch nonce.");
}
}
private Task WriteWorkerHelloAsync(CancellationToken cancellationToken)
{
return _writer.WriteAsync(
CreateEnvelope(new WorkerHello
{
ProtocolVersion = _options.ProtocolVersion,
Nonce = _options.Nonce,
WorkerProcessId = _processIdProvider(),
WorkerVersion = typeof(WorkerPipeSession).Assembly.GetName().Version?.ToString() ?? string.Empty,
}),
cancellationToken);
}
private Task WriteWorkerReadyAsync(
WorkerReady ready,
CancellationToken cancellationToken)
{
return _writer.WriteAsync(CreateEnvelope(ready), cancellationToken);
}
private async Task RunMessageLoopAsync(CancellationToken cancellationToken)
{
using CancellationTokenSource loopCancellation = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
using CancellationTokenSource heartbeatCancellation = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
Task heartbeatTask = RunHeartbeatLoopAsync(heartbeatCancellation.Token);
Task eventDrainTask = RunEventDrainLoopAsync(heartbeatCancellation.Token);
Task<WorkerEnvelope> readTask = _reader.ReadAsync(loopCancellation.Token);
try
{
while (!cancellationToken.IsCancellationRequested)
{
Task completedTask = await Task.WhenAny(readTask, heartbeatTask, eventDrainTask).ConfigureAwait(false);
if (completedTask == readTask)
{
WorkerEnvelope envelope = await readTask.ConfigureAwait(false);
bool keepReading = await DispatchGatewayEnvelopeAsync(envelope, cancellationToken).ConfigureAwait(false);
if (!keepReading)
{
return;
}
readTask = _reader.ReadAsync(loopCancellation.Token);
}
else if (completedTask == heartbeatTask)
{
await heartbeatTask.ConfigureAwait(false);
}
else if (completedTask == eventDrainTask)
{
await eventDrainTask.ConfigureAwait(false);
}
}
}
finally
{
loopCancellation.Cancel();
heartbeatCancellation.Cancel();
await ObserveBackgroundTaskStopAsync(heartbeatTask, "Heartbeat").ConfigureAwait(false);
await ObserveBackgroundTaskStopAsync(eventDrainTask, "EventDrain").ConfigureAwait(false);
}
}
private async Task ObserveBackgroundTaskStopAsync(
Task task,
string taskName)
{
Task completedTask = await Task
.WhenAny(task, Task.Delay(BackgroundTaskStopTimeout))
.ConfigureAwait(false);
if (completedTask != task)
{
_logger?.Error(
"WorkerPipeSessionBackgroundTaskStopTimedOut",
new Dictionary<string, object?>
{
["task"] = taskName,
["timeout_ms"] = BackgroundTaskStopTimeout.TotalMilliseconds,
});
return;
}
try
{
await task.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (Exception ex)
{
_logger?.Error(
"WorkerPipeSessionBackgroundTaskStopFailed",
new Dictionary<string, object?>
{
["task"] = taskName,
["exception"] = ex.ToString(),
});
}
}
private async Task RunEventDrainLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
IWorkerRuntimeSession? runtimeSession = _runtimeSession;
if (runtimeSession is null)
{
await Task.Delay(EventDrainInterval, cancellationToken).ConfigureAwait(false);
continue;
}
WorkerFault? fault = runtimeSession.DrainFault();
if (fault is not null)
{
_state = WorkerState.Faulted;
await TryWriteFaultAsync(fault, cancellationToken).ConfigureAwait(false);
throw new InvalidOperationException(
string.IsNullOrWhiteSpace(fault.DiagnosticMessage)
? $"MXAccess event queue faulted with category {fault.Category}."
: fault.DiagnosticMessage);
}
IReadOnlyList<WorkerEvent> events = runtimeSession.DrainEvents(EventDrainBatchSize);
if (events.Count == 0)
{
await Task.Delay(EventDrainInterval, cancellationToken).ConfigureAwait(false);
continue;
}
foreach (WorkerEvent workerEvent in events)
{
await _writer
.WriteAsync(CreateEnvelope(workerEvent), cancellationToken)
.ConfigureAwait(false);
}
}
}
private async Task<bool> DispatchGatewayEnvelopeAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
switch (envelope.BodyCase)
{
case WorkerEnvelope.BodyOneofCase.WorkerCommand:
// Worker control/lifecycle commands (Ping, GetSessionState,
// GetWorkerInfo, DrainEvents, ShutdownWorker) are answered here
// on the message-loop thread instead of being dispatched onto
// the STA. Their replies are built from process-level state
// (worker process id, assembly version, _state, the runtime
// session's event queue) that the STA-bound
// MxAccessCommandExecutor cannot see, and ShutdownWorker must
// return its OK reply BEFORE the graceful shutdown joins the
// STA thread — running it on the STA would deadlock. Returning
// false from the ShutdownWorker arm stops the read loop exactly
// as a WorkerShutdown envelope would.
if (IsControlCommand(envelope.WorkerCommand?.Command?.Kind ?? MxCommandKind.Unspecified))
{
return await HandleControlCommandAsync(envelope, cancellationToken).ConfigureAwait(false);
}
TryStartCommandTask(envelope, cancellationToken);
return true;
case WorkerEnvelope.BodyOneofCase.WorkerShutdown:
await ShutdownAsync(envelope.WorkerShutdown, cancellationToken).ConfigureAwait(false);
return false;
case WorkerEnvelope.BodyOneofCase.WorkerCancel:
_runtimeSession?.CancelCommand(envelope.CorrelationId);
return true;
default:
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.UnexpectedEnvelopeBody,
$"Worker received unexpected gateway envelope body {envelope.BodyCase}.");
}
}
private static bool IsControlCommand(MxCommandKind kind)
{
return kind switch
{
MxCommandKind.Ping => true,
MxCommandKind.GetSessionState => true,
MxCommandKind.GetWorkerInfo => true,
MxCommandKind.DrainEvents => true,
MxCommandKind.ShutdownWorker => true,
_ => false,
};
}
/// <summary>
/// Answers a worker control/lifecycle command on the message-loop
/// thread (never on the STA). Returns <c>false</c> only for
/// <see cref="MxCommandKind.ShutdownWorker"/> — after writing its OK
/// reply this drives the same graceful-shutdown path a
/// <c>WorkerShutdown</c> envelope would, then signals the read loop to
/// stop. All other control commands return <c>true</c> to keep reading.
/// </summary>
private async Task<bool> HandleControlCommandAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
WorkerCommand workerCommand = envelope.WorkerCommand;
MxCommand command = workerCommand.Command;
string correlationId = envelope.CorrelationId;
if (command.Kind == MxCommandKind.ShutdownWorker)
{
// Build and emit the OK reply BEFORE triggering shutdown so the
// gateway's correlation-id wait is satisfied even though the
// graceful shutdown below tears the session (and pipe) down.
MxCommandReply shutdownReply = CreateControlOkReply(correlationId, command.Kind);
await WriteControlReplyAsync(shutdownReply, cancellationToken).ConfigureAwait(false);
WorkerShutdown shutdown = new();
if (command.ShutdownWorker?.GracePeriod is not null)
{
shutdown.GracePeriod = command.ShutdownWorker.GracePeriod;
}
shutdown.Reason = "ShutdownWorker command";
await ShutdownAsync(shutdown, cancellationToken).ConfigureAwait(false);
return false;
}
MxCommandReply reply = command.Kind switch
{
MxCommandKind.Ping => CreatePingReply(correlationId, command),
MxCommandKind.GetSessionState => CreateSessionStateReply(correlationId, command.Kind),
MxCommandKind.GetWorkerInfo => CreateWorkerInfoReply(correlationId, command.Kind),
MxCommandKind.DrainEvents => CreateDrainEventsReply(correlationId, command),
_ => CreateControlOkReply(correlationId, command.Kind),
};
await WriteControlReplyAsync(reply, cancellationToken).ConfigureAwait(false);
return true;
}
private Task WriteControlReplyAsync(
MxCommandReply reply,
CancellationToken cancellationToken)
{
return _writer.WriteAsync(
CreateEnvelope(new WorkerCommandReply
{
Reply = reply,
CompletedTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
}),
cancellationToken);
}
private MxCommandReply CreatePingReply(string correlationId, MxCommand command)
{
MxCommandReply reply = CreateControlOkReply(correlationId, command.Kind);
// Echo the ping message back through the base reply's diagnostic
// message field (there is no dedicated PingReply payload). An empty
// message leaves the diagnostic field at its proto3 default.
string? message = command.Ping?.Message;
if (!string.IsNullOrEmpty(message))
{
reply.DiagnosticMessage = message;
}
return reply;
}
private MxCommandReply CreateSessionStateReply(string correlationId, MxCommandKind kind)
{
MxCommandReply reply = CreateControlOkReply(correlationId, kind);
reply.SessionState = new SessionStateReply
{
State = MapWorkerStateToSessionState(_state),
};
return reply;
}
private MxCommandReply CreateWorkerInfoReply(string correlationId, MxCommandKind kind)
{
MxCommandReply reply = CreateControlOkReply(correlationId, kind);
reply.WorkerInfo = new WorkerInfoReply
{
WorkerProcessId = _processIdProvider(),
WorkerVersion = typeof(WorkerPipeSession).Assembly.GetName().Version?.ToString() ?? string.Empty,
MxaccessProgid = MxAccessInteropInfo.ProgId,
MxaccessClsid = MxAccessInteropInfo.Clsid,
};
return reply;
}
private MxCommandReply CreateDrainEventsReply(string correlationId, MxCommand command)
{
MxCommandReply reply = CreateControlOkReply(correlationId, command.Kind);
DrainEventsReply drainReply = new();
IWorkerRuntimeSession? runtimeSession = _runtimeSession;
if (runtimeSession is not null)
{
uint maxEvents = command.DrainEvents?.MaxEvents ?? 0;
foreach (WorkerEvent workerEvent in runtimeSession.DrainEvents(maxEvents))
{
if (workerEvent.Event is not null)
{
drainReply.Events.Add(workerEvent.Event);
}
}
}
reply.DrainEvents = drainReply;
return reply;
}
private MxCommandReply CreateControlOkReply(string correlationId, MxCommandKind kind)
{
return new MxCommandReply
{
SessionId = _options.SessionId,
CorrelationId = correlationId,
Kind = kind,
Hresult = 0,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = "OK",
},
};
}
private static SessionState MapWorkerStateToSessionState(WorkerState state)
{
return state switch
{
WorkerState.Starting => SessionState.StartingWorker,
WorkerState.Handshaking => SessionState.Handshaking,
WorkerState.InitializingSta => SessionState.InitializingWorker,
WorkerState.Ready => SessionState.Ready,
// A control command is being served, so the STA is alive and
// ready — the busy state is incidental, not a distinct lifecycle.
WorkerState.ExecutingCommand => SessionState.Ready,
WorkerState.ShuttingDown => SessionState.Closing,
WorkerState.Stopped => SessionState.Closed,
WorkerState.Faulted => SessionState.Faulted,
_ => SessionState.Unspecified,
};
}
private async Task ProcessCommandAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
IWorkerRuntimeSession runtimeSession = _runtimeSession
?? throw new InvalidOperationException("Worker runtime session has not been initialized.");
WorkerCommand workerCommand = envelope.WorkerCommand;
MxCommand command = workerCommand.Command;
StaCommand staCommand = new(
_options.SessionId,
envelope.CorrelationId,
command,
workerCommand.EnqueueTimestamp,
cancellationToken);
try
{
MxCommandReply reply = await runtimeSession.DispatchAsync(staCommand).ConfigureAwait(false);
// _state is only ever assigned Starting, Handshaking, InitializingSta,
// Ready, ShuttingDown, Faulted, or Stopped — never ExecutingCommand
// (that value is synthesized in CreateHeartbeat from the live
// CurrentCommandCorrelationId and never written back to _state). So
// the only command-serving state is Ready; anything else means a
// state transition (shutdown / fault) raced the command's
// completion and we must drop the reply rather than write into a
// half-torn-down pipe.
if (_state != WorkerState.Ready)
{
LogCommandResultDropped(envelope.CorrelationId, staCommand.MethodName);
return;
}
await _writer
.WriteAsync(
CreateEnvelope(new WorkerCommandReply
{
Reply = reply,
CompletedTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
}),
cancellationToken)
.ConfigureAwait(false);
}
catch (Exception exception) when (exception is not OperationCanceledException)
{
if (_state != WorkerState.Ready)
{
LogCommandResultDropped(envelope.CorrelationId, staCommand.MethodName);
return;
}
_state = WorkerState.Faulted;
await TryWriteFaultAsync(
CreateFault(
WorkerFaultCategory.MxaccessCommandFailed,
staCommand.MethodName,
exception),
cancellationToken).ConfigureAwait(false);
}
}
/// <summary>
/// Logs that a completed command result was dropped because the
/// worker is no longer in a command-serving state (typically a
/// shutdown that raced the command's completion). Without this
/// diagnostic the gateway's correlation-id wait blocks until its own
/// timeout with no trace of why no reply arrived.
/// </summary>
private void LogCommandResultDropped(string correlationId, string commandMethod)
{
_logger?.Information(
"WorkerCommandResultDropped",
new Dictionary<string, object?>
{
["correlation_id"] = correlationId,
["command_method"] = commandMethod,
["worker_state"] = _state.ToString(),
});
}
private async Task ShutdownAsync(
WorkerShutdown shutdown,
CancellationToken cancellationToken)
{
_state = WorkerState.ShuttingDown;
IWorkerRuntimeSession? runtimeSession = _runtimeSession;
if (runtimeSession is null)
{
await WriteShutdownAckAsync(
CreateShutdownAck(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()), shutdown),
cancellationToken).ConfigureAwait(false);
return;
}
TimeSpan gracePeriod = ResolveGracePeriod(shutdown);
StopAcceptingCommands();
try
{
MxAccessShutdownResult result = await runtimeSession
.ShutdownGracefullyAsync(gracePeriod, cancellationToken)
.ConfigureAwait(false);
await WaitForActiveCommandTasksAsync(gracePeriod, cancellationToken).ConfigureAwait(false);
LogShutdownFailures(result.Failures);
await WriteShutdownAckAsync(CreateShutdownAck(result, shutdown), cancellationToken).ConfigureAwait(false);
}
catch (TimeoutException exception)
{
_state = WorkerState.Faulted;
await TryWriteFaultAsync(CreateShutdownTimeoutFault(exception), cancellationToken).ConfigureAwait(false);
throw;
}
}
private void TryStartCommandTask(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
Task commandTask;
lock (_commandTaskGate)
{
if (!_acceptingCommands)
{
return;
}
commandTask = ProcessCommandAsync(envelope, cancellationToken);
_activeCommandTasks.Add(commandTask);
}
_ = ObserveCommandTaskAsync(commandTask);
}
private async Task ObserveCommandTaskAsync(Task commandTask)
{
try
{
await commandTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
finally
{
lock (_commandTaskGate)
{
_activeCommandTasks.Remove(commandTask);
}
}
}
private void StopAcceptingCommands()
{
lock (_commandTaskGate)
{
_acceptingCommands = false;
}
}
private async Task WaitForActiveCommandTasksAsync(
TimeSpan timeout,
CancellationToken cancellationToken)
{
Task[] activeTasks;
lock (_commandTaskGate)
{
activeTasks = new List<Task>(_activeCommandTasks).ToArray();
}
if (activeTasks.Length == 0)
{
return;
}
Task activeCommandsTask = Task.WhenAll(activeTasks);
Task timeoutTask = Task.Delay(timeout, cancellationToken);
Task completedTask = await Task.WhenAny(activeCommandsTask, timeoutTask).ConfigureAwait(false);
if (completedTask == activeCommandsTask)
{
await activeCommandsTask.ConfigureAwait(false);
return;
}
cancellationToken.ThrowIfCancellationRequested();
throw new TimeoutException($"Worker command tasks did not stop within {timeout}.");
}
private Task WriteShutdownAckAsync(
WorkerShutdownAck shutdownAck,
CancellationToken cancellationToken)
{
return _writer.WriteAsync(CreateEnvelope(shutdownAck), cancellationToken);
}
private async Task RunHeartbeatLoopAsync(CancellationToken cancellationToken)
{
// The first heartbeat is sent immediately on entering the loop so the
// gateway's liveness watchdog sees a beat as soon as the worker is
// Ready; the delay is applied between subsequent beats only. A
// delay-before-first-beat loop would leave the gateway without a
// heartbeat for a full HeartbeatInterval after startup.
bool firstBeat = true;
while (!cancellationToken.IsCancellationRequested)
{
if (!firstBeat)
{
await Task.Delay(_sessionOptions.HeartbeatInterval, cancellationToken).ConfigureAwait(false);
}
firstBeat = false;
IWorkerRuntimeSession? runtimeSession = _runtimeSession;
if (runtimeSession is null)
{
continue;
}
WorkerRuntimeHeartbeatSnapshot snapshot = runtimeSession.CaptureHeartbeat();
await _writer
.WriteAsync(CreateEnvelope(CreateHeartbeat(snapshot)), cancellationToken)
.ConfigureAwait(false);
await ReportWatchdogFaultIfNeededAsync(snapshot, cancellationToken).ConfigureAwait(false);
}
}
/// <summary>
/// The watchdog detects a hung STA (no thread activity for longer than
/// <c>HeartbeatGrace</c>) and emits an <c>StaHung</c> fault. Design
/// intent: catch a stuck STA thread, not a legitimately long-running
/// command. <c>StaRuntime.ProcessQueuedCommands</c> calls
/// <c>MarkActivity()</c> only immediately before and after
/// <c>workItem.Execute()</c>, so a synchronously long-running STA
/// command (e.g. <c>ReadBulk</c> waiting <c>timeout_ms</c> for the
/// first OnDataChange callback) freezes <c>LastActivityUtc</c> for the
/// duration of the wait even though the worker is healthy. To avoid
/// self-faulting a healthy in-flight command (Worker-017), the
/// watchdog is suppressed while <c>CurrentCommandCorrelationId</c> is
/// non-empty — the worker already advertises the in-flight command on
/// each heartbeat, so the gateway has the signal it needs to decide
/// the command is just slow. The watchdog still fires on a truly hung
/// STA (no command in flight and no activity), which is the only case
/// the watchdog can usefully distinguish from a slow command.
/// </summary>
/// <remarks>
/// Worker-023: the in-flight-command suppression is itself bounded by
/// <c>WorkerPipeSessionOptions.HeartbeatStuckCeiling</c>. A truly stuck
/// synchronous COM call (e.g. against a dead MXAccess provider whose
/// cross-apartment marshaler is permanently blocked) leaves
/// <c>CurrentCommandCorrelationId</c> non-empty forever; without an
/// upper bound the worker-side <c>StaHung</c> watchdog would be
/// permanently defeated and only the gateway's per-command timeout
/// would catch the hang. Once <c>LastActivityUtc</c> has been stale
/// for longer than <c>HeartbeatStuckCeiling</c> the watchdog fires
/// <c>StaHung</c> regardless of whether a command is in flight.
/// </remarks>
private async Task ReportWatchdogFaultIfNeededAsync(
WorkerRuntimeHeartbeatSnapshot snapshot,
CancellationToken cancellationToken)
{
TimeSpan staleFor = DateTimeOffset.UtcNow - snapshot.LastStaActivityUtc;
if (staleFor <= _sessionOptions.HeartbeatGrace)
{
_watchdogFaultSent = false;
return;
}
if (!string.IsNullOrEmpty(snapshot.CurrentCommandCorrelationId)
&& staleFor <= _sessionOptions.HeartbeatStuckCeiling)
{
// A command is in flight and we are still within the defensive
// suppression ceiling — the STA is busy executing it, not
// hung. The next MarkActivity() in StaRuntime.ProcessQueuedCommands
// will refresh LastActivityUtc once the command returns, at which
// point this branch stops being taken. The heartbeat already
// surfaces the in-flight correlation id so the gateway can apply
// its own per-command timeout if it considers the command too slow.
//
// Worker-023: once staleFor exceeds HeartbeatStuckCeiling we fall
// through to the fault path even with a command in flight — a
// truly stuck synchronous COM call would otherwise keep
// CurrentCommandCorrelationId non-empty indefinitely and the
// worker-side watchdog would never fire.
return;
}
if (_watchdogFaultSent)
{
return;
}
_watchdogFaultSent = true;
// The STA is hung — move the session to Faulted before the next
// heartbeat so the heartbeat's reported State stays consistent with
// the StaHung fault just sent. Without this the heartbeat loop keeps
// advertising a non-faulted state that contradicts the fault.
_state = WorkerState.Faulted;
await TryWriteFaultAsync(
CreateFault(
WorkerFaultCategory.StaHung,
snapshot.CurrentCommandCorrelationId,
$"STA activity is stale by {staleFor}."),
cancellationToken).ConfigureAwait(false);
}
private async Task TryWriteFaultAsync(
WorkerFrameProtocolException exception,
CancellationToken cancellationToken)
{
try
{
await _writer
.WriteAsync(CreateEnvelope(CreateFault(exception)), cancellationToken)
.ConfigureAwait(false);
}
catch (Exception faultWriteException) when (
faultWriteException is IOException
|| faultWriteException is ObjectDisposedException
|| faultWriteException is WorkerFrameProtocolException)
{
// The original protocol failure is the actionable error.
}
}
private async Task TryWriteFaultAsync(
MxAccessCreationException exception,
CancellationToken cancellationToken)
{
try
{
await _writer
.WriteAsync(CreateEnvelope(CreateFault(exception)), cancellationToken)
.ConfigureAwait(false);
}
catch (Exception faultWriteException) when (
faultWriteException is IOException
|| faultWriteException is ObjectDisposedException
|| faultWriteException is WorkerFrameProtocolException)
{
// The MXAccess creation failure is the actionable error.
}
}
private async Task TryWriteFaultAsync(
WorkerFault fault,
CancellationToken cancellationToken)
{
try
{
await _writer
.WriteAsync(CreateEnvelope(fault), cancellationToken)
.ConfigureAwait(false);
}
catch (Exception faultWriteException) when (
faultWriteException is IOException
|| faultWriteException is ObjectDisposedException
|| faultWriteException is WorkerFrameProtocolException)
{
// The runtime fault remains observable through worker exit or pipe closure.
}
}
private WorkerEnvelope CreateEnvelope(WorkerHello hello)
{
return CreateBaseEnvelope(hello);
}
private WorkerEnvelope CreateEnvelope(WorkerReady ready)
{
return CreateBaseEnvelope(ready);
}
private WorkerEnvelope CreateEnvelope(WorkerFault fault)
{
return CreateBaseEnvelope(fault);
}
private WorkerEnvelope CreateEnvelope(WorkerCommandReply reply)
{
return CreateBaseEnvelope(reply);
}
private WorkerEnvelope CreateEnvelope(WorkerEvent workerEvent)
{
return CreateBaseEnvelope(workerEvent);
}
private WorkerEnvelope CreateEnvelope(WorkerShutdownAck shutdownAck)
{
return CreateBaseEnvelope(shutdownAck);
}
private WorkerEnvelope CreateEnvelope(WorkerHeartbeat heartbeat)
{
return CreateBaseEnvelope(heartbeat);
}
private WorkerEnvelope CreateBaseEnvelope(WorkerHello body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerHello = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerReady body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerReady = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerFault body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerFault = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerCommandReply body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.CorrelationId = body.Reply?.CorrelationId ?? string.Empty;
envelope.WorkerCommandReply = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerEvent body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerEvent = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerShutdownAck body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerShutdownAck = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerHeartbeat body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerHeartbeat = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope()
{
return new WorkerEnvelope
{
ProtocolVersion = _options.ProtocolVersion,
SessionId = _options.SessionId,
Sequence = NextSequence(),
};
}
private ulong NextSequence()
{
return unchecked((ulong)Interlocked.Increment(ref _nextSequence));
}
private async Task<WorkerReady> InitializeMxAccessAsync(CancellationToken cancellationToken)
{
// RunAsync constructs the runtime session via _runtimeSessionFactory()
// before invoking CompleteStartupHandshakeAsync, so on the production
// path _runtimeSession is already non-null when this default
// initializer runs. Treat that pre-existing instance as authoritative
// and only drive its StartAsync — unconditionally reassigning
// _runtimeSession here would leak the factory-supplied session (no
// Dispose) and replace it with a hard-coded MxAccessStaSession,
// discarding the factory's configuration. The fall-back construction
// is preserved for the legacy direct-invocation path where the
// parameterless CompleteStartupHandshakeAsync is used without a
// prior factory call.
_runtimeSession ??= new MxAccessStaSession(
(eq, affinity, comFactory) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity, comFactory, standbyFactory: null));
IWorkerRuntimeSession session = _runtimeSession;
try
{
return await session
.StartAsync(_options.SessionId, _processIdProvider(), cancellationToken)
.ConfigureAwait(false);
}
catch
{
session.Dispose();
_runtimeSession = null;
throw;
}
}
private WorkerHeartbeat CreateHeartbeat(WorkerRuntimeHeartbeatSnapshot snapshot)
{
WorkerState state = string.IsNullOrWhiteSpace(snapshot.CurrentCommandCorrelationId)
? _state
: WorkerState.ExecutingCommand;
return new WorkerHeartbeat
{
WorkerProcessId = _processIdProvider(),
State = state,
LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(snapshot.LastStaActivityUtc),
PendingCommandCount = snapshot.PendingCommandCount,
OutboundEventQueueDepth = snapshot.OutboundEventQueueDepth,
LastEventSequence = snapshot.LastEventSequence,
CurrentCommandCorrelationId = snapshot.CurrentCommandCorrelationId,
};
}
private WorkerReady CreateWorkerReady()
{
return new WorkerReady
{
WorkerProcessId = _processIdProvider(),
MxaccessProgid = MxAccessInteropInfo.ProgId,
MxaccessClsid = MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
};
}
private static TimeSpan ResolveGracePeriod(WorkerShutdown shutdown)
{
if (shutdown.GracePeriod is null)
{
return TimeSpan.FromSeconds(10);
}
TimeSpan gracePeriod = shutdown.GracePeriod.ToTimeSpan();
return gracePeriod <= TimeSpan.Zero
? TimeSpan.FromSeconds(10)
: gracePeriod;
}
private static WorkerShutdownAck CreateShutdownAck(
MxAccessShutdownResult result,
WorkerShutdown shutdown)
{
string message = result.Succeeded
? "Graceful shutdown completed."
: $"Graceful shutdown completed with {result.Failures.Count} cleanup failure(s).";
if (!string.IsNullOrWhiteSpace(shutdown.Reason))
{
message = $"{message} Reason: {shutdown.Reason}";
}
return new WorkerShutdownAck
{
Status = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = message,
},
};
}
private void LogShutdownFailures(IReadOnlyList<MxAccessShutdownFailure> failures)
{
foreach (MxAccessShutdownFailure failure in failures)
{
_logger?.Error("WorkerGracefulShutdownCleanupFailed", new Dictionary<string, object?>
{
["session_id"] = _options.SessionId,
["operation"] = failure.Operation,
["server_handle"] = failure.ServerHandle,
["item_handle"] = failure.ItemHandle,
["exception_type"] = failure.ExceptionType,
["hresult"] = failure.HResult,
});
}
}
private static WorkerFault CreateFault(WorkerFrameProtocolException exception)
{
return new WorkerFault
{
Category = MapFaultCategory(exception.ErrorCode),
ExceptionType = exception.GetType().FullName ?? string.Empty,
DiagnosticMessage = exception.Message,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.ProtocolViolation,
Message = exception.Message,
},
};
}
private static WorkerFault CreateFault(MxAccessCreationException exception)
{
WorkerFault fault = new()
{
Category = WorkerFaultCategory.MxaccessCreationFailed,
ExceptionType = exception.InnerException?.GetType().FullName ?? exception.GetType().FullName ?? string.Empty,
DiagnosticMessage = exception.Message,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = exception.Message,
},
};
int? hresult = MxAccessCreationException.ExtractHResult(exception);
if (hresult.HasValue)
{
fault.Hresult = hresult.Value;
}
return fault;
}
private static WorkerFault CreateFault(
WorkerFaultCategory category,
string commandMethod,
Exception exception)
{
WorkerFault fault = CreateFault(
category,
commandMethod,
exception.Message);
fault.ExceptionType = exception.GetType().FullName ?? string.Empty;
fault.ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = exception.Message,
};
return fault;
}
private static WorkerFault CreateFault(
WorkerFaultCategory category,
string commandMethod,
string diagnosticMessage)
{
return new WorkerFault
{
Category = category,
CommandMethod = commandMethod ?? string.Empty,
DiagnosticMessage = diagnosticMessage,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = diagnosticMessage,
},
};
}
private static WorkerFault CreateShutdownTimeoutFault(TimeoutException exception)
{
return CreateFault(
WorkerFaultCategory.ShutdownTimeout,
commandMethod: string.Empty,
exception);
}
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
{
return errorCode switch
{
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch => WorkerFaultCategory.ProtocolMismatch,
WorkerFrameProtocolErrorCode.EndOfStream => WorkerFaultCategory.PipeDisconnected,
_ => WorkerFaultCategory.ProtocolViolation,
};
}
}