fix(scripted-alarms): ScriptedAlarmHostActor review fixes — load-gen guard, quiet cancel, parse guard (T9 review)

This commit is contained in:
Joseph Doherty
2026-06-10 15:08:54 -04:00
parent 3b418a54f1
commit dafaf2faec
2 changed files with 133 additions and 23 deletions
@@ -69,18 +69,37 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
/// <summary>Pipe-back completion of an in-flight <see cref="ScriptedAlarmEngine.LoadAsync"/>:
/// carries the union of dependency refs to register mux interest for AFTER the load completed
/// (so the engine's upstream subscriptions exist before any value can arrive).</summary>
private sealed record AlarmsLoaded(IReadOnlyList<string> DepRefs);
/// (so the engine's upstream subscriptions exist before any value can arrive). <see cref="Generation"/>
/// is the load generation captured when <see cref="OnApply"/> kicked the load off; a stale
/// continuation (an earlier generation completing after a newer apply) is discarded in
/// <see cref="OnAlarmsLoaded"/> so out-of-order completions can't register stale dep refs.</summary>
private sealed record AlarmsLoaded(IReadOnlyList<string> DepRefs, int Generation);
/// <summary>Pipe-back marker for a <see cref="ScriptedAlarmEngine.LoadAsync"/> that was cancelled
/// because the actor is stopping (its <see cref="_cts"/> fired in <see cref="PostStop"/>). It is a
/// no-op — there is nothing to register and no fault to log — and exists only so a cancelled load
/// pipes back a quiet message instead of a <see cref="Status.Failure"/> that would log a spurious
/// Warning on every clean shutdown.</summary>
private sealed record AlarmsLoadCanceled;
private readonly IActorRef _publishActor;
private readonly IActorRef? _mux;
private readonly DependencyMuxTagUpstreamSource _upstream;
private readonly ScriptedAlarmEngine _engine;
private readonly Func<DateTime> _clock;
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly CancellationTokenSource _cts = new();
private readonly EventHandler<ScriptedAlarmEvent> _onEngineEvent;
/// <summary>Monotonic load generation, bumped on every <see cref="OnApply"/>. The continuation that
/// pipes back an <see cref="AlarmsLoaded"/> captures the generation it was started under; a stale
/// completion (an earlier generation arriving after a newer apply) is discarded in
/// <see cref="OnAlarmsLoaded"/> so out-of-order loads never register superseded dep refs.</summary>
private int _loadGeneration;
/// <summary>Cached cluster DistributedPubSub mediator, resolved once in <see cref="PreStart"/> (on the
/// actor thread) and reused for every emission instead of re-resolving it per-publish.</summary>
private IActorRef _mediator = null!;
/// <summary>Factory method to create Props for a <see cref="ScriptedAlarmHostActor"/>.</summary>
/// <param name="publishActor">The OPC UA publish actor that consumes
/// <see cref="OpcUaPublishActor.AlarmStateUpdate"/> bridged from engine emissions.</param>
@@ -90,27 +109,23 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
/// <param name="upstream">The mux-fed upstream the engine reads + subscribes from. MUST be the
/// same instance the <paramref name="engine"/> was constructed around.</param>
/// <param name="engine">The scripted-alarm engine this host owns + disposes.</param>
/// <param name="clock">Optional UTC clock; defaults to <see cref="DateTime.UtcNow"/>.</param>
public static Props Props(
IActorRef publishActor,
IActorRef? mux,
DependencyMuxTagUpstreamSource upstream,
ScriptedAlarmEngine engine,
Func<DateTime>? clock = null) =>
Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine, clock));
ScriptedAlarmEngine engine) =>
Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine));
/// <summary>Initializes a new instance of the <see cref="ScriptedAlarmHostActor"/> class.</summary>
/// <param name="publishActor">The OPC UA publish actor emissions are bridged to.</param>
/// <param name="mux">Optional dependency multiplexer the host registers dependency interest with.</param>
/// <param name="upstream">The mux-fed upstream the engine reads + subscribes from (same instance the engine wraps).</param>
/// <param name="engine">The scripted-alarm engine this host owns + disposes.</param>
/// <param name="clock">Optional UTC clock; defaults to <see cref="DateTime.UtcNow"/>.</param>
public ScriptedAlarmHostActor(
IActorRef publishActor,
IActorRef? mux,
DependencyMuxTagUpstreamSource upstream,
ScriptedAlarmEngine engine,
Func<DateTime>? clock = null)
ScriptedAlarmEngine engine)
{
ArgumentNullException.ThrowIfNull(publishActor);
ArgumentNullException.ThrowIfNull(upstream);
@@ -119,7 +134,6 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
_mux = mux;
_upstream = upstream;
_engine = engine;
_clock = clock ?? (() => DateTime.UtcNow);
// OnEvent fires on the engine's worker thread. NEVER touch Context / actor state here —
// marshal onto the actor thread via the thread-safe Self.Tell. Keep the handler in a field
@@ -135,10 +149,18 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
// A faulted LoadAsync pipes back a Status.Failure (see OnApply) — log it and stay inert so the
// failure doesn't hit the dead-letter log.
Receive<Status.Failure>(OnLoadFailed);
// A LoadAsync cancelled by PostStop's _cts pipes back this marker. The actor is stopping, so
// there's nothing to do — swallow it quietly (no Warning, no dead letter).
Receive<AlarmsLoadCanceled>(_ => { });
}
private void OnApply(ApplyScriptedAlarms msg)
{
// Bump the load generation and capture it into this apply's continuation. Rapid back-to-back
// applies kick off concurrent LoadAsyncs whose continuations may complete out of order; tagging
// each with its generation lets OnAlarmsLoaded discard any but the latest.
var gen = ++_loadGeneration;
// Skip disabled plans entirely — the engine has no Enabled flag, so a disabled alarm is simply
// not loaded (no predicate, no upstream subscription, no events).
var enabled = msg.Plans.Where(p => p.Enabled).ToList();
@@ -153,15 +175,18 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
.Distinct(StringComparer.Ordinal)
.ToList();
// PipeTo marshals the completion back onto the actor thread; on failure we log a Warning and
// do NOT register interest (the prior generation's subscription, if any, stays — a failed
// reload should not silently drop live values). Self-message AlarmsLoaded carries the refs.
// PipeTo marshals the completion back onto the actor thread. The continuation runs with
// CancellationToken.None so it ALWAYS executes (even after _cts fired on stop) and branches on
// the load's outcome: a clean cancel (actor stopping) pipes back a quiet AlarmsLoadCanceled; a
// genuine fault pipes back a Status.Failure we log as Warning (the prior generation's mux
// subscription stays — a failed reload should not silently drop live values); success pipes back
// AlarmsLoaded carrying the refs + the generation it was loaded under.
_engine.LoadAsync(defs, _cts.Token)
.ContinueWith(
t => t.IsFaulted
? (object)new Status.Failure(t.Exception!)
: new AlarmsLoaded(depRefs),
_cts.Token,
t => t.IsCanceled ? (object)new AlarmsLoadCanceled()
: t.IsFaulted ? new Status.Failure(t.Exception!.GetBaseException())
: new AlarmsLoaded(depRefs, gen),
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default)
.PipeTo(Self);
@@ -175,6 +200,14 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
private void OnAlarmsLoaded(AlarmsLoaded msg)
{
// Discard a stale completion: if a newer apply has since bumped the generation, this load's
// dep refs are superseded and registering them would re-introduce the old set. The latest
// generation's continuation will (or already did) register the current interest set.
if (msg.Generation != _loadGeneration)
{
return;
}
// Register mux interest only now that LoadAsync has returned — the engine's upstream
// subscriptions exist, so any DependencyValueChanged the mux delivers will be observed by the
// engine. RegisterInterest replaces a subscriber's prior interest set (see DependencyMuxActor),
@@ -211,7 +244,7 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
TimestampUtc: e.TimestampUtc));
// Publish the transition to the cluster `alerts` topic — the single historization + live
// fan-out path. The mediator is obtained on the ACTOR thread (here), never off-thread.
// fan-out path. The mediator was cached on the ACTOR thread in PreStart; we only Tell it here.
var evt = new AlarmTransitionEvent(
AlarmId: e.AlarmId,
EquipmentPath: e.EquipmentPath,
@@ -222,7 +255,15 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
User: TransitionUser(e),
TimestampUtc: e.TimestampUtc);
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(AlertsTopic, evt));
_mediator.Tell(new Publish(AlertsTopic, evt));
}
/// <inheritdoc />
protected override void PreStart()
{
// Resolve the cluster DPS mediator once, on the actor thread, so emissions only Tell it.
_mediator = DistributedPubSub.Get(Context.System).Mediator;
base.PreStart();
}
/// <inheritdoc />
@@ -241,13 +282,15 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
/// <summary>Maps an <see cref="EquipmentScriptedAlarmPlan"/> to the engine's
/// <see cref="ScriptedAlarmDefinition"/>. <see cref="EquipmentScriptedAlarmPlan.AlarmType"/> is a
/// string that must parse to an <see cref="AlarmKind"/>; an unrecognised type falls back to
/// <see cref="AlarmKind.AlarmCondition"/> rather than dropping the alarm.</summary>
/// string that must parse to a defined <see cref="AlarmKind"/> NAME; an unrecognised type — including
/// a numeric string like "5" or "-1", which <see cref="Enum.TryParse{TEnum}(string, out TEnum)"/>
/// would otherwise accept as an undefined value — falls back to <see cref="AlarmKind.AlarmCondition"/>
/// rather than dropping the alarm.</summary>
private static ScriptedAlarmDefinition ToDefinition(EquipmentScriptedAlarmPlan p) => new(
AlarmId: p.ScriptedAlarmId,
EquipmentPath: p.EquipmentId,
AlarmName: p.Name,
Kind: Enum.TryParse<AlarmKind>(p.AlarmType, out var k) ? k : AlarmKind.AlarmCondition,
Kind: Enum.TryParse<AlarmKind>(p.AlarmType, out var k) && Enum.IsDefined(k) ? k : AlarmKind.AlarmCondition,
Severity: SeverityFromInt(p.Severity),
MessageTemplate: p.MessageTemplate,
PredicateScriptSource: p.PredicateSource,
@@ -50,6 +50,23 @@ public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase
Retain: true,
Enabled: enabled);
/// <summary>Plan whose predicate references an unknown identifier so it fails to compile — applying it
/// faults the engine's LoadAsync. Used to prove a faulted load doesn't crash the host.</summary>
private static EquipmentScriptedAlarmPlan BadPlan(string id = "bad-1") =>
new(
ScriptedAlarmId: id,
EquipmentId: "Plant/Line1/M",
Name: "Broken",
AlarmType: "AlarmCondition",
Severity: 500,
MessageTemplate: "broken",
PredicateScriptId: $"{id}-script",
PredicateSource: "return unknownIdentifier;", // uncompilable → LoadAsync throws
DependencyRefs: Array.Empty<string>(),
HistorizeToAveva: false,
Retain: false,
Enabled: true);
private static ScriptedAlarmEngine BuildEngine(DependencyMuxTagUpstreamSource upstream)
{
var logger = new LoggerConfiguration().CreateLogger();
@@ -170,4 +187,54 @@ public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase
second.TagRefs.ShouldContain("M.Y");
second.TagRefs.ShouldNotContain("M.T");
}
/// <summary>Faulted load resilience: applying an alarm whose predicate doesn't compile faults the
/// engine's LoadAsync. The host logs a Warning (Status.Failure handler) and stays alive — it must
/// still process a subsequent valid apply, registering interest for that apply's dep refs.</summary>
[Fact]
public void Faulted_load_does_not_crash_host()
{
var publish = CreateTestProbe();
var mux = CreateTestProbe();
var (host, _) = Spawn(publish, mux);
// Apply a plan that fails to compile — LoadAsync faults, the host swallows it as a Warning and
// does NOT register interest (no dep refs to register, and the load never completed).
host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { BadPlan() }));
mux.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
// Prove the actor is still responsive: a later valid apply loads + registers interest as normal.
host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-ok", depRef: "M.T") }));
var reg = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout);
reg.TagRefs.ShouldContain("M.T");
}
/// <summary>Stale-load guard (fix A): two back-to-back applies with different dep-ref sets must end
/// with the mux holding the SECOND (latest-generation) set — a stale earlier completion must never
/// re-introduce the first set's refs.
///
/// <para>Limitation: forcing the two LoadAsync continuations to complete out of order is not
/// deterministic via real async timing (both loads are short + run on the thread pool). This test
/// therefore validates the guard's observable contract rather than the race itself: it fishes for
/// the RegisterInterest carrying the second apply's refs, then asserts no LATER RegisterInterest
/// re-introduces the first apply's refs. With the generation guard in place the latest apply always
/// wins; without it, an out-of-order stale completion could land a "M.A"-bearing RegisterInterest
/// after the "M.B" one — which this assertion would catch.</para></summary>
[Fact]
public void Stale_load_does_not_register_superseded_dep_refs()
{
var publish = CreateTestProbe();
var mux = CreateTestProbe();
var (host, _) = Spawn(publish, mux);
// Fire two applies in quick succession with disjoint dep refs; the second supersedes the first.
host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-a", depRef: "M.A") }));
host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-b", depRef: "M.B") }));
// The latest generation must win: fish for the RegisterInterest reflecting the second apply.
mux.FishForMessage<DependencyMuxActor.RegisterInterest>(r => r.TagRefs.Contains("M.B"), Timeout);
// No LATER RegisterInterest may re-introduce the first (superseded) apply's "M.A" ref.
mux.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}
}