worker(alarms): route ForcedMode/watch-list/failover via AlarmCommandHandler; emit provider-mode-changed event

This commit is contained in:
Joseph Doherty
2026-06-13 10:04:33 -04:00
parent 7241a4fb9c
commit 3f5e5fc0b3
9 changed files with 366 additions and 43 deletions
@@ -51,7 +51,7 @@ public sealed class WorkerPipeSession
options,
() => Process.GetCurrentProcess().Id,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession((eq, affinity) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity)),
() => new MxAccessStaSession((eq, affinity, comFactory) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity, comFactory, standbyFactory: null)),
logger)
{
}
@@ -72,7 +72,7 @@ public sealed class WorkerPipeSession
options,
processIdProvider,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession((eq, affinity) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity)),
() => new MxAccessStaSession((eq, affinity, comFactory) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity, comFactory, standbyFactory: null)),
logger: null)
{
}
@@ -867,7 +867,7 @@ public sealed class WorkerPipeSession
// parameterless CompleteStartupHandshakeAsync is used without a
// prior factory call.
_runtimeSession ??= new MxAccessStaSession(
(eq, affinity) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity));
(eq, affinity, comFactory) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity, comFactory, standbyFactory: null));
IWorkerRuntimeSession session = _runtimeSession;
try
{
@@ -37,8 +37,14 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler
private readonly MxAccessEventQueue eventQueue;
private readonly Func<IMxAccessAlarmConsumer> consumerFactory;
private readonly Action? threadAffinityCheck;
private readonly IMxAccessComObjectFactory? comFactory;
private readonly Func<IReadOnlyList<AlarmSubtagTarget>, IMxAccessAlarmConsumer>? standbyFactory;
private readonly MxAccessEventMapper mapper = new MxAccessEventMapper();
private readonly object syncRoot = new object();
private AlarmDispatcher? dispatcher;
private FailoverAlarmConsumer? failoverConsumer;
private EventHandler<AlarmProviderModeChange>? providerModeChangedHandler;
private string subscribeSessionId = string.Empty;
private bool disposed;
/// <summary>Initializes a new alarm command handler with the given event queue.</summary>
@@ -79,10 +85,49 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler
MxAccessEventQueue eventQueue,
Func<IMxAccessAlarmConsumer> consumerFactory,
Action? threadAffinityCheck)
: this(eventQueue, consumerFactory, threadAffinityCheck, comFactory: null, standbyFactory: null)
{
}
/// <summary>
/// Full constructor that also threads the MXAccess COM-object factory and
/// an optional standby-consumer seam so the subscribe path can build the
/// subtag / failover consumers required by
/// <see cref="SubscribeAlarmsCommand.ForcedMode"/> and
/// <see cref="SubscribeAlarmsCommand.WatchList"/>.
/// </summary>
/// <param name="eventQueue">The event queue.</param>
/// <param name="consumerFactory">
/// Factory for the PRIMARY (alarmmgr) consumer — the existing
/// wnwrap-backed source. Used alone in alarmmgr mode and as the primary
/// of the failover composite in auto mode.
/// </param>
/// <param name="threadAffinityCheck">Optional STA thread-affinity guard.</param>
/// <param name="comFactory">
/// The MXAccess COM-object factory used to build the
/// <see cref="LmxSubtagAlarmSource"/> backing the subtag consumer. May be
/// <see langword="null"/> when a <paramref name="standbyFactory"/> is
/// supplied (tests) or when only the alarmmgr path is ever exercised.
/// </param>
/// <param name="standbyFactory">
/// Optional seam that builds the STANDBY (subtag) consumer from a watch
/// list. Defaults to a <see cref="SubtagAlarmConsumer"/> over an
/// <see cref="LmxSubtagAlarmSource"/> built from
/// <paramref name="comFactory"/>. Tests inject a fake so they need no
/// live COM factory.
/// </param>
public AlarmCommandHandler(
MxAccessEventQueue eventQueue,
Func<IMxAccessAlarmConsumer> consumerFactory,
Action? threadAffinityCheck,
IMxAccessComObjectFactory? comFactory,
Func<IReadOnlyList<AlarmSubtagTarget>, IMxAccessAlarmConsumer>? standbyFactory)
{
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
this.consumerFactory = consumerFactory ?? throw new ArgumentNullException(nameof(consumerFactory));
this.threadAffinityCheck = threadAffinityCheck;
this.comFactory = comFactory;
this.standbyFactory = standbyFactory;
}
/// <summary>Gets a value indicating whether the handler is subscribed.</summary>
@@ -92,10 +137,10 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler
}
/// <inheritdoc />
public void Subscribe(string subscription, string sessionId)
public void Subscribe(SubscribeAlarmsCommand command, string sessionId)
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmCommandHandler));
if (subscription is null) throw new ArgumentNullException(nameof(subscription));
if (command is null) throw new ArgumentNullException(nameof(command));
threadAffinityCheck?.Invoke();
lock (syncRoot)
@@ -106,17 +151,31 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler
"AlarmCommandHandler already has an active subscription; " +
"call Unsubscribe before issuing another SubscribeAlarms command.");
}
IMxAccessAlarmConsumer consumer = consumerFactory()
subscribeSessionId = sessionId ?? string.Empty;
IMxAccessAlarmConsumer consumer = BuildConsumer(command)
?? throw new InvalidOperationException("Alarm consumer factory returned null.");
MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(
eventQueue, new MxAccessEventMapper());
dispatcher = new AlarmDispatcher(consumer, sink, sessionId ?? string.Empty);
// When the selected consumer is a failover composite, surface its
// provider switches onto the worker's event queue so connected
// gateway clients can observe degraded/recovered state. The handler
// is unsubscribed/disposed on Unsubscribe/Dispose below.
if (consumer is FailoverAlarmConsumer failover)
{
failoverConsumer = failover;
providerModeChangedHandler = OnProviderModeChanged;
failover.ProviderModeChanged += providerModeChangedHandler;
}
MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(eventQueue, mapper);
dispatcher = new AlarmDispatcher(consumer, sink, subscribeSessionId);
try
{
dispatcher.Subscribe(subscription);
dispatcher.Subscribe(command.SubscriptionExpression ?? string.Empty);
}
catch
{
DetachProviderModeChanged();
try { dispatcher.Dispose(); } catch { /* swallow */ }
dispatcher = null;
throw;
@@ -124,6 +183,89 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler
}
}
/// <summary>
/// Selects and builds the alarm consumer from the command's
/// <see cref="SubscribeAlarmsCommand.ForcedMode"/> and
/// <see cref="SubscribeAlarmsCommand.WatchList"/>:
/// <list type="bullet">
/// <item><description>
/// <c>Alarmmgr</c>, or <c>Unspecified</c> with an empty watch
/// list: the existing primary (alarmmgr) consumer only —
/// today's behavior.
/// </description></item>
/// <item><description>
/// <c>Subtag</c>: a <see cref="SubtagAlarmConsumer"/> only.
/// </description></item>
/// <item><description>
/// <c>Unspecified</c> with a non-empty watch list (auto): a
/// <see cref="FailoverAlarmConsumer"/> over the primary and a
/// subtag standby.
/// </description></item>
/// </list>
/// </summary>
private IMxAccessAlarmConsumer BuildConsumer(SubscribeAlarmsCommand command)
{
List<AlarmSubtagTarget> watchList = new List<AlarmSubtagTarget>(command.WatchList);
if (command.ForcedMode == AlarmProviderMode.Subtag)
{
return BuildStandby(watchList);
}
if (command.ForcedMode == AlarmProviderMode.Unspecified && watchList.Count > 0)
{
IMxAccessAlarmConsumer primary = consumerFactory()
?? throw new InvalidOperationException("Alarm consumer factory returned null.");
IMxAccessAlarmConsumer standby = BuildStandby(watchList);
AlarmFailoverConfig? failoverConfig = command.Failover;
FailoverSettings settings = new FailoverSettings(
failoverConfig?.ConsecutiveFailureThreshold ?? 3,
failoverConfig?.FailbackProbeIntervalSeconds ?? 30,
failoverConfig?.FailbackStableProbes ?? 3);
return new FailoverAlarmConsumer(primary, standby, settings);
}
// Alarmmgr, or Unspecified with an empty watch list — primary only.
return consumerFactory()
?? throw new InvalidOperationException("Alarm consumer factory returned null.");
}
private IMxAccessAlarmConsumer BuildStandby(IReadOnlyList<AlarmSubtagTarget> watchList)
{
if (standbyFactory is not null)
{
return standbyFactory(watchList)
?? throw new InvalidOperationException("Standby alarm consumer factory returned null.");
}
if (comFactory is null)
{
throw new InvalidOperationException(
"Subtag alarm consumer requires an IMxAccessComObjectFactory; the alarm command "
+ "handler was constructed without one and no standby factory was supplied.");
}
return new SubtagAlarmConsumer(new LmxSubtagAlarmSource(comFactory), watchList);
}
private void OnProviderModeChanged(object? sender, AlarmProviderModeChange change)
{
if (change is null) return;
eventQueue.Enqueue(mapper.CreateOnAlarmProviderModeChanged(
subscribeSessionId, change.Mode, change.Reason, change.HResult, change.AtUtc));
}
private void DetachProviderModeChanged()
{
if (failoverConsumer is not null && providerModeChangedHandler is not null)
{
try { failoverConsumer.ProviderModeChanged -= providerModeChangedHandler; }
catch { /* swallow */ }
}
failoverConsumer = null;
providerModeChangedHandler = null;
}
/// <inheritdoc />
public void Unsubscribe()
{
@@ -131,6 +273,7 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler
AlarmDispatcher? toDispose;
lock (syncRoot)
{
DetachProviderModeChanged();
toDispose = dispatcher;
dispatcher = null;
}
@@ -13,10 +13,16 @@ namespace ZB.MOM.WW.MxGateway.Worker.MxAccess;
/// </summary>
public interface IAlarmCommandHandler : IDisposable
{
/// <summary>Begin a subscription against the supplied AVEVA alarm-provider expression.</summary>
/// <param name="subscription">The AVEVA alarm-provider subscription expression.</param>
/// <summary>
/// Begin an alarm subscription from the supplied command. The command's
/// <see cref="SubscribeAlarmsCommand.ForcedMode"/> and
/// <see cref="SubscribeAlarmsCommand.WatchList"/> select the consumer:
/// alarmmgr-only (the default), subtag-only, or an auto-failover
/// composite over both.
/// </summary>
/// <param name="command">The full SubscribeAlarms command.</param>
/// <param name="sessionId">The session identifier.</param>
void Subscribe(string subscription, string sessionId);
void Subscribe(SubscribeAlarmsCommand command, string sessionId);
/// <summary>Tear down the active subscription. No-op if not subscribed.</summary>
void Unsubscribe();
@@ -598,7 +598,8 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
"SubscribeAlarms requires an alarm command handler; the worker was constructed without one.");
}
string subscription = command.Command.SubscribeAlarms.SubscriptionExpression ?? string.Empty;
SubscribeAlarmsCommand subscribeCommand = command.Command.SubscribeAlarms;
string subscription = subscribeCommand.SubscriptionExpression ?? string.Empty;
if (string.IsNullOrWhiteSpace(subscription))
{
return CreateInvalidRequestReply(command, "SubscribeAlarms.subscription_expression is required.");
@@ -606,7 +607,7 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
try
{
alarmCommandHandler.Subscribe(subscription, command.SessionId);
alarmCommandHandler.Subscribe(subscribeCommand, command.SessionId);
return CreateOkReply(command);
}
catch (Exception ex)
@@ -182,6 +182,43 @@ public sealed class MxAccessEventMapper
return mxEvent;
}
/// <summary>
/// Creates an OnAlarmProviderModeChanged event from a
/// <see cref="FailoverAlarmConsumer"/> provider switch. The worker's
/// alarm path drives this when the failover composite switches between
/// the alarmmgr primary and the subtag standby, so connected gateway
/// clients can observe the degraded/recovered provider state.
/// </summary>
/// <param name="sessionId">Identifier of the session.</param>
/// <param name="mode">The provider mode now active after the switch.</param>
/// <param name="reason">Human-readable reason for the switch.</param>
/// <param name="hresult">The COM HRESULT that triggered a failover, or 0 for a clean failback.</param>
/// <param name="atUtc">The UTC instant the switch occurred.</param>
public MxEvent CreateOnAlarmProviderModeChanged(
string sessionId,
AlarmProviderMode mode,
string reason,
int hresult,
DateTime atUtc)
{
MxEvent mxEvent = CreateBaseEvent(
MxEventFamily.OnAlarmProviderModeChanged,
sessionId,
serverHandle: 0,
itemHandle: 0,
statuses: null);
mxEvent.OnAlarmProviderModeChanged = new OnAlarmProviderModeChangedEvent
{
Mode = mode,
Reason = reason ?? string.Empty,
Hresult = hresult,
At = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(
DateTime.SpecifyKind(atUtc, DateTimeKind.Utc)),
};
return mxEvent;
}
/// <summary>Creates an OnBufferedDataChange event from MXAccess COM event arguments.</summary>
/// <param name="sessionId">Identifier of the session.</param>
/// <param name="serverHandle">Handle returned by the worker.</param>
@@ -23,7 +23,10 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
// then invokes the guard at the entry of every method that touches the
// wnwrap consumer, matching the STA-affinity invariant already enforced
// for the poll path via EnsureOnAlarmConsumerThread.
private readonly Func<MxAccessEventQueue, Action, IAlarmCommandHandler>? alarmCommandHandlerFactory;
// Worker-9: the third arg is the session's IMxAccessComObjectFactory, so
// the handler can build the subtag-fallback source's own proxy-server COM
// object on this STA when a subscribe selects the subtag / failover path.
private readonly Func<MxAccessEventQueue, Action, IMxAccessComObjectFactory, IAlarmCommandHandler>? alarmCommandHandlerFactory;
private StaCommandDispatcher? commandDispatcher;
private MxAccessSession? session;
private IAlarmCommandHandler? alarmCommandHandler;
@@ -51,7 +54,7 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
/// of alarm-side commands.
/// </summary>
/// <param name="alarmCommandHandlerFactory">Factory that constructs the alarm-command handler.</param>
internal MxAccessStaSession(Func<MxAccessEventQueue, Action, IAlarmCommandHandler>? alarmCommandHandlerFactory)
internal MxAccessStaSession(Func<MxAccessEventQueue, Action, IMxAccessComObjectFactory, IAlarmCommandHandler>? alarmCommandHandlerFactory)
: this(
new StaRuntime(),
new MxAccessComObjectFactory(),
@@ -103,7 +106,7 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
StaRuntime staRuntime,
IMxAccessComObjectFactory factory,
MxAccessEventQueue eventQueue,
Func<MxAccessEventQueue, Action, IAlarmCommandHandler>? alarmCommandHandlerFactory)
Func<MxAccessEventQueue, Action, IMxAccessComObjectFactory, IAlarmCommandHandler>? alarmCommandHandlerFactory)
: this(staRuntime, factory, new MxAccessBaseEventSink(eventQueue), eventQueue, alarmCommandHandlerFactory)
{
}
@@ -141,7 +144,7 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
IMxAccessComObjectFactory factory,
IMxAccessEventSink eventSink,
MxAccessEventQueue eventQueue,
Func<MxAccessEventQueue, Action, IAlarmCommandHandler>? alarmCommandHandlerFactory)
Func<MxAccessEventQueue, Action, IMxAccessComObjectFactory, IAlarmCommandHandler>? alarmCommandHandlerFactory)
{
this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime));
this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
@@ -209,9 +212,16 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
// on convention alone; a future refactor that let a
// command run off-STA would silently deadlock on
// cross-apartment marshaling against the wnwrap consumer.
// Worker-9: the factory also receives the session's
// IMxAccessComObjectFactory so the subtag-fallback source
// (LmxSubtagAlarmSource) can create its OWN proxy-server COM
// object on this STA, isolated from the item pipeline's
// MxAccessSession. The factory call runs on the STA, so the
// resulting source is bound to the correct apartment.
alarmCommandHandler = alarmCommandHandlerFactory(
eventQueue,
EnsureOnAlarmConsumerThread);
EnsureOnAlarmConsumerThread,
factory);
}
commandDispatcher = new StaCommandDispatcher(
staRuntime,