fix(core-scripted-alarms): resolve Low code-review findings (Core.ScriptedAlarms-003,006,008,010,011; -009 documented)
- Core.ScriptedAlarms-003: emit OnEvent OUTSIDE _evalGate by collecting
pending emissions during the gate-held section and flushing them after
release; eliminates re-entrancy deadlock the docs already promised.
- Core.ScriptedAlarms-006: track every fire-and-forget Reevaluate /
ShelvingCheck task in _inFlight; Dispose drains the set so the engine
no longer races store writes against teardown.
- Core.ScriptedAlarms-008: store comments as ImmutableList<AlarmComment>
so AppendComment is O(log n) instead of O(n).
- Core.ScriptedAlarms-010: document the deliberate input-quality
asymmetry (Uncertain drives the predicate, renders {?} in the message)
in docs/ScriptedAlarms.md and on MessageTemplate.Resolve remarks.
- Core.ScriptedAlarms-011: propagate the no-op reason through
TransitionResult.NoOp(state, reason) and log it from
ScriptedAlarmEngine.ApplyAsync.
- Core.ScriptedAlarms-009 (Won't Fix per recommendation): documented the
per-evaluation dictionary allocation in docs/v2/Galaxy.Performance.md
with a mitigation path if a future soak surfaces pressure.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -59,6 +59,15 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
private bool _loaded;
|
||||
private bool _disposed;
|
||||
|
||||
// Tracks fire-and-forget background work launched by OnUpstreamChange
|
||||
// (ReevaluateAsync) and RunShelvingCheck (ShelvingCheckAsync). Dispose drains
|
||||
// these so a re-evaluation in flight when shutdown begins finishes its
|
||||
// SaveAsync before the engine returns control to the caller. The HashSet is
|
||||
// accessed under its own lock — never under _evalGate — so registration /
|
||||
// unregistration cannot deadlock against the gate. (Core.ScriptedAlarms-006)
|
||||
private readonly HashSet<Task> _inFlight = [];
|
||||
private readonly object _inFlightLock = new();
|
||||
|
||||
public ScriptedAlarmEngine(
|
||||
ITagUpstreamSource upstream,
|
||||
IAlarmStateStore store,
|
||||
@@ -92,6 +101,7 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(ScriptedAlarmEngine));
|
||||
if (definitions is null) throw new ArgumentNullException(nameof(definitions));
|
||||
|
||||
var pending = new List<ScriptedAlarmEvent>(0);
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
@@ -157,11 +167,14 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
|
||||
// Restore persisted state, falling back to Fresh where nothing was saved,
|
||||
// then re-derive ActiveState from the current predicate per decision #14.
|
||||
// Any predicate emissions queue into `pending` and fire after the gate
|
||||
// is released — so a startup-recovery activation event can call back into
|
||||
// the engine without deadlocking. (Core.ScriptedAlarms-003)
|
||||
foreach (var (alarmId, state) in _alarms)
|
||||
{
|
||||
var persisted = await _store.LoadAsync(alarmId, ct).ConfigureAwait(false);
|
||||
var seed = persisted ?? state.Condition;
|
||||
var afterPredicate = await EvaluatePredicateToStateAsync(state, seed, nowUtc: _clock(), ct)
|
||||
var afterPredicate = await EvaluatePredicateToStateAsync(state, seed, nowUtc: _clock(), ct, pending)
|
||||
.ConfigureAwait(false);
|
||||
_alarms[alarmId] = state with { Condition = afterPredicate };
|
||||
await _store.SaveAsync(afterPredicate, ct).ConfigureAwait(false);
|
||||
@@ -192,6 +205,10 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
{
|
||||
_evalGate.Release();
|
||||
}
|
||||
|
||||
// Fire any emissions collected during startup recovery OUTSIDE the gate so
|
||||
// subscribers can re-enter the engine safely. (Core.ScriptedAlarms-003)
|
||||
foreach (var evt in pending) FireEvent(evt);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -234,6 +251,7 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
if (!_alarms.TryGetValue(alarmId, out var state))
|
||||
throw new ArgumentException($"Unknown alarm {alarmId}", nameof(alarmId));
|
||||
|
||||
ScriptedAlarmEvent? pending = null;
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
@@ -244,27 +262,50 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
// the exception propagates to the caller. (Core.ScriptedAlarms-007)
|
||||
await _store.SaveAsync(result.State, ct).ConfigureAwait(false);
|
||||
_alarms[alarmId] = state with { Condition = result.State };
|
||||
if (result.Emission != EmissionKind.None) EmitEvent(state, result.State, result.Emission);
|
||||
// Build the emission event under the gate (it captures a coherent
|
||||
// snapshot of state + message-template values) but defer the actual
|
||||
// OnEvent dispatch until after Release() so a slow subscriber or a
|
||||
// subscriber that re-enters the engine doesn't block / deadlock.
|
||||
// (Core.ScriptedAlarms-003)
|
||||
if (result.Emission != EmissionKind.None)
|
||||
pending = BuildEmission(state, result.State, result.Emission);
|
||||
else if (result.NoOpReason is { } reason)
|
||||
{
|
||||
// The Part9StateMachine remarks promise a diagnostic log line for
|
||||
// disabled-alarm no-ops + idempotent ack/confirm/shelve/unshelve
|
||||
// calls. We surface them at debug so they're available when
|
||||
// investigating "why didn't my ack take effect?" without spamming
|
||||
// the main info log. (Core.ScriptedAlarms-011)
|
||||
state.Logger.Debug("Alarm {AlarmId} no-op transition: {Reason}", alarmId, reason);
|
||||
}
|
||||
}
|
||||
finally { _evalGate.Release(); }
|
||||
|
||||
// OnEvent dispatch happens OUTSIDE _evalGate so subscribers can call back
|
||||
// into the engine (e.g. AcknowledgeAsync from inside an Activated handler)
|
||||
// without deadlocking against the non-reentrant SemaphoreSlim.
|
||||
if (pending is not null) FireEvent(pending);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Upstream-change callback. Updates the value cache + enqueues predicate
|
||||
/// re-evaluation for every alarm referencing the changed path. Fire-and-forget
|
||||
/// so driver-side dispatch isn't blocked.
|
||||
/// so driver-side dispatch isn't blocked; the background task is tracked so
|
||||
/// <see cref="Dispose"/> can drain it. (Core.ScriptedAlarms-006)
|
||||
/// </summary>
|
||||
internal void OnUpstreamChange(string path, DataValueSnapshot value)
|
||||
{
|
||||
_valueCache[path] = value;
|
||||
if (_disposed) return; // don't queue new work against a disposing engine
|
||||
if (_alarmsReferencing.TryGetValue(path, out var alarmIds))
|
||||
{
|
||||
_ = ReevaluateAsync(alarmIds.ToArray(), CancellationToken.None);
|
||||
TrackBackgroundTask(ReevaluateAsync(alarmIds.ToArray(), CancellationToken.None));
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReevaluateAsync(IReadOnlyList<string> alarmIds, CancellationToken ct)
|
||||
{
|
||||
var pending = new List<ScriptedAlarmEvent>(0);
|
||||
try
|
||||
{
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
@@ -280,7 +321,7 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
{
|
||||
if (!_alarms.TryGetValue(id, out var state)) continue;
|
||||
var newState = await EvaluatePredicateToStateAsync(
|
||||
state, state.Condition, _clock(), ct).ConfigureAwait(false);
|
||||
state, state.Condition, _clock(), ct, pending).ConfigureAwait(false);
|
||||
if (!ReferenceEquals(newState, state.Condition))
|
||||
{
|
||||
// Persist before updating in-memory so a store failure leaves
|
||||
@@ -295,16 +336,23 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
catch (Exception ex)
|
||||
{
|
||||
_engineLogger.Error(ex, "ScriptedAlarmEngine reevaluate failed");
|
||||
return;
|
||||
}
|
||||
// Fire emissions OUTSIDE _evalGate so subscriber callbacks can re-enter
|
||||
// the engine without deadlocking. (Core.ScriptedAlarms-003)
|
||||
foreach (var evt in pending) FireEvent(evt);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Evaluate the predicate + apply the resulting state-machine transition.
|
||||
/// Returns the new condition state. Emits the appropriate event if the
|
||||
/// transition produces one.
|
||||
/// Returns the new condition state. If the transition produces an emission,
|
||||
/// appends it to <paramref name="pendingEmissions"/> so the caller can fire
|
||||
/// them after releasing <c>_evalGate</c> — keeping subscriber callbacks
|
||||
/// outside the gate. (Core.ScriptedAlarms-003)
|
||||
/// </summary>
|
||||
private async Task<AlarmConditionState> EvaluatePredicateToStateAsync(
|
||||
AlarmState state, AlarmConditionState seed, DateTime nowUtc, CancellationToken ct)
|
||||
AlarmState state, AlarmConditionState seed, DateTime nowUtc, CancellationToken ct,
|
||||
List<ScriptedAlarmEvent>? pendingEmissions = null)
|
||||
{
|
||||
var inputs = BuildReadCache(state.Inputs);
|
||||
|
||||
@@ -340,7 +388,14 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
|
||||
var result = Part9StateMachine.ApplyPredicate(seed, predicateTrue, nowUtc);
|
||||
if (result.Emission != EmissionKind.None)
|
||||
EmitEvent(state, result.State, result.Emission);
|
||||
{
|
||||
var evt = BuildEmission(state, result.State, result.Emission);
|
||||
if (evt is not null)
|
||||
{
|
||||
if (pendingEmissions is not null) pendingEmissions.Add(evt);
|
||||
else FireEvent(evt); // LoadAsync path: no caller-supplied list, fire here.
|
||||
}
|
||||
}
|
||||
return result.State;
|
||||
}
|
||||
|
||||
@@ -373,14 +428,24 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
return true;
|
||||
}
|
||||
|
||||
private void EmitEvent(AlarmState state, AlarmConditionState condition, EmissionKind kind)
|
||||
/// <summary>
|
||||
/// Build (but do not fire) the <see cref="ScriptedAlarmEvent"/> for a
|
||||
/// transition. Returns null for kinds that should not be published
|
||||
/// (<see cref="EmissionKind.Suppressed"/> and
|
||||
/// <see cref="EmissionKind.None"/>). Pure construction — called under
|
||||
/// <c>_evalGate</c> so the message-template resolution uses a coherent
|
||||
/// value-cache snapshot. The actual <see cref="OnEvent"/> dispatch is
|
||||
/// done by <see cref="FireEvent(ScriptedAlarmEvent)"/> AFTER the gate is
|
||||
/// released. (Core.ScriptedAlarms-003)
|
||||
/// </summary>
|
||||
private ScriptedAlarmEvent? BuildEmission(AlarmState state, AlarmConditionState condition, EmissionKind kind)
|
||||
{
|
||||
// Suppressed kind means shelving ate the emission — we don't fire for subscribers
|
||||
// but the state record still advanced so startup recovery reflects reality.
|
||||
if (kind == EmissionKind.Suppressed || kind == EmissionKind.None) return;
|
||||
if (kind == EmissionKind.Suppressed || kind == EmissionKind.None) return null;
|
||||
|
||||
var message = MessageTemplate.Resolve(state.Definition.MessageTemplate, TryLookup);
|
||||
var evt = new ScriptedAlarmEvent(
|
||||
return new ScriptedAlarmEvent(
|
||||
AlarmId: state.Definition.AlarmId,
|
||||
EquipmentPath: state.Definition.EquipmentPath,
|
||||
AlarmName: state.Definition.AlarmName,
|
||||
@@ -390,10 +455,22 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
Condition: condition,
|
||||
Emission: kind,
|
||||
TimestampUtc: _clock());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Invoke the <see cref="OnEvent"/> handler for a built emission. Must be
|
||||
/// called OUTSIDE <c>_evalGate</c>: a slow subscriber would otherwise
|
||||
/// block the gate for every other engine operation, and a subscriber
|
||||
/// that re-enters the engine (e.g. calls AcknowledgeAsync) would
|
||||
/// deadlock against the non-reentrant SemaphoreSlim.
|
||||
/// (Core.ScriptedAlarms-003)
|
||||
/// </summary>
|
||||
private void FireEvent(ScriptedAlarmEvent evt)
|
||||
{
|
||||
try { OnEvent?.Invoke(this, evt); }
|
||||
catch (Exception ex)
|
||||
{
|
||||
_engineLogger.Warning(ex, "ScriptedAlarmEngine OnEvent subscriber threw for {AlarmId}", state.Definition.AlarmId);
|
||||
_engineLogger.Warning(ex, "ScriptedAlarmEngine OnEvent subscriber threw for {AlarmId}", evt.AlarmId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -404,7 +481,24 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
{
|
||||
if (_disposed) return;
|
||||
var ids = _alarms.Keys.ToArray();
|
||||
_ = ShelvingCheckAsync(ids, CancellationToken.None);
|
||||
TrackBackgroundTask(ShelvingCheckAsync(ids, CancellationToken.None));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Register a fire-and-forget task so <see cref="Dispose"/> can await it.
|
||||
/// The task removes itself from the set on completion via a continuation.
|
||||
/// (Core.ScriptedAlarms-006)
|
||||
/// </summary>
|
||||
private void TrackBackgroundTask(Task task)
|
||||
{
|
||||
lock (_inFlightLock) { _inFlight.Add(task); }
|
||||
// Use ContinueWith with ExecuteSynchronously so the removal runs on the
|
||||
// completing thread — avoids scheduler delay between completion and
|
||||
// unregistration that would otherwise let Dispose see a stale set.
|
||||
task.ContinueWith(t =>
|
||||
{
|
||||
lock (_inFlightLock) { _inFlight.Remove(t); }
|
||||
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -416,6 +510,7 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
|
||||
private async Task ShelvingCheckAsync(IReadOnlyList<string> alarmIds, CancellationToken ct)
|
||||
{
|
||||
var pending = new List<ScriptedAlarmEvent>(0);
|
||||
try
|
||||
{
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
@@ -440,7 +535,10 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
await _store.SaveAsync(result.State, ct).ConfigureAwait(false);
|
||||
_alarms[id] = state with { Condition = result.State };
|
||||
if (result.Emission != EmissionKind.None)
|
||||
EmitEvent(state, result.State, result.Emission);
|
||||
{
|
||||
var evt = BuildEmission(state, result.State, result.Emission);
|
||||
if (evt is not null) pending.Add(evt);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -449,7 +547,10 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
catch (Exception ex)
|
||||
{
|
||||
_engineLogger.Warning(ex, "ScriptedAlarmEngine shelving-check failed");
|
||||
return;
|
||||
}
|
||||
// Fire emissions OUTSIDE _evalGate. (Core.ScriptedAlarms-003)
|
||||
foreach (var evt in pending) FireEvent(evt);
|
||||
}
|
||||
|
||||
private void UnsubscribeFromUpstream()
|
||||
@@ -473,6 +574,28 @@ public sealed class ScriptedAlarmEngine : IDisposable
|
||||
_disposed = true;
|
||||
_shelvingTimer?.Dispose();
|
||||
UnsubscribeFromUpstream();
|
||||
|
||||
// Drain any fire-and-forget background work (ReevaluateAsync from
|
||||
// OnUpstreamChange + ShelvingCheckAsync from the 5s timer) that started
|
||||
// before _disposed = true was visible. Without this, a SaveAsync in
|
||||
// flight can outlive the engine and write to a (possibly disposed) store
|
||||
// after Dispose() has returned. The tasks re-check _disposed after
|
||||
// acquiring the gate and bail out, but the await still has to complete.
|
||||
// (Core.ScriptedAlarms-006)
|
||||
Task[] toAwait;
|
||||
lock (_inFlightLock) { toAwait = [.. _inFlight]; }
|
||||
if (toAwait.Length > 0)
|
||||
{
|
||||
try { Task.WhenAll(toAwait).GetAwaiter().GetResult(); }
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Background task failures already logged inside ReevaluateAsync /
|
||||
// ShelvingCheckAsync; surface here at debug so a parent shutdown is
|
||||
// not noisy. The key invariant is that the tasks have COMPLETED.
|
||||
_engineLogger.Debug(ex, "ScriptedAlarmEngine background task threw during shutdown drain");
|
||||
}
|
||||
}
|
||||
|
||||
// Do NOT clear _alarms here: Timer.Dispose() does not wait for in-flight callbacks,
|
||||
// so a ShelvingCheckAsync or ReevaluateAsync can still be running inside _evalGate.
|
||||
// Those paths now re-check _disposed after acquiring the gate and bail out safely.
|
||||
|
||||
Reference in New Issue
Block a user