From 25c3bd16ba2f1848b3c7ead7517c959ca17c0b59 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 14 Jun 2026 03:24:24 -0400 Subject: [PATCH] feat(alarms): DriverInstanceActor forwards native OnAlarmEvent to parent (Phase B WS-4b) --- .../Drivers/DriverInstanceActor.cs | 45 ++- .../DriverInstanceActorNativeAlarmTests.cs | 259 ++++++++++++++++++ 2 files changed, 301 insertions(+), 3 deletions(-) create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorNativeAlarmTests.cs diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs index 8d94e46f..866a48f0 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -64,6 +64,11 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers /// . The parent forwards to OpcUaPublishActor. public sealed record AttributeValuePublished(string DriverInstanceId, string FullReference, object? Value, OpcUaQuality Quality, DateTime TimestampUtc); private sealed record DataChangeForward(string FullReference, DataValueSnapshot Snapshot); + /// Published to the parent whenever the subscribed driver (an ) fires + /// . The parent () projects + routes it + /// to the materialised Part 9 condition. Parallels . + public sealed record AttributeAlarmPublished(string DriverInstanceId, AlarmEventArgs Args); + private sealed record NativeAlarmRaised(AlarmEventArgs Args); public sealed class RetryConnect { public static readonly RetryConnect Instance = new(); @@ -94,6 +99,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers /// do not need to re-send subscription requests after a reconnect. private ISubscriptionHandle? _subscriptionHandle; private EventHandler? _dataChangeHandler; + private EventHandler? _alarmEventHandler; /// The references the host wants kept subscribed (set by ). /// Re-applied on every entry into Connected so values resume after a reconnect or redeploy. @@ -222,6 +228,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Become(Connected); PublishHealthSnapshot(); ResubscribeDesired(); + AttachAlarmSource(); }); Receive(msg => { @@ -241,6 +248,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers // to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter. Receive(msg => _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason)); + // A native alarm transition can race in while still (re)connecting (the driver's feed runs on its + // own thread); drop it — the feed re-delivers active alarms once Connected. Trace only. + Receive(_ => + _log.Debug("DriverInstance {Id}: native alarm arrived during connect — dropped (feed re-delivers)", _driverInstanceId)); Receive(_ => PublishHealthSnapshot()); } @@ -273,6 +284,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers else if (_subscriptionHandle is not null) Self.Tell(new Unsubscribe()); }); Receive(OnDataChangeForward); + // Native alarm transition marshaled onto the actor thread from the driver's OnAlarmEvent; + // project it to the parent the same way DataChangeForward projects AttributeValuePublished. + Receive(m => Context.Parent.Tell(new AttributeAlarmPublished(_driverInstanceId, m.Args))); // ResubscribeDesired self-Tells Subscribe; HandleSubscribeAsync replies SubscriptionEstablished to the // sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter. Receive(msg => @@ -299,6 +313,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Become(Connected); PublishHealthSnapshot(); ResubscribeDesired(); + AttachAlarmSource(); }); Receive(_ => { /* keep retrying via timer */ }); Receive(StoreDesiredSubscriptions); @@ -312,6 +327,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers // to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter. Receive(msg => _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason)); + // A native alarm transition can race in while still reconnecting (the driver's feed runs on its + // own thread); drop it — the feed re-delivers active alarms once Connected. Trace only. + Receive(_ => + _log.Debug("DriverInstance {Id}: native alarm arrived during reconnect — dropped (feed re-delivers)", _driverInstanceId)); Receive(_ => PublishHealthSnapshot()); Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval); } @@ -446,9 +465,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers } } - /// Tear down the event handler + null the handle. Called from Unsubscribe path, on - /// PostStop, and on Connected → Reconnecting transitions so a stale handler doesn't push - /// data-change events to an actor that has lost its driver connection. + /// Tear down the data-change + native-alarm event handlers + null the handle. Called from the + /// Unsubscribe path, on PostStop, and on Connected → Reconnecting transitions so a stale handler doesn't + /// push data-change / alarm events to an actor that has lost its driver connection. private void DetachSubscription() { if (_driver is ISubscribable subscribable && _dataChangeHandler is not null) @@ -457,6 +476,26 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers } _dataChangeHandler = null; _subscriptionHandle = null; + DetachAlarmSource(); + } + + /// Subscribe the driver's native alarm event (if it is an ), + /// marshaling each transition to the actor thread. Idempotent; mirrors the OnDataChange attach. + private void AttachAlarmSource() + { + if (_driver is not IAlarmSource src || _alarmEventHandler is not null) return; + var self = Self; + _alarmEventHandler = (_, e) => self.Tell(new NativeAlarmRaised(e)); + src.OnAlarmEvent += _alarmEventHandler; + } + + /// Symmetric teardown — called from and PostStop so a stale + /// handler never pushes to a disconnected actor. + private void DetachAlarmSource() + { + if (_driver is IAlarmSource src && _alarmEventHandler is not null) + src.OnAlarmEvent -= _alarmEventHandler; + _alarmEventHandler = null; } /// Records the host's desired subscription set without touching the live subscription. diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorNativeAlarmTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorNativeAlarmTests.cs new file mode 100644 index 00000000..2a239e18 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorNativeAlarmTests.cs @@ -0,0 +1,259 @@ +using Akka.Actor; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers; + +/// +/// Covers WS-4b: subscribes a driver's native +/// (when the driver is an ) +/// and forwards every transition to its parent as +/// — mirroring the OnDataChange → +/// forward. The driver fires +/// OnAlarmEvent on its OWN thread; the actor marshals it onto the actor thread via Self. +/// +public sealed class DriverInstanceActorNativeAlarmTests : RuntimeActorTestBase +{ + /// + /// Driving an driver to Connected then raising a native alarm forwards + /// it to the parent as carrying the + /// driver-instance id + the original (SourceNodeId preserved). + /// + [Fact] + public async Task Native_alarm_is_forwarded_to_parent_as_AttributeAlarmPublished() + { + var driver = new AlarmSourceStubDriver(); + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver)); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2)); + // The alarm handler is attached after Become(Connected); wait for it to be wired before raising. + AwaitCondition(() => driver.AlarmSubscriberCount == 1, TimeSpan.FromSeconds(2)); + // Also establish a data-change subscription so the OnDataChange path is wired (the alarm attach is + // independent of Subscribe — this proves both event paths coexist on one driver). + await actor.Ask( + new DriverInstanceActor.Subscribe(new[] { "tag-z" }, TimeSpan.FromMilliseconds(100)), + TimeSpan.FromSeconds(3)); + + driver.RaiseAlarm(new AlarmEventArgs( + new StubAlarmHandle(), + SourceNodeId: "src-node-7", + ConditionId: "cond-1", + AlarmType: "AnalogLimitAlarm.HiHi", + Message: "level too high", + Severity: AlarmSeverity.High, + SourceTimestampUtc: DateTime.UtcNow, + Kind: AlarmTransitionKind.Raise)); + + var published = parent.ExpectMsg(TimeSpan.FromSeconds(2)); + published.DriverInstanceId.ShouldBe(driver.DriverInstanceId); + published.Args.SourceNodeId.ShouldBe("src-node-7"); + published.Args.ConditionId.ShouldBe("cond-1"); + published.Args.Kind.ShouldBe(AlarmTransitionKind.Raise); + + // The same driver's OnDataChange still flows independently — alarm + value events coexist. + driver.FireDataChange("tag-z", value: 9.0, statusCode: 0u); + parent.ExpectMsg(TimeSpan.FromSeconds(2)) + .FullReference.ShouldBe("tag-z"); + } + + /// + /// A driver that is NOT an (only ) connects + /// and serves data changes normally — AttachAlarmSource is a safe no-op (no crash) and no + /// is ever produced. + /// + [Fact] + public async Task Non_alarm_source_driver_serves_data_changes_and_never_publishes_alarms() + { + var driver = new SubscribableOnlyStubDriver(); + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver)); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2)); + + await actor.Ask( + new DriverInstanceActor.Subscribe(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)), + TimeSpan.FromSeconds(3)); + + driver.FireDataChange("tag-a", value: 1.5, statusCode: 0u); + + // Data-change forwarding still works (no crash on AttachAlarmSource for a non-IAlarmSource driver). + var dc = parent.ExpectMsg(TimeSpan.FromSeconds(2)); + dc.FullReference.ShouldBe("tag-a"); + // An AttributeAlarmPublished can never be produced for a non-IAlarmSource driver. + parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + } + + /// + /// After a full reconnect cycle (Connected → Reconnecting → Connected), a single raised alarm still + /// yields EXACTLY ONE — the + /// _alarmEventHandler is not null guard in AttachAlarmSource plus the detach on the + /// Connected → Reconnecting transition prevent a double-attach (which would publish twice). + /// + [Fact] + public void Reconnect_does_not_double_attach_alarm_handler() + { + var driver = new AlarmSourceStubDriver(); + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50))); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + AwaitCondition(() => driver.AlarmSubscriberCount == 1, TimeSpan.FromSeconds(2)); + + // Force Connected → Reconnecting (detaches) → Connected (re-attaches). InitializeCount climbs on + // the retry; the alarm handler count must settle back to exactly one (no leaked extra handler). + var initBefore = driver.InitializeCount; + actor.Tell(new DriverInstanceActor.DisconnectObserved("backend blip")); + AwaitCondition(() => driver.InitializeCount > initBefore, TimeSpan.FromSeconds(3)); + AwaitCondition(() => driver.AlarmSubscriberCount == 1, TimeSpan.FromSeconds(3)); + + driver.RaiseAlarm(new AlarmEventArgs( + new StubAlarmHandle(), + SourceNodeId: "src-node-1", + ConditionId: "cond-x", + AlarmType: "T", + Message: "m", + Severity: AlarmSeverity.Low, + SourceTimestampUtc: DateTime.UtcNow, + Kind: AlarmTransitionKind.Raise)); + + // Exactly one forward — a double-attach would surface as a second AttributeAlarmPublished. + parent.ExpectMsg(TimeSpan.FromSeconds(2)) + .Args.SourceNodeId.ShouldBe("src-node-1"); + parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + } + + // --- stub drivers ---------------------------------------------------------------------------- + + private class StubDriver : IDriver + { + /// Gets the number of times initialization was called. + public int InitializeCount; + + /// Gets the driver instance ID. + public string DriverInstanceId => "alarm-stub-driver-1"; + /// Gets the driver type. + public string DriverType => "Stub"; + + /// Initializes the driver. + public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) + { + Interlocked.Increment(ref InitializeCount); + return Task.CompletedTask; + } + + /// Reinitializes the driver. + public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => + Task.CompletedTask; + + /// Shuts down the driver. + public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask; + /// Gets the health status of the driver. + public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null); + /// Gets the memory footprint of the driver. + public long GetMemoryFootprint() => 0; + /// Flushes optional caches in the driver. + public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask; + } + + /// A driver that implements + and lets the + /// test raise on demand (the driver fires it on its own thread in + /// production; here the test thread stands in for that). + private sealed class AlarmSourceStubDriver : StubDriver, ISubscribable, IAlarmSource + { + private readonly StubHandle _subHandle = new(); + + /// Occurs when data changes. + public event EventHandler? OnDataChange; + /// Server-pushed alarm transition. + public event EventHandler? OnAlarmEvent; + + /// Number of live subscribers on — proves attach/detach. + public int AlarmSubscriberCount => OnAlarmEvent?.GetInvocationList().Length ?? 0; + + /// Subscribes to the specified full references. + public Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) => + Task.FromResult(_subHandle); + + /// Unsubscribes from the specified subscription handle. + public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) => + Task.CompletedTask; + + /// Subscribes to alarm events for the specified node set. + public Task SubscribeAlarmsAsync( + IReadOnlyList sourceNodeIds, CancellationToken cancellationToken) => + Task.FromResult(new StubAlarmHandle()); + + /// Cancels an alarm subscription. + public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken) => + Task.CompletedTask; + + /// Acknowledges a batch of alarms. + public Task AcknowledgeAsync( + IReadOnlyList acknowledgements, CancellationToken cancellationToken) => + Task.CompletedTask; + + /// Fires — stands in for the driver's native feed. + public void RaiseAlarm(AlarmEventArgs args) => OnAlarmEvent?.Invoke(this, args); + + /// Fires a data change event (keeps OnDataChange exercised; the actor wires both events). + public void FireDataChange(string fullRef, object? value, uint statusCode) + { + var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow); + OnDataChange?.Invoke(this, new DataChangeEventArgs(_subHandle, fullRef, snapshot)); + } + + private sealed class StubHandle : ISubscriptionHandle + { + /// Gets the diagnostic ID of the subscription. + public string DiagnosticId => "stub-sub"; + } + } + + /// A driver that is but NOT — proves + /// AttachAlarmSource is a no-op (no crash) and no alarm forward ever happens. + private sealed class SubscribableOnlyStubDriver : StubDriver, ISubscribable + { + private readonly StubHandle _subHandle = new(); + + /// Occurs when data changes. + public event EventHandler? OnDataChange; + + /// Subscribes to the specified full references. + public Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) => + Task.FromResult(_subHandle); + + /// Unsubscribes from the specified subscription handle. + public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) => + Task.CompletedTask; + + /// Fires a data change event with the specified parameters. + public void FireDataChange(string fullRef, object? value, uint statusCode) + { + var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow); + OnDataChange?.Invoke(this, new DataChangeEventArgs(_subHandle, fullRef, snapshot)); + } + + private sealed class StubHandle : ISubscriptionHandle + { + /// Gets the diagnostic ID of the subscription. + public string DiagnosticId => "stub-sub"; + } + } + + /// Minimal for building . + private sealed class StubAlarmHandle : IAlarmSubscriptionHandle + { + /// Gets the diagnostic ID of the alarm subscription. + public string DiagnosticId => "stub-alarm-sub"; + } +}