feat(historian): honor per-alarm HistorizeToAveva opt-out at the durable write

This commit is contained in:
Joseph Doherty
2026-06-11 12:48:13 -04:00
parent fa839d1dbf
commit 8012509584
6 changed files with 78 additions and 9 deletions
@@ -16,6 +16,7 @@ namespace ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
/// <param name="TimestampUtc">When the transition occurred.</param>
/// <param name="AlarmTypeName">OPC UA Part 9 condition subtype name — one of <c>LimitAlarm</c> / <c>DiscreteAlarm</c> / <c>OffNormalAlarm</c> / <c>AlarmCondition</c> (the base type, used as the default). The historian feed maps this onto the durable alarm-type column.</param>
/// <param name="Comment">Operator-supplied comment on ack / confirm / comment transitions; <c>null</c> for engine-driven transitions (Activated / Cleared / Shelved / …) that carry no comment.</param>
/// <param name="HistorizeToAveva">When <c>false</c>, the durable historian sink suppresses this transition (the live <c>alerts</c> fan-out is unaffected). Defaults to <c>true</c>. On a rolling restart an old-format message deserializes this as <c>false</c> (CLR default); that is safe because the writing node is always the same-version publisher — see HistorianAdapterActor.</param>
public sealed record AlarmTransitionEvent(
string AlarmId,
string EquipmentPath,
@@ -26,4 +27,5 @@ public sealed record AlarmTransitionEvent(
string User,
DateTime TimestampUtc,
string AlarmTypeName = "AlarmCondition",
string? Comment = null);
string? Comment = null,
bool HistorizeToAveva = true);
@@ -608,7 +608,11 @@ public sealed class ScriptedAlarmEngine : IDisposable
EmissionKind.Confirmed => condition.LastConfirmComment,
EmissionKind.CommentAdded => condition.Comments.Count == 0 ? null : condition.Comments[^1].Text,
_ => null,
});
},
// Carry the per-alarm durable-historization opt-out through to subscribers. The historian
// adapter honors it to suppress ONLY the durable sink write; the live alerts fan-out is
// unaffected (it is not gated on this flag).
HistorizeToAveva: state.Definition.HistorizeToAveva);
}
/// <summary>
@@ -834,7 +838,8 @@ public sealed record ScriptedAlarmEvent(
AlarmConditionState Condition,
EmissionKind Emission,
DateTime TimestampUtc,
string? Comment = null);
string? Comment = null,
bool HistorizeToAveva = true);
/// <summary>
/// Upstream source abstraction — intentionally identical shape to the virtual-tag
@@ -71,7 +71,13 @@ public sealed class HistorianAdapterActor : ReceiveActor
// ShouldHistorize gate keeps only the Primary writing ⇒ exactly-once across the warm pair.
// NOTE: Translate is intentionally inside the gate so Secondary/Detached nodes never allocate a
// discarded AlarmHistorianEvent.
Receive<AlarmTransitionEvent>(t => { if (ShouldHistorize()) _ = EnqueueAsync(Translate(t)); });
// t.HistorizeToAveva=false is a per-alarm opt-out of DURABLE historization only — the live `alerts`
// fan-out already happened upstream (the publish is NOT gated on this flag), so we gate the SINK
// write here, not the publish. Rolling-restart-safe: the node that WRITES is always the same-version
// node that PUBLISHED (Primary or boot window), so a cross-version old→new flow only reaches the
// Secondary, which never writes — an old-format message deserializing HistorizeToAveva as the CLR
// default (false) cannot drop a Primary's historization.
Receive<AlarmTransitionEvent>(t => { if (ShouldHistorize() && t.HistorizeToAveva) _ = EnqueueAsync(Translate(t)); });
Receive<GetStatus>(_ => Sender.Tell(_sink.GetStatus()));
@@ -298,7 +298,10 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
// Historian feed prep: carry the Part-9 subtype name (e.Kind.ToString() yields
// LimitAlarm/DiscreteAlarm/OffNormalAlarm/AlarmCondition) + any operator comment.
AlarmTypeName: e.Kind.ToString(),
Comment: e.Comment);
Comment: e.Comment,
// Per-alarm DURABLE-historization opt-out — honored downstream by HistorianAdapterActor to
// suppress only the sink write. This publish (and the live `/alerts` fan-out) is NOT gated on it.
HistorizeToAveva: e.HistorizeToAveva);
// Warm-standby dedup: only the Primary (driver-role leader) publishes the cluster-wide
// transition. Default-emit until told we are Secondary/Detached so single-node deploys + the
@@ -22,14 +22,16 @@ public sealed class ScriptedAlarmEngineTests
}
private static ScriptedAlarmDefinition Alarm(string id, string predicate,
string msg = "condition", AlarmSeverity sev = AlarmSeverity.High) =>
string msg = "condition", AlarmSeverity sev = AlarmSeverity.High,
bool historizeToAveva = true) =>
new(AlarmId: id,
EquipmentPath: "Plant/Line1/Reactor",
AlarmName: id,
Kind: AlarmKind.AlarmCondition,
Severity: sev,
MessageTemplate: msg,
PredicateScriptSource: predicate);
PredicateScriptSource: predicate,
HistorizeToAveva: historizeToAveva);
/// <summary>Verifies that LoadAsync compiles the alarm predicate and subscribes to all referenced upstream tags.</summary>
[Fact]
@@ -479,6 +481,38 @@ public sealed class ScriptedAlarmEngineTests
events.First(e => e.Emission == EmissionKind.CommentAdded).Comment.ShouldBe("second look");
}
/// <summary>Verifies the emitted <see cref="ScriptedAlarmEvent.HistorizeToAveva"/> carries the
/// per-alarm opt-out flag from the <see cref="ScriptedAlarmDefinition"/> through to the event so the
/// historian adapter can suppress the durable write while the live alerts fan-out is unaffected.
/// An alarm defined <c>HistorizeToAveva: false</c> emits <c>false</c>; the default (<c>true</c>)
/// emits <c>true</c>.</summary>
[Fact]
public async Task Emission_carries_HistorizeToAveva_flag_from_definition()
{
var up = new FakeUpstream();
up.Set("Temp", 50);
using var eng = Build(up, out _);
await eng.LoadAsync(
[
Alarm("OptOut", """return (int)ctx.GetTag("Temp").Value > 100;""", historizeToAveva: false),
Alarm("OptIn", """return (int)ctx.GetTag("Temp").Value > 100;""" /* default true */),
],
TestContext.Current.CancellationToken);
var events = new List<ScriptedAlarmEvent>();
eng.OnEvent += (_, e) => events.Add(e);
up.Push("Temp", 150);
await WaitForAsync(() =>
events.Any(e => e.AlarmId == "OptOut" && e.Emission == EmissionKind.Activated) &&
events.Any(e => e.AlarmId == "OptIn" && e.Emission == EmissionKind.Activated));
events.First(e => e.AlarmId == "OptOut" && e.Emission == EmissionKind.Activated)
.HistorizeToAveva.ShouldBeFalse("opt-out alarm carries HistorizeToAveva=false on its emission");
events.First(e => e.AlarmId == "OptIn" && e.Emission == EmissionKind.Activated)
.HistorizeToAveva.ShouldBeTrue("opt-in (default) alarm carries HistorizeToAveva=true on its emission");
}
// (2b) TimedShelveAsync / UnshelveAsync end-to-end through the engine.
/// <summary>Verifies that TimedShelveAsync shelves with a deadline and UnshelveAsync removes the shelve before the timer expires.</summary>
[Fact]
@@ -183,7 +183,8 @@ public sealed class HistorianAdapterActorTests : RuntimeActorTestBase
int severity = 750,
string alarmTypeName = "LimitAlarm",
string? comment = "note",
string transitionKind = "Activated") => new(
string transitionKind = "Activated",
bool historizeToAveva = true) => new(
AlarmId: "alm-9",
EquipmentPath: "Area/Line/Equip",
AlarmName: "HiHi",
@@ -193,7 +194,8 @@ public sealed class HistorianAdapterActorTests : RuntimeActorTestBase
User: "operator1",
TimestampUtc: DateTime.UtcNow,
AlarmTypeName: alarmTypeName,
Comment: comment);
Comment: comment,
HistorizeToAveva: historizeToAveva);
/// <summary>Alerts translate (T6): an <see cref="AlarmTransitionEvent"/> off the <c>alerts</c> topic
/// is translated to an <see cref="AlarmHistorianEvent"/> and historized by default (unknown role).
@@ -251,6 +253,23 @@ public sealed class HistorianAdapterActorTests : RuntimeActorTestBase
AwaitAssert(() => sink.EnqueueCount.ShouldBe(1), Settle);
}
/// <summary>Per-alarm opt-out (T8b): a Primary node must NOT historize a transition whose
/// <c>HistorizeToAveva</c> is <c>false</c> — that flag is a per-alarm opt-out of DURABLE
/// historization only. The live <c>alerts</c> fan-out already happened upstream (the publish is NOT
/// gated on this flag), so only the durable sink write is suppressed.</summary>
[Fact]
public void Primary_node_does_not_historize_when_opted_out()
{
var sink = new RecordingSink();
var actor = Sys.ActorOf(HistorianAdapterActor.Props(sink, LocalNode));
TellRedundancyRole(actor, RedundancyRole.Primary);
actor.Tell(SampleTransition(historizeToAveva: false));
ExpectNoMsg(Settle);
sink.EnqueueCount.ShouldBe(0);
}
/// <summary>Severity buckets (T9): the OPC UA 11000 numeric severity on the transition maps onto
/// the coarse <see cref="AlarmSeverity"/> at the same ceilings <c>ScriptedAlarmHostActor.SeverityToInt</c>
/// emits (Low≤250, Medium≤500, High≤750, Critical otherwise). Driven end-to-end through the enqueue.</summary>