diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs index f048f2b7..180804e5 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs @@ -1,6 +1,10 @@ using Akka.Actor; +using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; +using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; +using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian; @@ -23,22 +27,47 @@ public sealed class HistorianAdapterActor : ReceiveActor } private readonly IAlarmHistorianSink _sink; + private readonly NodeId? _localNode; private readonly ILoggingAdapter _log = Context.GetLogger(); + private IActorRef? _mediator; + + /// Cached local from the latest + /// snapshot (null = unknown until the first snapshot arrives, or no wired). The + /// durable sink enqueue in the handler is gated on this: only the + /// Primary historizes (default-write while unknown so single-node deploys + the boot window never drop + /// historization). + private RedundancyRole? _localRole; /// Creates the props for a HistorianAdapterActor instance. /// The alarm historian sink implementation, or null to use a null sink. + /// The local cluster node id, used to read this node's + /// from the redundancy-state topic so only the Primary historizes to the durable sink. Null (the + /// default) leaves the role unknown ⇒ default-write (single-node deploys + tests). /// Props configured for creating a HistorianAdapterActor. - public static Props Props(IAlarmHistorianSink? sink = null) => - Akka.Actor.Props.Create(() => new HistorianAdapterActor(sink ?? NullAlarmHistorianSink.Instance)); + public static Props Props(IAlarmHistorianSink? sink = null, NodeId? localNode = null) => + Akka.Actor.Props.Create(() => new HistorianAdapterActor(sink ?? NullAlarmHistorianSink.Instance, localNode)); /// Initializes a new instance of the HistorianAdapterActor class. /// The alarm historian sink to forward enqueued events to. - public HistorianAdapterActor(IAlarmHistorianSink sink) + /// The local cluster node id, used to read this node's + /// from the redundancy-state topic so only the Primary historizes to the durable sink. Null leaves + /// the role unknown ⇒ default-write (single-node deploys + tests). + public HistorianAdapterActor(IAlarmHistorianSink sink, NodeId? localNode = null) { _sink = sink; + _localNode = localNode; Receive(evt => { + // Warm-standby dedup (forward-looking): only the Primary historizes to the durable sink so a + // future per-node feeder writes exactly once. Default-write until told Secondary/Detached so + // single-node deploys + the boot window never drop historization. (Currently the actor has no + // production feeder — this is a defensive guard for when engine→historian wiring lands.) + if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached) + { + return; + } + // Fire-and-forget: SqliteStoreAndForwardSink persists to local SQLite synchronously // inside EnqueueAsync (it returns once the row is committed), so we don't block on // network/pipe latency. Failures are surfaced via GetStatus's LastError + drain state. @@ -46,6 +75,41 @@ public sealed class HistorianAdapterActor : ReceiveActor }); Receive(_ => Sender.Tell(_sink.GetStatus())); + + // Cluster redundancy snapshots (published on the `redundancy-state` topic, subscribed in PreStart) + // cache this node's role so the AlarmHistorianEvent handler can gate the durable sink enqueue to the + // Primary. The PubSub Subscribe is acked back to Self (no-op below). + Receive(OnRedundancyStateChanged); + Receive(_ => { }); + } + + /// Subscribes to the redundancy-state topic so cluster role changes land as + /// and cache this node's role — the historian enqueue is gated to + /// the Primary so a future per-node feeder doesn't double-write across the warm-redundant pair. + protected override void PreStart() + { + _mediator = DistributedPubSub.Get(Context.System).Mediator; + _mediator.Tell(new Subscribe(OpcUaPublishActor.RedundancyStateTopic, Self)); + base.PreStart(); + } + + /// Caches this node's from a cluster redundancy snapshot so the + /// handler can gate the durable sink enqueue to the Primary. A snapshot + /// that doesn't mention (or no local node wired) leaves the cached role unchanged + /// ⇒ default-write. Mirrors 's handler. + /// The cluster redundancy snapshot. + private void OnRedundancyStateChanged(RedundancyStateChanged msg) + { + if (_localNode is null) + { + return; + } + + var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value); + if (local is not null) + { + _localRole = local.Role; + } } private async Task EnqueueAsync(AlarmHistorianEvent evt) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs index 93301836..eea9f934 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs @@ -143,7 +143,7 @@ public static class ServiceCollectionExtensions registry.Register(driverHost); var historian = system.ActorOf( - HistorianAdapterActor.Props(historianSink), + HistorianAdapterActor.Props(historianSink, roleInfo.LocalNode), HistorianAdapterActorName); registry.Register(historian); }); diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/HistorianAdapterActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/HistorianAdapterActorTests.cs new file mode 100644 index 00000000..72115b77 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/HistorianAdapterActorTests.cs @@ -0,0 +1,138 @@ +using Akka.Actor; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; +using ZB.MOM.WW.OtOpcUa.Runtime.Historian; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Historian; + +/// +/// TestKit coverage for 's Primary-only historization gate. +/// The actor caches this node's from the redundancy-state +/// topic and SKIPS the sink enqueue when the local node is Secondary/Detached so a future +/// per-node feeder writes exactly once across the warm-redundant pair. Unknown/null role +/// default-writes (single-node deploys + the boot window must never silently drop historization). +/// +public sealed class HistorianAdapterActorTests : RuntimeActorTestBase +{ + /// The local node id the gating tests construct the adapter with. + private static readonly NodeId LocalNode = new("node-A"); + + /// A short window we allow the fire-and-forget enqueue to land within. + private static readonly TimeSpan Settle = TimeSpan.FromMilliseconds(500); + + /// Thread-safe fake sink that records every call. + private sealed class RecordingSink : IAlarmHistorianSink + { + private int _count; + + /// The number of calls observed so far. + public int EnqueueCount => Volatile.Read(ref _count); + + /// + public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken) + { + Interlocked.Increment(ref _count); + return Task.CompletedTask; + } + + /// + public HistorianSinkStatus GetStatus() => new( + QueueDepth: 0, + DeadLetterDepth: 0, + LastDrainUtc: null, + LastSuccessUtc: null, + LastError: null, + DrainState: HistorianDrainState.Idle); + } + + /// Builds a minimal for the gate tests. + private static AlarmHistorianEvent SampleEvent() => new( + AlarmId: "alm-1", + EquipmentPath: "Area/Line/Equip", + AlarmName: "HiHi", + AlarmTypeName: "LimitAlarm", + Severity: AlarmSeverity.High, + EventKind: "Activated", + Message: "level high", + User: "system", + Comment: null, + TimestampUtc: DateTime.UtcNow); + + /// Tell a snapshot marking + /// with so the gate observes the local role. + private static void TellRedundancyRole(IActorRef actor, RedundancyRole role) => + actor.Tell(new RedundancyStateChanged( + new[] + { + new NodeRedundancyState( + NodeId: LocalNode, + Role: role, + IsClusterLeader: role == RedundancyRole.Primary, + IsRoleLeaderForDriver: role == RedundancyRole.Primary, + AsOfUtc: DateTime.UtcNow), + }, + CorrelationId.NewId())); + + /// Default-write (T1): before any redundancy snapshot — the boot window and the steady + /// state for single-node deploys — the adapter MUST historize. Constructed WITH a localNode but + /// no snapshot sent, so the cached role is unknown ⇒ default-write. + [Fact] + public void Default_before_redundancy_state_historizes() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode)); + + actor.Tell(SampleEvent()); + + AwaitAssert(() => sink.EnqueueCount.ShouldBe(1), Settle); + } + + /// Secondary suppression (T2): when the cached local role is Secondary, the adapter MUST + /// NOT enqueue to the durable sink (the Primary writes the single copy). + [Fact] + public void Secondary_node_does_not_historize() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode)); + + TellRedundancyRole(actor, RedundancyRole.Secondary); + actor.Tell(SampleEvent()); + + // Give the (suppressed) fire-and-forget a stable window, then assert nothing landed. + ExpectNoMsg(Settle); + sink.EnqueueCount.ShouldBe(0); + } + + /// Detached suppression (T3): a Detached node likewise MUST NOT historize. + [Fact] + public void Detached_node_does_not_historize() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode)); + + TellRedundancyRole(actor, RedundancyRole.Detached); + actor.Tell(SampleEvent()); + + ExpectNoMsg(Settle); + sink.EnqueueCount.ShouldBe(0); + } + + /// Primary writes (T4): when the cached local role is Primary, the adapter historizes as + /// normal (this is the single copy the durable sink sees). + [Fact] + public void Primary_node_historizes() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode)); + + TellRedundancyRole(actor, RedundancyRole.Primary); + actor.Tell(SampleEvent()); + + AwaitAssert(() => sink.EnqueueCount.ShouldBe(1), Settle); + } +}