feat(dcl): DataConnectionActor native alarm subscribe + source-ref routing + unavailable signal

This commit is contained in:
Joseph Doherty
2026-05-29 16:09:31 -04:00
parent ba278736af
commit d3b3d15018
2 changed files with 274 additions and 0 deletions
@@ -3,6 +3,7 @@ using Akka.Event;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Management;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
using ZB.MOM.WW.ScadaBridge.SiteEventLogging;
@@ -93,6 +94,18 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
/// </summary>
private readonly Dictionary<string, IActorRef> _subscribers = new();
// ── Native alarm subscriptions (Task-10) ──
// The connection opens one alarm feed per source reference; transitions are
// routed to subscribers (NativeAlarmActors) by source-object reference.
/// <summary>sourceReference → set of subscriber actor refs (NativeAlarmActors), for routing + ref-count.</summary>
private readonly Dictionary<string, HashSet<IActorRef>> _alarmSourceSubscribers = new();
/// <summary>sourceReference → optional condition filter (first subscriber wins).</summary>
private readonly Dictionary<string, string?> _alarmSourceFilter = new();
/// <summary>sourceReference → adapter alarm subscription id.</summary>
private readonly Dictionary<string, string> _alarmSubscriptionIds = new();
/// <summary>sourceReferences whose adapter SubscribeAlarmsAsync is currently in flight.</summary>
private readonly HashSet<string> _alarmSubscribesInFlight = new();
/// <summary>
/// Tracks total subscribed and resolved tags for health reporting.
/// </summary>
@@ -227,6 +240,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
case SubscribeTagsRequest:
case WriteTagRequest:
case UnsubscribeTagsRequest:
case SubscribeAlarmsRequest:
case UnsubscribeAlarmsRequest:
Stash.Stash();
break;
case SubscribeCompleted sc:
@@ -234,6 +249,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
// apply it so its state survives into the next ReSubscribeAll.
HandleSubscribeCompleted(sc);
break;
case AlarmSubscribeCompleted asc:
HandleAlarmSubscribeCompleted(asc);
break;
case AlarmTransitionReceived:
// No live feed yet in Connecting; ignore (snapshot replays on subscribe).
break;
case BrowseNodeCommand browse:
// Browse is an interactive design-time query; never stash. The
// adapter has no session yet in this state, so reply with a
@@ -289,6 +310,18 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
case UnsubscribeTagsRequest req:
HandleUnsubscribe(req);
break;
case SubscribeAlarmsRequest areq:
HandleSubscribeAlarms(areq);
break;
case UnsubscribeAlarmsRequest areq:
HandleUnsubscribeAlarms(areq);
break;
case AlarmSubscribeCompleted asc:
HandleAlarmSubscribeCompleted(asc);
break;
case AlarmTransitionReceived atr:
HandleAlarmTransitionReceived(atr);
break;
case WriteTagRequest req:
HandleWrite(req);
break;
@@ -398,6 +431,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
// WP-9: Push bad quality for all subscribed tags on disconnect
PushBadQualityForAllTags();
// Task-10: notify native alarm subscribers the source feed is unavailable
// (mark mirrored alarms uncertain; the reconnect snapshot reconciles them).
PushAlarmSourceUnavailable();
// Schedule reconnect attempt
Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
@@ -414,15 +451,23 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
break;
case SubscribeTagsRequest:
case WriteTagRequest:
case SubscribeAlarmsRequest:
Stash.Stash();
break;
case UnsubscribeTagsRequest req:
// Allow unsubscribe even during reconnect (for cleanup on instance stop)
HandleUnsubscribe(req);
break;
case UnsubscribeAlarmsRequest areq:
// Allow alarm unsubscribe during reconnect (cleanup on instance stop).
HandleUnsubscribeAlarms(areq);
break;
case TagValueReceived:
// Ignore — stale callback from previous connection
break;
case AlarmTransitionReceived:
// Ignore — stale alarm callback from previous connection; ReSubscribeAll re-seeds.
break;
case TagResolutionSucceeded:
case TagResolutionFailed:
// Ignore — stale results from previous connection; ReSubscribeAll runs after reconnect
@@ -432,6 +477,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
// apply it so its state survives into the next ReSubscribeAll.
HandleSubscribeCompleted(sc);
break;
case AlarmSubscribeCompleted asc:
HandleAlarmSubscribeCompleted(asc);
break;
case BrowseNodeCommand browse:
// Browse is design-time and never stashed. While reconnecting
// the adapter has no live session, so the adapter call will
@@ -505,6 +553,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
// WP-10: Transparent re-subscribe — re-establish all active subscriptions
ReSubscribeAll();
// Task-10: re-establish native alarm feeds (source replays a snapshot).
ReSubscribeAllAlarms();
BecomeConnected();
}
@@ -1406,6 +1456,160 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);
}
// ── Native alarm subscriptions (Task-10) ──
private void HandleSubscribeAlarms(SubscribeAlarmsRequest request)
{
var subscriber = Sender;
var now = DateTimeOffset.UtcNow;
if (_adapter is not IAlarmSubscribableConnection alarmable)
{
subscriber.Tell(new SubscribeAlarmsResponse(
request.CorrelationId, request.InstanceUniqueName, false,
$"Connection '{_connectionName}' is not alarm-capable.", now));
return;
}
// Register the subscriber for routing (idempotent) before issuing the
// adapter subscribe so a transition that arrives mid-subscribe is routed.
if (!_alarmSourceSubscribers.TryGetValue(request.SourceReference, out var subs))
{
subs = new HashSet<IActorRef>();
_alarmSourceSubscribers[request.SourceReference] = subs;
}
subs.Add(subscriber);
_alarmSourceFilter[request.SourceReference] = request.ConditionFilter;
// If the adapter feed for this source is already (being) established, the
// existing subscription serves the new subscriber too.
if (_alarmSubscriptionIds.ContainsKey(request.SourceReference) ||
_alarmSubscribesInFlight.Contains(request.SourceReference))
{
subscriber.Tell(new SubscribeAlarmsResponse(
request.CorrelationId, request.InstanceUniqueName, true, null, now));
return;
}
_alarmSubscribesInFlight.Add(request.SourceReference);
var self = Self;
var generation = _adapterGeneration;
var sourceRef = request.SourceReference;
var filter = request.ConditionFilter;
var corr = request.CorrelationId;
var inst = request.InstanceUniqueName;
alarmable.SubscribeAlarmsAsync(sourceRef, filter,
t => self.Tell(new AlarmTransitionReceived(t, generation)))
.ContinueWith(task => task.IsCompletedSuccessfully
? new AlarmSubscribeCompleted(sourceRef, true, task.Result, null, subscriber, corr, inst) as object
: new AlarmSubscribeCompleted(sourceRef, false, null,
task.Exception?.GetBaseException().Message ?? "Unknown error", subscriber, corr, inst))
.PipeTo(self);
}
private void HandleAlarmSubscribeCompleted(AlarmSubscribeCompleted msg)
{
_alarmSubscribesInFlight.Remove(msg.SourceReference);
if (msg.Success && msg.SubscriptionId != null)
{
_alarmSubscriptionIds[msg.SourceReference] = msg.SubscriptionId;
_log.Info("[{0}] Alarm feed subscribed for source {1}", _connectionName, msg.SourceReference);
}
else if (!msg.Success)
{
_log.Warning("[{0}] Alarm subscribe failed for source {1}: {2}",
_connectionName, msg.SourceReference, msg.Error);
}
// ReplyTo is null for reconnect re-subscribes (no original requester to answer).
msg.ReplyTo?.Tell(new SubscribeAlarmsResponse(
msg.CorrelationId ?? string.Empty, msg.InstanceUniqueName ?? string.Empty,
msg.Success, msg.Error, DateTimeOffset.UtcNow));
}
private void HandleAlarmTransitionReceived(AlarmTransitionReceived msg)
{
// DataConnectionLayer-011: drop transitions from a disposed adapter after failover.
if (msg.AdapterGeneration != _adapterGeneration)
return;
var transition = msg.Transition;
var notified = new HashSet<IActorRef>();
foreach (var (sourceRef, subs) in _alarmSourceSubscribers)
{
// A subscriber bound to source S receives a transition whose source
// object (or full reference) falls under S.
var match = transition.SourceObjectReference.StartsWith(sourceRef, StringComparison.Ordinal)
|| transition.SourceReference.StartsWith(sourceRef, StringComparison.Ordinal);
if (!match)
continue;
foreach (var sub in subs)
{
if (notified.Add(sub))
sub.Tell(new NativeAlarmTransitionUpdate(_connectionName, transition));
}
}
}
private void HandleUnsubscribeAlarms(UnsubscribeAlarmsRequest request)
{
if (!_alarmSourceSubscribers.TryGetValue(request.SourceReference, out var subs))
return;
subs.Remove(Sender);
if (subs.Count > 0)
return;
// No subscribers remain for this source — tear down the adapter feed.
_alarmSourceSubscribers.Remove(request.SourceReference);
_alarmSourceFilter.Remove(request.SourceReference);
if (_alarmSubscriptionIds.Remove(request.SourceReference, out var subId) &&
_adapter is IAlarmSubscribableConnection alarmable)
{
_ = alarmable.UnsubscribeAlarmsAsync(subId);
}
}
/// <summary>Re-establishes all native alarm feeds after a reconnect; the source replays a snapshot.</summary>
private void ReSubscribeAllAlarms()
{
if (_adapter is not IAlarmSubscribableConnection alarmable || _alarmSourceSubscribers.Count == 0)
return;
_alarmSubscriptionIds.Clear();
_alarmSubscribesInFlight.Clear();
var self = Self;
var generation = _adapterGeneration;
foreach (var sourceRef in _alarmSourceSubscribers.Keys.ToList())
{
var sr = sourceRef;
var filter = _alarmSourceFilter.GetValueOrDefault(sourceRef);
_alarmSubscribesInFlight.Add(sr);
alarmable.SubscribeAlarmsAsync(sr, filter,
t => self.Tell(new AlarmTransitionReceived(t, generation)))
.ContinueWith(task => task.IsCompletedSuccessfully
? new AlarmSubscribeCompleted(sr, true, task.Result, null, null, null, null) as object
: new AlarmSubscribeCompleted(sr, false, null,
task.Exception?.GetBaseException().Message ?? "Unknown error", null, null, null))
.PipeTo(self);
}
}
/// <summary>Notifies alarm subscribers that the source feed is unavailable (connection lost).</summary>
private void PushAlarmSourceUnavailable()
{
var now = DateTimeOffset.UtcNow;
foreach (var (sourceRef, subs) in _alarmSourceSubscribers)
{
foreach (var sub in subs)
sub.Tell(new NativeAlarmSourceUnavailable(_connectionName, sourceRef, now));
}
}
// ── Internal messages ──
internal record AttemptConnect;
@@ -1420,5 +1624,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
bool ConnectionLevelFailure = false);
internal record SubscribeCompleted(
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results);
internal record AlarmTransitionReceived(NativeAlarmTransition Transition, int AdapterGeneration);
internal record AlarmSubscribeCompleted(
string SourceReference, bool Success, string? SubscriptionId, string? Error,
IActorRef? ReplyTo, string? CorrelationId, string? InstanceUniqueName);
public record GetHealthReport;
}
@@ -0,0 +1,66 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Actors;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests;
/// <summary>Task-10: native alarm subscribe + source-ref routing + unavailable signal.</summary>
public class DataConnectionActorAlarmTests : TestKit
{
private readonly ISiteHealthCollector _health = Substitute.For<ISiteHealthCollector>();
private readonly IDataConnectionFactory _factory = Substitute.For<IDataConnectionFactory>();
private readonly DataConnectionOptions _options = new()
{
ReconnectInterval = TimeSpan.FromMilliseconds(100),
TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200),
WriteTimeout = TimeSpan.FromSeconds(5)
};
private static NativeAlarmTransition Raise(string sourceRef, string sourceObj) =>
new(sourceRef, sourceObj, "AnalogLimit.Hi", AlarmTransitionKind.Raise,
new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 500),
"Process", "hi", "hi", "", "", null, DateTimeOffset.UtcNow, "92", "90");
[Fact]
public void SubscribeAlarms_RoutesTransitionToInstanceSubscriber()
{
AlarmTransitionCallback? cb = null;
var adapter = Substitute.For<IDataConnection, IAlarmSubscribableConnection>();
adapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
((IAlarmSubscribableConnection)adapter)
.SubscribeAlarmsAsync(Arg.Any<string>(), Arg.Any<string?>(),
Arg.Do<AlarmTransitionCallback>(c => cb = c), Arg.Any<CancellationToken>())
.Returns(Task.FromResult("alarm-sub-1"));
var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
"conn", adapter, _options, _health, _factory, "OpcUa")));
actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow));
ExpectMsg<SubscribeAlarmsResponse>(m => m.Success);
Assert.NotNull(cb);
cb!(Raise("Tank01.Hi", "Tank01"));
ExpectMsg<NativeAlarmTransitionUpdate>(u => u.Transition.SourceObjectReference == "Tank01");
}
[Fact]
public void SubscribeAlarms_OnNonAlarmCapableAdapter_RepliesFailure()
{
var adapter = Substitute.For<IDataConnection>(); // not IAlarmSubscribableConnection
adapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
"conn", adapter, _options, _health, _factory, "OpcUa")));
actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow));
ExpectMsg<SubscribeAlarmsResponse>(m => !m.Success && m.ErrorMessage != null);
}
}