From 53e3973209a9487906160685f493c687199491c9 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 18 May 2026 20:59:46 -0400 Subject: [PATCH] Resolve Worker-001, Worker-002, Worker-003 code-review findings Worker-001: WnWrapAlarmConsumer armed a System.Threading.Timer whose OnPoll callback ran GetXmlCurrentAlarms2 on a thread-pool thread against the Apartment-threaded wnwrap COM object, which can deadlock on cross-apartment marshaling. Removed the pollTimer/pollIntervalMs fields, OnPoll, the poll-interval constructor parameter, and the timer arm/disposal. Polls are driven externally by the STA via StaRuntime.InvokeAsync(PollOnce). Worker-002: RunHeartbeatLoopAsync delayed a full HeartbeatInterval before the first heartbeat. Restructured so the first beat is sent immediately on entering the loop and the delay applies only between subsequent beats. Worker-003: ProcessCommandAsync silently returned without a reply when _state was not a command-serving state after dispatch. Both drop sites now log a WorkerCommandResultDropped diagnostic with correlation_id via IWorkerLogger; _state is now volatile. Three pre-existing tests that asserted strict frame ordering were updated to tolerate an interleaved first heartbeat (Worker-002 consequence). Co-Authored-By: Claude Opus 4.7 (1M context) --- code-reviews/Worker/findings.md | 14 +- .../AlarmsLiveSmokeTests.cs | 20 +- .../Ipc/WorkerPipeClientTests.cs | 27 ++- .../Ipc/WorkerPipeSessionTests.cs | 181 +++++++++++++++++- .../MxAccess/WnWrapAlarmConsumerXmlTests.cs | 40 ++++ src/MxGateway.Worker/Ipc/WorkerPipeSession.cs | 40 +++- .../MxAccess/IMxAccessAlarmConsumer.cs | 10 +- .../MxAccess/WnWrapAlarmConsumer.cs | 88 +++------ 8 files changed, 323 insertions(+), 97 deletions(-) diff --git a/code-reviews/Worker/findings.md b/code-reviews/Worker/findings.md index 3fcd020..5f48b07 100644 --- a/code-reviews/Worker/findings.md +++ b/code-reviews/Worker/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-18 | | Commit reviewed | `6c64030` | | Status | Reviewed | -| Open findings | 15 | +| Open findings | 12 | ## Checklist coverage @@ -33,13 +33,13 @@ | Severity | High | | Category | Concurrency & thread safety | | Location | `src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs:204-207` | -| Status | Open | +| Status | Resolved | **Description:** When constructed with `pollIntervalMilliseconds > 0`, `Subscribe` starts a `System.Threading.Timer` whose `OnPoll` callback runs `PollOnce()` — which calls `wwAlarmConsumerClass.GetXmlCurrentAlarms2` — on a thread-pool thread. The wnwrap CLSID is registered `ThreadingModel=Apartment`; calling its methods off the owning STA violates the hard rule that all COM calls happen on the dedicated STA thread, and can deadlock on cross-apartment marshaling when the STA is not pumping. The production path (default constructor, interval 0) is safe, but the public 3-arg constructor leaves this footgun callable, and tests/live-smoke use it. **Recommendation:** Remove the internal `Timer` entirely (production already drives `PollOnce` from the STA), or document and gate it so it can only be used from an STA thread. At minimum, make the timer-driven mode unreachable from any production wiring. -**Resolution:** _(open)_ +**Resolution:** 2026-05-18 — Removed the off-STA timer infrastructure from `WnWrapAlarmConsumer`: the `Timer? pollTimer` and `pollIntervalMs` fields, the `DefaultPollIntervalMilliseconds` constant, the `OnPoll` callback, the timer-arming arm in `Subscribe`, and the timer disposal block in `Dispose`. The `pollIntervalMilliseconds` parameter is gone from both public constructors (the test-seam ctor is now 2-arg: `wwAlarmConsumerClass` + `maxAlarmsPerFetch`), so the off-STA footgun is structurally unreachable. `PollOnce()` remains the public STA-driven entry point. The stale "poll … on a timer below" comment was corrected. Verified by the regression tests `WnWrapAlarmConsumer_has_no_internal_timer_field` and `WnWrapAlarmConsumer_exposes_no_poll_interval_constructor_parameter`; the `AlarmsLiveSmokeTests` call site was updated to the 2-arg constructor. ### Worker-002 @@ -48,13 +48,13 @@ | Severity | High | | Category | Correctness & logic bugs | | Location | `src/MxGateway.Worker/Ipc/WorkerPipeSession.cs:545-549` | -| Status | Open | +| Status | Resolved | **Description:** `RunHeartbeatLoopAsync` calls `await Task.Delay(_sessionOptions.HeartbeatInterval, ...)` before sending the first heartbeat. The gateway therefore receives no heartbeat for the first full interval (default 5s) after the worker reaches `Ready`. If the gateway's liveness watchdog expects a heartbeat sooner, a healthy worker can be misclassified as hung at startup. **Recommendation:** Send an initial heartbeat immediately on entering the loop, or move the `Task.Delay` to the end of the loop body. -**Resolution:** _(open)_ +**Resolution:** 2026-05-18 — Restructured `RunHeartbeatLoopAsync` so the `Task.Delay(HeartbeatInterval)` is applied between beats only, not before the first. A `firstBeat` guard skips the delay on the initial iteration, so the gateway sees a heartbeat as soon as the worker is `Ready`; cancellation behavior is preserved (the loop still observes the token and the delay still throws on cancellation). Verified by the regression test `RunAsync_SendsFirstHeartbeatImmediatelyOnEnteringLoop`. Three pre-existing tests (`WorkerPipeClientTests.RunAsync_ConnectsToPipeAndCompletesHandshake`, `WorkerPipeClientTests.RunAsync_RetriesUntilPipeServerAppears`, `WorkerPipeSessionTests.RunAsync_WhenCommandThrowsAfterShutdown_DropsLateFaultAndWritesShutdownAck`) assumed strict frame ordering and were updated to skip the now-interleaved first heartbeat while still asserting the same shutdown-ack behavior. ### Worker-003 @@ -63,13 +63,13 @@ | Severity | High | | Category | Correctness & logic bugs | | Location | `src/MxGateway.Worker/Ipc/WorkerPipeSession.cs:399-403`, `:416-419` | -| Status | Open | +| Status | Resolved | **Description:** `ProcessCommandAsync` checks `_state` after `DispatchAsync` completes and silently `return`s without writing a `WorkerCommandReply` (or fault) when `_state` is not `Ready`/`ExecutingCommand`. `_state` is a plain field mutated from multiple tasks (heartbeat loop, event-drain loop, shutdown). A command that completes successfully while `_state` has transitioned will have its reply dropped with no diagnostic, and the gateway's correlation-id wait then hangs until its own timeout. The `_state` read is also not synchronized. **Recommendation:** Always attempt to write the reply/fault for an in-flight command, or explicitly reject in-flight commands with a `Canceled`/`WorkerUnavailable` reply during state transitions. Make `_state` access thread-safe (volatile or locked). -**Resolution:** _(open)_ +**Resolution:** 2026-05-18 — Both silent-drop `return` sites in `ProcessCommandAsync` (the post-`DispatchAsync` success path and the exception path) now call a new `LogCommandResultDropped` helper before returning. The helper logs an Information event named `WorkerCommandResultDropped` via the session's `IWorkerLogger`, carrying the command's `correlation_id` plus `command_method` and `worker_state`, so a stuck gateway correlation-id wait is now traceable. The `_state` field was made `volatile` (`WorkerState` is an int-backed protobuf enum, so volatile is valid) so cross-thread reads observe the latest value without tearing; this is a low-risk, non-behavioral change and did not destabilize any test. Verified by the regression test `RunAsync_WhenReplyIsDroppedAfterShutdown_LogsDiagnostic`. ### Worker-004 diff --git a/src/MxGateway.Worker.Tests/AlarmsLiveSmokeTests.cs b/src/MxGateway.Worker.Tests/AlarmsLiveSmokeTests.cs index a99e3d6..07c0bfa 100644 --- a/src/MxGateway.Worker.Tests/AlarmsLiveSmokeTests.cs +++ b/src/MxGateway.Worker.Tests/AlarmsLiveSmokeTests.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Concurrent; using System.Diagnostics; -using System.Linq; using System.Threading; using MxGateway.Contracts.Proto; using MxGateway.Worker.MxAccess; @@ -77,13 +76,11 @@ public sealed class AlarmsLiveSmokeTests Log($"Pump duration: {PumpDuration.TotalSeconds:F0}s; transition wait timeout: {TransitionWaitTimeout.TotalSeconds:F0}s"); MxAccessEventQueue queue = new MxAccessEventQueue(); - // pollIntervalMs=0 disables the internal Timer; we drive PollOnce - // manually from the STA below to avoid threadpool→STA marshaling - // (the wnwrap COM is ThreadingModel=Apartment, and this test - // doesn't run a Win32 message pump on its STA). + // The consumer owns no internal timer; we drive PollOnce manually + // from the STA below (the wnwrap COM is ThreadingModel=Apartment, + // and this test doesn't run a Win32 message pump on its STA). WnWrapAlarmConsumer consumer = new WnWrapAlarmConsumer( new WNWRAPCONSUMERLib.wwAlarmConsumerClass(), - pollIntervalMilliseconds: 0, maxAlarmsPerFetch: 1024); MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(queue, new MxAccessEventMapper()); using AlarmDispatcher dispatcher = new AlarmDispatcher(consumer, sink, SessionId); @@ -92,13 +89,10 @@ public sealed class AlarmsLiveSmokeTests dispatcher.Subscribe(SubscriptionExpression); Log("Subscribe -> ok. Driving PollOnce manually from this STA..."); - // The wnwrap COM object is ThreadingModel=Apartment. The consumer's - // internal Timer would fire on a threadpool thread and deadlock on - // cross-apartment marshaling without a Win32 message pump. For the - // smoke test we constructed the consumer with pollIntervalMs=0 - // (Timer disabled) and drive PollOnce manually here on the STA. - // Production hosting will route polls through the worker's - // StaRuntime in a follow-up PR. + // The wnwrap COM object is ThreadingModel=Apartment. The consumer + // owns no internal timer, so we drive PollOnce manually here on the + // STA. Production hosting routes polls through the worker's + // StaRuntime. // 1. Wait for the first transition (any kind), then keep waiting // for one with kind=Raise so the alarm is currently Active when diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs index e3c45dd..bedab8e 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs @@ -77,7 +77,9 @@ public sealed class WorkerPipeClientTests }, }); - WorkerEnvelope shutdownAck = await reader.ReadAsync(); + WorkerEnvelope shutdownAck = await ReadUntilAsync( + reader, + WorkerEnvelope.BodyOneofCase.WorkerShutdownAck); Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, shutdownAck.BodyCase); await clientTask; } @@ -120,7 +122,9 @@ public sealed class WorkerPipeClientTests Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, (await reader.ReadAsync()).BodyCase); await writer.WriteAsync(CreateShutdown()); - Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, (await reader.ReadAsync()).BodyCase); + Assert.Equal( + WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, + (await ReadUntilAsync(reader, WorkerEnvelope.BodyOneofCase.WorkerShutdownAck)).BodyCase); await clientTask; } @@ -143,6 +147,25 @@ public sealed class WorkerPipeClientTests await Assert.ThrowsAsync(async () => await client.RunAsync(workerOptions)); } + /// + /// Reads frames until one matching the expected body case is found, + /// skipping interleaved heartbeats (the first heartbeat is emitted + /// immediately on entering the heartbeat loop — see Worker-002). + /// + private static async Task ReadUntilAsync( + WorkerFrameReader reader, + WorkerEnvelope.BodyOneofCase expectedBody) + { + while (true) + { + WorkerEnvelope envelope = await reader.ReadAsync(); + if (envelope.BodyCase == expectedBody) + { + return envelope; + } + } + } + private static WorkerPipeSession CreateSession( Stream stream, WorkerFrameProtocolOptions options) diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs index e65f1d1..b92b264 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs @@ -383,16 +383,126 @@ public sealed class WorkerPipeSessionTests await pipePair.GatewayWriter .WriteAsync(CreateShutdownEnvelope(), cancellation.Token); - WorkerEnvelope firstEnvelopeAfterShutdown = await pipePair.GatewayReader - .ReadAsync(cancellation.Token); + // The first heartbeat is emitted immediately on entering the loop + // (Worker-002), so skip any interleaved heartbeats; the late fault + // must still be dropped — no WorkerFault may precede the ack. + WorkerEnvelope envelopeAfterShutdown; + do + { + envelopeAfterShutdown = await pipePair.GatewayReader.ReadAsync(cancellation.Token); + Assert.NotEqual( + WorkerEnvelope.BodyOneofCase.WorkerFault, + envelopeAfterShutdown.BodyCase); + } + while (envelopeAfterShutdown.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerHeartbeat); - Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, firstEnvelopeAfterShutdown.BodyCase); - Assert.Equal(ProtocolStatusCode.Ok, firstEnvelopeAfterShutdown.WorkerShutdownAck.Status.Code); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, envelopeAfterShutdown.BodyCase); + Assert.Equal(ProtocolStatusCode.Ok, envelopeAfterShutdown.WorkerShutdownAck.Status.Code); Task completedTask = await Task.WhenAny(runTask, Task.Delay(TimeSpan.FromSeconds(2), cancellation.Token)); Assert.Same(runTask, completedTask); await runTask; } + /// + /// Worker-002 regression: the first heartbeat must be emitted + /// immediately on entering the heartbeat loop, not after a full + /// HeartbeatInterval. A long interval is configured so a delay-first + /// loop would fail to deliver a heartbeat inside the assertion window. + /// + [Fact] + public async Task RunAsync_SendsFirstHeartbeatImmediatelyOnEnteringLoop() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(10)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + WorkerPipeSession session = CreatePipeSession( + pipePair.WorkerStream, + runtime, + new WorkerPipeSessionOptions + { + // A deliberately long interval: a delay-before-first-beat + // loop would not produce a heartbeat for 30s. + HeartbeatInterval = TimeSpan.FromSeconds(30), + HeartbeatGrace = TimeSpan.FromSeconds(60), + }); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + DateTimeOffset start = DateTimeOffset.UtcNow; + using CancellationTokenSource heartbeatWait = CancellationTokenSource + .CreateLinkedTokenSource(cancellation.Token); + heartbeatWait.CancelAfter(TimeSpan.FromSeconds(5)); + WorkerEnvelope heartbeat = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerHeartbeat, + heartbeatWait.Token); + TimeSpan elapsed = DateTimeOffset.UtcNow - start; + + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHeartbeat, heartbeat.BodyCase); + Assert.True( + elapsed < TimeSpan.FromSeconds(5), + $"First heartbeat took {elapsed}, expected well under the 30s interval."); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + /// + /// Worker-003 regression: when a command completes after the worker + /// has transitioned out of a command-serving state, the dropped + /// reply must be logged with a diagnostic rather than discarded + /// silently, so a stuck gateway correlation wait can be traced. + /// + [Fact] + public async Task RunAsync_WhenReplyIsDroppedAfterShutdown_LogsDiagnostic() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(10)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new() + { + BlockDispatch = true, + }; + RecordingWorkerLogger logger = new(); + WorkerFrameProtocolOptions options = CreateOptions(); + WorkerPipeSession session = new( + new WorkerFrameReader(pipePair.WorkerStream, options), + new WorkerFrameWriter(pipePair.WorkerStream, options), + options, + () => 1234, + new WorkerPipeSessionOptions + { + HeartbeatInterval = TimeSpan.FromSeconds(1), + HeartbeatGrace = TimeSpan.FromSeconds(5), + }, + () => runtime, + logger); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter.WriteAsync( + CreateCommandEnvelope("command-dropped-after-shutdown"), + cancellation.Token); + Assert.True(runtime.DispatchStarted.Wait(TimeSpan.FromSeconds(2))); + + await pipePair.GatewayWriter + .WriteAsync(CreateShutdownEnvelope(), cancellation.Token); + + WorkerEnvelope shutdownAck = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, + cancellation.Token); + Assert.Equal(ProtocolStatusCode.Ok, shutdownAck.WorkerShutdownAck.Status.Code); + + Task completedTask = await Task.WhenAny(runTask, Task.Delay(TimeSpan.FromSeconds(3), cancellation.Token)); + Assert.Same(runTask, completedTask); + await runTask; + + Assert.Contains( + logger.Events, + entry => entry.EventName == "WorkerCommandResultDropped" + && entry.Fields.TryGetValue("correlation_id", out object? correlationId) + && (string?)correlationId == "command-dropped-after-shutdown"); + } + private static WorkerPipeSession CreateSession( Stream inbound, Stream outbound, @@ -619,6 +729,69 @@ public sealed class WorkerPipeSessionTests buffer[3] = (byte)(value >> 24); } + private sealed class RecordingWorkerLogger : MxGateway.Worker.Bootstrap.IWorkerLogger + { + private readonly object gate = new(); + private readonly List events = new(); + + /// Gets a snapshot of the recorded log entries. + public IReadOnlyList Events + { + get + { + lock (gate) + { + return new List(events); + } + } + } + + /// Records an informational log event. + public void Information(string eventName, IReadOnlyDictionary fields) + { + Record(eventName, fields); + } + + /// Records an error log event. + public void Error(string eventName, IReadOnlyDictionary fields) + { + Record(eventName, fields); + } + + private void Record(string eventName, IReadOnlyDictionary fields) + { + Dictionary copy = new(); + foreach (KeyValuePair field in fields) + { + copy[field.Key] = field.Value; + } + + lock (gate) + { + events.Add(new LogEntry(eventName, copy)); + } + } + + /// A single recorded log entry. + public sealed class LogEntry + { + /// Initializes a recorded log entry. + /// The log event name. + /// The log event fields. + public LogEntry(string eventName, IReadOnlyDictionary fields) + { + EventName = eventName; + Fields = fields; + } + + /// Gets the log event name. + public string EventName { get; } + + /// Gets the log event fields. + public IReadOnlyDictionary Fields { get; } + } + } + private sealed class FakeRuntimeSession : IWorkerRuntimeSession { private readonly ManualResetEventSlim releaseDispatch = new(false); diff --git a/src/MxGateway.Worker.Tests/MxAccess/WnWrapAlarmConsumerXmlTests.cs b/src/MxGateway.Worker.Tests/MxAccess/WnWrapAlarmConsumerXmlTests.cs index 3a3d90d..8442feb 100644 --- a/src/MxGateway.Worker.Tests/MxAccess/WnWrapAlarmConsumerXmlTests.cs +++ b/src/MxGateway.Worker.Tests/MxAccess/WnWrapAlarmConsumerXmlTests.cs @@ -1,5 +1,7 @@ using System; using System.Linq; +using System.Reflection; +using System.Threading; using MxGateway.Worker.MxAccess; namespace MxGateway.Worker.Tests.MxAccess; @@ -109,4 +111,42 @@ public sealed class WnWrapAlarmConsumerXmlTests Assert.False(WnWrapAlarmConsumer.TryParseHexGuid(hex, out Guid guid)); Assert.Equal(Guid.Empty, guid); } + + /// + /// Worker-001 regression: the consumer must own no internal + /// . A thread-pool timer calling the + /// apartment-threaded wnwrap COM object off its owning STA can + /// deadlock on cross-apartment marshaling, so the timer field and + /// callback must not exist on the type. + /// + [Fact] + public void WnWrapAlarmConsumer_has_no_internal_timer_field() + { + FieldInfo[] fields = typeof(WnWrapAlarmConsumer) + .GetFields(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + + Assert.DoesNotContain(fields, field => field.FieldType == typeof(Timer)); + Assert.Null(typeof(WnWrapAlarmConsumer).GetMethod( + "OnPoll", + BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic)); + } + + /// + /// Worker-001 regression: no public constructor may accept a + /// poll-interval parameter. A non-zero poll interval was the only + /// way to arm the off-STA timer; removing the parameter makes the + /// footgun structurally unreachable. + /// + [Fact] + public void WnWrapAlarmConsumer_exposes_no_poll_interval_constructor_parameter() + { + foreach (ConstructorInfo constructor in typeof(WnWrapAlarmConsumer) + .GetConstructors(BindingFlags.Instance | BindingFlags.Public)) + { + Assert.DoesNotContain( + constructor.GetParameters(), + parameter => parameter.Name is not null + && parameter.Name.IndexOf("poll", StringComparison.OrdinalIgnoreCase) >= 0); + } + } } diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs index 1acad56..03d7262 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs @@ -29,7 +29,11 @@ public sealed class WorkerPipeSession private readonly HashSet _activeCommandTasks = new(); private IWorkerRuntimeSession? _runtimeSession; private long _nextSequence; - private WorkerState _state = WorkerState.Starting; + + // Mutated from the message loop, command tasks, the heartbeat loop and the + // shutdown path; volatile so cross-thread reads observe the latest state + // without tearing (WorkerState is an int-backed protobuf enum). + private volatile WorkerState _state = WorkerState.Starting; private bool _acceptingCommands = true; private bool _watchdogFaultSent; private bool _shutdownTimedOut; @@ -398,6 +402,7 @@ public sealed class WorkerPipeSession MxCommandReply reply = await runtimeSession.DispatchAsync(staCommand).ConfigureAwait(false); if (_state is not WorkerState.Ready and not WorkerState.ExecutingCommand) { + LogCommandResultDropped(envelope.CorrelationId, staCommand.MethodName); return; } @@ -415,6 +420,7 @@ public sealed class WorkerPipeSession { if (_state is not WorkerState.Ready and not WorkerState.ExecutingCommand) { + LogCommandResultDropped(envelope.CorrelationId, staCommand.MethodName); return; } @@ -428,6 +434,25 @@ public sealed class WorkerPipeSession } } + /// + /// Logs that a completed command result was dropped because the + /// worker is no longer in a command-serving state (typically a + /// shutdown that raced the command's completion). Without this + /// diagnostic the gateway's correlation-id wait blocks until its own + /// timeout with no trace of why no reply arrived. + /// + private void LogCommandResultDropped(string correlationId, string commandMethod) + { + _logger?.Information( + "WorkerCommandResultDropped", + new Dictionary + { + ["correlation_id"] = correlationId, + ["command_method"] = commandMethod, + ["worker_state"] = _state.ToString(), + }); + } + private async Task ShutdownAsync( WorkerShutdown shutdown, CancellationToken cancellationToken) @@ -544,9 +569,20 @@ public sealed class WorkerPipeSession private async Task RunHeartbeatLoopAsync(CancellationToken cancellationToken) { + // The first heartbeat is sent immediately on entering the loop so the + // gateway's liveness watchdog sees a beat as soon as the worker is + // Ready; the delay is applied between subsequent beats only. A + // delay-before-first-beat loop would leave the gateway without a + // heartbeat for a full HeartbeatInterval after startup. + bool firstBeat = true; while (!cancellationToken.IsCancellationRequested) { - await Task.Delay(_sessionOptions.HeartbeatInterval, cancellationToken).ConfigureAwait(false); + if (!firstBeat) + { + await Task.Delay(_sessionOptions.HeartbeatInterval, cancellationToken).ConfigureAwait(false); + } + + firstBeat = false; IWorkerRuntimeSession? runtimeSession = _runtimeSession; if (runtimeSession is null) { diff --git a/src/MxGateway.Worker/MxAccess/IMxAccessAlarmConsumer.cs b/src/MxGateway.Worker/MxAccess/IMxAccessAlarmConsumer.cs index 1a9a97d..cad7290 100644 --- a/src/MxGateway.Worker/MxAccess/IMxAccessAlarmConsumer.cs +++ b/src/MxGateway.Worker/MxAccess/IMxAccessAlarmConsumer.cs @@ -42,8 +42,8 @@ public interface IMxAccessAlarmConsumer : IDisposable /// Subscription string follows AVEVA's canonical format: /// \\<node>\Galaxy!<area>. The literal "Galaxy" is /// the provider name (regardless of the configured Galaxy database - /// name). Calling Subscribe also begins polling on the consumer's - /// internal timer. + /// name). Subscribe does not start any polling of its own; the caller + /// drives polls explicitly via . /// void Subscribe(string subscription); @@ -88,10 +88,8 @@ public interface IMxAccessAlarmConsumer : IDisposable /// /// Drives a single synchronous poll of the underlying alarm source. - /// Implementations that use an internal - /// are constructed with pollIntervalMilliseconds=0 in production so - /// the timer is disabled; the worker's STA drives polls via - /// StaRuntime.InvokeAsync instead, satisfying the + /// The production consumer owns no internal timer; the worker's STA + /// drives polls via StaRuntime.InvokeAsync, satisfying the /// ThreadingModel=Apartment requirement of /// wwAlarmConsumerClass. Fake implementations should no-op. /// This method must be invoked on the thread that created the consumer diff --git a/src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs b/src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs index 027108a..caee1d4 100644 --- a/src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs +++ b/src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs @@ -2,7 +2,6 @@ using System; using System.Collections.Generic; using System.Globalization; using System.Runtime.InteropServices; -using System.Threading; using System.Xml; using WNWRAPCONSUMERLib; @@ -31,15 +30,16 @@ namespace MxGateway.Worker.MxAccess; /// Threading. The wnwrap CLSID is registered with /// ThreadingModel=Apartment. The consumer must be created /// and operated from an STA thread; the worker's -/// already runs an STA pump that -/// is the natural host. Polling cadence is governed by -/// on a dedicated timer the -/// consumer owns; in production the worker's STA dispatcher should -/// marshal each callback onto the STA thread before invoking -/// GetXmlCurrentAlarms2. For now (test-grade), this consumer -/// calls the COM API on whichever thread the timer fires it on — -/// the worker bootstrap will gain a thin "run-on-STA" wrapper as -/// part of A.3 dispatcher wiring. +/// runs an STA pump that hosts it. +/// The consumer owns no internal timer: every COM call +/// (Subscribe, PollOnce, AcknowledgeBy*) must +/// be invoked on the STA that created the consumer. Polling cadence +/// is driven externally by the worker's STA via +/// StaRuntime.InvokeAsync(() => consumer.PollOnce()), which +/// keeps every GetXmlCurrentAlarms2 call on the apartment that +/// owns the COM object. A thread-pool timer would call the COM API +/// off the owning STA and can deadlock on cross-apartment marshaling +/// when the STA is not pumping messages, so no such timer exists. /// /// public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer @@ -47,52 +47,39 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer private const string DefaultProductName = "OtOpcUa.MxGateway"; private const string DefaultApplicationName = "OtOpcUa.MxGateway.Worker"; private const string DefaultVersion = "1.0"; - private const int DefaultPollIntervalMilliseconds = 500; private const int DefaultMaxAlarmsPerFetch = 1024; private readonly object syncRoot = new object(); private readonly Dictionary latestSnapshot = new Dictionary(); - private readonly int pollIntervalMs; private readonly int maxAlarmsPerFetch; private wwAlarmConsumerClass? client; private wwAlarmConsumerClass? ackClient; private string subscriptionExpression = string.Empty; - private Timer? pollTimer; private bool subscribed; private bool disposed; /// /// Production constructor — creates the wnwrap COM object on the - /// current thread (must be the worker's STA) and disables the - /// internal (pollIntervalMilliseconds=0). - /// Polling is driven externally by the STA via - /// StaRuntime.InvokeAsync(() => consumer.PollOnce()) so - /// that every COM call stays on the STA that owns the apartment. + /// current thread (which must be the worker's STA). Polling is driven + /// externally by the STA via + /// StaRuntime.InvokeAsync(() => consumer.PollOnce()) so that + /// every COM call stays on the STA that owns the apartment. /// public WnWrapAlarmConsumer() - : this(new wwAlarmConsumerClass(), pollIntervalMilliseconds: 0, DefaultMaxAlarmsPerFetch) + : this(new wwAlarmConsumerClass(), DefaultMaxAlarmsPerFetch) { } /// - /// Test seam / explicit construction — inject a pre-created COM - /// client and tune the poll cadence. pollIntervalMilliseconds == 0 - /// disables the internal entirely; the caller - /// must drive manually (used by hosts that - /// marshal polls onto a foreign STA, and by live smoke tests that - /// pump from the STA they own). + /// Test seam / explicit construction. /// public WnWrapAlarmConsumer( wwAlarmConsumerClass client, - int pollIntervalMilliseconds, int maxAlarmsPerFetch) { this.client = client ?? throw new ArgumentNullException(nameof(client)); - this.pollIntervalMs = pollIntervalMilliseconds < 0 - ? DefaultPollIntervalMilliseconds - : pollIntervalMilliseconds; this.maxAlarmsPerFetch = maxAlarmsPerFetch > 0 ? maxAlarmsPerFetch : DefaultMaxAlarmsPerFetch; @@ -101,8 +88,6 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer /// public event EventHandler? AlarmTransitionEmitted; - public int PollIntervalMilliseconds => pollIntervalMs; - /// public void Subscribe(string subscription) { @@ -136,7 +121,9 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer } // hWnd=0: wnwrap supports a pull-based model — no message pump - // is required. We poll GetXmlCurrentAlarms2 on a timer below. + // is required. GetXmlCurrentAlarms2 is polled by the worker's STA + // via StaRuntime.InvokeAsync(() => consumer.PollOnce()); this type + // owns no internal timer. int reg = com.IwwAlarmConsumer_RegisterConsumer( hWnd: 0, szProductName: DefaultProductName, @@ -201,10 +188,6 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer subscriptionExpression = subscription; subscribed = true; - if (pollIntervalMs > 0) - { - pollTimer = new Timer(OnPoll, state: null, dueTime: 0, period: pollIntervalMs); - } } } @@ -294,31 +277,14 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer } } - private void OnPoll(object? _) - { - if (disposed) return; - try - { - PollOnce(); - } - catch (Exception ex) - { - // Swallow — the poll loop must not propagate exceptions out of - // the timer callback, or the worker process tears down. The - // EventQueue fault counter (wired in by the future A.3 dispatcher) - // is the right place to surface poll failures; for now the - // exception is intentionally silent so the timer keeps firing. - _ = ex; - } - } - /// /// Synchronously poll the wnwrap consumer once and dispatch any - /// transitions. Public so STA-bound hosts can drive polling from - /// the thread that owns the COM object instead of relying on the - /// internal (which fires on a thread-pool - /// thread and blocks indefinitely on cross-apartment marshaling - /// when the host STA isn't pumping messages). + /// transitions. STA-bound hosts drive polling by calling this from + /// the thread that owns the COM object. The consumer deliberately + /// owns no internal timer: a thread-pool timer would call the + /// apartment-threaded COM object off its owning STA and can block + /// indefinitely on cross-apartment marshaling when the STA is not + /// pumping messages. /// public void PollOnce() { @@ -524,21 +490,17 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer /// public void Dispose() { - Timer? timerToDispose; wwAlarmConsumerClass? clientToDispose; wwAlarmConsumerClass? ackClientToDispose; lock (syncRoot) { if (disposed) return; disposed = true; - timerToDispose = pollTimer; - pollTimer = null; clientToDispose = client; client = null; ackClientToDispose = ackClient; ackClient = null; } - timerToDispose?.Dispose(); ReleaseConsumerCom(clientToDispose); ReleaseConsumerCom(ackClientToDispose); }