diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs
index b4faddb..69147d1 100644
--- a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs
@@ -38,6 +38,9 @@ builder.Host.UseSerilog((ctx, lc) => lc
builder.Services.AddOtOpcUaConfigDb(builder.Configuration);
builder.Services.AddOtOpcUaCluster(builder.Configuration);
+if (hasDriver)
+ builder.Services.AddOtOpcUaRuntime();
+
// Akka cluster bootstrap. Role-specific singletons are registered on the AkkaConfigurationBuilder
// from inside the configurator lambda. AddAkka spins the ActorSystem at host start.
builder.Services.AddAkka("otopcua", (ab, sp) =>
diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs
index 7f5315d..2167426 100644
--- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs
@@ -1,32 +1,58 @@
using Akka.Actor;
using Akka.Event;
+using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
///
-/// Wraps the named-pipe IPC to the Wonderware historian sidecar with a store-and-forward
-/// SQLite buffer for pipe outages. Engine wiring (named-pipe client + SqliteStoreAndForwardSink)
-/// is staged for follow-up F11.
+/// Thin actor wrapper around . Engine code (ScriptedAlarmActor,
+/// Galaxy native alarm bridge, AB CIP ALMD reader) tells s to this
+/// actor; the actor enqueues them on the sink fire-and-forget. Production deployments register
+/// against IAlarmHistorianSink; the sink owns the
+/// durable queue + drain-to-Wonderware-pipe loop. The actor here owns nothing operational beyond
+/// the message contract — its job is to keep the engine actors on Akka's mailbox without blocking
+/// them on disk I/O or pipe handshakes.
+///
+/// Query queue depth + drain health via .
///
public sealed class HistorianAdapterActor : ReceiveActor
{
- public sealed record HistoryRow(string Source, string AttributeId, object? Value, DateTime TimestampUtc);
-
- private readonly ILoggingAdapter _log = Context.GetLogger();
- private int _buffered;
-
- public int BufferedCount => _buffered;
-
- public static Props Props() => Akka.Actor.Props.Create(() => new HistorianAdapterActor());
-
- public HistorianAdapterActor()
+ public sealed record GetStatus
{
- Receive(row =>
+ public static readonly GetStatus Instance = new();
+ }
+
+ private readonly IAlarmHistorianSink _sink;
+ private readonly ILoggingAdapter _log = Context.GetLogger();
+
+ public static Props Props(IAlarmHistorianSink? sink = null) =>
+ Akka.Actor.Props.Create(() => new HistorianAdapterActor(sink ?? NullAlarmHistorianSink.Instance));
+
+ public HistorianAdapterActor(IAlarmHistorianSink sink)
+ {
+ _sink = sink;
+
+ Receive(evt =>
{
- // F11: dispatch to named-pipe sink; on disconnect → buffer in SQLite.
- Interlocked.Increment(ref _buffered);
- _log.Debug("Historian: buffered row for {Source}/{Attr} (sink wiring staged for F11)",
- row.Source, row.AttributeId);
+ // Fire-and-forget: SqliteStoreAndForwardSink persists to local SQLite synchronously
+ // inside EnqueueAsync (it returns once the row is committed), so we don't block on
+ // network/pipe latency. Failures are surfaced via GetStatus's LastError + drain state.
+ _ = EnqueueAsync(evt);
});
+
+ Receive(_ => Sender.Tell(_sink.GetStatus()));
+ }
+
+ private async Task EnqueueAsync(AlarmHistorianEvent evt)
+ {
+ try
+ {
+ await _sink.EnqueueAsync(evt, CancellationToken.None);
+ }
+ catch (Exception ex)
+ {
+ _log.Error(ex, "Historian sink rejected event for {AlarmId} at {Ts}",
+ evt.AlarmId, evt.TimestampUtc);
+ }
}
}
diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs
index bf410b8..8dd17ac 100644
--- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs
@@ -1,10 +1,14 @@
using Akka.Actor;
using Akka.Hosting;
using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Configuration;
+using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
+using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
namespace ZB.MOM.WW.OtOpcUa.Runtime;
@@ -14,11 +18,25 @@ public static class ServiceCollectionExtensions
public const string DriverHostActorName = "driver-host";
public const string DbHealthProbeActorName = "db-health";
+ public const string HistorianAdapterActorName = "historian-adapter";
+
+ ///
+ /// Registers shared runtime services. Currently binds
+ /// to as the default; production deployments
+ /// override this with SqliteStoreAndForwardSink wrapping WonderwareHistorianClient.
+ /// Call this BEFORE AddAkka.
+ ///
+ public static IServiceCollection AddOtOpcUaRuntime(this IServiceCollection services)
+ {
+ services.TryAddSingleton(NullAlarmHistorianSink.Instance);
+ return services;
+ }
///
/// Spawns the per-node driver-role actors on the host's :
- /// (one per node) and
- /// (consumed by the health endpoint + redundancy calc).
+ /// (one per node),
+ /// (consumed by the health endpoint + redundancy calc), and
+ /// wrapping the registered .
///
/// Mirror of WithOtOpcUaControlPlaneSingletons for the driver role. Both must
/// be registered on the same as the cluster
@@ -26,8 +44,8 @@ public static class ServiceCollectionExtensions
///
/// Wire from the fused Host's Program.cs when the node carries the driver role:
///
- /// if (hasDriver)
- /// ab.WithOtOpcUaRuntimeActors();
+ /// services.AddOtOpcUaRuntime();
+ /// services.AddAkka("otopcua", (ab, sp) => { ab.WithOtOpcUaClusterBootstrap(sp); if (hasDriver) ab.WithOtOpcUaRuntimeActors(); });
///
///
public static AkkaConfigurationBuilder WithOtOpcUaRuntimeActors(this AkkaConfigurationBuilder builder)
@@ -36,6 +54,8 @@ public static class ServiceCollectionExtensions
{
var dbFactory = resolver.GetService>();
var roleInfo = resolver.GetService();
+ // Fallback to NullAlarmHistorianSink if AddOtOpcUaRuntime wasn't called (e.g., test harnesses).
+ var historianSink = resolver.GetService() ?? NullAlarmHistorianSink.Instance;
var dbHealth = system.ActorOf(
DbHealthProbeActor.Props(dbFactory),
@@ -46,6 +66,11 @@ public static class ServiceCollectionExtensions
DriverHostActor.Props(dbFactory, roleInfo.LocalNode),
DriverHostActorName);
registry.Register(driverHost);
+
+ var historian = system.ActorOf(
+ HistorianAdapterActor.Props(historianSink),
+ HistorianAdapterActorName);
+ registry.Register(historian);
});
return builder;
@@ -55,3 +80,4 @@ public static class ServiceCollectionExtensions
/// Marker key types used by Akka.Hosting to resolve runtime actors from the registry.
public sealed class DriverHostActorKey { }
public sealed class DbHealthProbeActorKey { }
+public sealed class HistorianAdapterActorKey { }
diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj
index 2e4ca60..c051c39 100644
--- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj
@@ -24,6 +24,7 @@
the reflective-load design.
-->
+
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs
index 5b2e618..4ab7a15 100644
--- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs
@@ -1,9 +1,12 @@
+using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
@@ -61,14 +64,63 @@ public sealed class HealthProbeActorTests : RuntimeActorTestBase
}
[Fact]
- public void HistorianAdapterActor_buffers_rows()
+ public void HistorianAdapterActor_forwards_events_to_injected_sink()
{
- var actor = Sys.ActorOf(HistorianAdapterActor.Props());
- for (var i = 0; i < 5; i++)
- actor.Tell(new HistorianAdapterActor.HistoryRow("driver-a", $"tag-{i}", i, DateTime.UtcNow));
+ var sink = new RecordingSink();
+ var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink));
- ExpectNoMsg(TimeSpan.FromMilliseconds(100));
- // No direct readback of the count from a sealed actor — assert by Ask of a self-probe later
- // when the engine wiring lands (F11). For now this asserts the actor accepts the contract.
+ for (var i = 0; i < 5; i++)
+ actor.Tell(new AlarmHistorianEvent(
+ AlarmId: $"alm-{i}",
+ EquipmentPath: "Plant/LineA",
+ AlarmName: $"Alarm{i}",
+ AlarmTypeName: "LimitAlarm",
+ Severity: AlarmSeverity.High,
+ EventKind: "Activated",
+ Message: $"Test alarm {i}",
+ User: "system",
+ Comment: null,
+ TimestampUtc: DateTime.UtcNow));
+
+ AwaitCondition(() => sink.Enqueued.Count == 5, TimeSpan.FromSeconds(2));
+ sink.Enqueued.Select(e => e.AlarmId).OrderBy(s => s).ShouldBe(
+ new[] { "alm-0", "alm-1", "alm-2", "alm-3", "alm-4" });
+ }
+
+ [Fact]
+ public async Task HistorianAdapterActor_returns_sink_status_via_GetStatus()
+ {
+ var sink = new RecordingSink();
+ var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink));
+
+ actor.Tell(new AlarmHistorianEvent(
+ "alm-x", "Plant/LineB", "OffNormal", "OffNormalAlarm",
+ AlarmSeverity.Low, "Activated", "msg", "system", null, DateTime.UtcNow));
+ AwaitCondition(() => sink.Enqueued.Count == 1, TimeSpan.FromSeconds(2));
+
+ var status = await actor.Ask(
+ HistorianAdapterActor.GetStatus.Instance, TimeSpan.FromSeconds(2));
+
+ status.QueueDepth.ShouldBe(1);
+ status.DrainState.ShouldBe(HistorianDrainState.Idle);
+ }
+
+ private sealed class RecordingSink : IAlarmHistorianSink
+ {
+ public ConcurrentBag Enqueued { get; } = [];
+
+ public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
+ {
+ Enqueued.Add(evt);
+ return Task.CompletedTask;
+ }
+
+ public HistorianSinkStatus GetStatus() => new(
+ QueueDepth: Enqueued.Count,
+ DeadLetterDepth: 0,
+ LastDrainUtc: null,
+ LastSuccessUtc: null,
+ LastError: null,
+ DrainState: HistorianDrainState.Idle);
}
}
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs
index 33907eb..0d5d2a1 100644
--- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs
@@ -46,11 +46,14 @@ public sealed class ServiceCollectionExtensionsTests
{
var driverHost = host.Services.GetRequiredService>();
var dbHealth = host.Services.GetRequiredService>();
+ var historian = host.Services.GetRequiredService>();
driverHost.ActorRef.ShouldNotBeNull();
dbHealth.ActorRef.ShouldNotBeNull();
+ historian.ActorRef.ShouldNotBeNull();
driverHost.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.DriverHostActorName);
dbHealth.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.DbHealthProbeActorName);
+ historian.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.HistorianAdapterActorName);
}
finally
{