using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;

namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Health;

/// <summary>
/// Tests for <c>DbHealthProbeActor</c>, <c>PeerOpcUaProbeActor</c> and <c>HistorianAdapterActor</c>,
/// using the actor system and helpers inherited from <see cref="RuntimeActorTestBase"/>.
/// </summary>
public sealed class HealthProbeActorTests : RuntimeActorTestBase
{
    /// <summary>
    /// Asks a <c>DbHealthProbeActor</c> built over the in-memory DB factory for its status and
    /// expects it to report the database as reachable with no recorded error.
    /// </summary>
    [Fact]
    public async Task DbHealthProbeActor_returns_reachable_against_in_memory_db()
    {
        var db = NewInMemoryDbFactory();
        var actor = Sys.ActorOf(DbHealthProbeActor.Props(db));

        // NOTE(review): Ask carries no type argument here although typed members of the reply
        // are read below — the response type argument looks lost; confirm against the original file.
        var status = await actor.Ask(
            DbHealthProbeActor.GetStatus.Instance,
            TimeSpan.FromSeconds(3));

        status.Reachable.ShouldBeTrue();
        status.LastError.ShouldBeNull();
    }

    /// <summary>
    /// Starts a loopback TCP listener on an OS-assigned port, points a <c>PeerOpcUaProbeActor</c>
    /// at it, and waits up to three seconds for a broadcast result whose <c>Ok</c> is true.
    /// </summary>
    [Fact]
    public void PeerOpcUaProbeActor_reports_Ok_true_against_a_live_listener()
    {
        // Port 0 asks the OS for any free port; the actual port is read back after Start().
        using var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        var port = ((IPEndPoint)listener.LocalEndpoint).Port;

        // The broadcast callback is presumably invoked from the probe actor's thread while
        // AwaitCondition polls from the test thread, so the collection must tolerate a
        // concurrent Add + enumeration (a plain List can throw "Collection was modified").
        var received = new ConcurrentBag<object>();

        Sys.ActorOf(PeerOpcUaProbeActor.Props(
            NodeId.Parse($"127.0.0.1:{port}"),
            interval: TimeSpan.FromMilliseconds(50),
            connectTimeout: TimeSpan.FromMilliseconds(500),
            opcUaPort: port,
            broadcast: msg => received.Add(msg)));

        // NOTE(review): OfType has no type argument here although r.Ok is read — the probe-result
        // type argument looks lost; confirm against the original file.
        AwaitCondition(() => received.OfType().Any(r => r.Ok), TimeSpan.FromSeconds(3));
    }

    /// <summary>
    /// Points a <c>PeerOpcUaProbeActor</c> at 127.0.0.1 port 1 and waits up to three seconds
    /// for a broadcast result whose <c>Ok</c> is false.
    /// </summary>
    [Fact]
    public void PeerOpcUaProbeActor_reports_Ok_false_against_an_unreachable_endpoint()
    {
        // Port 1 is reserved (tcpmux) and almost never bound on dev machines, so the connect fails fast.
        // Thread-safe for the same reason as in the live-listener test: the callback may add
        // while the polling condition enumerates.
        var received = new ConcurrentBag<object>();

        Sys.ActorOf(PeerOpcUaProbeActor.Props(
            NodeId.Parse("127.0.0.1:1"),
            interval: TimeSpan.FromMilliseconds(50),
            connectTimeout: TimeSpan.FromMilliseconds(300),
            opcUaPort: 1,
            broadcast: msg => received.Add(msg)));

        // NOTE(review): OfType has no type argument here although r.Ok is read — the probe-result
        // type argument looks lost; confirm against the original file.
        AwaitCondition(() => received.OfType().Any(r => !r.Ok), TimeSpan.FromSeconds(3));
    }

    /// <summary>
    /// Sends five alarm events to a <c>HistorianAdapterActor</c> and verifies that all five
    /// reach the injected sink, compared by alarm id irrespective of arrival order.
    /// </summary>
    [Fact]
    public void HistorianAdapterActor_forwards_events_to_injected_sink()
    {
        var sink = new RecordingSink();
        var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink));

        for (var i = 0; i < 5; i++)
        {
            actor.Tell(new AlarmHistorianEvent(
                AlarmId: $"alm-{i}",
                EquipmentPath: "Plant/LineA",
                AlarmName: $"Alarm{i}",
                AlarmTypeName: "LimitAlarm",
                Severity: AlarmSeverity.High,
                EventKind: "Activated",
                Message: $"Test alarm {i}",
                User: "system",
                Comment: null,
                TimestampUtc: DateTime.UtcNow));
        }

        AwaitCondition(() => sink.Enqueued.Count == 5, TimeSpan.FromSeconds(2));

        // The bag is unordered, so sort the ids before comparing.
        sink.Enqueued.Select(e => e.AlarmId).OrderBy(s => s).ShouldBe(
            new[] { "alm-0", "alm-1", "alm-2", "alm-3", "alm-4" });
    }

    /// <summary>
    /// Sends one alarm event, waits for the sink to record it, then asks the
    /// <c>HistorianAdapterActor</c> for its status and expects a queue depth of one and an
    /// idle drain state — the values <see cref="RecordingSink.GetStatus"/> produces.
    /// </summary>
    [Fact]
    public async Task HistorianAdapterActor_returns_sink_status_via_GetStatus()
    {
        var sink = new RecordingSink();
        var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink));

        actor.Tell(new AlarmHistorianEvent(
            "alm-x",
            "Plant/LineB",
            "OffNormal",
            "OffNormalAlarm",
            AlarmSeverity.Low,
            "Activated",
            "msg",
            "system",
            null,
            DateTime.UtcNow));

        // Wait for the event to land so QueueDepth is deterministic when the status is requested.
        AwaitCondition(() => sink.Enqueued.Count == 1, TimeSpan.FromSeconds(2));

        // NOTE(review): Ask carries no type argument here although typed members of the reply
        // are read below — presumably the reply is a HistorianSinkStatus; confirm and restore.
        var status = await actor.Ask(
            HistorianAdapterActor.GetStatus.Instance,
            TimeSpan.FromSeconds(2));

        status.QueueDepth.ShouldBe(1);
        status.DrainState.ShouldBe(HistorianDrainState.Idle);
    }

    /// <summary>
    /// <see cref="IAlarmHistorianSink"/> test double that records every enqueued event and
    /// reports an idle status whose queue depth equals the number of recorded events.
    /// </summary>
    private sealed class RecordingSink : IAlarmHistorianSink
    {
        /// <summary>Events received through <see cref="EnqueueAsync"/>, in no particular order.</summary>
        public ConcurrentBag<AlarmHistorianEvent> Enqueued { get; } = [];

        /// <summary>Records <paramref name="evt"/> and completes synchronously.</summary>
        /// <param name="evt">The alarm event to record.</param>
        /// <param name="cancellationToken">Not observed by this test double.</param>
        /// <returns>An already-completed task.</returns>
        public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
        {
            Enqueued.Add(evt);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Builds a status snapshot: queue depth is the recorded-event count, every other
        /// field is a fixed zero/null/idle value.
        /// </summary>
        public HistorianSinkStatus GetStatus() => new(
            QueueDepth: Enqueued.Count,
            DeadLetterDepth: 0,
            LastDrainUtc: null,
            LastSuccessUtc: null,
            LastError: null,
            DrainState: HistorianDrainState.Idle);
    }
}