From 686138123fef6dc47d81bb99b14ab7790604523d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 26 May 2026 07:18:08 -0400 Subject: [PATCH] =?UTF-8?q?feat(runtime):=20F11=20=E2=80=94=20HistorianAda?= =?UTF-8?q?pterActor=20wired=20to=20IAlarmHistorianSink?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reshapes the placeholder buffered-counter actor into a thin fire-and-forget bridge over the existing IAlarmHistorianSink contract. Default sink is NullAlarmHistorianSink; production deployments override the DI binding to SqliteStoreAndForwardSink wrapping WonderwareHistorianClient (the v1 components in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware* are reused verbatim — actor is just a mailbox-friendly entry point). - HistorianAdapterActor.Props(IAlarmHistorianSink?) — null defaults to NullAlarmHistorianSink - Receive: fire-and-forget sink.EnqueueAsync - Receive: returns sink.GetStatus() (queue depth + drain state) - ServiceCollectionExtensions.AddOtOpcUaRuntime registers the default sink - WithOtOpcUaRuntimeActors spawns the actor + registers HistorianAdapterActorKey - Program.cs calls AddOtOpcUaRuntime when hasDriver Tests: 2 new (forward-to-sink + GetStatus). Runtime suite 17 → 18. --- src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs | 3 + .../Historian/HistorianAdapterActor.cs | 62 ++++++++++++----- .../ServiceCollectionExtensions.cs | 34 ++++++++-- .../ZB.MOM.WW.OtOpcUa.Runtime.csproj | 1 + .../Health/HealthProbeActorTests.cs | 66 +++++++++++++++++-- .../ServiceCollectionExtensionsTests.cs | 3 + 6 files changed, 140 insertions(+), 29 deletions(-) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs index b4faddb..69147d1 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs @@ -38,6 +38,9 @@ builder.Host.UseSerilog((ctx, lc) => lc builder.Services.AddOtOpcUaConfigDb(builder.Configuration); builder.Services.AddOtOpcUaCluster(builder.Configuration); +if (hasDriver) + builder.Services.AddOtOpcUaRuntime(); + // Akka cluster bootstrap. Role-specific singletons are registered on the AkkaConfigurationBuilder // from inside the configurator lambda. AddAkka spins the ActorSystem at host start. builder.Services.AddAkka("otopcua", (ab, sp) => diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs index 7f5315d..2167426 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs @@ -1,32 +1,58 @@ using Akka.Actor; using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian; /// -/// Wraps the named-pipe IPC to the Wonderware historian sidecar with a store-and-forward -/// SQLite buffer for pipe outages. Engine wiring (named-pipe client + SqliteStoreAndForwardSink) -/// is staged for follow-up F11. +/// Thin actor wrapper around . Engine code (ScriptedAlarmActor, +/// Galaxy native alarm bridge, AB CIP ALMD reader) tells s to this +/// actor; the actor enqueues them on the sink fire-and-forget. Production deployments register +/// against IAlarmHistorianSink; the sink owns the +/// durable queue + drain-to-Wonderware-pipe loop. The actor here owns nothing operational beyond +/// the message contract — its job is to keep the engine actors on Akka's mailbox without blocking +/// them on disk I/O or pipe handshakes. +/// +/// Query queue depth + drain health via . /// public sealed class HistorianAdapterActor : ReceiveActor { - public sealed record HistoryRow(string Source, string AttributeId, object? Value, DateTime TimestampUtc); - - private readonly ILoggingAdapter _log = Context.GetLogger(); - private int _buffered; - - public int BufferedCount => _buffered; - - public static Props Props() => Akka.Actor.Props.Create(() => new HistorianAdapterActor()); - - public HistorianAdapterActor() + public sealed record GetStatus { - Receive(row => + public static readonly GetStatus Instance = new(); + } + + private readonly IAlarmHistorianSink _sink; + private readonly ILoggingAdapter _log = Context.GetLogger(); + + public static Props Props(IAlarmHistorianSink? sink = null) => + Akka.Actor.Props.Create(() => new HistorianAdapterActor(sink ?? NullAlarmHistorianSink.Instance)); + + public HistorianAdapterActor(IAlarmHistorianSink sink) + { + _sink = sink; + + Receive(evt => { - // F11: dispatch to named-pipe sink; on disconnect → buffer in SQLite. - Interlocked.Increment(ref _buffered); - _log.Debug("Historian: buffered row for {Source}/{Attr} (sink wiring staged for F11)", - row.Source, row.AttributeId); + // Fire-and-forget: SqliteStoreAndForwardSink persists to local SQLite synchronously + // inside EnqueueAsync (it returns once the row is committed), so we don't block on + // network/pipe latency. Failures are surfaced via GetStatus's LastError + drain state. + _ = EnqueueAsync(evt); }); + + Receive(_ => Sender.Tell(_sink.GetStatus())); + } + + private async Task EnqueueAsync(AlarmHistorianEvent evt) + { + try + { + await _sink.EnqueueAsync(evt, CancellationToken.None); + } + catch (Exception ex) + { + _log.Error(ex, "Historian sink rejected event for {AlarmId} at {Ts}", + evt.AlarmId, evt.TimestampUtc); + } } } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs index bf410b8..8dd17ac 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs @@ -1,10 +1,14 @@ using Akka.Actor; using Akka.Hosting; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; using ZB.MOM.WW.OtOpcUa.Runtime.Health; +using ZB.MOM.WW.OtOpcUa.Runtime.Historian; namespace ZB.MOM.WW.OtOpcUa.Runtime; @@ -14,11 +18,25 @@ public static class ServiceCollectionExtensions public const string DriverHostActorName = "driver-host"; public const string DbHealthProbeActorName = "db-health"; + public const string HistorianAdapterActorName = "historian-adapter"; + + /// + /// Registers shared runtime services. Currently binds + /// to as the default; production deployments + /// override this with SqliteStoreAndForwardSink wrapping WonderwareHistorianClient. + /// Call this BEFORE AddAkka. + /// + public static IServiceCollection AddOtOpcUaRuntime(this IServiceCollection services) + { + services.TryAddSingleton(NullAlarmHistorianSink.Instance); + return services; + } /// /// Spawns the per-node driver-role actors on the host's : - /// (one per node) and - /// (consumed by the health endpoint + redundancy calc). + /// (one per node), + /// (consumed by the health endpoint + redundancy calc), and + /// wrapping the registered . /// /// Mirror of WithOtOpcUaControlPlaneSingletons for the driver role. Both must /// be registered on the same as the cluster @@ -26,8 +44,8 @@ public static class ServiceCollectionExtensions /// /// Wire from the fused Host's Program.cs when the node carries the driver role: /// - /// if (hasDriver) - /// ab.WithOtOpcUaRuntimeActors(); + /// services.AddOtOpcUaRuntime(); + /// services.AddAkka("otopcua", (ab, sp) => { ab.WithOtOpcUaClusterBootstrap(sp); if (hasDriver) ab.WithOtOpcUaRuntimeActors(); }); /// /// public static AkkaConfigurationBuilder WithOtOpcUaRuntimeActors(this AkkaConfigurationBuilder builder) @@ -36,6 +54,8 @@ public static class ServiceCollectionExtensions { var dbFactory = resolver.GetService>(); var roleInfo = resolver.GetService(); + // Fallback to NullAlarmHistorianSink if AddOtOpcUaRuntime wasn't called (e.g., test harnesses). + var historianSink = resolver.GetService() ?? NullAlarmHistorianSink.Instance; var dbHealth = system.ActorOf( DbHealthProbeActor.Props(dbFactory), @@ -46,6 +66,11 @@ public static class ServiceCollectionExtensions DriverHostActor.Props(dbFactory, roleInfo.LocalNode), DriverHostActorName); registry.Register(driverHost); + + var historian = system.ActorOf( + HistorianAdapterActor.Props(historianSink), + HistorianAdapterActorName); + registry.Register(historian); }); return builder; @@ -55,3 +80,4 @@ public static class ServiceCollectionExtensions /// Marker key types used by Akka.Hosting to resolve runtime actors from the registry. public sealed class DriverHostActorKey { } public sealed class DbHealthProbeActorKey { } +public sealed class HistorianAdapterActorKey { } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj index 2e4ca60..c051c39 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj @@ -24,6 +24,7 @@ the reflective-load design. --> + diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs index 5b2e618..4ab7a15 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs @@ -1,9 +1,12 @@ +using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; using Akka.Actor; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; using ZB.MOM.WW.OtOpcUa.Runtime.Health; using ZB.MOM.WW.OtOpcUa.Runtime.Historian; using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; @@ -61,14 +64,63 @@ public sealed class HealthProbeActorTests : RuntimeActorTestBase } [Fact] - public void HistorianAdapterActor_buffers_rows() + public void HistorianAdapterActor_forwards_events_to_injected_sink() { - var actor = Sys.ActorOf(HistorianAdapterActor.Props()); - for (var i = 0; i < 5; i++) - actor.Tell(new HistorianAdapterActor.HistoryRow("driver-a", $"tag-{i}", i, DateTime.UtcNow)); + var sink = new RecordingSink(); + var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink)); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - // No direct readback of the count from a sealed actor — assert by Ask of a self-probe later - // when the engine wiring lands (F11). For now this asserts the actor accepts the contract. + for (var i = 0; i < 5; i++) + actor.Tell(new AlarmHistorianEvent( + AlarmId: $"alm-{i}", + EquipmentPath: "Plant/LineA", + AlarmName: $"Alarm{i}", + AlarmTypeName: "LimitAlarm", + Severity: AlarmSeverity.High, + EventKind: "Activated", + Message: $"Test alarm {i}", + User: "system", + Comment: null, + TimestampUtc: DateTime.UtcNow)); + + AwaitCondition(() => sink.Enqueued.Count == 5, TimeSpan.FromSeconds(2)); + sink.Enqueued.Select(e => e.AlarmId).OrderBy(s => s).ShouldBe( + new[] { "alm-0", "alm-1", "alm-2", "alm-3", "alm-4" }); + } + + [Fact] + public async Task HistorianAdapterActor_returns_sink_status_via_GetStatus() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink)); + + actor.Tell(new AlarmHistorianEvent( + "alm-x", "Plant/LineB", "OffNormal", "OffNormalAlarm", + AlarmSeverity.Low, "Activated", "msg", "system", null, DateTime.UtcNow)); + AwaitCondition(() => sink.Enqueued.Count == 1, TimeSpan.FromSeconds(2)); + + var status = await actor.Ask( + HistorianAdapterActor.GetStatus.Instance, TimeSpan.FromSeconds(2)); + + status.QueueDepth.ShouldBe(1); + status.DrainState.ShouldBe(HistorianDrainState.Idle); + } + + private sealed class RecordingSink : IAlarmHistorianSink + { + public ConcurrentBag Enqueued { get; } = []; + + public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken) + { + Enqueued.Add(evt); + return Task.CompletedTask; + } + + public HistorianSinkStatus GetStatus() => new( + QueueDepth: Enqueued.Count, + DeadLetterDepth: 0, + LastDrainUtc: null, + LastSuccessUtc: null, + LastError: null, + DrainState: HistorianDrainState.Idle); } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs index 33907eb..0d5d2a1 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs @@ -46,11 +46,14 @@ public sealed class ServiceCollectionExtensionsTests { var driverHost = host.Services.GetRequiredService>(); var dbHealth = host.Services.GetRequiredService>(); + var historian = host.Services.GetRequiredService>(); driverHost.ActorRef.ShouldNotBeNull(); dbHealth.ActorRef.ShouldNotBeNull(); + historian.ActorRef.ShouldNotBeNull(); driverHost.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.DriverHostActorName); dbHealth.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.DbHealthProbeActorName); + historian.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.HistorianAdapterActorName); } finally {