fix(worker): wire alarm command handler and STA poll loop (Gap 1 + Gap 2)

Gap 1 — WorkerPipeSession now passes `eq => new AlarmCommandHandler(eq)` as
the alarmCommandHandlerFactory in all three places it constructs
MxAccessStaSession (two convenience constructors and InitializeMxAccessAsync).
Previously the parameterless MxAccessStaSession() set the factory to null,
so every SubscribeAlarms / AcknowledgeAlarm / QueryActiveAlarms command
returned "alarm consumer not configured" in a deployed worker.

  - Added internal `MxAccessStaSession(Func<MxAccessEventQueue, IAlarmCommandHandler>?)`
    constructor that builds all defaults but accepts a factory.
  - Added public `MxAccessStaSession(StaRuntime, factory, eventQueue, alarmFactory?)`
    4-arg overload to complete the constructor chain.

Gap 2 — WnWrapAlarmConsumer now disables its internal threadpool Timer
(pollIntervalMilliseconds=0 in the default constructor). MxAccessStaSession
starts a `RunAlarmPollLoopAsync` background task that sleeps off-STA then
calls `staRuntime.InvokeAsync(() => handler.PollOnce())` at 500ms intervals.
This satisfies the ThreadingModel=Apartment requirement of wwAlarmConsumerClass:
every GetXmlCurrentAlarms2 call now runs on the worker's STA.

  - Added `PollOnce()` to `IMxAccessAlarmConsumer`, `AlarmDispatcher`,
    `IAlarmCommandHandler`, and `AlarmCommandHandler`.
  - Poll loop cancelled and awaited before alarm handler disposal in both
    ShutdownGracefullyAsync and Dispose.

Tests: 4 new tests in MxAccessStaSessionTests verify that
  - SubscribeAlarms reaches the handler when the factory is wired (Gap 1)
  - SubscribeAlarms returns InvalidRequest without a factory (regression guard)
  - PollOnce is called on the STA thread within 3s (Gap 2)
  - The poll loop stops after Dispose (Gap 2 lifecycle)
All fake IMxAccessAlarmConsumer / IAlarmCommandHandler test implementations
updated with no-op PollOnce() to satisfy the new interface member.

Worker tests: 199 passed / 1 pre-existing failure / 4 skipped (was 195/1/4).
Server tests: 308 passed / 0 failures (unchanged).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-18 06:30:14 -04:00
parent e00ee61cf0
commit a67a5a4857
10 changed files with 412 additions and 7 deletions
@@ -48,7 +48,7 @@ public sealed class WorkerPipeSession
options,
() => Process.GetCurrentProcess().Id,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession(),
() => new MxAccessStaSession(eq => new AlarmCommandHandler(eq)),
logger)
{
}
@@ -69,7 +69,7 @@ public sealed class WorkerPipeSession
options,
processIdProvider,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession(),
() => new MxAccessStaSession(eq => new AlarmCommandHandler(eq)),
logger: null)
{
}
@@ -746,7 +746,7 @@ public sealed class WorkerPipeSession
private async Task<WorkerReady> InitializeMxAccessAsync(CancellationToken cancellationToken)
{
_runtimeSession = new MxAccessStaSession();
_runtimeSession = new MxAccessStaSession(eq => new AlarmCommandHandler(eq));
try
{
return await _runtimeSession
@@ -160,6 +160,15 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler
return filtered;
}
/// <inheritdoc />
public void PollOnce()
{
AlarmDispatcher? d;
lock (syncRoot) d = dispatcher;
// No-op when not yet subscribed or already disposed.
d?.PollOnce();
}
private AlarmDispatcher GetDispatcherOrThrow()
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmCommandHandler));
@@ -226,4 +235,14 @@ public interface IAlarmCommandHandler : IDisposable
/// prefix matched against <c>AlarmFullReference</c>.
/// </summary>
IReadOnlyList<ActiveAlarmSnapshot> QueryActive(string? alarmFilterPrefix);
/// <summary>
/// Drives a single poll of the underlying alarm consumer on the
/// caller's thread. This is a no-op when there is no active
/// subscription. In production the caller is the worker's STA
/// (marshalled via <c>StaRuntime.InvokeAsync</c>), which satisfies
/// the <c>ThreadingModel=Apartment</c> requirement of
/// <c>wwAlarmConsumerClass</c>.
/// </summary>
void PollOnce();
}
@@ -119,6 +119,17 @@ public sealed class AlarmDispatcher : IDisposable
ackOperatorFullName);
}
/// <summary>
/// Drives a single synchronous poll of the underlying consumer.
/// Must be called on the STA thread that owns the wnwrap COM object.
/// No-op if the dispatcher has been disposed.
/// </summary>
public void PollOnce()
{
if (disposed) return;
consumer.PollOnce();
}
/// <summary>
/// Snapshot the currently-active alarm set as
/// <see cref="ActiveAlarmSnapshot"/> protos for the
@@ -85,4 +85,17 @@ public interface IMxAccessAlarmConsumer : IDisposable
/// to seed local Part 9 state.
/// </summary>
IReadOnlyList<MxAlarmSnapshotRecord> SnapshotActiveAlarms();
/// <summary>
/// Drives a single synchronous poll of the underlying alarm source.
/// Implementations that use an internal <see cref="System.Threading.Timer"/>
/// are constructed with <c>pollIntervalMilliseconds=0</c> in production so
/// the timer is disabled; the worker's STA drives polls via
/// <c>StaRuntime.InvokeAsync</c> instead, satisfying the
/// <c>ThreadingModel=Apartment</c> requirement of
/// <c>wwAlarmConsumerClass</c>. Fake implementations should no-op.
/// This method must be invoked on the thread that created the consumer
/// (the worker's STA in production).
/// </summary>
void PollOnce();
}
@@ -11,6 +11,8 @@ namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessStaSession : IWorkerRuntimeSession
{
private static readonly TimeSpan AlarmPollInterval = TimeSpan.FromMilliseconds(500);
private readonly IMxAccessComObjectFactory factory;
private readonly IMxAccessEventSink eventSink;
private readonly MxAccessEventQueue eventQueue;
@@ -19,6 +21,8 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
private StaCommandDispatcher? commandDispatcher;
private MxAccessSession? session;
private IAlarmCommandHandler? alarmCommandHandler;
private CancellationTokenSource? alarmPollCts;
private Task? alarmPollTask;
private bool disposed;
/// <summary>
@@ -32,6 +36,22 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
{
}
/// <summary>
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with default STA runtime,
/// factory, and event queue, but with a custom alarm-command handler factory. The factory is
/// invoked on the STA thread during
/// <see cref="StartAsync(string, int, CancellationToken)"/>; pass <c>null</c> to opt out
/// of alarm-side commands.
/// </summary>
internal MxAccessStaSession(Func<MxAccessEventQueue, IAlarmCommandHandler>? alarmCommandHandlerFactory)
: this(
new StaRuntime(),
new MxAccessComObjectFactory(),
new MxAccessEventQueue(),
alarmCommandHandlerFactory)
{
}
/// <summary>
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with custom STA runtime and factory.
/// </summary>
@@ -60,6 +80,26 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
{
}
/// <summary>
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with custom event queue
/// and an alarm-command handler factory.
/// </summary>
/// <param name="staRuntime">STA thread runtime.</param>
/// <param name="factory">MXAccess COM object factory.</param>
/// <param name="eventQueue">Event queue for buffering MXAccess events.</param>
/// <param name="alarmCommandHandlerFactory">
/// Factory that constructs the alarm-command handler from the event queue.
/// Pass <c>null</c> to opt out of alarm-side commands.
/// </param>
public MxAccessStaSession(
StaRuntime staRuntime,
IMxAccessComObjectFactory factory,
MxAccessEventQueue eventQueue,
Func<MxAccessEventQueue, IAlarmCommandHandler>? alarmCommandHandlerFactory)
: this(staRuntime, factory, new MxAccessBaseEventSink(eventQueue), eventQueue, alarmCommandHandlerFactory)
{
}
/// <summary>
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with all dependencies.
/// </summary>
@@ -122,14 +162,14 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
/// <param name="workerProcessId">Worker process identifier.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Worker ready message.</returns>
public Task<WorkerReady> StartAsync(
public async Task<WorkerReady> StartAsync(
string sessionId,
int workerProcessId,
CancellationToken cancellationToken = default)
{
staRuntime.Start();
return staRuntime.InvokeAsync(
WorkerReady ready = await staRuntime.InvokeAsync(
() =>
{
if (session is not null)
@@ -151,7 +191,61 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
return session.CreateWorkerReady(workerProcessId);
},
cancellationToken);
cancellationToken).ConfigureAwait(false);
if (alarmCommandHandler is not null)
{
alarmPollCts = new CancellationTokenSource();
alarmPollTask = RunAlarmPollLoopAsync(alarmCommandHandler, alarmPollCts.Token);
}
return ready;
}
private Task RunAlarmPollLoopAsync(
IAlarmCommandHandler handler,
CancellationToken cancellationToken)
{
return Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(AlarmPollInterval, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return;
}
if (cancellationToken.IsCancellationRequested)
{
return;
}
try
{
await staRuntime.InvokeAsync(
() => handler.PollOnce(),
cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return;
}
catch (ObjectDisposedException)
{
// STA runtime or alarm handler disposed — stop the loop gracefully.
return;
}
catch (InvalidOperationException)
{
// STA runtime shutting down — stop the loop gracefully.
return;
}
}
}, CancellationToken.None);
}
/// <summary>
@@ -307,6 +401,30 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
commandDispatcher?.RequestShutdown();
// Cancel the STA poll loop before disposing the alarm handler.
// The loop references the alarm handler and must be stopped first
// so that no further PollOnce calls race with disposal.
CancellationTokenSource? pollCtsToDispose = alarmPollCts;
Task? pollTaskToAwait = alarmPollTask;
alarmPollCts = null;
alarmPollTask = null;
if (pollCtsToDispose is not null)
{
pollCtsToDispose.Cancel();
if (pollTaskToAwait is not null)
{
try
{
await pollTaskToAwait.ConfigureAwait(false);
}
catch
{
// Swallow — poll loop cancellation must not block data shutdown.
}
}
pollCtsToDispose.Dispose();
}
// Stop the alarm consumer's polling timer and tear down the
// dispatcher BEFORE the data-side cleanup begins. The alarm
// consumer holds a wnwrap COM RCW that needs the STA pump to
@@ -382,6 +500,16 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
RequestShutdown();
// Cancel and discard the STA poll loop.
CancellationTokenSource? pollCtsToDispose = alarmPollCts;
alarmPollCts = null;
alarmPollTask = null;
if (pollCtsToDispose is not null)
{
try { pollCtsToDispose.Cancel(); } catch { }
try { pollCtsToDispose.Dispose(); } catch { }
}
IAlarmCommandHandler? alarmHandlerToDispose = alarmCommandHandler;
alarmCommandHandler = null;
if (alarmHandlerToDispose is not null)
@@ -63,8 +63,16 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer
private bool subscribed;
private bool disposed;
/// <summary>
/// Production constructor — creates the wnwrap COM object on the
/// current thread (must be the worker's STA) and disables the
/// internal <see cref="Timer"/> (<c>pollIntervalMilliseconds=0</c>).
/// Polling is driven externally by the STA via
/// <c>StaRuntime.InvokeAsync(() =&gt; consumer.PollOnce())</c> so
/// that every COM call stays on the STA that owns the apartment.
/// </summary>
public WnWrapAlarmConsumer()
: this(new wwAlarmConsumerClass(), DefaultPollIntervalMilliseconds, DefaultMaxAlarmsPerFetch)
: this(new wwAlarmConsumerClass(), pollIntervalMilliseconds: 0, DefaultMaxAlarmsPerFetch)
{
}