Single commit covering the four small/medium fixes from the updated code review.

Core.Scripting-014 (Medium, Concurrency): CompiledScriptCache.Clear() used the key-only TryRemove(key, out var lazy) overload — same race shape Core.Scripting-006 closed in GetOrCompile's catch block. An entry re-added concurrently between the snapshot and TryRemove was evicted + disposed while the new caller still held it. Replaced with the value-scoped TryRemove(KeyValuePair<,>) overload. Regression test Clear_uses_value_scoped_TryRemove_so_a_race_inserted_entry_survives added.

Core.Scripting-013 (Medium, Security): Hand-rolled BuildWrapperSource pastes user source between literal braces; brace-balanced source could inject sibling methods/classes alongside CompiledScript.Run. Analyzer still walked the injected members so it wasn't a direct escape, but it relaxed the documented 'method body' authoring contract. Added EnforceSingleRunMember: after ParseText, the compilation unit must hold exactly one type (CompiledScript) and that type must hold exactly one member (the Run method). Any deviation throws CompilationErrorException with LMX001/LMX002 diagnostic IDs and a Core.Scripting-013 reference in the message. Two regression tests added covering the sibling-method and sibling-class injection vectors.

Core.Scripting-015 (Low, Correctness, latent): ToCSharpTypeName's generic branch truncated at the first backtick via IndexOf, silently dropping closed args of nested-generic shapes (Outer<T>.Inner<U>). No production caller exercises this shape today (all TContext/TResult are top-level non-nested), so the bug was latent. Rewrote the generic branch to walk the FullName segment-by-segment, consuming generic args per segment so nested shapes emit valid C# (global::Ns.Outer<T>.Inner<U> rather than the broken Outer<T,U>).

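The Core.Scripting-014 race described above can be illustrated with a minimal, single-threaded sketch. It is not the actual CompiledScriptCache code: the Entry type, the TryRemoveSketch class and the Simulate helper are hypothetical names invented for illustration, and the "concurrent" re-add is simulated by overwriting the key after the snapshot is taken. Only the two ConcurrentDictionary.TryRemove overloads are real API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class TryRemoveSketch
{
    // Hypothetical stand-in for a cached, disposable evaluator.
    public sealed class Entry
    {
        public bool Disposed { get; private set; }
        public void Dispose() => Disposed = true;
    }

    // Key-only removal: evicts whatever currently sits under the key,
    // even if it is no longer the entry the snapshot observed.
    public static bool RemoveKeyOnly(
        ConcurrentDictionary<string, Entry> map, KeyValuePair<string, Entry> snapshot)
    {
        if (!map.TryRemove(snapshot.Key, out var removed)) return false;
        removed.Dispose();
        return true;
    }

    // Value-scoped removal: succeeds only if the key still maps to the
    // exact entry captured in the snapshot (reference equality here,
    // because Entry does not override Equals).
    public static bool RemoveValueScoped(
        ConcurrentDictionary<string, Entry> map, KeyValuePair<string, Entry> snapshot)
    {
        if (!map.TryRemove(snapshot)) return false;
        snapshot.Value.Dispose();
        return true;
    }

    // Simulates: snapshot taken, a newer entry lands under the same key,
    // then the clear path tries to remove the snapshotted pair.
    public static (bool Removed, bool NewcomerDisposed, bool NewcomerPresent) Simulate(bool valueScoped)
    {
        var map = new ConcurrentDictionary<string, Entry>(StringComparer.Ordinal);
        var old = new Entry();
        map["k"] = old;
        var snapshot = new KeyValuePair<string, Entry>("k", old);

        var newcomer = new Entry();
        map["k"] = newcomer; // stands in for the racing re-add

        var removed = valueScoped
            ? RemoveValueScoped(map, snapshot)
            : RemoveKeyOnly(map, snapshot);
        return (removed, newcomer.Disposed, map.ContainsKey("k"));
    }
}
```

With the key-only overload, Simulate(valueScoped: false) evicts and disposes the newcomer; with the value-scoped overload, Simulate(valueScoped: true) leaves the newcomer in the map and undisposed, which is the behaviour the regression test name describes.
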
Core.ScriptedAlarms-013 (Low, Documentation): The internal test accessors TryGetScratchReadCacheForTest / TryGetScratchContextForTest return live mutable scratch refilled in place under _evalGate. XML docs didn't warn future test authors about the synchronization contract. Added a <remarks> block to each documenting the only-safe-on-quiesced-engine + identity-or-single-key contract.

Verification (suites green):
  Core.Scripting.Tests: 110/110 (was 107 — +3 new rejection/race tests)
  Core.ScriptedAlarms.Tests: 67/67 (unchanged — doc-only fix)
  Core.VirtualTags.Tests: 57/57 (unchanged)

After this commit, all 12 findings from the updated re-review are closed (10 Resolved, 1 Won't Fix none, 1 Deferred — Driver.Galaxy-017).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
774 lines
39 KiB
C#
using System.Collections.Concurrent;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;

namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;

/// <summary>
/// Phase 7 scripted-alarm orchestrator. Compiles every configured alarm's predicate
/// against the Stream A sandbox, subscribes to the referenced upstream tags,
/// re-evaluates the predicate on every input change + on a shelving-check timer,
/// applies the resulting transition through <see cref="Part9StateMachine"/>,
/// persists state via <see cref="IAlarmStateStore"/>, and emits the resulting events
/// through <see cref="ScriptedAlarmSource"/> (which wires into the existing
/// <c>IAlarmSource</c> fan-out).
/// </summary>
/// <remarks>
/// <para>
/// Scripted alarms are leaves in the evaluation DAG — no alarm's state drives
/// another alarm's predicate. The engine maintains only an inverse index from
/// upstream tag path → alarms referencing it; no topological sort needed
/// (unlike the virtual-tag engine).
/// </para>
/// <para>
/// Evaluation errors (script throws, timeout, coercion fail) surface as
/// structured errors in the dedicated scripts-*.log sink plus a WARN companion
/// in the main log. The alarm's ActiveState stays at its prior value — the
/// engine does NOT invent a clear transition just because the predicate broke.
/// Operators investigating a broken predicate shouldn't see a phantom
/// clear-event preceding the failure.
/// </para>
/// </remarks>
public sealed class ScriptedAlarmEngine : IDisposable
{
    private readonly ITagUpstreamSource _upstream;
    private readonly IAlarmStateStore _store;
    private readonly ScriptLoggerFactory _loggerFactory;
    private readonly ILogger _engineLogger;
    private readonly Func<DateTime> _clock;
    private readonly TimeSpan _scriptTimeout;

    // ConcurrentDictionary, not a plain Dictionary: every mutation happens under
    // _evalGate, but four read paths (GetState, GetAllStates, LoadedAlarmIds,
    // RunShelvingCheck) touch _alarms from arbitrary threads (Admin UI request
    // threads, the shelving Timer thread-pool callback) without holding the gate.
    // A plain Dictionary read concurrent with a writer's entry reassignment can
    // throw or return torn state; ConcurrentDictionary makes entry assignment and
    // snapshot enumeration safe. The only write shapes are indexer-set and Clear,
    // both of which ConcurrentDictionary supports atomically. (Core.ScriptedAlarms-001)
    private readonly ConcurrentDictionary<string, AlarmState> _alarms = new(StringComparer.Ordinal);

    /// <summary>
    /// Per-alarm reusable evaluation scratch. The read-cache dictionary and the
    /// <see cref="AlarmPredicateContext"/> instance are both allocated once per
    /// alarm (on first evaluation) and reused across every subsequent re-eval —
    /// the hot path no longer allocates a fresh dictionary + context per upstream
    /// tag change. Safe because <see cref="EvaluatePredicateToStateAsync"/> only
    /// runs under <see cref="_evalGate"/>, which serialises every evaluation:
    /// two threads can never observe the same scratch in a half-refilled state.
    /// Cleared in <see cref="LoadAsync"/> alongside <see cref="_alarms"/>.
    /// (Core.ScriptedAlarms-009)
    /// </summary>
    private readonly ConcurrentDictionary<string, AlarmScratch> _scratchByAlarmId =
        new(StringComparer.Ordinal);

    /// <summary>
    /// Compile cache for every alarm predicate. Routes <see cref="LoadAsync"/>'s
    /// <see cref="ScriptEvaluator{TContext, TResult}.Compile"/> calls through the
    /// cache so the collectible <see cref="System.Runtime.Loader.AssemblyLoadContext"/>
    /// each compile produces is actually disposed on the publish-replace path
    /// (Core.Scripting-016): the cache's <see cref="CompiledScriptCache{TContext, TResult}.Clear"/>
    /// disposes every materialised evaluator before dropping its dictionary entry,
    /// so a config-publish releases the prior generation's ALCs and the per-publish
    /// accretion the Core.Scripting-008 fix targeted is actually freed in production.
    /// Pre-fix the engine called <c>ScriptEvaluator.Compile</c> directly, which left
    /// the ALCs rooted until the process exited — defeating -008 on the real path.
    /// </summary>
    private readonly CompiledScriptCache<AlarmPredicateContext, bool> _compileCache = new();

    /// <summary>
    /// Test-only diagnostic: returns the per-alarm scratch read-cache dictionary
    /// if one has been allocated, else null. Used by Core.ScriptedAlarms-009
    /// regression tests to assert the scratch is reused across evaluations
    /// (two reads return the same instance).
    /// </summary>
    /// <remarks>
    /// <b>Synchronization:</b> the returned <see cref="IReadOnlyDictionary{TKey, TValue}"/>
    /// is the engine's live mutable read-cache. It is refilled in place by
    /// <c>RefillReadCache</c> on every predicate evaluation, under <c>_evalGate</c>.
    /// Test callers MUST NOT iterate this dictionary while the engine is
    /// actively evaluating (i.e. while an upstream change is mid-flight); the
    /// refill clears the dict before repopulating and a concurrent iterator
    /// would observe torn / partial state. Safe uses are: reference-identity
    /// comparisons (e.g. asserting the same instance is reused across calls),
    /// and single-key reads against an engine that has quiesced after a
    /// deterministic upstream push. Anything more involved should snapshot a
    /// copy under the gate. (Core.ScriptedAlarms-013.)
    /// </remarks>
    internal IReadOnlyDictionary<string, DataValueSnapshot>? TryGetScratchReadCacheForTest(string alarmId)
        => _scratchByAlarmId.TryGetValue(alarmId, out var s) ? s.ReadCache : null;

    /// <summary>
    /// Test-only diagnostic: returns the per-alarm <see cref="AlarmPredicateContext"/>
    /// if one has been allocated, else null. Companion to
    /// <see cref="TryGetScratchReadCacheForTest"/>.
    /// </summary>
    /// <remarks>
    /// <b>Synchronization:</b> the returned context wraps the same live
    /// read-cache as <see cref="TryGetScratchReadCacheForTest"/> — the same
    /// "don't iterate during an in-flight evaluation" caveat applies. Safe
    /// for reference-identity assertions on a quiesced engine.
    /// (Core.ScriptedAlarms-013.)
    /// </remarks>
    internal AlarmPredicateContext? TryGetScratchContextForTest(string alarmId)
        => _scratchByAlarmId.TryGetValue(alarmId, out var s) ? s.Context : null;
    private readonly ConcurrentDictionary<string, DataValueSnapshot> _valueCache
        = new(StringComparer.Ordinal);
    private readonly Dictionary<string, HashSet<string>> _alarmsReferencing
        = new(StringComparer.Ordinal); // tag path -> alarm ids

    private readonly List<IDisposable> _upstreamSubscriptions = [];
    private readonly SemaphoreSlim _evalGate = new(1, 1);
    private Timer? _shelvingTimer;
    private bool _loaded;
    private bool _disposed;

    // Tracks fire-and-forget background work launched by OnUpstreamChange
    // (ReevaluateAsync) and RunShelvingCheck (ShelvingCheckAsync). Dispose drains
    // these so a re-evaluation in flight when shutdown begins finishes its
    // SaveAsync before the engine returns control to the caller. The HashSet is
    // accessed under its own lock — never under _evalGate — so registration /
    // unregistration cannot deadlock against the gate. (Core.ScriptedAlarms-006)
    private readonly HashSet<Task> _inFlight = [];
    private readonly object _inFlightLock = new();

    public ScriptedAlarmEngine(
        ITagUpstreamSource upstream,
        IAlarmStateStore store,
        ScriptLoggerFactory loggerFactory,
        ILogger engineLogger,
        Func<DateTime>? clock = null,
        TimeSpan? scriptTimeout = null)
    {
        _upstream = upstream ?? throw new ArgumentNullException(nameof(upstream));
        _store = store ?? throw new ArgumentNullException(nameof(store));
        _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
        _engineLogger = engineLogger ?? throw new ArgumentNullException(nameof(engineLogger));
        _clock = clock ?? (() => DateTime.UtcNow);
        _scriptTimeout = scriptTimeout ?? TimedScriptEvaluator<AlarmPredicateContext, bool>.DefaultTimeout;
    }

    /// <summary>Raised for every emission the Part9StateMachine produces that the engine should publish.</summary>
    public event EventHandler<ScriptedAlarmEvent>? OnEvent;

    public IReadOnlyCollection<string> LoadedAlarmIds => _alarms.Keys.ToArray();

    /// <summary>
    /// Load a batch of alarm definitions. Compiles every predicate, aggregates any
    /// compile failures into one <see cref="InvalidOperationException"/>, subscribes
    /// to upstream input tags, seeds the value cache, loads persisted state from
    /// the store (falling back to Fresh for first-load alarms), and recomputes
    /// ActiveState per Phase 7 plan decision #14 (startup recovery).
    /// </summary>
    public async Task LoadAsync(IReadOnlyList<ScriptedAlarmDefinition> definitions, CancellationToken ct)
    {
        if (_disposed) throw new ObjectDisposedException(nameof(ScriptedAlarmEngine));
        if (definitions is null) throw new ArgumentNullException(nameof(definitions));

        var pending = new List<ScriptedAlarmEvent>(0);
        await _evalGate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            UnsubscribeFromUpstream();
            _alarms.Clear();
            _alarmsReferencing.Clear();
            // Drop the prior generation's per-alarm scratch buffers — definitions may
            // have changed (different Inputs, different Logger), so any reuse would be
            // unsafe. (Core.ScriptedAlarms-009)
            _scratchByAlarmId.Clear();
            // Dispose every compiled-predicate ALC from the prior generation BEFORE we
            // recompile this one. Skipping this is what made Core.Scripting-008 a
            // no-op in production. (Core.Scripting-016)
            _compileCache.Clear();

            var compileFailures = new List<string>();
            foreach (var def in definitions)
            {
                try
                {
                    var extraction = DependencyExtractor.Extract(def.PredicateScriptSource);
                    if (!extraction.IsValid)
                    {
                        var joined = string.Join("; ", extraction.Rejections.Select(r => r.Message));
                        compileFailures.Add($"{def.AlarmId}: dependency extraction rejected — {joined}");
                        continue;
                    }

                    // Route through CompiledScriptCache so the emitted assembly's
                    // collectible ALC participates in publish-replace cleanup.
                    // (Core.Scripting-016)
                    var evaluator = _compileCache.GetOrCompile(def.PredicateScriptSource);
                    var timed = new TimedScriptEvaluator<AlarmPredicateContext, bool>(evaluator, _scriptTimeout);
                    var logger = _loggerFactory.Create(def.AlarmId);

                    var templateTokens = MessageTemplate.ExtractTokenPaths(def.MessageTemplate);
                    var allInputs = new HashSet<string>(extraction.Reads, StringComparer.Ordinal);
                    foreach (var t in templateTokens) allInputs.Add(t);

                    _alarms[def.AlarmId] = new AlarmState(def, timed, extraction.Reads, templateTokens, logger,
                        AlarmConditionState.Fresh(def.AlarmId, _clock()));

                    foreach (var path in allInputs)
                    {
                        if (!_alarmsReferencing.TryGetValue(path, out var set))
                            _alarmsReferencing[path] = set = new HashSet<string>(StringComparer.Ordinal);
                        set.Add(def.AlarmId);
                    }
                }
                catch (Exception ex)
                {
                    compileFailures.Add($"{def.AlarmId}: {ex.Message}");
                }
            }

            if (compileFailures.Count > 0)
            {
                throw new InvalidOperationException(
                    $"ScriptedAlarmEngine load failed. {compileFailures.Count} alarm(s) did not compile:\n "
                    + string.Join("\n ", compileFailures));
            }

            // Seed the value cache with current tag values before subscribing. The
            // ReadTag calls happen first so that the initial predicate evaluation below
            // (startup recovery, decision #14) uses a consistent snapshot.
            // Subscriptions are established AFTER _loaded = true so that any synchronous
            // initial-push an ITagUpstreamSource delivers from inside SubscribeTag arrives
            // when _alarms is fully initialised. Before _loaded = true, a synchronous push
            // would race the in-progress state restore and could overwrite the carefully
            // seeded cache with a push that has no defined ordering relative to ReadTag.
            // (Core.ScriptedAlarms-004)
            foreach (var path in _alarmsReferencing.Keys)
                _valueCache[path] = _upstream.ReadTag(path);

            // Restore persisted state, falling back to Fresh where nothing was saved,
            // then re-derive ActiveState from the current predicate per decision #14.
            // Any predicate emissions queue into `pending` and fire after the gate
            // is released — so a startup-recovery activation event can call back into
            // the engine without deadlocking. (Core.ScriptedAlarms-003)
            foreach (var (alarmId, state) in _alarms)
            {
                var persisted = await _store.LoadAsync(alarmId, ct).ConfigureAwait(false);
                var seed = persisted ?? state.Condition;
                var afterPredicate = await EvaluatePredicateToStateAsync(state, seed, nowUtc: _clock(), ct, pending)
                    .ConfigureAwait(false);
                _alarms[alarmId] = state with { Condition = afterPredicate };
                await _store.SaveAsync(afterPredicate, ct).ConfigureAwait(false);
            }

            _loaded = true;

            // Subscribe after _loaded = true and full state restore. If an upstream
            // implementation pushes its initial value synchronously from inside
            // SubscribeTag, OnUpstreamChange will queue a ReevaluateAsync that acquires
            // _evalGate — it will correctly block until LoadAsync releases the gate, then
            // re-evaluate against the fully-populated _alarms dict.
            foreach (var path in _alarmsReferencing.Keys)
                _upstreamSubscriptions.Add(_upstream.SubscribeTag(path, OnUpstreamChange));
            _engineLogger.Information("ScriptedAlarmEngine loaded {Count} alarm(s)", _alarms.Count);

            // Dispose any previously-created timer before reassigning; a second LoadAsync
            // call without this would leave two timers firing against the same engine.
            // (Core.ScriptedAlarms-002)
            _shelvingTimer?.Dispose();

            // Start the shelving-check timer — ticks every 5s, expires any timed shelves
            // that have passed their UnshelveAtUtc.
            _shelvingTimer = new Timer(_ => RunShelvingCheck(),
                null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
        }
        finally
        {
            _evalGate.Release();
        }

        // Fire any emissions collected during startup recovery OUTSIDE the gate so
        // subscribers can re-enter the engine safely. (Core.ScriptedAlarms-003)
        foreach (var evt in pending) FireEvent(evt);
    }

    /// <summary>
    /// Current persisted state for <paramref name="alarmId"/>. Returns null for
    /// unknown alarm. Mainly used for diagnostics + the Admin UI status page.
    /// </summary>
    public AlarmConditionState? GetState(string alarmId)
        => _alarms.TryGetValue(alarmId, out var s) ? s.Condition : null;

    public IReadOnlyCollection<AlarmConditionState> GetAllStates()
        => _alarms.Values.Select(a => a.Condition).ToArray();

    public Task AcknowledgeAsync(string alarmId, string user, string? comment, CancellationToken ct)
        => ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyAcknowledge(cur, user, comment, _clock()));

    public Task ConfirmAsync(string alarmId, string user, string? comment, CancellationToken ct)
        => ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyConfirm(cur, user, comment, _clock()));

    public Task OneShotShelveAsync(string alarmId, string user, CancellationToken ct)
        => ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyOneShotShelve(cur, user, _clock()));

    public Task TimedShelveAsync(string alarmId, string user, DateTime unshelveAtUtc, CancellationToken ct)
        => ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyTimedShelve(cur, user, unshelveAtUtc, _clock()));

    public Task UnshelveAsync(string alarmId, string user, CancellationToken ct)
        => ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyUnshelve(cur, user, _clock()));

    public Task EnableAsync(string alarmId, string user, CancellationToken ct)
        => ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyEnable(cur, user, _clock()));

    public Task DisableAsync(string alarmId, string user, CancellationToken ct)
        => ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyDisable(cur, user, _clock()));

    public Task AddCommentAsync(string alarmId, string user, string text, CancellationToken ct)
        => ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyAddComment(cur, user, text, _clock()));

    private async Task ApplyAsync(string alarmId, CancellationToken ct, Func<AlarmConditionState, TransitionResult> op)
    {
        EnsureLoaded();
        if (!_alarms.TryGetValue(alarmId, out var state))
            throw new ArgumentException($"Unknown alarm {alarmId}", nameof(alarmId));

        ScriptedAlarmEvent? pending = null;
        await _evalGate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            var result = op(state.Condition);
            // Persist BEFORE updating in-memory so a store failure leaves both
            // in-memory and persisted at the prior state rather than diverging.
            // If SaveAsync throws the in-memory _alarms entry stays unchanged and
            // the exception propagates to the caller. (Core.ScriptedAlarms-007)
            await _store.SaveAsync(result.State, ct).ConfigureAwait(false);
            _alarms[alarmId] = state with { Condition = result.State };
            // Build the emission event under the gate (it captures a coherent
            // snapshot of state + message-template values) but defer the actual
            // OnEvent dispatch until after Release() so a slow subscriber or a
            // subscriber that re-enters the engine doesn't block / deadlock.
            // (Core.ScriptedAlarms-003)
            if (result.Emission != EmissionKind.None)
                pending = BuildEmission(state, result.State, result.Emission);
            else if (result.NoOpReason is { } reason)
            {
                // The Part9StateMachine remarks promise a diagnostic log line for
                // disabled-alarm no-ops + idempotent ack/confirm/shelve/unshelve
                // calls. We surface them at debug so they're available when
                // investigating "why didn't my ack take effect?" without spamming
                // the main info log. (Core.ScriptedAlarms-011)
                state.Logger.Debug("Alarm {AlarmId} no-op transition: {Reason}", alarmId, reason);
            }
        }
        finally { _evalGate.Release(); }

        // OnEvent dispatch happens OUTSIDE _evalGate so subscribers can call back
        // into the engine (e.g. AcknowledgeAsync from inside an Activated handler)
        // without deadlocking against the non-reentrant SemaphoreSlim.
        if (pending is not null) FireEvent(pending);
    }

    /// <summary>
    /// Upstream-change callback. Updates the value cache + enqueues predicate
    /// re-evaluation for every alarm referencing the changed path. Fire-and-forget
    /// so driver-side dispatch isn't blocked; the background task is tracked so
    /// <see cref="Dispose"/> can drain it. (Core.ScriptedAlarms-006)
    /// </summary>
    internal void OnUpstreamChange(string path, DataValueSnapshot value)
    {
        _valueCache[path] = value;
        if (_disposed) return; // don't queue new work against a disposing engine
        if (_alarmsReferencing.TryGetValue(path, out var alarmIds))
        {
            TrackBackgroundTask(ReevaluateAsync(alarmIds.ToArray(), CancellationToken.None));
        }
    }

    private async Task ReevaluateAsync(IReadOnlyList<string> alarmIds, CancellationToken ct)
    {
        var pending = new List<ScriptedAlarmEvent>(0);
        try
        {
            await _evalGate.WaitAsync(ct).ConfigureAwait(false);
            try
            {
                // Re-check after acquiring the gate: a Dispose() call may have
                // completed between our _evalGate.WaitAsync and here. Writing to a
                // disposing store or mutating _alarms after clear is unsafe.
                // (Core.ScriptedAlarms-005)
                if (_disposed) return;

                foreach (var id in alarmIds)
                {
                    if (!_alarms.TryGetValue(id, out var state)) continue;
                    var newState = await EvaluatePredicateToStateAsync(
                        state, state.Condition, _clock(), ct, pending).ConfigureAwait(false);
                    if (!ReferenceEquals(newState, state.Condition))
                    {
                        // Persist before updating in-memory so a store failure leaves
                        // both sides at the prior state. (Core.ScriptedAlarms-007)
                        await _store.SaveAsync(newState, ct).ConfigureAwait(false);
                        _alarms[id] = state with { Condition = newState };
                    }
                }
            }
            finally { _evalGate.Release(); }
        }
        catch (Exception ex)
        {
            _engineLogger.Error(ex, "ScriptedAlarmEngine reevaluate failed");
            return;
        }
        // Fire emissions OUTSIDE _evalGate so subscriber callbacks can re-enter
        // the engine without deadlocking. (Core.ScriptedAlarms-003)
        foreach (var evt in pending) FireEvent(evt);
    }

    /// <summary>
    /// Evaluate the predicate + apply the resulting state-machine transition.
    /// Returns the new condition state. If the transition produces an emission,
    /// appends it to <paramref name="pendingEmissions"/> so the caller can fire
    /// them after releasing <c>_evalGate</c> — keeping subscriber callbacks
    /// outside the gate. (Core.ScriptedAlarms-003)
    /// </summary>
    private async Task<AlarmConditionState> EvaluatePredicateToStateAsync(
        AlarmState state, AlarmConditionState seed, DateTime nowUtc, CancellationToken ct,
        List<ScriptedAlarmEvent>? pendingEmissions = null)
    {
        // Look up (or lazily allocate) the per-alarm scratch and refill its read cache
        // in place. The dictionary + context survive across evaluations so the hot path
        // no longer allocates per upstream tag change. (Core.ScriptedAlarms-009)
        var scratch = _scratchByAlarmId.GetOrAdd(
            state.Definition.AlarmId,
            _ => new AlarmScratch(state.Inputs, state.Logger, _clock));
        RefillReadCache(scratch.ReadCache, state.Inputs);

        // Cold-start guard — skip the predicate when any referenced upstream tag has no
        // cached value yet (the upstream subscription hasn't delivered its first push).
        // Without this, predicates that cast `(double)ctx.GetTag(path).Value` throw NRE on
        // every tick until the cache fills, spamming the log with identical stack traces.
        // Bad quality is treated the same: the input isn't available at the predicate's
        // expected type, so the only defensible move is to hold the prior condition state.
        if (!AreInputsReady(scratch.ReadCache)) return seed;

        var context = scratch.Context;

        bool predicateTrue;
        try
        {
            predicateTrue = await state.Evaluator.RunAsync(context, ct).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (ScriptTimeoutException tex)
        {
            state.Logger.Warning("Alarm predicate timed out after {Timeout} — state unchanged", tex.Timeout);
            return seed;
        }
        catch (Exception ex)
        {
            state.Logger.Error(ex, "Alarm predicate threw — state unchanged");
            return seed;
        }

        var result = Part9StateMachine.ApplyPredicate(seed, predicateTrue, nowUtc);
        if (result.Emission != EmissionKind.None)
        {
            var evt = BuildEmission(state, result.State, result.Emission);
            if (evt is not null)
            {
                if (pendingEmissions is not null) pendingEmissions.Add(evt);
                else FireEvent(evt); // Fallback: no caller-supplied list, so fire here.
            }
        }
        return result.State;
    }

    /// <summary>
    /// Refill <paramref name="cache"/> in place from <c>_valueCache</c>, falling
    /// back to a synchronous <c>ITagUpstreamSource.ReadTag</c> for paths whose
    /// first upstream push hasn't arrived yet. The dictionary is cleared and
    /// repopulated under <c>_evalGate</c> so no concurrent reader can observe
    /// a partial state. Replaces the old <c>BuildReadCache</c> which allocated a
    /// fresh dictionary every call (Core.ScriptedAlarms-009).
    /// </summary>
    private void RefillReadCache(
        Dictionary<string, DataValueSnapshot> cache, IReadOnlySet<string> inputs)
    {
        cache.Clear();
        foreach (var p in inputs)
            cache[p] = _valueCache.TryGetValue(p, out var v) ? v : _upstream.ReadTag(p);
    }

    /// <summary>
    /// Returns true when every entry in <paramref name="cache"/> has a non-null value
    /// and a <see cref="DataValueSnapshot.StatusCode"/> that is not Bad (Good and
    /// Uncertain both count as ready). A false here lets
    /// callers short-circuit script evaluation — predicates that unconditionally cast
    /// <c>ctx.GetTag(path).Value</c> to a numeric type would otherwise throw
    /// <see cref="NullReferenceException"/> until the upstream subscription delivers
    /// its first push.
    /// </summary>
    private static bool AreInputsReady(IReadOnlyDictionary<string, DataValueSnapshot> cache)
    {
        foreach (var kv in cache)
        {
            if (kv.Value is null || kv.Value.Value is null) return false;
            // OPC UA Part 4 StatusCode: the top two bits encode severity (00 = Good,
            // 01 = Uncertain, 10 = Bad), so a set bit 31 means Bad. Treat Good + Uncertain
            // as "ready"; Uncertain carries a value the script can still inspect + make a
            // qualified decision from. Only outright Bad is skipped.
            if ((kv.Value.StatusCode & 0x80000000u) != 0) return false;
        }
        return true;
    }

    /// <summary>
    /// Build (but do not fire) the <see cref="ScriptedAlarmEvent"/> for a
    /// transition. Returns null for kinds that should not be published
    /// (<see cref="EmissionKind.Suppressed"/> and
    /// <see cref="EmissionKind.None"/>). Pure construction — called under
    /// <c>_evalGate</c> so the message-template resolution uses a coherent
    /// value-cache snapshot. The actual <see cref="OnEvent"/> dispatch is
    /// done by <see cref="FireEvent(ScriptedAlarmEvent)"/> AFTER the gate is
    /// released. (Core.ScriptedAlarms-003)
    /// </summary>
    private ScriptedAlarmEvent? BuildEmission(AlarmState state, AlarmConditionState condition, EmissionKind kind)
    {
        // Suppressed kind means shelving ate the emission — we don't fire for subscribers
        // but the state record still advanced so startup recovery reflects reality.
        if (kind == EmissionKind.Suppressed || kind == EmissionKind.None) return null;

        var message = MessageTemplate.Resolve(state.Definition.MessageTemplate, TryLookup);
        return new ScriptedAlarmEvent(
            AlarmId: state.Definition.AlarmId,
            EquipmentPath: state.Definition.EquipmentPath,
            AlarmName: state.Definition.AlarmName,
            Kind: state.Definition.Kind,
            Severity: state.Definition.Severity,
            Message: message,
            Condition: condition,
            Emission: kind,
            TimestampUtc: _clock());
    }

    /// <summary>
    /// Invoke the <see cref="OnEvent"/> handler for a built emission. Must be
    /// called OUTSIDE <c>_evalGate</c>: a slow subscriber would otherwise
    /// block the gate for every other engine operation, and a subscriber
    /// that re-enters the engine (e.g. calls AcknowledgeAsync) would
    /// deadlock against the non-reentrant SemaphoreSlim.
    /// (Core.ScriptedAlarms-003)
    /// </summary>
    private void FireEvent(ScriptedAlarmEvent evt)
    {
        try { OnEvent?.Invoke(this, evt); }
        catch (Exception ex)
        {
            _engineLogger.Warning(ex, "ScriptedAlarmEngine OnEvent subscriber threw for {AlarmId}", evt.AlarmId);
        }
    }

    private DataValueSnapshot? TryLookup(string path)
        => _valueCache.TryGetValue(path, out var v) ? v : null;

    private void RunShelvingCheck()
    {
        if (_disposed) return;
        var ids = _alarms.Keys.ToArray();
        TrackBackgroundTask(ShelvingCheckAsync(ids, CancellationToken.None));
    }

    /// <summary>
    /// Register a fire-and-forget task so <see cref="Dispose"/> can await it.
    /// The task removes itself from the set on completion via a continuation.
    /// (Core.ScriptedAlarms-006)
    /// </summary>
    private void TrackBackgroundTask(Task task)
    {
        lock (_inFlightLock) { _inFlight.Add(task); }
        // Use ContinueWith with ExecuteSynchronously so the removal runs on the
        // completing thread — avoids scheduler delay between completion and
        // unregistration that would otherwise let Dispose see a stale set.
        task.ContinueWith(t =>
        {
            lock (_inFlightLock) { _inFlight.Remove(t); }
        }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    /// <summary>
    /// Test hook — starts a shelving check immediately instead of waiting for
    /// the 5-second timer; the check itself runs as a tracked background task.
    /// Allows tests that inject a controllable clock to advance
    /// time and immediately drive timed-shelve expiry. (Core.ScriptedAlarms-012)
    /// </summary>
    internal void RunShelvingCheckForTest() => RunShelvingCheck();

private async Task ShelvingCheckAsync(IReadOnlyList<string> alarmIds, CancellationToken ct)
|
|
{
|
|
var pending = new List<ScriptedAlarmEvent>(0);
|
|
try
|
|
{
|
|
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
|
try
|
|
{
|
|
// Re-check after acquiring the gate: Timer.Dispose() does not wait for
|
|
// running callbacks, so a shelving-check callback that passed the _disposed
|
|
// check in RunShelvingCheck can arrive here after Dispose() has returned.
|
|
// Mutating _alarms or saving to a disposed store here is unsafe.
|
|
// (Core.ScriptedAlarms-005)
|
|
if (_disposed) return;
|
|
|
|
var now = _clock();
|
|
foreach (var id in alarmIds)
|
|
{
|
|
if (!_alarms.TryGetValue(id, out var state)) continue;
|
|
var result = Part9StateMachine.ApplyShelvingCheck(state.Condition, now);
|
|
if (!ReferenceEquals(result.State, state.Condition))
|
|
{
|
|
// Persist before updating in-memory so a store failure leaves
|
|
// both sides at the prior state. (Core.ScriptedAlarms-007)
|
|
await _store.SaveAsync(result.State, ct).ConfigureAwait(false);
|
|
_alarms[id] = state with { Condition = result.State };
|
|
if (result.Emission != EmissionKind.None)
|
|
{
|
|
var evt = BuildEmission(state, result.State, result.Emission);
|
|
if (evt is not null) pending.Add(evt);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
finally { _evalGate.Release(); }
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_engineLogger.Warning(ex, "ScriptedAlarmEngine shelving-check failed");
|
|
return;
|
|
}
|
|
// Fire emissions OUTSIDE _evalGate. (Core.ScriptedAlarms-003)
|
|
foreach (var evt in pending) FireEvent(evt);
|
|
}

    private void UnsubscribeFromUpstream()
    {
        foreach (var s in _upstreamSubscriptions)
        {
            // Best-effort: a subscription that throws on Dispose must not
            // prevent the remaining subscriptions from being released.
            try { s.Dispose(); } catch { }
        }
        _upstreamSubscriptions.Clear();
    }

    private void EnsureLoaded()
    {
        if (!_loaded) throw new InvalidOperationException(
            "ScriptedAlarmEngine not loaded. Call LoadAsync first.");
    }

    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        _shelvingTimer?.Dispose();
        UnsubscribeFromUpstream();

        // Drain any fire-and-forget background work (ReevaluateAsync from
        // OnUpstreamChange + ShelvingCheckAsync from the 5s timer) that started
        // before _disposed = true was visible. Without this, a SaveAsync in
        // flight can outlive the engine and write to a (possibly disposed) store
        // after Dispose() has returned. The tasks re-check _disposed after
        // acquiring the gate and bail out, but the await still has to complete.
        // (Core.ScriptedAlarms-006)
        Task[] toAwait;
        lock (_inFlightLock) { toAwait = [.. _inFlight]; }
        if (toAwait.Length > 0)
        {
            try { Task.WhenAll(toAwait).GetAwaiter().GetResult(); }
            catch (Exception ex)
            {
                // Background task failures already logged inside ReevaluateAsync /
                // ShelvingCheckAsync; surface here at debug so a parent shutdown is
                // not noisy. The key invariant is that the tasks have COMPLETED.
                _engineLogger.Debug(ex, "ScriptedAlarmEngine background task threw during shutdown drain");
            }
        }

        // Safe to clear here: the Task.WhenAll drain above guaranteed no
        // ReevaluateAsync / ShelvingCheckAsync is mid-flight, and _disposed=true
        // prevents new background work from being queued (OnUpstreamChange bails on
        // line 334). Pre-Core.Scripting-016 the comment said "Do NOT clear _alarms",
        // but that was when the engine called ScriptEvaluator.Compile directly and
        // held the script ALCs through _alarms→AlarmState→TimedScriptEvaluator
        // forever — leaving them rooted defeated the -008 collectible-ALC unload.
        // Clearing now drops the delegate references so the cache's Dispose call
        // below can actually unload the emitted assemblies. (Core.ScriptedAlarms-005
        // re-evaluated under -016.)
        _alarms.Clear();
        _alarmsReferencing.Clear();
        _scratchByAlarmId.Clear();
        // Dispose every compiled-predicate ALC so the engine's shutdown actually
        // releases the emitted assemblies. The drain above ensures no evaluator is
        // mid-call; CompiledScriptCache.Dispose internally guards against use-after-
        // dispose. (Core.Scripting-016)
        _compileCache.Dispose();
    }

    private sealed record AlarmState(
        ScriptedAlarmDefinition Definition,
        TimedScriptEvaluator<AlarmPredicateContext, bool> Evaluator,
        IReadOnlySet<string> Inputs,
        IReadOnlyList<string> TemplateTokens,
        ILogger Logger,
        AlarmConditionState Condition);

    /// <summary>
    /// Per-alarm reusable evaluation scratch. The <see cref="ReadCache"/> dictionary
    /// is the same instance across every evaluation of the owning alarm — it is
    /// cleared and refilled in <see cref="ScriptedAlarmEngine.RefillReadCache"/> on
    /// each call. <see cref="Context"/> wraps that dictionary by reference, so a
    /// refilled <see cref="ReadCache"/> is what the predicate's
    /// <c>ctx.GetTag(path)</c> calls observe. (Core.ScriptedAlarms-009)
    /// </summary>
    /// <remarks>
    /// Reuse is safe because <see cref="ScriptedAlarmEngine"/> serialises every
    /// evaluation under <c>_evalGate</c>: two threads can never observe the same
    /// scratch in a half-refilled state.
    /// </remarks>
    private sealed class AlarmScratch
    {
        public Dictionary<string, DataValueSnapshot> ReadCache { get; }
        public AlarmPredicateContext Context { get; }

        public AlarmScratch(IReadOnlySet<string> inputs, ILogger logger, Func<DateTime> clock)
        {
            // Pre-size to the expected input count so the first refill doesn't pay the
            // dictionary-grow cost. The dictionary auto-grows if Inputs changes (it
            // cannot under the current contract — Inputs is fixed at LoadAsync — but
            // pre-sizing is defensive against future changes).
            ReadCache = new Dictionary<string, DataValueSnapshot>(inputs.Count, StringComparer.Ordinal);
            // Context holds the read cache by reference. Refilling the dictionary
            // updates what the context (and the script) observes.
            Context = new AlarmPredicateContext(ReadCache, logger, clock);
        }
    }
}

/// <summary>
/// One alarm emission the engine pushed to subscribers. Carries everything
/// downstream consumers (OPC UA alarm-source adapter + historian sink) need to
/// publish the event without re-querying the engine.
/// </summary>
public sealed record ScriptedAlarmEvent(
    string AlarmId,
    string EquipmentPath,
    string AlarmName,
    AlarmKind Kind,
    AlarmSeverity Severity,
    string Message,
    AlarmConditionState Condition,
    EmissionKind Emission,
    DateTime TimestampUtc);

/// <summary>
/// Upstream source abstraction — intentionally identical shape to the virtual-tag
/// engine's so Stream G can compose them behind one driver bridge.
/// </summary>
public interface ITagUpstreamSource
{
    DataValueSnapshot ReadTag(string path);
    IDisposable SubscribeTag(string path, Action<string, DataValueSnapshot> observer);
}