feat(redundancy): gate alarm historization on Primary (A2, defensive — actor currently unfed)
HistorianAdapterActor now subscribes to the redundancy-state DPS topic,
caches the local node's RedundancyRole, and SKIPS the durable-sink enqueue
when the local node is Secondary or Detached. While the role is unknown (null),
the actor still writes by default, so single-node deploys and the boot window
never silently drop historization.
GetStatus stays ungated.
PREMISE: verified the actor is registered but FED BY NOTHING in production —
there is no AlarmHistorianEvent producer and nothing resolves its registry key
to Tell it. This is a FORWARD-LOOKING / DEFENSIVE guard, not a fix for a live
double-write: the moment a per-node feeder lands (engine -> historian, expected
as a per-node cluster broadcast like the alerts topic), only the Primary will
write to the durable sink (exactly-once across all alarm sources).
Mirrors the sibling A1 treatment of ScriptedAlarmHostActor (06c4155) and
OpcUaPublishActor's redundancy-state handler. localNode is threaded through
HistorianAdapterActor.Props from ServiceCollectionExtensions (roleInfo.LocalNode).
This commit is contained in:
@@ -1,6 +1,10 @@
|
|||||||
using Akka.Actor;
|
using Akka.Actor;
|
||||||
|
using Akka.Cluster.Tools.PublishSubscribe;
|
||||||
using Akka.Event;
|
using Akka.Event;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
|
||||||
|
|
||||||
@@ -23,22 +27,47 @@ public sealed class HistorianAdapterActor : ReceiveActor
|
|||||||
}
|
}
|
||||||
|
|
||||||
private readonly IAlarmHistorianSink _sink;
|
private readonly IAlarmHistorianSink _sink;
|
||||||
|
private readonly NodeId? _localNode;
|
||||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||||
|
private IActorRef? _mediator;
|
||||||
|
|
||||||
|
/// <summary>Cached local <see cref="RedundancyRole"/> from the latest <see cref="RedundancyStateChanged"/>
|
||||||
|
/// snapshot (null = unknown until the first snapshot arrives, or no <see cref="_localNode"/> wired). The
|
||||||
|
/// durable sink enqueue in the <see cref="AlarmHistorianEvent"/> handler is gated on this: only the
|
||||||
|
/// Primary historizes (default-write while unknown so single-node deploys + the boot window never drop
|
||||||
|
/// historization).</summary>
|
||||||
|
private RedundancyRole? _localRole;
|
||||||
|
|
||||||
/// <summary>Creates the props for a HistorianAdapterActor instance.</summary>
|
/// <summary>Creates the props for a HistorianAdapterActor instance.</summary>
|
||||||
/// <param name="sink">The alarm historian sink implementation, or null to use a null sink.</param>
|
/// <param name="sink">The alarm historian sink implementation, or null to use a null sink.</param>
|
||||||
|
/// <param name="localNode">The local cluster node id, used to read this node's <see cref="RedundancyRole"/>
|
||||||
|
/// from the <c>redundancy-state</c> topic so only the Primary historizes to the durable sink. Null (the
|
||||||
|
/// default) leaves the role unknown ⇒ default-write (single-node deploys + tests).</param>
|
||||||
/// <returns>Props configured for creating a HistorianAdapterActor.</returns>
|
/// <returns>Props configured for creating a HistorianAdapterActor.</returns>
|
||||||
public static Props Props(IAlarmHistorianSink? sink = null) =>
|
public static Props Props(IAlarmHistorianSink? sink = null, NodeId? localNode = null) =>
|
||||||
Akka.Actor.Props.Create(() => new HistorianAdapterActor(sink ?? NullAlarmHistorianSink.Instance));
|
Akka.Actor.Props.Create(() => new HistorianAdapterActor(sink ?? NullAlarmHistorianSink.Instance, localNode));
|
||||||
|
|
||||||
/// <summary>Initializes a new instance of the HistorianAdapterActor class.</summary>
|
/// <summary>Initializes a new instance of the HistorianAdapterActor class.</summary>
|
||||||
/// <param name="sink">The alarm historian sink to forward enqueued events to.</param>
|
/// <param name="sink">The alarm historian sink to forward enqueued events to.</param>
|
||||||
public HistorianAdapterActor(IAlarmHistorianSink sink)
|
/// <param name="localNode">The local cluster node id, used to read this node's <see cref="RedundancyRole"/>
|
||||||
|
/// from the <c>redundancy-state</c> topic so only the Primary historizes to the durable sink. Null leaves
|
||||||
|
/// the role unknown ⇒ default-write (single-node deploys + tests).</param>
|
||||||
|
public HistorianAdapterActor(IAlarmHistorianSink sink, NodeId? localNode = null)
|
||||||
{
|
{
|
||||||
_sink = sink;
|
_sink = sink;
|
||||||
|
_localNode = localNode;
|
||||||
|
|
||||||
Receive<AlarmHistorianEvent>(evt =>
|
Receive<AlarmHistorianEvent>(evt =>
|
||||||
{
|
{
|
||||||
|
// Warm-standby dedup (forward-looking): only the Primary historizes to the durable sink so a
|
||||||
|
// future per-node feeder writes exactly once. Default-write until told Secondary/Detached so
|
||||||
|
// single-node deploys + the boot window never drop historization. (Currently the actor has no
|
||||||
|
// production feeder — this is a defensive guard for when engine→historian wiring lands.)
|
||||||
|
if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Fire-and-forget: SqliteStoreAndForwardSink persists to local SQLite synchronously
|
// Fire-and-forget: SqliteStoreAndForwardSink persists to local SQLite synchronously
|
||||||
// inside EnqueueAsync (it returns once the row is committed), so we don't block on
|
// inside EnqueueAsync (it returns once the row is committed), so we don't block on
|
||||||
// network/pipe latency. Failures are surfaced via GetStatus's LastError + drain state.
|
// network/pipe latency. Failures are surfaced via GetStatus's LastError + drain state.
|
||||||
@@ -46,6 +75,41 @@ public sealed class HistorianAdapterActor : ReceiveActor
|
|||||||
});
|
});
|
||||||
|
|
||||||
Receive<GetStatus>(_ => Sender.Tell(_sink.GetStatus()));
|
Receive<GetStatus>(_ => Sender.Tell(_sink.GetStatus()));
|
||||||
|
|
||||||
|
// Cluster redundancy snapshots (published on the `redundancy-state` topic, subscribed in PreStart)
|
||||||
|
// cache this node's role so the AlarmHistorianEvent handler can gate the durable sink enqueue to the
|
||||||
|
// Primary. The PubSub Subscribe is acked back to Self (no-op below).
|
||||||
|
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
|
||||||
|
Receive<SubscribeAck>(_ => { });
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
/// Registers this actor on the <c>redundancy-state</c> pub-sub topic. Snapshots published there are
/// delivered as <see cref="RedundancyStateChanged"/> and refresh the cached local role, which in turn
/// gates the durable sink enqueue to the Primary so a future per-node feeder does not double-write
/// across the warm-redundant pair.
/// </summary>
protected override void PreStart()
{
    var mediator = DistributedPubSub.Get(Context.System).Mediator;
    _mediator = mediator;

    // The subscription is acknowledged back to Self with a SubscribeAck.
    mediator.Tell(new Subscribe(OpcUaPublishActor.RedundancyStateTopic, Self));

    base.PreStart();
}
|
||||||
|
|
||||||
|
/// <summary>
/// Updates the cached <see cref="RedundancyRole"/> of this node from a cluster redundancy snapshot;
/// the <see cref="AlarmHistorianEvent"/> handler reads that cache to restrict the durable sink
/// enqueue to the Primary. The cache is left untouched when no <see cref="_localNode"/> is wired or
/// when the snapshot carries no entry for it, so the default-write behaviour stays in effect.
/// Mirrors <see cref="OpcUaPublishActor"/>'s handler.
/// </summary>
/// <param name="msg">The cluster redundancy snapshot.</param>
private void OnRedundancyStateChanged(RedundancyStateChanged msg)
{
    if (_localNode is null)
    {
        // No local node id to look up: the role stays unknown.
        return;
    }

    // Only an entry for this node may change the cached role; other snapshots are ignored.
    if (msg.Nodes.FirstOrDefault(node => node.NodeId == _localNode.Value) is { } localState)
    {
        _localRole = localState.Role;
    }
}
|
||||||
|
|
||||||
private async Task EnqueueAsync(AlarmHistorianEvent evt)
|
private async Task EnqueueAsync(AlarmHistorianEvent evt)
|
||||||
|
|||||||
@@ -143,7 +143,7 @@ public static class ServiceCollectionExtensions
|
|||||||
registry.Register<DriverHostActorKey>(driverHost);
|
registry.Register<DriverHostActorKey>(driverHost);
|
||||||
|
|
||||||
var historian = system.ActorOf(
|
var historian = system.ActorOf(
|
||||||
HistorianAdapterActor.Props(historianSink),
|
HistorianAdapterActor.Props(historianSink, roleInfo.LocalNode),
|
||||||
HistorianAdapterActorName);
|
HistorianAdapterActorName);
|
||||||
registry.Register<HistorianAdapterActorKey>(historian);
|
registry.Register<HistorianAdapterActorKey>(historian);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -0,0 +1,138 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Historian;
|
||||||
|
|
||||||
|
/// <summary>
/// TestKit tests for the Primary-only historization gate of <see cref="HistorianAdapterActor"/>.
/// The actor caches the local node's <see cref="RedundancyRole"/> from <c>redundancy-state</c>
/// snapshots and skips the sink enqueue while that role is Secondary or Detached, so a future
/// per-node feeder writes exactly once across the warm-redundant pair. While the role is unknown
/// the actor writes by default (single-node deploys and the boot window must never silently drop
/// historization).
/// </summary>
public sealed class HistorianAdapterActorTests : RuntimeActorTestBase
{
    /// <summary>Node id every adapter under test is constructed with.</summary>
    private static readonly NodeId LocalNode = new("node-A");

    /// <summary>Time budget granted to the fire-and-forget enqueue before a verdict is taken.</summary>
    private static readonly TimeSpan Settle = TimeSpan.FromMilliseconds(500);

    /// <summary>Fake sink that only counts <see cref="EnqueueAsync"/> invocations, using atomic operations.</summary>
    private sealed class RecordingSink : IAlarmHistorianSink
    {
        private int _enqueued;

        /// <summary>How many times <see cref="EnqueueAsync"/> has been invoked.</summary>
        public int EnqueueCount => Volatile.Read(ref _enqueued);

        /// <inheritdoc />
        public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
        {
            Interlocked.Increment(ref _enqueued);
            return Task.CompletedTask;
        }

        /// <inheritdoc />
        public HistorianSinkStatus GetStatus()
        {
            return new HistorianSinkStatus(
                QueueDepth: 0,
                DeadLetterDepth: 0,
                LastDrainUtc: null,
                LastSuccessUtc: null,
                LastError: null,
                DrainState: HistorianDrainState.Idle);
        }
    }

    /// <summary>Creates the minimal <see cref="AlarmHistorianEvent"/> sent to the adapter by every test.</summary>
    private static AlarmHistorianEvent SampleEvent()
    {
        return new AlarmHistorianEvent(
            AlarmId: "alm-1",
            EquipmentPath: "Area/Line/Equip",
            AlarmName: "HiHi",
            AlarmTypeName: "LimitAlarm",
            Severity: AlarmSeverity.High,
            EventKind: "Activated",
            Message: "level high",
            User: "system",
            Comment: null,
            TimestampUtc: DateTime.UtcNow);
    }

    /// <summary>Sends <paramref name="actor"/> a single-node <see cref="RedundancyStateChanged"/> snapshot
    /// that reports <see cref="LocalNode"/> as <paramref name="role"/>.</summary>
    private static void TellRedundancyRole(IActorRef actor, RedundancyRole role)
    {
        var isPrimary = role == RedundancyRole.Primary;
        var localState = new NodeRedundancyState(
            NodeId: LocalNode,
            Role: role,
            IsClusterLeader: isPrimary,
            IsRoleLeaderForDriver: isPrimary,
            AsOfUtc: DateTime.UtcNow);
        var nodes = new[] { localState };

        actor.Tell(new RedundancyStateChanged(nodes, CorrelationId.NewId()));
    }

    /// <summary>Spawns an adapter wired to <see cref="LocalNode"/> and a fresh <see cref="RecordingSink"/>,
    /// optionally announces <paramref name="role"/> to it, then sends one sample event.</summary>
    /// <param name="role">Role to announce before the event, or null to send no snapshot at all.</param>
    /// <returns>The sink the adapter was constructed with.</returns>
    private RecordingSink SendEventUnderRole(RedundancyRole? role)
    {
        var recorder = new RecordingSink();
        var adapter = Sys.ActorOf(HistorianAdapterActor.Props(recorder, LocalNode));

        if (role is { } announced)
        {
            TellRedundancyRole(adapter, announced);
        }

        adapter.Tell(SampleEvent());
        return recorder;
    }

    /// <summary>Default-write (T1): with a localNode wired but no snapshot received — the boot window,
    /// and the steady state of a single-node deploy — the role is unknown and the adapter MUST historize.</summary>
    [Fact]
    public void Default_before_redundancy_state_historizes()
    {
        var recorder = SendEventUnderRole(role: null);

        AwaitAssert(() => recorder.EnqueueCount.ShouldBe(1), Settle);
    }

    /// <summary>Secondary suppression (T2): with Secondary cached as the local role the adapter MUST NOT
    /// enqueue to the durable sink (the Primary writes the single copy).</summary>
    [Fact]
    public void Secondary_node_does_not_historize()
    {
        var recorder = SendEventUnderRole(RedundancyRole.Secondary);

        // Let the settle window elapse so a wrongly issued enqueue would have landed, then check.
        ExpectNoMsg(Settle);
        recorder.EnqueueCount.ShouldBe(0);
    }

    /// <summary>Detached suppression (T3): a Detached node MUST NOT historize either.</summary>
    [Fact]
    public void Detached_node_does_not_historize()
    {
        var recorder = SendEventUnderRole(RedundancyRole.Detached);

        ExpectNoMsg(Settle);
        recorder.EnqueueCount.ShouldBe(0);
    }

    /// <summary>Primary writes (T4): with Primary cached as the local role the adapter historizes as
    /// normal (this is the single copy the durable sink sees).</summary>
    [Fact]
    public void Primary_node_historizes()
    {
        var recorder = SendEventUnderRole(RedundancyRole.Primary);

        AwaitAssert(() => recorder.EnqueueCount.ShouldBe(1), Settle);
    }
}
|
||||||
Reference in New Issue
Block a user