feat(runtime): F8/F9 engine evaluator seams + DPS fan-out
VirtualTagActor and ScriptedAlarmActor now route through pluggable evaluator interfaces and fan out to the cluster's live-tail topics shipped in F15.3: - IVirtualTagEvaluator + NullVirtualTagEvaluator in Commons.Engines. VirtualTagActor calls evaluator on every DependencyValueChanged, dedupes unchanged values, forwards EvaluationResult to its parent, and publishes ScriptLogEntry Warning to the script-logs DPS topic whenever the evaluator fails. - IScriptedAlarmEvaluator + NullScriptedAlarmEvaluator. ScriptedAlarmActor takes an AlarmConfig (id/name/equipment-path/severity/predicate) and publishes both an AlarmTransitionEvent (alerts topic) and a ScriptLogEntry (script-logs topic) at every transition. Manual ConditionMet/Acknowledge/Cleared still flow through the same Transition() so callers without engine bindings still drive the state machine; the legacy single-string Props() overload routes through a default AlarmConfig. The Null* defaults keep the actors safe when no engine is bound — unconfigured nodes never spuriously alarm. Production binding to Core.VirtualTags.VirtualTagEngine and Core.ScriptedAlarms is the remaining residual (F8b/F9b — split in tasks JSON). Tests: Runtime 34 -> 40 (+6): - VirtualTagActorTests x3 (evaluator drives EvaluationResult, unchanged-value dedup, failure publishes Warning ScriptLogEntry) - ScriptedAlarmActorTests x3 (engine threshold drives Activated + Cleared on alerts topic, manual Acknowledge attribution). All 6 v2 test suites green: 126 tests passing.
This commit is contained in:
@@ -82,8 +82,8 @@
|
||||
{"id": "F5", "subject": "Follow-up: ConfigPublishCoordinator multi-node happy-path test", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "5cfbe8b", "deviation": "Delivered by Task 59 — DeployHappyPathTests.StartDeployment_seals_after_both_nodes_apply exercises the exact 'dispatch to N driver nodes, all ack, seals' flow via the real 2-node TwoNodeClusterHarness rather than a multi-system TestKit. Cleaner because it tests the production code path end-to-end.", "origin": "Self-review of Task 30 — single-ActorSystem TestKit can't simulate the plan's 'dispatch to N driver nodes, all ack, seals' happy path because DiscoverDriverNodes() needs real cluster membership. Add a multi-system test (two ActorSystems joined into one cluster, driver-role on the second)."},
|
||||
{"id": "F6", "subject": "Follow-up: RedundancyStateActor publisher abstraction so tests don't need DPS bootstrap", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": [], "blockedBy": [], "commit": "dfc143c", "origin": "Self-review of Task 35 — RedundancyStateActorTests are skipped because single-node DistributedPubSub bootstrap is unreliable in TestKit. Inject an Action<object> broadcast so tests can replace it with a probe; un-skip both tests."},
|
||||
{"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "partial", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands.", "shipped": "Spawn lifecycle in DriverHostActor: artifact parsing, DriverSpawnPlanner pure-diff (spawn/delta/stop), IDriverFactory abstraction in Core.Abstractions with NullDriverFactory + DriverFactoryRegistryAdapter, ApplyDelta forwarded to children. Subscription publishing + write path still stubbed — split into F7-sub (subscribe + write)."},
|
||||
{"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers."},
|
||||
{"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted."},
|
||||
{"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers.", "shipped": "IVirtualTagEvaluator seam in Commons.Engines + NullVirtualTagEvaluator default. VirtualTagActor calls evaluator on DependencyValueChanged, dedupes unchanged results, emits EvaluationResult to parent, publishes ScriptLogEntry Warning to script-logs DPS topic on evaluator failure. Production binding to Core.VirtualTags.VirtualTagEngine still TODO (compile + ITagUpstreamSource subscribe) — split as F8b."},
|
||||
{"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted.", "shipped": "IScriptedAlarmEvaluator seam in Commons.Engines + NullScriptedAlarmEvaluator default. ScriptedAlarmActor takes AlarmConfig (id/name/path/severity/predicate), calls evaluator on DependencyValueChanged, emits AlarmTransitionEvent on alerts DPS topic + ScriptLogEntry on script-logs at every transition (Activated/Acknowledged/Cleared with user attribution). Predicate binding to Core.ScriptedAlarms + ScriptedAlarmState DB persistence still TODO — split as F9b."},
|
||||
{"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "pending", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction."},
|
||||
{"id": "F11", "subject": "Follow-up: HistorianAdapterActor named-pipe IPC + SqliteStoreAndForwardSink wiring", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "6861381", "deviationNotes": "Reshaped HistorianAdapterActor around the existing IAlarmHistorianSink abstraction (alarm-event shape, not the original tag-history-row stub). Defaults to NullAlarmHistorianSink; production deployments wire SqliteStoreAndForwardSink + WonderwareHistorianClient via AddOtOpcUaRuntime overrides. Actor now exposes GetStatus returning HistorianSinkStatus for diagnostics. Named-pipe transport implementation lives unchanged in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs — the actor is intentionally just a fire-and-forget bridge.", "origin": "Self-review of Task 45 — stub buffers in-memory; named-pipe + SQLite store-and-forward not wired."},
|
||||
{"id": "F12", "subject": "Follow-up: PeerOpcUaProbeActor real opc.tcp ping (replace Ok=true stub)", "status": "completed", "classification": "small", "estMinutes": 20, "parallelizableWith": [], "blockedBy": [], "commit": "b06e3ae", "deviation": "TCP-connect probe rather than full OPC UA Hello/Acknowledge handshake. Enough for the redundancy calc; deeper liveness signals can layer on later without changing the actor's contract.", "origin": "Self-review of Task 45 — RunProbe always returns Ok=true; replace with OPC UA Client connect."},
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
|
||||
/// <summary>
|
||||
/// Abstraction over the scripted-alarm predicate engine. Production binds this to a
|
||||
/// wrapper around <c>ScriptedAlarmEngine</c> from <c>Core.ScriptedAlarms</c>; default
|
||||
/// binding is <see cref="NullScriptedAlarmEvaluator"/> which keeps the alarm in its
|
||||
/// current state (so an unconfigured node never spuriously alarms).
|
||||
/// </summary>
|
||||
public interface IScriptedAlarmEvaluator
|
||||
{
|
||||
ScriptedAlarmEvalResult Evaluate(string alarmId, string predicate, IReadOnlyDictionary<string, object?> dependencies);
|
||||
}
|
||||
|
||||
/// <summary>Result of one alarm-predicate evaluation. <c>Active</c> is only meaningful when
|
||||
/// <c>Success</c> is true; on failure the caller should keep the prior state and log Reason.</summary>
|
||||
public sealed record ScriptedAlarmEvalResult(bool Success, bool Active, string? Reason)
|
||||
{
|
||||
public static ScriptedAlarmEvalResult Ok(bool active) => new(true, active, null);
|
||||
public static ScriptedAlarmEvalResult Failure(string reason) => new(false, false, reason);
|
||||
}
|
||||
|
||||
/// <summary>Default that always returns <c>Active = false, Success = true</c>. Safe no-op:
|
||||
/// no alarm fires when no real engine is bound.</summary>
|
||||
public sealed class NullScriptedAlarmEvaluator : IScriptedAlarmEvaluator
|
||||
{
|
||||
public static readonly NullScriptedAlarmEvaluator Instance = new();
|
||||
private NullScriptedAlarmEvaluator() { }
|
||||
public ScriptedAlarmEvalResult Evaluate(string alarmId, string predicate, IReadOnlyDictionary<string, object?> dependencies)
|
||||
=> ScriptedAlarmEvalResult.Ok(active: false);
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
|
||||
/// <summary>
|
||||
/// Abstraction over the compiled virtual-tag expression engine. Runtime consumes this so
|
||||
/// <see cref="VirtualTagActor"/> can stay free of Roslyn / scripting machinery and the
|
||||
/// production wiring binds an adapter over <c>VirtualTagEngine</c> from
|
||||
/// <c>Core.VirtualTags</c>.
|
||||
/// </summary>
|
||||
public interface IVirtualTagEvaluator
|
||||
{
|
||||
/// <summary>
|
||||
/// Evaluate <paramref name="expression"/> against the snapshot in
|
||||
/// <paramref name="dependencies"/>. Implementations must not throw — script failures
|
||||
/// are reported via <see cref="VirtualTagEvalResult.Failure"/>.
|
||||
/// </summary>
|
||||
VirtualTagEvalResult Evaluate(string virtualTagId, string expression, IReadOnlyDictionary<string, object?> dependencies);
|
||||
}
|
||||
|
||||
/// <summary>Result of one virtual-tag expression eval. Stash a Reason on every Failure so
|
||||
/// callers can emit a useful <c>ScriptLogEntry</c> to operators.</summary>
|
||||
public sealed record VirtualTagEvalResult(bool Success, object? Value, string? Reason)
|
||||
{
|
||||
public static readonly VirtualTagEvalResult NoChange = new(true, null, "no-change");
|
||||
public static VirtualTagEvalResult Ok(object? value) => new(true, value, null);
|
||||
public static VirtualTagEvalResult Failure(string reason) => new(false, null, reason);
|
||||
}
|
||||
|
||||
/// <summary>Returns <see cref="VirtualTagEvalResult.NoChange"/> from every call. Bound by default
|
||||
/// when the production <c>VirtualTagEngine</c> adapter hasn't been registered (Mac dev, tests).</summary>
|
||||
public sealed class NullVirtualTagEvaluator : IVirtualTagEvaluator
|
||||
{
|
||||
public static readonly NullVirtualTagEvaluator Instance = new();
|
||||
private NullVirtualTagEvaluator() { }
|
||||
public VirtualTagEvalResult Evaluate(string virtualTagId, string expression, IReadOnlyDictionary<string, object?> dependencies)
|
||||
=> VirtualTagEvalResult.NoChange;
|
||||
}
|
||||
@@ -1,60 +1,162 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Event;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
|
||||
|
||||
public enum ScriptedAlarmActorState { Inactive, Active, Acknowledged }
|
||||
|
||||
/// <summary>
|
||||
/// State machine wrapping a single scripted alarm. Transitions:
|
||||
/// <c>Inactive → Active → Acknowledged → Inactive</c>.
|
||||
///
|
||||
/// Engine wiring (compile alarm expression via <c>AlarmConditionService</c>, persist state to
|
||||
/// <c>ScriptedAlarmState</c> ConfigDb table on <c>PreRestart</c>, emit history rows to
|
||||
/// <c>HistorianAdapter</c>) is staged for follow-up F9. This skeleton owns the state machine
|
||||
/// so DriverHostActor can spawn it as a child.
|
||||
/// One scripted alarm. Receives dependency value updates, runs the predicate via an
|
||||
/// injected <see cref="IScriptedAlarmEvaluator"/>, and on transitions publishes both
|
||||
/// an <see cref="AlarmTransitionEvent"/> on the cluster <c>alerts</c> DPS topic and a
|
||||
/// <see cref="ScriptLogEntry"/> on <c>script-logs</c>. Manual <see cref="AcknowledgeAlarm"/>
|
||||
/// + <see cref="ConditionCleared"/> still flow through the same state machine so the
|
||||
/// legacy callers keep working.
|
||||
/// </summary>
|
||||
public sealed class ScriptedAlarmActor : ReceiveActor
|
||||
{
|
||||
public const string AlertsTopic = "alerts";
|
||||
public const string ScriptLogsTopic = "script-logs";
|
||||
|
||||
public sealed record DependencyValueChanged(string TagId, object? Value, DateTime TimestampUtc);
|
||||
public sealed record ConditionMet(string Reason);
|
||||
public sealed record AcknowledgeAlarm(string Actor);
|
||||
public sealed record ConditionCleared;
|
||||
public sealed record StateChanged(string AlarmId, ScriptedAlarmActorState State, DateTime AtUtc);
|
||||
|
||||
private readonly string _alarmId;
|
||||
public sealed record AlarmConfig(
|
||||
string AlarmId,
|
||||
string AlarmName,
|
||||
string EquipmentPath,
|
||||
int Severity,
|
||||
string? Predicate);
|
||||
|
||||
private readonly AlarmConfig _config;
|
||||
private readonly IScriptedAlarmEvaluator _evaluator;
|
||||
private readonly Func<DPSPublisher>? _publisherFactory;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
private readonly Dictionary<string, object?> _dependencies = new(StringComparer.Ordinal);
|
||||
|
||||
private ScriptedAlarmActorState _state = ScriptedAlarmActorState.Inactive;
|
||||
|
||||
public static Props Props(
|
||||
AlarmConfig config,
|
||||
IScriptedAlarmEvaluator? evaluator = null,
|
||||
Func<DPSPublisher>? publisherFactory = null) =>
|
||||
Akka.Actor.Props.Create(() => new ScriptedAlarmActor(
|
||||
config,
|
||||
evaluator ?? NullScriptedAlarmEvaluator.Instance,
|
||||
publisherFactory));
|
||||
|
||||
/// <summary>Legacy single-arg ctor kept for callers that only care about the state machine
|
||||
/// (no engine evaluation, no DPS fan-out). Equivalent to <c>Props(new AlarmConfig(...))</c>.</summary>
|
||||
public static Props Props(string alarmId) =>
|
||||
Akka.Actor.Props.Create(() => new ScriptedAlarmActor(alarmId));
|
||||
Props(new AlarmConfig(alarmId, alarmId, EquipmentPath: "", Severity: 500, Predicate: null));
|
||||
|
||||
public ScriptedAlarmActor(string alarmId)
|
||||
public ScriptedAlarmActor(AlarmConfig config, IScriptedAlarmEvaluator evaluator, Func<DPSPublisher>? publisherFactory)
|
||||
{
|
||||
_alarmId = alarmId;
|
||||
_config = config;
|
||||
_evaluator = evaluator;
|
||||
_publisherFactory = publisherFactory;
|
||||
|
||||
Receive<ConditionMet>(msg =>
|
||||
{
|
||||
if (_state != ScriptedAlarmActorState.Inactive) return;
|
||||
Transition(ScriptedAlarmActorState.Active);
|
||||
});
|
||||
Receive<AcknowledgeAlarm>(msg =>
|
||||
{
|
||||
if (_state != ScriptedAlarmActorState.Active) return;
|
||||
Transition(ScriptedAlarmActorState.Acknowledged);
|
||||
});
|
||||
Receive<ConditionCleared>(_ =>
|
||||
{
|
||||
if (_state == ScriptedAlarmActorState.Inactive) return;
|
||||
Transition(ScriptedAlarmActorState.Inactive);
|
||||
});
|
||||
Receive<DependencyValueChanged>(OnDependencyChanged);
|
||||
Receive<ConditionMet>(_ => { if (_state == ScriptedAlarmActorState.Inactive) Transition(ScriptedAlarmActorState.Active, user: "system"); });
|
||||
Receive<AcknowledgeAlarm>(msg => { if (_state == ScriptedAlarmActorState.Active) Transition(ScriptedAlarmActorState.Acknowledged, user: msg.Actor); });
|
||||
Receive<ConditionCleared>(_ => { if (_state != ScriptedAlarmActorState.Inactive) Transition(ScriptedAlarmActorState.Inactive, user: "system"); });
|
||||
}
|
||||
|
||||
private void Transition(ScriptedAlarmActorState next)
|
||||
private void OnDependencyChanged(DependencyValueChanged msg)
|
||||
{
|
||||
_dependencies[msg.TagId] = msg.Value;
|
||||
|
||||
if (string.IsNullOrEmpty(_config.Predicate)) return;
|
||||
|
||||
ScriptedAlarmEvalResult result;
|
||||
try
|
||||
{
|
||||
result = _evaluator.Evaluate(_config.AlarmId, _config.Predicate, _dependencies);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Warning(ex, "ScriptedAlarm {Id}: evaluator threw", _config.AlarmId);
|
||||
PublishLog("Error", $"evaluator threw: {ex.Message}");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!result.Success)
|
||||
{
|
||||
PublishLog("Warning", result.Reason ?? "evaluator failure");
|
||||
return;
|
||||
}
|
||||
|
||||
// Active condition wins regardless of ack state — re-firing is suppressed because
|
||||
// _state already == Active. Cleared moves Active OR Acknowledged → Inactive.
|
||||
if (result.Active && _state == ScriptedAlarmActorState.Inactive)
|
||||
{
|
||||
Transition(ScriptedAlarmActorState.Active, user: "system");
|
||||
}
|
||||
else if (!result.Active && _state != ScriptedAlarmActorState.Inactive)
|
||||
{
|
||||
Transition(ScriptedAlarmActorState.Inactive, user: "system");
|
||||
}
|
||||
}
|
||||
|
||||
private void Transition(ScriptedAlarmActorState next, string user)
|
||||
{
|
||||
var prev = _state;
|
||||
_state = next;
|
||||
_log.Info("ScriptedAlarm {Id}: {From} → {To}", _alarmId, prev, next);
|
||||
Context.Parent.Tell(new StateChanged(_alarmId, next, DateTime.UtcNow));
|
||||
// F9: emit history row via HistorianAdapter; persist state to ScriptedAlarmState DB.
|
||||
_log.Info("ScriptedAlarm {Id}: {From} → {To}", _config.AlarmId, prev, next);
|
||||
|
||||
var nowUtc = DateTime.UtcNow;
|
||||
Context.Parent.Tell(new StateChanged(_config.AlarmId, next, nowUtc));
|
||||
|
||||
var kind = next switch
|
||||
{
|
||||
ScriptedAlarmActorState.Active => "Activated",
|
||||
ScriptedAlarmActorState.Acknowledged => "Acknowledged",
|
||||
ScriptedAlarmActorState.Inactive => "Cleared",
|
||||
_ => next.ToString(),
|
||||
};
|
||||
|
||||
var evt = new AlarmTransitionEvent(
|
||||
AlarmId: _config.AlarmId,
|
||||
EquipmentPath: _config.EquipmentPath,
|
||||
AlarmName: _config.AlarmName,
|
||||
TransitionKind: kind,
|
||||
Severity: _config.Severity,
|
||||
Message: $"{_config.AlarmName} {kind}",
|
||||
User: user,
|
||||
TimestampUtc: nowUtc);
|
||||
|
||||
PublishOrFallback(AlertsTopic, evt);
|
||||
PublishLog("Information", $"{_config.AlarmName} {kind} (by {user})");
|
||||
}
|
||||
|
||||
private void PublishLog(string level, string message)
|
||||
{
|
||||
var entry = new ScriptLogEntry(
|
||||
ScriptId: _config.AlarmId,
|
||||
Level: level,
|
||||
Message: message,
|
||||
TimestampUtc: DateTime.UtcNow,
|
||||
VirtualTagId: null,
|
||||
AlarmId: _config.AlarmId,
|
||||
EquipmentId: null);
|
||||
PublishOrFallback(ScriptLogsTopic, entry);
|
||||
}
|
||||
|
||||
private void PublishOrFallback(string topic, object payload)
|
||||
{
|
||||
if (_publisherFactory is not null)
|
||||
{
|
||||
_publisherFactory().Publish(topic, payload);
|
||||
return;
|
||||
}
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(topic, payload));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,41 +1,123 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Event;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// Wraps a single virtual-tag expression. Receives dependency-tag updates, recomputes the
|
||||
/// expression, and publishes the result to <c>OpcUaPublishActor</c>.
|
||||
///
|
||||
/// Engine wiring (compile expression via <c>VirtualTagEngine</c>, manage subscriptions,
|
||||
/// emit <c>AttributeValueUpdate</c>) is staged for follow-up F8. This skeleton compiles + has
|
||||
/// a basic message contract so DriverHostActor can spawn it as a child.
|
||||
/// expression via an injected <see cref="IVirtualTagEvaluator"/>, and emits an
|
||||
/// <see cref="EvaluationResult"/> to the parent (the publish actor) whenever the value
|
||||
/// actually changes. Script failures publish a Warning <see cref="ScriptLogEntry"/> on the
|
||||
/// <c>script-logs</c> DPS topic so operators see the diagnostic in the live tail.
|
||||
/// </summary>
|
||||
public sealed class VirtualTagActor : ReceiveActor
|
||||
{
|
||||
public const string ScriptLogsTopic = "script-logs";
|
||||
|
||||
public sealed record DependencyValueChanged(string TagId, object? Value, DateTime TimestampUtc);
|
||||
public sealed record EvaluationResult(string VirtualTagId, object? Value, DateTime TimestampUtc, CorrelationId Correlation);
|
||||
|
||||
private readonly string _virtualTagId;
|
||||
private readonly string _scriptId;
|
||||
private readonly string _expression;
|
||||
private readonly IVirtualTagEvaluator _evaluator;
|
||||
private readonly Func<DPSPublisher>? _publisherFactory;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
private readonly Dictionary<string, object?> _dependencies = new(StringComparer.Ordinal);
|
||||
|
||||
public static Props Props(string virtualTagId, string expression) =>
|
||||
Akka.Actor.Props.Create(() => new VirtualTagActor(virtualTagId, expression));
|
||||
private bool _hasLastValue;
|
||||
private object? _lastValue;
|
||||
|
||||
public VirtualTagActor(string virtualTagId, string expression)
|
||||
public static Props Props(
|
||||
string virtualTagId,
|
||||
string expression,
|
||||
IVirtualTagEvaluator? evaluator = null,
|
||||
string? scriptId = null,
|
||||
Func<DPSPublisher>? publisherFactory = null) =>
|
||||
Akka.Actor.Props.Create(() => new VirtualTagActor(
|
||||
virtualTagId, expression,
|
||||
evaluator ?? NullVirtualTagEvaluator.Instance,
|
||||
scriptId ?? virtualTagId,
|
||||
publisherFactory));
|
||||
|
||||
public VirtualTagActor(
|
||||
string virtualTagId,
|
||||
string expression,
|
||||
IVirtualTagEvaluator evaluator,
|
||||
string scriptId,
|
||||
Func<DPSPublisher>? publisherFactory)
|
||||
{
|
||||
_virtualTagId = virtualTagId;
|
||||
_scriptId = scriptId;
|
||||
_expression = expression;
|
||||
_evaluator = evaluator;
|
||||
_publisherFactory = publisherFactory;
|
||||
|
||||
Receive<DependencyValueChanged>(msg =>
|
||||
Receive<DependencyValueChanged>(OnDependencyChanged);
|
||||
}
|
||||
|
||||
private void OnDependencyChanged(DependencyValueChanged msg)
|
||||
{
|
||||
_dependencies[msg.TagId] = msg.Value;
|
||||
|
||||
VirtualTagEvalResult result;
|
||||
try
|
||||
{
|
||||
_dependencies[msg.TagId] = msg.Value;
|
||||
// Engine wiring (F8): VirtualTagEngine.Evaluate(_expression, _dependencies) → publish.
|
||||
_log.Debug("VirtualTag {Id}: dependency {Tag}={Value} buffered (eval staged for F8)",
|
||||
_virtualTagId, msg.TagId, msg.Value);
|
||||
});
|
||||
result = _evaluator.Evaluate(_virtualTagId, _expression, _dependencies);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Warning(ex, "VirtualTag {Id}: evaluator threw", _virtualTagId);
|
||||
PublishLog("Error", $"evaluator threw: {ex.Message}");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!result.Success)
|
||||
{
|
||||
PublishLog("Warning", result.Reason ?? "evaluator failure");
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip no-change results. Real evaluator returns Ok(value); Null returns NoChange — both
|
||||
// safe because Null never produces a fresh value.
|
||||
if (ReferenceEquals(result, VirtualTagEvalResult.NoChange)) return;
|
||||
|
||||
if (_hasLastValue && Equals(_lastValue, result.Value)) return;
|
||||
|
||||
_hasLastValue = true;
|
||||
_lastValue = result.Value;
|
||||
var evalResult = new EvaluationResult(_virtualTagId, result.Value, msg.TimestampUtc, CorrelationId.NewId());
|
||||
Context.Parent.Tell(evalResult);
|
||||
}
|
||||
|
||||
private void PublishLog(string level, string message)
|
||||
{
|
||||
var entry = new ScriptLogEntry(
|
||||
ScriptId: _scriptId,
|
||||
Level: level,
|
||||
Message: message,
|
||||
TimestampUtc: DateTime.UtcNow,
|
||||
VirtualTagId: _virtualTagId,
|
||||
AlarmId: null,
|
||||
EquipmentId: null);
|
||||
|
||||
if (_publisherFactory is not null)
|
||||
{
|
||||
_publisherFactory().Publish(ScriptLogsTopic, entry);
|
||||
return;
|
||||
}
|
||||
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(ScriptLogsTopic, entry));
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Thin seam for tests to capture DPS publishes without standing up a real cluster.
|
||||
/// Production never instantiates this directly — the actor falls through to
|
||||
/// <see cref="DistributedPubSub"/> when the factory is null.
|
||||
/// </summary>
|
||||
public sealed record DPSPublisher(Action<string, object> Publish);
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.ScriptedAlarms;
|
||||
|
||||
@@ -13,7 +18,6 @@ public sealed class ScriptedAlarmActorTests : RuntimeActorTestBase
|
||||
public void Full_state_cycle_publishes_StateChanged_to_parent_at_each_transition()
|
||||
{
|
||||
var parent = CreateTestProbe();
|
||||
// Wrap the alarm actor under our probe as parent so StateChanged lands on the probe.
|
||||
var actor = parent.ChildActorOf(ScriptedAlarmActor.Props("alarm-1"));
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.ConditionMet("threshold"));
|
||||
@@ -41,4 +45,113 @@ public sealed class ScriptedAlarmActorTests : RuntimeActorTestBase
|
||||
actor.Tell(new ScriptedAlarmActor.ConditionMet("second"));
|
||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Engine_active_transition_publishes_AlarmTransitionEvent_to_alerts_topic()
|
||||
{
|
||||
var capture = new CapturingPublisher();
|
||||
var parent = CreateTestProbe();
|
||||
var config = new ScriptedAlarmActor.AlarmConfig(
|
||||
AlarmId: "alarm-7",
|
||||
AlarmName: "High Temp",
|
||||
EquipmentPath: "/site-1/line-A/oven",
|
||||
Severity: 800,
|
||||
Predicate: "temp > 80");
|
||||
var actor = parent.ChildActorOf(ScriptedAlarmActor.Props(
|
||||
config,
|
||||
evaluator: new ThresholdEvaluator(80),
|
||||
publisherFactory: () => new DPSPublisher(capture.Publish)));
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.DependencyValueChanged("temp", 92, DateTime.UtcNow));
|
||||
parent.ExpectMsg<ScriptedAlarmActor.StateChanged>().State.ShouldBe(ScriptedAlarmActorState.Active);
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
var transitionEvt = capture.Payloads.OfType<AlarmTransitionEvent>().SingleOrDefault();
|
||||
transitionEvt.ShouldNotBeNull();
|
||||
transitionEvt.AlarmId.ShouldBe("alarm-7");
|
||||
transitionEvt.AlarmName.ShouldBe("High Temp");
|
||||
transitionEvt.EquipmentPath.ShouldBe("/site-1/line-A/oven");
|
||||
transitionEvt.Severity.ShouldBe(800);
|
||||
transitionEvt.TransitionKind.ShouldBe("Activated");
|
||||
transitionEvt.User.ShouldBe("system");
|
||||
|
||||
var log = capture.Payloads.OfType<ScriptLogEntry>().SingleOrDefault();
|
||||
log.ShouldNotBeNull();
|
||||
log.AlarmId.ShouldBe("alarm-7");
|
||||
}, duration: TimeSpan.FromSeconds(1));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Engine_clear_transition_publishes_Cleared_event()
|
||||
{
|
||||
var capture = new CapturingPublisher();
|
||||
var parent = CreateTestProbe();
|
||||
var config = new ScriptedAlarmActor.AlarmConfig("alarm-7", "High Temp", "/p", 500, "temp > 80");
|
||||
var evaluator = new ThresholdEvaluator(80);
|
||||
var actor = parent.ChildActorOf(ScriptedAlarmActor.Props(
|
||||
config, evaluator,
|
||||
publisherFactory: () => new DPSPublisher(capture.Publish)));
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.DependencyValueChanged("temp", 92, DateTime.UtcNow));
|
||||
parent.ExpectMsg<ScriptedAlarmActor.StateChanged>();
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.DependencyValueChanged("temp", 70, DateTime.UtcNow));
|
||||
parent.ExpectMsg<ScriptedAlarmActor.StateChanged>().State.ShouldBe(ScriptedAlarmActorState.Inactive);
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
var kinds = capture.Payloads.OfType<AlarmTransitionEvent>().Select(e => e.TransitionKind).ToList();
|
||||
kinds.ShouldContain("Activated");
|
||||
kinds.ShouldContain("Cleared");
|
||||
}, duration: TimeSpan.FromSeconds(1));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Manual_acknowledge_emits_Acknowledged_transition_with_user()
|
||||
{
|
||||
var capture = new CapturingPublisher();
|
||||
var parent = CreateTestProbe();
|
||||
var config = new ScriptedAlarmActor.AlarmConfig("a-1", "Pump Fail", "/eq", 700, Predicate: null);
|
||||
var actor = parent.ChildActorOf(ScriptedAlarmActor.Props(
|
||||
config, evaluator: null,
|
||||
publisherFactory: () => new DPSPublisher(capture.Publish)));
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.ConditionMet("driver-fault"));
|
||||
parent.ExpectMsg<ScriptedAlarmActor.StateChanged>();
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.AcknowledgeAlarm("operator-jane"));
|
||||
parent.ExpectMsg<ScriptedAlarmActor.StateChanged>().State.ShouldBe(ScriptedAlarmActorState.Acknowledged);
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
var ackEvt = capture.Payloads.OfType<AlarmTransitionEvent>()
|
||||
.SingleOrDefault(e => e.TransitionKind == "Acknowledged");
|
||||
ackEvt.ShouldNotBeNull();
|
||||
ackEvt.User.ShouldBe("operator-jane");
|
||||
}, duration: TimeSpan.FromSeconds(1));
|
||||
}
|
||||
|
||||
private sealed class ThresholdEvaluator : IScriptedAlarmEvaluator
|
||||
{
|
||||
private readonly double _threshold;
|
||||
public ThresholdEvaluator(double threshold) { _threshold = threshold; }
|
||||
public ScriptedAlarmEvalResult Evaluate(string id, string predicate, IReadOnlyDictionary<string, object?> deps)
|
||||
{
|
||||
if (!deps.TryGetValue("temp", out var raw) || raw is null)
|
||||
return ScriptedAlarmEvalResult.Failure("missing temp");
|
||||
return ScriptedAlarmEvalResult.Ok(Convert.ToDouble(raw) > _threshold);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class CapturingPublisher
|
||||
{
|
||||
public ConcurrentBag<string> Topics { get; } = new();
|
||||
public ConcurrentBag<object> Payloads { get; } = new();
|
||||
public void Publish(string topic, object payload)
|
||||
{
|
||||
Topics.Add(topic);
|
||||
Payloads.Add(payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Akka.Actor;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
@@ -15,8 +18,100 @@ public sealed class VirtualTagActorTests : RuntimeActorTestBase
|
||||
Watch(actor);
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("tag-a", 10, DateTime.UtcNow));
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("tag-b", 20, DateTime.UtcNow));
|
||||
|
||||
// No crash, no termination.
|
||||
ExpectNoMsg(TimeSpan.FromMilliseconds(200));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Evaluator_result_flows_to_parent_as_EvaluationResult()
|
||||
{
|
||||
var parent = CreateTestProbe();
|
||||
var evaluator = new SumEvaluator();
|
||||
var actor = parent.ChildActorOf(VirtualTagActor.Props("vt-1", "a + b", evaluator: evaluator));
|
||||
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 10, DateTime.UtcNow));
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("b", 32, DateTime.UtcNow));
|
||||
|
||||
// First dep: a alone -> 10. Second dep: a + b -> 42.
|
||||
var first = parent.ExpectMsg<VirtualTagActor.EvaluationResult>();
|
||||
first.Value.ShouldBe(10);
|
||||
var second = parent.ExpectMsg<VirtualTagActor.EvaluationResult>();
|
||||
second.Value.ShouldBe(42);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Repeated_same_value_does_not_emit_a_second_EvaluationResult()
|
||||
{
|
||||
var parent = CreateTestProbe();
|
||||
var evaluator = new ConstEvaluator(42);
|
||||
var actor = parent.ChildActorOf(VirtualTagActor.Props("vt-1", "expr", evaluator: evaluator));
|
||||
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 1, DateTime.UtcNow));
|
||||
var first = parent.ExpectMsg<VirtualTagActor.EvaluationResult>();
|
||||
first.Value.ShouldBe(42);
|
||||
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 2, DateTime.UtcNow));
|
||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Evaluator_failure_publishes_ScriptLogEntry_warning()
|
||||
{
|
||||
var capture = new CapturingPublisher();
|
||||
var parent = CreateTestProbe();
|
||||
var actor = parent.ChildActorOf(VirtualTagActor.Props(
|
||||
"vt-1", "broken",
|
||||
evaluator: new FailingEvaluator("syntax error"),
|
||||
scriptId: "script-7",
|
||||
publisherFactory: () => new DPSPublisher(capture.Publish)));
|
||||
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 1, DateTime.UtcNow));
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
capture.Topics.ShouldContain("script-logs");
|
||||
var entry = (ScriptLogEntry)capture.Payloads.Single();
|
||||
entry.Level.ShouldBe("Warning");
|
||||
entry.Message.ShouldContain("syntax error");
|
||||
entry.ScriptId.ShouldBe("script-7");
|
||||
entry.VirtualTagId.ShouldBe("vt-1");
|
||||
}, duration: TimeSpan.FromMilliseconds(500));
|
||||
|
||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
|
||||
}
|
||||
|
||||
private sealed class SumEvaluator : IVirtualTagEvaluator
|
||||
{
|
||||
public VirtualTagEvalResult Evaluate(string id, string expr, IReadOnlyDictionary<string, object?> deps)
|
||||
{
|
||||
var sum = deps.Values.OfType<int>().Sum();
|
||||
return VirtualTagEvalResult.Ok(sum);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class ConstEvaluator : IVirtualTagEvaluator
|
||||
{
|
||||
private readonly object _value;
|
||||
public ConstEvaluator(object value) { _value = value; }
|
||||
public VirtualTagEvalResult Evaluate(string id, string expr, IReadOnlyDictionary<string, object?> deps)
|
||||
=> VirtualTagEvalResult.Ok(_value);
|
||||
}
|
||||
|
||||
private sealed class FailingEvaluator : IVirtualTagEvaluator
|
||||
{
|
||||
private readonly string _reason;
|
||||
public FailingEvaluator(string reason) { _reason = reason; }
|
||||
public VirtualTagEvalResult Evaluate(string id, string expr, IReadOnlyDictionary<string, object?> deps)
|
||||
=> VirtualTagEvalResult.Failure(_reason);
|
||||
}
|
||||
|
||||
private sealed class CapturingPublisher
|
||||
{
|
||||
public ConcurrentBag<string> Topics { get; } = new();
|
||||
public ConcurrentBag<object> Payloads { get; } = new();
|
||||
public void Publish(string topic, object payload)
|
||||
{
|
||||
Topics.Add(topic);
|
||||
Payloads.Add(payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user