feat(runtime): HistorianAdapter + PeerOpcUaProbe + DbHealthProbe actors (engine wiring tracked as F11/F12)

This commit is contained in:
Joseph Doherty
2026-05-26 05:09:06 -04:00
parent e115f13104
commit 28639cb14d
4 changed files with 194 additions and 0 deletions

View File

@@ -0,0 +1,58 @@
using Akka.Actor;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Configuration;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Health;
/// <summary>
/// Single-flight cached health probe against the ConfigDb. Reads cached state via
/// <c>Ask&lt;DbHealthStatus&gt;</c>; a single SELECT 1 runs at most every <c>RefreshInterval</c>.
/// Consumed by both the host's <c>/health/ready</c> endpoint (Task 54) and
/// <c>RedundancyStateActor</c>'s stale calc.
/// </summary>
public sealed class DbHealthProbeActor : ReceiveActor, IWithTimers
{
public static readonly TimeSpan RefreshInterval = TimeSpan.FromSeconds(5);
public sealed class GetStatus { public static readonly GetStatus Instance = new(); private GetStatus() { } }
public sealed record DbHealthStatus(bool Reachable, DateTime AsOfUtc, string? LastError);
public sealed class Tick { public static readonly Tick Instance = new(); private Tick() { } }
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
private readonly ILoggingAdapter _log = Context.GetLogger();
private DbHealthStatus _last = new(false, DateTime.MinValue, "not probed yet");
public ITimerScheduler Timers { get; set; } = null!;
public static Props Props(IDbContextFactory<OtOpcUaConfigDbContext> dbFactory) =>
Akka.Actor.Props.Create(() => new DbHealthProbeActor(dbFactory));
public DbHealthProbeActor(IDbContextFactory<OtOpcUaConfigDbContext> dbFactory)
{
_dbFactory = dbFactory;
Receive<GetStatus>(_ => Sender.Tell(_last));
Receive<Tick>(_ => RunProbe());
}
protected override void PreStart()
{
RunProbe();
Timers.StartPeriodicTimer("probe", Tick.Instance, RefreshInterval);
}
private void RunProbe()
{
try
{
using var db = _dbFactory.CreateDbContext();
_ = db.Deployments.AsNoTracking().Take(1).ToList();
_last = new DbHealthStatus(true, DateTime.UtcNow, null);
}
catch (Exception ex)
{
_last = new DbHealthStatus(false, DateTime.UtcNow, ex.Message);
_log.Warning(ex, "DbHealthProbe: probe failed");
}
}
}

View File

@@ -0,0 +1,54 @@
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Health;
/// <summary>
/// Periodically pings a peer node's OPC UA endpoint (<c>opc.tcp://peer:4840</c>) and publishes
/// the result on the cluster's redundancy-state input topic so the admin <c>RedundancyStateActor</c>
/// can react. Real OPC UA probe call is staged for follow-up F12.
/// </summary>
public sealed class PeerOpcUaProbeActor : ReceiveActor, IWithTimers
{
// Owned by ControlPlane.Redundancy.RedundancyStateActor; duplicated here to avoid a
// Runtime → ControlPlane project reference. Keep both literals in lock-step.
public const string RedundancyStateTopic = "redundancy-state";
public static readonly TimeSpan DefaultProbeInterval = TimeSpan.FromSeconds(10);
public sealed record OpcUaProbeResult(NodeId NodeId, bool Ok);
public sealed class Tick { public static readonly Tick Instance = new(); private Tick() { } }
private readonly NodeId _peer;
private readonly TimeSpan _interval;
private readonly Action<object>? _broadcastOverride;
private readonly ILoggingAdapter _log = Context.GetLogger();
public ITimerScheduler Timers { get; set; } = null!;
public static Props Props(NodeId peer, TimeSpan? interval = null, Action<object>? broadcast = null) =>
Akka.Actor.Props.Create(() => new PeerOpcUaProbeActor(peer, interval ?? DefaultProbeInterval, broadcast));
public PeerOpcUaProbeActor(NodeId peer, TimeSpan interval, Action<object>? broadcastOverride)
{
_peer = peer;
_interval = interval;
_broadcastOverride = broadcastOverride;
Receive<Tick>(_ => RunProbe());
}
protected override void PreStart() =>
Timers.StartPeriodicTimer("probe", Tick.Instance, _interval);
private void RunProbe()
{
// F12: actual opc.tcp ping. Assume Ok=true until the probe is wired.
var msg = new OpcUaProbeResult(_peer, Ok: true);
if (_broadcastOverride is not null) _broadcastOverride(msg);
else DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(RedundancyStateTopic, msg));
_log.Debug("PeerOpcUaProbe: pinged {Peer} (probe staged for F12)", _peer);
}
}

View File

@@ -0,0 +1,32 @@
using Akka.Actor;
using Akka.Event;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
/// <summary>
/// Wraps the named-pipe IPC to the Wonderware historian sidecar with a store-and-forward
/// SQLite buffer for pipe outages. Engine wiring (named-pipe client + <c>SqliteStoreAndForwardSink</c>)
/// is staged for follow-up F11.
/// </summary>
public sealed class HistorianAdapterActor : ReceiveActor
{
public sealed record HistoryRow(string Source, string AttributeId, object? Value, DateTime TimestampUtc);
private readonly ILoggingAdapter _log = Context.GetLogger();
private int _buffered;
public int BufferedCount => _buffered;
public static Props Props() => Akka.Actor.Props.Create(() => new HistorianAdapterActor());
public HistorianAdapterActor()
{
Receive<HistoryRow>(row =>
{
// F11: dispatch to named-pipe sink; on disconnect → buffer in SQLite.
Interlocked.Increment(ref _buffered);
_log.Debug("Historian: buffered row for {Source}/{Attr} (sink wiring staged for F11)",
row.Source, row.AttributeId);
});
}
}

View File

@@ -0,0 +1,50 @@
using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Health;
public sealed class HealthProbeActorTests : RuntimeActorTestBase
{
[Fact]
public async Task DbHealthProbeActor_returns_reachable_against_in_memory_db()
{
var db = NewInMemoryDbFactory();
var actor = Sys.ActorOf(DbHealthProbeActor.Props(db));
var status = await actor.Ask<DbHealthProbeActor.DbHealthStatus>(
DbHealthProbeActor.GetStatus.Instance, TimeSpan.FromSeconds(3));
status.Reachable.ShouldBeTrue();
status.LastError.ShouldBeNull();
}
[Fact]
public void PeerOpcUaProbeActor_publishes_probe_result_at_each_tick()
{
var received = new System.Collections.Generic.List<object>();
var actor = Sys.ActorOf(PeerOpcUaProbeActor.Props(
NodeId.Parse("peer-1"),
interval: TimeSpan.FromMilliseconds(50),
broadcast: msg => received.Add(msg)));
AwaitCondition(() => received.Count >= 2, TimeSpan.FromSeconds(2));
received.OfType<PeerOpcUaProbeActor.OpcUaProbeResult>().ShouldNotBeEmpty();
}
[Fact]
public void HistorianAdapterActor_buffers_rows()
{
var actor = Sys.ActorOf(HistorianAdapterActor.Props());
for (var i = 0; i < 5; i++)
actor.Tell(new HistorianAdapterActor.HistoryRow("driver-a", $"tag-{i}", i, DateTime.UtcNow));
ExpectNoMsg(TimeSpan.FromMilliseconds(100));
// No direct readback of the count from a sealed actor — assert by Ask of a self-probe later
// when the engine wiring lands (F11). For now this asserts the actor accepts the contract.
}
}