332 lines
14 KiB
C#
332 lines
14 KiB
C#
using Akka.Actor;
|
||
using Shouldly;
|
||
using Xunit;
|
||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
|
||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
|
||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||
|
||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Historian;
|
||
|
||
/// <summary>
|
||
/// TestKit coverage for <see cref="HistorianAdapterActor"/>'s Primary-only historization gate.
|
||
/// The actor caches this node's <see cref="RedundancyRole"/> from the <c>redundancy-state</c>
|
||
/// topic and SKIPS the sink enqueue when the local node is Secondary/Detached so a future
|
||
/// per-node feeder writes exactly once across the warm-redundant pair. Unknown/null role
|
||
/// default-writes (single-node deploys + the boot window must never silently drop historization).
|
||
/// </summary>
|
||
public sealed class HistorianAdapterActorTests : RuntimeActorTestBase
|
||
{
|
||
/// <summary>The local node id the gating tests construct the adapter with.</summary>
|
||
private static readonly NodeId LocalNode = new("node-A");
|
||
|
||
/// <summary>A short window we allow the fire-and-forget enqueue to land within.</summary>
|
||
private static readonly TimeSpan Settle = TimeSpan.FromMilliseconds(500);
|
||
|
||
/// <summary>Thread-safe fake sink that records every <see cref="EnqueueAsync"/> call.</summary>
|
||
private sealed class RecordingSink : IAlarmHistorianSink
|
||
{
|
||
private readonly object _lock = new();
|
||
private readonly List<AlarmHistorianEvent> _events = new();
|
||
|
||
/// <summary>The number of <see cref="EnqueueAsync"/> calls observed so far.</summary>
|
||
public int EnqueueCount { get { lock (_lock) { return _events.Count; } } }
|
||
|
||
/// <summary>A snapshot of every event enqueued so far (in arrival order).</summary>
|
||
public IReadOnlyList<AlarmHistorianEvent> Events
|
||
{
|
||
get { lock (_lock) { return _events.ToArray(); } }
|
||
}
|
||
|
||
/// <inheritdoc />
|
||
public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
|
||
{
|
||
lock (_lock)
|
||
{
|
||
_events.Add(evt);
|
||
}
|
||
|
||
return Task.CompletedTask;
|
||
}
|
||
|
||
/// <inheritdoc />
|
||
public HistorianSinkStatus GetStatus() => new(
|
||
QueueDepth: 0,
|
||
DeadLetterDepth: 0,
|
||
LastDrainUtc: null,
|
||
LastSuccessUtc: null,
|
||
LastError: null,
|
||
DrainState: HistorianDrainState.Idle);
|
||
}
|
||
|
||
/// <summary>Builds a minimal <see cref="AlarmHistorianEvent"/> for the gate tests.</summary>
|
||
private static AlarmHistorianEvent SampleEvent() => new(
|
||
AlarmId: "alm-1",
|
||
EquipmentPath: "Area/Line/Equip",
|
||
AlarmName: "HiHi",
|
||
AlarmTypeName: "LimitAlarm",
|
||
Severity: AlarmSeverity.High,
|
||
EventKind: "Activated",
|
||
Message: "level high",
|
||
User: "system",
|
||
Comment: null,
|
||
TimestampUtc: DateTime.UtcNow);
|
||
|
||
/// <summary>Tell <paramref name="actor"/> a <see cref="RedundancyStateChanged"/> snapshot marking
|
||
/// <see cref="LocalNode"/> with <paramref name="role"/> so the gate observes the local role.</summary>
|
||
private static void TellRedundancyRole(IActorRef actor, RedundancyRole role) =>
|
||
actor.Tell(new RedundancyStateChanged(
|
||
new[]
|
||
{
|
||
new NodeRedundancyState(
|
||
NodeId: LocalNode,
|
||
Role: role,
|
||
IsClusterLeader: role == RedundancyRole.Primary,
|
||
IsRoleLeaderForDriver: role == RedundancyRole.Primary,
|
||
AsOfUtc: DateTime.UtcNow),
|
||
},
|
||
CorrelationId.NewId()));
|
||
|
||
/// <summary>Default-write (T1): before any redundancy snapshot — the boot window and the steady
|
||
/// state for single-node deploys — the adapter MUST historize. Constructed WITH a localNode but
|
||
/// no snapshot sent, so the cached role is unknown ⇒ default-write.</summary>
|
||
[Fact]
|
||
public void Default_before_redundancy_state_historizes()
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode));
|
||
|
||
actor.Tell(SampleEvent());
|
||
|
||
AwaitAssert(() => sink.EnqueueCount.ShouldBe(1), Settle);
|
||
}
|
||
|
||
/// <summary>Secondary suppression (T2): when the cached local role is Secondary, the adapter MUST
|
||
/// NOT enqueue to the durable sink (the Primary writes the single copy).</summary>
|
||
[Fact]
|
||
public void Secondary_node_does_not_historize()
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode));
|
||
|
||
TellRedundancyRole(actor, RedundancyRole.Secondary);
|
||
actor.Tell(SampleEvent());
|
||
|
||
// Give the (suppressed) fire-and-forget a stable window, then assert nothing landed.
|
||
ExpectNoMsg(Settle);
|
||
sink.EnqueueCount.ShouldBe(0);
|
||
}
|
||
|
||
/// <summary>Detached suppression (T3): a Detached node likewise MUST NOT historize.</summary>
|
||
[Fact]
|
||
public void Detached_node_does_not_historize()
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode));
|
||
|
||
TellRedundancyRole(actor, RedundancyRole.Detached);
|
||
actor.Tell(SampleEvent());
|
||
|
||
ExpectNoMsg(Settle);
|
||
sink.EnqueueCount.ShouldBe(0);
|
||
}
|
||
|
||
/// <summary>Primary writes (T4): when the cached local role is Primary, the adapter historizes as
|
||
/// normal (this is the single copy the durable sink sees).</summary>
|
||
[Fact]
|
||
public void Primary_node_historizes()
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode));
|
||
|
||
TellRedundancyRole(actor, RedundancyRole.Primary);
|
||
actor.Tell(SampleEvent());
|
||
|
||
AwaitAssert(() => sink.EnqueueCount.ShouldBe(1), Settle);
|
||
}
|
||
|
||
/// <summary>Absent-node default-historize (T5): a snapshot that mentions only a DIFFERENT node
|
||
/// must NOT update the local cached role — the actor's own node is absent, so the role stays
|
||
/// null/unknown and the default-historize path must fire. Partial/stale snapshots MUST NOT
|
||
/// silently suppress historization for nodes not yet observed.</summary>
|
||
[Fact]
|
||
public void Redundancy_snapshot_without_local_node_leaves_role_unknown_and_historizes()
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode));
|
||
|
||
// Send a snapshot that only describes a peer node — the local node is absent.
|
||
actor.Tell(new RedundancyStateChanged(
|
||
new[]
|
||
{
|
||
new NodeRedundancyState(
|
||
NodeId: new NodeId("some-other-node"),
|
||
Role: RedundancyRole.Secondary,
|
||
IsClusterLeader: false,
|
||
IsRoleLeaderForDriver: false,
|
||
AsOfUtc: DateTime.UtcNow),
|
||
},
|
||
CorrelationId.NewId()));
|
||
|
||
actor.Tell(SampleEvent());
|
||
|
||
// Local role is still unknown ⇒ default-historize path: sink must record exactly one enqueue.
|
||
AwaitAssert(() => sink.EnqueueCount.ShouldBe(1), Settle);
|
||
}
|
||
|
||
/// <summary>Builds an <see cref="AlarmTransitionEvent"/> (the shape published on the <c>alerts</c>
|
||
/// DPS topic) for the translate tests, with overridable severity / type / comment / kind.
|
||
/// <paramref name="historizeToAveva"/> is <c>bool?</c> so tests can pass <c>null</c> to simulate the
|
||
/// rolling-restart / cross-version case (missing field → CLR default null).</summary>
|
||
private static AlarmTransitionEvent SampleTransition(
|
||
int severity = 750,
|
||
string alarmTypeName = "LimitAlarm",
|
||
string? comment = "note",
|
||
string transitionKind = "Activated",
|
||
bool? historizeToAveva = true) => new(
|
||
AlarmId: "alm-9",
|
||
EquipmentPath: "Area/Line/Equip",
|
||
AlarmName: "HiHi",
|
||
TransitionKind: transitionKind,
|
||
Severity: severity,
|
||
Message: "level high",
|
||
User: "operator1",
|
||
TimestampUtc: DateTime.UtcNow,
|
||
AlarmTypeName: alarmTypeName,
|
||
Comment: comment,
|
||
HistorizeToAveva: historizeToAveva);
|
||
|
||
/// <summary>Alerts translate (T6): an <see cref="AlarmTransitionEvent"/> off the <c>alerts</c> topic
|
||
/// is translated to an <see cref="AlarmHistorianEvent"/> and historized by default (unknown role).
|
||
/// The translation must carry AlarmId, AlarmTypeName, EventKind (← TransitionKind), Severity bucket,
|
||
/// and Comment through faithfully.</summary>
|
||
[Fact]
|
||
public void Alerts_transition_is_historized_by_default()
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink));
|
||
|
||
actor.Tell(SampleTransition());
|
||
|
||
AwaitAssert(
|
||
() =>
|
||
{
|
||
sink.EnqueueCount.ShouldBe(1);
|
||
var e = sink.Events.ShouldHaveSingleItem();
|
||
e.AlarmId.ShouldBe("alm-9");
|
||
e.AlarmTypeName.ShouldBe("LimitAlarm");
|
||
e.EventKind.ShouldBe("Activated");
|
||
e.Severity.ShouldBe(AlarmSeverity.High);
|
||
e.Comment.ShouldBe("note");
|
||
},
|
||
Settle);
|
||
}
|
||
|
||
/// <summary>Secondary suppression for alerts (T7): a Secondary node must NOT historize a transition
|
||
/// off the <c>alerts</c> topic — the Primary writes the single copy (DistributedPubSub fans the
|
||
/// single publish to BOTH nodes' historian adapters, so the gate is what makes it exactly-once).</summary>
|
||
[Fact]
|
||
public void Secondary_node_does_not_historize_alerts_transition()
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode));
|
||
|
||
TellRedundancyRole(actor, RedundancyRole.Secondary);
|
||
actor.Tell(SampleTransition());
|
||
|
||
ExpectNoMsg(Settle);
|
||
sink.EnqueueCount.ShouldBe(0);
|
||
}
|
||
|
||
/// <summary>Primary writes alerts (T8): a Primary node historizes a transition off the
|
||
/// <c>alerts</c> topic (the single copy the durable sink sees).</summary>
|
||
[Fact]
|
||
public void Primary_node_historizes_alerts_transition()
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode));
|
||
|
||
TellRedundancyRole(actor, RedundancyRole.Primary);
|
||
actor.Tell(SampleTransition());
|
||
|
||
AwaitAssert(() => sink.EnqueueCount.ShouldBe(1), Settle);
|
||
}
|
||
|
||
/// <summary>Per-alarm opt-out (T8b): a Primary node must NOT historize a transition whose
|
||
/// <c>HistorizeToAveva</c> is <c>false</c> — that flag is a per-alarm opt-out of DURABLE
|
||
/// historization only. The live <c>alerts</c> fan-out already happened upstream (the publish is NOT
|
||
/// gated on this flag), so only the durable sink write is suppressed.</summary>
|
||
[Fact]
|
||
public void Primary_node_does_not_historize_when_opted_out()
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode));
|
||
|
||
TellRedundancyRole(actor, RedundancyRole.Primary);
|
||
actor.Tell(SampleTransition(historizeToAveva: false));
|
||
|
||
ExpectNoMsg(Settle);
|
||
sink.EnqueueCount.ShouldBe(0);
|
||
}
|
||
|
||
/// <summary>Rolling-restart default-on (T8c): when <c>HistorizeToAveva</c> is <c>null</c> — the shape
|
||
/// a cross-version / rolling-restart deserialize produces (old-format message missing the field maps to
|
||
/// the CLR default <c>null</c> for <c>bool?</c>) — a Primary node MUST historize. <c>null</c> is the
|
||
/// safe default-on posture: no audit row is dropped at a handover, matching the <c>AlarmTypeName</c>
|
||
/// null-coalesce precedent in the same <c>HistorianAdapterActor.Translate</c>.</summary>
|
||
[Fact]
|
||
public void Primary_historizes_when_flag_is_null()
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode));
|
||
|
||
TellRedundancyRole(actor, RedundancyRole.Primary);
|
||
actor.Tell(SampleTransition(historizeToAveva: null));
|
||
|
||
AwaitAssert(() => sink.EnqueueCount.ShouldBe(1), Settle);
|
||
}
|
||
|
||
/// <summary>Severity buckets (T9): the OPC UA 1–1000 numeric severity on the transition maps onto
|
||
/// the coarse <see cref="AlarmSeverity"/> at the same ceilings <c>ScriptedAlarmHostActor.SeverityToInt</c>
|
||
/// emits (Low≤250, Medium≤500, High≤750, Critical otherwise). Driven end-to-end through the enqueue.</summary>
|
||
[Theory]
|
||
[InlineData(250, AlarmSeverity.Low)]
|
||
[InlineData(251, AlarmSeverity.Medium)]
|
||
[InlineData(500, AlarmSeverity.Medium)]
|
||
[InlineData(750, AlarmSeverity.High)]
|
||
[InlineData(751, AlarmSeverity.Critical)]
|
||
[InlineData(1000, AlarmSeverity.Critical)]
|
||
public void Alerts_transition_severity_buckets(int severity, AlarmSeverity expected)
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink));
|
||
|
||
actor.Tell(SampleTransition(severity: severity));
|
||
|
||
AwaitAssert(
|
||
() => sink.Events.ShouldHaveSingleItem().Severity.ShouldBe(expected),
|
||
Settle);
|
||
}
|
||
|
||
/// <summary>Rolling-restart null default (T10): an old-format transition deserialized by Akka's JSON
|
||
/// serializer applies the CLR default (null) to <c>AlarmTypeName</c> rather than the record's
|
||
/// "AlarmCondition" call-site default. The translation must null-coalesce that back to
|
||
/// "AlarmCondition" so the historian never stores a null alarm type. Forced here by constructing the
|
||
/// transition with <c>AlarmTypeName: null!</c> (simulating the post-deserialization shape).</summary>
|
||
[Fact]
|
||
public void Alerts_transition_with_missing_AlarmTypeName_defaults()
|
||
{
|
||
var sink = new RecordingSink();
|
||
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink));
|
||
|
||
actor.Tell(SampleTransition(alarmTypeName: null!));
|
||
|
||
AwaitAssert(
|
||
() => sink.Events.ShouldHaveSingleItem().AlarmTypeName.ShouldBe("AlarmCondition"),
|
||
Settle);
|
||
}
|
||
}
|