diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs index 180804e5..6dd62c3c 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs @@ -1,10 +1,13 @@ using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; +using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms; namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian; @@ -57,39 +60,83 @@ public sealed class HistorianAdapterActor : ReceiveActor _sink = sink; _localNode = localNode; - Receive(evt => - { - // Warm-standby dedup (forward-looking): only the Primary historizes to the durable sink so a - // future per-node feeder writes exactly once. Default-write until told Secondary/Detached so - // single-node deploys + the boot window never drop historization. (Currently the actor has no - // production feeder — this is a defensive guard for when engine→historian wiring lands.) - if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached) - { - return; - } + // A direct AlarmHistorianEvent source (kept for a future engine→historian path) goes through the + // same Primary gate as the alerts-topic feed below. + Receive(Historize); - // Fire-and-forget: SqliteStoreAndForwardSink persists to local SQLite synchronously - // inside EnqueueAsync (it returns once the row is committed), so we don't block on - // network/pipe latency. Failures are surfaced via GetStatus's LastError + drain state. - _ = EnqueueAsync(evt); - }); + // Live alarm transitions arrive off the cluster `alerts` DPS topic (subscribed in PreStart). The + // Primary ScriptedAlarmHostActor publishes each transition ONCE, but DistributedPubSub fans that + // single message to EVERY node's subscriber — including BOTH central nodes' historian adapters. The + // Primary gate in Historize keeps only the Primary writing ⇒ exactly-once across the warm pair. + Receive(t => Historize(Translate(t))); Receive(_ => Sender.Tell(_sink.GetStatus())); // Cluster redundancy snapshots (published on the `redundancy-state` topic, subscribed in PreStart) - // cache this node's role so the AlarmHistorianEvent handler can gate the durable sink enqueue to the - // Primary. The PubSub Subscribe is acked back to Self (no-op below). + // cache this node's role so the historize gate can scope the durable sink enqueue to the Primary. + // The PubSub Subscribe acks (redundancy-state + alerts) are acked back to Self (no-op below). Receive(OnRedundancyStateChanged); Receive(_ => { }); } - /// Subscribes to the redundancy-state topic so cluster role changes land as - /// and cache this node's role — the historian enqueue is gated to - /// the Primary so a future per-node feeder doesn't double-write across the warm-redundant pair. + /// Gates a historian event to the Primary then enqueues it fire-and-forget. Warm-standby + /// dedup: only the Primary historizes to the durable sink so the per-node alerts feed writes exactly + /// once. Default-write until told Secondary/Detached so single-node deploys + the boot window never + /// drop historization. Fire-and-forget because persists to + /// local SQLite synchronously inside (it returns once the row is committed), + /// so we don't block the mailbox on network/pipe latency; failures surface via . + /// The historian event to gate + enqueue. + private void Historize(AlarmHistorianEvent evt) + { + if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached) + { + return; + } + + _ = EnqueueAsync(evt); + } + + /// Translates a live (the alerts-topic shape) into the + /// historian's . AlarmTypeName is null-coalesced to + /// "AlarmCondition": during a rolling restart Akka's JSON serializer applies the CLR default (null) to + /// an old-format message's AlarmTypeName rather than the record's call-site default, and the + /// historian must never store a null alarm type. + /// The live transition published on the alerts topic. + /// The translated historian event. + private static AlarmHistorianEvent Translate(AlarmTransitionEvent t) => new( + AlarmId: t.AlarmId, + EquipmentPath: t.EquipmentPath, + AlarmName: t.AlarmName, + AlarmTypeName: string.IsNullOrEmpty(t.AlarmTypeName) ? "AlarmCondition" : t.AlarmTypeName, + Severity: ToSeverity(t.Severity), + EventKind: t.TransitionKind, + Message: t.Message, + User: t.User, + Comment: t.Comment, + TimestampUtc: t.TimestampUtc); + + /// Maps the OPC UA 1–1000 numeric severity carried on a transition back to the coarse + /// , inverting ScriptedAlarmHostActor.SeverityToInt's bucket ceilings + /// (Low=250, Medium=500, High=750, Critical=1000). + /// The OPC UA 1–1000 numeric severity. + /// The coarse alarm severity bucket. + private static AlarmSeverity ToSeverity(int severity) => severity switch + { + <= 250 => AlarmSeverity.Low, + <= 500 => AlarmSeverity.Medium, + <= 750 => AlarmSeverity.High, + _ => AlarmSeverity.Critical, + }; + + /// Subscribes to the redundancy-state topic (so cluster role changes land as + /// and cache this node's role — the historize enqueue is gated to + /// the Primary so the alerts feed doesn't double-write across the warm-redundant pair) and to the + /// alerts topic (so live s are translated + historized). protected override void PreStart() { _mediator = DistributedPubSub.Get(Context.System).Mediator; _mediator.Tell(new Subscribe(OpcUaPublishActor.RedundancyStateTopic, Self)); + _mediator.Tell(new Subscribe(ScriptedAlarmHostActor.AlertsTopic, Self)); base.PreStart(); } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/HistorianAdapterActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/HistorianAdapterActorTests.cs index 0356fc8e..34387e4f 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/HistorianAdapterActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/HistorianAdapterActorTests.cs @@ -1,6 +1,7 @@ using Akka.Actor; using Shouldly; using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; @@ -29,13 +30,26 @@ public sealed class HistorianAdapterActorTests : RuntimeActorTestBase private sealed class RecordingSink : IAlarmHistorianSink { private int _count; + private readonly object _lock = new(); + private readonly List _events = new(); /// The number of calls observed so far. public int EnqueueCount => Volatile.Read(ref _count); + /// A snapshot of every event enqueued so far (in arrival order). + public IReadOnlyList Events + { + get { lock (_lock) { return _events.ToArray(); } } + } + /// public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken) { + lock (_lock) + { + _events.Add(evt); + } + Interlocked.Increment(ref _count); return Task.CompletedTask; } @@ -164,4 +178,118 @@ public sealed class HistorianAdapterActorTests : RuntimeActorTestBase // Local role is still unknown ⇒ default-historize path: sink must record exactly one enqueue. AwaitAssert(() => sink.EnqueueCount.ShouldBe(1), Settle); } + + /// Builds an (the shape published on the alerts + /// DPS topic) for the translate tests, with overridable severity / type / comment / kind. + private static AlarmTransitionEvent SampleTransition( + int severity = 750, + string alarmTypeName = "LimitAlarm", + string? comment = "note", + string transitionKind = "Activated") => new( + AlarmId: "alm-9", + EquipmentPath: "Area/Line/Equip", + AlarmName: "HiHi", + TransitionKind: transitionKind, + Severity: severity, + Message: "level high", + User: "operator1", + TimestampUtc: DateTime.UtcNow, + AlarmTypeName: alarmTypeName, + Comment: comment); + + /// Alerts translate (T6): an off the alerts topic + /// is translated to an and historized by default (unknown role). + /// The translation must carry AlarmId, AlarmTypeName, EventKind (← TransitionKind), Severity bucket, + /// and Comment through faithfully. + [Fact] + public void Alerts_transition_is_historized_by_default() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink)); + + actor.Tell(SampleTransition()); + + AwaitAssert( + () => + { + sink.EnqueueCount.ShouldBe(1); + var e = sink.Events.ShouldHaveSingleItem(); + e.AlarmId.ShouldBe("alm-9"); + e.AlarmTypeName.ShouldBe("LimitAlarm"); + e.EventKind.ShouldBe("Activated"); + e.Severity.ShouldBe(AlarmSeverity.High); + e.Comment.ShouldBe("note"); + }, + Settle); + } + + /// Secondary suppression for alerts (T7): a Secondary node must NOT historize a transition + /// off the alerts topic — the Primary writes the single copy (DistributedPubSub fans the + /// single publish to BOTH nodes' historian adapters, so the gate is what makes it exactly-once). + [Fact] + public void Secondary_node_does_not_historize_alerts_transition() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode)); + + TellRedundancyRole(actor, RedundancyRole.Secondary); + actor.Tell(SampleTransition()); + + ExpectNoMsg(Settle); + sink.EnqueueCount.ShouldBe(0); + } + + /// Primary writes alerts (T8): a Primary node historizes a transition off the + /// alerts topic (the single copy the durable sink sees). + [Fact] + public void Primary_node_historizes_alerts_transition() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode)); + + TellRedundancyRole(actor, RedundancyRole.Primary); + actor.Tell(SampleTransition()); + + AwaitAssert(() => sink.EnqueueCount.ShouldBe(1), Settle); + } + + /// Severity buckets (T9): the OPC UA 1–1000 numeric severity on the transition maps onto + /// the coarse at the same ceilings ScriptedAlarmHostActor.SeverityToInt + /// emits (Low≤250, Medium≤500, High≤750, Critical otherwise). Driven end-to-end through the enqueue. + [Theory] + [InlineData(250, AlarmSeverity.Low)] + [InlineData(251, AlarmSeverity.Medium)] + [InlineData(500, AlarmSeverity.Medium)] + [InlineData(750, AlarmSeverity.High)] + [InlineData(751, AlarmSeverity.Critical)] + [InlineData(1000, AlarmSeverity.Critical)] + public void Alerts_transition_severity_buckets(int severity, AlarmSeverity expected) + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink)); + + actor.Tell(SampleTransition(severity: severity)); + + AwaitAssert( + () => sink.Events.ShouldHaveSingleItem().Severity.ShouldBe(expected), + Settle); + } + + /// Rolling-restart null default (T10): an old-format transition deserialized by Akka's JSON + /// serializer applies the CLR default (null) to AlarmTypeName rather than the record's + /// "AlarmCondition" call-site default. The translation must null-coalesce that back to + /// "AlarmCondition" so the historian never stores a null alarm type. Forced here by constructing the + /// transition with AlarmTypeName: null! (simulating the post-deserialization shape). + [Fact] + public void Alerts_transition_with_missing_AlarmTypeName_defaults() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink)); + + actor.Tell(SampleTransition(alarmTypeName: null!)); + + AwaitAssert( + () => sink.Events.ShouldHaveSingleItem().AlarmTypeName.ShouldBe("AlarmCondition"), + Settle); + } }