diff --git a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json index facdb37..392c6ec 100644 --- a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json +++ b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json @@ -83,7 +83,7 @@ {"id": "F6", "subject": "Follow-up: RedundancyStateActor publisher abstraction so tests don't need DPS bootstrap", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": [], "blockedBy": [], "commit": "dfc143c", "origin": "Self-review of Task 35 — RedundancyStateActorTests are skipped because single-node DistributedPubSub bootstrap is unreliable in TestKit. Inject an Action broadcast so tests can replace it with a probe; un-skip both tests."}, {"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "completed", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands.", "shipped": "All three pieces landed: (1) spawn lifecycle in DriverHostActor (DriverSpawnPlanner + IDriverFactory seam) — da14149, (2) ISubscribable wiring + OPC UA status-code → OpcUaQuality severity-bit mapping + DetachSubscription on disconnect/PostStop, (3) IWritable.WriteAsync write path with 5s timeout, status-code bubble-up, and AttributeValuePublished published to parent on every OnDataChange — both shipped in the F7-residual batch. Host DI binding (DriverFactoryBootstrap registers AbCip/AbLegacy/FOCAS/Galaxy/Modbus/S7/TwinCAT factories) lives in src/Server/ZB.MOM.WW.OtOpcUa.Host/Drivers/."}, {"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers.", "shipped": "IVirtualTagEvaluator seam in Commons.Engines + NullVirtualTagEvaluator default. VirtualTagActor calls evaluator on DependencyValueChanged, dedupes unchanged results, emits EvaluationResult to parent, publishes ScriptLogEntry Warning to script-logs DPS topic on evaluator failure. Production binding to Core.VirtualTags.VirtualTagEngine still TODO (compile + ITagUpstreamSource subscribe) — split as F8b."}, - {"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted.", "shipped": "IScriptedAlarmEvaluator seam in Commons.Engines + NullScriptedAlarmEvaluator default. ScriptedAlarmActor takes AlarmConfig (id/name/path/severity/predicate), calls evaluator on DependencyValueChanged, emits AlarmTransitionEvent on alerts DPS topic + ScriptLogEntry on script-logs at every transition (Activated/Acknowledged/Cleared with user attribution). Predicate binding to Core.ScriptedAlarms + ScriptedAlarmState DB persistence still TODO — split as F9b."}, + {"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted.", "shipped": "(1) IScriptedAlarmEvaluator seam + NullScriptedAlarmEvaluator default. ScriptedAlarmActor takes AlarmConfig (id/name/path/severity/predicate), evaluates on DependencyValueChanged, publishes AlarmTransitionEvent + ScriptLogEntry on every transition. (2) IAlarmActorStateStore seam in Commons.Engines + NullAlarmActorStateStore default + EfAlarmActorStateStore production adapter over the ScriptedAlarmState entity. ScriptedAlarmActor PreStart loads + restores; every Transition fires a fire-and-forget save with lastAckUser. Predicate binding to Core.ScriptedAlarms.ScriptedAlarmEngine still TODO — split as F9b."}, {"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "partial", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction.", "shipped": "IOpcUaAddressSpaceSink + IServiceLevelPublisher seams in Commons.OpcUa with Null* defaults. OpcUaPublishActor routes AttributeValueUpdate/AlarmStateUpdate/RebuildAddressSpace to the sink, dedupes ServiceLevelChanged, subscribes to redundancy-state DPS topic, and maps per-local-node redundancy snapshot to a coarse ServiceLevel (Primary+leader=240, Primary=200, Secondary=100, Detached=0). Production binding to a real SDK NodeManager + Variable nodes still TODO — split as F10b. Task 60 still blocked on F10b."}, {"id": "F11", "subject": "Follow-up: HistorianAdapterActor named-pipe IPC + SqliteStoreAndForwardSink wiring", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "6861381", "deviationNotes": "Reshaped HistorianAdapterActor around the existing IAlarmHistorianSink abstraction (alarm-event shape, not the original tag-history-row stub). Defaults to NullAlarmHistorianSink; production deployments wire SqliteStoreAndForwardSink + WonderwareHistorianClient via AddOtOpcUaRuntime overrides. Actor now exposes GetStatus returning HistorianSinkStatus for diagnostics. Named-pipe transport implementation lives unchanged in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs — the actor is intentionally just a fire-and-forget bridge.", "origin": "Self-review of Task 45 — stub buffers in-memory; named-pipe + SQLite store-and-forward not wired."}, {"id": "F12", "subject": "Follow-up: PeerOpcUaProbeActor real opc.tcp ping (replace Ok=true stub)", "status": "completed", "classification": "small", "estMinutes": 20, "parallelizableWith": [], "blockedBy": [], "commit": "b06e3ae", "deviation": "TCP-connect probe rather than full OPC UA Hello/Acknowledge handshake. Enough for the redundancy calc; deeper liveness signals can layer on later without changing the actor's contract.", "origin": "Self-review of Task 45 — RunProbe always returns Ok=true; replace with OPC UA Client connect."}, diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Engines/IAlarmActorStateStore.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Engines/IAlarmActorStateStore.cs new file mode 100644 index 0000000..d84d382 --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Engines/IAlarmActorStateStore.cs @@ -0,0 +1,41 @@ +namespace ZB.MOM.WW.OtOpcUa.Commons.Engines; + +/// +/// Persistence seam for ScriptedAlarmActor's in-memory state across actor restarts. +/// Captures only the slice the actor's 3-state machine needs (Inactive / Active / +/// Acknowledged + last transition + last-ack user). The fuller GxP audit trail +/// ('s Comments/Confirmed/Shelving) +/// stays in the production engine binding — this seam is the small surface the actor +/// consumes directly. +/// +public interface IAlarmActorStateStore +{ + Task LoadAsync(string alarmId, CancellationToken ct); + Task SaveAsync(AlarmActorStateSnapshot snapshot, CancellationToken ct); +} + +/// Persisted slice of ScriptedAlarmActor's state. Active is NOT persisted — +/// it re-derives from the evaluator on startup per Phase 7 decision #14. State here +/// distinguishes Acknowledged vs not-yet-acknowledged for cases where the actor came up +/// Active and operator interaction had already happened. +/// Matches ScriptedAlarm.ScriptedAlarmId. +/// Inactive / Active / Acknowledged — the actor's 3-state enum, projected to string. +/// When the actor last transitioned. +/// Who acknowledged most recently. Null when never acked. +public sealed record AlarmActorStateSnapshot( + string AlarmId, + string State, + DateTime LastTransitionUtc, + string? LastAckUser); + +/// No-op default. Bound when no production store is configured (tests, smoke runs). +/// Load returns null → actor boots Inactive; Save is a no-op so state doesn't leak. +public sealed class NullAlarmActorStateStore : IAlarmActorStateStore +{ + public static readonly NullAlarmActorStateStore Instance = new(); + private NullAlarmActorStateStore() { } + public Task LoadAsync(string alarmId, CancellationToken ct) => + Task.FromResult(null); + public Task SaveAsync(AlarmActorStateSnapshot snapshot, CancellationToken ct) => + Task.CompletedTask; +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/EfAlarmActorStateStore.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/EfAlarmActorStateStore.cs new file mode 100644 index 0000000..75be23d --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/EfAlarmActorStateStore.cs @@ -0,0 +1,111 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms; + +/// +/// Production-side backed by the +/// table in the central config DB. The actor's +/// 3-state enum projects into the table's two persisted dimensions: Acked + an +/// internal "_lastActiveState" recorded via a synthetic mapping (Inactive ⇒ Acked, +/// Active ⇒ Unacked, Acknowledged ⇒ Acked). ActiveState itself is deliberately NOT +/// persisted — re-derives from the evaluator on startup (Phase 7 decision #14). +/// +public sealed class EfAlarmActorStateStore : IAlarmActorStateStore +{ + private readonly IDbContextFactory _dbFactory; + private readonly ILogger _logger; + + public EfAlarmActorStateStore( + IDbContextFactory dbFactory, + ILogger logger) + { + _dbFactory = dbFactory; + _logger = logger; + } + + public async Task LoadAsync(string alarmId, CancellationToken ct) + { + using var db = await _dbFactory.CreateDbContextAsync(ct).ConfigureAwait(false); + var row = await db.ScriptedAlarmStates.AsNoTracking() + .FirstOrDefaultAsync(r => r.ScriptedAlarmId == alarmId, ct) + .ConfigureAwait(false); + if (row is null) return null; + + var state = MapAckedToActorState(row.AckedState); + return new AlarmActorStateSnapshot( + AlarmId: alarmId, + State: state, + LastTransitionUtc: row.UpdatedAtUtc, + LastAckUser: row.LastAckUser); + } + + public async Task SaveAsync(AlarmActorStateSnapshot snapshot, CancellationToken ct) + { + using var db = await _dbFactory.CreateDbContextAsync(ct).ConfigureAwait(false); + var row = await db.ScriptedAlarmStates + .FirstOrDefaultAsync(r => r.ScriptedAlarmId == snapshot.AlarmId, ct) + .ConfigureAwait(false); + + var ackedState = MapActorStateToAcked(snapshot.State); + if (row is null) + { + db.ScriptedAlarmStates.Add(new ScriptedAlarmState + { + ScriptedAlarmId = snapshot.AlarmId, + EnabledState = "Enabled", + AckedState = ackedState, + ConfirmedState = "Confirmed", + ShelvingState = "Unshelved", + LastAckUser = snapshot.LastAckUser, + LastAckUtc = string.Equals(snapshot.State, "Acknowledged", StringComparison.Ordinal) + ? snapshot.LastTransitionUtc + : null, + UpdatedAtUtc = snapshot.LastTransitionUtc, + CommentsJson = "[]", + }); + } + else + { + row.AckedState = ackedState; + row.LastAckUser = snapshot.LastAckUser ?? row.LastAckUser; + if (string.Equals(snapshot.State, "Acknowledged", StringComparison.Ordinal)) + row.LastAckUtc = snapshot.LastTransitionUtc; + row.UpdatedAtUtc = snapshot.LastTransitionUtc; + } + + try + { + await db.SaveChangesAsync(ct).ConfigureAwait(false); + } + catch (DbUpdateConcurrencyException ex) + { + // Two actors racing to save the same alarm is benign — the last writer wins on + // UpdatedAtUtc, and the next transition on either side will write again. Log + // + drop so a race doesn't crash the dispatcher. + _logger.LogDebug(ex, "EfAlarmActorStateStore: concurrency conflict for {AlarmId}; dropping save", + snapshot.AlarmId); + } + } + + private static string MapActorStateToAcked(string actorState) => actorState switch + { + "Active" => "Unacknowledged", + "Acknowledged" => "Acknowledged", + // Inactive maps to Acknowledged — when an alarm clears, nothing is left to ack. + _ => "Acknowledged", + }; + + private static string MapAckedToActorState(string ackedState) + { + // Only Active distinguishes from Acked — Inactive comes from a re-eval, not from + // the table. Persisted "Unacknowledged" implies the actor was last Active + + // un-acked; we restore it to Active so a restart doesn't drop pending operator work. + return string.Equals(ackedState, "Unacknowledged", StringComparison.Ordinal) + ? "Active" + : "Acknowledged"; + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs index 1dbb016..f6f2e53 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs @@ -38,36 +38,84 @@ public sealed class ScriptedAlarmActor : ReceiveActor private readonly AlarmConfig _config; private readonly IScriptedAlarmEvaluator _evaluator; + private readonly IAlarmActorStateStore _stateStore; private readonly Func? _publisherFactory; private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly Dictionary _dependencies = new(StringComparer.Ordinal); private ScriptedAlarmActorState _state = ScriptedAlarmActorState.Inactive; + private string? _lastAckUser; + + public sealed record StateRestored(ScriptedAlarmActorState State, string? LastAckUser); public static Props Props( AlarmConfig config, IScriptedAlarmEvaluator? evaluator = null, - Func? publisherFactory = null) => + Func? publisherFactory = null, + IAlarmActorStateStore? stateStore = null) => Akka.Actor.Props.Create(() => new ScriptedAlarmActor( config, evaluator ?? NullScriptedAlarmEvaluator.Instance, - publisherFactory)); + publisherFactory, + stateStore ?? NullAlarmActorStateStore.Instance)); /// Legacy single-arg ctor kept for callers that only care about the state machine - /// (no engine evaluation, no DPS fan-out). Equivalent to Props(new AlarmConfig(...)). + /// (no engine evaluation, no DPS fan-out, no persistence). Equivalent to Props(new AlarmConfig(...)). public static Props Props(string alarmId) => Props(new AlarmConfig(alarmId, alarmId, EquipmentPath: "", Severity: 500, Predicate: null)); - public ScriptedAlarmActor(AlarmConfig config, IScriptedAlarmEvaluator evaluator, Func? publisherFactory) + public ScriptedAlarmActor( + AlarmConfig config, + IScriptedAlarmEvaluator evaluator, + Func? publisherFactory, + IAlarmActorStateStore stateStore) { _config = config; _evaluator = evaluator; _publisherFactory = publisherFactory; + _stateStore = stateStore; Receive(OnDependencyChanged); Receive(_ => { if (_state == ScriptedAlarmActorState.Inactive) Transition(ScriptedAlarmActorState.Active, user: "system"); }); Receive(msg => { if (_state == ScriptedAlarmActorState.Active) Transition(ScriptedAlarmActorState.Acknowledged, user: msg.Actor); }); Receive(_ => { if (_state != ScriptedAlarmActorState.Inactive) Transition(ScriptedAlarmActorState.Inactive, user: "system"); }); + Receive(OnStateRestored); + } + + protected override void PreStart() + { + // Load persisted state — when the store has a row, restore in-memory state before the + // first dependency-change arrives. Async I/O is piped back as StateRestored so we don't + // block the message-loop thread; until it arrives the actor stays at the default Inactive. + var self = Self; + _ = Task.Run(async () => + { + try + { + var snapshot = await _stateStore.LoadAsync(_config.AlarmId, CancellationToken.None) + .ConfigureAwait(false); + if (snapshot is null) return; + if (!Enum.TryParse(snapshot.State, ignoreCase: true, out var parsed)) + return; + self.Tell(new StateRestored(parsed, snapshot.LastAckUser)); + } + catch (Exception ex) + { + _log.Warning(ex, "ScriptedAlarm {Id}: state-store load failed; booting Inactive", + _config.AlarmId); + } + }); + } + + private void OnStateRestored(StateRestored msg) + { + // Active is re-derived from the evaluator at the next DependencyValueChanged — we still + // restore Active here so operators don't lose the in-flight transition if a restart races + // ahead of the next eval. The first evaluator tick will correct it if the condition cleared. + _state = msg.State; + _lastAckUser = msg.LastAckUser; + _log.Info("ScriptedAlarm {Id}: restored persisted state {State} (lastAck={User})", + _config.AlarmId, _state, _lastAckUser ?? "(none)"); } private void OnDependencyChanged(DependencyValueChanged msg) @@ -110,10 +158,12 @@ public sealed class ScriptedAlarmActor : ReceiveActor { var prev = _state; _state = next; + if (next == ScriptedAlarmActorState.Acknowledged) _lastAckUser = user; _log.Info("ScriptedAlarm {Id}: {From} → {To}", _config.AlarmId, prev, next); var nowUtc = DateTime.UtcNow; Context.Parent.Tell(new StateChanged(_config.AlarmId, next, nowUtc)); + PersistStateAsync(nowUtc); var kind = next switch { @@ -159,4 +209,28 @@ public sealed class ScriptedAlarmActor : ReceiveActor } DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(topic, payload)); } + + private void PersistStateAsync(DateTime nowUtc) + { + var snapshot = new AlarmActorStateSnapshot( + AlarmId: _config.AlarmId, + State: _state.ToString(), + LastTransitionUtc: nowUtc, + LastAckUser: _lastAckUser); + + // Fire-and-forget. Save failures get logged but don't block the message loop — + // the worst case is a restart loses one transition, which then re-derives from + // the evaluator's next tick anyway. + _ = Task.Run(async () => + { + try + { + await _stateStore.SaveAsync(snapshot, CancellationToken.None).ConfigureAwait(false); + } + catch (Exception ex) + { + _log.Warning(ex, "ScriptedAlarm {Id}: state-store save failed", _config.AlarmId); + } + }); + } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmStatePersistenceTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmStatePersistenceTests.cs new file mode 100644 index 0000000..408e4f3 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmStatePersistenceTests.cs @@ -0,0 +1,143 @@ +using System.Collections.Concurrent; +using Akka.Actor; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.ScriptedAlarms; + +public sealed class ScriptedAlarmStatePersistenceTests : RuntimeActorTestBase +{ + [Fact] + public async Task Transition_writes_to_state_store_with_lastAckUser() + { + var store = new RecordingStateStore(); + var parent = CreateTestProbe(); + var config = new ScriptedAlarmActor.AlarmConfig("a-1", "Pump", "/eq", 700, Predicate: null); + var actor = parent.ChildActorOf(ScriptedAlarmActor.Props(config, stateStore: store)); + + actor.Tell(new ScriptedAlarmActor.ConditionMet("threshold")); + parent.ExpectMsg(); + AwaitAssert(() => + { + store.Snapshots.Last().State.ShouldBe("Active"); + store.Snapshots.Last().LastAckUser.ShouldBeNull(); + }, duration: TimeSpan.FromSeconds(2)); + + actor.Tell(new ScriptedAlarmActor.AcknowledgeAlarm("operator-jane")); + parent.ExpectMsg(); + AwaitAssert(() => + { + var ackedSnap = store.Snapshots.Last(s => s.State == "Acknowledged"); + ackedSnap.LastAckUser.ShouldBe("operator-jane"); + }, duration: TimeSpan.FromSeconds(2)); + } + + [Fact] + public async Task PreStart_restores_persisted_state_so_restart_does_not_drop_pending_ack() + { + var store = new RecordingStateStore(); + await store.SaveAsync(new AlarmActorStateSnapshot( + AlarmId: "a-1", + State: "Active", + LastTransitionUtc: DateTime.UtcNow.AddMinutes(-5), + LastAckUser: null), CancellationToken.None); + + var parent = CreateTestProbe(); + var config = new ScriptedAlarmActor.AlarmConfig("a-1", "Pump", "/eq", 700, Predicate: null); + var actor = parent.ChildActorOf(ScriptedAlarmActor.Props(config, stateStore: store)); + + // After PreStart's async load, the actor should be in Active — duplicate ConditionMet + // is then ignored because the existing Active-state check. + AwaitAssert(() => + { + actor.Tell(new ScriptedAlarmActor.AcknowledgeAlarm("operator-bob")); + parent.ExpectMsg(TimeSpan.FromMilliseconds(500)) + .State.ShouldBe(ScriptedAlarmActorState.Acknowledged); + }, duration: TimeSpan.FromSeconds(3)); + } + + [Fact] + public async Task PreStart_with_no_persisted_state_boots_inactive() + { + var store = new RecordingStateStore(); + var parent = CreateTestProbe(); + var config = new ScriptedAlarmActor.AlarmConfig("never-seen", "X", "/eq", 500, Predicate: null); + var actor = parent.ChildActorOf(ScriptedAlarmActor.Props(config, stateStore: store)); + + // Empty store ⇒ actor sits Inactive; AcknowledgeAlarm is ignored from Inactive so no + // StateChanged should arrive. + await Task.Delay(200); + actor.Tell(new ScriptedAlarmActor.AcknowledgeAlarm("anyone")); + parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + } + + [Fact] + public async Task EfAlarmActorStateStore_round_trip_persists_via_ConfigDb() + { + var db = NewInMemoryDbFactory(); + var ef = new EfAlarmActorStateStore(db, NullLogger.Instance); + + await ef.SaveAsync(new AlarmActorStateSnapshot( + AlarmId: "alarm-7", + State: "Active", + LastTransitionUtc: DateTime.UtcNow, + LastAckUser: null), CancellationToken.None); + + using (var ctx = db.CreateDbContext()) + { + var row = ctx.ScriptedAlarmStates.Single(r => r.ScriptedAlarmId == "alarm-7"); + row.AckedState.ShouldBe("Unacknowledged"); + } + + // Acknowledge — same alarmId, transitions to Acknowledged. + await ef.SaveAsync(new AlarmActorStateSnapshot( + AlarmId: "alarm-7", + State: "Acknowledged", + LastTransitionUtc: DateTime.UtcNow, + LastAckUser: "jane"), CancellationToken.None); + + var loaded = await ef.LoadAsync("alarm-7", CancellationToken.None); + loaded.ShouldNotBeNull(); + loaded.State.ShouldBe("Acknowledged"); + loaded.LastAckUser.ShouldBe("jane"); + + using (var ctx = db.CreateDbContext()) + { + ctx.ScriptedAlarmStates.Count(r => r.ScriptedAlarmId == "alarm-7").ShouldBe(1); + ctx.ScriptedAlarmStates.Single(r => r.ScriptedAlarmId == "alarm-7").LastAckUser.ShouldBe("jane"); + } + } + + [Fact] + public async Task EfAlarmActorStateStore_load_for_missing_id_returns_null() + { + var db = NewInMemoryDbFactory(); + var ef = new EfAlarmActorStateStore(db, NullLogger.Instance); + + var loaded = await ef.LoadAsync("never-saved", CancellationToken.None); + loaded.ShouldBeNull(); + } + + private sealed class RecordingStateStore : IAlarmActorStateStore + { + private readonly ConcurrentDictionary _byId = new(StringComparer.Ordinal); + private readonly ConcurrentQueue _saves = new(); + + public List Snapshots => _saves.ToList(); + + public Task LoadAsync(string alarmId, CancellationToken ct) + => Task.FromResult(_byId.TryGetValue(alarmId, out var v) ? v : null); + + public Task SaveAsync(AlarmActorStateSnapshot snapshot, CancellationToken ct) + { + _byId[snapshot.AlarmId] = snapshot; + _saves.Enqueue(snapshot); + return Task.CompletedTask; + } + } +}