diff --git a/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmCommandExecutorTests.cs b/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmCommandExecutorTests.cs index 27b2f08..9c6e55b 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmCommandExecutorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmCommandExecutorTests.cs @@ -372,11 +372,11 @@ public sealed class AlarmCommandExecutorTests public string? LastFilterPrefix { get; private set; } /// Records a subscription. - /// The subscription expression. + /// The subscribe-alarms command. /// The session identifier. - public void Subscribe(string subscription, string sessionId) + public void Subscribe(SubscribeAlarmsCommand command, string sessionId) { - LastSubscription = subscription; + LastSubscription = command.SubscriptionExpression; LastSessionId = sessionId; } diff --git a/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmCommandHandlerTests.cs b/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmCommandHandlerTests.cs index fdd004a..87382fe 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmCommandHandlerTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmCommandHandlerTests.cs @@ -21,7 +21,7 @@ public sealed class AlarmCommandHandlerTests new MxAccessEventQueue(), () => consumer); - handler.Subscribe(@"\\HOST\Galaxy!Area", "session-1"); + handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!Area" }, "session-1"); Assert.True(handler.IsSubscribed); Assert.Equal(@"\\HOST\Galaxy!Area", consumer.LastSubscription); @@ -36,9 +36,9 @@ public sealed class AlarmCommandHandlerTests new MxAccessEventQueue(), () => consumer); - handler.Subscribe(@"\\HOST\Galaxy!A", "s1"); + handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!A" }, "s1"); Assert.Throws( - () => handler.Subscribe(@"\\HOST\Galaxy!B", "s1")); + () => handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!B" }, "s1")); } /// @@ -63,7 +63,7 @@ public sealed class AlarmCommandHandlerTests () => consumer); InvalidOperationException exception = Assert.Throws( - () => handler.Subscribe(@"\\HOST\Galaxy!A", "s1")); + () => handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!A" }, "s1")); Assert.Contains("simulated wnwrap subscribe failure", exception.Message); Assert.False(handler.IsSubscribed); Assert.True(consumer.Disposed); @@ -77,7 +77,7 @@ public sealed class AlarmCommandHandlerTests AlarmCommandHandler handler = new AlarmCommandHandler( new MxAccessEventQueue(), () => consumer); - handler.Subscribe(@"\\HOST\Galaxy!A", "s1"); + handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!A" }, "s1"); handler.Unsubscribe(); @@ -104,7 +104,7 @@ public sealed class AlarmCommandHandlerTests AlarmCommandHandler handler = new AlarmCommandHandler( new MxAccessEventQueue(), () => consumer); - handler.Subscribe(@"\\HOST\Galaxy!A", "s1"); + handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!A" }, "s1"); Guid g = Guid.NewGuid(); int rc = handler.Acknowledge(g, "c", "u", "n", "d", "F"); @@ -149,7 +149,7 @@ public sealed class AlarmCommandHandlerTests AlarmCommandHandler handler = new AlarmCommandHandler( new MxAccessEventQueue(), () => consumer); - handler.Subscribe(@"\\HOST\Galaxy!A", "s1"); + handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!A" }, "s1"); IReadOnlyList snapshots = handler.QueryActive(null); @@ -173,7 +173,7 @@ public sealed class AlarmCommandHandlerTests AlarmCommandHandler handler = new AlarmCommandHandler( new MxAccessEventQueue(), () => consumer); - handler.Subscribe(@"\\HOST\Galaxy!A", "s1"); + handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!A" }, "s1"); IReadOnlyList filtered = handler.QueryActive("Galaxy!AreaA"); @@ -189,13 +189,13 @@ public sealed class AlarmCommandHandlerTests AlarmCommandHandler handler = new AlarmCommandHandler( new MxAccessEventQueue(), () => consumer); - handler.Subscribe(@"\\HOST\Galaxy!A", "s1"); + handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!A" }, "s1"); handler.Dispose(); Assert.True(consumer.Disposed); Assert.Throws( - () => handler.Subscribe("x", "y")); + () => handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = "x" }, "y")); } /// @@ -218,7 +218,7 @@ public sealed class AlarmCommandHandlerTests // factory is invoked. We tally invocation counts after each call so // that a missed guard surfaces as the diagnostic count, not a generic // "Subscribe should have failed". - handler.Subscribe(@"\\HOST\Galaxy!A", "s1"); + handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!A" }, "s1"); Assert.Equal(1, guardInvocations); handler.Acknowledge(Guid.NewGuid(), "c", "u", "n", "d", "F"); @@ -254,7 +254,7 @@ public sealed class AlarmCommandHandlerTests // Subscribe: guard runs before the dispatcher is constructed. Assert.Throws( - () => handler.Subscribe(@"\\HOST\Galaxy!A", "s1")); + () => handler.Subscribe(new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!A" }, "s1")); // To exercise the other entry points we need a subscribed handler. // Construct a parallel handler with a passing guard, then swap in a @@ -273,6 +273,132 @@ public sealed class AlarmCommandHandlerTests Assert.Throws(() => handler.Unsubscribe()); } + /// + /// Worker-9: ForcedMode=Subtag builds a subtag consumer (via the + /// injected standby factory) and advises it — the primary + /// (alarmmgr) consumer is NOT created. + /// + [Fact] + public void Subscribe_WithForcedSubtagMode_BuildsStandbyConsumerOnly() + { + FakeConsumer primary = new FakeConsumer(); + FakeConsumer standby = new FakeConsumer(); + IReadOnlyList? capturedWatchList = null; + AlarmCommandHandler handler = new AlarmCommandHandler( + new MxAccessEventQueue(), + () => primary, + threadAffinityCheck: null, + comFactory: null, + standbyFactory: watch => + { + capturedWatchList = watch; + return standby; + }); + + SubscribeAlarmsCommand command = new SubscribeAlarmsCommand + { + SubscriptionExpression = @"\\HOST\Galaxy!Area", + ForcedMode = AlarmProviderMode.Subtag, + }; + command.WatchList.Add(new AlarmSubtagTarget { AlarmFullReference = "Galaxy!Area.Tank01.Level.HiHi" }); + + handler.Subscribe(command, "s1"); + + Assert.True(handler.IsSubscribed); + Assert.Equal(@"\\HOST\Galaxy!Area", standby.LastSubscription); // standby advised + Assert.Null(primary.LastSubscription); // primary never built + Assert.NotNull(capturedWatchList); + Assert.Single(capturedWatchList!); + } + + /// + /// Worker-9: ForcedMode=Unspecified + a non-empty watch list builds a + /// failover composite (primary + subtag standby). Forcing the primary + /// to fail on subscribe with a threshold of 1 drives the composite to + /// switch to the subtag provider, which must enqueue an + /// OnAlarmProviderModeChanged event carrying mode=Subtag. + /// + [Fact] + public void Subscribe_AutoModeWithWatchList_FailoverModeChange_EnqueuesProviderModeChangedEvent() + { + FakeConsumer primary = new FakeConsumer { ThrowOnSubscribe = true }; + FakeConsumer standby = new FakeConsumer(); + MxAccessEventQueue queue = new MxAccessEventQueue(); + AlarmCommandHandler handler = new AlarmCommandHandler( + queue, + () => primary, + threadAffinityCheck: null, + comFactory: null, + standbyFactory: _ => standby); + + SubscribeAlarmsCommand command = new SubscribeAlarmsCommand + { + SubscriptionExpression = @"\\HOST\Galaxy!Area", + ForcedMode = AlarmProviderMode.Unspecified, + Failover = new AlarmFailoverConfig + { + ConsecutiveFailureThreshold = 1, + FailbackProbeIntervalSeconds = 1, + FailbackStableProbes = 1, + }, + }; + command.WatchList.Add(new AlarmSubtagTarget { AlarmFullReference = "Galaxy!Area.Tank01.Level.HiHi" }); + + // Subscribe: standby is armed cleanly; the primary subscribe throws and, + // at threshold 1, the failover composite switches to standby and raises + // ProviderModeChanged. The handler enqueues the proto event. + handler.Subscribe(command, "s1"); + + IReadOnlyList drained = queue.Drain(0); + Assert.Single(drained); + MxEvent evt = drained[0].Event; + Assert.Equal(MxEventFamily.OnAlarmProviderModeChanged, evt.Family); + Assert.Equal("s1", evt.SessionId); + Assert.NotNull(evt.OnAlarmProviderModeChanged); + Assert.Equal(AlarmProviderMode.Subtag, evt.OnAlarmProviderModeChanged.Mode); + } + + /// + /// Worker-9: a non-failover subscribe (alarmmgr-only) never enqueues a + /// provider-mode-changed event, and a subsequent Unsubscribe detaches + /// the handler so no event leaks. + /// + [Fact] + public void Subscribe_AlarmmgrOnly_DoesNotEnqueueProviderModeChangedEvent() + { + FakeConsumer consumer = new FakeConsumer(); + MxAccessEventQueue queue = new MxAccessEventQueue(); + AlarmCommandHandler handler = new AlarmCommandHandler(queue, () => consumer); + + handler.Subscribe( + new SubscribeAlarmsCommand { SubscriptionExpression = @"\\HOST\Galaxy!A" }, "s1"); + handler.Unsubscribe(); + + Assert.Empty(queue.Drain(0)); + } + + /// + /// Worker-9: the mapper builds a well-formed OnAlarmProviderModeChanged + /// MxEvent — correct family and populated body fields. + /// + [Fact] + public void Mapper_CreateOnAlarmProviderModeChanged_PopulatesFamilyAndBody() + { + MxAccessEventMapper mapper = new MxAccessEventMapper(); + DateTime at = new DateTime(2026, 6, 13, 10, 0, 0, DateTimeKind.Utc); + + MxEvent evt = mapper.CreateOnAlarmProviderModeChanged( + "session-7", AlarmProviderMode.Subtag, "primary PollOnce failed", unchecked((int)0x80004005), at); + + Assert.Equal(MxEventFamily.OnAlarmProviderModeChanged, evt.Family); + Assert.Equal("session-7", evt.SessionId); + Assert.NotNull(evt.OnAlarmProviderModeChanged); + Assert.Equal(AlarmProviderMode.Subtag, evt.OnAlarmProviderModeChanged.Mode); + Assert.Equal("primary PollOnce failed", evt.OnAlarmProviderModeChanged.Reason); + Assert.Equal(unchecked((int)0x80004005), evt.OnAlarmProviderModeChanged.Hresult); + Assert.Equal(at, evt.OnAlarmProviderModeChanged.At.ToDateTime()); + } + private static MxAlarmSnapshotRecord NewRecord(string provider, string group, string tag) { return new MxAlarmSnapshotRecord diff --git a/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs b/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs index fcf563a..7482041 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs @@ -200,7 +200,7 @@ public sealed class MxAccessStaSessionTests factory, eventSink, new MxAccessEventQueue(), - (_eq, _affinity) => handler); + (_eq, _affinity, _comFactory) => handler); await session.StartAsync("session-1", workerProcessId: 1); @@ -279,7 +279,7 @@ public sealed class MxAccessStaSessionTests factory, eventSink, new MxAccessEventQueue(), - (_eq, _affinity) => handler); + (_eq, _affinity, _comFactory) => handler); await session.StartAsync("session-1", workerProcessId: 1); @@ -320,7 +320,7 @@ public sealed class MxAccessStaSessionTests factory, eventSink, new MxAccessEventQueue(), - (_eq, _affinity) => handler); + (_eq, _affinity, _comFactory) => handler); await session.StartAsync("session-1", workerProcessId: 1); @@ -369,7 +369,7 @@ public sealed class MxAccessStaSessionTests factory, eventSink, eventQueue, - (_eq, _affinity) => handler); + (_eq, _affinity, _comFactory) => handler); await session.StartAsync("session-1", workerProcessId: 1); @@ -416,7 +416,7 @@ public sealed class MxAccessStaSessionTests factory, eventSink, eventQueue, - (_eq, _affinity) => handler); + (_eq, _affinity, _comFactory) => handler); await session.StartAsync("session-1", workerProcessId: 1); @@ -496,12 +496,12 @@ public sealed class MxAccessStaSessionTests } /// Subscribes to alarm events. - /// The subscription descriptor. + /// The subscribe-alarms command. /// The session identifier. - public void Subscribe(string subscription, string sessionId) + public void Subscribe(SubscribeAlarmsCommand command, string sessionId) { IsSubscribed = true; - LastSubscription = subscription; + LastSubscription = command.SubscriptionExpression; } /// Unsubscribes from alarm events. diff --git a/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs b/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs index fa9f297..238b77a 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs @@ -51,7 +51,7 @@ public sealed class WorkerPipeSession options, () => Process.GetCurrentProcess().Id, new WorkerPipeSessionOptions(), - () => new MxAccessStaSession((eq, affinity) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity)), + () => new MxAccessStaSession((eq, affinity, comFactory) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity, comFactory, standbyFactory: null)), logger) { } @@ -72,7 +72,7 @@ public sealed class WorkerPipeSession options, processIdProvider, new WorkerPipeSessionOptions(), - () => new MxAccessStaSession((eq, affinity) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity)), + () => new MxAccessStaSession((eq, affinity, comFactory) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity, comFactory, standbyFactory: null)), logger: null) { } @@ -867,7 +867,7 @@ public sealed class WorkerPipeSession // parameterless CompleteStartupHandshakeAsync is used without a // prior factory call. _runtimeSession ??= new MxAccessStaSession( - (eq, affinity) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity)); + (eq, affinity, comFactory) => new AlarmCommandHandler(eq, () => new WnWrapAlarmConsumer(), affinity, comFactory, standbyFactory: null)); IWorkerRuntimeSession session = _runtimeSession; try { diff --git a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/AlarmCommandHandler.cs b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/AlarmCommandHandler.cs index 1afa909..16a2be3 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/AlarmCommandHandler.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/AlarmCommandHandler.cs @@ -37,8 +37,14 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler private readonly MxAccessEventQueue eventQueue; private readonly Func consumerFactory; private readonly Action? threadAffinityCheck; + private readonly IMxAccessComObjectFactory? comFactory; + private readonly Func, IMxAccessAlarmConsumer>? standbyFactory; + private readonly MxAccessEventMapper mapper = new MxAccessEventMapper(); private readonly object syncRoot = new object(); private AlarmDispatcher? dispatcher; + private FailoverAlarmConsumer? failoverConsumer; + private EventHandler? providerModeChangedHandler; + private string subscribeSessionId = string.Empty; private bool disposed; /// Initializes a new alarm command handler with the given event queue. @@ -79,10 +85,49 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler MxAccessEventQueue eventQueue, Func consumerFactory, Action? threadAffinityCheck) + : this(eventQueue, consumerFactory, threadAffinityCheck, comFactory: null, standbyFactory: null) + { + } + + /// + /// Full constructor that also threads the MXAccess COM-object factory and + /// an optional standby-consumer seam so the subscribe path can build the + /// subtag / failover consumers required by + /// and + /// . + /// + /// The event queue. + /// + /// Factory for the PRIMARY (alarmmgr) consumer — the existing + /// wnwrap-backed source. Used alone in alarmmgr mode and as the primary + /// of the failover composite in auto mode. + /// + /// Optional STA thread-affinity guard. + /// + /// The MXAccess COM-object factory used to build the + /// backing the subtag consumer. May be + /// when a is + /// supplied (tests) or when only the alarmmgr path is ever exercised. + /// + /// + /// Optional seam that builds the STANDBY (subtag) consumer from a watch + /// list. Defaults to a over an + /// built from + /// . Tests inject a fake so they need no + /// live COM factory. + /// + public AlarmCommandHandler( + MxAccessEventQueue eventQueue, + Func consumerFactory, + Action? threadAffinityCheck, + IMxAccessComObjectFactory? comFactory, + Func, IMxAccessAlarmConsumer>? standbyFactory) { this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue)); this.consumerFactory = consumerFactory ?? throw new ArgumentNullException(nameof(consumerFactory)); this.threadAffinityCheck = threadAffinityCheck; + this.comFactory = comFactory; + this.standbyFactory = standbyFactory; } /// Gets a value indicating whether the handler is subscribed. @@ -92,10 +137,10 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler } /// - public void Subscribe(string subscription, string sessionId) + public void Subscribe(SubscribeAlarmsCommand command, string sessionId) { if (disposed) throw new ObjectDisposedException(nameof(AlarmCommandHandler)); - if (subscription is null) throw new ArgumentNullException(nameof(subscription)); + if (command is null) throw new ArgumentNullException(nameof(command)); threadAffinityCheck?.Invoke(); lock (syncRoot) @@ -106,17 +151,31 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler "AlarmCommandHandler already has an active subscription; " + "call Unsubscribe before issuing another SubscribeAlarms command."); } - IMxAccessAlarmConsumer consumer = consumerFactory() + + subscribeSessionId = sessionId ?? string.Empty; + IMxAccessAlarmConsumer consumer = BuildConsumer(command) ?? throw new InvalidOperationException("Alarm consumer factory returned null."); - MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink( - eventQueue, new MxAccessEventMapper()); - dispatcher = new AlarmDispatcher(consumer, sink, sessionId ?? string.Empty); + + // When the selected consumer is a failover composite, surface its + // provider switches onto the worker's event queue so connected + // gateway clients can observe degraded/recovered state. The handler + // is unsubscribed/disposed on Unsubscribe/Dispose below. + if (consumer is FailoverAlarmConsumer failover) + { + failoverConsumer = failover; + providerModeChangedHandler = OnProviderModeChanged; + failover.ProviderModeChanged += providerModeChangedHandler; + } + + MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(eventQueue, mapper); + dispatcher = new AlarmDispatcher(consumer, sink, subscribeSessionId); try { - dispatcher.Subscribe(subscription); + dispatcher.Subscribe(command.SubscriptionExpression ?? string.Empty); } catch { + DetachProviderModeChanged(); try { dispatcher.Dispose(); } catch { /* swallow */ } dispatcher = null; throw; @@ -124,6 +183,89 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler } } + /// + /// Selects and builds the alarm consumer from the command's + /// and + /// : + /// + /// + /// Alarmmgr, or Unspecified with an empty watch + /// list: the existing primary (alarmmgr) consumer only — + /// today's behavior. + /// + /// + /// Subtag: a only. + /// + /// + /// Unspecified with a non-empty watch list (auto): a + /// over the primary and a + /// subtag standby. + /// + /// + /// + private IMxAccessAlarmConsumer BuildConsumer(SubscribeAlarmsCommand command) + { + List watchList = new List(command.WatchList); + + if (command.ForcedMode == AlarmProviderMode.Subtag) + { + return BuildStandby(watchList); + } + + if (command.ForcedMode == AlarmProviderMode.Unspecified && watchList.Count > 0) + { + IMxAccessAlarmConsumer primary = consumerFactory() + ?? throw new InvalidOperationException("Alarm consumer factory returned null."); + IMxAccessAlarmConsumer standby = BuildStandby(watchList); + AlarmFailoverConfig? failoverConfig = command.Failover; + FailoverSettings settings = new FailoverSettings( + failoverConfig?.ConsecutiveFailureThreshold ?? 3, + failoverConfig?.FailbackProbeIntervalSeconds ?? 30, + failoverConfig?.FailbackStableProbes ?? 3); + return new FailoverAlarmConsumer(primary, standby, settings); + } + + // Alarmmgr, or Unspecified with an empty watch list — primary only. + return consumerFactory() + ?? throw new InvalidOperationException("Alarm consumer factory returned null."); + } + + private IMxAccessAlarmConsumer BuildStandby(IReadOnlyList watchList) + { + if (standbyFactory is not null) + { + return standbyFactory(watchList) + ?? throw new InvalidOperationException("Standby alarm consumer factory returned null."); + } + + if (comFactory is null) + { + throw new InvalidOperationException( + "Subtag alarm consumer requires an IMxAccessComObjectFactory; the alarm command " + + "handler was constructed without one and no standby factory was supplied."); + } + + return new SubtagAlarmConsumer(new LmxSubtagAlarmSource(comFactory), watchList); + } + + private void OnProviderModeChanged(object? sender, AlarmProviderModeChange change) + { + if (change is null) return; + eventQueue.Enqueue(mapper.CreateOnAlarmProviderModeChanged( + subscribeSessionId, change.Mode, change.Reason, change.HResult, change.AtUtc)); + } + + private void DetachProviderModeChanged() + { + if (failoverConsumer is not null && providerModeChangedHandler is not null) + { + try { failoverConsumer.ProviderModeChanged -= providerModeChangedHandler; } + catch { /* swallow */ } + } + failoverConsumer = null; + providerModeChangedHandler = null; + } + /// public void Unsubscribe() { @@ -131,6 +273,7 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler AlarmDispatcher? toDispose; lock (syncRoot) { + DetachProviderModeChanged(); toDispose = dispatcher; dispatcher = null; } diff --git a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/IAlarmCommandHandler.cs b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/IAlarmCommandHandler.cs index bad5680..9b359ca 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/IAlarmCommandHandler.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/IAlarmCommandHandler.cs @@ -13,10 +13,16 @@ namespace ZB.MOM.WW.MxGateway.Worker.MxAccess; /// public interface IAlarmCommandHandler : IDisposable { - /// Begin a subscription against the supplied AVEVA alarm-provider expression. - /// The AVEVA alarm-provider subscription expression. + /// + /// Begin an alarm subscription from the supplied command. The command's + /// and + /// select the consumer: + /// alarmmgr-only (the default), subtag-only, or an auto-failover + /// composite over both. + /// + /// The full SubscribeAlarms command. /// The session identifier. - void Subscribe(string subscription, string sessionId); + void Subscribe(SubscribeAlarmsCommand command, string sessionId); /// Tear down the active subscription. No-op if not subscribed. void Unsubscribe(); diff --git a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessCommandExecutor.cs b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessCommandExecutor.cs index 446be67..1dfd718 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessCommandExecutor.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessCommandExecutor.cs @@ -598,7 +598,8 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor "SubscribeAlarms requires an alarm command handler; the worker was constructed without one."); } - string subscription = command.Command.SubscribeAlarms.SubscriptionExpression ?? string.Empty; + SubscribeAlarmsCommand subscribeCommand = command.Command.SubscribeAlarms; + string subscription = subscribeCommand.SubscriptionExpression ?? string.Empty; if (string.IsNullOrWhiteSpace(subscription)) { return CreateInvalidRequestReply(command, "SubscribeAlarms.subscription_expression is required."); @@ -606,7 +607,7 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor try { - alarmCommandHandler.Subscribe(subscription, command.SessionId); + alarmCommandHandler.Subscribe(subscribeCommand, command.SessionId); return CreateOkReply(command); } catch (Exception ex) diff --git a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessEventMapper.cs b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessEventMapper.cs index 08f844f..5a66f43 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessEventMapper.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessEventMapper.cs @@ -182,6 +182,43 @@ public sealed class MxAccessEventMapper return mxEvent; } + /// + /// Creates an OnAlarmProviderModeChanged event from a + /// provider switch. The worker's + /// alarm path drives this when the failover composite switches between + /// the alarmmgr primary and the subtag standby, so connected gateway + /// clients can observe the degraded/recovered provider state. + /// + /// Identifier of the session. + /// The provider mode now active after the switch. + /// Human-readable reason for the switch. + /// The COM HRESULT that triggered a failover, or 0 for a clean failback. + /// The UTC instant the switch occurred. + public MxEvent CreateOnAlarmProviderModeChanged( + string sessionId, + AlarmProviderMode mode, + string reason, + int hresult, + DateTime atUtc) + { + MxEvent mxEvent = CreateBaseEvent( + MxEventFamily.OnAlarmProviderModeChanged, + sessionId, + serverHandle: 0, + itemHandle: 0, + statuses: null); + + mxEvent.OnAlarmProviderModeChanged = new OnAlarmProviderModeChangedEvent + { + Mode = mode, + Reason = reason ?? string.Empty, + Hresult = hresult, + At = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime( + DateTime.SpecifyKind(atUtc, DateTimeKind.Utc)), + }; + return mxEvent; + } + /// Creates an OnBufferedDataChange event from MXAccess COM event arguments. /// Identifier of the session. /// Handle returned by the worker. diff --git a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessStaSession.cs b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessStaSession.cs index cd8448b..385ddab 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessStaSession.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/MxAccessStaSession.cs @@ -23,7 +23,10 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession // then invokes the guard at the entry of every method that touches the // wnwrap consumer, matching the STA-affinity invariant already enforced // for the poll path via EnsureOnAlarmConsumerThread. - private readonly Func? alarmCommandHandlerFactory; + // Worker-9: the third arg is the session's IMxAccessComObjectFactory, so + // the handler can build the subtag-fallback source's own proxy-server COM + // object on this STA when a subscribe selects the subtag / failover path. + private readonly Func? alarmCommandHandlerFactory; private StaCommandDispatcher? commandDispatcher; private MxAccessSession? session; private IAlarmCommandHandler? alarmCommandHandler; @@ -51,7 +54,7 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession /// of alarm-side commands. /// /// Factory that constructs the alarm-command handler. - internal MxAccessStaSession(Func? alarmCommandHandlerFactory) + internal MxAccessStaSession(Func? alarmCommandHandlerFactory) : this( new StaRuntime(), new MxAccessComObjectFactory(), @@ -103,7 +106,7 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession StaRuntime staRuntime, IMxAccessComObjectFactory factory, MxAccessEventQueue eventQueue, - Func? alarmCommandHandlerFactory) + Func? alarmCommandHandlerFactory) : this(staRuntime, factory, new MxAccessBaseEventSink(eventQueue), eventQueue, alarmCommandHandlerFactory) { } @@ -141,7 +144,7 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession IMxAccessComObjectFactory factory, IMxAccessEventSink eventSink, MxAccessEventQueue eventQueue, - Func? alarmCommandHandlerFactory) + Func? alarmCommandHandlerFactory) { this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime)); this.factory = factory ?? throw new ArgumentNullException(nameof(factory)); @@ -209,9 +212,16 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession // on convention alone; a future refactor that let a // command run off-STA would silently deadlock on // cross-apartment marshaling against the wnwrap consumer. + // Worker-9: the factory also receives the session's + // IMxAccessComObjectFactory so the subtag-fallback source + // (LmxSubtagAlarmSource) can create its OWN proxy-server COM + // object on this STA, isolated from the item pipeline's + // MxAccessSession. The factory call runs on the STA, so the + // resulting source is bound to the correct apartment. alarmCommandHandler = alarmCommandHandlerFactory( eventQueue, - EnsureOnAlarmConsumerThread); + EnsureOnAlarmConsumerThread, + factory); } commandDispatcher = new StaCommandDispatcher( staRuntime,