Resolve Worker-001, Worker-002, Worker-003 code-review findings

Worker-001: WnWrapAlarmConsumer armed a System.Threading.Timer whose OnPoll
callback ran GetXmlCurrentAlarms2 on a thread-pool thread against the
Apartment-threaded wnwrap COM object, which can deadlock on cross-apartment
marshaling. Removed the pollTimer/pollIntervalMs fields, OnPoll, the
poll-interval constructor parameter, and the timer arm/disposal. Polls are
driven externally by the STA via StaRuntime.InvokeAsync(PollOnce).

Worker-002: RunHeartbeatLoopAsync delayed a full HeartbeatInterval before
the first heartbeat. Restructured so the first beat is sent immediately on
entering the loop and the delay applies only between subsequent beats.

Worker-003: ProcessCommandAsync silently returned without a reply when
_state was not a command-serving state after dispatch. Both drop sites now
log a WorkerCommandResultDropped diagnostic with correlation_id via
IWorkerLogger; _state is now volatile.

Three pre-existing tests that asserted strict frame ordering were updated to
tolerate an interleaved first heartbeat (Worker-002 consequence).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-18 20:59:46 -04:00
parent e967e85973
commit 53e3973209
8 changed files with 323 additions and 97 deletions
+38 -2
View File
@@ -29,7 +29,11 @@ public sealed class WorkerPipeSession
private readonly HashSet<Task> _activeCommandTasks = new();
private IWorkerRuntimeSession? _runtimeSession;
private long _nextSequence;
private WorkerState _state = WorkerState.Starting;
// Mutated from the message loop, command tasks, the heartbeat loop and the
// shutdown path; volatile so cross-thread reads observe the latest state
// without tearing (WorkerState is an int-backed protobuf enum).
private volatile WorkerState _state = WorkerState.Starting;
private bool _acceptingCommands = true;
private bool _watchdogFaultSent;
private bool _shutdownTimedOut;
@@ -398,6 +402,7 @@ public sealed class WorkerPipeSession
MxCommandReply reply = await runtimeSession.DispatchAsync(staCommand).ConfigureAwait(false);
if (_state is not WorkerState.Ready and not WorkerState.ExecutingCommand)
{
LogCommandResultDropped(envelope.CorrelationId, staCommand.MethodName);
return;
}
@@ -415,6 +420,7 @@ public sealed class WorkerPipeSession
{
if (_state is not WorkerState.Ready and not WorkerState.ExecutingCommand)
{
LogCommandResultDropped(envelope.CorrelationId, staCommand.MethodName);
return;
}
@@ -428,6 +434,25 @@ public sealed class WorkerPipeSession
}
}
/// <summary>
/// Logs that a completed command result was dropped because the
/// worker is no longer in a command-serving state (typically a
/// shutdown that raced the command's completion). Without this
/// diagnostic the gateway's correlation-id wait blocks until its own
/// timeout with no trace of why no reply arrived.
/// </summary>
private void LogCommandResultDropped(string correlationId, string commandMethod)
{
_logger?.Information(
"WorkerCommandResultDropped",
new Dictionary<string, object?>
{
["correlation_id"] = correlationId,
["command_method"] = commandMethod,
["worker_state"] = _state.ToString(),
});
}
private async Task ShutdownAsync(
WorkerShutdown shutdown,
CancellationToken cancellationToken)
@@ -544,9 +569,20 @@ public sealed class WorkerPipeSession
private async Task RunHeartbeatLoopAsync(CancellationToken cancellationToken)
{
// The first heartbeat is sent immediately on entering the loop so the
// gateway's liveness watchdog sees a beat as soon as the worker is
// Ready; the delay is applied between subsequent beats only. A
// delay-before-first-beat loop would leave the gateway without a
// heartbeat for a full HeartbeatInterval after startup.
bool firstBeat = true;
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(_sessionOptions.HeartbeatInterval, cancellationToken).ConfigureAwait(false);
if (!firstBeat)
{
await Task.Delay(_sessionOptions.HeartbeatInterval, cancellationToken).ConfigureAwait(false);
}
firstBeat = false;
IWorkerRuntimeSession? runtimeSession = _runtimeSession;
if (runtimeSession is null)
{
@@ -42,8 +42,8 @@ public interface IMxAccessAlarmConsumer : IDisposable
/// Subscription string follows AVEVA's canonical format:
/// <c>\\&lt;node&gt;\Galaxy!&lt;area&gt;</c>. The literal "Galaxy" is
/// the provider name (regardless of the configured Galaxy database
/// name). Calling Subscribe also begins polling on the consumer's
/// internal timer.
/// name). Subscribe does not start any polling of its own; the caller
/// drives polls explicitly via <see cref="PollOnce"/>.
/// </summary>
void Subscribe(string subscription);
@@ -88,10 +88,8 @@ public interface IMxAccessAlarmConsumer : IDisposable
/// <summary>
/// Drives a single synchronous poll of the underlying alarm source.
/// Implementations that use an internal <see cref="System.Threading.Timer"/>
/// are constructed with <c>pollIntervalMilliseconds=0</c> in production so
/// the timer is disabled; the worker's STA drives polls via
/// <c>StaRuntime.InvokeAsync</c> instead, satisfying the
/// The production consumer owns no internal timer; the worker's STA
/// drives polls via <c>StaRuntime.InvokeAsync</c>, satisfying the
/// <c>ThreadingModel=Apartment</c> requirement of
/// <c>wwAlarmConsumerClass</c>. Fake implementations should no-op.
/// This method must be invoked on the thread that created the consumer
@@ -2,7 +2,6 @@ using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.InteropServices;
using System.Threading;
using System.Xml;
using WNWRAPCONSUMERLib;
@@ -31,15 +30,16 @@ namespace MxGateway.Worker.MxAccess;
/// <strong>Threading.</strong> The wnwrap CLSID is registered with
/// <c>ThreadingModel=Apartment</c>. The consumer must be created
/// and operated from an STA thread; the worker's
/// <see cref="MxAccessStaSession"/> already runs an STA pump that
/// is the natural host. Polling cadence is governed by
/// <see cref="PollIntervalMilliseconds"/> on a dedicated timer the
/// consumer owns; in production the worker's STA dispatcher should
/// marshal each callback onto the STA thread before invoking
/// <c>GetXmlCurrentAlarms2</c>. For now (test-grade), this consumer
/// calls the COM API on whichever thread the timer fires it on —
/// the worker bootstrap will gain a thin "run-on-STA" wrapper as
/// part of A.3 dispatcher wiring.
/// <see cref="MxAccessStaSession"/> runs an STA pump that hosts it.
/// The consumer owns <em>no</em> internal timer: every COM call
/// (<c>Subscribe</c>, <c>PollOnce</c>, <c>AcknowledgeBy*</c>) must
/// be invoked on the STA that created the consumer. Polling cadence
/// is driven externally by the worker's STA via
/// <c>StaRuntime.InvokeAsync(() =&gt; consumer.PollOnce())</c>, which
/// keeps every <c>GetXmlCurrentAlarms2</c> call on the apartment that
/// owns the COM object. A thread-pool timer would call the COM API
/// off the owning STA and can deadlock on cross-apartment marshaling
/// when the STA is not pumping messages, so no such timer exists.
/// </para>
/// </remarks>
public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer
@@ -47,52 +47,39 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer
private const string DefaultProductName = "OtOpcUa.MxGateway";
private const string DefaultApplicationName = "OtOpcUa.MxGateway.Worker";
private const string DefaultVersion = "1.0";
private const int DefaultPollIntervalMilliseconds = 500;
private const int DefaultMaxAlarmsPerFetch = 1024;
private readonly object syncRoot = new object();
private readonly Dictionary<Guid, MxAlarmSnapshotRecord> latestSnapshot =
new Dictionary<Guid, MxAlarmSnapshotRecord>();
private readonly int pollIntervalMs;
private readonly int maxAlarmsPerFetch;
private wwAlarmConsumerClass? client;
private wwAlarmConsumerClass? ackClient;
private string subscriptionExpression = string.Empty;
private Timer? pollTimer;
private bool subscribed;
private bool disposed;
/// <summary>
/// Production constructor — creates the wnwrap COM object on the
/// current thread (must be the worker's STA) and disables the
/// internal <see cref="Timer"/> (<c>pollIntervalMilliseconds=0</c>).
/// Polling is driven externally by the STA via
/// <c>StaRuntime.InvokeAsync(() =&gt; consumer.PollOnce())</c> so
/// that every COM call stays on the STA that owns the apartment.
/// current thread (which must be the worker's STA). Polling is driven
/// externally by the STA via
/// <c>StaRuntime.InvokeAsync(() =&gt; consumer.PollOnce())</c> so that
/// every COM call stays on the STA that owns the apartment.
/// </summary>
public WnWrapAlarmConsumer()
: this(new wwAlarmConsumerClass(), pollIntervalMilliseconds: 0, DefaultMaxAlarmsPerFetch)
: this(new wwAlarmConsumerClass(), DefaultMaxAlarmsPerFetch)
{
}
/// <summary>
/// Test seam / explicit construction — inject a pre-created COM
/// client and tune the poll cadence. <c>pollIntervalMilliseconds == 0</c>
/// disables the internal <see cref="Timer"/> entirely; the caller
/// must drive <see cref="PollOnce"/> manually (used by hosts that
/// marshal polls onto a foreign STA, and by live smoke tests that
/// pump from the STA they own).
/// Test seam / explicit construction.
/// </summary>
public WnWrapAlarmConsumer(
wwAlarmConsumerClass client,
int pollIntervalMilliseconds,
int maxAlarmsPerFetch)
{
this.client = client ?? throw new ArgumentNullException(nameof(client));
this.pollIntervalMs = pollIntervalMilliseconds < 0
? DefaultPollIntervalMilliseconds
: pollIntervalMilliseconds;
this.maxAlarmsPerFetch = maxAlarmsPerFetch > 0
? maxAlarmsPerFetch
: DefaultMaxAlarmsPerFetch;
@@ -101,8 +88,6 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer
/// <inheritdoc />
public event EventHandler<MxAlarmTransitionEvent>? AlarmTransitionEmitted;
public int PollIntervalMilliseconds => pollIntervalMs;
/// <inheritdoc />
public void Subscribe(string subscription)
{
@@ -136,7 +121,9 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer
}
// hWnd=0: wnwrap supports a pull-based model — no message pump
// is required. We poll GetXmlCurrentAlarms2 on a timer below.
// is required. GetXmlCurrentAlarms2 is polled by the worker's STA
// via StaRuntime.InvokeAsync(() => consumer.PollOnce()); this type
// owns no internal timer.
int reg = com.IwwAlarmConsumer_RegisterConsumer(
hWnd: 0,
szProductName: DefaultProductName,
@@ -201,10 +188,6 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer
subscriptionExpression = subscription;
subscribed = true;
if (pollIntervalMs > 0)
{
pollTimer = new Timer(OnPoll, state: null, dueTime: 0, period: pollIntervalMs);
}
}
}
@@ -294,31 +277,14 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer
}
}
private void OnPoll(object? _)
{
if (disposed) return;
try
{
PollOnce();
}
catch (Exception ex)
{
// Swallow — the poll loop must not propagate exceptions out of
// the timer callback, or the worker process tears down. The
// EventQueue fault counter (wired in by the future A.3 dispatcher)
// is the right place to surface poll failures; for now the
// exception is intentionally silent so the timer keeps firing.
_ = ex;
}
}
/// <summary>
/// Synchronously poll the wnwrap consumer once and dispatch any
/// transitions. Public so STA-bound hosts can drive polling from
/// the thread that owns the COM object instead of relying on the
/// internal <see cref="Timer"/> (which fires on a thread-pool
/// thread and blocks indefinitely on cross-apartment marshaling
/// when the host STA isn't pumping messages).
/// transitions. STA-bound hosts drive polling by calling this from
/// the thread that owns the COM object. The consumer deliberately
/// owns no internal timer: a thread-pool timer would call the
/// apartment-threaded COM object off its owning STA and can block
/// indefinitely on cross-apartment marshaling when the STA is not
/// pumping messages.
/// </summary>
public void PollOnce()
{
@@ -524,21 +490,17 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer
/// <inheritdoc />
public void Dispose()
{
Timer? timerToDispose;
wwAlarmConsumerClass? clientToDispose;
wwAlarmConsumerClass? ackClientToDispose;
lock (syncRoot)
{
if (disposed) return;
disposed = true;
timerToDispose = pollTimer;
pollTimer = null;
clientToDispose = client;
client = null;
ackClientToDispose = ackClient;
ackClient = null;
}
timerToDispose?.Dispose();
ReleaseConsumerCom(clientToDispose);
ReleaseConsumerCom(ackClientToDispose);
}