272a9da61e
Re-review at 7286d320. -015: dispose shelving timer at top of LoadAsync so a failed
reload doesn't leave it firing against partially-cleared state + test. -014: make
pendingEmissions required (removes unreachable fire-under-gate branch that could
reintroduce the -003 deadlock).
872 lines
46 KiB
C#
872 lines
46 KiB
C#
using System.Collections.Concurrent;
|
|
using Serilog;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
|
|
|
/// <summary>
|
|
/// Phase 7 scripted-alarm orchestrator. Compiles every configured alarm's predicate
|
|
/// against the Stream A sandbox, subscribes to the referenced upstream tags,
|
|
/// re-evaluates the predicate on every input change + on a shelving-check timer,
|
|
/// applies the resulting transition through <see cref="Part9StateMachine"/>,
|
|
/// persists state via <see cref="IAlarmStateStore"/>, and emits the resulting events
|
|
/// through <see cref="ScriptedAlarmSource"/> (which wires into the existing
|
|
/// <c>IAlarmSource</c> fan-out).
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// <para>
|
|
/// Scripted alarms are leaves in the evaluation DAG — no alarm's state drives
|
|
/// another alarm's predicate. The engine maintains only an inverse index from
|
|
/// upstream tag path → alarms referencing it; no topological sort needed
|
|
/// (unlike the virtual-tag engine).
|
|
/// </para>
|
|
/// <para>
|
|
/// Evaluation errors (script throws, timeout, coercion fail) surface as
|
|
/// structured errors in the dedicated scripts-*.log sink plus a WARN companion
|
|
/// in the main log. The alarm's ActiveState stays at its prior value — the
|
|
/// engine does NOT invent a clear transition just because the predicate broke.
|
|
/// Operators investigating a broken predicate shouldn't see a phantom
|
|
/// clear-event preceding the failure.
|
|
/// </para>
|
|
/// </remarks>
|
|
public sealed class ScriptedAlarmEngine : IDisposable
|
|
{
|
|
// Collaborators captured by the constructor. The clock and the script timeout are
// optional constructor arguments and are defaulted there.
private readonly ITagUpstreamSource _upstream;
private readonly IAlarmStateStore _store;
private readonly ScriptLoggerFactory _loggerFactory;
private readonly ILogger _engineLogger;
private readonly Func<DateTime> _clock;
private readonly TimeSpan _scriptTimeout;

// Alarm id -> loaded alarm record (definition, timed evaluator, inputs, logger, condition).
// Why ConcurrentDictionary rather than Dictionary: every WRITE happens while holding
// _evalGate, but four READ paths (GetState, GetAllStates, LoadedAlarmIds,
// RunShelvingCheck) run on arbitrary threads - Admin UI request threads and the
// shelving Timer's thread-pool callback - without taking the gate. A plain Dictionary
// read racing an indexer assignment can throw or observe torn state. ConcurrentDictionary
// makes the only two write shapes used here (indexer-set and Clear) atomic and makes
// snapshot enumeration safe. (Core.ScriptedAlarms-001)
private readonly ConcurrentDictionary<string, AlarmState> _alarms =
    new ConcurrentDictionary<string, AlarmState>(StringComparer.Ordinal);

/// <summary>
/// Alarm id -> reusable evaluation scratch. Each scratch holds a read-cache dictionary
/// and an <see cref="AlarmPredicateContext"/>. It is created the first time the alarm is
/// evaluated and refilled in place on every later evaluation, so an upstream tag change
/// no longer allocates a fresh dictionary + context per alarm.
/// </summary>
/// <remarks>
/// Sharing one scratch per alarm is safe only because
/// <see cref="EvaluatePredicateToStateAsync"/> runs exclusively under
/// <see cref="_evalGate"/>. That serialises evaluations, so no two threads can see a
/// scratch mid-refill. <see cref="LoadAsync"/> clears this map together with
/// <see cref="_alarms"/>. (Core.ScriptedAlarms-009)
/// </remarks>
private readonly ConcurrentDictionary<string, AlarmScratch> _scratchByAlarmId =
    new ConcurrentDictionary<string, AlarmScratch>(StringComparer.Ordinal);

/// <summary>
/// Compile cache for alarm predicates. <see cref="LoadAsync"/> obtains every evaluator
/// through it instead of calling
/// <see cref="ScriptEvaluator{TContext, TResult}.Compile"/> directly.
/// </summary>
/// <remarks>
/// Routing compiles through the cache ties each compile's collectible
/// <see cref="System.Runtime.Loader.AssemblyLoadContext"/> to publish-replace cleanup.
/// <see cref="CompiledScriptCache{TContext, TResult}.Clear"/> disposes every
/// materialised evaluator before dropping it, so a config publish actually releases the
/// previous generation's ALCs. Compiling directly (the pre-fix behaviour) kept those
/// ALCs rooted until process exit, which neutralised the Core.Scripting-008 fix in
/// production. (Core.Scripting-016)
/// </remarks>
private readonly CompiledScriptCache<AlarmPredicateContext, bool> _compileCache =
    new CompiledScriptCache<AlarmPredicateContext, bool>();
|
|
|
|
/// <summary>
/// Test-only diagnostic. Returns the read-cache dictionary from the evaluation scratch of
/// <paramref name="alarmId"/>. Returns <c>null</c> when no scratch exists for that id,
/// either because the alarm has not been evaluated since the last load or because the
/// id is unknown.
/// Core.ScriptedAlarms-009 regression tests call it twice and compare references to
/// prove the scratch is reused.
/// </summary>
/// <remarks>
/// <b>Synchronization:</b> this is the engine's LIVE, mutable read-cache, not a copy.
/// <c>RefillReadCache</c> clears and repopulates it under <c>_evalGate</c> on every
/// predicate evaluation. Never enumerate it while an upstream change may be mid-flight,
/// because a concurrent iterator can see it empty or half-filled. Safe uses:
/// <list type="bullet">
/// <item>reference-identity comparisons;</item>
/// <item>single-key reads once the engine has quiesced after a deterministic upstream push.</item>
/// </list>
/// Anything more involved must snapshot a copy while holding the gate.
/// (Core.ScriptedAlarms-013.)
/// </remarks>
/// <param name="alarmId">Identifier of the alarm whose scratch is requested.</param>
/// <returns>The live read-cache, or <c>null</c> if no scratch has been allocated.</returns>
internal IReadOnlyDictionary<string, DataValueSnapshot>? TryGetScratchReadCacheForTest(string alarmId)
{
    if (!_scratchByAlarmId.TryGetValue(alarmId, out var scratch))
    {
        return null;
    }

    return scratch.ReadCache;
}
|
|
|
|
/// <summary>
/// Test-only diagnostic. Returns the <see cref="AlarmPredicateContext"/> stored in the
/// evaluation scratch of <paramref name="alarmId"/>, or <c>null</c> when no scratch has
/// been allocated for that id. It is the context counterpart of
/// <see cref="TryGetScratchReadCacheForTest"/>.
/// </summary>
/// <remarks>
/// <b>Synchronization:</b> the context wraps the same live read-cache that
/// <see cref="TryGetScratchReadCacheForTest"/> exposes, so the same rule applies: do not
/// inspect it while an evaluation may be running. Reference-identity assertions against
/// a quiesced engine are safe. (Core.ScriptedAlarms-013.)
/// </remarks>
/// <param name="alarmId">Identifier of the alarm whose scratch is requested.</param>
/// <returns>The reused predicate context, or <c>null</c> if no scratch exists yet.</returns>
internal AlarmPredicateContext? TryGetScratchContextForTest(string alarmId)
{
    if (!_scratchByAlarmId.TryGetValue(alarmId, out var scratch))
    {
        return null;
    }

    return scratch.Context;
}
|
|
// Latest snapshot per upstream tag path. The sources are LoadAsync's ReadTag seeding
// and every OnUpstreamChange push. OnUpstreamChange writes here without the gate,
// hence the concurrent map.
private readonly ConcurrentDictionary<string, DataValueSnapshot> _valueCache =
    new ConcurrentDictionary<string, DataValueSnapshot>(StringComparer.Ordinal);

// Inverse index: upstream tag path -> ids of the alarms whose predicate reads or
// message-template tokens reference that path. LoadAsync rebuilds it under _evalGate.
// NOTE(review): this is a plain Dictionary/HashSet, but OnUpstreamChange reads it from
// upstream callback threads without the gate. Confirm UnsubscribeFromUpstream
// guarantees no callback is still running while LoadAsync clears and refills it.
private readonly Dictionary<string, HashSet<string>> _alarmsReferencing =
    new Dictionary<string, HashSet<string>>(StringComparer.Ordinal);

// Subscription handles for the current generation's upstream inputs.
private readonly List<IDisposable> _upstreamSubscriptions = new List<IDisposable>();

// Serialises every predicate evaluation and state mutation. SemaphoreSlim is not
// reentrant, which is why OnEvent is always raised only after the gate is released.
private readonly SemaphoreSlim _evalGate = new SemaphoreSlim(1, 1);

private Timer? _shelvingTimer;
private bool _loaded;
private bool _disposed;

// Fire-and-forget work started by OnUpstreamChange (ReevaluateAsync) and
// RunShelvingCheck (ShelvingCheckAsync). Dispose drains this set so that a
// re-evaluation already running when shutdown starts finishes its SaveAsync before
// control returns to the caller. The set has its own lock and is never touched under
// _evalGate, so registration/unregistration cannot deadlock against the gate.
// (Core.ScriptedAlarms-006)
private readonly HashSet<Task> _inFlight = new HashSet<Task>();
private readonly object _inFlightLock = new object();
|
|
|
|
/// <summary>
/// Creates an engine bound to its collaborators. The constructor only stores
/// dependencies. Compilation, subscription and state restore all happen in
/// <see cref="LoadAsync"/>.
/// </summary>
/// <param name="upstream">Source used to read and subscribe to upstream tag values.</param>
/// <param name="store">Persistence for alarm condition state.</param>
/// <param name="loggerFactory">Creates the per-alarm script loggers.</param>
/// <param name="engineLogger">Logger for engine-level diagnostics.</param>
/// <param name="clock">UTC time source. When omitted, <see cref="DateTime.UtcNow"/> is used.</param>
/// <param name="scriptTimeout">
/// Per-evaluation script timeout. When omitted, the timed evaluator's default timeout is used.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="upstream"/>, <paramref name="store"/>, <paramref name="loggerFactory"/>
/// or <paramref name="engineLogger"/> is null.
/// </exception>
public ScriptedAlarmEngine(
    ITagUpstreamSource upstream,
    IAlarmStateStore store,
    ScriptLoggerFactory loggerFactory,
    ILogger engineLogger,
    Func<DateTime>? clock = null,
    TimeSpan? scriptTimeout = null)
{
    if (upstream is null) throw new ArgumentNullException(nameof(upstream));
    if (store is null) throw new ArgumentNullException(nameof(store));
    if (loggerFactory is null) throw new ArgumentNullException(nameof(loggerFactory));
    if (engineLogger is null) throw new ArgumentNullException(nameof(engineLogger));

    _upstream = upstream;
    _store = store;
    _loggerFactory = loggerFactory;
    _engineLogger = engineLogger;

    // Optional dependencies fall back to wall-clock UTC and the evaluator default.
    _clock = clock ?? (() => DateTime.UtcNow);
    _scriptTimeout = scriptTimeout ?? TimedScriptEvaluator<AlarmPredicateContext, bool>.DefaultTimeout;
}
|
|
|
|
/// <summary>
/// Raised once for each published alarm transition.
/// </summary>
/// <remarks>
/// <list type="bullet">
/// <item>Suppressed and None emissions are never raised.</item>
/// <item>Handlers are invoked synchronously by the engine operation that produced the
/// transition, always after the evaluation gate has been released, so they may call
/// back into the engine.</item>
/// <item>An exception thrown by a handler is logged and does not propagate.</item>
/// </list>
/// </remarks>
public event EventHandler<ScriptedAlarmEvent>? OnEvent;

/// <summary>A point-in-time copy of the ids of every alarm currently in the engine's table.</summary>
public IReadOnlyCollection<string> LoadedAlarmIds
{
    get
    {
        string[] ids = _alarms.Keys.ToArray();
        return ids;
    }
}
|
|
|
|
/// <summary>
/// Loads a batch of alarm definitions, replacing the previously loaded generation.
/// The steps are:
/// <list type="number">
/// <item>Compile every predicate, aggregating all failures into one <see cref="InvalidOperationException"/>.</item>
/// <item>Seed the value cache from the upstream source.</item>
/// <item>Restore persisted state from the store, falling back to Fresh for first-load alarms.</item>
/// <item>Re-derive ActiveState per Phase 7 plan decision #14 (startup recovery).</item>
/// <item>Subscribe to upstream inputs.</item>
/// <item>Start the shelving-check timer.</item>
/// </list>
/// </summary>
/// <remarks>
/// The engine is flagged not-loaded for the whole reload and only flips back to loaded
/// after every alarm's state has been restored. If the reload throws part-way (compile
/// failure, store failure or cancellation), the flag stays false. Without this,
/// <c>_alarms</c> would still hold the half-built generation, whose entries carry
/// <c>Fresh</c> conditions instead of their persisted ones. Operator actions (which
/// call <c>EnsureLoaded</c> first) and background re-evaluation would then act on those
/// Fresh conditions and could persist the result over the durable record.
/// NOTE(review): <c>EnsureLoaded</c> is defined outside this view; it presumably rejects
/// calls while the flag is false.
/// </remarks>
/// <param name="definitions">The alarm definitions to load.</param>
/// <param name="ct">The cancellation token.</param>
/// <exception cref="ObjectDisposedException">The engine has been disposed.</exception>
/// <exception cref="ArgumentNullException"><paramref name="definitions"/> is null.</exception>
/// <exception cref="InvalidOperationException">
/// One or more definitions failed dependency extraction or compilation.
/// </exception>
public async Task LoadAsync(IReadOnlyList<ScriptedAlarmDefinition> definitions, CancellationToken ct)
{
    if (_disposed) throw new ObjectDisposedException(nameof(ScriptedAlarmEngine));
    if (definitions is null) throw new ArgumentNullException(nameof(definitions));

    var pending = new List<ScriptedAlarmEvent>(0);
    await _evalGate.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        // Stop the prior shelving timer BEFORE clearing _alarms so no tick can observe a
        // half-cleared dictionary. Doing it first also means a failed reload (compile
        // errors, store failures) stops the timer, even though execution never reaches
        // the _shelvingTimer = new Timer(...) assignment at the bottom of this try
        // block. (Core.ScriptedAlarms-015)
        _shelvingTimer?.Dispose();
        _shelvingTimer = null;

        // Everything below tears down the previous generation and builds the next one
        // piecemeal. Clear the loaded flag now: if any later step throws, execution
        // never reaches `_loaded = true` and the engine does not advertise the half-built
        // generation (Fresh conditions, no subscriptions) as ready.
        _loaded = false;

        UnsubscribeFromUpstream();
        _alarms.Clear();
        _alarmsReferencing.Clear();
        // Drop the prior generation's per-alarm scratch buffers. Definitions may have
        // changed (different Inputs, different Logger), so reusing them would be
        // unsafe. (Core.ScriptedAlarms-009)
        _scratchByAlarmId.Clear();
        // Dispose every compiled-predicate ALC from the prior generation BEFORE
        // recompiling. Skipping this is what made Core.Scripting-008 a no-op in
        // production. (Core.Scripting-016)
        _compileCache.Clear();

        var compileFailures = new List<string>();
        foreach (var def in definitions)
        {
            try
            {
                var extraction = DependencyExtractor.Extract(def.PredicateScriptSource);
                if (!extraction.IsValid)
                {
                    var joined = string.Join("; ", extraction.Rejections.Select(r => r.Message));
                    compileFailures.Add($"{def.AlarmId}: dependency extraction rejected — {joined}");
                    continue;
                }

                // Route through CompiledScriptCache so the emitted assembly's
                // collectible ALC participates in publish-replace cleanup.
                // (Core.Scripting-016)
                var evaluator = _compileCache.GetOrCompile(def.PredicateScriptSource);
                var timed = new TimedScriptEvaluator<AlarmPredicateContext, bool>(evaluator, _scriptTimeout);
                var logger = _loggerFactory.Create(def.AlarmId);

                // An alarm depends on both its predicate reads and its message-template tokens.
                var templateTokens = MessageTemplate.ExtractTokenPaths(def.MessageTemplate);
                var allInputs = new HashSet<string>(extraction.Reads, StringComparer.Ordinal);
                foreach (var t in templateTokens) allInputs.Add(t);

                _alarms[def.AlarmId] = new AlarmState(def, timed, extraction.Reads, templateTokens, logger,
                    AlarmConditionState.Fresh(def.AlarmId, _clock()));

                foreach (var path in allInputs)
                {
                    if (!_alarmsReferencing.TryGetValue(path, out var set))
                        _alarmsReferencing[path] = set = new HashSet<string>(StringComparer.Ordinal);
                    set.Add(def.AlarmId);
                }
            }
            catch (Exception ex)
            {
                compileFailures.Add($"{def.AlarmId}: {ex.Message}");
            }
        }

        if (compileFailures.Count > 0)
        {
            throw new InvalidOperationException(
                $"ScriptedAlarmEngine load failed. {compileFailures.Count} alarm(s) did not compile:\n "
                + string.Join("\n ", compileFailures));
        }

        // Seed the value cache with current tag values before subscribing, so that the
        // initial predicate evaluation below (startup recovery, decision #14) sees a
        // consistent snapshot. Subscriptions are established AFTER _loaded = true so
        // that any synchronous initial push an ITagUpstreamSource delivers from inside
        // SubscribeTag arrives only once _alarms is fully initialised. Before that
        // point, a synchronous push would race the in-progress state restore and could
        // overwrite the seeded cache with a value whose ordering relative to ReadTag is
        // undefined. (Core.ScriptedAlarms-004)
        foreach (var path in _alarmsReferencing.Keys)
            _valueCache[path] = _upstream.ReadTag(path);

        // Restore persisted state, falling back to Fresh where nothing was saved, then
        // re-derive ActiveState from the current predicate per decision #14. Predicate
        // emissions are queued into `pending` and fired after the gate is released, so a
        // startup-recovery activation event can call back into the engine without
        // deadlocking. (Core.ScriptedAlarms-003)
        foreach (var (alarmId, state) in _alarms)
        {
            var persisted = await _store.LoadAsync(alarmId, ct).ConfigureAwait(false);
            var seed = persisted ?? state.Condition;
            var afterPredicate = await EvaluatePredicateToStateAsync(state, seed, nowUtc: _clock(), ct, pending)
                .ConfigureAwait(false);
            _alarms[alarmId] = state with { Condition = afterPredicate };
            await _store.SaveAsync(afterPredicate, ct).ConfigureAwait(false);
        }

        _loaded = true;

        // Subscribe after _loaded = true and after the full state restore. If an upstream
        // implementation pushes its initial value synchronously from inside
        // SubscribeTag, OnUpstreamChange queues a ReevaluateAsync that acquires
        // _evalGate. That task blocks until LoadAsync releases the gate and then
        // re-evaluates against the fully populated _alarms dictionary.
        foreach (var path in _alarmsReferencing.Keys)
            _upstreamSubscriptions.Add(_upstream.SubscribeTag(path, OnUpstreamChange));
        _engineLogger.Information("ScriptedAlarmEngine loaded {Count} alarm(s)", _alarms.Count);

        // Start the shelving-check timer. It ticks every 5s and expires timed shelves
        // whose UnshelveAtUtc has passed. The prior timer was already disposed at the
        // START of this try block, so _shelvingTimer is null here and there is no
        // double-dispose risk. (Core.ScriptedAlarms-002, -015)
        _shelvingTimer = new Timer(_ => RunShelvingCheck(),
            null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
    }
    finally
    {
        _evalGate.Release();
    }

    // Fire any emissions collected during startup recovery OUTSIDE the gate so that
    // subscribers can re-enter the engine safely. (Core.ScriptedAlarms-003)
    foreach (var evt in pending) FireEvent(evt);
}
|
|
|
|
/// <summary>
/// Returns the engine's in-memory condition for <paramref name="alarmId"/>. Returns
/// <c>null</c> when no alarm with that id is loaded. Intended for diagnostics and the
/// Admin UI status page. It reads without taking the evaluation gate.
/// </summary>
/// <param name="alarmId">The alarm identifier.</param>
/// <returns>The current condition, or <c>null</c> for an unknown alarm.</returns>
public AlarmConditionState? GetState(string alarmId)
{
    if (_alarms.TryGetValue(alarmId, out var alarm))
    {
        return alarm.Condition;
    }

    return null;
}
|
|
|
|
/// <summary>
/// Returns a point-in-time copy of the in-memory condition of every loaded alarm.
/// It reads without taking the evaluation gate.
/// </summary>
/// <returns>An array of conditions, one per alarm in the engine's table.</returns>
public IReadOnlyCollection<AlarmConditionState> GetAllStates()
{
    var alarmsSnapshot = _alarms.Values;
    return alarmsSnapshot.Select(static alarm => alarm.Condition).ToArray();
}
|
|
|
|
/// <summary>
/// Acknowledges <paramref name="alarmId"/> as <paramref name="user"/>. The transition is
/// computed by <see cref="Part9StateMachine"/> under the evaluation gate, persisted, and
/// then published.
/// </summary>
/// <param name="alarmId">The alarm identifier.</param>
/// <param name="user">The user performing the acknowledgment.</param>
/// <param name="comment">Optional operator comment recorded with the acknowledgment.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that completes once the transition has been persisted and published.</returns>
public Task AcknowledgeAsync(string alarmId, string user, string? comment, CancellationToken ct)
{
    return ApplyAsync(alarmId, ct,
        current => Part9StateMachine.ApplyAcknowledge(current, user, comment, _clock()));
}
|
|
|
|
/// <summary>
/// Confirms <paramref name="alarmId"/> as <paramref name="user"/>. The transition is
/// computed by <see cref="Part9StateMachine"/> under the evaluation gate, persisted, and
/// then published.
/// </summary>
/// <param name="alarmId">The alarm identifier.</param>
/// <param name="user">The user performing the confirmation.</param>
/// <param name="comment">Optional operator comment recorded with the confirmation.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that completes once the transition has been persisted and published.</returns>
public Task ConfirmAsync(string alarmId, string user, string? comment, CancellationToken ct)
{
    return ApplyAsync(alarmId, ct,
        current => Part9StateMachine.ApplyConfirm(current, user, comment, _clock()));
}
|
|
|
|
/// <summary>
/// Shelves <paramref name="alarmId"/> with a one-shot shelve on behalf of
/// <paramref name="user"/>, via <see cref="Part9StateMachine"/>.
/// </summary>
/// <param name="alarmId">The alarm identifier.</param>
/// <param name="user">The user performing the shelve operation.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that completes once the transition has been persisted and published.</returns>
public Task OneShotShelveAsync(string alarmId, string user, CancellationToken ct)
{
    return ApplyAsync(alarmId, ct,
        current => Part9StateMachine.ApplyOneShotShelve(current, user, _clock()));
}
|
|
|
|
/// <summary>
/// Shelves <paramref name="alarmId"/> until <paramref name="unshelveAtUtc"/> on behalf of
/// <paramref name="user"/>, via <see cref="Part9StateMachine"/>.
/// </summary>
/// <param name="alarmId">The alarm identifier.</param>
/// <param name="user">The user performing the shelve operation.</param>
/// <param name="unshelveAtUtc">UTC instant after which the shelve expires.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that completes once the transition has been persisted and published.</returns>
public Task TimedShelveAsync(string alarmId, string user, DateTime unshelveAtUtc, CancellationToken ct)
{
    return ApplyAsync(alarmId, ct,
        current => Part9StateMachine.ApplyTimedShelve(current, user, unshelveAtUtc, _clock()));
}
|
|
|
|
/// <summary>
/// Clears any shelve on <paramref name="alarmId"/> on behalf of <paramref name="user"/>,
/// via <see cref="Part9StateMachine"/>.
/// </summary>
/// <param name="alarmId">The alarm identifier.</param>
/// <param name="user">The user performing the unshelve operation.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that completes once the transition has been persisted and published.</returns>
public Task UnshelveAsync(string alarmId, string user, CancellationToken ct)
{
    return ApplyAsync(alarmId, ct,
        current => Part9StateMachine.ApplyUnshelve(current, user, _clock()));
}
|
|
|
|
/// <summary>
/// Enables <paramref name="alarmId"/> on behalf of <paramref name="user"/>, via
/// <see cref="Part9StateMachine"/>.
/// </summary>
/// <param name="alarmId">The alarm identifier.</param>
/// <param name="user">The user performing the enable operation.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that completes once the transition has been persisted and published.</returns>
public Task EnableAsync(string alarmId, string user, CancellationToken ct)
{
    return ApplyAsync(alarmId, ct,
        current => Part9StateMachine.ApplyEnable(current, user, _clock()));
}
|
|
|
|
/// <summary>
/// Disables <paramref name="alarmId"/> on behalf of <paramref name="user"/>, via
/// <see cref="Part9StateMachine"/>.
/// </summary>
/// <param name="alarmId">The alarm identifier.</param>
/// <param name="user">The user performing the disable operation.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that completes once the transition has been persisted and published.</returns>
public Task DisableAsync(string alarmId, string user, CancellationToken ct)
{
    return ApplyAsync(alarmId, ct,
        current => Part9StateMachine.ApplyDisable(current, user, _clock()));
}
|
|
|
|
/// <summary>
/// Appends an operator comment to <paramref name="alarmId"/> on behalf of
/// <paramref name="user"/>, via <see cref="Part9StateMachine"/>.
/// </summary>
/// <param name="alarmId">The alarm identifier.</param>
/// <param name="user">The user adding the comment.</param>
/// <param name="text">The comment text.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that completes once the transition has been persisted and published.</returns>
public Task AddCommentAsync(string alarmId, string user, string text, CancellationToken ct)
{
    return ApplyAsync(alarmId, ct,
        current => Part9StateMachine.ApplyAddComment(current, user, text, _clock()));
}
|
|
|
|
/// <summary>
/// Shared implementation of the operator actions. Under <c>_evalGate</c> it applies
/// <paramref name="op"/> to the alarm's current condition, persists the result, and
/// updates the in-memory entry. Any resulting emission is published after the gate is
/// released.
/// </summary>
/// <remarks>
/// The alarm is re-read from <c>_alarms</c> AFTER the gate is acquired. The lookup made
/// before the gate is only a fast rejection of unknown ids. Using that earlier entry as
/// the transition input would cause lost updates:
/// <list type="bullet">
/// <item>If an upstream re-evaluation commits a new condition while this call waits on
/// the gate, <paramref name="op"/> is applied to the stale condition and overwrites it
/// (for example, an Activated transition is silently reverted by a concurrent ack).</item>
/// <item>If a reload completes in the meantime, the stale entry writes the previous
/// generation's definition and evaluator back into <c>_alarms</c>.</item>
/// </list>
/// </remarks>
/// <param name="alarmId">The alarm to transition.</param>
/// <param name="ct">Cancels the wait for the gate and the store write.</param>
/// <param name="op">Computes the transition from the current condition.</param>
/// <exception cref="ArgumentException">No alarm with <paramref name="alarmId"/> is loaded.</exception>
private async Task ApplyAsync(string alarmId, CancellationToken ct, Func<AlarmConditionState, TransitionResult> op)
{
    EnsureLoaded();
    // Fast-fail for unknown ids without queueing on the gate.
    if (!_alarms.ContainsKey(alarmId))
        throw new ArgumentException($"Unknown alarm {alarmId}", nameof(alarmId));

    ScriptedAlarmEvent? pending = null;
    await _evalGate.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        // Re-validate after acquiring the gate, because a reload may have run while we
        // waited. EnsureLoaded is the same check as above, repeated for the state we are
        // about to act on.
        EnsureLoaded();
        if (!_alarms.TryGetValue(alarmId, out var state))
            throw new ArgumentException($"Unknown alarm {alarmId}", nameof(alarmId));

        var result = op(state.Condition);
        // Persist BEFORE updating in-memory state so that a store failure leaves both
        // sides at the prior state instead of diverging. If SaveAsync throws, the
        // in-memory _alarms entry is unchanged and the exception propagates to the
        // caller. (Core.ScriptedAlarms-007)
        await _store.SaveAsync(result.State, ct).ConfigureAwait(false);
        _alarms[alarmId] = state with { Condition = result.State };
        // Build the emission under the gate, because it captures a coherent snapshot of
        // state and message-template values. Defer the OnEvent dispatch until after
        // Release() so a slow subscriber, or one that re-enters the engine, cannot block
        // or deadlock. (Core.ScriptedAlarms-003)
        if (result.Emission != EmissionKind.None)
            pending = BuildEmission(state, result.State, result.Emission);
        else if (result.NoOpReason is { } reason)
        {
            // The Part9StateMachine remarks promise a diagnostic log line for
            // disabled-alarm no-ops and idempotent ack/confirm/shelve/unshelve calls.
            // Logged at debug so it is available when investigating "why didn't my ack
            // take effect?" without spamming the main info log.
            // (Core.ScriptedAlarms-011)
            state.Logger.Debug("Alarm {AlarmId} no-op transition: {Reason}", alarmId, reason);
        }
    }
    finally { _evalGate.Release(); }

    // OnEvent dispatch happens OUTSIDE _evalGate so subscribers can call back into the
    // engine (e.g. AcknowledgeAsync from inside an Activated handler) without
    // deadlocking against the non-reentrant SemaphoreSlim.
    if (pending is not null) FireEvent(pending);
}
|
|
|
|
/// <summary>
/// Callback registered with every upstream subscription. It records the new value in the
/// value cache. When the engine is not disposed, it then starts a background
/// re-evaluation of every alarm that references <paramref name="path"/>.
/// </summary>
/// <remarks>
/// The re-evaluation is fire-and-forget so driver-side dispatch is never blocked on the
/// evaluation gate. The task is registered through <c>TrackBackgroundTask</c> so that
/// <see cref="Dispose"/> can drain it. (Core.ScriptedAlarms-006)
/// NOTE(review): <c>_alarmsReferencing</c> is read here without <c>_evalGate</c>; this is
/// safe only if no callback can still be running while LoadAsync rebuilds it.
/// </remarks>
/// <param name="path">The upstream tag path that changed.</param>
/// <param name="value">The new data value snapshot.</param>
internal void OnUpstreamChange(string path, DataValueSnapshot value)
{
    // The cache is updated even on a disposing engine; only new work is refused.
    _valueCache[path] = value;

    if (_disposed)
    {
        return;
    }

    if (!_alarmsReferencing.TryGetValue(path, out var referencingAlarmIds))
    {
        return;
    }

    // Copy the id set so the background task never enumerates the shared HashSet.
    string[] snapshot = referencingAlarmIds.ToArray();
    TrackBackgroundTask(ReevaluateAsync(snapshot, CancellationToken.None));
}
|
|
|
|
/// <summary>
/// Background re-evaluation of <paramref name="alarmIds"/> after an upstream tag change.
/// Under <c>_evalGate</c> it runs each alarm's predicate and persists and commits every
/// changed condition. The collected emissions are raised after the gate is released.
/// </summary>
/// <remarks>
/// <para>
/// Failures are isolated per alarm. When one alarm's evaluation or store write fails:
/// <list type="bullet">
/// <item>the failure is logged;</item>
/// <item>the alarm keeps its prior condition;</item>
/// <item>its queued emissions are discarded, because its new state was never persisted;</item>
/// <item>the remaining alarms are still evaluated.</item>
/// </list>
/// Before this change, a single failing <c>SaveAsync</c> aborted the whole batch. Later
/// alarms were never re-evaluated, and the emissions of alarms whose new state HAD
/// already been persisted were dropped, so subscribers never saw transitions that the
/// store recorded.
/// </para>
/// <para>
/// The batch is also skipped while the engine is not loaded. A re-evaluation queued
/// before a reload that then failed would otherwise run against the half-built
/// generation's Fresh conditions and persist the result over the durable state.
/// </para>
/// </remarks>
/// <param name="alarmIds">Ids of the alarms referencing the changed tag.</param>
/// <param name="ct">Caller cancellation. Cancellation abandons the whole batch.</param>
private async Task ReevaluateAsync(IReadOnlyList<string> alarmIds, CancellationToken ct)
{
    var pending = new List<ScriptedAlarmEvent>(0);
    try
    {
        await _evalGate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            // Re-check after acquiring the gate. A Dispose() call may have completed
            // between our _evalGate.WaitAsync and here, and writing to a disposing store
            // or mutating _alarms after clear is unsafe. (Core.ScriptedAlarms-005)
            if (_disposed || !_loaded) return;

            foreach (var id in alarmIds)
            {
                if (!_alarms.TryGetValue(id, out var state)) continue;

                // Emissions for this alarm are appended from this index onwards. They
                // are rolled back if the alarm's new state cannot be committed.
                var firstEmission = pending.Count;
                try
                {
                    var newState = await EvaluatePredicateToStateAsync(
                        state, state.Condition, _clock(), ct, pending).ConfigureAwait(false);
                    if (!ReferenceEquals(newState, state.Condition))
                    {
                        // Persist before updating in-memory state so a store failure
                        // leaves both sides at the prior state. (Core.ScriptedAlarms-007)
                        await _store.SaveAsync(newState, ct).ConfigureAwait(false);
                        _alarms[id] = state with { Condition = newState };
                    }
                }
                catch (Exception ex) when (!(ex is OperationCanceledException && ct.IsCancellationRequested))
                {
                    // Caller cancellation still propagates to the outer handler; every
                    // other failure is confined to this alarm.
                    pending.RemoveRange(firstEmission, pending.Count - firstEmission);
                    _engineLogger.Error(ex, "ScriptedAlarmEngine reevaluate failed for {AlarmId}", id);
                }
            }
        }
        finally { _evalGate.Release(); }
    }
    catch (Exception ex)
    {
        _engineLogger.Error(ex, "ScriptedAlarmEngine reevaluate failed");
        return;
    }

    // Fire emissions OUTSIDE _evalGate so subscriber callbacks can re-enter the engine
    // without deadlocking. (Core.ScriptedAlarms-003)
    foreach (var evt in pending) FireEvent(evt);
}
|
|
|
|
/// <summary>
/// Evaluates the predicate and applies the resulting state-machine transition, then
/// returns the new condition state. If the transition produces a publishable emission,
/// it is appended to <paramref name="pendingEmissions"/> so the caller can fire it after
/// releasing <c>_evalGate</c>. This keeps subscriber callbacks outside the gate.
/// (Core.ScriptedAlarms-003)
/// </summary>
/// <remarks>
/// <para>
/// Every caller (LoadAsync and ReevaluateAsync) owns a <c>pending</c> list and passes it
/// here, so emissions are always deferred until after the gate is released. The
/// parameter is required (non-nullable) to make this contract explicit and to prevent a
/// future caller from accidentally firing events under the gate.
/// (Core.ScriptedAlarms-014)
/// </para>
/// <para>
/// Only cancellation of <paramref name="ct"/> itself propagates. An
/// <see cref="OperationCanceledException"/> raised while the caller's token is NOT
/// cancelled originates in the script or the evaluator. It is treated like any other
/// evaluation error: logged, with the state left unchanged. Previously any such exception
/// escaped, aborting an entire load or re-evaluation batch because of one predicate,
/// contrary to the engine's "a broken predicate never changes state" contract.
/// </para>
/// </remarks>
/// <param name="state">The alarm being evaluated.</param>
/// <param name="seed">The condition to transition from.</param>
/// <param name="nowUtc">Timestamp passed to the state machine.</param>
/// <param name="ct">Caller cancellation token.</param>
/// <param name="pendingEmissions">Receives the emission to fire after the gate is released.</param>
/// <returns>The condition after the transition, or <paramref name="seed"/> when evaluation was skipped or failed.</returns>
private async Task<AlarmConditionState> EvaluatePredicateToStateAsync(
    AlarmState state, AlarmConditionState seed, DateTime nowUtc, CancellationToken ct,
    List<ScriptedAlarmEvent> pendingEmissions)
{
    // Look up (or lazily allocate) the per-alarm scratch and refill its read cache in
    // place. The dictionary and context survive across evaluations, so the hot path no
    // longer allocates per upstream tag change. (Core.ScriptedAlarms-009)
    var scratch = _scratchByAlarmId.GetOrAdd(
        state.Definition.AlarmId,
        _ => new AlarmScratch(state.Inputs, state.Logger, _clock));
    RefillReadCache(scratch.ReadCache, state.Inputs);

    // Cold-start guard: skip the predicate when any referenced upstream tag has no
    // cached value yet (the upstream subscription hasn't delivered its first push).
    // Without this, predicates that cast `(double)ctx.GetTag(path).Value` throw an NRE
    // on every tick until the cache fills, spamming the log with identical stack traces.
    // Bad quality is treated the same way: the input isn't available at the predicate's
    // expected type, so the only defensible move is to hold the prior condition state.
    if (!AreInputsReady(scratch.ReadCache)) return seed;

    var context = scratch.Context;

    bool predicateTrue;
    try
    {
        predicateTrue = await state.Evaluator.RunAsync(context, ct).ConfigureAwait(false);
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        // Genuine caller cancellation: abandon the evaluation.
        throw;
    }
    catch (ScriptTimeoutException tex)
    {
        state.Logger.Warning("Alarm predicate timed out after {Timeout} — state unchanged", tex.Timeout);
        return seed;
    }
    catch (Exception ex)
    {
        state.Logger.Error(ex, "Alarm predicate threw — state unchanged");
        return seed;
    }

    var result = Part9StateMachine.ApplyPredicate(seed, predicateTrue, nowUtc);
    if (result.Emission != EmissionKind.None)
    {
        // BuildEmission returns null for kinds that are not published (e.g. Suppressed).
        var evt = BuildEmission(state, result.State, result.Emission);
        if (evt is not null) pendingEmissions.Add(evt);
    }
    return result.State;
}
|
|
|
|
/// <summary>
/// Repopulates <paramref name="cache"/> in place with one entry per path in
/// <paramref name="inputs"/>. Each value comes from <c>_valueCache</c> when present.
/// Otherwise, for example before the path's first upstream push, it is fetched
/// synchronously via <c>ITagUpstreamSource.ReadTag</c>.
/// </summary>
/// <remarks>
/// The dictionary is cleared and refilled under <c>_evalGate</c>, so no other evaluation
/// can observe it half-filled. Refilling in place replaced the old <c>BuildReadCache</c>,
/// which allocated a new dictionary per call. (Core.ScriptedAlarms-009)
/// Values fetched through the ReadTag fallback are not written back to <c>_valueCache</c>.
/// </remarks>
/// <param name="cache">The per-alarm scratch dictionary to refill.</param>
/// <param name="inputs">The tag paths the alarm's predicate reads.</param>
private void RefillReadCache(
    Dictionary<string, DataValueSnapshot> cache, IReadOnlySet<string> inputs)
{
    cache.Clear();
    foreach (var path in inputs)
    {
        if (_valueCache.TryGetValue(path, out var cached))
        {
            cache[path] = cached;
        }
        else
        {
            cache[path] = _upstream.ReadTag(path);
        }
    }
}
|
|
|
|
/// <summary>
/// Returns <c>true</c> when every snapshot in <paramref name="cache"/> is non-null, carries
/// a non-null value, and has a <see cref="DataValueSnapshot.StatusCode"/> whose severity
/// is not Bad. Good and Uncertain both count as ready. An empty cache is ready.
/// </summary>
/// <remarks>
/// A <c>false</c> result lets callers skip script evaluation entirely. Without it,
/// predicates that unconditionally cast <c>ctx.GetTag(path).Value</c> to a numeric type
/// would throw <see cref="NullReferenceException"/> until the upstream subscription
/// delivers its first push.
/// </remarks>
/// <param name="cache">The alarm's refilled read cache.</param>
private static bool AreInputsReady(IReadOnlyDictionary<string, DataValueSnapshot> cache)
{
    // OPC UA Part 4 StatusCode: bit 31 is the high bit of the 2-bit severity field, so it
    // is set for Bad (10). Uncertain (01) leaves it clear and passes, because an Uncertain
    // value can still be inspected and reasoned about by the script.
    const uint BadSeverityBit = 0x80000000u;

    return cache.Values.All(snapshot =>
        snapshot is not null
        && snapshot.Value is not null
        && (snapshot.StatusCode & BadSeverityBit) == 0);
}
|
|
|
|
/// <summary>
/// Constructs, but does not raise, the <see cref="ScriptedAlarmEvent"/> for a
/// transition. It returns <c>null</c> for <see cref="EmissionKind.Suppressed"/> and
/// <see cref="EmissionKind.None"/>, which are never published.
/// </summary>
/// <remarks>
/// Callers invoke this while holding <c>_evalGate</c>, so the message template resolves
/// against a coherent value-cache snapshot. Dispatching the event is left to
/// <see cref="FireEvent(ScriptedAlarmEvent)"/>, which runs after the gate has been
/// released. (Core.ScriptedAlarms-003)
/// </remarks>
/// <param name="state">The alarm that transitioned.</param>
/// <param name="condition">The condition after the transition.</param>
/// <param name="kind">The emission produced by the state machine.</param>
/// <returns>The event to publish, or <c>null</c> when nothing should be published.</returns>
private ScriptedAlarmEvent? BuildEmission(AlarmState state, AlarmConditionState condition, EmissionKind kind)
{
    // Suppressed means shelving swallowed the emission. Subscribers get nothing, but the
    // condition record has still advanced, so startup recovery reflects reality.
    if (kind is EmissionKind.Suppressed or EmissionKind.None)
    {
        return null;
    }

    var definition = state.Definition;
    var message = MessageTemplate.Resolve(definition.MessageTemplate, TryLookup);

    return new ScriptedAlarmEvent(
        AlarmId: definition.AlarmId,
        EquipmentPath: definition.EquipmentPath,
        AlarmName: definition.AlarmName,
        Kind: definition.Kind,
        Severity: definition.Severity,
        Message: message,
        Condition: condition,
        Emission: kind,
        TimestampUtc: _clock(),
        // Only comment-bearing operator transitions carry a comment, taken from the
        // condition that already records it. Engine-driven transitions
        // (Activated/Cleared/...) and shelve operations have none.
        Comment: kind switch
        {
            EmissionKind.Acknowledged => condition.LastAckComment,
            EmissionKind.Confirmed => condition.LastConfirmComment,
            EmissionKind.CommentAdded => condition.Comments.Count == 0 ? null : condition.Comments[^1].Text,
            _ => null,
        },
        // The per-alarm durable-historization opt-out travels with the event. The
        // historian adapter uses it to skip ONLY the durable sink write; the live alerts
        // fan-out does not consult it.
        HistorizeToAveva: definition.HistorizeToAveva);
}
|
|
|
|
/// <summary>
/// Publish a built emission to every <see cref="OnEvent"/> subscriber. Must be
/// called OUTSIDE <c>_evalGate</c>. Otherwise a slow subscriber would block the
/// gate for every other engine operation, and a subscriber that re-enters the
/// engine (e.g. calls AcknowledgeAsync) would deadlock against the non-reentrant
/// SemaphoreSlim. (Core.ScriptedAlarms-003)
/// </summary>
/// <remarks>
/// Subscribers are invoked one at a time, each inside its own try/catch. A plain
/// <c>OnEvent?.Invoke(...)</c> on a multicast delegate stops at the first handler that
/// throws. One faulty consumer would then silently starve every consumer registered
/// after it (e.g. the historian sink vs. the OPC UA alarm-source adapter). A failing
/// subscriber is logged and the remaining ones still receive the event.
/// </remarks>
/// <param name="evt">The emission to publish.</param>
private void FireEvent(ScriptedAlarmEvent evt)
{
    // Snapshot the delegate once so a concurrent (un)subscribe can't change the
    // invocation list part-way through this dispatch.
    var handlers = OnEvent;
    if (handlers is null) return;

    InvokeEachSubscriber(handlers, handler => handler(this, evt), evt.AlarmId);
}

/// <summary>
/// Invoke every delegate in <paramref name="multicast"/>'s invocation list via
/// <paramref name="invoke"/>, isolating failures so that one throwing subscriber
/// cannot prevent the others from running.
/// </summary>
/// <typeparam name="THandler">The event's delegate type.</typeparam>
/// <param name="multicast">The (possibly multicast) subscriber delegate.</param>
/// <param name="invoke">Calls a single subscriber with the event arguments.</param>
/// <param name="alarmId">Alarm id used only for the failure log line.</param>
private void InvokeEachSubscriber<THandler>(THandler multicast, Action<THandler> invoke, string alarmId)
    where THandler : Delegate
{
    foreach (var single in multicast.GetInvocationList())
    {
        try
        {
            invoke((THandler)single);
        }
        catch (Exception ex)
        {
            _engineLogger.Warning(ex, "ScriptedAlarmEngine OnEvent subscriber threw for {AlarmId}", alarmId);
        }
    }
}
|
|
|
|
/// <summary>
/// Value-cache lookup used as the message-template resolver in BuildEmission.
/// </summary>
/// <param name="path">Upstream tag path to look up.</param>
/// <returns>The cached snapshot for <paramref name="path"/>, or <c>null</c> if none is cached.</returns>
private DataValueSnapshot? TryLookup(string path)
{
    if (_valueCache.TryGetValue(path, out var snapshot))
    {
        return snapshot;
    }

    return null;
}
|
|
|
|
/// <summary>
/// Shelving-check timer callback. Snapshots the current alarm ids and queues a
/// tracked background <c>ShelvingCheckAsync</c> pass over them, unless the engine
/// has already been disposed.
/// </summary>
private void RunShelvingCheck()
{
    if (!_disposed)
    {
        // Materialize the ids now; the background pass re-resolves each one under the gate.
        var snapshotOfIds = _alarms.Keys.ToArray();
        var check = ShelvingCheckAsync(snapshotOfIds, CancellationToken.None);
        TrackBackgroundTask(check);
    }
}
|
|
|
|
/// <summary>
/// Record a fire-and-forget task in the in-flight set so <see cref="Dispose"/> can
/// wait for it. A continuation removes the task from the set once it completes.
/// (Core.ScriptedAlarms-006)
/// </summary>
/// <param name="task">The background task to track.</param>
private void TrackBackgroundTask(Task task)
{
    lock (_inFlightLock)
    {
        _inFlight.Add(task);
    }

    // ExecuteSynchronously runs the removal on the thread that completes the task.
    // There is then no scheduler hop between completion and unregistration during
    // which Dispose could observe a stale in-flight set.
    _ = task.ContinueWith(
        Unregister,
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);

    void Unregister(Task completed)
    {
        lock (_inFlightLock)
        {
            _inFlight.Remove(completed);
        }
    }
}
|
|
|
|
/// <summary>
/// Test hook: runs one shelving check now and blocks until it has fully
/// completed, without waiting for the 5-second timer. "Completed" means changed
/// conditions are persisted and swapped in, and emissions are fired. Lets tests that
/// inject a controllable clock advance time and then assert on timed-shelve expiry
/// immediately. (Core.ScriptedAlarms-012)
/// </summary>
/// <remarks>
/// This previously delegated to the fire-and-forget timer path. It therefore returned
/// as soon as the check reached its first incomplete await, e.g. <c>_evalGate</c>
/// held by a concurrent re-evaluation or a genuinely asynchronous store write. The
/// promised synchronous behaviour held only by accident. ShelvingCheckAsync awaits
/// with ConfigureAwait(false) and logs rather than propagates failures, so blocking
/// on it here does not throw.
/// NOTE(review): blocking assumes the injected store's SaveAsync does not need the
/// calling thread's synchronization context to complete. Confirm for new test stores.
/// </remarks>
internal void RunShelvingCheckForTest()
{
    if (_disposed) return;

    ShelvingCheckAsync(_alarms.Keys.ToArray(), CancellationToken.None).GetAwaiter().GetResult();
}
|
|
|
|
/// <summary>
/// Apply timed-shelve expiry (<see cref="Part9StateMachine.ApplyShelvingCheck"/>) to each
/// alarm in <paramref name="alarmIds"/> under <c>_evalGate</c>. Every changed condition is
/// persisted before it is swapped into <c>_alarms</c>. The resulting emissions are fired
/// after the gate has been released. (Core.ScriptedAlarms-003 / -007)
/// </summary>
/// <remarks>
/// <para>
/// Failures are isolated per alarm. If the store write throws for one alarm, that
/// alarm keeps its prior state (so a later check can re-attempt the transition) and
/// the failure is logged. The remaining alarms are still processed, and emissions
/// already queued for successfully committed transitions are still fired.
/// </para>
/// <para>
/// Previously a single failure aborted the whole pass, so a persistently failing alarm
/// blocked expiry for every alarm after it. The pass also returned without firing
/// already-queued emissions. Those transitions were already persisted and in memory,
/// so their events were lost permanently.
/// </para>
/// <para>Failures are logged rather than propagated to the caller.</para>
/// </remarks>
/// <param name="alarmIds">Alarm ids to check; ids no longer present are skipped.</param>
/// <param name="ct">Cancels the gate wait and store writes; cancellation aborts the pass.</param>
private async Task ShelvingCheckAsync(IReadOnlyList<string> alarmIds, CancellationToken ct)
{
    // Collected under the gate, fired after it (Core.ScriptedAlarms-003). Declared
    // outside the try so a later failure cannot discard already-committed emissions.
    var pending = new List<ScriptedAlarmEvent>(0);
    try
    {
        await _evalGate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            // Re-check after acquiring the gate. Timer.Dispose() does not wait for
            // running callbacks, so a callback that passed the _disposed check in
            // RunShelvingCheck can reach this point after Dispose() has returned.
            // Mutating _alarms or saving to a disposed store then would be unsafe.
            // (Core.ScriptedAlarms-005)
            if (_disposed) return;

            var now = _clock();
            foreach (var id in alarmIds)
            {
                if (!_alarms.TryGetValue(id, out var state)) continue;

                try
                {
                    var result = Part9StateMachine.ApplyShelvingCheck(state.Condition, now);
                    if (ReferenceEquals(result.State, state.Condition)) continue;

                    // Persist before updating in-memory so a store failure leaves
                    // both sides at the prior state. (Core.ScriptedAlarms-007)
                    await _store.SaveAsync(result.State, ct).ConfigureAwait(false);
                    _alarms[id] = state with { Condition = result.State };

                    if (result.Emission != EmissionKind.None)
                    {
                        var evt = BuildEmission(state, result.State, result.Emission);
                        if (evt is not null) pending.Add(evt);
                    }
                }
                catch (OperationCanceledException) when (ct.IsCancellationRequested)
                {
                    // Cancellation aborts the whole pass rather than skipping one alarm.
                    throw;
                }
                catch (Exception ex)
                {
                    _engineLogger.Warning(ex, "ScriptedAlarmEngine shelving-check failed for {AlarmId}", id);
                }
            }
        }
        finally { _evalGate.Release(); }
    }
    catch (Exception ex)
    {
        _engineLogger.Warning(ex, "ScriptedAlarmEngine shelving-check failed");
    }

    // Fire emissions OUTSIDE _evalGate, including any queued before a failure.
    // (Core.ScriptedAlarms-003)
    foreach (var evt in pending) FireEvent(evt);
}
|
|
|
|
/// <summary>
/// Dispose every upstream tag subscription and clear the subscription list.
/// Best-effort: a subscription whose Dispose throws is logged and skipped, and the
/// remaining subscriptions are still disposed. The previous empty <c>catch { }</c>
/// hid failures that can leave an upstream callback attached.
/// </summary>
private void UnsubscribeFromUpstream()
{
    foreach (var subscription in _upstreamSubscriptions)
    {
        try
        {
            subscription.Dispose();
        }
        catch (Exception ex)
        {
            _engineLogger.Warning(ex, "ScriptedAlarmEngine failed to dispose an upstream subscription");
        }
    }

    _upstreamSubscriptions.Clear();
}
|
|
|
|
/// <summary>
/// Guard for operations that need a loaded configuration.
/// </summary>
/// <exception cref="InvalidOperationException">Thrown while <c>_loaded</c> is false.</exception>
private void EnsureLoaded()
{
    if (_loaded)
    {
        return;
    }

    throw new InvalidOperationException("ScriptedAlarmEngine not loaded. Call LoadAsync first.");
}
|
|
|
|
/// <summary>Disposes the engine, cleaning up resources and waiting for in-flight background tasks.</summary>
/// <remarks>
/// <para>Order matters:</para>
/// <list type="number">
/// <item>Flag the engine disposed.</item>
/// <item>Stop new work at the source: the shelving timer and the upstream subscriptions.</item>
/// <item>Wait for background work already registered.</item>
/// <item>Only then drop per-alarm state and the compiled-predicate cache.</item>
/// </list>
/// <para>
/// NOTE: do not call this from an <c>OnEvent</c> handler. Emissions from a shelving
/// check are fired from inside a tracked background task. The drain below would then
/// wait on the very task that is running the handler.
/// </para>
/// </remarks>
public void Dispose()
{
    if (_disposed)
    {
        return;
    }
    _disposed = true;

    _shelvingTimer?.Dispose();
    UnsubscribeFromUpstream();

    DrainBackgroundWork();

    // Clearing is safe once the drain has finished. A timer/upstream callback that
    // passed its _disposed check before the flag was set may still register work after
    // the snapshot. Such work re-checks _disposed after acquiring the gate and bails
    // (ShelvingCheckAsync does this explicitly).
    // Before Core.Scripting-016 _alarms was deliberately left populated. Now clearing it
    // is what releases the AlarmState -> TimedScriptEvaluator references, so the cache
    // disposal below can actually unload the collectible script ALCs (-008).
    // (Core.ScriptedAlarms-005, re-evaluated under -016)
    _alarms.Clear();
    _alarmsReferencing.Clear();
    _scratchByAlarmId.Clear();

    // Release the emitted predicate assemblies. No evaluator is mid-call after the
    // drain; CompiledScriptCache.Dispose guards against use-after-dispose itself.
    // (Core.Scripting-016)
    _compileCache.Dispose();

    // Blocks until every background task registered so far (ReevaluateAsync from
    // upstream changes, ShelvingCheckAsync from the 5s timer) has completed. Otherwise
    // an in-flight SaveAsync could outlive the engine and write to a possibly disposed
    // store. (Core.ScriptedAlarms-006)
    void DrainBackgroundWork()
    {
        Task[] outstanding;
        lock (_inFlightLock)
        {
            outstanding = [.. _inFlight];
        }

        if (outstanding.Length == 0)
        {
            return;
        }

        try
        {
            Task.WhenAll(outstanding).GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            // The tasks log their own failures. Only completion matters here, so keep
            // shutdown quiet at debug level.
            _engineLogger.Debug(ex, "ScriptedAlarmEngine background task threw during shutdown drain");
        }
    }
}
|
|
|
|
/// <summary>
/// Immutable per-alarm runtime record stored in <c>_alarms</c>. State transitions
/// replace the whole record with a <c>with { Condition = ... }</c> copy, after the new
/// condition has been persisted.
/// </summary>
/// <param name="Definition">Configured alarm definition (id, equipment path, name, kind, severity, message template, historization flag).</param>
/// <param name="Evaluator">Timeout-wrapped compiled predicate returning whether the alarm condition holds; holds a reference into the compiled-script cache until <c>_alarms</c> is cleared.</param>
/// <param name="Inputs">Upstream tag paths the alarm references.</param>
/// <param name="TemplateTokens">NOTE(review): presumably the tag tokens referenced by the message template. Confirm against LoadAsync.</param>
/// <param name="Logger">Logger for this alarm's diagnostics.</param>
/// <param name="Condition">Current Part 9 condition state.</param>
private sealed record AlarmState(
    ScriptedAlarmDefinition Definition,
    TimedScriptEvaluator<AlarmPredicateContext, bool> Evaluator,
    IReadOnlySet<string> Inputs,
    IReadOnlyList<string> TemplateTokens,
    ILogger Logger,
    AlarmConditionState Condition);
|
|
|
|
/// <summary>
/// Reusable evaluation scratch for one alarm: a read-cache dictionary plus the
/// predicate context that wraps it. The same <see cref="ReadCache"/> instance lives
/// as long as the scratch and is cleared and refilled in
/// <see cref="ScriptedAlarmEngine.RefillReadCache"/> before each evaluation. Because
/// <see cref="Context"/> holds that dictionary by reference, the predicate's
/// <c>ctx.GetTag(path)</c> calls see whatever the latest refill put there.
/// (Core.ScriptedAlarms-009)
/// </summary>
/// <remarks>
/// Sharing one instance across evaluations is safe only because
/// <see cref="ScriptedAlarmEngine"/> runs every evaluation under <c>_evalGate</c>.
/// No two threads can observe the same scratch half-refilled.
/// </remarks>
private sealed class AlarmScratch
{
    /// <summary>
    /// Creates the scratch for an alarm that reads the given inputs.
    /// </summary>
    /// <param name="inputs">Input tag paths this alarm references; only the count is used, to pre-size <see cref="ReadCache"/>.</param>
    /// <param name="logger">Logger for this alarm's diagnostics, handed to the predicate context.</param>
    /// <param name="clock">UTC time source, handed to the predicate context.</param>
    public AlarmScratch(IReadOnlySet<string> inputs, ILogger logger, Func<DateTime> clock)
    {
        // Capacity = input count, so the first refill never has to grow the table.
        // The input set is fixed at LoadAsync today; the dictionary still grows on its
        // own if that ever changes.
        var cache = new Dictionary<string, DataValueSnapshot>(inputs.Count, StringComparer.Ordinal);
        ReadCache = cache;

        // Wrap by reference: later refills of `cache` are exactly what the script sees.
        Context = new AlarmPredicateContext(cache, logger, clock);
    }

    /// <summary>Current upstream values for this alarm's inputs, keyed by tag path (ordinal comparison).</summary>
    public Dictionary<string, DataValueSnapshot> ReadCache { get; }

    /// <summary>Predicate context wrapping <see cref="ReadCache"/>.</summary>
    public AlarmPredicateContext Context { get; }
}
|
|
}
|
|
|
|
/// <summary>
/// One alarm emission the engine pushed to subscribers. Carries everything
/// downstream consumers (OPC UA alarm-source adapter + historian sink) need to
/// publish the event without re-querying the engine.
/// </summary>
/// <param name="AlarmId">Id of the configured alarm that emitted, taken from its definition.</param>
/// <param name="EquipmentPath">Equipment path from the alarm definition.</param>
/// <param name="AlarmName">Alarm name from the alarm definition.</param>
/// <param name="Kind">Alarm kind from the alarm definition.</param>
/// <param name="Severity">Severity from the alarm definition.</param>
/// <param name="Message">The definition's message template, resolved against the engine's tag-value cache when the event was built.</param>
/// <param name="Condition">Condition state after the transition that produced this emission.</param>
/// <param name="Emission">Which transition this is. The engine never publishes <see cref="EmissionKind.Suppressed"/> or <see cref="EmissionKind.None"/>.</param>
/// <param name="TimestampUtc">Reading of the engine's clock taken when the event was built.</param>
/// <param name="Comment">Operator comment. As built by the engine, this is set only for Acknowledged, Confirmed and CommentAdded emissions and is <c>null</c> otherwise.</param>
/// <param name="HistorizeToAveva">Per-alarm durable-historization flag (default <c>true</c>). When <c>false</c>, the historian adapter is expected to skip only the durable sink write; live alert fan-out is not gated on it.</param>
public sealed record ScriptedAlarmEvent(
    string AlarmId,
    string EquipmentPath,
    string AlarmName,
    AlarmKind Kind,
    AlarmSeverity Severity,
    string Message,
    AlarmConditionState Condition,
    EmissionKind Emission,
    DateTime TimestampUtc,
    string? Comment = null,
    bool HistorizeToAveva = true);
|
|
|
|
/// <summary>
/// Upstream source abstraction — intentionally identical shape to the virtual-tag
/// engine's so Stream G can compose them behind one driver bridge.
/// </summary>
/// <remarks>
/// Snapshots may carry a <c>null</c> value or a Bad StatusCode, e.g. before a tag's
/// first upstream push. ScriptedAlarmEngine treats such inputs as not ready and skips
/// predicate evaluation rather than failing.
/// NOTE(review): the thread on which <see cref="SubscribeTag"/> observers are invoked is
/// not specified here; confirm per implementation.
/// </remarks>
public interface ITagUpstreamSource
{
    /// <summary>Reads a tag's current value synchronously.</summary>
    /// <param name="path">The tag path to read.</param>
    /// <returns>A data value snapshot containing the tag value and status.</returns>
    DataValueSnapshot ReadTag(string path);

    /// <summary>Subscribes to upstream tag changes.</summary>
    /// <param name="path">The tag path to observe.</param>
    /// <param name="observer">Callback invoked with the tag path and its new snapshot whenever the tag value changes.</param>
    /// <returns>A subscription handle that removes the observer when disposed.</returns>
    IDisposable SubscribeTag(string path, Action<string, DataValueSnapshot> observer);
}
|