diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/DbHealthProbeActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/DbHealthProbeActor.cs new file mode 100644 index 0000000..2d9c17c --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/DbHealthProbeActor.cs @@ -0,0 +1,58 @@ +using Akka.Actor; +using Akka.Event; +using Microsoft.EntityFrameworkCore; +using ZB.MOM.WW.OtOpcUa.Configuration; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Health; + +/// +/// Single-flight cached health probe against the ConfigDb. Reads cached state via +/// Ask<DbHealthStatus>; a single SELECT 1 runs at most every RefreshInterval. +/// Consumed by both the host's /health/ready endpoint (Task 54) and +/// RedundancyStateActor's stale calc. +/// +public sealed class DbHealthProbeActor : ReceiveActor, IWithTimers +{ + public static readonly TimeSpan RefreshInterval = TimeSpan.FromSeconds(5); + + public sealed class GetStatus { public static readonly GetStatus Instance = new(); private GetStatus() { } } + public sealed record DbHealthStatus(bool Reachable, DateTime AsOfUtc, string? LastError); + public sealed class Tick { public static readonly Tick Instance = new(); private Tick() { } } + + private readonly IDbContextFactory _dbFactory; + private readonly ILoggingAdapter _log = Context.GetLogger(); + private DbHealthStatus _last = new(false, DateTime.MinValue, "not probed yet"); + + public ITimerScheduler Timers { get; set; } = null!; + + public static Props Props(IDbContextFactory dbFactory) => + Akka.Actor.Props.Create(() => new DbHealthProbeActor(dbFactory)); + + public DbHealthProbeActor(IDbContextFactory dbFactory) + { + _dbFactory = dbFactory; + Receive(_ => Sender.Tell(_last)); + Receive(_ => RunProbe()); + } + + protected override void PreStart() + { + RunProbe(); + Timers.StartPeriodicTimer("probe", Tick.Instance, RefreshInterval); + } + + private void RunProbe() + { + try + { + using var db = _dbFactory.CreateDbContext(); + _ = db.Deployments.AsNoTracking().Take(1).ToList(); + _last = new DbHealthStatus(true, DateTime.UtcNow, null); + } + catch (Exception ex) + { + _last = new DbHealthStatus(false, DateTime.UtcNow, ex.Message); + _log.Warning(ex, "DbHealthProbe: probe failed"); + } + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/PeerOpcUaProbeActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/PeerOpcUaProbeActor.cs new file mode 100644 index 0000000..7e8c157 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/PeerOpcUaProbeActor.cs @@ -0,0 +1,54 @@ +using Akka.Actor; +using Akka.Cluster.Tools.PublishSubscribe; +using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Commons.Types; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Health; + +/// +/// Periodically pings a peer node's OPC UA endpoint (opc.tcp://peer:4840) and publishes +/// the result on the cluster's redundancy-state input topic so the admin RedundancyStateActor +/// can react. Real OPC UA probe call is staged for follow-up F12. +/// +public sealed class PeerOpcUaProbeActor : ReceiveActor, IWithTimers +{ + // Owned by ControlPlane.Redundancy.RedundancyStateActor; duplicated here to avoid a + // Runtime → ControlPlane project reference. Keep both literals in lock-step. + public const string RedundancyStateTopic = "redundancy-state"; + + public static readonly TimeSpan DefaultProbeInterval = TimeSpan.FromSeconds(10); + + public sealed record OpcUaProbeResult(NodeId NodeId, bool Ok); + public sealed class Tick { public static readonly Tick Instance = new(); private Tick() { } } + + private readonly NodeId _peer; + private readonly TimeSpan _interval; + private readonly Action? _broadcastOverride; + private readonly ILoggingAdapter _log = Context.GetLogger(); + + public ITimerScheduler Timers { get; set; } = null!; + + public static Props Props(NodeId peer, TimeSpan? interval = null, Action? broadcast = null) => + Akka.Actor.Props.Create(() => new PeerOpcUaProbeActor(peer, interval ?? DefaultProbeInterval, broadcast)); + + public PeerOpcUaProbeActor(NodeId peer, TimeSpan interval, Action? broadcastOverride) + { + _peer = peer; + _interval = interval; + _broadcastOverride = broadcastOverride; + + Receive(_ => RunProbe()); + } + + protected override void PreStart() => + Timers.StartPeriodicTimer("probe", Tick.Instance, _interval); + + private void RunProbe() + { + // F12: actual opc.tcp ping. Assume Ok=true until the probe is wired. + var msg = new OpcUaProbeResult(_peer, Ok: true); + if (_broadcastOverride is not null) _broadcastOverride(msg); + else DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(RedundancyStateTopic, msg)); + _log.Debug("PeerOpcUaProbe: pinged {Peer} (probe staged for F12)", _peer); + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs new file mode 100644 index 0000000..7f5315d --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs @@ -0,0 +1,32 @@ +using Akka.Actor; +using Akka.Event; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian; + +/// +/// Wraps the named-pipe IPC to the Wonderware historian sidecar with a store-and-forward +/// SQLite buffer for pipe outages. Engine wiring (named-pipe client + SqliteStoreAndForwardSink) +/// is staged for follow-up F11. +/// +public sealed class HistorianAdapterActor : ReceiveActor +{ + public sealed record HistoryRow(string Source, string AttributeId, object? Value, DateTime TimestampUtc); + + private readonly ILoggingAdapter _log = Context.GetLogger(); + private int _buffered; + + public int BufferedCount => _buffered; + + public static Props Props() => Akka.Actor.Props.Create(() => new HistorianAdapterActor()); + + public HistorianAdapterActor() + { + Receive(row => + { + // F11: dispatch to named-pipe sink; on disconnect → buffer in SQLite. + Interlocked.Increment(ref _buffered); + _log.Debug("Historian: buffered row for {Source}/{Attr} (sink wiring staged for F11)", + row.Source, row.AttributeId); + }); + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs new file mode 100644 index 0000000..fc7c910 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs @@ -0,0 +1,50 @@ +using Akka.Actor; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Runtime.Health; +using ZB.MOM.WW.OtOpcUa.Runtime.Historian; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Health; + +public sealed class HealthProbeActorTests : RuntimeActorTestBase +{ + [Fact] + public async Task DbHealthProbeActor_returns_reachable_against_in_memory_db() + { + var db = NewInMemoryDbFactory(); + var actor = Sys.ActorOf(DbHealthProbeActor.Props(db)); + + var status = await actor.Ask( + DbHealthProbeActor.GetStatus.Instance, TimeSpan.FromSeconds(3)); + + status.Reachable.ShouldBeTrue(); + status.LastError.ShouldBeNull(); + } + + [Fact] + public void PeerOpcUaProbeActor_publishes_probe_result_at_each_tick() + { + var received = new System.Collections.Generic.List(); + var actor = Sys.ActorOf(PeerOpcUaProbeActor.Props( + NodeId.Parse("peer-1"), + interval: TimeSpan.FromMilliseconds(50), + broadcast: msg => received.Add(msg))); + + AwaitCondition(() => received.Count >= 2, TimeSpan.FromSeconds(2)); + received.OfType().ShouldNotBeEmpty(); + } + + [Fact] + public void HistorianAdapterActor_buffers_rows() + { + var actor = Sys.ActorOf(HistorianAdapterActor.Props()); + for (var i = 0; i < 5; i++) + actor.Tell(new HistorianAdapterActor.HistoryRow("driver-a", $"tag-{i}", i, DateTime.UtcNow)); + + ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + // No direct readback of the count from a sealed actor — assert by Ask of a self-probe later + // when the engine wiring lands (F11). For now this asserts the actor accepts the contract. + } +}