Merge branch 'fix/alarm-sta-wiring'

Close the two alarm production gaps: wire alarmCommandHandlerFactory in
WorkerPipeSession, and drive WnWrapAlarmConsumer.PollOnce from the STA
instead of a threadpool timer.
This commit is contained in:
Joseph Doherty
2026-05-18 06:31:05 -04:00
10 changed files with 412 additions and 7 deletions
@@ -447,6 +447,13 @@ public sealed class AlarmCommandExecutorTests
return QueryResult;
}
public int PollCount { get; private set; }
public void PollOnce()
{
PollCount++;
}
public void Dispose() { }
}
}
@@ -236,6 +236,13 @@ public sealed class AlarmCommandHandlerTests
public IReadOnlyList<MxAlarmSnapshotRecord> SnapshotActiveAlarms() => SnapshotResult;
public int PollCount { get; private set; }
public void PollOnce()
{
PollCount++;
}
public void Dispose()
{
Disposed = true;
@@ -318,6 +318,13 @@ public sealed class AlarmDispatcherTests
return SnapshotResult;
}
public int PollCount { get; private set; }
public void PollOnce()
{
PollCount++;
}
public void Dispose()
{
Disposed = true;
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -180,6 +181,153 @@ public sealed class MxAccessStaSessionTests
}
}
/// <summary>
/// Gap 1: Verifies that when MxAccessStaSession is created with an alarm handler factory,
/// a SubscribeAlarms command dispatched through the session reaches the handler.
/// This proves the fix in WorkerPipeSession (and the new internal constructor) correctly
/// wires the factory rather than leaving alarmCommandHandler null.
/// </summary>
[Fact]
public async Task StartAsync_WithAlarmCommandHandlerFactory_SubscribeAlarmsCommandReachesHandler()
{
FakeAlarmCommandHandler handler = new();
FakeMxAccessComObjectFactory factory = new();
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(
runtime,
factory,
eventSink,
new MxAccessEventQueue(),
_eq => handler);
await session.StartAsync("session-1", workerProcessId: 1);
StaCommand subscribeCommand = new StaCommand(
"session-1",
"corr-1",
new MxCommand
{
Kind = MxCommandKind.SubscribeAlarms,
SubscribeAlarms = new SubscribeAlarmsCommand
{
SubscriptionExpression = @"\\HOST\Galaxy!Area",
},
});
MxCommandReply reply = await session.DispatchAsync(subscribeCommand);
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
Assert.True(handler.IsSubscribed);
Assert.Equal(@"\\HOST\Galaxy!Area", handler.LastSubscription);
}
/// <summary>
/// Gap 1: Verifies that when MxAccessStaSession is created with the default
/// parameterless constructor (no alarm factory), SubscribeAlarms returns
/// InvalidRequest with "alarm consumer not configured" diagnostic.
/// This validates the baseline before the fix.
/// </summary>
[Fact]
public async Task StartAsync_WithoutAlarmCommandHandlerFactory_SubscribeAlarmsReturnsInvalidRequest()
{
FakeMxAccessComObjectFactory factory = new();
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
// Use the 4-arg (no factory) constructor — equivalent to the old MxAccessStaSession()
using MxAccessStaSession session = new(runtime, factory, eventSink);
await session.StartAsync("session-1", workerProcessId: 1);
StaCommand subscribeCommand = new StaCommand(
"session-1",
"corr-1",
new MxCommand
{
Kind = MxCommandKind.SubscribeAlarms,
SubscribeAlarms = new SubscribeAlarmsCommand
{
SubscriptionExpression = @"\\HOST\Galaxy!Area",
},
});
MxCommandReply reply = await session.DispatchAsync(subscribeCommand);
Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code);
Assert.Contains("alarm", reply.DiagnosticMessage, StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Gap 2: Verifies that after StartAsync with an alarm handler factory, the STA poll
/// loop calls PollOnce on the handler via the STA within a reasonable timeout.
/// This proves polling is driven by the STA rather than the consumer's internal timer.
/// </summary>
[Fact]
public async Task StartAsync_WithAlarmCommandHandlerFactory_PollOnceCalledViaSta()
{
FakeAlarmCommandHandler handler = new();
FakeMxAccessComObjectFactory factory = new();
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(
runtime,
factory,
eventSink,
new MxAccessEventQueue(),
_eq => handler);
await session.StartAsync("session-1", workerProcessId: 1);
// Wait up to 3s for at least one PollOnce call from the STA poll loop.
using CancellationTokenSource timeout = new CancellationTokenSource(TimeSpan.FromSeconds(3));
while (handler.PollCount == 0 && !timeout.IsCancellationRequested)
{
await Task.Delay(50, CancellationToken.None);
}
Assert.True(handler.PollCount > 0,
"Expected PollOnce to be called at least once by the STA poll loop within 3 seconds.");
Assert.NotNull(handler.LastPollThreadId);
Assert.Equal(runtime.StaThreadId, handler.LastPollThreadId);
}
/// <summary>
/// Gap 2: Verifies that the STA poll loop stops when the session is disposed —
/// no further PollOnce calls after disposal.
/// </summary>
[Fact]
public async Task Dispose_StopsAlarmPollLoop()
{
FakeAlarmCommandHandler handler = new();
FakeMxAccessComObjectFactory factory = new();
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
MxAccessStaSession session = new(
runtime,
factory,
eventSink,
new MxAccessEventQueue(),
_eq => handler);
await session.StartAsync("session-1", workerProcessId: 1);
// Wait for at least one poll to occur, then dispose.
using CancellationTokenSource initTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(3));
while (handler.PollCount == 0 && !initTimeout.IsCancellationRequested)
{
await Task.Delay(50, CancellationToken.None);
}
Assert.True(handler.PollCount > 0, "Prerequisite: poll loop must have fired before dispose.");
session.Dispose();
int pollCountAtDispose = handler.PollCount;
// Wait 1 second and verify no further polls occur.
await Task.Delay(1000);
Assert.Equal(pollCountAtDispose, handler.PollCount);
}
/// <summary>
/// Noop STA COM apartment initializer for testing.
/// </summary>
@@ -199,4 +347,61 @@ public sealed class MxAccessStaSessionTests
{
}
}
/// <summary>
/// Fake alarm command handler that records calls and tracks poll thread.
/// </summary>
private sealed class FakeAlarmCommandHandler : IAlarmCommandHandler
{
private readonly object gate = new object();
private int pollCount;
private int? lastPollThreadId;
public bool IsSubscribed { get; private set; }
public string? LastSubscription { get; private set; }
public int PollCount
{
get { lock (gate) return pollCount; }
}
public int? LastPollThreadId
{
get { lock (gate) return lastPollThreadId; }
}
public void Subscribe(string subscription, string sessionId)
{
IsSubscribed = true;
LastSubscription = subscription;
}
public void Unsubscribe()
{
IsSubscribed = false;
}
public int Acknowledge(Guid alarmGuid, string comment, string operatorUser,
string operatorNode, string operatorDomain, string operatorFullName)
=> 0;
public int AcknowledgeByName(string alarmName, string providerName, string groupName,
string comment, string operatorUser, string operatorNode,
string operatorDomain, string operatorFullName)
=> 0;
public IReadOnlyList<ActiveAlarmSnapshot> QueryActive(string? alarmFilterPrefix)
=> Array.Empty<ActiveAlarmSnapshot>();
public void PollOnce()
{
lock (gate)
{
pollCount++;
lastPollThreadId = Thread.CurrentThread.ManagedThreadId;
}
}
public void Dispose() { }
}
}
@@ -48,7 +48,7 @@ public sealed class WorkerPipeSession
options,
() => Process.GetCurrentProcess().Id,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession(),
() => new MxAccessStaSession(eq => new AlarmCommandHandler(eq)),
logger)
{
}
@@ -69,7 +69,7 @@ public sealed class WorkerPipeSession
options,
processIdProvider,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession(),
() => new MxAccessStaSession(eq => new AlarmCommandHandler(eq)),
logger: null)
{
}
@@ -746,7 +746,7 @@ public sealed class WorkerPipeSession
private async Task<WorkerReady> InitializeMxAccessAsync(CancellationToken cancellationToken)
{
_runtimeSession = new MxAccessStaSession();
_runtimeSession = new MxAccessStaSession(eq => new AlarmCommandHandler(eq));
try
{
return await _runtimeSession
@@ -160,6 +160,15 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler
return filtered;
}
/// <inheritdoc />
public void PollOnce()
{
AlarmDispatcher? d;
lock (syncRoot) d = dispatcher;
// No-op when not yet subscribed or already disposed.
d?.PollOnce();
}
private AlarmDispatcher GetDispatcherOrThrow()
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmCommandHandler));
@@ -226,4 +235,14 @@ public interface IAlarmCommandHandler : IDisposable
/// prefix matched against <c>AlarmFullReference</c>.
/// </summary>
IReadOnlyList<ActiveAlarmSnapshot> QueryActive(string? alarmFilterPrefix);
/// <summary>
/// Drives a single poll of the underlying alarm consumer on the
/// caller's thread. This is a no-op when there is no active
/// subscription. In production the caller is the worker's STA
/// (marshalled via <c>StaRuntime.InvokeAsync</c>), which satisfies
/// the <c>ThreadingModel=Apartment</c> requirement of
/// <c>wwAlarmConsumerClass</c>.
/// </summary>
void PollOnce();
}
@@ -119,6 +119,17 @@ public sealed class AlarmDispatcher : IDisposable
ackOperatorFullName);
}
/// <summary>
/// Drives a single synchronous poll of the underlying consumer.
/// Must be called on the STA thread that owns the wnwrap COM object.
/// No-op if the dispatcher has been disposed.
/// </summary>
public void PollOnce()
{
if (disposed) return;
consumer.PollOnce();
}
/// <summary>
/// Snapshot the currently-active alarm set as
/// <see cref="ActiveAlarmSnapshot"/> protos for the
@@ -85,4 +85,17 @@ public interface IMxAccessAlarmConsumer : IDisposable
/// to seed local Part 9 state.
/// </summary>
IReadOnlyList<MxAlarmSnapshotRecord> SnapshotActiveAlarms();
/// <summary>
/// Drives a single synchronous poll of the underlying alarm source.
/// Implementations that use an internal <see cref="System.Threading.Timer"/>
/// are constructed with <c>pollIntervalMilliseconds=0</c> in production so
/// the timer is disabled; the worker's STA drives polls via
/// <c>StaRuntime.InvokeAsync</c> instead, satisfying the
/// <c>ThreadingModel=Apartment</c> requirement of
/// <c>wwAlarmConsumerClass</c>. Fake implementations should no-op.
/// This method must be invoked on the thread that created the consumer
/// (the worker's STA in production).
/// </summary>
void PollOnce();
}
@@ -11,6 +11,8 @@ namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessStaSession : IWorkerRuntimeSession
{
private static readonly TimeSpan AlarmPollInterval = TimeSpan.FromMilliseconds(500);
private readonly IMxAccessComObjectFactory factory;
private readonly IMxAccessEventSink eventSink;
private readonly MxAccessEventQueue eventQueue;
@@ -19,6 +21,8 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
private StaCommandDispatcher? commandDispatcher;
private MxAccessSession? session;
private IAlarmCommandHandler? alarmCommandHandler;
private CancellationTokenSource? alarmPollCts;
private Task? alarmPollTask;
private bool disposed;
/// <summary>
@@ -32,6 +36,22 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
{
}
/// <summary>
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with default STA runtime,
/// factory, and event queue, but with a custom alarm-command handler factory. The factory is
/// invoked on the STA thread during
/// <see cref="StartAsync(string, int, CancellationToken)"/>; pass <c>null</c> to opt out
/// of alarm-side commands.
/// </summary>
internal MxAccessStaSession(Func<MxAccessEventQueue, IAlarmCommandHandler>? alarmCommandHandlerFactory)
: this(
new StaRuntime(),
new MxAccessComObjectFactory(),
new MxAccessEventQueue(),
alarmCommandHandlerFactory)
{
}
/// <summary>
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with custom STA runtime and factory.
/// </summary>
@@ -60,6 +80,26 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
{
}
/// <summary>
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with custom event queue
/// and an alarm-command handler factory.
/// </summary>
/// <param name="staRuntime">STA thread runtime.</param>
/// <param name="factory">MXAccess COM object factory.</param>
/// <param name="eventQueue">Event queue for buffering MXAccess events.</param>
/// <param name="alarmCommandHandlerFactory">
/// Factory that constructs the alarm-command handler from the event queue.
/// Pass <c>null</c> to opt out of alarm-side commands.
/// </param>
public MxAccessStaSession(
StaRuntime staRuntime,
IMxAccessComObjectFactory factory,
MxAccessEventQueue eventQueue,
Func<MxAccessEventQueue, IAlarmCommandHandler>? alarmCommandHandlerFactory)
: this(staRuntime, factory, new MxAccessBaseEventSink(eventQueue), eventQueue, alarmCommandHandlerFactory)
{
}
/// <summary>
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with all dependencies.
/// </summary>
@@ -122,14 +162,14 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
/// <param name="workerProcessId">Worker process identifier.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Worker ready message.</returns>
public Task<WorkerReady> StartAsync(
public async Task<WorkerReady> StartAsync(
string sessionId,
int workerProcessId,
CancellationToken cancellationToken = default)
{
staRuntime.Start();
return staRuntime.InvokeAsync(
WorkerReady ready = await staRuntime.InvokeAsync(
() =>
{
if (session is not null)
@@ -151,7 +191,61 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
return session.CreateWorkerReady(workerProcessId);
},
cancellationToken);
cancellationToken).ConfigureAwait(false);
if (alarmCommandHandler is not null)
{
alarmPollCts = new CancellationTokenSource();
alarmPollTask = RunAlarmPollLoopAsync(alarmCommandHandler, alarmPollCts.Token);
}
return ready;
}
private Task RunAlarmPollLoopAsync(
IAlarmCommandHandler handler,
CancellationToken cancellationToken)
{
return Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(AlarmPollInterval, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return;
}
if (cancellationToken.IsCancellationRequested)
{
return;
}
try
{
await staRuntime.InvokeAsync(
() => handler.PollOnce(),
cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return;
}
catch (ObjectDisposedException)
{
// STA runtime or alarm handler disposed — stop the loop gracefully.
return;
}
catch (InvalidOperationException)
{
// STA runtime shutting down — stop the loop gracefully.
return;
}
}
}, CancellationToken.None);
}
/// <summary>
@@ -307,6 +401,30 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
commandDispatcher?.RequestShutdown();
// Cancel the STA poll loop before disposing the alarm handler.
// The loop references the alarm handler and must be stopped first
// so that no further PollOnce calls race with disposal.
CancellationTokenSource? pollCtsToDispose = alarmPollCts;
Task? pollTaskToAwait = alarmPollTask;
alarmPollCts = null;
alarmPollTask = null;
if (pollCtsToDispose is not null)
{
pollCtsToDispose.Cancel();
if (pollTaskToAwait is not null)
{
try
{
await pollTaskToAwait.ConfigureAwait(false);
}
catch
{
// Swallow — poll loop cancellation must not block data shutdown.
}
}
pollCtsToDispose.Dispose();
}
// Stop the alarm consumer's polling timer and tear down the
// dispatcher BEFORE the data-side cleanup begins. The alarm
// consumer holds a wnwrap COM RCW that needs the STA pump to
@@ -382,6 +500,16 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
RequestShutdown();
// Cancel and discard the STA poll loop.
CancellationTokenSource? pollCtsToDispose = alarmPollCts;
alarmPollCts = null;
alarmPollTask = null;
if (pollCtsToDispose is not null)
{
try { pollCtsToDispose.Cancel(); } catch { }
try { pollCtsToDispose.Dispose(); } catch { }
}
IAlarmCommandHandler? alarmHandlerToDispose = alarmCommandHandler;
alarmCommandHandler = null;
if (alarmHandlerToDispose is not null)
@@ -63,8 +63,16 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer
private bool subscribed;
private bool disposed;
/// <summary>
/// Production constructor — creates the wnwrap COM object on the
/// current thread (must be the worker's STA) and disables the
/// internal <see cref="Timer"/> (<c>pollIntervalMilliseconds=0</c>).
/// Polling is driven externally by the STA via
/// <c>StaRuntime.InvokeAsync(() =&gt; consumer.PollOnce())</c> so
/// that every COM call stays on the STA that owns the apartment.
/// </summary>
public WnWrapAlarmConsumer()
: this(new wwAlarmConsumerClass(), DefaultPollIntervalMilliseconds, DefaultMaxAlarmsPerFetch)
: this(new wwAlarmConsumerClass(), pollIntervalMilliseconds: 0, DefaultMaxAlarmsPerFetch)
{
}