using Akka.Actor;
using Akka.TestKit.Xunit2;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Actors;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests;
/// Task-10: native alarm subscribe + source-ref routing + unavailable signal.
public class DataConnectionActorAlarmTests : TestKit
{
// Substitute health collector passed to every DataConnectionActor constructed in these tests.
private readonly ISiteHealthCollector _health = Substitute.For();
// Substitute connection factory passed to every DataConnectionActor constructed in these tests.
private readonly IDataConnectionFactory _factory = Substitute.For();
// Actor options shared by all tests: 100 ms reconnect interval, 200 ms tag-resolution
// retry interval and a 5 s write timeout.
private readonly DataConnectionOptions _options = new()
{
ReconnectInterval = TimeSpan.FromMilliseconds(100),
TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200),
WriteTimeout = TimeSpan.FromSeconds(5)
};
/// <summary>
/// Convenience overload: builds a transition for the given source references using the
/// fixed alarm type name "AnalogLimit.Hi" and the default transition kind of the full overload.
/// </summary>
private static NativeAlarmTransition Raise(string sourceRef, string sourceObj)
{
    return Raise(sourceRef, sourceObj, "AnalogLimit.Hi");
}
/// <summary>
/// Builds a <see cref="NativeAlarmTransition"/> fixture for the given source reference,
/// source object, alarm type name and transition kind (defaults to
/// <see cref="AlarmTransitionKind.Raise"/>). All remaining fields are fixed placeholder
/// values and the timestamp is <see cref="DateTimeOffset.UtcNow"/> at call time.
/// </summary>
private static NativeAlarmTransition Raise(string sourceRef, string sourceObj, string typeName,
    AlarmTransitionKind kind = AlarmTransitionKind.Raise)
{
    // Fixed condition state shared by every transition this helper produces.
    var condition = new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 500);

    return new NativeAlarmTransition(
        sourceRef, sourceObj, typeName, kind,
        condition,
        "Process", "hi", "hi", "", "", null, DateTimeOffset.UtcNow, "92", "90");
}
/// Builds a substitute adapter for the alarm tests: ConnectAsync completes immediately and
/// SubscribeAlarmsAsync returns the subscription id "alarm-sub-1" while capturing the
/// transition callback it was handed.
/// Returns the adapter together with an accessor (Cb) that yields the captured callback,
/// or null when SubscribeAlarmsAsync has not been invoked yet.
private static (IDataConnection Adapter, Func Cb) BuildAlarmAdapter()
{
// Filled in by the Arg.Do hook below once the adapter's SubscribeAlarmsAsync is called.
AlarmTransitionCallback? cb = null;
var adapter = Substitute.For();
adapter.ConnectAsync(Arg.Any>(), Arg.Any())
.Returns(Task.CompletedTask);
// The alarm-subscription surface is reached by casting the same substitute.
((IAlarmSubscribableConnection)adapter)
.SubscribeAlarmsAsync(Arg.Any(), Arg.Any(),
Arg.Do(c => cb = c), Arg.Any())
.Returns(Task.FromResult("alarm-sub-1"));
// Hand out an accessor rather than the value: cb is still null at this point.
return (adapter, () => cb);
}
/// <summary>
/// A successful alarm subscribe must hand the adapter a callback, and a transition pushed
/// through that callback for the subscribed source object ("Tank01") must be forwarded to
/// the subscriber (the test actor).
/// </summary>
[Fact]
public void SubscribeAlarms_RoutesTransitionToInstanceSubscriber()
{
    // Use the shared adapter builder instead of repeating its stubbing inline, so this
    // test stays in step with the other alarm tests in this class.
    var (adapter, getCb) = BuildAlarmAdapter();
    var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
        "conn", adapter, _options, _health, _factory, "OpcUa")));

    actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow));
    ExpectMsg(m => m.Success);

    // The callback is only captured once the actor has called SubscribeAlarmsAsync,
    // so it is read after the success reply.
    var cb = getCb();
    Assert.NotNull(cb);

    cb!(Raise("Tank01.Hi", "Tank01"));
    ExpectMsg(u => u.Transition.SourceObjectReference == "Tank01");
}
// When the adapter substitute is not set up as an IAlarmSubscribableConnection, a
// SubscribeAlarmsRequest must be answered with a failed response that carries a
// non-null error message.
[Fact]
public void SubscribeAlarms_OnNonAlarmCapableAdapter_RepliesFailure()
{
var adapter = Substitute.For(); // not IAlarmSubscribableConnection
adapter.ConnectAsync(Arg.Any>(), Arg.Any())
.Returns(Task.CompletedTask);
var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
"conn", adapter, _options, _health, _factory, "OpcUa")));
actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow));
// Expect an explicit failure reply (Success == false plus an error message).
ExpectMsg(m => !m.Success && m.ErrorMessage != null);
}
// ── M2.4 (#8): conditionFilter is now applied client-side in the actor ──
/// <summary>
/// With a comma-separated condition filter, only transitions whose alarm type name is
/// listed in the filter are forwarded; any other type is dropped silently.
/// </summary>
[Fact]
public void SubscribeAlarms_WithTypeFilter_DeliversOnlyMatchingTypes()
{
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "OpcUa"));
    var sut = Sys.ActorOf(props);

    var request = new SubscribeAlarmsRequest(
        "c", "inst", "conn", "Tank01", "AnalogLimit.Hi,AnalogLimit.Lo", DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg(reply => reply.Success);

    var publish = callbackAccessor();
    Assert.NotNull(publish);

    // "AnalogLimit.HiHi" is not in the filter list: nothing may arrive within the window.
    publish!.Invoke(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi"));
    ExpectNoMsg(TimeSpan.FromMilliseconds(250));

    // "AnalogLimit.Hi" is listed and must come through.
    publish!.Invoke(Raise("Tank01.Hi", "Tank01", "AnalogLimit.Hi"));
    ExpectMsg(update => update.Transition.AlarmTypeName == "AnalogLimit.Hi");
}
/// <summary>
/// A subscription created with a null condition filter receives transitions of every
/// alarm type for its source.
/// </summary>
[Fact]
public void SubscribeAlarms_WithNullFilter_DeliversAllTypes()
{
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "OpcUa"));
    var sut = Sys.ActorOf(props);

    var request = new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg(reply => reply.Success);

    var publish = callbackAccessor();
    Assert.NotNull(publish);

    // Two unrelated alarm types: both must be delivered because no filter is set.
    publish!.Invoke(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi"));
    ExpectMsg(update => update.Transition.AlarmTypeName == "AnalogLimit.HiHi");

    publish!.Invoke(Raise("Tank01.Lo", "Tank01", "DiscreteAlarm"));
    ExpectMsg(update => update.Transition.AlarmTypeName == "DiscreteAlarm");
}
/// <summary>
/// Filter entries are matched against the alarm type name regardless of letter case and
/// of whitespace (spaces, tabs) surrounding each comma-separated entry.
/// </summary>
[Fact]
public void SubscribeAlarms_FilterMatch_IgnoresCaseAndWhitespace()
{
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "OpcUa"));
    var sut = Sys.ActorOf(props);

    // Deliberately messy filter: padded entries, a tab, and mismatching case.
    var request = new SubscribeAlarmsRequest(
        "c", "inst", "conn", "Tank01", " analoglimit.hi ,\tDISCRETEALARM ", DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg(reply => reply.Success);

    var publish = callbackAccessor();
    Assert.NotNull(publish);

    // Differs from the filter entry only by case: delivered.
    publish!.Invoke(Raise("Tank01.Hi", "Tank01", "AnalogLimit.Hi"));
    ExpectMsg(update => update.Transition.AlarmTypeName == "AnalogLimit.Hi");

    publish!.Invoke(Raise("Tank01.Disc", "Tank01", "DiscreteAlarm"));
    ExpectMsg(update => update.Transition.AlarmTypeName == "DiscreteAlarm");

    // Not present in the filter at all: dropped.
    publish!.Invoke(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi"));
    ExpectNoMsg(TimeSpan.FromMilliseconds(250));
}
/// <summary>
/// MxGateway offers no server-side filtering: the adapter opens a single gateway-wide
/// feed and the actor is the authoritative gate. A filtered subscriber must therefore
/// only receive transitions that match both its source and its type filter, even though
/// the feed carries everything.
/// </summary>
[Fact]
public void SubscribeAlarms_GatewayWideFeed_IsFilteredClientSide()
{
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "MxGateway"));
    var sut = Sys.ActorOf(props);

    var request = new SubscribeAlarmsRequest(
        "c", "inst", "conn", "Reactor", "HighTemp", DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg(reply => reply.Success);

    var publish = callbackAccessor();
    Assert.NotNull(publish);

    // Matching type but a different source object: rejected by source routing.
    publish!.Invoke(Raise("Pump.Fault", "Pump", "HighTemp"));
    ExpectNoMsg(TimeSpan.FromMilliseconds(200));

    // Matching source but a type outside the filter: rejected by the client-side type gate.
    publish!.Invoke(Raise("Reactor.LowTemp", "Reactor", "LowTemp"));
    ExpectNoMsg(TimeSpan.FromMilliseconds(200));

    // Matching source and matching type: forwarded.
    publish!.Invoke(Raise("Reactor.HighTemp", "Reactor", "HighTemp"));
    ExpectMsg(update =>
        update.Transition.SourceObjectReference == "Reactor" && update.Transition.AlarmTypeName == "HighTemp");
}
/// <summary>
/// The SnapshotComplete framing sentinel has an empty alarm type name, so it can never
/// match a type filter. It must nevertheless pass the type gate so that the
/// NativeAlarmActor's snapshot swap can complete.
/// </summary>
[Fact]
public void SubscribeAlarms_WithFilter_StillForwardsSnapshotCompleteSentinel()
{
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "OpcUa"));
    var sut = Sys.ActorOf(props);

    var request = new SubscribeAlarmsRequest(
        "c", "inst", "conn", "Tank01", "AnalogLimit.Hi", DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg(reply => reply.Success);

    var publish = callbackAccessor();
    Assert.NotNull(publish);

    // Sentinel addressed to the subscribed source ("Tank01") with an empty type name;
    // it must be forwarded and never type-filtered.
    var sentinelState = new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0);
    var sentinel = new NativeAlarmTransition(
        "Tank01", "Tank01", "", AlarmTransitionKind.SnapshotComplete,
        sentinelState,
        "", "", "", "", "", null, DateTimeOffset.UtcNow, "", "");
    publish!.Invoke(sentinel);

    ExpectMsg(update => update.Transition.Kind == AlarmTransitionKind.SnapshotComplete);
}
/// <summary>
/// Regression: the real MxGatewayAlarmMapper.SnapshotComplete() emits its sentinel with
/// EMPTY SourceReference / SourceObjectReference. For a specific (prefix) source such as
/// "Reactor." the per-source match ("".StartsWith("Reactor.") == false) used to drop the
/// sentinel, leaving the buffered snapshot stranded in the NativeAlarmActor forever, so
/// statically-active conditions (delivered only in the snapshot) never surfaced. The
/// sentinel must be broadcast to every subscriber so the snapshot swap completes.
/// </summary>
[Fact]
public void SubscribeAlarms_RealEmptyRefSnapshotComplete_IsBroadcastToSpecificSource()
{
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "MxGateway"));
    var sut = Sys.ActorOf(props);

    var request = new SubscribeAlarmsRequest("c", "inst", "conn", "Reactor.", null, DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg(reply => reply.Success);

    var publish = callbackAccessor();
    Assert.NotNull(publish);

    // Snapshot entry for an active alarm below the subscribed prefix: routed by prefix match.
    var activeState = new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 400);
    var snapshotEntry = new NativeAlarmTransition(
        "Galaxy!Area.Reactor.HeartbeatTimeoutAlarm", "Reactor.HeartbeatTimeoutAlarm", "Syst",
        AlarmTransitionKind.Snapshot,
        activeState,
        "Area", "", "", "", "", null, DateTimeOffset.UtcNow, "", "");
    publish!.Invoke(snapshotEntry);
    ExpectMsg(update =>
        update.Transition.Kind == AlarmTransitionKind.Snapshot &&
        update.Transition.SourceObjectReference == "Reactor.HeartbeatTimeoutAlarm");

    // The real sentinel carries EMPTY refs and must still reach the "Reactor." subscriber.
    var sentinelState = new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0);
    var sentinel = new NativeAlarmTransition(
        "", "", "", AlarmTransitionKind.SnapshotComplete,
        sentinelState,
        "", "", "", "", "", null, DateTimeOffset.UtcNow, "", "");
    publish!.Invoke(sentinel);
    ExpectMsg(update => update.Transition.Kind == AlarmTransitionKind.SnapshotComplete);
}
// ── DataConnectionLayer-023: mid-flight alarm unsubscribe must release the adapter feed ──
// Scenario: subscribe (adapter call parked) -> unsubscribe -> adapter call completes.
// Asserts that the late-created adapter feed is unsubscribed exactly once and that a
// subsequent subscribe for the same source opens a new adapter feed.
[Fact]
public async Task DCL023_UnsubscribeDuringInFlightAlarmSubscribe_ReleasesAdapterFeed_AndKeepsStateClean()
{
// Regression test for DataConnectionLayer-023. Previously the native-alarm
// subscribe path never inherited the DCL-021 obsolete-completion guard: if the
// last subscriber unsubscribed while the adapter alarm subscribe was in flight,
// HandleUnsubscribeAlarms could not tear down the feed (the subscription id was
// not stored yet), and the late AlarmSubscribeCompleted unconditionally stored
// _alarmSubscriptionIds[source] = id — an orphaned device-side alarm feed that
// streamed transitions to nobody for the lifetime of the adapter. After the fix,
// HandleUnsubscribeAlarms clears the in-flight marker and the late completion is
// recognized as orphaned (no subscriber remains) and released via
// UnsubscribeAlarmsAsync, with no leaked subscription id retained.
// subscribeStarted signals that the adapter subscribe was entered; releaseSubscribe
// lets the test decide when that adapter call completes (and with which id).
var subscribeStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var releaseSubscribe = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var adapter = Substitute.For();
adapter.ConnectAsync(Arg.Any>(), Arg.Any())
.Returns(Task.CompletedTask);
var alarmable = (IAlarmSubscribableConnection)adapter;
// Park the adapter subscribe so UnsubscribeAlarmsRequest is processed first.
alarmable.SubscribeAlarmsAsync(Arg.Any(), Arg.Any(),
Arg.Any(), Arg.Any())
.Returns(_ =>
{
subscribeStarted.TrySetResult();
return releaseSubscribe.Task;
});
alarmable.UnsubscribeAlarmsAsync(Arg.Any(), Arg.Any())
.Returns(Task.CompletedTask);
var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
"conn", adapter, _options, _health, _factory, "OpcUa")));
// Subscribe a source — block on the parked adapter subscribe.
actor.Tell(new SubscribeAlarmsRequest("c1", "instA", "conn", "Tank01", null, DateTimeOffset.UtcNow));
// The immediate SubscribeAlarmsResponse only arrives after HandleAlarmSubscribeCompleted;
// since the subscribe is parked, none is expected yet.
await subscribeStarted.Task.WaitAsync(TimeSpan.FromSeconds(5));
// The last subscriber unsubscribes while the alarm subscribe is still in flight.
actor.Tell(new UnsubscribeAlarmsRequest("unsub-c1", "instA", "conn", "Tank01", DateTimeOffset.UtcNow));
// Fixed delay so the actor can process the unsubscribe before the subscribe is released.
// NOTE(review): timing-based synchronization; may be sensitive to slow test hosts.
await Task.Delay(150);
// Release the parked subscribe — AlarmSubscribeCompleted is now orphaned.
releaseSubscribe.SetResult("alarm-sub-orphan");
// Fixed delay so the actor can process the late completion before the assertions run.
await Task.Delay(300);
// The orphaned, just-created adapter feed must be released exactly once.
await alarmable.Received(1).UnsubscribeAlarmsAsync(
Arg.Is(s => s == "alarm-sub-orphan"), Arg.Any());
// No leaked subscription id must remain: a fresh subscribe to the same source
// must open a NEW adapter feed (proving _alarmSubscriptionIds was not populated
// with the orphaned id, which would have short-circuited the adapter subscribe).
var resubStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
// Re-stub the adapter subscribe so the second call completes immediately with a new id.
alarmable.SubscribeAlarmsAsync(Arg.Any(), Arg.Any(),
Arg.Any(), Arg.Any())
.Returns(_ =>
{
resubStarted.TrySetResult();
return Task.FromResult("alarm-sub-2");
});
actor.Tell(new SubscribeAlarmsRequest("c2", "instB", "conn", "Tank01", null, DateTimeOffset.UtcNow));
await resubStarted.Task.WaitAsync(TimeSpan.FromSeconds(5));
ExpectMsg(m => m.Success, TimeSpan.FromSeconds(5));
// Two distinct adapter subscribes total (the orphaned one + the fresh one).
await alarmable.Received(2).SubscribeAlarmsAsync(
"Tank01", Arg.Any(), Arg.Any(), Arg.Any());
}
// ── DataConnectionLayer-029: a re-subscribe during an orphaned in-flight subscribe
// must not leak a duplicate adapter feed ──
// Scenario: subscribe #1 (parked) -> unsubscribe -> subscribe #2 (parked) -> #1 completes
// -> #2 completes. Asserts that feed #2 is unsubscribed exactly once, feed #1 is left
// alone, and a final unsubscribe tears down feed #1.
[Fact]
public async Task DCL029_ResubscribeDuringOrphanedInFlightSubscribe_ReleasesDuplicateFeed_NoLeak()
{
// Regression test for DataConnectionLayer-029. The DCL-023 fix clears the in-flight
// marker on unsubscribe, which reopens a double-subscribe window: unsubscribe (last
// subscriber, subId not stored yet) → a fresh subscribe for the SAME source sees
// neither a stored id nor an in-flight marker, so it issues a SECOND adapter feed →
// both completions fire. The DCL-023 orphan guard does NOT trigger on either
// completion (the re-subscribe re-added the subscriber), so the alarm completion
// handler used to OVERWRITE _alarmSubscriptionIds with the second id — leaking the
// first feed (never unsubscribed, kept streaming). After DCL-029 the handler mirrors
// the tag-path re-check: when a feed is already stored, the redundant completion
// releases its just-created feed instead of overwriting + leaking.
// One "started" signal and one "release" gate per adapter subscribe call, so the test
// controls the order in which the two adapter calls complete.
var sub1Started = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var sub1Release = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var sub2Started = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var sub2Release = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var adapter = Substitute.For();
adapter.ConnectAsync(Arg.Any>(), Arg.Any())
.Returns(Task.CompletedTask);
var alarmable = (IAlarmSubscribableConnection)adapter;
// Counts adapter subscribe calls: the first call parks on gate #1, any later call on gate #2.
var calls = 0;
alarmable.SubscribeAlarmsAsync(Arg.Any(), Arg.Any(),
Arg.Any(), Arg.Any())
.Returns(_ =>
{
if (Interlocked.Increment(ref calls) == 1)
{
sub1Started.TrySetResult();
return sub1Release.Task;
}
sub2Started.TrySetResult();
return sub2Release.Task;
});
alarmable.UnsubscribeAlarmsAsync(Arg.Any(), Arg.Any())
.Returns(Task.CompletedTask);
var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
"conn", adapter, _options, _health, _factory, "OpcUa")));
// (1) Subscribe A — adapter subscribe #1 parks, in-flight={Tank01}.
actor.Tell(new SubscribeAlarmsRequest("c1", "instA", "conn", "Tank01", null, DateTimeOffset.UtcNow));
await sub1Started.Task.WaitAsync(TimeSpan.FromSeconds(5));
// (2) Last subscriber unsubscribes while subscribe #1 is in flight — clears the
// in-flight marker (DCL-023); subId#1 is not stored yet so no teardown happens.
actor.Tell(new UnsubscribeAlarmsRequest("unsub-c1", "instA", "conn", "Tank01", DateTimeOffset.UtcNow));
// Fixed delay so the actor can process the unsubscribe before the next request.
// NOTE(review): timing-based synchronization; may be sensitive to slow test hosts.
await Task.Delay(150);
// (3) Fresh subscribe for the SAME source before #1 completes — neither a stored id
// nor an in-flight marker exists, so the actor issues a SECOND adapter subscribe.
actor.Tell(new SubscribeAlarmsRequest("c2", "instB", "conn", "Tank01", null, DateTimeOffset.UtcNow));
await sub2Started.Task.WaitAsync(TimeSpan.FromSeconds(5));
// (4) Complete subscribe #1 → a subscriber exists again (B re-added), so the orphan
// guard does NOT fire and subId#1 is stored as the live feed.
sub1Release.SetResult("alarm-sub-1");
await Task.Delay(150);
// (5) Complete subscribe #2 → a feed is already stored, so this redundant completion
// releases its just-created feed (#2) instead of overwriting + leaking subId#1.
sub2Release.SetResult("alarm-sub-2");
await Task.Delay(300);
// The duplicate feed (#2) is released exactly once; the first feed (#1) is retained.
await alarmable.Received(1).UnsubscribeAlarmsAsync("alarm-sub-2", Arg.Any());
await alarmable.DidNotReceive().UnsubscribeAlarmsAsync("alarm-sub-1", Arg.Any());
// The retained feed (#1) is what a later unsubscribe tears down — proving subId#1,
// not the duplicate, is the id _alarmSubscriptionIds actually tracks (no leak).
actor.Tell(new UnsubscribeAlarmsRequest("unsub-c2", "instB", "conn", "Tank01", DateTimeOffset.UtcNow));
await Task.Delay(200);
await alarmable.Received(1).UnsubscribeAlarmsAsync("alarm-sub-1", Arg.Any());
}
}