using Akka.Actor; using Akka.TestKit.Xunit2; using NSubstitute; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol; using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection; using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Actors; using ZB.MOM.WW.ScadaBridge.HealthMonitoring; namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests; /// Task-10: native alarm subscribe + source-ref routing + unavailable signal. public class DataConnectionActorAlarmTests : TestKit { private readonly ISiteHealthCollector _health = Substitute.For(); private readonly IDataConnectionFactory _factory = Substitute.For(); private readonly DataConnectionOptions _options = new() { ReconnectInterval = TimeSpan.FromMilliseconds(100), TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200), WriteTimeout = TimeSpan.FromSeconds(5) }; private static NativeAlarmTransition Raise(string sourceRef, string sourceObj) => Raise(sourceRef, sourceObj, "AnalogLimit.Hi"); private static NativeAlarmTransition Raise(string sourceRef, string sourceObj, string typeName, AlarmTransitionKind kind = AlarmTransitionKind.Raise) => new(sourceRef, sourceObj, typeName, kind, new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 500), "Process", "hi", "hi", "", "", null, DateTimeOffset.UtcNow, "92", "90"); private static (IDataConnection Adapter, Func Cb) BuildAlarmAdapter() { AlarmTransitionCallback? cb = null; var adapter = Substitute.For(); adapter.ConnectAsync(Arg.Any>(), Arg.Any()) .Returns(Task.CompletedTask); ((IAlarmSubscribableConnection)adapter) .SubscribeAlarmsAsync(Arg.Any(), Arg.Any(), Arg.Do(c => cb = c), Arg.Any()) .Returns(Task.FromResult("alarm-sub-1")); return (adapter, () => cb); } [Fact] public void SubscribeAlarms_RoutesTransitionToInstanceSubscriber() { AlarmTransitionCallback? cb = null; var adapter = Substitute.For(); adapter.ConnectAsync(Arg.Any>(), Arg.Any()) .Returns(Task.CompletedTask); ((IAlarmSubscribableConnection)adapter) .SubscribeAlarmsAsync(Arg.Any(), Arg.Any(), Arg.Do(c => cb = c), Arg.Any()) .Returns(Task.FromResult("alarm-sub-1")); var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor( "conn", adapter, _options, _health, _factory, "OpcUa"))); actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow)); ExpectMsg(m => m.Success); Assert.NotNull(cb); cb!(Raise("Tank01.Hi", "Tank01")); ExpectMsg(u => u.Transition.SourceObjectReference == "Tank01"); } [Fact] public void SubscribeAlarms_OnNonAlarmCapableAdapter_RepliesFailure() { var adapter = Substitute.For(); // not IAlarmSubscribableConnection adapter.ConnectAsync(Arg.Any>(), Arg.Any()) .Returns(Task.CompletedTask); var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor( "conn", adapter, _options, _health, _factory, "OpcUa"))); actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow)); ExpectMsg(m => !m.Success && m.ErrorMessage != null); } // ── M2.4 (#8): conditionFilter is now applied client-side in the actor ── [Fact] public void SubscribeAlarms_WithTypeFilter_DeliversOnlyMatchingTypes() { var (adapter, getCb) = BuildAlarmAdapter(); var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor( "conn", adapter, _options, _health, _factory, "OpcUa"))); actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", "AnalogLimit.Hi,AnalogLimit.Lo", DateTimeOffset.UtcNow)); ExpectMsg(m => m.Success); var cb = getCb(); Assert.NotNull(cb); // Non-matching type is dropped (no message delivered). cb!(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi")); ExpectNoMsg(TimeSpan.FromMilliseconds(250)); // Matching type is delivered. cb!(Raise("Tank01.Hi", "Tank01", "AnalogLimit.Hi")); ExpectMsg(u => u.Transition.AlarmTypeName == "AnalogLimit.Hi"); } [Fact] public void SubscribeAlarms_WithNullFilter_DeliversAllTypes() { var (adapter, getCb) = BuildAlarmAdapter(); var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor( "conn", adapter, _options, _health, _factory, "OpcUa"))); actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow)); ExpectMsg(m => m.Success); var cb = getCb(); Assert.NotNull(cb); cb!(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi")); ExpectMsg(u => u.Transition.AlarmTypeName == "AnalogLimit.HiHi"); cb!(Raise("Tank01.Lo", "Tank01", "DiscreteAlarm")); ExpectMsg(u => u.Transition.AlarmTypeName == "DiscreteAlarm"); } [Fact] public void SubscribeAlarms_FilterMatch_IgnoresCaseAndWhitespace() { var (adapter, getCb) = BuildAlarmAdapter(); var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor( "conn", adapter, _options, _health, _factory, "OpcUa"))); actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", " analoglimit.hi ,\tDISCRETEALARM ", DateTimeOffset.UtcNow)); ExpectMsg(m => m.Success); var cb = getCb(); Assert.NotNull(cb); cb!(Raise("Tank01.Hi", "Tank01", "AnalogLimit.Hi")); // case differs from filter ExpectMsg(u => u.Transition.AlarmTypeName == "AnalogLimit.Hi"); cb!(Raise("Tank01.Disc", "Tank01", "DiscreteAlarm")); ExpectMsg(u => u.Transition.AlarmTypeName == "DiscreteAlarm"); cb!(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi")); // not listed ExpectNoMsg(TimeSpan.FromMilliseconds(250)); } [Fact] public void SubscribeAlarms_GatewayWideFeed_IsFilteredClientSide() { // MxGateway has no server-side filter: its adapter opens ONE gateway-wide // feed and the actor is the authoritative gate. A filtered source must // only see its own matching types even though the feed carries everything. var (adapter, getCb) = BuildAlarmAdapter(); var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor( "conn", adapter, _options, _health, _factory, "MxGateway"))); actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Reactor", "HighTemp", DateTimeOffset.UtcNow)); ExpectMsg(m => m.Success); var cb = getCb(); Assert.NotNull(cb); // Gateway-wide feed delivers a transition for a different source object — // dropped by source routing. cb!(Raise("Pump.Fault", "Pump", "HighTemp")); ExpectNoMsg(TimeSpan.FromMilliseconds(200)); // Right source, wrong type — dropped by the client-side type gate. cb!(Raise("Reactor.LowTemp", "Reactor", "LowTemp")); ExpectNoMsg(TimeSpan.FromMilliseconds(200)); // Right source, right type — delivered. cb!(Raise("Reactor.HighTemp", "Reactor", "HighTemp")); ExpectMsg(u => u.Transition.SourceObjectReference == "Reactor" && u.Transition.AlarmTypeName == "HighTemp"); } [Fact] public void SubscribeAlarms_WithFilter_StillForwardsSnapshotCompleteSentinel() { // The SnapshotComplete framing sentinel (empty AlarmTypeName) must survive // the type gate so the NativeAlarmActor's snapshot swap can complete. var (adapter, getCb) = BuildAlarmAdapter(); var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor( "conn", adapter, _options, _health, _factory, "OpcUa"))); actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", "AnalogLimit.Hi", DateTimeOffset.UtcNow)); ExpectMsg(m => m.Success); var cb = getCb(); Assert.NotNull(cb); // Snapshot-complete sentinel: empty source refs (the framing marker) but // routed because every subscriber receives it; never type-filtered. cb!(new NativeAlarmTransition("Tank01", "Tank01", "", AlarmTransitionKind.SnapshotComplete, new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0), "", "", "", "", "", null, DateTimeOffset.UtcNow, "", "")); ExpectMsg(u => u.Transition.Kind == AlarmTransitionKind.SnapshotComplete); } [Fact] public void SubscribeAlarms_RealEmptyRefSnapshotComplete_IsBroadcastToSpecificSource() { // Regression: the real MxGatewayAlarmMapper.SnapshotComplete() emits the // sentinel with EMPTY SourceReference / SourceObjectReference. With a // specific (prefix) source like "Reactor." the per-source match // ("".StartsWith("Reactor.") == false) used to drop it, stranding the // buffered snapshot in the NativeAlarmActor forever — statically-active // conditions (delivered only in the snapshot) never surfaced. The sentinel // must be broadcast to every subscriber so the snapshot swap completes. var (adapter, getCb) = BuildAlarmAdapter(); var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor( "conn", adapter, _options, _health, _factory, "MxGateway"))); actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Reactor.", null, DateTimeOffset.UtcNow)); ExpectMsg(m => m.Success); var cb = getCb(); Assert.NotNull(cb); // Active alarm under the source arrives in the snapshot and routes by prefix. cb!(new NativeAlarmTransition( "Galaxy!Area.Reactor.HeartbeatTimeoutAlarm", "Reactor.HeartbeatTimeoutAlarm", "Syst", AlarmTransitionKind.Snapshot, new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 400), "Area", "", "", "", "", null, DateTimeOffset.UtcNow, "", "")); ExpectMsg(u => u.Transition.Kind == AlarmTransitionKind.Snapshot && u.Transition.SourceObjectReference == "Reactor.HeartbeatTimeoutAlarm"); // Real sentinel with EMPTY refs — must still reach the "Reactor." subscriber. cb!(new NativeAlarmTransition("", "", "", AlarmTransitionKind.SnapshotComplete, new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0), "", "", "", "", "", null, DateTimeOffset.UtcNow, "", "")); ExpectMsg(u => u.Transition.Kind == AlarmTransitionKind.SnapshotComplete); } }