From fda7ac9c5092aeb2a6eb72a6ff695a6ed4c0d4dc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 31 May 2026 01:49:28 -0400 Subject: [PATCH] feat(siteruntime): NativeAlarmActor mirrors source alarms (snapshot swap, retention, persistence) --- .../Actors/NativeAlarmActor.cs | 334 ++++++++++++++++++ .../Actors/NativeAlarmActorTests.cs | 132 +++++++ 2 files changed, 466 insertions(+) create mode 100644 src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/NativeAlarmActor.cs create mode 100644 tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/NativeAlarmActorTests.cs diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/NativeAlarmActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/NativeAlarmActor.cs new file mode 100644 index 00000000..28747cea --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/NativeAlarmActor.cs @@ -0,0 +1,334 @@ +using System.Text.Json; +using Akka.Actor; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; + +namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors; + +/// +/// Task 15: Native Alarm Actor — child of Instance Actor, peer to the computed +/// . Mirrors a single source binding's native alarms +/// read-only: subscribes through the DCL, applies snapshot/live transitions, +/// retains active+unacked conditions, persists to site SQLite, and emits an +/// enriched upward for every transition. +/// +/// State is keyed by (the +/// stable per-condition key). A snapshot replay (Snapshot* sentinels) atomically +/// swaps the mirrored set so conditions that cleared while disconnected emit a +/// return-to-normal. Persistence is best-effort/fire-and-forget — a failed write +/// never blocks the actor or aborts the emit. +/// +/// Supervision: parent Instance Actor uses Resume for coordinator children. +/// +public class NativeAlarmActor : ReceiveActor +{ + private readonly ResolvedNativeAlarmSource _source; + private readonly string _instanceName; + private readonly IActorRef _instanceActor; + private readonly IActorRef _dclManager; + private readonly SiteStorageService _storage; + private readonly SiteRuntimeOptions _options; + private readonly ILogger _logger; + private readonly AlarmKind _nativeKind; + + /// Current mirrored conditions, keyed by source reference. + private readonly Dictionary _alarms = new(); + + /// Buffer accumulating a snapshot replay until . + private readonly Dictionary _snapshotBuffer = new(); + + private ICancelable? _retryTimer; + + /// Initializes a new for a single resolved source binding. + /// The resolved native alarm source (connection + source reference + filter). + /// The owning instance's unique name. + /// Parent instance actor; receives emitted . + /// DCL target the subscribe request is sent to. + /// Site-local SQLite store for rehydration + persistence. + /// Site runtime options (alarm cap, retry interval). + /// Logger for diagnostics. + /// Alarm kind to stamp on emitted events (OPC UA vs MxAccess); set by the + /// Instance Actor from the connection protocol. Defaults to . + public NativeAlarmActor( + ResolvedNativeAlarmSource source, + string instanceName, + IActorRef instanceActor, + IActorRef dclManager, + SiteStorageService storage, + SiteRuntimeOptions options, + ILogger logger, + AlarmKind nativeKind = AlarmKind.NativeOpcUa) + { + _source = source; + _instanceName = instanceName; + _instanceActor = instanceActor; + _dclManager = dclManager; + _storage = storage; + _options = options; + _logger = logger; + _nativeKind = nativeKind; + + Receive(HandleRehydration); + Receive(HandleTransition); + Receive(HandleSourceUnavailable); + Receive(HandleSubscribeResponse); + Receive(_ => SendSubscribe()); + } + + /// + protected override void PreStart() + { + base.PreStart(); + + // Rehydrate last-known state from SQLite (non-blocking), then subscribe. + // A fresh source snapshot will reconcile/replace this once it arrives. + _storage.GetNativeAlarmsAsync(_instanceName, _source.CanonicalName) + .PipeTo(Self, Self, rows => new RehydrationCompleted(rows)); + + SendSubscribe(); + + _logger.LogInformation( + "NativeAlarmActor started for {Source} ({Connection}:{Ref}) on {Instance}", + _source.CanonicalName, _source.ConnectionName, _source.SourceReference, _instanceName); + } + + /// + protected override void PostStop() + { + _retryTimer?.Cancel(); + base.PostStop(); + } + + private void SendSubscribe() + { + _dclManager.Tell(new SubscribeAlarmsRequest( + CorrelationId: Guid.NewGuid().ToString("N"), + InstanceUniqueName: _instanceName, + ConnectionName: _source.ConnectionName, + SourceReference: _source.SourceReference, + ConditionFilter: _source.ConditionFilter, + Timestamp: DateTimeOffset.UtcNow)); + } + + private void HandleRehydration(RehydrationCompleted msg) + { + foreach (var row in msg.Rows) + { + // A live transition that arrived before the read completed wins. + if (_alarms.ContainsKey(row.SourceReference)) + { + continue; + } + + AlarmConditionState? condition; + try + { + condition = JsonSerializer.Deserialize(row.ConditionJson); + } + catch (JsonException ex) + { + _logger.LogWarning(ex, "Discarding unreadable persisted native alarm {Ref} on {Instance}", + row.SourceReference, _instanceName); + continue; + } + + if (condition is null) + { + continue; + } + + // Metadata (type/category/message) is not persisted — rehydrate the + // condition + key so the DebugView reflects last-known state until the + // first source snapshot replaces it. + var t = new NativeAlarmTransition( + row.SourceReference, string.Empty, string.Empty, AlarmTransitionKind.Snapshot, + condition, string.Empty, string.Empty, string.Empty, string.Empty, string.Empty, + null, row.LastTransitionAt, string.Empty, string.Empty); + _alarms[row.SourceReference] = t; + Emit(t, t.Condition); + } + } + + private void HandleTransition(NativeAlarmTransitionUpdate update) + { + var t = update.Transition; + switch (t.Kind) + { + case AlarmTransitionKind.Snapshot: + _snapshotBuffer[t.SourceReference] = t; + break; + + case AlarmTransitionKind.SnapshotComplete: + ApplySnapshotSwap(); + break; + + default: + ApplyLiveTransition(t); + break; + } + } + + /// + /// Atomically replaces the mirrored set with the buffered snapshot: conditions + /// no longer present emit a return-to-normal (and drop out); present conditions + /// are upserted, persisted, and emitted. + /// + private void ApplySnapshotSwap() + { + foreach (var (sourceRef, prior) in _alarms) + { + if (!_snapshotBuffer.ContainsKey(sourceRef)) + { + Emit(prior, prior.Condition with { Active = false }); + PersistDelete(sourceRef); + } + } + + _alarms.Clear(); + foreach (var (sourceRef, t) in _snapshotBuffer) + { + _alarms[sourceRef] = t; + PersistUpsert(t); + Emit(t, t.Condition); + } + + _snapshotBuffer.Clear(); + EnforceCap(); + } + + private void ApplyLiveTransition(NativeAlarmTransition t) + { + // Ignore stale (out-of-order) transitions for a condition we already hold. + if (_alarms.TryGetValue(t.SourceReference, out var existing) && t.TransitionTime < existing.TransitionTime) + { + return; + } + + _alarms[t.SourceReference] = t; + PersistUpsert(t); + Emit(t, t.Condition); + + // Retention: a resolved condition (inactive AND acknowledged) drops out of + // the mirror — the return-to-normal has already been emitted above. + if (!t.Condition.Active && t.Condition.Acknowledged) + { + _alarms.Remove(t.SourceReference); + PersistDelete(t.SourceReference); + } + + EnforceCap(); + } + + private void HandleSourceUnavailable(NativeAlarmSourceUnavailable msg) + { + // Keep last-known conditions (uncertain) rather than clearing them; the + // reconnect snapshot reconciles state. No emit — avoids a flap. + _logger.LogWarning( + "Native alarm feed unavailable for {Source} ({Connection}) on {Instance}; retaining {Count} mirrored alarms pending reconnect snapshot", + _source.CanonicalName, msg.ConnectionName, _instanceName, _alarms.Count); + } + + private void HandleSubscribeResponse(SubscribeAlarmsResponse resp) + { + if (resp.Success) + { + _logger.LogInformation("Native alarm subscription established for {Source} on {Instance}", + _source.CanonicalName, _instanceName); + return; + } + + _logger.LogWarning( + "Native alarm subscription failed for {Source} on {Instance}: {Error}; retrying in {RetryMs}ms", + _source.CanonicalName, _instanceName, resp.ErrorMessage, _options.NativeAlarmRetryIntervalMs); + + _retryTimer?.Cancel(); + _retryTimer = Context.System.Scheduler.ScheduleTellOnceCancelable( + TimeSpan.FromMilliseconds(_options.NativeAlarmRetryIntervalMs), + Self, RetrySubscribe.Instance, Self); + } + + /// Caps the mirrored set, dropping the oldest conditions (and logging — no silent truncation). + private void EnforceCap() + { + var cap = _options.MirroredAlarmCapPerSource; + if (_alarms.Count <= cap) + { + return; + } + + var overflow = _alarms.Values + .OrderBy(a => a.TransitionTime) + .Take(_alarms.Count - cap) + .Select(a => a.SourceReference) + .ToList(); + + foreach (var sourceRef in overflow) + { + _alarms.Remove(sourceRef); + PersistDelete(sourceRef); + _logger.LogWarning( + "Native alarm cap {Cap} exceeded for {Source} on {Instance}; dropped oldest mirrored alarm {Ref}", + cap, _source.CanonicalName, _instanceName, sourceRef); + } + } + + /// Builds and tells the parent an enriched for a condition. + private void Emit(NativeAlarmTransition t, AlarmConditionState condition) + { + var change = new AlarmStateChanged( + _instanceName, + t.SourceReference, + condition.Active ? AlarmState.Active : AlarmState.Normal, + condition.Severity, + t.TransitionTime) + { + Kind = _nativeKind, + Condition = condition, + SourceReference = t.SourceReference, + AlarmTypeName = t.AlarmTypeName, + Category = t.Category, + Message = t.Message, + OperatorUser = t.OperatorUser, + OperatorComment = t.OperatorComment, + OriginalRaiseTime = t.OriginalRaiseTime, + CurrentValue = t.CurrentValue, + LimitValue = t.LimitValue + }; + + _instanceActor.Tell(change); + } + + private void PersistUpsert(NativeAlarmTransition t) + { + var json = JsonSerializer.Serialize(t.Condition); + _storage.UpsertNativeAlarmAsync(_instanceName, _source.CanonicalName, t.SourceReference, json, t.TransitionTime) + .ContinueWith( + task => _logger.LogWarning(task.Exception, + "Failed to persist native alarm {Ref} on {Instance}", t.SourceReference, _instanceName), + TaskContinuationOptions.OnlyOnFaulted); + } + + private void PersistDelete(string sourceReference) + { + _storage.DeleteNativeAlarmAsync(_instanceName, _source.CanonicalName, sourceReference) + .ContinueWith( + task => _logger.LogWarning(task.Exception, + "Failed to delete native alarm {Ref} on {Instance}", sourceReference, _instanceName), + TaskContinuationOptions.OnlyOnFaulted); + } + + // ── Internal messages ── + + private sealed record RehydrationCompleted(IReadOnlyList Rows); + + private sealed class RetrySubscribe + { + public static readonly RetrySubscribe Instance = new(); + private RetrySubscribe() { } + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/NativeAlarmActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/NativeAlarmActorTests.cs new file mode 100644 index 00000000..ac9614b9 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/NativeAlarmActorTests.cs @@ -0,0 +1,132 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening; +using ZB.MOM.WW.ScadaBridge.SiteRuntime; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; + +namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.Actors; + +/// +/// Task 15: NativeAlarmActor mirrors a source's native alarms — subscribe on start, +/// emit enriched AlarmStateChanged, snapshot atomic swap, retention, persistence. +/// +public class NativeAlarmActorTests : TestKit, IDisposable +{ + private readonly string _dbFile; + private readonly SiteStorageService _storage; + private readonly SiteRuntimeOptions _options = new(); + + public NativeAlarmActorTests() + { + _dbFile = Path.Combine(Path.GetTempPath(), $"naa-{Guid.NewGuid():N}.db"); + _storage = new SiteStorageService($"Data Source={_dbFile}", NullLogger.Instance); + _storage.InitializeAsync().GetAwaiter().GetResult(); + } + + private static ResolvedNativeAlarmSource Source() => new() + { + CanonicalName = "Pressure", + ConnectionName = "Opc", + SourceReference = "ns=2;s=T01" + }; + + private static NativeAlarmTransition Transition( + string sourceRef, AlarmTransitionKind kind, AlarmConditionState condition, DateTimeOffset? time = null) => + new(sourceRef, "T01", "AnalogLimit.Hi", kind, condition, + "Process", "hi", "hi", "", "", null, time ?? DateTimeOffset.UtcNow, "92", "90"); + + private IActorRef Spawn(IActorRef instanceActor, IActorRef dclManager) => + ActorOf(Props.Create(() => new NativeAlarmActor( + Source(), "inst", instanceActor, dclManager, _storage, _options, NullLogger.Instance))); + + [Fact] + public void SubscribeOnStart_SendsRequestForSourceBinding() + { + var dcl = CreateTestProbe(); + Spawn(CreateTestProbe().Ref, dcl.Ref); + + var req = dcl.ExpectMsg(); + Assert.Equal("inst", req.InstanceUniqueName); + Assert.Equal("Opc", req.ConnectionName); + Assert.Equal("ns=2;s=T01", req.SourceReference); + } + + [Fact] + public void Raise_EmitsEnrichedAlarmStateChanged() + { + var instance = CreateTestProbe(); + var dcl = CreateTestProbe(); + var actor = Spawn(instance.Ref, dcl.Ref); + dcl.ExpectMsg(); + + actor.Tell(new NativeAlarmTransitionUpdate("Opc", Transition( + "T01.Hi", AlarmTransitionKind.Raise, + new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 800)))); + + var emitted = instance.ExpectMsg(); + Assert.Equal(AlarmKind.NativeOpcUa, emitted.Kind); + Assert.Equal("T01.Hi", emitted.SourceReference); + Assert.Equal(AlarmState.Active, emitted.State); + Assert.Equal(800, emitted.Condition.Severity); + } + + [Fact] + public void SnapshotComplete_WithMissingPriorAlarm_EmitsReturnToNormal() + { + var instance = CreateTestProbe(); + var dcl = CreateTestProbe(); + var actor = Spawn(instance.Ref, dcl.Ref); + dcl.ExpectMsg(); + + actor.Tell(new NativeAlarmTransitionUpdate("Opc", Transition( + "T01.Hi", AlarmTransitionKind.Raise, + new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 800)))); + instance.ExpectMsg(m => m.State == AlarmState.Active); + + // A fresh snapshot that no longer contains T01.Hi means it cleared at the source. + actor.Tell(new NativeAlarmTransitionUpdate("Opc", Transition( + "T01.Hi", AlarmTransitionKind.SnapshotComplete, + new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 800)))); + + var cleared = instance.ExpectMsg(); + Assert.Equal("T01.Hi", cleared.SourceReference); + Assert.Equal(AlarmState.Normal, cleared.State); + } + + [Fact] + public void OlderTransition_IsIgnored() + { + var instance = CreateTestProbe(); + var dcl = CreateTestProbe(); + var actor = Spawn(instance.Ref, dcl.Ref); + dcl.ExpectMsg(); + + var t0 = DateTimeOffset.UtcNow; + actor.Tell(new NativeAlarmTransitionUpdate("Opc", Transition( + "T01.Hi", AlarmTransitionKind.Raise, + new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 800), t0))); + instance.ExpectMsg(m => m.State == AlarmState.Active); + + // An out-of-order (older) transition must not overwrite newer state. + actor.Tell(new NativeAlarmTransitionUpdate("Opc", Transition( + "T01.Hi", AlarmTransitionKind.Clear, + new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0), t0.AddSeconds(-30)))); + + instance.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + } + + void IDisposable.Dispose() + { + Shutdown(); + if (File.Exists(_dbFile)) + { + File.Delete(_dbFile); + } + } +}