feat(runtime): F11 — HistorianAdapterActor wired to IAlarmHistorianSink
Some checks failed
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been cancelled
v2-ci / build (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been cancelled
v2-ci / integration (push) Has been cancelled

Reshapes the placeholder buffered-counter actor into a thin fire-and-forget
bridge over the existing IAlarmHistorianSink contract. Default sink is
NullAlarmHistorianSink; production deployments override the DI binding to
SqliteStoreAndForwardSink wrapping WonderwareHistorianClient (the v1
components in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware*
are reused verbatim — actor is just a mailbox-friendly entry point).

- HistorianAdapterActor.Props(IAlarmHistorianSink?) — null defaults to NullAlarmHistorianSink
- Receive<AlarmHistorianEvent>: fire-and-forget sink.EnqueueAsync
- Receive<GetStatus>: returns sink.GetStatus() (queue depth + drain state)
- ServiceCollectionExtensions.AddOtOpcUaRuntime registers the default sink
- WithOtOpcUaRuntimeActors spawns the actor + registers HistorianAdapterActorKey
- Program.cs calls AddOtOpcUaRuntime when hasDriver

Tests: 2 new (forward-to-sink + GetStatus). Runtime suite 17 → 18.
This commit is contained in:
Joseph Doherty
2026-05-26 07:18:08 -04:00
parent cd5540cb1a
commit 686138123f
6 changed files with 140 additions and 29 deletions

View File

@@ -38,6 +38,9 @@ builder.Host.UseSerilog((ctx, lc) => lc
builder.Services.AddOtOpcUaConfigDb(builder.Configuration);
builder.Services.AddOtOpcUaCluster(builder.Configuration);
if (hasDriver)
builder.Services.AddOtOpcUaRuntime();
// Akka cluster bootstrap. Role-specific singletons are registered on the AkkaConfigurationBuilder
// from inside the configurator lambda. AddAkka spins the ActorSystem at host start.
builder.Services.AddAkka("otopcua", (ab, sp) =>

View File

@@ -1,32 +1,58 @@
using Akka.Actor;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
/// <summary>
/// Wraps the named-pipe IPC to the Wonderware historian sidecar with a store-and-forward
/// SQLite buffer for pipe outages. Engine wiring (named-pipe client + <c>SqliteStoreAndForwardSink</c>)
/// is staged for follow-up F11.
/// Thin actor wrapper around <see cref="IAlarmHistorianSink"/>. Engine code (ScriptedAlarmActor,
/// Galaxy native alarm bridge, AB CIP ALMD reader) tells <see cref="AlarmHistorianEvent"/>s to this
/// actor; the actor enqueues them on the sink fire-and-forget. Production deployments register
/// <see cref="SqliteStoreAndForwardSink"/> against <c>IAlarmHistorianSink</c>; the sink owns the
/// durable queue + drain-to-Wonderware-pipe loop. The actor here owns nothing operational beyond
/// the message contract — its job is to keep the engine actors on Akka's mailbox without blocking
/// them on disk I/O or pipe handshakes.
///
/// Query queue depth + drain health via <see cref="GetStatus"/>.
/// </summary>
public sealed class HistorianAdapterActor : ReceiveActor
{
public sealed record HistoryRow(string Source, string AttributeId, object? Value, DateTime TimestampUtc);
private readonly ILoggingAdapter _log = Context.GetLogger();
private int _buffered;
public int BufferedCount => _buffered;
public static Props Props() => Akka.Actor.Props.Create(() => new HistorianAdapterActor());
public HistorianAdapterActor()
public sealed record GetStatus
{
Receive<HistoryRow>(row =>
public static readonly GetStatus Instance = new();
}
private readonly IAlarmHistorianSink _sink;
private readonly ILoggingAdapter _log = Context.GetLogger();
public static Props Props(IAlarmHistorianSink? sink = null) =>
Akka.Actor.Props.Create(() => new HistorianAdapterActor(sink ?? NullAlarmHistorianSink.Instance));
public HistorianAdapterActor(IAlarmHistorianSink sink)
{
_sink = sink;
Receive<AlarmHistorianEvent>(evt =>
{
// F11: dispatch to named-pipe sink; on disconnect → buffer in SQLite.
Interlocked.Increment(ref _buffered);
_log.Debug("Historian: buffered row for {Source}/{Attr} (sink wiring staged for F11)",
row.Source, row.AttributeId);
// Fire-and-forget: SqliteStoreAndForwardSink persists to local SQLite synchronously
// inside EnqueueAsync (it returns once the row is committed), so we don't block on
// network/pipe latency. Failures are surfaced via GetStatus's LastError + drain state.
_ = EnqueueAsync(evt);
});
Receive<GetStatus>(_ => Sender.Tell(_sink.GetStatus()));
}
private async Task EnqueueAsync(AlarmHistorianEvent evt)
{
try
{
await _sink.EnqueueAsync(evt, CancellationToken.None);
}
catch (Exception ex)
{
_log.Error(ex, "Historian sink rejected event for {AlarmId} at {Ts}",
evt.AlarmId, evt.TimestampUtc);
}
}
}

View File

@@ -1,10 +1,14 @@
using Akka.Actor;
using Akka.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
namespace ZB.MOM.WW.OtOpcUa.Runtime;
@@ -14,11 +18,25 @@ public static class ServiceCollectionExtensions
public const string DriverHostActorName = "driver-host";
public const string DbHealthProbeActorName = "db-health";
public const string HistorianAdapterActorName = "historian-adapter";
/// <summary>
/// Registers shared runtime services. Currently binds <see cref="IAlarmHistorianSink"/>
/// to <see cref="NullAlarmHistorianSink"/> as the default; production deployments
/// override this with <c>SqliteStoreAndForwardSink</c> wrapping <c>WonderwareHistorianClient</c>.
/// Call this BEFORE <c>AddAkka</c>.
/// </summary>
public static IServiceCollection AddOtOpcUaRuntime(this IServiceCollection services)
{
services.TryAddSingleton<IAlarmHistorianSink>(NullAlarmHistorianSink.Instance);
return services;
}
/// <summary>
/// Spawns the per-node driver-role actors on the host's <see cref="ActorSystem"/>:
/// <see cref="DriverHostActor"/> (one per node) and <see cref="DbHealthProbeActor"/>
/// (consumed by the health endpoint + redundancy calc).
/// <see cref="DriverHostActor"/> (one per node), <see cref="DbHealthProbeActor"/>
/// (consumed by the health endpoint + redundancy calc), and
/// <see cref="HistorianAdapterActor"/> wrapping the registered <see cref="IAlarmHistorianSink"/>.
///
/// Mirror of <c>WithOtOpcUaControlPlaneSingletons</c> for the driver role. Both must
/// be registered on the same <see cref="AkkaConfigurationBuilder"/> as the cluster
@@ -26,8 +44,8 @@ public static class ServiceCollectionExtensions
///
/// Wire from the fused Host's Program.cs when the node carries the <c>driver</c> role:
/// <code>
/// if (hasDriver)
/// ab.WithOtOpcUaRuntimeActors();
/// services.AddOtOpcUaRuntime();
/// services.AddAkka("otopcua", (ab, sp) => { ab.WithOtOpcUaClusterBootstrap(sp); if (hasDriver) ab.WithOtOpcUaRuntimeActors(); });
/// </code>
/// </summary>
public static AkkaConfigurationBuilder WithOtOpcUaRuntimeActors(this AkkaConfigurationBuilder builder)
@@ -36,6 +54,8 @@ public static class ServiceCollectionExtensions
{
var dbFactory = resolver.GetService<IDbContextFactory<OtOpcUaConfigDbContext>>();
var roleInfo = resolver.GetService<IClusterRoleInfo>();
// Fallback to NullAlarmHistorianSink if AddOtOpcUaRuntime wasn't called (e.g., test harnesses).
var historianSink = resolver.GetService<IAlarmHistorianSink>() ?? NullAlarmHistorianSink.Instance;
var dbHealth = system.ActorOf(
DbHealthProbeActor.Props(dbFactory),
@@ -46,6 +66,11 @@ public static class ServiceCollectionExtensions
DriverHostActor.Props(dbFactory, roleInfo.LocalNode),
DriverHostActorName);
registry.Register<DriverHostActorKey>(driverHost);
var historian = system.ActorOf(
HistorianAdapterActor.Props(historianSink),
HistorianAdapterActorName);
registry.Register<HistorianAdapterActorKey>(historian);
});
return builder;
@@ -55,3 +80,4 @@ public static class ServiceCollectionExtensions
/// <summary>Marker key types used by <c>Akka.Hosting</c> to resolve runtime actors from the registry.</summary>
public sealed class DriverHostActorKey { }
public sealed class DbHealthProbeActorKey { }
public sealed class HistorianAdapterActorKey { }

View File

@@ -24,6 +24,7 @@
the reflective-load design.
-->
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj"/>
</ItemGroup>
<ItemGroup>