From dafaf2faecc7d75c3b7bfb8bf52af22df77f53bb Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 10 Jun 2026 15:08:54 -0400 Subject: [PATCH] =?UTF-8?q?fix(scripted-alarms):=20ScriptedAlarmHostActor?= =?UTF-8?q?=20review=20fixes=20=E2=80=94=20load-gen=20guard,=20quiet=20can?= =?UTF-8?q?cel,=20parse=20guard=20(T9=20review)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ScriptedAlarms/ScriptedAlarmHostActor.cs | 89 ++++++++++++++----- .../ScriptedAlarmHostActorTests.cs | 67 ++++++++++++++ 2 files changed, 133 insertions(+), 23 deletions(-) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs index c170aa33..cb72b329 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs @@ -69,18 +69,37 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor /// Pipe-back completion of an in-flight : /// carries the union of dependency refs to register mux interest for AFTER the load completed - /// (so the engine's upstream subscriptions exist before any value can arrive). - private sealed record AlarmsLoaded(IReadOnlyList DepRefs); + /// (so the engine's upstream subscriptions exist before any value can arrive). + /// is the load generation captured when kicked the load off; a stale + /// continuation (an earlier generation completing after a newer apply) is discarded in + /// so out-of-order completions can't register stale dep refs. + private sealed record AlarmsLoaded(IReadOnlyList DepRefs, int Generation); + + /// Pipe-back marker for a that was cancelled + /// because the actor is stopping (its fired in ). It is a + /// no-op — there is nothing to register and no fault to log — and exists only so a cancelled load + /// pipes back a quiet message instead of a that would log a spurious + /// Warning on every clean shutdown. + private sealed record AlarmsLoadCanceled; private readonly IActorRef _publishActor; private readonly IActorRef? _mux; private readonly DependencyMuxTagUpstreamSource _upstream; private readonly ScriptedAlarmEngine _engine; - private readonly Func _clock; private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly CancellationTokenSource _cts = new(); private readonly EventHandler _onEngineEvent; + /// Monotonic load generation, bumped on every . The continuation that + /// pipes back an captures the generation it was started under; a stale + /// completion (an earlier generation arriving after a newer apply) is discarded in + /// so out-of-order loads never register superseded dep refs. + private int _loadGeneration; + + /// Cached cluster DistributedPubSub mediator, resolved once in (on the + /// actor thread) and reused for every emission instead of re-resolving it per-publish. + private IActorRef _mediator = null!; + /// Factory method to create Props for a . /// The OPC UA publish actor that consumes /// bridged from engine emissions. @@ -90,27 +109,23 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor /// The mux-fed upstream the engine reads + subscribes from. MUST be the /// same instance the was constructed around. /// The scripted-alarm engine this host owns + disposes. - /// Optional UTC clock; defaults to . public static Props Props( IActorRef publishActor, IActorRef? mux, DependencyMuxTagUpstreamSource upstream, - ScriptedAlarmEngine engine, - Func? clock = null) => - Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine, clock)); + ScriptedAlarmEngine engine) => + Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine)); /// Initializes a new instance of the class. /// The OPC UA publish actor emissions are bridged to. /// Optional dependency multiplexer the host registers dependency interest with. /// The mux-fed upstream the engine reads + subscribes from (same instance the engine wraps). /// The scripted-alarm engine this host owns + disposes. - /// Optional UTC clock; defaults to . public ScriptedAlarmHostActor( IActorRef publishActor, IActorRef? mux, DependencyMuxTagUpstreamSource upstream, - ScriptedAlarmEngine engine, - Func? clock = null) + ScriptedAlarmEngine engine) { ArgumentNullException.ThrowIfNull(publishActor); ArgumentNullException.ThrowIfNull(upstream); @@ -119,7 +134,6 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor _mux = mux; _upstream = upstream; _engine = engine; - _clock = clock ?? (() => DateTime.UtcNow); // OnEvent fires on the engine's worker thread. NEVER touch Context / actor state here — // marshal onto the actor thread via the thread-safe Self.Tell. Keep the handler in a field @@ -135,10 +149,18 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor // A faulted LoadAsync pipes back a Status.Failure (see OnApply) — log it and stay inert so the // failure doesn't hit the dead-letter log. Receive(OnLoadFailed); + // A LoadAsync cancelled by PostStop's _cts pipes back this marker. The actor is stopping, so + // there's nothing to do — swallow it quietly (no Warning, no dead letter). + Receive(_ => { }); } private void OnApply(ApplyScriptedAlarms msg) { + // Bump the load generation and capture it into this apply's continuation. Rapid back-to-back + // applies kick off concurrent LoadAsyncs whose continuations may complete out of order; tagging + // each with its generation lets OnAlarmsLoaded discard any but the latest. + var gen = ++_loadGeneration; + // Skip disabled plans entirely — the engine has no Enabled flag, so a disabled alarm is simply // not loaded (no predicate, no upstream subscription, no events). var enabled = msg.Plans.Where(p => p.Enabled).ToList(); @@ -153,15 +175,18 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor .Distinct(StringComparer.Ordinal) .ToList(); - // PipeTo marshals the completion back onto the actor thread; on failure we log a Warning and - // do NOT register interest (the prior generation's subscription, if any, stays — a failed - // reload should not silently drop live values). Self-message AlarmsLoaded carries the refs. + // PipeTo marshals the completion back onto the actor thread. The continuation runs with + // CancellationToken.None so it ALWAYS executes (even after _cts fired on stop) and branches on + // the load's outcome: a clean cancel (actor stopping) pipes back a quiet AlarmsLoadCanceled; a + // genuine fault pipes back a Status.Failure we log as Warning (the prior generation's mux + // subscription stays — a failed reload should not silently drop live values); success pipes back + // AlarmsLoaded carrying the refs + the generation it was loaded under. _engine.LoadAsync(defs, _cts.Token) .ContinueWith( - t => t.IsFaulted - ? (object)new Status.Failure(t.Exception!) - : new AlarmsLoaded(depRefs), - _cts.Token, + t => t.IsCanceled ? (object)new AlarmsLoadCanceled() + : t.IsFaulted ? new Status.Failure(t.Exception!.GetBaseException()) + : new AlarmsLoaded(depRefs, gen), + CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default) .PipeTo(Self); @@ -175,6 +200,14 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor private void OnAlarmsLoaded(AlarmsLoaded msg) { + // Discard a stale completion: if a newer apply has since bumped the generation, this load's + // dep refs are superseded and registering them would re-introduce the old set. The latest + // generation's continuation will (or already did) register the current interest set. + if (msg.Generation != _loadGeneration) + { + return; + } + // Register mux interest only now that LoadAsync has returned — the engine's upstream // subscriptions exist, so any DependencyValueChanged the mux delivers will be observed by the // engine. RegisterInterest replaces a subscriber's prior interest set (see DependencyMuxActor), @@ -211,7 +244,7 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor TimestampUtc: e.TimestampUtc)); // Publish the transition to the cluster `alerts` topic — the single historization + live - // fan-out path. The mediator is obtained on the ACTOR thread (here), never off-thread. + // fan-out path. The mediator was cached on the ACTOR thread in PreStart; we only Tell it here. var evt = new AlarmTransitionEvent( AlarmId: e.AlarmId, EquipmentPath: e.EquipmentPath, @@ -222,7 +255,15 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor User: TransitionUser(e), TimestampUtc: e.TimestampUtc); - DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(AlertsTopic, evt)); + _mediator.Tell(new Publish(AlertsTopic, evt)); + } + + /// + protected override void PreStart() + { + // Resolve the cluster DPS mediator once, on the actor thread, so emissions only Tell it. + _mediator = DistributedPubSub.Get(Context.System).Mediator; + base.PreStart(); } /// @@ -241,13 +282,15 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor /// Maps an to the engine's /// . is a - /// string that must parse to an ; an unrecognised type falls back to - /// rather than dropping the alarm. + /// string that must parse to a defined NAME; an unrecognised type — including + /// a numeric string like "5" or "-1", which + /// would otherwise accept as an undefined value — falls back to + /// rather than dropping the alarm. private static ScriptedAlarmDefinition ToDefinition(EquipmentScriptedAlarmPlan p) => new( AlarmId: p.ScriptedAlarmId, EquipmentPath: p.EquipmentId, AlarmName: p.Name, - Kind: Enum.TryParse(p.AlarmType, out var k) ? k : AlarmKind.AlarmCondition, + Kind: Enum.TryParse(p.AlarmType, out var k) && Enum.IsDefined(k) ? k : AlarmKind.AlarmCondition, Severity: SeverityFromInt(p.Severity), MessageTemplate: p.MessageTemplate, PredicateScriptSource: p.PredicateSource, diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs index 4229f27e..d23fd9f3 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs @@ -50,6 +50,23 @@ public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase Retain: true, Enabled: enabled); + /// Plan whose predicate references an unknown identifier so it fails to compile — applying it + /// faults the engine's LoadAsync. Used to prove a faulted load doesn't crash the host. + private static EquipmentScriptedAlarmPlan BadPlan(string id = "bad-1") => + new( + ScriptedAlarmId: id, + EquipmentId: "Plant/Line1/M", + Name: "Broken", + AlarmType: "AlarmCondition", + Severity: 500, + MessageTemplate: "broken", + PredicateScriptId: $"{id}-script", + PredicateSource: "return unknownIdentifier;", // uncompilable → LoadAsync throws + DependencyRefs: Array.Empty(), + HistorizeToAveva: false, + Retain: false, + Enabled: true); + private static ScriptedAlarmEngine BuildEngine(DependencyMuxTagUpstreamSource upstream) { var logger = new LoggerConfiguration().CreateLogger(); @@ -170,4 +187,54 @@ public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase second.TagRefs.ShouldContain("M.Y"); second.TagRefs.ShouldNotContain("M.T"); } + + /// Faulted load resilience: applying an alarm whose predicate doesn't compile faults the + /// engine's LoadAsync. The host logs a Warning (Status.Failure handler) and stays alive — it must + /// still process a subsequent valid apply, registering interest for that apply's dep refs. + [Fact] + public void Faulted_load_does_not_crash_host() + { + var publish = CreateTestProbe(); + var mux = CreateTestProbe(); + var (host, _) = Spawn(publish, mux); + + // Apply a plan that fails to compile — LoadAsync faults, the host swallows it as a Warning and + // does NOT register interest (no dep refs to register, and the load never completed). + host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { BadPlan() })); + mux.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + + // Prove the actor is still responsive: a later valid apply loads + registers interest as normal. + host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-ok", depRef: "M.T") })); + var reg = mux.ExpectMsg(Timeout); + reg.TagRefs.ShouldContain("M.T"); + } + + /// Stale-load guard (fix A): two back-to-back applies with different dep-ref sets must end + /// with the mux holding the SECOND (latest-generation) set — a stale earlier completion must never + /// re-introduce the first set's refs. + /// + /// Limitation: forcing the two LoadAsync continuations to complete out of order is not + /// deterministic via real async timing (both loads are short + run on the thread pool). This test + /// therefore validates the guard's observable contract rather than the race itself: it fishes for + /// the RegisterInterest carrying the second apply's refs, then asserts no LATER RegisterInterest + /// re-introduces the first apply's refs. With the generation guard in place the latest apply always + /// wins; without it, an out-of-order stale completion could land a "M.A"-bearing RegisterInterest + /// after the "M.B" one — which this assertion would catch. + [Fact] + public void Stale_load_does_not_register_superseded_dep_refs() + { + var publish = CreateTestProbe(); + var mux = CreateTestProbe(); + var (host, _) = Spawn(publish, mux); + + // Fire two applies in quick succession with disjoint dep refs; the second supersedes the first. + host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-a", depRef: "M.A") })); + host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-b", depRef: "M.B") })); + + // The latest generation must win: fish for the RegisterInterest reflecting the second apply. + mux.FishForMessage(r => r.TagRefs.Contains("M.B"), Timeout); + + // No LATER RegisterInterest may re-introduce the first (superseded) apply's "M.A" ref. + mux.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + } }