using System.Text.Json; using Akka.Actor; using Microsoft.Extensions.Logging; using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming; using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors; /// /// Task 15: Native Alarm Actor — child of Instance Actor, peer to the computed /// . Mirrors a single source binding's native alarms /// read-only: subscribes through the DCL, applies snapshot/live transitions, /// retains active+unacked conditions, persists to site SQLite, and emits an /// enriched upward for every transition. /// /// State is keyed by (the /// stable per-condition key). A snapshot replay (Snapshot* sentinels) atomically /// swaps the mirrored set so conditions that cleared while disconnected emit a /// return-to-normal. Persistence is best-effort/fire-and-forget — a failed write /// never blocks the actor or aborts the emit. /// /// Supervision: parent Instance Actor uses Resume for coordinator children. /// public class NativeAlarmActor : ReceiveActor { private readonly ResolvedNativeAlarmSource _source; private readonly string _instanceName; private readonly IActorRef _instanceActor; private readonly IActorRef _dclManager; private readonly SiteStorageService _storage; private readonly SiteRuntimeOptions _options; private readonly ILogger _logger; private readonly AlarmKind _nativeKind; /// Current mirrored conditions, keyed by source reference. private readonly Dictionary _alarms = new(); /// Buffer accumulating a snapshot replay until . private readonly Dictionary _snapshotBuffer = new(); private ICancelable? _retryTimer; /// Initializes a new for a single resolved source binding. /// The resolved native alarm source (connection + source reference + filter). /// The owning instance's unique name. /// Parent instance actor; receives emitted . /// DCL target the subscribe request is sent to. /// Site-local SQLite store for rehydration + persistence. /// Site runtime options (alarm cap, retry interval). /// Logger for diagnostics. /// Alarm kind to stamp on emitted events (OPC UA vs MxAccess); set by the /// Instance Actor from the connection protocol. Defaults to . public NativeAlarmActor( ResolvedNativeAlarmSource source, string instanceName, IActorRef instanceActor, IActorRef dclManager, SiteStorageService storage, SiteRuntimeOptions options, ILogger logger, AlarmKind nativeKind = AlarmKind.NativeOpcUa) { _source = source; _instanceName = instanceName; _instanceActor = instanceActor; _dclManager = dclManager; _storage = storage; _options = options; _logger = logger; _nativeKind = nativeKind; Receive(HandleRehydration); Receive(HandleTransition); Receive(HandleSourceUnavailable); Receive(HandleSubscribeResponse); Receive(_ => SendSubscribe()); } /// protected override void PreStart() { base.PreStart(); // Rehydrate last-known state from SQLite (non-blocking), then subscribe. // A fresh source snapshot will reconcile/replace this once it arrives. _storage.GetNativeAlarmsAsync(_instanceName, _source.CanonicalName) .PipeTo(Self, Self, rows => new RehydrationCompleted(rows)); SendSubscribe(); _logger.LogInformation( "NativeAlarmActor started for {Source} ({Connection}:{Ref}) on {Instance}", _source.CanonicalName, _source.ConnectionName, _source.SourceReference, _instanceName); } /// protected override void PostStop() { _retryTimer?.Cancel(); base.PostStop(); } private void SendSubscribe() { _dclManager.Tell(new SubscribeAlarmsRequest( CorrelationId: Guid.NewGuid().ToString("N"), InstanceUniqueName: _instanceName, ConnectionName: _source.ConnectionName, SourceReference: _source.SourceReference, ConditionFilter: _source.ConditionFilter, Timestamp: DateTimeOffset.UtcNow)); } private void HandleRehydration(RehydrationCompleted msg) { foreach (var row in msg.Rows) { // A live transition that arrived before the read completed wins. if (_alarms.ContainsKey(row.SourceReference)) { continue; } AlarmConditionState? condition; try { condition = JsonSerializer.Deserialize(row.ConditionJson); } catch (JsonException ex) { _logger.LogWarning(ex, "Discarding unreadable persisted native alarm {Ref} on {Instance}", row.SourceReference, _instanceName); continue; } if (condition is null) { continue; } // Metadata (type/category/message) is not persisted — rehydrate the // condition + key so the DebugView reflects last-known state until the // first source snapshot replaces it. var t = new NativeAlarmTransition( row.SourceReference, string.Empty, string.Empty, AlarmTransitionKind.Snapshot, condition, string.Empty, string.Empty, string.Empty, string.Empty, string.Empty, null, row.LastTransitionAt, string.Empty, string.Empty); _alarms[row.SourceReference] = t; Emit(t, t.Condition); } } private void HandleTransition(NativeAlarmTransitionUpdate update) { var t = update.Transition; switch (t.Kind) { case AlarmTransitionKind.Snapshot: _snapshotBuffer[t.SourceReference] = t; break; case AlarmTransitionKind.SnapshotComplete: ApplySnapshotSwap(); break; default: ApplyLiveTransition(t); break; } } /// /// Atomically replaces the mirrored set with the buffered snapshot: conditions /// no longer present emit a return-to-normal (and drop out); present conditions /// are upserted, persisted, and emitted. /// private void ApplySnapshotSwap() { foreach (var (sourceRef, prior) in _alarms) { if (!_snapshotBuffer.ContainsKey(sourceRef)) { Emit(prior, prior.Condition with { Active = false }); PersistDelete(sourceRef); } } _alarms.Clear(); foreach (var (sourceRef, t) in _snapshotBuffer) { _alarms[sourceRef] = t; PersistUpsert(t); Emit(t, t.Condition); } _snapshotBuffer.Clear(); EnforceCap(); } private void ApplyLiveTransition(NativeAlarmTransition t) { // Ignore stale (out-of-order) transitions for a condition we already hold. if (_alarms.TryGetValue(t.SourceReference, out var existing) && t.TransitionTime < existing.TransitionTime) { return; } _alarms[t.SourceReference] = t; PersistUpsert(t); Emit(t, t.Condition); // Retention: a resolved condition (inactive AND acknowledged) drops out of // the mirror — the return-to-normal has already been emitted above. if (!t.Condition.Active && t.Condition.Acknowledged) { _alarms.Remove(t.SourceReference); PersistDelete(t.SourceReference); } EnforceCap(); } private void HandleSourceUnavailable(NativeAlarmSourceUnavailable msg) { // Keep last-known conditions (uncertain) rather than clearing them; the // reconnect snapshot reconciles state. No emit — avoids a flap. _logger.LogWarning( "Native alarm feed unavailable for {Source} ({Connection}) on {Instance}; retaining {Count} mirrored alarms pending reconnect snapshot", _source.CanonicalName, msg.ConnectionName, _instanceName, _alarms.Count); } private void HandleSubscribeResponse(SubscribeAlarmsResponse resp) { if (resp.Success) { _logger.LogInformation("Native alarm subscription established for {Source} on {Instance}", _source.CanonicalName, _instanceName); return; } _logger.LogWarning( "Native alarm subscription failed for {Source} on {Instance}: {Error}; retrying in {RetryMs}ms", _source.CanonicalName, _instanceName, resp.ErrorMessage, _options.NativeAlarmRetryIntervalMs); _retryTimer?.Cancel(); _retryTimer = Context.System.Scheduler.ScheduleTellOnceCancelable( TimeSpan.FromMilliseconds(_options.NativeAlarmRetryIntervalMs), Self, RetrySubscribe.Instance, Self); } /// Caps the mirrored set, dropping the oldest conditions (and logging — no silent truncation). private void EnforceCap() { var cap = _options.MirroredAlarmCapPerSource; if (_alarms.Count <= cap) { return; } var overflow = _alarms.Values .OrderBy(a => a.TransitionTime) .Take(_alarms.Count - cap) .Select(a => a.SourceReference) .ToList(); foreach (var sourceRef in overflow) { _alarms.Remove(sourceRef); PersistDelete(sourceRef); _logger.LogWarning( "Native alarm cap {Cap} exceeded for {Source} on {Instance}; dropped oldest mirrored alarm {Ref}", cap, _source.CanonicalName, _instanceName, sourceRef); } } /// Builds and tells the parent an enriched for a condition. private void Emit(NativeAlarmTransition t, AlarmConditionState condition) { var change = new AlarmStateChanged( _instanceName, t.SourceReference, condition.Active ? AlarmState.Active : AlarmState.Normal, condition.Severity, t.TransitionTime) { Kind = _nativeKind, Condition = condition, SourceReference = t.SourceReference, AlarmTypeName = t.AlarmTypeName, Category = t.Category, Message = t.Message, OperatorUser = t.OperatorUser, OperatorComment = t.OperatorComment, OriginalRaiseTime = t.OriginalRaiseTime, CurrentValue = t.CurrentValue, LimitValue = t.LimitValue }; _instanceActor.Tell(change); } private void PersistUpsert(NativeAlarmTransition t) { var json = JsonSerializer.Serialize(t.Condition); _storage.UpsertNativeAlarmAsync(_instanceName, _source.CanonicalName, t.SourceReference, json, t.TransitionTime) .ContinueWith( task => _logger.LogWarning(task.Exception, "Failed to persist native alarm {Ref} on {Instance}", t.SourceReference, _instanceName), TaskContinuationOptions.OnlyOnFaulted); } private void PersistDelete(string sourceReference) { _storage.DeleteNativeAlarmAsync(_instanceName, _source.CanonicalName, sourceReference) .ContinueWith( task => _logger.LogWarning(task.Exception, "Failed to delete native alarm {Ref} on {Instance}", sourceReference, _instanceName), TaskContinuationOptions.OnlyOnFaulted); } // ── Internal messages ── private sealed record RehydrationCompleted(IReadOnlyList Rows); private sealed class RetrySubscribe { public static readonly RetrySubscribe Instance = new(); private RetrySubscribe() { } } }