Phase 7 Stream C — Core.ScriptedAlarms project (Part 9 state machine + predicate engine + IAlarmSource adapter)
Ships the Part 9 alarm fidelity layer Phase 7 committed to in plan decision #5. Every scripted alarm gets a full OPC UA AlarmConditionType state machine — EnabledState, ActiveState, AckedState, ConfirmedState, ShelvingState — with persistent operator-supplied state across server restarts per Phase 7 plan decision #14. Runtime shape matches the Galaxy-native + AB CIP ALMD alarm sources: scripted alarms fan out through the existing IAlarmSource surface so Phase 6.1 AlarmTracker composition consumes them without per-source branching. Part9StateMachine is a pure-functions module — no instance state, no I/O, no mutation. Every transition (ApplyPredicate, ApplyAcknowledge, ApplyConfirm, ApplyOneShotShelve, ApplyTimedShelve, ApplyUnshelve, ApplyEnable, ApplyDisable, ApplyAddComment, ApplyShelvingCheck) takes the current AlarmConditionState record plus the event and returns a fresh state + EmissionKind hint. Two structural invariants enforced: disabled alarms never transition ActiveState / AckedState / ConfirmedState; shelved alarms still advance state (so startup recovery reflects reality) but emit a Suppressed hint so subscribers do not see the transition. OneShot shelving expires on clear; Timed shelving expires via ApplyShelvingCheck against the UnshelveAtUtc timestamp. Comments are append-only — every acknowledge, confirm, shelve, unshelve, enable, disable, explicit add-comment, and auto-unshelve appends an AlarmComment record with user identity + timestamp + kind + text for the GxP / 21 CFR Part 11 audit surface. AlarmConditionState is the persistent record the store saves. Fields: AlarmId, Enabled, Active, Acked, Confirmed, Shelving (kind + UnshelveAtUtc), LastTransitionUtc, LastActiveUtc, LastClearedUtc, LastAckUtc + LastAckUser + LastAckComment, LastConfirmUtc + LastConfirmUser + LastConfirmComment, Comments. Fresh factory initializes everything to the no-event position. IAlarmStateStore is the persistence abstraction — LoadAsync, LoadAllAsync, SaveAsync, RemoveAsync. Stream E wires this to a SQL-backed store with IAuditLogger hooks; tests use InMemoryAlarmStateStore. Startup recovery per Phase 7 plan decision #14: LoadAsync runs every configured alarm predicate against current tag values to rederive ActiveState, but EnabledState / AckedState / ConfirmedState / ShelvingState + audit history are loaded verbatim from the store so operators do not re-ack after an outage and shelved alarms stay shelved through maintenance windows. MessageTemplate implements Phase 7 plan decision #13 — static-with-substitution. {TagPath} tokens resolved at event emission time from the engine value cache. Missing paths, non-Good quality, or null values all resolve to {?} so the event still fires but the operator sees where the reference broke. ExtractTokenPaths enumerates tokens at publish time so the engine knows to subscribe to every template-referenced tag in addition to predicate-referenced tags. AlarmPredicateContext is the ScriptContext subclass alarm scripts see. GetTag reads from the engine shared cache; SetVirtualTag is explicitly rejected at runtime with a pointed error message — alarm predicates must be pure so their output does not couple to virtual-tag state in ways that become impossible to reason about. If cross-tag side effects are needed, the operator authors a virtual tag and the alarm predicate reads it. ScriptedAlarmEngine orchestrates. LoadAsync compiles every predicate through Stream A ScriptSandbox + ForbiddenTypeAnalyzer, runs DependencyExtractor to find the read set, adds template token paths to the input set, reports every compile failure as one aggregated InvalidOperationException (not one-at-a-time), subscribes to each unique referenced upstream path, seeds the value cache, loads persisted state for each alarm (falling back to Fresh for first-load), re-evaluates the predicate, and saves the recovered state. ChangeTrigger — when an upstream tag changes, look up every alarm referencing that path in a per-path inverse index, enqueue all of them for re-evaluation via a SemaphoreSlim-gated path. Unlike the virtual-tag engine, scripted alarms are leaves in the evaluation DAG (no alarm drives another alarm), so no topological sort is needed. Operator actions (AcknowledgeAsync, ConfirmAsync, OneShotShelveAsync, TimedShelveAsync, UnshelveAsync, EnableAsync, DisableAsync, AddCommentAsync) route through the state machine, persist, and emit if there is an emission. A 5-second shelving-check timer auto-expires Timed shelving and emits Unshelved events at the right moment. Predicate evaluation errors (script throws, timeout, compile-time reads bad tag) leave the state unchanged — the engine does NOT invent a clear transition on predicate failure. Logged as scripts-*.log Error; companion WARN in main log. ScriptedAlarmSource implements IAlarmSource. SubscribeAlarmsAsync filter is a set of equipment-path prefixes; empty means all. AcknowledgeAsync from the base interface routes to the engine with user identity "opcua-client" — Stream G will replace this with the authenticated principal from the OPC UA dispatch layer. The adapter implements only the base IAlarmSource methods; richer Part 9 methods (Confirm, Shelve, Unshelve, AddComment) remain on the engine and will bind to OPC UA method nodes in Stream G. 47 unit tests across 5 files. Part9StateMachineTests (16) — every transition + noop edge cases: predicate true/false, same-state noop, disabled ignores predicate, acknowledge records user/comment/adds audit, idempotent acknowledge, reject no-user ack, full activate-ack-clear-confirm walk, one-shot shelve suppresses next activation, one-shot expires on clear, timed shelve requires future unshelve time, timed shelve expires via shelving-check, explicit unshelve emits, add-comment appends to audit, comments append-only through multiple operations, full lifecycle walk emits every expected EmissionKind. MessageTemplateTests (11) — no-token passthrough, single+multiple token substitution, bad quality becomes {?}, unknown path becomes {?}, null value becomes {?}, tokens with slashes+dots, empty + null template, ExtractTokenPaths returns every distinct path, whitespace inside tokens trimmed. ScriptedAlarmEngineTests (13) — load compiles+subscribes, compile failures aggregated, upstream change emits Activated, clearing emits Cleared, message template resolves at emission, ack persists to store, startup recovery preserves ack but rederives active, shelved activation state-advances but suppresses emission, runtime exception isolates to owning alarm, disable prevents activation until re-enable, AddComment appends audit without state change, SetVirtualTag from predicate rejected (state unchanged), Dispose releases upstream subscriptions. ScriptedAlarmSourceTests (5) — empty filter matches all, equipment-prefix filter, Unsubscribe stops events, AcknowledgeAsync routes with default user, null arguments rejected. FakeUpstream fixture gives tests an in-memory driver mock with subscription count tracking. Full Phase 7 test count after Stream C: 146 green (63 Scripting + 36 VirtualTags + 47 ScriptedAlarms). Stream D (historian alarm sink with SQLite store-and-forward + Galaxy.Host IPC) consumes ScriptedAlarmEvent + similar Galaxy / AB CIP emissions to produce the unified alarm timeline. Stream G wires the OPC UA method calls and AlarmSource into DriverNodeManager dispatch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
429
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/ScriptedAlarmEngine.cs
Normal file
429
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/ScriptedAlarmEngine.cs
Normal file
@@ -0,0 +1,429 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 7 scripted-alarm orchestrator. Compiles every configured alarm's predicate
|
||||
/// against the Stream A sandbox, subscribes to the referenced upstream tags,
|
||||
/// re-evaluates the predicate on every input change + on a shelving-check timer,
|
||||
/// applies the resulting transition through <see cref="Part9StateMachine"/>,
|
||||
/// persists state via <see cref="IAlarmStateStore"/>, and emits the resulting events
|
||||
/// through <see cref="ScriptedAlarmSource"/> (which wires into the existing
|
||||
/// <c>IAlarmSource</c> fan-out).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Scripted alarms are leaves in the evaluation DAG — no alarm's state drives
|
||||
/// another alarm's predicate. The engine maintains only an inverse index from
|
||||
/// upstream tag path → alarms referencing it; no topological sort needed
|
||||
/// (unlike the virtual-tag engine).
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Evaluation errors (script throws, timeout, coercion fail) surface as
|
||||
/// structured errors in the dedicated scripts-*.log sink plus a WARN companion
|
||||
/// in the main log. The alarm's ActiveState stays at its prior value — the
|
||||
/// engine does NOT invent a clear transition just because the predicate broke.
|
||||
/// Operators investigating a broken predicate shouldn't see a phantom
|
||||
/// clear-event preceding the failure.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class ScriptedAlarmEngine : IDisposable
|
||||
{
|
||||
private readonly ITagUpstreamSource _upstream;
|
||||
private readonly IAlarmStateStore _store;
|
||||
private readonly ScriptLoggerFactory _loggerFactory;
|
||||
private readonly ILogger _engineLogger;
|
||||
private readonly Func<DateTime> _clock;
|
||||
private readonly TimeSpan _scriptTimeout;
|
||||
|
||||
private readonly Dictionary<string, AlarmState> _alarms = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, DataValueSnapshot> _valueCache
|
||||
= new(StringComparer.Ordinal);
|
||||
private readonly Dictionary<string, HashSet<string>> _alarmsReferencing
|
||||
= new(StringComparer.Ordinal); // tag path -> alarm ids
|
||||
|
||||
private readonly List<IDisposable> _upstreamSubscriptions = [];
|
||||
private readonly SemaphoreSlim _evalGate = new(1, 1);
|
||||
private Timer? _shelvingTimer;
|
||||
private bool _loaded;
|
||||
private bool _disposed;
|
||||
|
||||
public ScriptedAlarmEngine(
|
||||
ITagUpstreamSource upstream,
|
||||
IAlarmStateStore store,
|
||||
ScriptLoggerFactory loggerFactory,
|
||||
ILogger engineLogger,
|
||||
Func<DateTime>? clock = null,
|
||||
TimeSpan? scriptTimeout = null)
|
||||
{
|
||||
_upstream = upstream ?? throw new ArgumentNullException(nameof(upstream));
|
||||
_store = store ?? throw new ArgumentNullException(nameof(store));
|
||||
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
|
||||
_engineLogger = engineLogger ?? throw new ArgumentNullException(nameof(engineLogger));
|
||||
_clock = clock ?? (() => DateTime.UtcNow);
|
||||
_scriptTimeout = scriptTimeout ?? TimedScriptEvaluator<AlarmPredicateContext, bool>.DefaultTimeout;
|
||||
}
|
||||
|
||||
/// <summary>Raised for every emission the Part9StateMachine produces that the engine should publish.</summary>
|
||||
public event EventHandler<ScriptedAlarmEvent>? OnEvent;
|
||||
|
||||
public IReadOnlyCollection<string> LoadedAlarmIds => _alarms.Keys;
|
||||
|
||||
/// <summary>
|
||||
/// Load a batch of alarm definitions. Compiles every predicate, aggregates any
|
||||
/// compile failures into one <see cref="InvalidOperationException"/>, subscribes
|
||||
/// to upstream input tags, seeds the value cache, loads persisted state from
|
||||
/// the store (falling back to Fresh for first-load alarms), and recomputes
|
||||
/// ActiveState per Phase 7 plan decision #14 (startup recovery).
|
||||
/// </summary>
|
||||
public async Task LoadAsync(IReadOnlyList<ScriptedAlarmDefinition> definitions, CancellationToken ct)
|
||||
{
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(ScriptedAlarmEngine));
|
||||
if (definitions is null) throw new ArgumentNullException(nameof(definitions));
|
||||
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
UnsubscribeFromUpstream();
|
||||
_alarms.Clear();
|
||||
_alarmsReferencing.Clear();
|
||||
|
||||
var compileFailures = new List<string>();
|
||||
foreach (var def in definitions)
|
||||
{
|
||||
try
|
||||
{
|
||||
var extraction = DependencyExtractor.Extract(def.PredicateScriptSource);
|
||||
if (!extraction.IsValid)
|
||||
{
|
||||
var joined = string.Join("; ", extraction.Rejections.Select(r => r.Message));
|
||||
compileFailures.Add($"{def.AlarmId}: dependency extraction rejected — {joined}");
|
||||
continue;
|
||||
}
|
||||
|
||||
var evaluator = ScriptEvaluator<AlarmPredicateContext, bool>.Compile(def.PredicateScriptSource);
|
||||
var timed = new TimedScriptEvaluator<AlarmPredicateContext, bool>(evaluator, _scriptTimeout);
|
||||
var logger = _loggerFactory.Create(def.AlarmId);
|
||||
|
||||
var templateTokens = MessageTemplate.ExtractTokenPaths(def.MessageTemplate);
|
||||
var allInputs = new HashSet<string>(extraction.Reads, StringComparer.Ordinal);
|
||||
foreach (var t in templateTokens) allInputs.Add(t);
|
||||
|
||||
_alarms[def.AlarmId] = new AlarmState(def, timed, extraction.Reads, templateTokens, logger,
|
||||
AlarmConditionState.Fresh(def.AlarmId, _clock()));
|
||||
|
||||
foreach (var path in allInputs)
|
||||
{
|
||||
if (!_alarmsReferencing.TryGetValue(path, out var set))
|
||||
_alarmsReferencing[path] = set = new HashSet<string>(StringComparer.Ordinal);
|
||||
set.Add(def.AlarmId);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
compileFailures.Add($"{def.AlarmId}: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
if (compileFailures.Count > 0)
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
$"ScriptedAlarmEngine load failed. {compileFailures.Count} alarm(s) did not compile:\n "
|
||||
+ string.Join("\n ", compileFailures));
|
||||
}
|
||||
|
||||
// Seed the value cache with current upstream values + subscribe for changes.
|
||||
foreach (var path in _alarmsReferencing.Keys)
|
||||
{
|
||||
_valueCache[path] = _upstream.ReadTag(path);
|
||||
_upstreamSubscriptions.Add(_upstream.SubscribeTag(path, OnUpstreamChange));
|
||||
}
|
||||
|
||||
// Restore persisted state, falling back to Fresh where nothing was saved,
|
||||
// then re-derive ActiveState from the current predicate per decision #14.
|
||||
foreach (var (alarmId, state) in _alarms)
|
||||
{
|
||||
var persisted = await _store.LoadAsync(alarmId, ct).ConfigureAwait(false);
|
||||
var seed = persisted ?? state.Condition;
|
||||
var afterPredicate = await EvaluatePredicateToStateAsync(state, seed, nowUtc: _clock(), ct)
|
||||
.ConfigureAwait(false);
|
||||
_alarms[alarmId] = state with { Condition = afterPredicate };
|
||||
await _store.SaveAsync(afterPredicate, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
_loaded = true;
|
||||
_engineLogger.Information("ScriptedAlarmEngine loaded {Count} alarm(s)", _alarms.Count);
|
||||
|
||||
// Start the shelving-check timer — ticks every 5s, expires any timed shelves
|
||||
// that have passed their UnshelveAtUtc.
|
||||
_shelvingTimer = new Timer(_ => RunShelvingCheck(),
|
||||
null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
|
||||
}
|
||||
finally
|
||||
{
|
||||
_evalGate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Current persisted state for <paramref name="alarmId"/>. Returns null for
|
||||
/// unknown alarm. Mainly used for diagnostics + the Admin UI status page.
|
||||
/// </summary>
|
||||
public AlarmConditionState? GetState(string alarmId)
|
||||
=> _alarms.TryGetValue(alarmId, out var s) ? s.Condition : null;
|
||||
|
||||
public IReadOnlyCollection<AlarmConditionState> GetAllStates()
|
||||
=> _alarms.Values.Select(a => a.Condition).ToArray();
|
||||
|
||||
public Task AcknowledgeAsync(string alarmId, string user, string? comment, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyAcknowledge(cur, user, comment, _clock()));
|
||||
|
||||
public Task ConfirmAsync(string alarmId, string user, string? comment, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyConfirm(cur, user, comment, _clock()));
|
||||
|
||||
public Task OneShotShelveAsync(string alarmId, string user, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyOneShotShelve(cur, user, _clock()));
|
||||
|
||||
public Task TimedShelveAsync(string alarmId, string user, DateTime unshelveAtUtc, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyTimedShelve(cur, user, unshelveAtUtc, _clock()));
|
||||
|
||||
public Task UnshelveAsync(string alarmId, string user, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyUnshelve(cur, user, _clock()));
|
||||
|
||||
public Task EnableAsync(string alarmId, string user, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyEnable(cur, user, _clock()));
|
||||
|
||||
public Task DisableAsync(string alarmId, string user, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyDisable(cur, user, _clock()));
|
||||
|
||||
public Task AddCommentAsync(string alarmId, string user, string text, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyAddComment(cur, user, text, _clock()));
|
||||
|
||||
private async Task ApplyAsync(string alarmId, CancellationToken ct, Func<AlarmConditionState, TransitionResult> op)
|
||||
{
|
||||
EnsureLoaded();
|
||||
if (!_alarms.TryGetValue(alarmId, out var state))
|
||||
throw new ArgumentException($"Unknown alarm {alarmId}", nameof(alarmId));
|
||||
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var result = op(state.Condition);
|
||||
_alarms[alarmId] = state with { Condition = result.State };
|
||||
await _store.SaveAsync(result.State, ct).ConfigureAwait(false);
|
||||
if (result.Emission != EmissionKind.None) EmitEvent(state, result.State, result.Emission);
|
||||
}
|
||||
finally { _evalGate.Release(); }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Upstream-change callback. Updates the value cache + enqueues predicate
|
||||
/// re-evaluation for every alarm referencing the changed path. Fire-and-forget
|
||||
/// so driver-side dispatch isn't blocked.
|
||||
/// </summary>
|
||||
internal void OnUpstreamChange(string path, DataValueSnapshot value)
|
||||
{
|
||||
_valueCache[path] = value;
|
||||
if (_alarmsReferencing.TryGetValue(path, out var alarmIds))
|
||||
{
|
||||
_ = ReevaluateAsync(alarmIds.ToArray(), CancellationToken.None);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReevaluateAsync(IReadOnlyList<string> alarmIds, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
foreach (var id in alarmIds)
|
||||
{
|
||||
if (!_alarms.TryGetValue(id, out var state)) continue;
|
||||
var newState = await EvaluatePredicateToStateAsync(
|
||||
state, state.Condition, _clock(), ct).ConfigureAwait(false);
|
||||
if (!ReferenceEquals(newState, state.Condition))
|
||||
{
|
||||
_alarms[id] = state with { Condition = newState };
|
||||
await _store.SaveAsync(newState, ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally { _evalGate.Release(); }
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_engineLogger.Error(ex, "ScriptedAlarmEngine reevaluate failed");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Evaluate the predicate + apply the resulting state-machine transition.
|
||||
/// Returns the new condition state. Emits the appropriate event if the
|
||||
/// transition produces one.
|
||||
/// </summary>
|
||||
private async Task<AlarmConditionState> EvaluatePredicateToStateAsync(
|
||||
AlarmState state, AlarmConditionState seed, DateTime nowUtc, CancellationToken ct)
|
||||
{
|
||||
var inputs = BuildReadCache(state.Inputs);
|
||||
var context = new AlarmPredicateContext(inputs, state.Logger, _clock);
|
||||
|
||||
bool predicateTrue;
|
||||
try
|
||||
{
|
||||
predicateTrue = await state.Evaluator.RunAsync(context, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (ScriptTimeoutException tex)
|
||||
{
|
||||
state.Logger.Warning("Alarm predicate timed out after {Timeout} — state unchanged", tex.Timeout);
|
||||
return seed;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
state.Logger.Error(ex, "Alarm predicate threw — state unchanged");
|
||||
return seed;
|
||||
}
|
||||
|
||||
var result = Part9StateMachine.ApplyPredicate(seed, predicateTrue, nowUtc);
|
||||
if (result.Emission != EmissionKind.None)
|
||||
EmitEvent(state, result.State, result.Emission);
|
||||
return result.State;
|
||||
}
|
||||
|
||||
private IReadOnlyDictionary<string, DataValueSnapshot> BuildReadCache(IReadOnlySet<string> inputs)
|
||||
{
|
||||
var d = new Dictionary<string, DataValueSnapshot>(StringComparer.Ordinal);
|
||||
foreach (var p in inputs)
|
||||
d[p] = _valueCache.TryGetValue(p, out var v) ? v : _upstream.ReadTag(p);
|
||||
return d;
|
||||
}
|
||||
|
||||
private void EmitEvent(AlarmState state, AlarmConditionState condition, EmissionKind kind)
|
||||
{
|
||||
// Suppressed kind means shelving ate the emission — we don't fire for subscribers
|
||||
// but the state record still advanced so startup recovery reflects reality.
|
||||
if (kind == EmissionKind.Suppressed || kind == EmissionKind.None) return;
|
||||
|
||||
var message = MessageTemplate.Resolve(state.Definition.MessageTemplate, TryLookup);
|
||||
var evt = new ScriptedAlarmEvent(
|
||||
AlarmId: state.Definition.AlarmId,
|
||||
EquipmentPath: state.Definition.EquipmentPath,
|
||||
AlarmName: state.Definition.AlarmName,
|
||||
Kind: state.Definition.Kind,
|
||||
Severity: state.Definition.Severity,
|
||||
Message: message,
|
||||
Condition: condition,
|
||||
Emission: kind,
|
||||
TimestampUtc: _clock());
|
||||
try { OnEvent?.Invoke(this, evt); }
|
||||
catch (Exception ex)
|
||||
{
|
||||
_engineLogger.Warning(ex, "ScriptedAlarmEngine OnEvent subscriber threw for {AlarmId}", state.Definition.AlarmId);
|
||||
}
|
||||
}
|
||||
|
||||
private DataValueSnapshot? TryLookup(string path)
|
||||
=> _valueCache.TryGetValue(path, out var v) ? v : null;
|
||||
|
||||
private void RunShelvingCheck()
|
||||
{
|
||||
if (_disposed) return;
|
||||
var ids = _alarms.Keys.ToArray();
|
||||
_ = ShelvingCheckAsync(ids, CancellationToken.None);
|
||||
}
|
||||
|
||||
private async Task ShelvingCheckAsync(IReadOnlyList<string> alarmIds, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var now = _clock();
|
||||
foreach (var id in alarmIds)
|
||||
{
|
||||
if (!_alarms.TryGetValue(id, out var state)) continue;
|
||||
var result = Part9StateMachine.ApplyShelvingCheck(state.Condition, now);
|
||||
if (!ReferenceEquals(result.State, state.Condition))
|
||||
{
|
||||
_alarms[id] = state with { Condition = result.State };
|
||||
await _store.SaveAsync(result.State, ct).ConfigureAwait(false);
|
||||
if (result.Emission != EmissionKind.None)
|
||||
EmitEvent(state, result.State, result.Emission);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally { _evalGate.Release(); }
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_engineLogger.Warning(ex, "ScriptedAlarmEngine shelving-check failed");
|
||||
}
|
||||
}
|
||||
|
||||
private void UnsubscribeFromUpstream()
|
||||
{
|
||||
foreach (var s in _upstreamSubscriptions)
|
||||
{
|
||||
try { s.Dispose(); } catch { }
|
||||
}
|
||||
_upstreamSubscriptions.Clear();
|
||||
}
|
||||
|
||||
private void EnsureLoaded()
|
||||
{
|
||||
if (!_loaded) throw new InvalidOperationException(
|
||||
"ScriptedAlarmEngine not loaded. Call LoadAsync first.");
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
_shelvingTimer?.Dispose();
|
||||
UnsubscribeFromUpstream();
|
||||
_alarms.Clear();
|
||||
_alarmsReferencing.Clear();
|
||||
}
|
||||
|
||||
private sealed record AlarmState(
|
||||
ScriptedAlarmDefinition Definition,
|
||||
TimedScriptEvaluator<AlarmPredicateContext, bool> Evaluator,
|
||||
IReadOnlySet<string> Inputs,
|
||||
IReadOnlyList<string> TemplateTokens,
|
||||
ILogger Logger,
|
||||
AlarmConditionState Condition);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// One alarm emission the engine pushed to subscribers. Carries everything
|
||||
/// downstream consumers (OPC UA alarm-source adapter + historian sink) need to
|
||||
/// publish the event without re-querying the engine.
|
||||
/// </summary>
|
||||
public sealed record ScriptedAlarmEvent(
|
||||
string AlarmId,
|
||||
string EquipmentPath,
|
||||
string AlarmName,
|
||||
AlarmKind Kind,
|
||||
AlarmSeverity Severity,
|
||||
string Message,
|
||||
AlarmConditionState Condition,
|
||||
EmissionKind Emission,
|
||||
DateTime TimestampUtc);
|
||||
|
||||
/// <summary>
|
||||
/// Upstream source abstraction — intentionally identical shape to the virtual-tag
|
||||
/// engine's so Stream G can compose them behind one driver bridge.
|
||||
/// </summary>
|
||||
public interface ITagUpstreamSource
|
||||
{
|
||||
DataValueSnapshot ReadTag(string path);
|
||||
IDisposable SubscribeTag(string path, Action<string, DataValueSnapshot> observer);
|
||||
}
|
||||
Reference in New Issue
Block a user