diff --git a/src/MxGateway.Worker.Tests/MxAccess/AlarmCommandExecutorTests.cs b/src/MxGateway.Worker.Tests/MxAccess/AlarmCommandExecutorTests.cs index cd54467..2ed91c0 100644 --- a/src/MxGateway.Worker.Tests/MxAccess/AlarmCommandExecutorTests.cs +++ b/src/MxGateway.Worker.Tests/MxAccess/AlarmCommandExecutorTests.cs @@ -447,6 +447,13 @@ public sealed class AlarmCommandExecutorTests return QueryResult; } + public int PollCount { get; private set; } + + public void PollOnce() + { + PollCount++; + } + public void Dispose() { } } } diff --git a/src/MxGateway.Worker.Tests/MxAccess/AlarmCommandHandlerTests.cs b/src/MxGateway.Worker.Tests/MxAccess/AlarmCommandHandlerTests.cs index decd4b8..ebbaf6b 100644 --- a/src/MxGateway.Worker.Tests/MxAccess/AlarmCommandHandlerTests.cs +++ b/src/MxGateway.Worker.Tests/MxAccess/AlarmCommandHandlerTests.cs @@ -236,6 +236,13 @@ public sealed class AlarmCommandHandlerTests public IReadOnlyList SnapshotActiveAlarms() => SnapshotResult; + public int PollCount { get; private set; } + + public void PollOnce() + { + PollCount++; + } + public void Dispose() { Disposed = true; diff --git a/src/MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs b/src/MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs index 6b3e03d..f2f511e 100644 --- a/src/MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs +++ b/src/MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs @@ -318,6 +318,13 @@ public sealed class AlarmDispatcherTests return SnapshotResult; } + public int PollCount { get; private set; } + + public void PollOnce() + { + PollCount++; + } + public void Dispose() { Disposed = true; diff --git a/src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs b/src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs index 1dedfb8..f08db01 100644 --- a/src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs +++ b/src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -180,6 +181,153 @@ public sealed class MxAccessStaSessionTests } } + /// + /// Gap 1: Verifies that when MxAccessStaSession is created with an alarm handler factory, + /// a SubscribeAlarms command dispatched through the session reaches the handler. + /// This proves the fix in WorkerPipeSession (and the new internal constructor) correctly + /// wires the factory rather than leaving alarmCommandHandler null. + /// + [Fact] + public async Task StartAsync_WithAlarmCommandHandlerFactory_SubscribeAlarmsCommandReachesHandler() + { + FakeAlarmCommandHandler handler = new(); + FakeMxAccessComObjectFactory factory = new(); + FakeMxAccessEventSink eventSink = new(); + using StaRuntime runtime = CreateRuntime(); + using MxAccessStaSession session = new( + runtime, + factory, + eventSink, + new MxAccessEventQueue(), + _eq => handler); + + await session.StartAsync("session-1", workerProcessId: 1); + + StaCommand subscribeCommand = new StaCommand( + "session-1", + "corr-1", + new MxCommand + { + Kind = MxCommandKind.SubscribeAlarms, + SubscribeAlarms = new SubscribeAlarmsCommand + { + SubscriptionExpression = @"\\HOST\Galaxy!Area", + }, + }); + + MxCommandReply reply = await session.DispatchAsync(subscribeCommand); + + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + Assert.True(handler.IsSubscribed); + Assert.Equal(@"\\HOST\Galaxy!Area", handler.LastSubscription); + } + + /// + /// Gap 1: Verifies that when MxAccessStaSession is created with the default + /// parameterless constructor (no alarm factory), SubscribeAlarms returns + /// InvalidRequest with "alarm consumer not configured" diagnostic. + /// This validates the baseline before the fix. + /// + [Fact] + public async Task StartAsync_WithoutAlarmCommandHandlerFactory_SubscribeAlarmsReturnsInvalidRequest() + { + FakeMxAccessComObjectFactory factory = new(); + FakeMxAccessEventSink eventSink = new(); + using StaRuntime runtime = CreateRuntime(); + // Use the 4-arg (no factory) constructor — equivalent to the old MxAccessStaSession() + using MxAccessStaSession session = new(runtime, factory, eventSink); + + await session.StartAsync("session-1", workerProcessId: 1); + + StaCommand subscribeCommand = new StaCommand( + "session-1", + "corr-1", + new MxCommand + { + Kind = MxCommandKind.SubscribeAlarms, + SubscribeAlarms = new SubscribeAlarmsCommand + { + SubscriptionExpression = @"\\HOST\Galaxy!Area", + }, + }); + + MxCommandReply reply = await session.DispatchAsync(subscribeCommand); + + Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code); + Assert.Contains("alarm", reply.DiagnosticMessage, StringComparison.OrdinalIgnoreCase); + } + + /// + /// Gap 2: Verifies that after StartAsync with an alarm handler factory, the STA poll + /// loop calls PollOnce on the handler via the STA within a reasonable timeout. + /// This proves polling is driven by the STA rather than the consumer's internal timer. + /// + [Fact] + public async Task StartAsync_WithAlarmCommandHandlerFactory_PollOnceCalledViaSta() + { + FakeAlarmCommandHandler handler = new(); + FakeMxAccessComObjectFactory factory = new(); + FakeMxAccessEventSink eventSink = new(); + using StaRuntime runtime = CreateRuntime(); + using MxAccessStaSession session = new( + runtime, + factory, + eventSink, + new MxAccessEventQueue(), + _eq => handler); + + await session.StartAsync("session-1", workerProcessId: 1); + + // Wait up to 3s for at least one PollOnce call from the STA poll loop. + using CancellationTokenSource timeout = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + while (handler.PollCount == 0 && !timeout.IsCancellationRequested) + { + await Task.Delay(50, CancellationToken.None); + } + + Assert.True(handler.PollCount > 0, + "Expected PollOnce to be called at least once by the STA poll loop within 3 seconds."); + Assert.NotNull(handler.LastPollThreadId); + Assert.Equal(runtime.StaThreadId, handler.LastPollThreadId); + } + + /// + /// Gap 2: Verifies that the STA poll loop stops when the session is disposed — + /// no further PollOnce calls after disposal. + /// + [Fact] + public async Task Dispose_StopsAlarmPollLoop() + { + FakeAlarmCommandHandler handler = new(); + FakeMxAccessComObjectFactory factory = new(); + FakeMxAccessEventSink eventSink = new(); + using StaRuntime runtime = CreateRuntime(); + MxAccessStaSession session = new( + runtime, + factory, + eventSink, + new MxAccessEventQueue(), + _eq => handler); + + await session.StartAsync("session-1", workerProcessId: 1); + + // Wait for at least one poll to occur, then dispose. + using CancellationTokenSource initTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + while (handler.PollCount == 0 && !initTimeout.IsCancellationRequested) + { + await Task.Delay(50, CancellationToken.None); + } + + Assert.True(handler.PollCount > 0, "Prerequisite: poll loop must have fired before dispose."); + + session.Dispose(); + int pollCountAtDispose = handler.PollCount; + + // Wait 1 second and verify no further polls occur. + await Task.Delay(1000); + Assert.Equal(pollCountAtDispose, handler.PollCount); + } + /// /// Noop STA COM apartment initializer for testing. /// @@ -199,4 +347,61 @@ public sealed class MxAccessStaSessionTests { } } + + /// + /// Fake alarm command handler that records calls and tracks poll thread. + /// + private sealed class FakeAlarmCommandHandler : IAlarmCommandHandler + { + private readonly object gate = new object(); + private int pollCount; + private int? lastPollThreadId; + + public bool IsSubscribed { get; private set; } + public string? LastSubscription { get; private set; } + + public int PollCount + { + get { lock (gate) return pollCount; } + } + + public int? LastPollThreadId + { + get { lock (gate) return lastPollThreadId; } + } + + public void Subscribe(string subscription, string sessionId) + { + IsSubscribed = true; + LastSubscription = subscription; + } + + public void Unsubscribe() + { + IsSubscribed = false; + } + + public int Acknowledge(Guid alarmGuid, string comment, string operatorUser, + string operatorNode, string operatorDomain, string operatorFullName) + => 0; + + public int AcknowledgeByName(string alarmName, string providerName, string groupName, + string comment, string operatorUser, string operatorNode, + string operatorDomain, string operatorFullName) + => 0; + + public IReadOnlyList QueryActive(string? alarmFilterPrefix) + => Array.Empty(); + + public void PollOnce() + { + lock (gate) + { + pollCount++; + lastPollThreadId = Thread.CurrentThread.ManagedThreadId; + } + } + + public void Dispose() { } + } } diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs index caaa2b3..1acad56 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs @@ -48,7 +48,7 @@ public sealed class WorkerPipeSession options, () => Process.GetCurrentProcess().Id, new WorkerPipeSessionOptions(), - () => new MxAccessStaSession(), + () => new MxAccessStaSession(eq => new AlarmCommandHandler(eq)), logger) { } @@ -69,7 +69,7 @@ public sealed class WorkerPipeSession options, processIdProvider, new WorkerPipeSessionOptions(), - () => new MxAccessStaSession(), + () => new MxAccessStaSession(eq => new AlarmCommandHandler(eq)), logger: null) { } @@ -746,7 +746,7 @@ public sealed class WorkerPipeSession private async Task InitializeMxAccessAsync(CancellationToken cancellationToken) { - _runtimeSession = new MxAccessStaSession(); + _runtimeSession = new MxAccessStaSession(eq => new AlarmCommandHandler(eq)); try { return await _runtimeSession diff --git a/src/MxGateway.Worker/MxAccess/AlarmCommandHandler.cs b/src/MxGateway.Worker/MxAccess/AlarmCommandHandler.cs index 7867de8..e8b3236 100644 --- a/src/MxGateway.Worker/MxAccess/AlarmCommandHandler.cs +++ b/src/MxGateway.Worker/MxAccess/AlarmCommandHandler.cs @@ -160,6 +160,15 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler return filtered; } + /// + public void PollOnce() + { + AlarmDispatcher? d; + lock (syncRoot) d = dispatcher; + // No-op when not yet subscribed or already disposed. + d?.PollOnce(); + } + private AlarmDispatcher GetDispatcherOrThrow() { if (disposed) throw new ObjectDisposedException(nameof(AlarmCommandHandler)); @@ -226,4 +235,14 @@ public interface IAlarmCommandHandler : IDisposable /// prefix matched against AlarmFullReference. /// IReadOnlyList QueryActive(string? alarmFilterPrefix); + + /// + /// Drives a single poll of the underlying alarm consumer on the + /// caller's thread. This is a no-op when there is no active + /// subscription. In production the caller is the worker's STA + /// (marshalled via StaRuntime.InvokeAsync), which satisfies + /// the ThreadingModel=Apartment requirement of + /// wwAlarmConsumerClass. + /// + void PollOnce(); } diff --git a/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs b/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs index cc4e617..a70272b 100644 --- a/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs +++ b/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs @@ -119,6 +119,17 @@ public sealed class AlarmDispatcher : IDisposable ackOperatorFullName); } + /// + /// Drives a single synchronous poll of the underlying consumer. + /// Must be called on the STA thread that owns the wnwrap COM object. + /// No-op if the dispatcher has been disposed. + /// + public void PollOnce() + { + if (disposed) return; + consumer.PollOnce(); + } + /// /// Snapshot the currently-active alarm set as /// protos for the diff --git a/src/MxGateway.Worker/MxAccess/IMxAccessAlarmConsumer.cs b/src/MxGateway.Worker/MxAccess/IMxAccessAlarmConsumer.cs index c2973bd..1a9a97d 100644 --- a/src/MxGateway.Worker/MxAccess/IMxAccessAlarmConsumer.cs +++ b/src/MxGateway.Worker/MxAccess/IMxAccessAlarmConsumer.cs @@ -85,4 +85,17 @@ public interface IMxAccessAlarmConsumer : IDisposable /// to seed local Part 9 state. /// IReadOnlyList SnapshotActiveAlarms(); + + /// + /// Drives a single synchronous poll of the underlying alarm source. + /// Implementations that use an internal + /// are constructed with pollIntervalMilliseconds=0 in production so + /// the timer is disabled; the worker's STA drives polls via + /// StaRuntime.InvokeAsync instead, satisfying the + /// ThreadingModel=Apartment requirement of + /// wwAlarmConsumerClass. Fake implementations should no-op. + /// This method must be invoked on the thread that created the consumer + /// (the worker's STA in production). + /// + void PollOnce(); } diff --git a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs index 4b223a9..dd56607 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs @@ -11,6 +11,8 @@ namespace MxGateway.Worker.MxAccess; public sealed class MxAccessStaSession : IWorkerRuntimeSession { + private static readonly TimeSpan AlarmPollInterval = TimeSpan.FromMilliseconds(500); + private readonly IMxAccessComObjectFactory factory; private readonly IMxAccessEventSink eventSink; private readonly MxAccessEventQueue eventQueue; @@ -19,6 +21,8 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession private StaCommandDispatcher? commandDispatcher; private MxAccessSession? session; private IAlarmCommandHandler? alarmCommandHandler; + private CancellationTokenSource? alarmPollCts; + private Task? alarmPollTask; private bool disposed; /// @@ -32,6 +36,22 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession { } + /// + /// Initializes a new instance of with default STA runtime, + /// factory, and event queue, but with a custom alarm-command handler factory. The factory is + /// invoked on the STA thread during + /// ; pass null to opt out + /// of alarm-side commands. + /// + internal MxAccessStaSession(Func? alarmCommandHandlerFactory) + : this( + new StaRuntime(), + new MxAccessComObjectFactory(), + new MxAccessEventQueue(), + alarmCommandHandlerFactory) + { + } + /// /// Initializes a new instance of with custom STA runtime and factory. /// @@ -60,6 +80,26 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession { } + /// + /// Initializes a new instance of with custom event queue + /// and an alarm-command handler factory. + /// + /// STA thread runtime. + /// MXAccess COM object factory. + /// Event queue for buffering MXAccess events. + /// + /// Factory that constructs the alarm-command handler from the event queue. + /// Pass null to opt out of alarm-side commands. + /// + public MxAccessStaSession( + StaRuntime staRuntime, + IMxAccessComObjectFactory factory, + MxAccessEventQueue eventQueue, + Func? alarmCommandHandlerFactory) + : this(staRuntime, factory, new MxAccessBaseEventSink(eventQueue), eventQueue, alarmCommandHandlerFactory) + { + } + /// /// Initializes a new instance of with all dependencies. /// @@ -122,14 +162,14 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession /// Worker process identifier. /// Cancellation token. /// Worker ready message. - public Task StartAsync( + public async Task StartAsync( string sessionId, int workerProcessId, CancellationToken cancellationToken = default) { staRuntime.Start(); - return staRuntime.InvokeAsync( + WorkerReady ready = await staRuntime.InvokeAsync( () => { if (session is not null) @@ -151,7 +191,61 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession return session.CreateWorkerReady(workerProcessId); }, - cancellationToken); + cancellationToken).ConfigureAwait(false); + + if (alarmCommandHandler is not null) + { + alarmPollCts = new CancellationTokenSource(); + alarmPollTask = RunAlarmPollLoopAsync(alarmCommandHandler, alarmPollCts.Token); + } + + return ready; + } + + private Task RunAlarmPollLoopAsync( + IAlarmCommandHandler handler, + CancellationToken cancellationToken) + { + return Task.Run(async () => + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + await Task.Delay(AlarmPollInterval, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return; + } + + if (cancellationToken.IsCancellationRequested) + { + return; + } + + try + { + await staRuntime.InvokeAsync( + () => handler.PollOnce(), + cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return; + } + catch (ObjectDisposedException) + { + // STA runtime or alarm handler disposed — stop the loop gracefully. + return; + } + catch (InvalidOperationException) + { + // STA runtime shutting down — stop the loop gracefully. + return; + } + } + }, CancellationToken.None); } /// @@ -307,6 +401,30 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession commandDispatcher?.RequestShutdown(); + // Cancel the STA poll loop before disposing the alarm handler. + // The loop references the alarm handler and must be stopped first + // so that no further PollOnce calls race with disposal. + CancellationTokenSource? pollCtsToDispose = alarmPollCts; + Task? pollTaskToAwait = alarmPollTask; + alarmPollCts = null; + alarmPollTask = null; + if (pollCtsToDispose is not null) + { + pollCtsToDispose.Cancel(); + if (pollTaskToAwait is not null) + { + try + { + await pollTaskToAwait.ConfigureAwait(false); + } + catch + { + // Swallow — poll loop cancellation must not block data shutdown. + } + } + pollCtsToDispose.Dispose(); + } + // Stop the alarm consumer's polling timer and tear down the // dispatcher BEFORE the data-side cleanup begins. The alarm // consumer holds a wnwrap COM RCW that needs the STA pump to @@ -382,6 +500,16 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession RequestShutdown(); + // Cancel and discard the STA poll loop. + CancellationTokenSource? pollCtsToDispose = alarmPollCts; + alarmPollCts = null; + alarmPollTask = null; + if (pollCtsToDispose is not null) + { + try { pollCtsToDispose.Cancel(); } catch { } + try { pollCtsToDispose.Dispose(); } catch { } + } + IAlarmCommandHandler? alarmHandlerToDispose = alarmCommandHandler; alarmCommandHandler = null; if (alarmHandlerToDispose is not null) diff --git a/src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs b/src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs index 06dc39a..027108a 100644 --- a/src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs +++ b/src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs @@ -63,8 +63,16 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer private bool subscribed; private bool disposed; + /// + /// Production constructor — creates the wnwrap COM object on the + /// current thread (must be the worker's STA) and disables the + /// internal (pollIntervalMilliseconds=0). + /// Polling is driven externally by the STA via + /// StaRuntime.InvokeAsync(() => consumer.PollOnce()) so + /// that every COM call stays on the STA that owns the apartment. + /// public WnWrapAlarmConsumer() - : this(new wwAlarmConsumerClass(), DefaultPollIntervalMilliseconds, DefaultMaxAlarmsPerFetch) + : this(new wwAlarmConsumerClass(), pollIntervalMilliseconds: 0, DefaultMaxAlarmsPerFetch) { }