diff --git a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json index 704f36b..1a16ff1 100644 --- a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json +++ b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json @@ -82,8 +82,8 @@ {"id": "F5", "subject": "Follow-up: ConfigPublishCoordinator multi-node happy-path test", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "5cfbe8b", "deviation": "Delivered by Task 59 — DeployHappyPathTests.StartDeployment_seals_after_both_nodes_apply exercises the exact 'dispatch to N driver nodes, all ack, seals' flow via the real 2-node TwoNodeClusterHarness rather than a multi-system TestKit. Cleaner because it tests the production code path end-to-end.", "origin": "Self-review of Task 30 — single-ActorSystem TestKit can't simulate the plan's 'dispatch to N driver nodes, all ack, seals' happy path because DiscoverDriverNodes() needs real cluster membership. Add a multi-system test (two ActorSystems joined into one cluster, driver-role on the second)."}, {"id": "F6", "subject": "Follow-up: RedundancyStateActor publisher abstraction so tests don't need DPS bootstrap", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": [], "blockedBy": [], "commit": "dfc143c", "origin": "Self-review of Task 35 — RedundancyStateActorTests are skipped because single-node DistributedPubSub bootstrap is unreliable in TestKit. Inject an Action broadcast so tests can replace it with a probe; un-skip both tests."}, {"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "partial", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands.", "shipped": "Spawn lifecycle in DriverHostActor: artifact parsing, DriverSpawnPlanner pure-diff (spawn/delta/stop), IDriverFactory abstraction in Core.Abstractions with NullDriverFactory + DriverFactoryRegistryAdapter, ApplyDelta forwarded to children. Subscription publishing + write path still stubbed — split into F7-sub (subscribe + write)."}, - {"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers."}, - {"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted."}, + {"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers.", "shipped": "IVirtualTagEvaluator seam in Commons.Engines + NullVirtualTagEvaluator default. VirtualTagActor calls evaluator on DependencyValueChanged, dedupes unchanged results, emits EvaluationResult to parent, publishes ScriptLogEntry Warning to script-logs DPS topic on evaluator failure. Production binding to Core.VirtualTags.VirtualTagEngine still TODO (compile + ITagUpstreamSource subscribe) — split as F8b."}, + {"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted.", "shipped": "IScriptedAlarmEvaluator seam in Commons.Engines + NullScriptedAlarmEvaluator default. ScriptedAlarmActor takes AlarmConfig (id/name/path/severity/predicate), calls evaluator on DependencyValueChanged, emits AlarmTransitionEvent on alerts DPS topic + ScriptLogEntry on script-logs at every transition (Activated/Acknowledged/Cleared with user attribution). Predicate binding to Core.ScriptedAlarms + ScriptedAlarmState DB persistence still TODO — split as F9b."}, {"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "pending", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction."}, {"id": "F11", "subject": "Follow-up: HistorianAdapterActor named-pipe IPC + SqliteStoreAndForwardSink wiring", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "6861381", "deviationNotes": "Reshaped HistorianAdapterActor around the existing IAlarmHistorianSink abstraction (alarm-event shape, not the original tag-history-row stub). Defaults to NullAlarmHistorianSink; production deployments wire SqliteStoreAndForwardSink + WonderwareHistorianClient via AddOtOpcUaRuntime overrides. Actor now exposes GetStatus returning HistorianSinkStatus for diagnostics. Named-pipe transport implementation lives unchanged in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs — the actor is intentionally just a fire-and-forget bridge.", "origin": "Self-review of Task 45 — stub buffers in-memory; named-pipe + SQLite store-and-forward not wired."}, {"id": "F12", "subject": "Follow-up: PeerOpcUaProbeActor real opc.tcp ping (replace Ok=true stub)", "status": "completed", "classification": "small", "estMinutes": 20, "parallelizableWith": [], "blockedBy": [], "commit": "b06e3ae", "deviation": "TCP-connect probe rather than full OPC UA Hello/Acknowledge handshake. Enough for the redundancy calc; deeper liveness signals can layer on later without changing the actor's contract.", "origin": "Self-review of Task 45 — RunProbe always returns Ok=true; replace with OPC UA Client connect."}, diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Engines/IScriptedAlarmEvaluator.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Engines/IScriptedAlarmEvaluator.cs new file mode 100644 index 0000000..b123936 --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Engines/IScriptedAlarmEvaluator.cs @@ -0,0 +1,30 @@ +namespace ZB.MOM.WW.OtOpcUa.Commons.Engines; + +/// +/// Abstraction over the scripted-alarm predicate engine. Production binds this to a +/// wrapper around ScriptedAlarmEngine from Core.ScriptedAlarms; default +/// binding is which keeps the alarm in its +/// current state (so an unconfigured node never spuriously alarms). +/// +public interface IScriptedAlarmEvaluator +{ + ScriptedAlarmEvalResult Evaluate(string alarmId, string predicate, IReadOnlyDictionary dependencies); +} + +/// Result of one alarm-predicate evaluation. Active is only meaningful when +/// Success is true; on failure the caller should keep the prior state and log Reason. +public sealed record ScriptedAlarmEvalResult(bool Success, bool Active, string? Reason) +{ + public static ScriptedAlarmEvalResult Ok(bool active) => new(true, active, null); + public static ScriptedAlarmEvalResult Failure(string reason) => new(false, false, reason); +} + +/// Default that always returns Active = false, Success = true. Safe no-op: +/// no alarm fires when no real engine is bound. +public sealed class NullScriptedAlarmEvaluator : IScriptedAlarmEvaluator +{ + public static readonly NullScriptedAlarmEvaluator Instance = new(); + private NullScriptedAlarmEvaluator() { } + public ScriptedAlarmEvalResult Evaluate(string alarmId, string predicate, IReadOnlyDictionary dependencies) + => ScriptedAlarmEvalResult.Ok(active: false); +} diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Engines/IVirtualTagEvaluator.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Engines/IVirtualTagEvaluator.cs new file mode 100644 index 0000000..ce0d15e --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Engines/IVirtualTagEvaluator.cs @@ -0,0 +1,36 @@ +namespace ZB.MOM.WW.OtOpcUa.Commons.Engines; + +/// +/// Abstraction over the compiled virtual-tag expression engine. Runtime consumes this so +/// can stay free of Roslyn / scripting machinery and the +/// production wiring binds an adapter over VirtualTagEngine from +/// Core.VirtualTags. +/// +public interface IVirtualTagEvaluator +{ + /// + /// Evaluate against the snapshot in + /// . Implementations must not throw — script failures + /// are reported via . + /// + VirtualTagEvalResult Evaluate(string virtualTagId, string expression, IReadOnlyDictionary dependencies); +} + +/// Result of one virtual-tag expression eval. Stash a Reason on every Failure so +/// callers can emit a useful ScriptLogEntry to operators. +public sealed record VirtualTagEvalResult(bool Success, object? Value, string? Reason) +{ + public static readonly VirtualTagEvalResult NoChange = new(true, null, "no-change"); + public static VirtualTagEvalResult Ok(object? value) => new(true, value, null); + public static VirtualTagEvalResult Failure(string reason) => new(false, null, reason); +} + +/// Returns from every call. Bound by default +/// when the production VirtualTagEngine adapter hasn't been registered (Mac dev, tests). +public sealed class NullVirtualTagEvaluator : IVirtualTagEvaluator +{ + public static readonly NullVirtualTagEvaluator Instance = new(); + private NullVirtualTagEvaluator() { } + public VirtualTagEvalResult Evaluate(string virtualTagId, string expression, IReadOnlyDictionary dependencies) + => VirtualTagEvalResult.NoChange; +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs index da1a849..1dbb016 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs @@ -1,60 +1,162 @@ using Akka.Actor; +using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging; +using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms; public enum ScriptedAlarmActorState { Inactive, Active, Acknowledged } /// -/// State machine wrapping a single scripted alarm. Transitions: -/// Inactive → Active → Acknowledged → Inactive. -/// -/// Engine wiring (compile alarm expression via AlarmConditionService, persist state to -/// ScriptedAlarmState ConfigDb table on PreRestart, emit history rows to -/// HistorianAdapter) is staged for follow-up F9. This skeleton owns the state machine -/// so DriverHostActor can spawn it as a child. +/// One scripted alarm. Receives dependency value updates, runs the predicate via an +/// injected , and on transitions publishes both +/// an on the cluster alerts DPS topic and a +/// on script-logs. Manual +/// + still flow through the same state machine so the +/// legacy callers keep working. /// public sealed class ScriptedAlarmActor : ReceiveActor { + public const string AlertsTopic = "alerts"; + public const string ScriptLogsTopic = "script-logs"; + + public sealed record DependencyValueChanged(string TagId, object? Value, DateTime TimestampUtc); public sealed record ConditionMet(string Reason); public sealed record AcknowledgeAlarm(string Actor); public sealed record ConditionCleared; public sealed record StateChanged(string AlarmId, ScriptedAlarmActorState State, DateTime AtUtc); - private readonly string _alarmId; + public sealed record AlarmConfig( + string AlarmId, + string AlarmName, + string EquipmentPath, + int Severity, + string? Predicate); + + private readonly AlarmConfig _config; + private readonly IScriptedAlarmEvaluator _evaluator; + private readonly Func? _publisherFactory; private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly Dictionary _dependencies = new(StringComparer.Ordinal); + private ScriptedAlarmActorState _state = ScriptedAlarmActorState.Inactive; + public static Props Props( + AlarmConfig config, + IScriptedAlarmEvaluator? evaluator = null, + Func? publisherFactory = null) => + Akka.Actor.Props.Create(() => new ScriptedAlarmActor( + config, + evaluator ?? NullScriptedAlarmEvaluator.Instance, + publisherFactory)); + + /// Legacy single-arg ctor kept for callers that only care about the state machine + /// (no engine evaluation, no DPS fan-out). Equivalent to Props(new AlarmConfig(...)). public static Props Props(string alarmId) => - Akka.Actor.Props.Create(() => new ScriptedAlarmActor(alarmId)); + Props(new AlarmConfig(alarmId, alarmId, EquipmentPath: "", Severity: 500, Predicate: null)); - public ScriptedAlarmActor(string alarmId) + public ScriptedAlarmActor(AlarmConfig config, IScriptedAlarmEvaluator evaluator, Func? publisherFactory) { - _alarmId = alarmId; + _config = config; + _evaluator = evaluator; + _publisherFactory = publisherFactory; - Receive(msg => - { - if (_state != ScriptedAlarmActorState.Inactive) return; - Transition(ScriptedAlarmActorState.Active); - }); - Receive(msg => - { - if (_state != ScriptedAlarmActorState.Active) return; - Transition(ScriptedAlarmActorState.Acknowledged); - }); - Receive(_ => - { - if (_state == ScriptedAlarmActorState.Inactive) return; - Transition(ScriptedAlarmActorState.Inactive); - }); + Receive(OnDependencyChanged); + Receive(_ => { if (_state == ScriptedAlarmActorState.Inactive) Transition(ScriptedAlarmActorState.Active, user: "system"); }); + Receive(msg => { if (_state == ScriptedAlarmActorState.Active) Transition(ScriptedAlarmActorState.Acknowledged, user: msg.Actor); }); + Receive(_ => { if (_state != ScriptedAlarmActorState.Inactive) Transition(ScriptedAlarmActorState.Inactive, user: "system"); }); } - private void Transition(ScriptedAlarmActorState next) + private void OnDependencyChanged(DependencyValueChanged msg) + { + _dependencies[msg.TagId] = msg.Value; + + if (string.IsNullOrEmpty(_config.Predicate)) return; + + ScriptedAlarmEvalResult result; + try + { + result = _evaluator.Evaluate(_config.AlarmId, _config.Predicate, _dependencies); + } + catch (Exception ex) + { + _log.Warning(ex, "ScriptedAlarm {Id}: evaluator threw", _config.AlarmId); + PublishLog("Error", $"evaluator threw: {ex.Message}"); + return; + } + + if (!result.Success) + { + PublishLog("Warning", result.Reason ?? "evaluator failure"); + return; + } + + // Active condition wins regardless of ack state — re-firing is suppressed because + // _state already == Active. Cleared moves Active OR Acknowledged → Inactive. + if (result.Active && _state == ScriptedAlarmActorState.Inactive) + { + Transition(ScriptedAlarmActorState.Active, user: "system"); + } + else if (!result.Active && _state != ScriptedAlarmActorState.Inactive) + { + Transition(ScriptedAlarmActorState.Inactive, user: "system"); + } + } + + private void Transition(ScriptedAlarmActorState next, string user) { var prev = _state; _state = next; - _log.Info("ScriptedAlarm {Id}: {From} → {To}", _alarmId, prev, next); - Context.Parent.Tell(new StateChanged(_alarmId, next, DateTime.UtcNow)); - // F9: emit history row via HistorianAdapter; persist state to ScriptedAlarmState DB. + _log.Info("ScriptedAlarm {Id}: {From} → {To}", _config.AlarmId, prev, next); + + var nowUtc = DateTime.UtcNow; + Context.Parent.Tell(new StateChanged(_config.AlarmId, next, nowUtc)); + + var kind = next switch + { + ScriptedAlarmActorState.Active => "Activated", + ScriptedAlarmActorState.Acknowledged => "Acknowledged", + ScriptedAlarmActorState.Inactive => "Cleared", + _ => next.ToString(), + }; + + var evt = new AlarmTransitionEvent( + AlarmId: _config.AlarmId, + EquipmentPath: _config.EquipmentPath, + AlarmName: _config.AlarmName, + TransitionKind: kind, + Severity: _config.Severity, + Message: $"{_config.AlarmName} {kind}", + User: user, + TimestampUtc: nowUtc); + + PublishOrFallback(AlertsTopic, evt); + PublishLog("Information", $"{_config.AlarmName} {kind} (by {user})"); + } + + private void PublishLog(string level, string message) + { + var entry = new ScriptLogEntry( + ScriptId: _config.AlarmId, + Level: level, + Message: message, + TimestampUtc: DateTime.UtcNow, + VirtualTagId: null, + AlarmId: _config.AlarmId, + EquipmentId: null); + PublishOrFallback(ScriptLogsTopic, entry); + } + + private void PublishOrFallback(string topic, object payload) + { + if (_publisherFactory is not null) + { + _publisherFactory().Publish(topic, payload); + return; + } + DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(topic, payload)); } } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs index ffa5adb..c2cb1d9 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs @@ -1,41 +1,123 @@ using Akka.Actor; +using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging; using ZB.MOM.WW.OtOpcUa.Commons.Types; namespace ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; /// /// Wraps a single virtual-tag expression. Receives dependency-tag updates, recomputes the -/// expression, and publishes the result to OpcUaPublishActor. -/// -/// Engine wiring (compile expression via VirtualTagEngine, manage subscriptions, -/// emit AttributeValueUpdate) is staged for follow-up F8. This skeleton compiles + has -/// a basic message contract so DriverHostActor can spawn it as a child. +/// expression via an injected , and emits an +/// to the parent (the publish actor) whenever the value +/// actually changes. Script failures publish a Warning on the +/// script-logs DPS topic so operators see the diagnostic in the live tail. /// public sealed class VirtualTagActor : ReceiveActor { + public const string ScriptLogsTopic = "script-logs"; + public sealed record DependencyValueChanged(string TagId, object? Value, DateTime TimestampUtc); public sealed record EvaluationResult(string VirtualTagId, object? Value, DateTime TimestampUtc, CorrelationId Correlation); private readonly string _virtualTagId; + private readonly string _scriptId; private readonly string _expression; + private readonly IVirtualTagEvaluator _evaluator; + private readonly Func? _publisherFactory; private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly Dictionary _dependencies = new(StringComparer.Ordinal); - public static Props Props(string virtualTagId, string expression) => - Akka.Actor.Props.Create(() => new VirtualTagActor(virtualTagId, expression)); + private bool _hasLastValue; + private object? _lastValue; - public VirtualTagActor(string virtualTagId, string expression) + public static Props Props( + string virtualTagId, + string expression, + IVirtualTagEvaluator? evaluator = null, + string? scriptId = null, + Func? publisherFactory = null) => + Akka.Actor.Props.Create(() => new VirtualTagActor( + virtualTagId, expression, + evaluator ?? NullVirtualTagEvaluator.Instance, + scriptId ?? virtualTagId, + publisherFactory)); + + public VirtualTagActor( + string virtualTagId, + string expression, + IVirtualTagEvaluator evaluator, + string scriptId, + Func? publisherFactory) { _virtualTagId = virtualTagId; + _scriptId = scriptId; _expression = expression; + _evaluator = evaluator; + _publisherFactory = publisherFactory; - Receive(msg => + Receive(OnDependencyChanged); + } + + private void OnDependencyChanged(DependencyValueChanged msg) + { + _dependencies[msg.TagId] = msg.Value; + + VirtualTagEvalResult result; + try { - _dependencies[msg.TagId] = msg.Value; - // Engine wiring (F8): VirtualTagEngine.Evaluate(_expression, _dependencies) → publish. - _log.Debug("VirtualTag {Id}: dependency {Tag}={Value} buffered (eval staged for F8)", - _virtualTagId, msg.TagId, msg.Value); - }); + result = _evaluator.Evaluate(_virtualTagId, _expression, _dependencies); + } + catch (Exception ex) + { + _log.Warning(ex, "VirtualTag {Id}: evaluator threw", _virtualTagId); + PublishLog("Error", $"evaluator threw: {ex.Message}"); + return; + } + + if (!result.Success) + { + PublishLog("Warning", result.Reason ?? "evaluator failure"); + return; + } + + // Skip no-change results. Real evaluator returns Ok(value); Null returns NoChange — both + // safe because Null never produces a fresh value. + if (ReferenceEquals(result, VirtualTagEvalResult.NoChange)) return; + + if (_hasLastValue && Equals(_lastValue, result.Value)) return; + + _hasLastValue = true; + _lastValue = result.Value; + var evalResult = new EvaluationResult(_virtualTagId, result.Value, msg.TimestampUtc, CorrelationId.NewId()); + Context.Parent.Tell(evalResult); + } + + private void PublishLog(string level, string message) + { + var entry = new ScriptLogEntry( + ScriptId: _scriptId, + Level: level, + Message: message, + TimestampUtc: DateTime.UtcNow, + VirtualTagId: _virtualTagId, + AlarmId: null, + EquipmentId: null); + + if (_publisherFactory is not null) + { + _publisherFactory().Publish(ScriptLogsTopic, entry); + return; + } + + DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(ScriptLogsTopic, entry)); } } + +/// +/// Thin seam for tests to capture DPS publishes without standing up a real cluster. +/// Production never instantiates this directly — the actor falls through to +/// when the factory is null. +/// +public sealed record DPSPublisher(Action Publish); diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmActorTests.cs index a6721c9..8fd89a7 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmActorTests.cs @@ -1,9 +1,14 @@ +using System.Collections.Concurrent; using Akka.Actor; using Akka.TestKit; using Shouldly; using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging; using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms; using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; +using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.ScriptedAlarms; @@ -13,7 +18,6 @@ public sealed class ScriptedAlarmActorTests : RuntimeActorTestBase public void Full_state_cycle_publishes_StateChanged_to_parent_at_each_transition() { var parent = CreateTestProbe(); - // Wrap the alarm actor under our probe as parent so StateChanged lands on the probe. var actor = parent.ChildActorOf(ScriptedAlarmActor.Props("alarm-1")); actor.Tell(new ScriptedAlarmActor.ConditionMet("threshold")); @@ -41,4 +45,113 @@ public sealed class ScriptedAlarmActorTests : RuntimeActorTestBase actor.Tell(new ScriptedAlarmActor.ConditionMet("second")); parent.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); } + + [Fact] + public void Engine_active_transition_publishes_AlarmTransitionEvent_to_alerts_topic() + { + var capture = new CapturingPublisher(); + var parent = CreateTestProbe(); + var config = new ScriptedAlarmActor.AlarmConfig( + AlarmId: "alarm-7", + AlarmName: "High Temp", + EquipmentPath: "/site-1/line-A/oven", + Severity: 800, + Predicate: "temp > 80"); + var actor = parent.ChildActorOf(ScriptedAlarmActor.Props( + config, + evaluator: new ThresholdEvaluator(80), + publisherFactory: () => new DPSPublisher(capture.Publish))); + + actor.Tell(new ScriptedAlarmActor.DependencyValueChanged("temp", 92, DateTime.UtcNow)); + parent.ExpectMsg().State.ShouldBe(ScriptedAlarmActorState.Active); + + AwaitAssert(() => + { + var transitionEvt = capture.Payloads.OfType().SingleOrDefault(); + transitionEvt.ShouldNotBeNull(); + transitionEvt.AlarmId.ShouldBe("alarm-7"); + transitionEvt.AlarmName.ShouldBe("High Temp"); + transitionEvt.EquipmentPath.ShouldBe("/site-1/line-A/oven"); + transitionEvt.Severity.ShouldBe(800); + transitionEvt.TransitionKind.ShouldBe("Activated"); + transitionEvt.User.ShouldBe("system"); + + var log = capture.Payloads.OfType().SingleOrDefault(); + log.ShouldNotBeNull(); + log.AlarmId.ShouldBe("alarm-7"); + }, duration: TimeSpan.FromSeconds(1)); + } + + [Fact] + public void Engine_clear_transition_publishes_Cleared_event() + { + var capture = new CapturingPublisher(); + var parent = CreateTestProbe(); + var config = new ScriptedAlarmActor.AlarmConfig("alarm-7", "High Temp", "/p", 500, "temp > 80"); + var evaluator = new ThresholdEvaluator(80); + var actor = parent.ChildActorOf(ScriptedAlarmActor.Props( + config, evaluator, + publisherFactory: () => new DPSPublisher(capture.Publish))); + + actor.Tell(new ScriptedAlarmActor.DependencyValueChanged("temp", 92, DateTime.UtcNow)); + parent.ExpectMsg(); + + actor.Tell(new ScriptedAlarmActor.DependencyValueChanged("temp", 70, DateTime.UtcNow)); + parent.ExpectMsg().State.ShouldBe(ScriptedAlarmActorState.Inactive); + + AwaitAssert(() => + { + var kinds = capture.Payloads.OfType().Select(e => e.TransitionKind).ToList(); + kinds.ShouldContain("Activated"); + kinds.ShouldContain("Cleared"); + }, duration: TimeSpan.FromSeconds(1)); + } + + [Fact] + public void Manual_acknowledge_emits_Acknowledged_transition_with_user() + { + var capture = new CapturingPublisher(); + var parent = CreateTestProbe(); + var config = new ScriptedAlarmActor.AlarmConfig("a-1", "Pump Fail", "/eq", 700, Predicate: null); + var actor = parent.ChildActorOf(ScriptedAlarmActor.Props( + config, evaluator: null, + publisherFactory: () => new DPSPublisher(capture.Publish))); + + actor.Tell(new ScriptedAlarmActor.ConditionMet("driver-fault")); + parent.ExpectMsg(); + + actor.Tell(new ScriptedAlarmActor.AcknowledgeAlarm("operator-jane")); + parent.ExpectMsg().State.ShouldBe(ScriptedAlarmActorState.Acknowledged); + + AwaitAssert(() => + { + var ackEvt = capture.Payloads.OfType() + .SingleOrDefault(e => e.TransitionKind == "Acknowledged"); + ackEvt.ShouldNotBeNull(); + ackEvt.User.ShouldBe("operator-jane"); + }, duration: TimeSpan.FromSeconds(1)); + } + + private sealed class ThresholdEvaluator : IScriptedAlarmEvaluator + { + private readonly double _threshold; + public ThresholdEvaluator(double threshold) { _threshold = threshold; } + public ScriptedAlarmEvalResult Evaluate(string id, string predicate, IReadOnlyDictionary deps) + { + if (!deps.TryGetValue("temp", out var raw) || raw is null) + return ScriptedAlarmEvalResult.Failure("missing temp"); + return ScriptedAlarmEvalResult.Ok(Convert.ToDouble(raw) > _threshold); + } + } + + private sealed class CapturingPublisher + { + public ConcurrentBag Topics { get; } = new(); + public ConcurrentBag Payloads { get; } = new(); + public void Publish(string topic, object payload) + { + Topics.Add(topic); + Payloads.Add(payload); + } + } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/VirtualTags/VirtualTagActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/VirtualTags/VirtualTagActorTests.cs index d75e760..4c3e654 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/VirtualTags/VirtualTagActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/VirtualTags/VirtualTagActorTests.cs @@ -1,6 +1,9 @@ +using System.Collections.Concurrent; using Akka.Actor; using Shouldly; using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging; using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; @@ -15,8 +18,100 @@ public sealed class VirtualTagActorTests : RuntimeActorTestBase Watch(actor); actor.Tell(new VirtualTagActor.DependencyValueChanged("tag-a", 10, DateTime.UtcNow)); actor.Tell(new VirtualTagActor.DependencyValueChanged("tag-b", 20, DateTime.UtcNow)); - - // No crash, no termination. ExpectNoMsg(TimeSpan.FromMilliseconds(200)); } + + [Fact] + public void Evaluator_result_flows_to_parent_as_EvaluationResult() + { + var parent = CreateTestProbe(); + var evaluator = new SumEvaluator(); + var actor = parent.ChildActorOf(VirtualTagActor.Props("vt-1", "a + b", evaluator: evaluator)); + + actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 10, DateTime.UtcNow)); + actor.Tell(new VirtualTagActor.DependencyValueChanged("b", 32, DateTime.UtcNow)); + + // First dep: a alone -> 10. Second dep: a + b -> 42. + var first = parent.ExpectMsg(); + first.Value.ShouldBe(10); + var second = parent.ExpectMsg(); + second.Value.ShouldBe(42); + } + + [Fact] + public void Repeated_same_value_does_not_emit_a_second_EvaluationResult() + { + var parent = CreateTestProbe(); + var evaluator = new ConstEvaluator(42); + var actor = parent.ChildActorOf(VirtualTagActor.Props("vt-1", "expr", evaluator: evaluator)); + + actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 1, DateTime.UtcNow)); + var first = parent.ExpectMsg(); + first.Value.ShouldBe(42); + + actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 2, DateTime.UtcNow)); + parent.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + } + + [Fact] + public void Evaluator_failure_publishes_ScriptLogEntry_warning() + { + var capture = new CapturingPublisher(); + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(VirtualTagActor.Props( + "vt-1", "broken", + evaluator: new FailingEvaluator("syntax error"), + scriptId: "script-7", + publisherFactory: () => new DPSPublisher(capture.Publish))); + + actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 1, DateTime.UtcNow)); + + AwaitAssert(() => + { + capture.Topics.ShouldContain("script-logs"); + var entry = (ScriptLogEntry)capture.Payloads.Single(); + entry.Level.ShouldBe("Warning"); + entry.Message.ShouldContain("syntax error"); + entry.ScriptId.ShouldBe("script-7"); + entry.VirtualTagId.ShouldBe("vt-1"); + }, duration: TimeSpan.FromMilliseconds(500)); + + parent.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + } + + private sealed class SumEvaluator : IVirtualTagEvaluator + { + public VirtualTagEvalResult Evaluate(string id, string expr, IReadOnlyDictionary deps) + { + var sum = deps.Values.OfType().Sum(); + return VirtualTagEvalResult.Ok(sum); + } + } + + private sealed class ConstEvaluator : IVirtualTagEvaluator + { + private readonly object _value; + public ConstEvaluator(object value) { _value = value; } + public VirtualTagEvalResult Evaluate(string id, string expr, IReadOnlyDictionary deps) + => VirtualTagEvalResult.Ok(_value); + } + + private sealed class FailingEvaluator : IVirtualTagEvaluator + { + private readonly string _reason; + public FailingEvaluator(string reason) { _reason = reason; } + public VirtualTagEvalResult Evaluate(string id, string expr, IReadOnlyDictionary deps) + => VirtualTagEvalResult.Failure(_reason); + } + + private sealed class CapturingPublisher + { + public ConcurrentBag Topics { get; } = new(); + public ConcurrentBag Payloads { get; } = new(); + public void Publish(string topic, object payload) + { + Topics.Add(topic); + Payloads.Add(payload); + } + } }