using Akka.Actor;
using Akka.TestKit.Xunit2;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Actors;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;

namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests;

// NOTE(review): several generic type-argument lists appear to be missing from this
// file as received (e.g. `Substitute.For()`, `Arg.Any>()`, `Func Cb`, `ExpectMsg(...)`).
// They are left exactly as found because the intended type names are not visible
// here; restore them from the original source before building.

/// <summary>
/// Task-10 tests for native alarm subscription through <see cref="DataConnectionActor"/>:
/// the reply to a subscribe request, routing of transitions by source object,
/// client-side filtering by alarm type name, and forwarding of the
/// SnapshotComplete sentinel.
/// </summary>
/// <remarks>
/// The original header also lists an "unavailable signal"; no test in this class
/// exercises it. NOTE(review): confirm whether that coverage lives elsewhere.
/// </remarks>
public class DataConnectionActorAlarmTests : TestKit
{
    private readonly ISiteHealthCollector _health = Substitute.For();
    private readonly IDataConnectionFactory _factory = Substitute.For();

    // Short intervals so the tests never wait on the defaults.
    private readonly DataConnectionOptions _options = new()
    {
        ReconnectInterval = TimeSpan.FromMilliseconds(100),
        TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200),
        WriteTimeout = TimeSpan.FromSeconds(5)
    };

    /// <summary>
    /// Builds a Raise transition of type "AnalogLimit.Hi" for the given source.
    /// </summary>
    private static NativeAlarmTransition Raise(string sourceRef, string sourceObj) =>
        Raise(sourceRef, sourceObj, "AnalogLimit.Hi");

    /// <summary>
    /// Builds a transition with the given source references, type name and kind;
    /// every remaining constructor argument is a fixed literal shared by all tests.
    /// </summary>
    private static NativeAlarmTransition Raise(string sourceRef, string sourceObj, string typeName,
        AlarmTransitionKind kind = AlarmTransitionKind.Raise) =>
        new(sourceRef, sourceObj, typeName, kind,
            new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 500),
            "Process", "hi", "hi", "", "", null, DateTimeOffset.UtcNow, "92", "90");

    /// <summary>
    /// Creates an adapter substitute whose ConnectAsync completes immediately and whose
    /// SubscribeAlarmsAsync returns "alarm-sub-1" while capturing the callback it was given.
    /// </summary>
    /// <returns>
    /// The adapter plus an accessor for the captured callback; the accessor yields
    /// <c>null</c> until SubscribeAlarmsAsync has been invoked.
    /// </returns>
    private static (IDataConnection Adapter, Func Cb) BuildAlarmAdapter()
    {
        AlarmTransitionCallback? cb = null;
        var adapter = Substitute.For();
        adapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.CompletedTask);
        ((IAlarmSubscribableConnection)adapter)
            .SubscribeAlarmsAsync(Arg.Any(), Arg.Any(), Arg.Do(c => cb = c), Arg.Any())
            .Returns(Task.FromResult("alarm-sub-1"));
        return (adapter, () => cb);
    }

    /// <summary>
    /// Creates the actor under test in the TestKit actor system, using the constructor
    /// arguments shared by every test in this class.
    /// </summary>
    /// <param name="adapter">Connection substitute handed to the actor.</param>
    /// <param name="protocol">Value passed as the actor's final constructor argument.</param>
    private IActorRef CreateActor(IDataConnection adapter, string protocol = "OpcUa") =>
        Sys.ActorOf(Props.Create(() => new DataConnectionActor(
            "conn", adapter, _options, _health, _factory, protocol)));

    /// <summary>
    /// A successful subscribe is acknowledged, and a transition raised through the
    /// captured callback is delivered to the test actor.
    /// </summary>
    [Fact]
    public void SubscribeAlarms_RoutesTransitionToInstanceSubscriber()
    {
        var (adapter, getCb) = BuildAlarmAdapter();
        var actor = CreateActor(adapter);

        actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow));
        ExpectMsg(m => m.Success);

        var cb = getCb();
        Assert.NotNull(cb);
        cb!(Raise("Tank01.Hi", "Tank01"));
        ExpectMsg(u => u.Transition.SourceObjectReference == "Tank01");
    }

    /// <summary>
    /// Subscribing through an adapter without alarm support yields a failure reply
    /// carrying an error message.
    /// </summary>
    [Fact]
    public void SubscribeAlarms_OnNonAlarmCapableAdapter_RepliesFailure()
    {
        var adapter = Substitute.For(); // not IAlarmSubscribableConnection
        adapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.CompletedTask);
        var actor = CreateActor(adapter);

        actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow));

        ExpectMsg(m => !m.Success && m.ErrorMessage != null);
    }

    // ── M2.4 (#8): conditionFilter is now applied client-side in the actor ──

    /// <summary>
    /// With a comma-separated type filter, only transitions whose type name is listed
    /// reach the subscriber.
    /// </summary>
    [Fact]
    public void SubscribeAlarms_WithTypeFilter_DeliversOnlyMatchingTypes()
    {
        var (adapter, getCb) = BuildAlarmAdapter();
        var actor = CreateActor(adapter);

        actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01",
            "AnalogLimit.Hi,AnalogLimit.Lo", DateTimeOffset.UtcNow));
        ExpectMsg(m => m.Success);

        var cb = getCb();
        Assert.NotNull(cb);

        // Non-matching type is dropped (no message delivered).
        cb!(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi"));
        ExpectNoMsg(TimeSpan.FromMilliseconds(250));

        // Matching type is delivered.
        cb!(Raise("Tank01.Hi", "Tank01", "AnalogLimit.Hi"));
        ExpectMsg(u => u.Transition.AlarmTypeName == "AnalogLimit.Hi");
    }

    /// <summary>
    /// A null filter lets transitions of any type name through.
    /// </summary>
    [Fact]
    public void SubscribeAlarms_WithNullFilter_DeliversAllTypes()
    {
        var (adapter, getCb) = BuildAlarmAdapter();
        var actor = CreateActor(adapter);

        actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow));
        ExpectMsg(m => m.Success);

        var cb = getCb();
        Assert.NotNull(cb);

        cb!(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi"));
        ExpectMsg(u => u.Transition.AlarmTypeName == "AnalogLimit.HiHi");

        cb!(Raise("Tank01.Lo", "Tank01", "DiscreteAlarm"));
        ExpectMsg(u => u.Transition.AlarmTypeName == "DiscreteAlarm");
    }

    /// <summary>
    /// Filter entries match regardless of letter case and surrounding spaces or tabs.
    /// </summary>
    [Fact]
    public void SubscribeAlarms_FilterMatch_IgnoresCaseAndWhitespace()
    {
        var (adapter, getCb) = BuildAlarmAdapter();
        var actor = CreateActor(adapter);

        actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01",
            "  analoglimit.hi ,\tDISCRETEALARM ", DateTimeOffset.UtcNow));
        ExpectMsg(m => m.Success);

        var cb = getCb();
        Assert.NotNull(cb);

        cb!(Raise("Tank01.Hi", "Tank01", "AnalogLimit.Hi")); // case differs from filter
        ExpectMsg(u => u.Transition.AlarmTypeName == "AnalogLimit.Hi");

        cb!(Raise("Tank01.Disc", "Tank01", "DiscreteAlarm"));
        ExpectMsg(u => u.Transition.AlarmTypeName == "DiscreteAlarm");

        cb!(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi")); // not listed
        ExpectNoMsg(TimeSpan.FromMilliseconds(250));
    }

    /// <summary>
    /// With the "MxGateway" protocol argument, transitions for other source objects or
    /// other type names are dropped and only the matching source/type pair is delivered.
    /// </summary>
    [Fact]
    public void SubscribeAlarms_GatewayWideFeed_IsFilteredClientSide()
    {
        // MxGateway has no server-side filter: its adapter opens ONE gateway-wide
        // feed and the actor is the authoritative gate. A filtered source must
        // only see its own matching types even though the feed carries everything.
        var (adapter, getCb) = BuildAlarmAdapter();
        var actor = CreateActor(adapter, "MxGateway");

        actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Reactor",
            "HighTemp", DateTimeOffset.UtcNow));
        ExpectMsg(m => m.Success);

        var cb = getCb();
        Assert.NotNull(cb);

        // Gateway-wide feed delivers a transition for a different source object —
        // dropped by source routing.
        cb!(Raise("Pump.Fault", "Pump", "HighTemp"));
        ExpectNoMsg(TimeSpan.FromMilliseconds(200));

        // Right source, wrong type — dropped by the client-side type gate.
        cb!(Raise("Reactor.LowTemp", "Reactor", "LowTemp"));
        ExpectNoMsg(TimeSpan.FromMilliseconds(200));

        // Right source, right type — delivered.
        cb!(Raise("Reactor.HighTemp", "Reactor", "HighTemp"));
        ExpectMsg(u => u.Transition.SourceObjectReference == "Reactor"
            && u.Transition.AlarmTypeName == "HighTemp");
    }

    /// <summary>
    /// A SnapshotComplete transition with an empty type name is still delivered while
    /// a type filter is active.
    /// </summary>
    [Fact]
    public void SubscribeAlarms_WithFilter_StillForwardsSnapshotCompleteSentinel()
    {
        // The SnapshotComplete framing sentinel (empty AlarmTypeName) must survive
        // the type gate so the NativeAlarmActor's snapshot swap can complete.
        var (adapter, getCb) = BuildAlarmAdapter();
        var actor = CreateActor(adapter);

        actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01",
            "AnalogLimit.Hi", DateTimeOffset.UtcNow));
        ExpectMsg(m => m.Success);

        var cb = getCb();
        Assert.NotNull(cb);

        // Snapshot-complete sentinel: the type name is empty (the framing marker)
        // while both source arguments are "Tank01". It must be delivered even though
        // the filter lists only "AnalogLimit.Hi"; the sentinel is never type-filtered.
        cb!(new NativeAlarmTransition("Tank01", "Tank01", "", AlarmTransitionKind.SnapshotComplete,
            new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0),
            "", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""));
        ExpectMsg(u => u.Transition.Kind == AlarmTransitionKind.SnapshotComplete);
    }
}