From d3b3d15018c50c9bf6c15317bcf283e2498ba702 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 29 May 2026 16:09:31 -0400 Subject: [PATCH] feat(dcl): DataConnectionActor native alarm subscribe + source-ref routing + unavailable signal --- .../Actors/DataConnectionActor.cs | 208 ++++++++++++++++++ .../DataConnectionActorAlarmTests.cs | 66 ++++++ 2 files changed, 274 insertions(+) create mode 100644 tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorAlarmTests.cs diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs index dd897632..82e1965f 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs @@ -3,6 +3,7 @@ using Akka.Event; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol; using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Management; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; using ZB.MOM.WW.ScadaBridge.HealthMonitoring; using ZB.MOM.WW.ScadaBridge.SiteEventLogging; @@ -93,6 +94,18 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers /// private readonly Dictionary _subscribers = new(); + // ── Native alarm subscriptions (Task-10) ── + // The connection opens one alarm feed per source reference; transitions are + // routed to subscribers (NativeAlarmActors) by source-object reference. + /// sourceReference → set of subscriber actor refs (NativeAlarmActors), for routing + ref-count. + private readonly Dictionary> _alarmSourceSubscribers = new(); + /// sourceReference → optional condition filter (first subscriber wins). + private readonly Dictionary _alarmSourceFilter = new(); + /// sourceReference → adapter alarm subscription id. + private readonly Dictionary _alarmSubscriptionIds = new(); + /// sourceReferences whose adapter SubscribeAlarmsAsync is currently in flight. + private readonly HashSet _alarmSubscribesInFlight = new(); + /// /// Tracks total subscribed and resolved tags for health reporting. /// @@ -227,6 +240,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers case SubscribeTagsRequest: case WriteTagRequest: case UnsubscribeTagsRequest: + case SubscribeAlarmsRequest: + case UnsubscribeAlarmsRequest: Stash.Stash(); break; case SubscribeCompleted sc: @@ -234,6 +249,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers // apply it so its state survives into the next ReSubscribeAll. HandleSubscribeCompleted(sc); break; + case AlarmSubscribeCompleted asc: + HandleAlarmSubscribeCompleted(asc); + break; + case AlarmTransitionReceived: + // No live feed yet in Connecting; ignore (snapshot replays on subscribe). + break; case BrowseNodeCommand browse: // Browse is an interactive design-time query; never stash. The // adapter has no session yet in this state, so reply with a @@ -289,6 +310,18 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers case UnsubscribeTagsRequest req: HandleUnsubscribe(req); break; + case SubscribeAlarmsRequest areq: + HandleSubscribeAlarms(areq); + break; + case UnsubscribeAlarmsRequest areq: + HandleUnsubscribeAlarms(areq); + break; + case AlarmSubscribeCompleted asc: + HandleAlarmSubscribeCompleted(asc); + break; + case AlarmTransitionReceived atr: + HandleAlarmTransitionReceived(atr); + break; case WriteTagRequest req: HandleWrite(req); break; @@ -398,6 +431,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers // WP-9: Push bad quality for all subscribed tags on disconnect PushBadQualityForAllTags(); + // Task-10: notify native alarm subscribers the source feed is unavailable + // (mark mirrored alarms uncertain; the reconnect snapshot reconciles them). + PushAlarmSourceUnavailable(); + // Schedule reconnect attempt Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval); } @@ -414,15 +451,23 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers break; case SubscribeTagsRequest: case WriteTagRequest: + case SubscribeAlarmsRequest: Stash.Stash(); break; case UnsubscribeTagsRequest req: // Allow unsubscribe even during reconnect (for cleanup on instance stop) HandleUnsubscribe(req); break; + case UnsubscribeAlarmsRequest areq: + // Allow alarm unsubscribe during reconnect (cleanup on instance stop). + HandleUnsubscribeAlarms(areq); + break; case TagValueReceived: // Ignore — stale callback from previous connection break; + case AlarmTransitionReceived: + // Ignore — stale alarm callback from previous connection; ReSubscribeAll re-seeds. + break; case TagResolutionSucceeded: case TagResolutionFailed: // Ignore — stale results from previous connection; ReSubscribeAll runs after reconnect @@ -432,6 +477,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers // apply it so its state survives into the next ReSubscribeAll. HandleSubscribeCompleted(sc); break; + case AlarmSubscribeCompleted asc: + HandleAlarmSubscribeCompleted(asc); + break; case BrowseNodeCommand browse: // Browse is design-time and never stashed. While reconnecting // the adapter has no live session, so the adapter call will @@ -505,6 +553,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers // WP-10: Transparent re-subscribe — re-establish all active subscriptions ReSubscribeAll(); + // Task-10: re-establish native alarm feeds (source replays a snapshot). + ReSubscribeAllAlarms(); BecomeConnected(); } @@ -1406,6 +1456,160 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality); } + // ── Native alarm subscriptions (Task-10) ── + + private void HandleSubscribeAlarms(SubscribeAlarmsRequest request) + { + var subscriber = Sender; + var now = DateTimeOffset.UtcNow; + + if (_adapter is not IAlarmSubscribableConnection alarmable) + { + subscriber.Tell(new SubscribeAlarmsResponse( + request.CorrelationId, request.InstanceUniqueName, false, + $"Connection '{_connectionName}' is not alarm-capable.", now)); + return; + } + + // Register the subscriber for routing (idempotent) before issuing the + // adapter subscribe so a transition that arrives mid-subscribe is routed. + if (!_alarmSourceSubscribers.TryGetValue(request.SourceReference, out var subs)) + { + subs = new HashSet(); + _alarmSourceSubscribers[request.SourceReference] = subs; + } + subs.Add(subscriber); + _alarmSourceFilter[request.SourceReference] = request.ConditionFilter; + + // If the adapter feed for this source is already (being) established, the + // existing subscription serves the new subscriber too. + if (_alarmSubscriptionIds.ContainsKey(request.SourceReference) || + _alarmSubscribesInFlight.Contains(request.SourceReference)) + { + subscriber.Tell(new SubscribeAlarmsResponse( + request.CorrelationId, request.InstanceUniqueName, true, null, now)); + return; + } + + _alarmSubscribesInFlight.Add(request.SourceReference); + var self = Self; + var generation = _adapterGeneration; + var sourceRef = request.SourceReference; + var filter = request.ConditionFilter; + var corr = request.CorrelationId; + var inst = request.InstanceUniqueName; + + alarmable.SubscribeAlarmsAsync(sourceRef, filter, + t => self.Tell(new AlarmTransitionReceived(t, generation))) + .ContinueWith(task => task.IsCompletedSuccessfully + ? new AlarmSubscribeCompleted(sourceRef, true, task.Result, null, subscriber, corr, inst) as object + : new AlarmSubscribeCompleted(sourceRef, false, null, + task.Exception?.GetBaseException().Message ?? "Unknown error", subscriber, corr, inst)) + .PipeTo(self); + } + + private void HandleAlarmSubscribeCompleted(AlarmSubscribeCompleted msg) + { + _alarmSubscribesInFlight.Remove(msg.SourceReference); + + if (msg.Success && msg.SubscriptionId != null) + { + _alarmSubscriptionIds[msg.SourceReference] = msg.SubscriptionId; + _log.Info("[{0}] Alarm feed subscribed for source {1}", _connectionName, msg.SourceReference); + } + else if (!msg.Success) + { + _log.Warning("[{0}] Alarm subscribe failed for source {1}: {2}", + _connectionName, msg.SourceReference, msg.Error); + } + + // ReplyTo is null for reconnect re-subscribes (no original requester to answer). + msg.ReplyTo?.Tell(new SubscribeAlarmsResponse( + msg.CorrelationId ?? string.Empty, msg.InstanceUniqueName ?? string.Empty, + msg.Success, msg.Error, DateTimeOffset.UtcNow)); + } + + private void HandleAlarmTransitionReceived(AlarmTransitionReceived msg) + { + // DataConnectionLayer-011: drop transitions from a disposed adapter after failover. + if (msg.AdapterGeneration != _adapterGeneration) + return; + + var transition = msg.Transition; + var notified = new HashSet(); + foreach (var (sourceRef, subs) in _alarmSourceSubscribers) + { + // A subscriber bound to source S receives a transition whose source + // object (or full reference) falls under S. + var match = transition.SourceObjectReference.StartsWith(sourceRef, StringComparison.Ordinal) + || transition.SourceReference.StartsWith(sourceRef, StringComparison.Ordinal); + if (!match) + continue; + + foreach (var sub in subs) + { + if (notified.Add(sub)) + sub.Tell(new NativeAlarmTransitionUpdate(_connectionName, transition)); + } + } + } + + private void HandleUnsubscribeAlarms(UnsubscribeAlarmsRequest request) + { + if (!_alarmSourceSubscribers.TryGetValue(request.SourceReference, out var subs)) + return; + + subs.Remove(Sender); + if (subs.Count > 0) + return; + + // No subscribers remain for this source — tear down the adapter feed. + _alarmSourceSubscribers.Remove(request.SourceReference); + _alarmSourceFilter.Remove(request.SourceReference); + if (_alarmSubscriptionIds.Remove(request.SourceReference, out var subId) && + _adapter is IAlarmSubscribableConnection alarmable) + { + _ = alarmable.UnsubscribeAlarmsAsync(subId); + } + } + + /// Re-establishes all native alarm feeds after a reconnect; the source replays a snapshot. + private void ReSubscribeAllAlarms() + { + if (_adapter is not IAlarmSubscribableConnection alarmable || _alarmSourceSubscribers.Count == 0) + return; + + _alarmSubscriptionIds.Clear(); + _alarmSubscribesInFlight.Clear(); + var self = Self; + var generation = _adapterGeneration; + + foreach (var sourceRef in _alarmSourceSubscribers.Keys.ToList()) + { + var sr = sourceRef; + var filter = _alarmSourceFilter.GetValueOrDefault(sourceRef); + _alarmSubscribesInFlight.Add(sr); + alarmable.SubscribeAlarmsAsync(sr, filter, + t => self.Tell(new AlarmTransitionReceived(t, generation))) + .ContinueWith(task => task.IsCompletedSuccessfully + ? new AlarmSubscribeCompleted(sr, true, task.Result, null, null, null, null) as object + : new AlarmSubscribeCompleted(sr, false, null, + task.Exception?.GetBaseException().Message ?? "Unknown error", null, null, null)) + .PipeTo(self); + } + } + + /// Notifies alarm subscribers that the source feed is unavailable (connection lost). + private void PushAlarmSourceUnavailable() + { + var now = DateTimeOffset.UtcNow; + foreach (var (sourceRef, subs) in _alarmSourceSubscribers) + { + foreach (var sub in subs) + sub.Tell(new NativeAlarmSourceUnavailable(_connectionName, sourceRef, now)); + } + } + // ── Internal messages ── internal record AttemptConnect; @@ -1420,5 +1624,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers bool ConnectionLevelFailure = false); internal record SubscribeCompleted( SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList Results); + internal record AlarmTransitionReceived(NativeAlarmTransition Transition, int AdapterGeneration); + internal record AlarmSubscribeCompleted( + string SourceReference, bool Success, string? SubscriptionId, string? Error, + IActorRef? ReplyTo, string? CorrelationId, string? InstanceUniqueName); public record GetHealthReport; } diff --git a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorAlarmTests.cs b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorAlarmTests.cs new file mode 100644 index 00000000..bc321570 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorAlarmTests.cs @@ -0,0 +1,66 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using NSubstitute; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; +using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Actors; +using ZB.MOM.WW.ScadaBridge.HealthMonitoring; + +namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests; + +/// Task-10: native alarm subscribe + source-ref routing + unavailable signal. +public class DataConnectionActorAlarmTests : TestKit +{ + private readonly ISiteHealthCollector _health = Substitute.For(); + private readonly IDataConnectionFactory _factory = Substitute.For(); + private readonly DataConnectionOptions _options = new() + { + ReconnectInterval = TimeSpan.FromMilliseconds(100), + TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200), + WriteTimeout = TimeSpan.FromSeconds(5) + }; + + private static NativeAlarmTransition Raise(string sourceRef, string sourceObj) => + new(sourceRef, sourceObj, "AnalogLimit.Hi", AlarmTransitionKind.Raise, + new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 500), + "Process", "hi", "hi", "", "", null, DateTimeOffset.UtcNow, "92", "90"); + + [Fact] + public void SubscribeAlarms_RoutesTransitionToInstanceSubscriber() + { + AlarmTransitionCallback? cb = null; + var adapter = Substitute.For(); + adapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + ((IAlarmSubscribableConnection)adapter) + .SubscribeAlarmsAsync(Arg.Any(), Arg.Any(), + Arg.Do(c => cb = c), Arg.Any()) + .Returns(Task.FromResult("alarm-sub-1")); + + var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor( + "conn", adapter, _options, _health, _factory, "OpcUa"))); + + actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow)); + ExpectMsg(m => m.Success); + + Assert.NotNull(cb); + cb!(Raise("Tank01.Hi", "Tank01")); + ExpectMsg(u => u.Transition.SourceObjectReference == "Tank01"); + } + + [Fact] + public void SubscribeAlarms_OnNonAlarmCapableAdapter_RepliesFailure() + { + var adapter = Substitute.For(); // not IAlarmSubscribableConnection + adapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + + var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor( + "conn", adapter, _options, _health, _factory, "OpcUa"))); + + actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow)); + ExpectMsg(m => !m.Success && m.ErrorMessage != null); + } +}