168 lines
6.8 KiB
C#
168 lines
6.8 KiB
C#
using Akka.Actor;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
|
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
|
|
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Historian;
|
|
|
|
/// <summary>
/// TestKit tests for the Primary-only historization gate of <see cref="HistorianAdapterActor"/>.
/// The adapter keeps track of this node's <see cref="RedundancyRole"/> as announced on the
/// <c>redundancy-state</c> topic and skips the sink enqueue while the local node is Secondary or
/// Detached, so that a future per-node feeder produces exactly one write across the
/// warm-redundant pair. While the role is unknown/null the adapter default-writes: single-node
/// deploys and the boot window must never silently lose historization.
/// </summary>
public sealed class HistorianAdapterActorTests : RuntimeActorTestBase
{
    /// <summary>Node id every adapter under test is constructed with as its local identity.</summary>
    private static readonly NodeId LocalNode = new NodeId("node-A");

    /// <summary>Time budget granted to the fire-and-forget enqueue to become observable.</summary>
    private static readonly TimeSpan Settle = TimeSpan.FromMilliseconds(500);

    /// <summary>Fake sink that does nothing but count <see cref="EnqueueAsync"/> calls atomically.</summary>
    private sealed class RecordingSink : IAlarmHistorianSink
    {
        private int _enqueued;

        /// <summary>How many times <see cref="EnqueueAsync"/> has been invoked up to now.</summary>
        public int EnqueueCount
        {
            get { return Volatile.Read(ref _enqueued); }
        }

        /// <inheritdoc />
        public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
        {
            // Only the call count matters to the gate tests; the event itself is discarded.
            Interlocked.Increment(ref _enqueued);
            return Task.CompletedTask;
        }

        /// <inheritdoc />
        public HistorianSinkStatus GetStatus()
        {
            return new HistorianSinkStatus(
                QueueDepth: 0,
                DeadLetterDepth: 0,
                LastDrainUtc: null,
                LastSuccessUtc: null,
                LastError: null,
                DrainState: HistorianDrainState.Idle);
        }
    }

    /// <summary>Creates the smallest <see cref="AlarmHistorianEvent"/> the gate tests need.</summary>
    private static AlarmHistorianEvent SampleEvent()
    {
        return new AlarmHistorianEvent(
            AlarmId: "alm-1",
            EquipmentPath: "Area/Line/Equip",
            AlarmName: "HiHi",
            AlarmTypeName: "LimitAlarm",
            Severity: AlarmSeverity.High,
            EventKind: "Activated",
            Message: "level high",
            User: "system",
            Comment: null,
            TimestampUtc: DateTime.UtcNow);
    }

    /// <summary>Builds a single-entry <see cref="RedundancyStateChanged"/> snapshot that reports
    /// <paramref name="node"/> in <paramref name="role"/>; both leader flags are set only for Primary.</summary>
    private static RedundancyStateChanged SnapshotFor(NodeId node, RedundancyRole role)
    {
        var isPrimary = role == RedundancyRole.Primary;

        var entry = new NodeRedundancyState(
            NodeId: node,
            Role: role,
            IsClusterLeader: isPrimary,
            IsRoleLeaderForDriver: isPrimary,
            AsOfUtc: DateTime.UtcNow);

        return new RedundancyStateChanged(new[] { entry }, CorrelationId.NewId());
    }

    /// <summary>Sends <paramref name="actor"/> a <see cref="RedundancyStateChanged"/> snapshot that
    /// reports <see cref="LocalNode"/> in <paramref name="role"/>, so the gate sees the local role.</summary>
    private static void TellRedundancyRole(IActorRef actor, RedundancyRole role)
    {
        actor.Tell(SnapshotFor(LocalNode, role));
    }

    /// <summary>Creates a fresh <see cref="RecordingSink"/> and an adapter actor wired to it with
    /// <see cref="LocalNode"/> as the local node id.</summary>
    private (RecordingSink Sink, IActorRef Adapter) StartAdapter()
    {
        var sink = new RecordingSink();
        var adapter = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode));
        return (sink, adapter);
    }

    /// <summary>Polls until <paramref name="sink"/> reports one enqueue, failing after <see cref="Settle"/>.</summary>
    private void AssertEnqueued(RecordingSink sink)
    {
        AwaitAssert(() => sink.EnqueueCount.ShouldBe(1), Settle);
    }

    /// <summary>Waits out the full <see cref="Settle"/> window and then requires that
    /// <paramref name="sink"/> saw no enqueue at all.</summary>
    private void AssertSuppressed(RecordingSink sink)
    {
        // Give the (suppressed) fire-and-forget a stable window, then assert nothing landed.
        ExpectNoMsg(Settle);
        sink.EnqueueCount.ShouldBe(0);
    }

    /// <summary>T1, default-write: with a local node configured but no redundancy snapshot
    /// delivered yet (the boot window, and the permanent state of single-node deploys) the cached
    /// role is unknown, and the adapter MUST historize.</summary>
    [Fact]
    public void Default_before_redundancy_state_historizes()
    {
        var (sink, adapter) = StartAdapter();

        adapter.Tell(SampleEvent());

        AssertEnqueued(sink);
    }

    /// <summary>T2, Secondary suppression: once the cached local role is Secondary the adapter MUST
    /// NOT enqueue to the durable sink, because the Primary writes the single copy.</summary>
    [Fact]
    public void Secondary_node_does_not_historize()
    {
        var (sink, adapter) = StartAdapter();

        TellRedundancyRole(adapter, RedundancyRole.Secondary);
        adapter.Tell(SampleEvent());

        AssertSuppressed(sink);
    }

    /// <summary>T3, Detached suppression: a Detached node MUST NOT historize either.</summary>
    [Fact]
    public void Detached_node_does_not_historize()
    {
        var (sink, adapter) = StartAdapter();

        TellRedundancyRole(adapter, RedundancyRole.Detached);
        adapter.Tell(SampleEvent());

        AssertSuppressed(sink);
    }

    /// <summary>T4, Primary writes: once the cached local role is Primary the adapter historizes
    /// normally; this is the single copy the durable sink receives.</summary>
    [Fact]
    public void Primary_node_historizes()
    {
        var (sink, adapter) = StartAdapter();

        TellRedundancyRole(adapter, RedundancyRole.Primary);
        adapter.Tell(SampleEvent());

        AssertEnqueued(sink);
    }

    /// <summary>T5, absent-node default-historize: a snapshot that lists only a DIFFERENT node must
    /// not touch the cached local role. The local node is missing from it, so the role remains
    /// null/unknown and the default-historize path has to run. Partial or stale snapshots MUST NOT
    /// silently suppress historization for a node that has not been observed yet.</summary>
    [Fact]
    public void Redundancy_snapshot_without_local_node_leaves_role_unknown_and_historizes()
    {
        var (sink, adapter) = StartAdapter();

        // The snapshot describes a peer only; the local node does not appear in it.
        adapter.Tell(SnapshotFor(new NodeId("some-other-node"), RedundancyRole.Secondary));

        adapter.Tell(SampleEvent());

        // Local role is still unknown, so the default-historize path applies: one enqueue expected.
        AssertEnqueued(sink);
    }
}
|