feat(runtime): #112 ScriptedAlarmActor state persistence via IAlarmActorStateStore
Some checks failed
v2-ci / build (push) Failing after 42s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (push) Has been skipped
Some checks failed
v2-ci / build (push) Failing after 42s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (push) Has been skipped
ScriptedAlarmActor now survives actor restart: PreStart loads from the configured store + restores in-memory state; every Transition() fires a fire-and-forget save. ActiveState still re-derives from the evaluator on first tick (Phase 7 decision #14), but Acked state + lastAckUser persist verbatim so operators don't re-ack across an outage. Three pieces: - IAlarmActorStateStore seam in Commons.Engines, with the AlarmActorStateSnapshot record (alarmId / state / lastTransitionUtc / lastAckUser) and NullAlarmActorStateStore default. - EfAlarmActorStateStore in Runtime.ScriptedAlarms — production adapter over the existing ScriptedAlarmState table in ConfigDb. Maps the actor's 3-state enum to the table's AckedState column (Active⇒Unacknowledged, Acknowledged⇒Acknowledged, Inactive⇒ Acknowledged). Concurrency conflicts are logged + dropped — the next transition writes again. - ScriptedAlarmActor PreStart load (async, piped back as StateRestored) + Transition save. New Props overload takes the store; default is NullAlarmActorStateStore so tests stay quiet. Tests: Runtime 52 -> 57 (+5): - Transition writes Active then Acknowledged snapshots with lastAckUser populated - PreStart with persisted Active state restores so a subsequent AcknowledgeAlarm fires (not ignored as it would be from Inactive) - Empty store boots Inactive (AcknowledgeAlarm correctly ignored) - EfAlarmActorStateStore Save + Load round-trips via in-memory EF - Load for unknown alarmId returns null All 6 v2 test suites green: 157 tests passing. Closes #112. F9 (#80) remaining residual is predicate binding to Core.ScriptedAlarms.ScriptedAlarmEngine — split as F9b in tasks JSON.
This commit is contained in:
@@ -38,36 +38,84 @@ public sealed class ScriptedAlarmActor : ReceiveActor
|
||||
|
||||
private readonly AlarmConfig _config;
|
||||
private readonly IScriptedAlarmEvaluator _evaluator;
|
||||
private readonly IAlarmActorStateStore _stateStore;
|
||||
private readonly Func<DPSPublisher>? _publisherFactory;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
private readonly Dictionary<string, object?> _dependencies = new(StringComparer.Ordinal);
|
||||
|
||||
private ScriptedAlarmActorState _state = ScriptedAlarmActorState.Inactive;
|
||||
private string? _lastAckUser;
|
||||
|
||||
public sealed record StateRestored(ScriptedAlarmActorState State, string? LastAckUser);
|
||||
|
||||
public static Props Props(
|
||||
AlarmConfig config,
|
||||
IScriptedAlarmEvaluator? evaluator = null,
|
||||
Func<DPSPublisher>? publisherFactory = null) =>
|
||||
Func<DPSPublisher>? publisherFactory = null,
|
||||
IAlarmActorStateStore? stateStore = null) =>
|
||||
Akka.Actor.Props.Create(() => new ScriptedAlarmActor(
|
||||
config,
|
||||
evaluator ?? NullScriptedAlarmEvaluator.Instance,
|
||||
publisherFactory));
|
||||
publisherFactory,
|
||||
stateStore ?? NullAlarmActorStateStore.Instance));
|
||||
|
||||
/// <summary>Legacy single-arg ctor kept for callers that only care about the state machine
|
||||
/// (no engine evaluation, no DPS fan-out). Equivalent to <c>Props(new AlarmConfig(...))</c>.</summary>
|
||||
/// (no engine evaluation, no DPS fan-out, no persistence). Equivalent to <c>Props(new AlarmConfig(...))</c>.</summary>
|
||||
public static Props Props(string alarmId) =>
|
||||
Props(new AlarmConfig(alarmId, alarmId, EquipmentPath: "", Severity: 500, Predicate: null));
|
||||
|
||||
public ScriptedAlarmActor(AlarmConfig config, IScriptedAlarmEvaluator evaluator, Func<DPSPublisher>? publisherFactory)
|
||||
public ScriptedAlarmActor(
|
||||
AlarmConfig config,
|
||||
IScriptedAlarmEvaluator evaluator,
|
||||
Func<DPSPublisher>? publisherFactory,
|
||||
IAlarmActorStateStore stateStore)
|
||||
{
|
||||
_config = config;
|
||||
_evaluator = evaluator;
|
||||
_publisherFactory = publisherFactory;
|
||||
_stateStore = stateStore;
|
||||
|
||||
Receive<DependencyValueChanged>(OnDependencyChanged);
|
||||
Receive<ConditionMet>(_ => { if (_state == ScriptedAlarmActorState.Inactive) Transition(ScriptedAlarmActorState.Active, user: "system"); });
|
||||
Receive<AcknowledgeAlarm>(msg => { if (_state == ScriptedAlarmActorState.Active) Transition(ScriptedAlarmActorState.Acknowledged, user: msg.Actor); });
|
||||
Receive<ConditionCleared>(_ => { if (_state != ScriptedAlarmActorState.Inactive) Transition(ScriptedAlarmActorState.Inactive, user: "system"); });
|
||||
Receive<StateRestored>(OnStateRestored);
|
||||
}
|
||||
|
||||
protected override void PreStart()
|
||||
{
|
||||
// Load persisted state — when the store has a row, restore in-memory state before the
|
||||
// first dependency-change arrives. Async I/O is piped back as StateRestored so we don't
|
||||
// block the message-loop thread; until it arrives the actor stays at the default Inactive.
|
||||
var self = Self;
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
var snapshot = await _stateStore.LoadAsync(_config.AlarmId, CancellationToken.None)
|
||||
.ConfigureAwait(false);
|
||||
if (snapshot is null) return;
|
||||
if (!Enum.TryParse<ScriptedAlarmActorState>(snapshot.State, ignoreCase: true, out var parsed))
|
||||
return;
|
||||
self.Tell(new StateRestored(parsed, snapshot.LastAckUser));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Warning(ex, "ScriptedAlarm {Id}: state-store load failed; booting Inactive",
|
||||
_config.AlarmId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void OnStateRestored(StateRestored msg)
|
||||
{
|
||||
// Active is re-derived from the evaluator at the next DependencyValueChanged — we still
|
||||
// restore Active here so operators don't lose the in-flight transition if a restart races
|
||||
// ahead of the next eval. The first evaluator tick will correct it if the condition cleared.
|
||||
_state = msg.State;
|
||||
_lastAckUser = msg.LastAckUser;
|
||||
_log.Info("ScriptedAlarm {Id}: restored persisted state {State} (lastAck={User})",
|
||||
_config.AlarmId, _state, _lastAckUser ?? "(none)");
|
||||
}
|
||||
|
||||
private void OnDependencyChanged(DependencyValueChanged msg)
|
||||
@@ -110,10 +158,12 @@ public sealed class ScriptedAlarmActor : ReceiveActor
|
||||
{
|
||||
var prev = _state;
|
||||
_state = next;
|
||||
if (next == ScriptedAlarmActorState.Acknowledged) _lastAckUser = user;
|
||||
_log.Info("ScriptedAlarm {Id}: {From} → {To}", _config.AlarmId, prev, next);
|
||||
|
||||
var nowUtc = DateTime.UtcNow;
|
||||
Context.Parent.Tell(new StateChanged(_config.AlarmId, next, nowUtc));
|
||||
PersistStateAsync(nowUtc);
|
||||
|
||||
var kind = next switch
|
||||
{
|
||||
@@ -159,4 +209,28 @@ public sealed class ScriptedAlarmActor : ReceiveActor
|
||||
}
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(topic, payload));
|
||||
}
|
||||
|
||||
private void PersistStateAsync(DateTime nowUtc)
|
||||
{
|
||||
var snapshot = new AlarmActorStateSnapshot(
|
||||
AlarmId: _config.AlarmId,
|
||||
State: _state.ToString(),
|
||||
LastTransitionUtc: nowUtc,
|
||||
LastAckUser: _lastAckUser);
|
||||
|
||||
// Fire-and-forget. Save failures get logged but don't block the message loop —
|
||||
// the worst case is a restart loses one transition, which then re-derives from
|
||||
// the evaluator's next tick anyway.
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await _stateStore.SaveAsync(snapshot, CancellationToken.None).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Warning(ex, "ScriptedAlarm {Id}: state-store save failed", _config.AlarmId);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user