diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs
index c170aa33..cb72b329 100644
--- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs
@@ -69,18 +69,37 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
/// Pipe-back completion of an in-flight :
/// carries the union of dependency refs to register mux interest for AFTER the load completed
- /// (so the engine's upstream subscriptions exist before any value can arrive).
- private sealed record AlarmsLoaded(IReadOnlyList DepRefs);
+ /// (so the engine's upstream subscriptions exist before any value can arrive).
+ /// is the load generation captured when kicked the load off; a stale
+ /// continuation (an earlier generation completing after a newer apply) is discarded in
+ /// so out-of-order completions can't register stale dep refs.
+ private sealed record AlarmsLoaded(IReadOnlyList DepRefs, int Generation);
+
+ /// Pipe-back marker for a that was cancelled
+ /// because the actor is stopping (its fired in ). It is a
+ /// no-op — there is nothing to register and no fault to log — and exists only so a cancelled load
+ /// pipes back a quiet message instead of a that would log a spurious
+ /// Warning on every clean shutdown.
+ private sealed record AlarmsLoadCanceled;
private readonly IActorRef _publishActor;
private readonly IActorRef? _mux;
private readonly DependencyMuxTagUpstreamSource _upstream;
private readonly ScriptedAlarmEngine _engine;
- private readonly Func _clock;
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly CancellationTokenSource _cts = new();
private readonly EventHandler _onEngineEvent;
+ /// Monotonic load generation, bumped on every . The continuation that
+ /// pipes back an captures the generation it was started under; a stale
+ /// completion (an earlier generation arriving after a newer apply) is discarded in
+ /// so out-of-order loads never register superseded dep refs.
+ private int _loadGeneration;
+
+ /// Cached cluster DistributedPubSub mediator, resolved once in (on the
+ /// actor thread) and reused for every emission instead of re-resolving it per-publish.
+ private IActorRef _mediator = null!;
+
/// Factory method to create Props for a .
/// The OPC UA publish actor that consumes
/// bridged from engine emissions.
@@ -90,27 +109,23 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
/// The mux-fed upstream the engine reads + subscribes from. MUST be the
/// same instance the was constructed around.
/// The scripted-alarm engine this host owns + disposes.
- /// Optional UTC clock; defaults to .
public static Props Props(
IActorRef publishActor,
IActorRef? mux,
DependencyMuxTagUpstreamSource upstream,
- ScriptedAlarmEngine engine,
- Func? clock = null) =>
- Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine, clock));
+ ScriptedAlarmEngine engine) =>
+ Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine));
/// Initializes a new instance of the class.
/// The OPC UA publish actor emissions are bridged to.
/// Optional dependency multiplexer the host registers dependency interest with.
/// The mux-fed upstream the engine reads + subscribes from (same instance the engine wraps).
/// The scripted-alarm engine this host owns + disposes.
- /// Optional UTC clock; defaults to .
public ScriptedAlarmHostActor(
IActorRef publishActor,
IActorRef? mux,
DependencyMuxTagUpstreamSource upstream,
- ScriptedAlarmEngine engine,
- Func? clock = null)
+ ScriptedAlarmEngine engine)
{
ArgumentNullException.ThrowIfNull(publishActor);
ArgumentNullException.ThrowIfNull(upstream);
@@ -119,7 +134,6 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
_mux = mux;
_upstream = upstream;
_engine = engine;
- _clock = clock ?? (() => DateTime.UtcNow);
// OnEvent fires on the engine's worker thread. NEVER touch Context / actor state here —
// marshal onto the actor thread via the thread-safe Self.Tell. Keep the handler in a field
@@ -135,10 +149,18 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
// A faulted LoadAsync pipes back a Status.Failure (see OnApply) — log it and stay inert so the
// failure doesn't hit the dead-letter log.
Receive(OnLoadFailed);
+ // A LoadAsync cancelled by PostStop's _cts pipes back this marker. The actor is stopping, so
+ // there's nothing to do — swallow it quietly (no Warning, no dead letter).
+ Receive(_ => { });
}
private void OnApply(ApplyScriptedAlarms msg)
{
+ // Bump the load generation and capture it into this apply's continuation. Rapid back-to-back
+ // applies kick off concurrent LoadAsyncs whose continuations may complete out of order; tagging
+ // each with its generation lets OnAlarmsLoaded discard any but the latest.
+ var gen = ++_loadGeneration;
+
// Skip disabled plans entirely — the engine has no Enabled flag, so a disabled alarm is simply
// not loaded (no predicate, no upstream subscription, no events).
var enabled = msg.Plans.Where(p => p.Enabled).ToList();
@@ -153,15 +175,18 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
.Distinct(StringComparer.Ordinal)
.ToList();
- // PipeTo marshals the completion back onto the actor thread; on failure we log a Warning and
- // do NOT register interest (the prior generation's subscription, if any, stays — a failed
- // reload should not silently drop live values). Self-message AlarmsLoaded carries the refs.
+ // PipeTo marshals the completion back onto the actor thread. The continuation runs with
+ // CancellationToken.None so it ALWAYS executes (even after _cts fired on stop) and branches on
+ // the load's outcome: a clean cancel (actor stopping) pipes back a quiet AlarmsLoadCanceled; a
+ // genuine fault pipes back a Status.Failure we log as Warning (the prior generation's mux
+ // subscription stays — a failed reload should not silently drop live values); success pipes back
+ // AlarmsLoaded carrying the refs + the generation it was loaded under.
_engine.LoadAsync(defs, _cts.Token)
.ContinueWith(
- t => t.IsFaulted
- ? (object)new Status.Failure(t.Exception!)
- : new AlarmsLoaded(depRefs),
- _cts.Token,
+ t => t.IsCanceled ? (object)new AlarmsLoadCanceled()
+ : t.IsFaulted ? new Status.Failure(t.Exception!.GetBaseException())
+ : new AlarmsLoaded(depRefs, gen),
+ CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default)
.PipeTo(Self);
@@ -175,6 +200,14 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
private void OnAlarmsLoaded(AlarmsLoaded msg)
{
+ // Discard a stale completion: if a newer apply has since bumped the generation, this load's
+ // dep refs are superseded and registering them would re-introduce the old set. The latest
+ // generation's continuation will (or already did) register the current interest set.
+ if (msg.Generation != _loadGeneration)
+ {
+ return;
+ }
+
// Register mux interest only now that LoadAsync has returned — the engine's upstream
// subscriptions exist, so any DependencyValueChanged the mux delivers will be observed by the
// engine. RegisterInterest replaces a subscriber's prior interest set (see DependencyMuxActor),
@@ -211,7 +244,7 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
TimestampUtc: e.TimestampUtc));
// Publish the transition to the cluster `alerts` topic — the single historization + live
- // fan-out path. The mediator is obtained on the ACTOR thread (here), never off-thread.
+ // fan-out path. The mediator was cached on the ACTOR thread in PreStart; we only Tell it here.
var evt = new AlarmTransitionEvent(
AlarmId: e.AlarmId,
EquipmentPath: e.EquipmentPath,
@@ -222,7 +255,15 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
User: TransitionUser(e),
TimestampUtc: e.TimestampUtc);
- DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(AlertsTopic, evt));
+ _mediator.Tell(new Publish(AlertsTopic, evt));
+ }
+
+ ///
+ protected override void PreStart()
+ {
+ // Resolve the cluster DPS mediator once, on the actor thread, so emissions only Tell it.
+ _mediator = DistributedPubSub.Get(Context.System).Mediator;
+ base.PreStart();
}
///
@@ -241,13 +282,15 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
/// Maps an to the engine's
/// . is a
- /// string that must parse to an ; an unrecognised type falls back to
- /// rather than dropping the alarm.
+ /// string that must parse to a defined NAME; an unrecognised type — including
+ /// a numeric string like "5" or "-1", which
+ /// would otherwise accept as an undefined value — falls back to
+ /// rather than dropping the alarm.
private static ScriptedAlarmDefinition ToDefinition(EquipmentScriptedAlarmPlan p) => new(
AlarmId: p.ScriptedAlarmId,
EquipmentPath: p.EquipmentId,
AlarmName: p.Name,
- Kind: Enum.TryParse(p.AlarmType, out var k) ? k : AlarmKind.AlarmCondition,
+ Kind: Enum.TryParse(p.AlarmType, out var k) && Enum.IsDefined(k) ? k : AlarmKind.AlarmCondition,
Severity: SeverityFromInt(p.Severity),
MessageTemplate: p.MessageTemplate,
PredicateScriptSource: p.PredicateSource,
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs
index 4229f27e..d23fd9f3 100644
--- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs
@@ -50,6 +50,23 @@ public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase
Retain: true,
Enabled: enabled);
+ /// Plan whose predicate references an unknown identifier so it fails to compile — applying it
+ /// faults the engine's LoadAsync. Used to prove a faulted load doesn't crash the host.
+ private static EquipmentScriptedAlarmPlan BadPlan(string id = "bad-1") =>
+ new(
+ ScriptedAlarmId: id,
+ EquipmentId: "Plant/Line1/M",
+ Name: "Broken",
+ AlarmType: "AlarmCondition",
+ Severity: 500,
+ MessageTemplate: "broken",
+ PredicateScriptId: $"{id}-script",
+ PredicateSource: "return unknownIdentifier;", // uncompilable → LoadAsync throws
+ DependencyRefs: Array.Empty(),
+ HistorizeToAveva: false,
+ Retain: false,
+ Enabled: true);
+
private static ScriptedAlarmEngine BuildEngine(DependencyMuxTagUpstreamSource upstream)
{
var logger = new LoggerConfiguration().CreateLogger();
@@ -170,4 +187,54 @@ public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase
second.TagRefs.ShouldContain("M.Y");
second.TagRefs.ShouldNotContain("M.T");
}
+
+ /// Faulted load resilience: applying an alarm whose predicate doesn't compile faults the
+ /// engine's LoadAsync. The host logs a Warning (Status.Failure handler) and stays alive — it must
+ /// still process a subsequent valid apply, registering interest for that apply's dep refs.
+ [Fact]
+ public void Faulted_load_does_not_crash_host()
+ {
+ var publish = CreateTestProbe();
+ var mux = CreateTestProbe();
+ var (host, _) = Spawn(publish, mux);
+
+ // Apply a plan that fails to compile — LoadAsync faults, the host swallows it as a Warning and
+ // does NOT register interest (no dep refs to register, and the load never completed).
+ host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { BadPlan() }));
+ mux.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
+
+ // Prove the actor is still responsive: a later valid apply loads + registers interest as normal.
+ host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-ok", depRef: "M.T") }));
+ var reg = mux.ExpectMsg(Timeout);
+ reg.TagRefs.ShouldContain("M.T");
+ }
+
+ /// Stale-load guard (fix A): two back-to-back applies with different dep-ref sets must end
+ /// with the mux holding the SECOND (latest-generation) set — a stale earlier completion must never
+ /// re-introduce the first set's refs.
+ ///
+ /// Limitation: forcing the two LoadAsync continuations to complete out of order is not
+ /// deterministic via real async timing (both loads are short + run on the thread pool). This test
+ /// therefore validates the guard's observable contract rather than the race itself: it fishes for
+ /// the RegisterInterest carrying the second apply's refs, then asserts no LATER RegisterInterest
+ /// re-introduces the first apply's refs. With the generation guard in place the latest apply always
+ /// wins; without it, an out-of-order stale completion could land a "M.A"-bearing RegisterInterest
+ /// after the "M.B" one — which this assertion would catch.
+ [Fact]
+ public void Stale_load_does_not_register_superseded_dep_refs()
+ {
+ var publish = CreateTestProbe();
+ var mux = CreateTestProbe();
+ var (host, _) = Spawn(publish, mux);
+
+ // Fire two applies in quick succession with disjoint dep refs; the second supersedes the first.
+ host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-a", depRef: "M.A") }));
+ host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-b", depRef: "M.B") }));
+
+ // The latest generation must win: fish for the RegisterInterest reflecting the second apply.
+ mux.FishForMessage(r => r.TagRefs.Contains("M.B"), Timeout);
+
+ // No LATER RegisterInterest may re-introduce the first (superseded) apply's "M.A" ref.
+ mux.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
+ }
}