feat(alarms): DriverInstanceActor forwards native OnAlarmEvent to parent (Phase B WS-4b)
This commit is contained in:
@@ -64,6 +64,11 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
/// <see cref="ISubscribable.OnDataChange"/>. The parent forwards to OpcUaPublishActor.</summary>
|
||||
public sealed record AttributeValuePublished(string DriverInstanceId, string FullReference, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);
|
||||
private sealed record DataChangeForward(string FullReference, DataValueSnapshot Snapshot);
|
||||
/// <summary>Published to the parent whenever the subscribed driver (an <see cref="IAlarmSource"/>) fires
|
||||
/// <see cref="IAlarmSource.OnAlarmEvent"/>. The parent (<see cref="DriverHostActor"/>) projects + routes it
|
||||
/// to the materialised Part 9 condition. Parallels <see cref="AttributeValuePublished"/>.</summary>
|
||||
public sealed record AttributeAlarmPublished(string DriverInstanceId, AlarmEventArgs Args);
|
||||
private sealed record NativeAlarmRaised(AlarmEventArgs Args);
|
||||
public sealed class RetryConnect
|
||||
{
|
||||
public static readonly RetryConnect Instance = new();
|
||||
@@ -94,6 +99,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
/// do not need to re-send subscription requests after a reconnect.</summary>
|
||||
private ISubscriptionHandle? _subscriptionHandle;
|
||||
private EventHandler<DataChangeEventArgs>? _dataChangeHandler;
|
||||
private EventHandler<AlarmEventArgs>? _alarmEventHandler;
|
||||
|
||||
/// <summary>The references the host wants kept subscribed (set by <see cref="SetDesiredSubscriptions"/>).
|
||||
/// Re-applied on every entry into <c>Connected</c> so values resume after a reconnect or redeploy.</summary>
|
||||
@@ -222,6 +228,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
Become(Connected);
|
||||
PublishHealthSnapshot();
|
||||
ResubscribeDesired();
|
||||
AttachAlarmSource();
|
||||
});
|
||||
Receive<InitializeFailed>(msg =>
|
||||
{
|
||||
@@ -241,6 +248,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
// to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter.
|
||||
Receive<SubscriptionFailed>(msg =>
|
||||
_log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
|
||||
// A native alarm transition can race in while still (re)connecting (the driver's feed runs on its
|
||||
// own thread); drop it — the feed re-delivers active alarms once Connected. Trace only.
|
||||
Receive<NativeAlarmRaised>(_ =>
|
||||
_log.Debug("DriverInstance {Id}: native alarm arrived during connect — dropped (feed re-delivers)", _driverInstanceId));
|
||||
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
|
||||
}
|
||||
|
||||
@@ -273,6 +284,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
else if (_subscriptionHandle is not null) Self.Tell(new Unsubscribe());
|
||||
});
|
||||
Receive<DataChangeForward>(OnDataChangeForward);
|
||||
// Native alarm transition marshaled onto the actor thread from the driver's OnAlarmEvent;
|
||||
// project it to the parent the same way DataChangeForward projects AttributeValuePublished.
|
||||
Receive<NativeAlarmRaised>(m => Context.Parent.Tell(new AttributeAlarmPublished(_driverInstanceId, m.Args)));
|
||||
// ResubscribeDesired self-Tells Subscribe; HandleSubscribeAsync replies SubscriptionEstablished to the
|
||||
// sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter.
|
||||
Receive<SubscriptionEstablished>(msg =>
|
||||
@@ -299,6 +313,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
Become(Connected);
|
||||
PublishHealthSnapshot();
|
||||
ResubscribeDesired();
|
||||
AttachAlarmSource();
|
||||
});
|
||||
Receive<InitializeFailed>(_ => { /* keep retrying via timer */ });
|
||||
Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
|
||||
@@ -312,6 +327,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
// to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter.
|
||||
Receive<SubscriptionFailed>(msg =>
|
||||
_log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
|
||||
// A native alarm transition can race in while still reconnecting (the driver's feed runs on its
|
||||
// own thread); drop it — the feed re-delivers active alarms once Connected. Trace only.
|
||||
Receive<NativeAlarmRaised>(_ =>
|
||||
_log.Debug("DriverInstance {Id}: native alarm arrived during reconnect — dropped (feed re-delivers)", _driverInstanceId));
|
||||
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
|
||||
Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);
|
||||
}
|
||||
@@ -446,9 +465,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Tear down the event handler + null the handle. Called from Unsubscribe path, on
|
||||
/// PostStop, and on Connected → Reconnecting transitions so a stale handler doesn't push
|
||||
/// data-change events to an actor that has lost its driver connection.</summary>
|
||||
/// <summary>Tear down the data-change + native-alarm event handlers + null the handle. Called from the
|
||||
/// Unsubscribe path, on PostStop, and on Connected → Reconnecting transitions so a stale handler doesn't
|
||||
/// push data-change / alarm events to an actor that has lost its driver connection.</summary>
|
||||
private void DetachSubscription()
|
||||
{
|
||||
if (_driver is ISubscribable subscribable && _dataChangeHandler is not null)
|
||||
@@ -457,6 +476,26 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
}
|
||||
_dataChangeHandler = null;
|
||||
_subscriptionHandle = null;
|
||||
DetachAlarmSource();
|
||||
}
|
||||
|
||||
/// <summary>Subscribe the driver's native alarm event (if it is an <see cref="IAlarmSource"/>),
|
||||
/// marshaling each transition to the actor thread. Idempotent; mirrors the OnDataChange attach.</summary>
|
||||
private void AttachAlarmSource()
|
||||
{
|
||||
if (_driver is not IAlarmSource src || _alarmEventHandler is not null) return;
|
||||
var self = Self;
|
||||
_alarmEventHandler = (_, e) => self.Tell(new NativeAlarmRaised(e));
|
||||
src.OnAlarmEvent += _alarmEventHandler;
|
||||
}
|
||||
|
||||
/// <summary>Symmetric teardown — called from <see cref="DetachSubscription"/> and PostStop so a stale
|
||||
/// handler never pushes to a disconnected actor.</summary>
|
||||
private void DetachAlarmSource()
|
||||
{
|
||||
if (_driver is IAlarmSource src && _alarmEventHandler is not null)
|
||||
src.OnAlarmEvent -= _alarmEventHandler;
|
||||
_alarmEventHandler = null;
|
||||
}
|
||||
|
||||
/// <summary>Records the host's desired subscription set without touching the live subscription.
|
||||
|
||||
+259
@@ -0,0 +1,259 @@
|
||||
using Akka.Actor;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
|
||||
|
||||
/// <summary>
|
||||
/// Covers WS-4b: <see cref="DriverInstanceActor"/> subscribes a driver's native
|
||||
/// <see cref="IAlarmSource.OnAlarmEvent"/> (when the driver is an <see cref="IAlarmSource"/>)
|
||||
/// and forwards every transition to its parent as
|
||||
/// <see cref="DriverInstanceActor.AttributeAlarmPublished"/> — mirroring the OnDataChange →
|
||||
/// <see cref="DriverInstanceActor.AttributeValuePublished"/> forward. The driver fires
|
||||
/// <c>OnAlarmEvent</c> on its OWN thread; the actor marshals it onto the actor thread via Self.
|
||||
/// </summary>
|
||||
public sealed class DriverInstanceActorNativeAlarmTests : RuntimeActorTestBase
|
||||
{
|
||||
/// <summary>
|
||||
/// Driving an <see cref="IAlarmSource"/> driver to Connected then raising a native alarm forwards
|
||||
/// it to the parent as <see cref="DriverInstanceActor.AttributeAlarmPublished"/> carrying the
|
||||
/// driver-instance id + the original <see cref="AlarmEventArgs"/> (SourceNodeId preserved).
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Native_alarm_is_forwarded_to_parent_as_AttributeAlarmPublished()
|
||||
{
|
||||
var driver = new AlarmSourceStubDriver();
|
||||
var parent = CreateTestProbe();
|
||||
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver));
|
||||
|
||||
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
|
||||
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
|
||||
// The alarm handler is attached after Become(Connected); wait for it to be wired before raising.
|
||||
AwaitCondition(() => driver.AlarmSubscriberCount == 1, TimeSpan.FromSeconds(2));
|
||||
// Also establish a data-change subscription so the OnDataChange path is wired (the alarm attach is
|
||||
// independent of Subscribe — this proves both event paths coexist on one driver).
|
||||
await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
|
||||
new DriverInstanceActor.Subscribe(new[] { "tag-z" }, TimeSpan.FromMilliseconds(100)),
|
||||
TimeSpan.FromSeconds(3));
|
||||
|
||||
driver.RaiseAlarm(new AlarmEventArgs(
|
||||
new StubAlarmHandle(),
|
||||
SourceNodeId: "src-node-7",
|
||||
ConditionId: "cond-1",
|
||||
AlarmType: "AnalogLimitAlarm.HiHi",
|
||||
Message: "level too high",
|
||||
Severity: AlarmSeverity.High,
|
||||
SourceTimestampUtc: DateTime.UtcNow,
|
||||
Kind: AlarmTransitionKind.Raise));
|
||||
|
||||
var published = parent.ExpectMsg<DriverInstanceActor.AttributeAlarmPublished>(TimeSpan.FromSeconds(2));
|
||||
published.DriverInstanceId.ShouldBe(driver.DriverInstanceId);
|
||||
published.Args.SourceNodeId.ShouldBe("src-node-7");
|
||||
published.Args.ConditionId.ShouldBe("cond-1");
|
||||
published.Args.Kind.ShouldBe(AlarmTransitionKind.Raise);
|
||||
|
||||
// The same driver's OnDataChange still flows independently — alarm + value events coexist.
|
||||
driver.FireDataChange("tag-z", value: 9.0, statusCode: 0u);
|
||||
parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(TimeSpan.FromSeconds(2))
|
||||
.FullReference.ShouldBe("tag-z");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A driver that is NOT an <see cref="IAlarmSource"/> (only <see cref="ISubscribable"/>) connects
|
||||
/// and serves data changes normally — <c>AttachAlarmSource</c> is a safe no-op (no crash) and no
|
||||
/// <see cref="DriverInstanceActor.AttributeAlarmPublished"/> is ever produced.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Non_alarm_source_driver_serves_data_changes_and_never_publishes_alarms()
|
||||
{
|
||||
var driver = new SubscribableOnlyStubDriver();
|
||||
var parent = CreateTestProbe();
|
||||
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver));
|
||||
|
||||
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
|
||||
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
|
||||
|
||||
await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
|
||||
new DriverInstanceActor.Subscribe(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)),
|
||||
TimeSpan.FromSeconds(3));
|
||||
|
||||
driver.FireDataChange("tag-a", value: 1.5, statusCode: 0u);
|
||||
|
||||
// Data-change forwarding still works (no crash on AttachAlarmSource for a non-IAlarmSource driver).
|
||||
var dc = parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(TimeSpan.FromSeconds(2));
|
||||
dc.FullReference.ShouldBe("tag-a");
|
||||
// An AttributeAlarmPublished can never be produced for a non-IAlarmSource driver.
|
||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// After a full reconnect cycle (Connected → Reconnecting → Connected), a single raised alarm still
|
||||
/// yields EXACTLY ONE <see cref="DriverInstanceActor.AttributeAlarmPublished"/> — the
|
||||
/// <c>_alarmEventHandler is not null</c> guard in <c>AttachAlarmSource</c> plus the detach on the
|
||||
/// Connected → Reconnecting transition prevent a double-attach (which would publish twice).
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void Reconnect_does_not_double_attach_alarm_handler()
|
||||
{
|
||||
var driver = new AlarmSourceStubDriver();
|
||||
var parent = CreateTestProbe();
|
||||
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));
|
||||
|
||||
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
|
||||
AwaitCondition(() => driver.AlarmSubscriberCount == 1, TimeSpan.FromSeconds(2));
|
||||
|
||||
// Force Connected → Reconnecting (detaches) → Connected (re-attaches). InitializeCount climbs on
|
||||
// the retry; the alarm handler count must settle back to exactly one (no leaked extra handler).
|
||||
var initBefore = driver.InitializeCount;
|
||||
actor.Tell(new DriverInstanceActor.DisconnectObserved("backend blip"));
|
||||
AwaitCondition(() => driver.InitializeCount > initBefore, TimeSpan.FromSeconds(3));
|
||||
AwaitCondition(() => driver.AlarmSubscriberCount == 1, TimeSpan.FromSeconds(3));
|
||||
|
||||
driver.RaiseAlarm(new AlarmEventArgs(
|
||||
new StubAlarmHandle(),
|
||||
SourceNodeId: "src-node-1",
|
||||
ConditionId: "cond-x",
|
||||
AlarmType: "T",
|
||||
Message: "m",
|
||||
Severity: AlarmSeverity.Low,
|
||||
SourceTimestampUtc: DateTime.UtcNow,
|
||||
Kind: AlarmTransitionKind.Raise));
|
||||
|
||||
// Exactly one forward — a double-attach would surface as a second AttributeAlarmPublished.
|
||||
parent.ExpectMsg<DriverInstanceActor.AttributeAlarmPublished>(TimeSpan.FromSeconds(2))
|
||||
.Args.SourceNodeId.ShouldBe("src-node-1");
|
||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||
}
|
||||
|
||||
// --- stub drivers ----------------------------------------------------------------------------
|
||||
|
||||
private class StubDriver : IDriver
|
||||
{
|
||||
/// <summary>Gets the number of times initialization was called.</summary>
|
||||
public int InitializeCount;
|
||||
|
||||
/// <summary>Gets the driver instance ID.</summary>
|
||||
public string DriverInstanceId => "alarm-stub-driver-1";
|
||||
/// <summary>Gets the driver type.</summary>
|
||||
public string DriverType => "Stub";
|
||||
|
||||
/// <summary>Initializes the driver.</summary>
|
||||
public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
|
||||
{
|
||||
Interlocked.Increment(ref InitializeCount);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>Reinitializes the driver.</summary>
|
||||
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) =>
|
||||
Task.CompletedTask;
|
||||
|
||||
/// <summary>Shuts down the driver.</summary>
|
||||
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
/// <summary>Gets the health status of the driver.</summary>
|
||||
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
|
||||
/// <summary>Gets the memory footprint of the driver.</summary>
|
||||
public long GetMemoryFootprint() => 0;
|
||||
/// <summary>Flushes optional caches in the driver.</summary>
|
||||
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>A driver that implements <see cref="ISubscribable"/> + <see cref="IAlarmSource"/> and lets the
|
||||
/// test raise <see cref="IAlarmSource.OnAlarmEvent"/> on demand (the driver fires it on its own thread in
|
||||
/// production; here the test thread stands in for that).</summary>
|
||||
private sealed class AlarmSourceStubDriver : StubDriver, ISubscribable, IAlarmSource
|
||||
{
|
||||
private readonly StubHandle _subHandle = new();
|
||||
|
||||
/// <summary>Occurs when data changes.</summary>
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
/// <summary>Server-pushed alarm transition.</summary>
|
||||
public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
|
||||
|
||||
/// <summary>Number of live subscribers on <see cref="OnAlarmEvent"/> — proves attach/detach.</summary>
|
||||
public int AlarmSubscriberCount => OnAlarmEvent?.GetInvocationList().Length ?? 0;
|
||||
|
||||
/// <summary>Subscribes to the specified full references.</summary>
|
||||
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) =>
|
||||
Task.FromResult<ISubscriptionHandle>(_subHandle);
|
||||
|
||||
/// <summary>Unsubscribes from the specified subscription handle.</summary>
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) =>
|
||||
Task.CompletedTask;
|
||||
|
||||
/// <summary>Subscribes to alarm events for the specified node set.</summary>
|
||||
public Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
|
||||
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken) =>
|
||||
Task.FromResult<IAlarmSubscriptionHandle>(new StubAlarmHandle());
|
||||
|
||||
/// <summary>Cancels an alarm subscription.</summary>
|
||||
public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken) =>
|
||||
Task.CompletedTask;
|
||||
|
||||
/// <summary>Acknowledges a batch of alarms.</summary>
|
||||
public Task AcknowledgeAsync(
|
||||
IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken) =>
|
||||
Task.CompletedTask;
|
||||
|
||||
/// <summary>Fires <see cref="OnAlarmEvent"/> — stands in for the driver's native feed.</summary>
|
||||
public void RaiseAlarm(AlarmEventArgs args) => OnAlarmEvent?.Invoke(this, args);
|
||||
|
||||
/// <summary>Fires a data change event (keeps OnDataChange exercised; the actor wires both events).</summary>
|
||||
public void FireDataChange(string fullRef, object? value, uint statusCode)
|
||||
{
|
||||
var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow);
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(_subHandle, fullRef, snapshot));
|
||||
}
|
||||
|
||||
private sealed class StubHandle : ISubscriptionHandle
|
||||
{
|
||||
/// <summary>Gets the diagnostic ID of the subscription.</summary>
|
||||
public string DiagnosticId => "stub-sub";
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>A driver that is <see cref="ISubscribable"/> but NOT <see cref="IAlarmSource"/> — proves
|
||||
/// <c>AttachAlarmSource</c> is a no-op (no crash) and no alarm forward ever happens.</summary>
|
||||
private sealed class SubscribableOnlyStubDriver : StubDriver, ISubscribable
|
||||
{
|
||||
private readonly StubHandle _subHandle = new();
|
||||
|
||||
/// <summary>Occurs when data changes.</summary>
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
|
||||
/// <summary>Subscribes to the specified full references.</summary>
|
||||
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) =>
|
||||
Task.FromResult<ISubscriptionHandle>(_subHandle);
|
||||
|
||||
/// <summary>Unsubscribes from the specified subscription handle.</summary>
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) =>
|
||||
Task.CompletedTask;
|
||||
|
||||
/// <summary>Fires a data change event with the specified parameters.</summary>
|
||||
public void FireDataChange(string fullRef, object? value, uint statusCode)
|
||||
{
|
||||
var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow);
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(_subHandle, fullRef, snapshot));
|
||||
}
|
||||
|
||||
private sealed class StubHandle : ISubscriptionHandle
|
||||
{
|
||||
/// <summary>Gets the diagnostic ID of the subscription.</summary>
|
||||
public string DiagnosticId => "stub-sub";
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Minimal <see cref="IAlarmSubscriptionHandle"/> for building <see cref="AlarmEventArgs"/>.</summary>
|
||||
private sealed class StubAlarmHandle : IAlarmSubscriptionHandle
|
||||
{
|
||||
/// <summary>Gets the diagnostic ID of the alarm subscription.</summary>
|
||||
public string DiagnosticId => "stub-alarm-sub";
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user