From 7f313df7a6af1e8f24c94f876043a7c7a3ac05b0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 00:42:43 -0400 Subject: [PATCH] fix(alarms): subscribe native alarms to un-gate the IAlarmSource feed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase B native alarms never fired end-to-end: GalaxyDriver suppresses OnAlarmEvent until an alarm subscription exists (_alarmSubscriptions.Count > 0), but the runtime only attached the OnAlarmEvent handler and never called SubscribeAlarmsAsync — so the central feed stayed gated and no transition reached the Part 9 condition / /alerts. Unit tests passed because they inject through the IAlarmSource seam directly; the deferred live /run surfaced it. DriverHostActor computes per-driver alarm refs (alarm-bearing tags' FullNames) and hands them via SetDesiredSubscriptions; DriverInstanceActor calls SubscribeAlarmsAsync for IAlarmSource drivers on Connected entry and whenever alarm refs are pushed while Connected (the deploy path), idempotent via a cached handle reset on detach so reconnect re-subscribes. --- .../Drivers/DriverHostActor.cs | 18 ++++- .../Drivers/DriverInstanceActor.cs | 75 ++++++++++++++++++- .../DriverInstanceActorNativeAlarmTests.cs | 53 ++++++++++++- 3 files changed, 142 insertions(+), 4 deletions(-) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index bbf77430..b48adf81 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -854,6 +854,21 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers .ToArray(), StringComparer.Ordinal); + // Native-alarm subscription set: the alarm-bearing tags' FullNames (= the driver's + // ConditionId/AlarmFullReference). An IAlarmSource driver suppresses OnAlarmEvent until at least one + // alarm subscription exists (e.g. GalaxyDriver gates its central feed on _alarmSubscriptions), so the + // instance actor must SubscribeAlarmsAsync these refs to un-gate the feed. Routing stays by + // ConditionId in ForwardNativeAlarm; this set just opens (and scopes) the subscription. + var alarmRefsByDriver = composition.EquipmentTags + .Where(t => t.Alarm is not null) + .GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal) + .ToDictionary( + g => g.Key, + g => (IReadOnlyList)g.Select(t => t.FullName) + .Distinct(StringComparer.Ordinal) + .ToArray(), + StringComparer.Ordinal); + // Rebuild the driver live-value routing map from the SAME EquipmentTags pass (mirrors // VirtualTagHostActor._nodeIdByVtag): map each tag's (DriverInstanceId, FullName) wire-ref to // the folder-scoped equipment NodeId the materialiser placed its variable at, so ForwardToMux @@ -904,7 +919,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers foreach (var (driverId, entry) in _children) { var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty(); - entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval)); + var alarmRefs = alarmRefsByDriver.TryGetValue(driverId, out var ar) ? ar : Array.Empty(); + entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval, alarmRefs)); total += refs.Count; } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs index d3c8a29f..07a05250 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -49,7 +49,18 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers /// a single message to drive the SubscribeBulk pass after an /// apply. Sending an empty set clears the desired subscription. /// - public sealed record SetDesiredSubscriptions(IReadOnlyList FullReferences, TimeSpan PublishingInterval); + /// + /// The native-alarm references (alarm-bearing equipment-tag FullNames = the driver's + /// ConditionId/AlarmFullReference) this driver should keep an alarm subscription open for. + /// An driver suppresses until at + /// least one alarm subscription exists, so the actor calls + /// with this set to un-gate the native feed. Empty + /// (or null) means the driver has no alarm tags. Defaults to null so non-alarm callers are unchanged. + /// + public sealed record SetDesiredSubscriptions( + IReadOnlyList FullReferences, + TimeSpan PublishingInterval, + IReadOnlyList? AlarmReferences = null); public sealed record SubscriptionEstablished(string DiagnosticId, int ReferenceCount); public sealed record SubscriptionFailed(string Reason); public sealed record Unsubscribe; @@ -69,6 +80,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers /// to the materialised Part 9 condition. Parallels . public sealed record AttributeAlarmPublished(string DriverInstanceId, AlarmEventArgs Args); private sealed record NativeAlarmRaised(AlarmEventArgs Args); + /// Self-sent on Connected entry / when alarm refs are (re)pushed, to establish the native-alarm + /// subscription that un-gates an driver's feed. Handled async so the + /// call is bounded + off the synchronous handlers. + private sealed record SubscribeAlarms; public sealed class RetryConnect { public static readonly RetryConnect Instance = new(); @@ -112,6 +127,17 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers private IReadOnlyList _desiredRefs = Array.Empty(); private TimeSpan _desiredInterval = TimeSpan.FromSeconds(1); + /// The native-alarm references the host wants kept subscribed (set by + /// ). Re-applied on every Connected entry so the alarm + /// feed is re-un-gated after a reconnect/redeploy. + private IReadOnlyList _desiredAlarmRefs = Array.Empty(); + + /// The active native-alarm subscription handle for an driver, or + /// null when none is established. Reset on so the next Connected entry + /// re-subscribes against the freshly re-initialised driver; the null check makes the subscribe + /// idempotent across repeated pushes. + private IAlarmSubscriptionHandle? _alarmSubscriptionHandle; + /// /// Gets or sets the timer scheduler for scheduling reconnection attempts. /// @@ -237,6 +263,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers PublishHealthSnapshot(); ResubscribeDesired(); AttachAlarmSource(); + SubscribeDesiredAlarms(); }); Receive(msg => { @@ -291,7 +318,13 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers StoreDesiredSubscriptions(msg); if (_desiredRefs.Count > 0) Self.Tell(new Subscribe(_desiredRefs, _desiredInterval)); else if (_subscriptionHandle is not null) Self.Tell(new Unsubscribe()); + // Native-alarm analogue: un-gate the IAlarmSource feed when alarm tags are (now) present. The + // common live path — a deploy delivers SetDesiredSubscriptions while the driver is already + // Connected — flows through HERE, so the alarm subscribe must happen on this message, not only + // on Connected entry. + SubscribeDesiredAlarms(); }); + ReceiveAsync(HandleSubscribeAlarmsAsync); Receive(OnDataChangeForward); // Native alarm transition marshaled onto the actor thread from the driver's OnAlarmEvent; // project it to the parent the same way DataChangeForward projects AttributeValuePublished. @@ -325,6 +358,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers PublishHealthSnapshot(); ResubscribeDesired(); AttachAlarmSource(); + SubscribeDesiredAlarms(); }); // A failure here is a no-op regardless of generation — the retry timer keeps trying the // current config; only a (generation-matched) InitializeSucceeded transitions state. @@ -524,6 +558,44 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers if (_driver is IAlarmSource src && _alarmEventHandler is not null) src.OnAlarmEvent -= _alarmEventHandler; _alarmEventHandler = null; + // Drop the handle so the next Connected entry re-subscribes against the freshly re-initialised + // driver (its old alarm-subscription set was cleared on reconnect). The desired alarm refs persist. + _alarmSubscriptionHandle = null; + } + + /// Establish the native-alarm subscription that un-gates an driver's + /// feed — the driver suppresses until at least one alarm + /// subscription exists. Self-sends the async the Connected behaviour + /// handles. Idempotent: a no-op unless the driver is an , alarm refs are + /// desired, and no subscription is yet established. + private void SubscribeDesiredAlarms() + { + if (_driver is IAlarmSource && _desiredAlarmRefs.Count > 0 && _alarmSubscriptionHandle is null) + Self.Tell(new SubscribeAlarms()); + } + + /// Calls the driver's (bounded) to register the + /// alarm subscription that un-gates its native feed, caching the returned handle. Re-checks the guard + /// (the desired set may have cleared, or another SubscribeAlarms may have already established a handle, + /// while this was queued). Failures are logged and retried on the next Connected entry — the feed simply + /// stays gated until then. + private async Task HandleSubscribeAlarmsAsync(SubscribeAlarms _) + { + if (_driver is not IAlarmSource src || _desiredAlarmRefs.Count == 0 || _alarmSubscriptionHandle is not null) + return; + var refs = _desiredAlarmRefs; + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + try + { + _alarmSubscriptionHandle = await src.SubscribeAlarmsAsync(refs, cts.Token); + _log.Info("DriverInstance {Id}: native-alarm subscription established for {Count} alarm ref(s) ({Diag})", + _driverInstanceId, refs.Count, _alarmSubscriptionHandle.DiagnosticId); + } + catch (Exception ex) + { + _log.Warning(ex, "DriverInstance {Id}: native-alarm subscription failed — feed stays gated until reconnect", + _driverInstanceId); + } } /// Records the host's desired subscription set without touching the live subscription. @@ -532,6 +604,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers { _desiredRefs = msg.FullReferences; _desiredInterval = msg.PublishingInterval; + _desiredAlarmRefs = msg.AlarmReferences ?? Array.Empty(); } /// Re-establish the desired subscription after (re)connecting. Self-sends the one-shot diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorNativeAlarmTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorNativeAlarmTests.cs index b9d2dfcf..a549e701 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorNativeAlarmTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorNativeAlarmTests.cs @@ -64,6 +64,42 @@ public sealed class DriverInstanceActorNativeAlarmTests : RuntimeActorTestBase .FullReference.ShouldBe("tag-z"); } + /// + /// The native-alarm path is gated at the driver: an suppresses + /// OnAlarmEvent until at least one alarm subscription exists (e.g. GalaxyDriver gates its + /// central feed). When the host pushes a + /// carrying native-alarm refs to a Connected driver — the live deploy path — the actor must + /// call with those refs to un-gate the feed, and must + /// NOT re-subscribe when the same set is redeployed (the feed is already un-gated). + /// + [Fact] + public async Task Alarm_subscription_is_established_when_alarm_refs_are_pushed_while_connected() + { + var driver = new AlarmSourceStubDriver(); + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver)); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + AwaitCondition(() => driver.AlarmSubscriberCount == 1, TimeSpan.FromSeconds(2)); // reached Connected + driver.SubscribeAlarmsCallCount.ShouldBe(0, "no alarm subscription before any alarm refs are pushed"); + + // A deploy delivers the desired set with a native-alarm ref while the driver is already Connected. + actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions( + Array.Empty(), TimeSpan.FromMilliseconds(100), new[] { "Temp.HiHi" })); + + AwaitCondition(() => driver.SubscribeAlarmsCallCount == 1, TimeSpan.FromSeconds(2)); + driver.LastAlarmRefs.ShouldBe(new[] { "Temp.HiHi" }); + + // Redeploying the same alarm set must NOT re-subscribe (idempotent — already un-gated). Round-trip a + // data Subscribe to flush the mailbox so the second SetDesiredSubscriptions is fully processed first. + actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions( + Array.Empty(), TimeSpan.FromMilliseconds(100), new[] { "Temp.HiHi" })); + await actor.Ask( + new DriverInstanceActor.Subscribe(new[] { "tag-z" }, TimeSpan.FromMilliseconds(100)), + TimeSpan.FromSeconds(3)); + driver.SubscribeAlarmsCallCount.ShouldBe(1, "an already-established alarm subscription is not re-issued"); + } + /// /// A driver that is NOT an (only ) connects /// and serves data changes normally — AttachAlarmSource is a safe no-op (no crash) and no @@ -266,10 +302,23 @@ public sealed class DriverInstanceActorNativeAlarmTests : RuntimeActorTestBase public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) => Task.CompletedTask; + private int _subscribeAlarmsCallCount; + + /// Number of calls — proves the actor established the + /// native-alarm subscription that un-gates the feed (incremented on the actor dispatcher thread). + public int SubscribeAlarmsCallCount => Volatile.Read(ref _subscribeAlarmsCallCount); + + /// The node set handed to the most recent call. + public IReadOnlyList? LastAlarmRefs { get; private set; } + /// Subscribes to alarm events for the specified node set. public Task SubscribeAlarmsAsync( - IReadOnlyList sourceNodeIds, CancellationToken cancellationToken) => - Task.FromResult(new StubAlarmHandle()); + IReadOnlyList sourceNodeIds, CancellationToken cancellationToken) + { + LastAlarmRefs = sourceNodeIds; + Interlocked.Increment(ref _subscribeAlarmsCallCount); + return Task.FromResult(new StubAlarmHandle()); + } /// Cancels an alarm subscription. public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken) =>