Merge branch 'fix/alarm-sta-wiring'
Close the two alarm production gaps: wire alarmCommandHandlerFactory in WorkerPipeSession, and drive WnWrapAlarmConsumer.PollOnce from the STA instead of a threadpool timer.
This commit is contained in:
@@ -447,6 +447,13 @@ public sealed class AlarmCommandExecutorTests
|
|||||||
return QueryResult;
|
return QueryResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int PollCount { get; private set; }
|
||||||
|
|
||||||
|
public void PollOnce()
|
||||||
|
{
|
||||||
|
PollCount++;
|
||||||
|
}
|
||||||
|
|
||||||
public void Dispose() { }
|
public void Dispose() { }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -236,6 +236,13 @@ public sealed class AlarmCommandHandlerTests
|
|||||||
|
|
||||||
public IReadOnlyList<MxAlarmSnapshotRecord> SnapshotActiveAlarms() => SnapshotResult;
|
public IReadOnlyList<MxAlarmSnapshotRecord> SnapshotActiveAlarms() => SnapshotResult;
|
||||||
|
|
||||||
|
public int PollCount { get; private set; }
|
||||||
|
|
||||||
|
public void PollOnce()
|
||||||
|
{
|
||||||
|
PollCount++;
|
||||||
|
}
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
Disposed = true;
|
Disposed = true;
|
||||||
|
|||||||
@@ -318,6 +318,13 @@ public sealed class AlarmDispatcherTests
|
|||||||
return SnapshotResult;
|
return SnapshotResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int PollCount { get; private set; }
|
||||||
|
|
||||||
|
public void PollOnce()
|
||||||
|
{
|
||||||
|
PollCount++;
|
||||||
|
}
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
Disposed = true;
|
Disposed = true;
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
using System.Runtime.InteropServices;
|
using System.Runtime.InteropServices;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
@@ -180,6 +181,153 @@ public sealed class MxAccessStaSessionTests
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gap 1: Verifies that when MxAccessStaSession is created with an alarm handler factory,
|
||||||
|
/// a SubscribeAlarms command dispatched through the session reaches the handler.
|
||||||
|
/// This proves the fix in WorkerPipeSession (and the new internal constructor) correctly
|
||||||
|
/// wires the factory rather than leaving alarmCommandHandler null.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_WithAlarmCommandHandlerFactory_SubscribeAlarmsCommandReachesHandler()
|
||||||
|
{
|
||||||
|
FakeAlarmCommandHandler handler = new();
|
||||||
|
FakeMxAccessComObjectFactory factory = new();
|
||||||
|
FakeMxAccessEventSink eventSink = new();
|
||||||
|
using StaRuntime runtime = CreateRuntime();
|
||||||
|
using MxAccessStaSession session = new(
|
||||||
|
runtime,
|
||||||
|
factory,
|
||||||
|
eventSink,
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
_eq => handler);
|
||||||
|
|
||||||
|
await session.StartAsync("session-1", workerProcessId: 1);
|
||||||
|
|
||||||
|
StaCommand subscribeCommand = new StaCommand(
|
||||||
|
"session-1",
|
||||||
|
"corr-1",
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.SubscribeAlarms,
|
||||||
|
SubscribeAlarms = new SubscribeAlarmsCommand
|
||||||
|
{
|
||||||
|
SubscriptionExpression = @"\\HOST\Galaxy!Area",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = await session.DispatchAsync(subscribeCommand);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||||
|
Assert.True(handler.IsSubscribed);
|
||||||
|
Assert.Equal(@"\\HOST\Galaxy!Area", handler.LastSubscription);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gap 1: Verifies that when MxAccessStaSession is created with the default
|
||||||
|
/// parameterless constructor (no alarm factory), SubscribeAlarms returns
|
||||||
|
/// InvalidRequest with "alarm consumer not configured" diagnostic.
|
||||||
|
/// This validates the baseline before the fix.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_WithoutAlarmCommandHandlerFactory_SubscribeAlarmsReturnsInvalidRequest()
|
||||||
|
{
|
||||||
|
FakeMxAccessComObjectFactory factory = new();
|
||||||
|
FakeMxAccessEventSink eventSink = new();
|
||||||
|
using StaRuntime runtime = CreateRuntime();
|
||||||
|
// Use the 4-arg (no factory) constructor — equivalent to the old MxAccessStaSession()
|
||||||
|
using MxAccessStaSession session = new(runtime, factory, eventSink);
|
||||||
|
|
||||||
|
await session.StartAsync("session-1", workerProcessId: 1);
|
||||||
|
|
||||||
|
StaCommand subscribeCommand = new StaCommand(
|
||||||
|
"session-1",
|
||||||
|
"corr-1",
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.SubscribeAlarms,
|
||||||
|
SubscribeAlarms = new SubscribeAlarmsCommand
|
||||||
|
{
|
||||||
|
SubscriptionExpression = @"\\HOST\Galaxy!Area",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = await session.DispatchAsync(subscribeCommand);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code);
|
||||||
|
Assert.Contains("alarm", reply.DiagnosticMessage, StringComparison.OrdinalIgnoreCase);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gap 2: Verifies that after StartAsync with an alarm handler factory, the STA poll
|
||||||
|
/// loop calls PollOnce on the handler via the STA within a reasonable timeout.
|
||||||
|
/// This proves polling is driven by the STA rather than the consumer's internal timer.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_WithAlarmCommandHandlerFactory_PollOnceCalledViaSta()
|
||||||
|
{
|
||||||
|
FakeAlarmCommandHandler handler = new();
|
||||||
|
FakeMxAccessComObjectFactory factory = new();
|
||||||
|
FakeMxAccessEventSink eventSink = new();
|
||||||
|
using StaRuntime runtime = CreateRuntime();
|
||||||
|
using MxAccessStaSession session = new(
|
||||||
|
runtime,
|
||||||
|
factory,
|
||||||
|
eventSink,
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
_eq => handler);
|
||||||
|
|
||||||
|
await session.StartAsync("session-1", workerProcessId: 1);
|
||||||
|
|
||||||
|
// Wait up to 3s for at least one PollOnce call from the STA poll loop.
|
||||||
|
using CancellationTokenSource timeout = new CancellationTokenSource(TimeSpan.FromSeconds(3));
|
||||||
|
while (handler.PollCount == 0 && !timeout.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
await Task.Delay(50, CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.True(handler.PollCount > 0,
|
||||||
|
"Expected PollOnce to be called at least once by the STA poll loop within 3 seconds.");
|
||||||
|
Assert.NotNull(handler.LastPollThreadId);
|
||||||
|
Assert.Equal(runtime.StaThreadId, handler.LastPollThreadId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gap 2: Verifies that the STA poll loop stops when the session is disposed —
|
||||||
|
/// no further PollOnce calls after disposal.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Dispose_StopsAlarmPollLoop()
|
||||||
|
{
|
||||||
|
FakeAlarmCommandHandler handler = new();
|
||||||
|
FakeMxAccessComObjectFactory factory = new();
|
||||||
|
FakeMxAccessEventSink eventSink = new();
|
||||||
|
using StaRuntime runtime = CreateRuntime();
|
||||||
|
MxAccessStaSession session = new(
|
||||||
|
runtime,
|
||||||
|
factory,
|
||||||
|
eventSink,
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
_eq => handler);
|
||||||
|
|
||||||
|
await session.StartAsync("session-1", workerProcessId: 1);
|
||||||
|
|
||||||
|
// Wait for at least one poll to occur, then dispose.
|
||||||
|
using CancellationTokenSource initTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(3));
|
||||||
|
while (handler.PollCount == 0 && !initTimeout.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
await Task.Delay(50, CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.True(handler.PollCount > 0, "Prerequisite: poll loop must have fired before dispose.");
|
||||||
|
|
||||||
|
session.Dispose();
|
||||||
|
int pollCountAtDispose = handler.PollCount;
|
||||||
|
|
||||||
|
// Wait 1 second and verify no further polls occur.
|
||||||
|
await Task.Delay(1000);
|
||||||
|
Assert.Equal(pollCountAtDispose, handler.PollCount);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Noop STA COM apartment initializer for testing.
|
/// Noop STA COM apartment initializer for testing.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -199,4 +347,61 @@ public sealed class MxAccessStaSessionTests
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Fake alarm command handler that records calls and tracks poll thread.
|
||||||
|
/// </summary>
|
||||||
|
private sealed class FakeAlarmCommandHandler : IAlarmCommandHandler
|
||||||
|
{
|
||||||
|
private readonly object gate = new object();
|
||||||
|
private int pollCount;
|
||||||
|
private int? lastPollThreadId;
|
||||||
|
|
||||||
|
public bool IsSubscribed { get; private set; }
|
||||||
|
public string? LastSubscription { get; private set; }
|
||||||
|
|
||||||
|
public int PollCount
|
||||||
|
{
|
||||||
|
get { lock (gate) return pollCount; }
|
||||||
|
}
|
||||||
|
|
||||||
|
public int? LastPollThreadId
|
||||||
|
{
|
||||||
|
get { lock (gate) return lastPollThreadId; }
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Subscribe(string subscription, string sessionId)
|
||||||
|
{
|
||||||
|
IsSubscribed = true;
|
||||||
|
LastSubscription = subscription;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Unsubscribe()
|
||||||
|
{
|
||||||
|
IsSubscribed = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int Acknowledge(Guid alarmGuid, string comment, string operatorUser,
|
||||||
|
string operatorNode, string operatorDomain, string operatorFullName)
|
||||||
|
=> 0;
|
||||||
|
|
||||||
|
public int AcknowledgeByName(string alarmName, string providerName, string groupName,
|
||||||
|
string comment, string operatorUser, string operatorNode,
|
||||||
|
string operatorDomain, string operatorFullName)
|
||||||
|
=> 0;
|
||||||
|
|
||||||
|
public IReadOnlyList<ActiveAlarmSnapshot> QueryActive(string? alarmFilterPrefix)
|
||||||
|
=> Array.Empty<ActiveAlarmSnapshot>();
|
||||||
|
|
||||||
|
public void PollOnce()
|
||||||
|
{
|
||||||
|
lock (gate)
|
||||||
|
{
|
||||||
|
pollCount++;
|
||||||
|
lastPollThreadId = Thread.CurrentThread.ManagedThreadId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose() { }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ public sealed class WorkerPipeSession
|
|||||||
options,
|
options,
|
||||||
() => Process.GetCurrentProcess().Id,
|
() => Process.GetCurrentProcess().Id,
|
||||||
new WorkerPipeSessionOptions(),
|
new WorkerPipeSessionOptions(),
|
||||||
() => new MxAccessStaSession(),
|
() => new MxAccessStaSession(eq => new AlarmCommandHandler(eq)),
|
||||||
logger)
|
logger)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@@ -69,7 +69,7 @@ public sealed class WorkerPipeSession
|
|||||||
options,
|
options,
|
||||||
processIdProvider,
|
processIdProvider,
|
||||||
new WorkerPipeSessionOptions(),
|
new WorkerPipeSessionOptions(),
|
||||||
() => new MxAccessStaSession(),
|
() => new MxAccessStaSession(eq => new AlarmCommandHandler(eq)),
|
||||||
logger: null)
|
logger: null)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@@ -746,7 +746,7 @@ public sealed class WorkerPipeSession
|
|||||||
|
|
||||||
private async Task<WorkerReady> InitializeMxAccessAsync(CancellationToken cancellationToken)
|
private async Task<WorkerReady> InitializeMxAccessAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
_runtimeSession = new MxAccessStaSession();
|
_runtimeSession = new MxAccessStaSession(eq => new AlarmCommandHandler(eq));
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
return await _runtimeSession
|
return await _runtimeSession
|
||||||
|
|||||||
@@ -160,6 +160,15 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler
|
|||||||
return filtered;
|
return filtered;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public void PollOnce()
|
||||||
|
{
|
||||||
|
AlarmDispatcher? d;
|
||||||
|
lock (syncRoot) d = dispatcher;
|
||||||
|
// No-op when not yet subscribed or already disposed.
|
||||||
|
d?.PollOnce();
|
||||||
|
}
|
||||||
|
|
||||||
private AlarmDispatcher GetDispatcherOrThrow()
|
private AlarmDispatcher GetDispatcherOrThrow()
|
||||||
{
|
{
|
||||||
if (disposed) throw new ObjectDisposedException(nameof(AlarmCommandHandler));
|
if (disposed) throw new ObjectDisposedException(nameof(AlarmCommandHandler));
|
||||||
@@ -226,4 +235,14 @@ public interface IAlarmCommandHandler : IDisposable
|
|||||||
/// prefix matched against <c>AlarmFullReference</c>.
|
/// prefix matched against <c>AlarmFullReference</c>.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
IReadOnlyList<ActiveAlarmSnapshot> QueryActive(string? alarmFilterPrefix);
|
IReadOnlyList<ActiveAlarmSnapshot> QueryActive(string? alarmFilterPrefix);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Drives a single poll of the underlying alarm consumer on the
|
||||||
|
/// caller's thread. This is a no-op when there is no active
|
||||||
|
/// subscription. In production the caller is the worker's STA
|
||||||
|
/// (marshalled via <c>StaRuntime.InvokeAsync</c>), which satisfies
|
||||||
|
/// the <c>ThreadingModel=Apartment</c> requirement of
|
||||||
|
/// <c>wwAlarmConsumerClass</c>.
|
||||||
|
/// </summary>
|
||||||
|
void PollOnce();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -119,6 +119,17 @@ public sealed class AlarmDispatcher : IDisposable
|
|||||||
ackOperatorFullName);
|
ackOperatorFullName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Drives a single synchronous poll of the underlying consumer.
|
||||||
|
/// Must be called on the STA thread that owns the wnwrap COM object.
|
||||||
|
/// No-op if the dispatcher has been disposed.
|
||||||
|
/// </summary>
|
||||||
|
public void PollOnce()
|
||||||
|
{
|
||||||
|
if (disposed) return;
|
||||||
|
consumer.PollOnce();
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Snapshot the currently-active alarm set as
|
/// Snapshot the currently-active alarm set as
|
||||||
/// <see cref="ActiveAlarmSnapshot"/> protos for the
|
/// <see cref="ActiveAlarmSnapshot"/> protos for the
|
||||||
|
|||||||
@@ -85,4 +85,17 @@ public interface IMxAccessAlarmConsumer : IDisposable
|
|||||||
/// to seed local Part 9 state.
|
/// to seed local Part 9 state.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
IReadOnlyList<MxAlarmSnapshotRecord> SnapshotActiveAlarms();
|
IReadOnlyList<MxAlarmSnapshotRecord> SnapshotActiveAlarms();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Drives a single synchronous poll of the underlying alarm source.
|
||||||
|
/// Implementations that use an internal <see cref="System.Threading.Timer"/>
|
||||||
|
/// are constructed with <c>pollIntervalMilliseconds=0</c> in production so
|
||||||
|
/// the timer is disabled; the worker's STA drives polls via
|
||||||
|
/// <c>StaRuntime.InvokeAsync</c> instead, satisfying the
|
||||||
|
/// <c>ThreadingModel=Apartment</c> requirement of
|
||||||
|
/// <c>wwAlarmConsumerClass</c>. Fake implementations should no-op.
|
||||||
|
/// This method must be invoked on the thread that created the consumer
|
||||||
|
/// (the worker's STA in production).
|
||||||
|
/// </summary>
|
||||||
|
void PollOnce();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,8 @@ namespace MxGateway.Worker.MxAccess;
|
|||||||
|
|
||||||
public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
||||||
{
|
{
|
||||||
|
private static readonly TimeSpan AlarmPollInterval = TimeSpan.FromMilliseconds(500);
|
||||||
|
|
||||||
private readonly IMxAccessComObjectFactory factory;
|
private readonly IMxAccessComObjectFactory factory;
|
||||||
private readonly IMxAccessEventSink eventSink;
|
private readonly IMxAccessEventSink eventSink;
|
||||||
private readonly MxAccessEventQueue eventQueue;
|
private readonly MxAccessEventQueue eventQueue;
|
||||||
@@ -19,6 +21,8 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
private StaCommandDispatcher? commandDispatcher;
|
private StaCommandDispatcher? commandDispatcher;
|
||||||
private MxAccessSession? session;
|
private MxAccessSession? session;
|
||||||
private IAlarmCommandHandler? alarmCommandHandler;
|
private IAlarmCommandHandler? alarmCommandHandler;
|
||||||
|
private CancellationTokenSource? alarmPollCts;
|
||||||
|
private Task? alarmPollTask;
|
||||||
private bool disposed;
|
private bool disposed;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -32,6 +36,22 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with default STA runtime,
|
||||||
|
/// factory, and event queue, but with a custom alarm-command handler factory. The factory is
|
||||||
|
/// invoked on the STA thread during
|
||||||
|
/// <see cref="StartAsync(string, int, CancellationToken)"/>; pass <c>null</c> to opt out
|
||||||
|
/// of alarm-side commands.
|
||||||
|
/// </summary>
|
||||||
|
internal MxAccessStaSession(Func<MxAccessEventQueue, IAlarmCommandHandler>? alarmCommandHandlerFactory)
|
||||||
|
: this(
|
||||||
|
new StaRuntime(),
|
||||||
|
new MxAccessComObjectFactory(),
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
alarmCommandHandlerFactory)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with custom STA runtime and factory.
|
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with custom STA runtime and factory.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -60,6 +80,26 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with custom event queue
|
||||||
|
/// and an alarm-command handler factory.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="staRuntime">STA thread runtime.</param>
|
||||||
|
/// <param name="factory">MXAccess COM object factory.</param>
|
||||||
|
/// <param name="eventQueue">Event queue for buffering MXAccess events.</param>
|
||||||
|
/// <param name="alarmCommandHandlerFactory">
|
||||||
|
/// Factory that constructs the alarm-command handler from the event queue.
|
||||||
|
/// Pass <c>null</c> to opt out of alarm-side commands.
|
||||||
|
/// </param>
|
||||||
|
public MxAccessStaSession(
|
||||||
|
StaRuntime staRuntime,
|
||||||
|
IMxAccessComObjectFactory factory,
|
||||||
|
MxAccessEventQueue eventQueue,
|
||||||
|
Func<MxAccessEventQueue, IAlarmCommandHandler>? alarmCommandHandlerFactory)
|
||||||
|
: this(staRuntime, factory, new MxAccessBaseEventSink(eventQueue), eventQueue, alarmCommandHandlerFactory)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with all dependencies.
|
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with all dependencies.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -122,14 +162,14 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
/// <param name="workerProcessId">Worker process identifier.</param>
|
/// <param name="workerProcessId">Worker process identifier.</param>
|
||||||
/// <param name="cancellationToken">Cancellation token.</param>
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
/// <returns>Worker ready message.</returns>
|
/// <returns>Worker ready message.</returns>
|
||||||
public Task<WorkerReady> StartAsync(
|
public async Task<WorkerReady> StartAsync(
|
||||||
string sessionId,
|
string sessionId,
|
||||||
int workerProcessId,
|
int workerProcessId,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
staRuntime.Start();
|
staRuntime.Start();
|
||||||
|
|
||||||
return staRuntime.InvokeAsync(
|
WorkerReady ready = await staRuntime.InvokeAsync(
|
||||||
() =>
|
() =>
|
||||||
{
|
{
|
||||||
if (session is not null)
|
if (session is not null)
|
||||||
@@ -151,7 +191,61 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
|
|
||||||
return session.CreateWorkerReady(workerProcessId);
|
return session.CreateWorkerReady(workerProcessId);
|
||||||
},
|
},
|
||||||
cancellationToken);
|
cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (alarmCommandHandler is not null)
|
||||||
|
{
|
||||||
|
alarmPollCts = new CancellationTokenSource();
|
||||||
|
alarmPollTask = RunAlarmPollLoopAsync(alarmCommandHandler, alarmPollCts.Token);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ready;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task RunAlarmPollLoopAsync(
|
||||||
|
IAlarmCommandHandler handler,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.Run(async () =>
|
||||||
|
{
|
||||||
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await Task.Delay(AlarmPollInterval, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await staRuntime.InvokeAsync(
|
||||||
|
() => handler.PollOnce(),
|
||||||
|
cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
catch (ObjectDisposedException)
|
||||||
|
{
|
||||||
|
// STA runtime or alarm handler disposed — stop the loop gracefully.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
catch (InvalidOperationException)
|
||||||
|
{
|
||||||
|
// STA runtime shutting down — stop the loop gracefully.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, CancellationToken.None);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -307,6 +401,30 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
|
|
||||||
commandDispatcher?.RequestShutdown();
|
commandDispatcher?.RequestShutdown();
|
||||||
|
|
||||||
|
// Cancel the STA poll loop before disposing the alarm handler.
|
||||||
|
// The loop references the alarm handler and must be stopped first
|
||||||
|
// so that no further PollOnce calls race with disposal.
|
||||||
|
CancellationTokenSource? pollCtsToDispose = alarmPollCts;
|
||||||
|
Task? pollTaskToAwait = alarmPollTask;
|
||||||
|
alarmPollCts = null;
|
||||||
|
alarmPollTask = null;
|
||||||
|
if (pollCtsToDispose is not null)
|
||||||
|
{
|
||||||
|
pollCtsToDispose.Cancel();
|
||||||
|
if (pollTaskToAwait is not null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await pollTaskToAwait.ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// Swallow — poll loop cancellation must not block data shutdown.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pollCtsToDispose.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
// Stop the alarm consumer's polling timer and tear down the
|
// Stop the alarm consumer's polling timer and tear down the
|
||||||
// dispatcher BEFORE the data-side cleanup begins. The alarm
|
// dispatcher BEFORE the data-side cleanup begins. The alarm
|
||||||
// consumer holds a wnwrap COM RCW that needs the STA pump to
|
// consumer holds a wnwrap COM RCW that needs the STA pump to
|
||||||
@@ -382,6 +500,16 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
|
|
||||||
RequestShutdown();
|
RequestShutdown();
|
||||||
|
|
||||||
|
// Cancel and discard the STA poll loop.
|
||||||
|
CancellationTokenSource? pollCtsToDispose = alarmPollCts;
|
||||||
|
alarmPollCts = null;
|
||||||
|
alarmPollTask = null;
|
||||||
|
if (pollCtsToDispose is not null)
|
||||||
|
{
|
||||||
|
try { pollCtsToDispose.Cancel(); } catch { }
|
||||||
|
try { pollCtsToDispose.Dispose(); } catch { }
|
||||||
|
}
|
||||||
|
|
||||||
IAlarmCommandHandler? alarmHandlerToDispose = alarmCommandHandler;
|
IAlarmCommandHandler? alarmHandlerToDispose = alarmCommandHandler;
|
||||||
alarmCommandHandler = null;
|
alarmCommandHandler = null;
|
||||||
if (alarmHandlerToDispose is not null)
|
if (alarmHandlerToDispose is not null)
|
||||||
|
|||||||
@@ -63,8 +63,16 @@ public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer
|
|||||||
private bool subscribed;
|
private bool subscribed;
|
||||||
private bool disposed;
|
private bool disposed;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Production constructor — creates the wnwrap COM object on the
|
||||||
|
/// current thread (must be the worker's STA) and disables the
|
||||||
|
/// internal <see cref="Timer"/> (<c>pollIntervalMilliseconds=0</c>).
|
||||||
|
/// Polling is driven externally by the STA via
|
||||||
|
/// <c>StaRuntime.InvokeAsync(() => consumer.PollOnce())</c> so
|
||||||
|
/// that every COM call stays on the STA that owns the apartment.
|
||||||
|
/// </summary>
|
||||||
public WnWrapAlarmConsumer()
|
public WnWrapAlarmConsumer()
|
||||||
: this(new wwAlarmConsumerClass(), DefaultPollIntervalMilliseconds, DefaultMaxAlarmsPerFetch)
|
: this(new wwAlarmConsumerClass(), pollIntervalMilliseconds: 0, DefaultMaxAlarmsPerFetch)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user