Some checks failed
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been cancelled
v2-ci / build (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been cancelled
v2-ci / integration (push) Has been cancelled
Reshapes the placeholder buffered-counter actor into a thin fire-and-forget bridge over the existing IAlarmHistorianSink contract. The default sink is NullAlarmHistorianSink; production deployments override the DI binding to SqliteStoreAndForwardSink wrapping WonderwareHistorianClient (the v1 components in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware* are reused verbatim — the actor is just a mailbox-friendly entry point).

- HistorianAdapterActor.Props(IAlarmHistorianSink?) — null defaults to NullAlarmHistorianSink
- Receive<AlarmHistorianEvent>: fire-and-forget sink.EnqueueAsync
- Receive<GetStatus>: returns sink.GetStatus() (queue depth + drain state)
- ServiceCollectionExtensions.AddOtOpcUaRuntime registers the default sink
- WithOtOpcUaRuntimeActors spawns the actor + registers HistorianAdapterActorKey
- Program.cs calls AddOtOpcUaRuntime when hasDriver

Tests: 2 new (forward-to-sink + GetStatus). Runtime suite 17 → 18.
127 lines
4.6 KiB
C#
127 lines
4.6 KiB
C#
using System.Collections.Concurrent;
|
|
using System.Net;
|
|
using System.Net.Sockets;
|
|
using Akka.Actor;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
|
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
|
|
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
|
|
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Health;
|
|
|
|
/// <summary>
/// Actor-level tests for <see cref="DbHealthProbeActor"/>, <see cref="PeerOpcUaProbeActor"/>
/// and <see cref="HistorianAdapterActor"/>, run against the actor system supplied by
/// <see cref="RuntimeActorTestBase"/>.
/// </summary>
public sealed class HealthProbeActorTests : RuntimeActorTestBase
{
    /// <summary>
    /// Asking the DB probe for its status against an in-memory database yields
    /// <c>Reachable == true</c> and no recorded error.
    /// </summary>
    [Fact]
    public async Task DbHealthProbeActor_returns_reachable_against_in_memory_db()
    {
        var db = NewInMemoryDbFactory();
        var actor = Sys.ActorOf(DbHealthProbeActor.Props(db));

        var status = await actor.Ask<DbHealthProbeActor.DbHealthStatus>(
            DbHealthProbeActor.GetStatus.Instance, TimeSpan.FromSeconds(3));

        status.Reachable.ShouldBeTrue();
        status.LastError.ShouldBeNull();
    }

    /// <summary>
    /// With a TCP listener bound on loopback, the peer probe eventually broadcasts
    /// an <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> whose <c>Ok</c> is true.
    /// </summary>
    [Fact]
    public void PeerOpcUaProbeActor_reports_Ok_true_against_a_live_listener()
    {
        // Port 0 lets the OS pick a free port; read it back once the listener has started.
        using var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        var port = ((IPEndPoint)listener.LocalEndpoint).Port;

        // The probe actor (not this test) invokes the broadcast callback, while AwaitCondition
        // enumerates the collection as it polls. A thread-safe bag avoids the add/enumerate
        // race a plain List<T> would be exposed to.
        var received = new ConcurrentBag<object>();
        Sys.ActorOf(PeerOpcUaProbeActor.Props(
            NodeId.Parse($"127.0.0.1:{port}"),
            interval: TimeSpan.FromMilliseconds(50),
            connectTimeout: TimeSpan.FromMilliseconds(500),
            opcUaPort: port,
            broadcast: msg => received.Add(msg)));

        AwaitCondition(
            () => received.OfType<PeerOpcUaProbeActor.OpcUaProbeResult>().Any(r => r.Ok),
            TimeSpan.FromSeconds(3));
    }

    /// <summary>
    /// With nothing expected to be listening on the target port, the peer probe eventually
    /// broadcasts an <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> whose <c>Ok</c> is false.
    /// </summary>
    [Fact]
    public void PeerOpcUaProbeActor_reports_Ok_false_against_an_unreachable_endpoint()
    {
        // Port 1 is reserved (tcpmux) and almost never bound on dev machines, so the connect fails fast.
        // Thread-safe collection: written from the probe's broadcast callback, read by AwaitCondition.
        var received = new ConcurrentBag<object>();
        Sys.ActorOf(PeerOpcUaProbeActor.Props(
            NodeId.Parse("127.0.0.1:1"),
            interval: TimeSpan.FromMilliseconds(50),
            connectTimeout: TimeSpan.FromMilliseconds(300),
            opcUaPort: 1,
            broadcast: msg => received.Add(msg)));

        AwaitCondition(
            () => received.OfType<PeerOpcUaProbeActor.OpcUaProbeResult>().Any(r => !r.Ok),
            TimeSpan.FromSeconds(3));
    }

    /// <summary>
    /// Every <see cref="AlarmHistorianEvent"/> told to the adapter actor reaches the injected sink.
    /// Arrival order is not asserted; the IDs are sorted before comparison.
    /// </summary>
    [Fact]
    public void HistorianAdapterActor_forwards_events_to_injected_sink()
    {
        var sink = new RecordingSink();
        var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink));

        for (var i = 0; i < 5; i++)
        {
            actor.Tell(new AlarmHistorianEvent(
                AlarmId: $"alm-{i}",
                EquipmentPath: "Plant/LineA",
                AlarmName: $"Alarm{i}",
                AlarmTypeName: "LimitAlarm",
                Severity: AlarmSeverity.High,
                EventKind: "Activated",
                Message: $"Test alarm {i}",
                User: "system",
                Comment: null,
                TimestampUtc: DateTime.UtcNow));
        }

        AwaitCondition(() => sink.Enqueued.Count == 5, TimeSpan.FromSeconds(2));

        // Ordinal sort keeps the expected sequence independent of the current culture.
        sink.Enqueued
            .Select(e => e.AlarmId)
            .OrderBy(id => id, StringComparer.Ordinal)
            .ShouldBe(new[] { "alm-0", "alm-1", "alm-2", "alm-3", "alm-4" });
    }

    /// <summary>
    /// <c>GetStatus</c> on the adapter actor returns the status reported by the injected sink:
    /// after one forwarded event the recording sink reports a queue depth of 1 and an idle drain state.
    /// </summary>
    [Fact]
    public async Task HistorianAdapterActor_returns_sink_status_via_GetStatus()
    {
        var sink = new RecordingSink();
        var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink));

        actor.Tell(new AlarmHistorianEvent(
            "alm-x", "Plant/LineB", "OffNormal", "OffNormalAlarm",
            AlarmSeverity.Low, "Activated", "msg", "system", null, DateTime.UtcNow));

        // Wait until the event has reached the sink so the queue depth below is deterministic.
        AwaitCondition(() => sink.Enqueued.Count == 1, TimeSpan.FromSeconds(2));

        var status = await actor.Ask<HistorianSinkStatus>(
            HistorianAdapterActor.GetStatus.Instance, TimeSpan.FromSeconds(2));

        status.QueueDepth.ShouldBe(1);
        status.DrainState.ShouldBe(HistorianDrainState.Idle);
    }

    /// <summary>
    /// Test double for <see cref="IAlarmHistorianSink"/> that records every enqueued event in a
    /// thread-safe bag and reports the number of recorded events as its queue depth.
    /// </summary>
    private sealed class RecordingSink : IAlarmHistorianSink
    {
        /// <summary>Events received through <see cref="EnqueueAsync"/>, in no particular order.</summary>
        public ConcurrentBag<AlarmHistorianEvent> Enqueued { get; } = [];

        /// <summary>Records <paramref name="evt"/> and completes synchronously; the token is not observed.</summary>
        public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
        {
            Enqueued.Add(evt);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Returns a status whose queue depth is the count of recorded events; every other
        /// field is a fixed value (zero dead letters, no timestamps, no error, idle drain state).
        /// </summary>
        public HistorianSinkStatus GetStatus() => new(
            QueueDepth: Enqueued.Count,
            DeadLetterDepth: 0,
            LastDrainUtc: null,
            LastSuccessUtc: null,
            LastError: null,
            DrainState: HistorianDrainState.Idle);
    }
}
|