using System.Collections.Concurrent;
using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;

namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.OpcUa;

/// <summary>
/// Tests for <see cref="OpcUaPublishActor"/>: each message type is sent to an actor created via
/// <c>PropsForTests</c> and the calls that reach the recording sink / service-level publisher are asserted.
/// </summary>
public sealed class OpcUaPublishActorTests : RuntimeActorTestBase
{
    /// <summary>Upper bound given to every <c>AwaitAssert</c> call in this class.</summary>
    private static readonly TimeSpan AssertWindow = TimeSpan.FromMilliseconds(500);

    /// <summary>
    /// Sends one message of each contract to an actor built with default test props and
    /// verifies the test actor receives nothing back within 200 ms.
    /// </summary>
    [Fact]
    public void Accepts_message_contracts_without_pinned_dispatcher_in_tests()
    {
        var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests());

        actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=Tag1", 42.0, OpcUaQuality.Good, DateTime.UtcNow));
        actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=Alarm1", true, false, DateTime.UtcNow));
        actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
        actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240));

        ExpectNoMsg(TimeSpan.FromMilliseconds(200));
    }

    /// <summary>
    /// Verifies that the production <c>Props()</c> carries the dispatcher id exposed as
    /// <c>OpcUaPublishActor.DispatcherId</c>.
    /// </summary>
    [Fact]
    public void Production_Props_targets_opcua_synchronized_dispatcher()
    {
        var props = OpcUaPublishActor.Props();

        props.Dispatcher.ShouldBe(OpcUaPublishActor.DispatcherId);
    }

    /// <summary>
    /// Two <c>AttributeValueUpdate</c> messages must end up as two <c>WriteValue</c> calls on the sink,
    /// in order, with node id, value and quality preserved.
    /// </summary>
    [Fact]
    public void AttributeValueUpdate_routes_to_sink_WriteValue()
    {
        var sink = new RecordingSink();
        var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));

        actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=T1", 3.14, OpcUaQuality.Good, DateTime.UtcNow));
        actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=T2", "abc", OpcUaQuality.Uncertain, DateTime.UtcNow));

        AwaitAssert(() =>
        {
            // Take one snapshot per attempt so every assertion sees the same state.
            var values = sink.Values;
            values.Count.ShouldBe(2);
            values[0].NodeId.ShouldBe("ns=2;s=T1");
            values[0].Value.ShouldBe(3.14);
            values[0].Quality.ShouldBe(OpcUaQuality.Good);
            values[1].Quality.ShouldBe(OpcUaQuality.Uncertain);
        }, duration: AssertWindow);
    }

    /// <summary>
    /// An <c>AlarmStateUpdate</c> message must end up as a single <c>WriteAlarmState</c> call on the sink
    /// with the same node id and active/acknowledged flags.
    /// </summary>
    [Fact]
    public void AlarmStateUpdate_routes_to_sink_WriteAlarmState()
    {
        var sink = new RecordingSink();
        var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));

        actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=A1", Active: true, Acknowledged: false, DateTime.UtcNow));

        AwaitAssert(() =>
        {
            // Take one snapshot per attempt so every assertion sees the same state.
            var alarms = sink.Alarms;
            alarms.Count.ShouldBe(1);
            alarms[0].AlarmNodeId.ShouldBe("ns=2;s=A1");
            alarms[0].Active.ShouldBeTrue();
            alarms[0].Acknowledged.ShouldBeFalse();
        }, duration: AssertWindow);
    }

    /// <summary>
    /// A <c>RebuildAddressSpace</c> message must trigger exactly one <c>RebuildAddressSpace</c> call on the sink.
    /// </summary>
    [Fact]
    public void RebuildAddressSpace_calls_sink_Rebuild()
    {
        var sink = new RecordingSink();
        var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));

        actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));

        AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: AssertWindow);
    }

    /// <summary>
    /// Sending 240, 240, 100 must result in the publisher seeing only 240 and 100:
    /// the repeated level is expected to be suppressed.
    /// </summary>
    [Fact]
    public void ServiceLevelChanged_publishes_to_IServiceLevelPublisher_once_per_unique_level()
    {
        var publisher = new RecordingPublisher();
        var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(serviceLevel: publisher));

        actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240));
        actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240)); // dedup
        actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(100));

        AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 240, 100 }), duration: AssertWindow);
    }

    /// <summary>
    /// A redundancy snapshot in which the local node is the primary and both cluster leader and
    /// role leader must cause a service level of 240 to be published.
    /// </summary>
    [Fact]
    public void RedundancyStateChanged_drives_local_ServiceLevel_publish_for_primary_leader()
    {
        var publisher = new RecordingPublisher();
        var local = NodeId.Parse("primary-node");
        var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
            serviceLevel: publisher,
            localNode: local));

        var snapshot = new RedundancyStateChanged(
            Nodes: new[]
            {
                new NodeRedundancyState(local, RedundancyRole.Primary, IsClusterLeader: true, IsRoleLeaderForDriver: true, DateTime.UtcNow),
                new NodeRedundancyState(NodeId.Parse("other-node"), RedundancyRole.Secondary, IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow),
            },
            CorrelationId.NewId());

        actor.Tell(snapshot);

        AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 240 }), duration: AssertWindow);
    }

    /// <summary>
    /// A redundancy snapshot in which the local node is a non-leader secondary must cause
    /// a service level of 100 to be published.
    /// </summary>
    [Fact]
    public void RedundancyStateChanged_for_secondary_publishes_100()
    {
        var publisher = new RecordingPublisher();
        var local = NodeId.Parse("secondary-node");
        var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(serviceLevel: publisher, localNode: local));

        var snapshot = new RedundancyStateChanged(
            Nodes: new[]
            {
                new NodeRedundancyState(local, RedundancyRole.Secondary, IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow),
            },
            CorrelationId.NewId());

        actor.Tell(snapshot);

        AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 100 }), duration: AssertWindow);
    }

    /// <summary>
    /// <see cref="IOpcUaAddressSpaceSink"/> test double that records every value write, alarm write
    /// and rebuild call. Concurrent collections are used, presumably because the actor calls the
    /// sink from a different thread than the one running the assertions.
    /// </summary>
    private sealed class RecordingSink : IOpcUaAddressSpaceSink
    {
        private int _rebuildCalls;

        /// <summary>Recorded <see cref="WriteValue"/> calls, in arrival order.</summary>
        public ConcurrentQueue<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> ValueQueue { get; } = new();

        /// <summary>Recorded <see cref="WriteAlarmState"/> calls, in arrival order.</summary>
        public ConcurrentQueue<(string AlarmNodeId, bool Active, bool Acknowledged, DateTime Ts)> AlarmQueue { get; } = new();

        /// <summary>Number of <see cref="RebuildAddressSpace"/> calls observed so far.</summary>
        public int RebuildCalls => Volatile.Read(ref _rebuildCalls);

        /// <summary>Fresh list snapshot of <see cref="ValueQueue"/>; a new list is created on every access.</summary>
        public List<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> Values => ValueQueue.ToList();

        /// <summary>Fresh list snapshot of <see cref="AlarmQueue"/>; a new list is created on every access.</summary>
        public List<(string AlarmNodeId, bool Active, bool Acknowledged, DateTime Ts)> Alarms => AlarmQueue.ToList();

        /// <summary>Records the value write.</summary>
        public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime ts)
            => ValueQueue.Enqueue((nodeId, value, quality, ts));

        /// <summary>Records the alarm state write.</summary>
        public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime ts)
            => AlarmQueue.Enqueue((alarmNodeId, active, acknowledged, ts));

        /// <summary>Intentionally a no-op; folder creation is not recorded by this double.</summary>
        public void EnsureFolder(string folderNodeId, string? parentNodeId, string displayName)
        {
        }

        /// <summary>Atomically counts the rebuild request.</summary>
        public void RebuildAddressSpace() => Interlocked.Increment(ref _rebuildCalls);
    }

    /// <summary>
    /// <see cref="IServiceLevelPublisher"/> test double that records every published level in order.
    /// </summary>
    private sealed class RecordingPublisher : IServiceLevelPublisher
    {
        // The element type must be spelled out: there is no non-generic ConcurrentQueue.
        private readonly ConcurrentQueue<byte> _q = new();

        /// <summary>Array snapshot of all levels published so far, oldest first.</summary>
        public byte[] Levels => _q.ToArray();

        /// <summary>Records the published service level.</summary>
        public void Publish(byte serviceLevel) => _q.Enqueue(serviceLevel);
    }
}