using Akka.Actor;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Instance;
using ScadaLink.Commons.Messages.Lifecycle;
using ScadaLink.Commons.Messages.ScriptExecution;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Commons.Types.Flattening;
using ScadaLink.HealthMonitoring;
using ScadaLink.SiteRuntime.Persistence;
using ScadaLink.SiteRuntime.Scripts;
using ScadaLink.SiteRuntime.Streaming;
using System.Text.Json;

namespace ScadaLink.SiteRuntime.Actors;

/// <summary>
/// Represents a single deployed instance at runtime. Holds the in-memory attribute state
/// (loaded from FlattenedConfiguration + static overrides from SQLite).
///
/// The Instance Actor is the single source of truth for runtime instance state.
/// WP-24: All state mutations are serialized through the actor mailbox.
/// Multiple Script Execution Actors run concurrently; state mutations go through this actor.
///
/// WP-15/16: Creates child Script Actors and Alarm Actors on startup.
/// WP-22: Tell for tag value updates, attribute notifications, stream publishing.
///        Ask for CallScript, debug snapshot.
/// WP-25: Debug view backend — snapshot requests are answered from the in-memory state.
/// </summary>
public class InstanceActor : ReceiveActor
{
    private readonly string _instanceUniqueName;
    private readonly SiteStorageService _storage;
    private readonly ScriptCompilationService _compilationService;
    private readonly SharedScriptLibrary _sharedScriptLibrary;
    private readonly SiteStreamManager? _streamManager;
    private readonly SiteRuntimeOptions _options;
    private readonly ILogger _logger;
    private readonly ISiteHealthCollector? _healthCollector;
    private readonly IServiceProvider? _serviceProvider;

    // Current attribute values, keyed by attribute canonical name.
    // NOTE(review): value type reconstructed as object? (tag updates carry arbitrary values) — confirm.
    private readonly Dictionary<string, object?> _attributes = new();

    // Quality string ("Good", "Uncertain", or a DCL quality name) and last-change timestamp per attribute.
    private readonly Dictionary<string, string> _attributeQualities = new();
    private readonly Dictionary<string, DateTimeOffset> _attributeTimestamps = new();

    // Last reported alarm state, state-change timestamp and configured priority per alarm canonical name.
    private readonly Dictionary<string, AlarmState> _alarmStates = new();
    private readonly Dictionary<string, DateTimeOffset> _alarmTimestamps = new();
    private readonly Dictionary<string, int> _alarmPriorities = new();

    // Child actors keyed by script / alarm canonical name.
    private readonly Dictionary<string, IActorRef> _scriptActors = new();
    private readonly Dictionary<string, IActorRef> _alarmActors = new();

    // Only assigned in the constructor; null when the JSON deserializes to null.
    private readonly FlattenedConfiguration? _configuration;

    // DCL manager actor reference for subscribing to tag values
    private readonly IActorRef? _dclManager;

    // Maps tag paths back to attribute canonical names for DCL updates
    private readonly Dictionary<string, string> _tagPathToAttribute = new();

    /// <summary>
    /// Deserializes the flattened configuration, seeds the attribute state from it and
    /// registers all message handlers.
    /// </summary>
    /// <param name="instanceUniqueName">Unique name of the instance this actor represents.</param>
    /// <param name="configJson">JSON form of the instance's <see cref="FlattenedConfiguration"/>.</param>
    /// <param name="storage">Storage service used to load and persist static overrides.</param>
    /// <param name="compilationService">Compiles script source for script and alarm-trigger actors.</param>
    /// <param name="sharedScriptLibrary">Passed through to child script and alarm actors.</param>
    /// <param name="streamManager">Optional stream manager; when null nothing is published.</param>
    /// <param name="options">Passed through to child script and alarm actors.</param>
    /// <param name="logger">Logger used by this actor and handed to its children.</param>
    /// <param name="dclManager">Optional DCL manager; when null no tag subscriptions are made.</param>
    /// <param name="healthCollector">Optional collector passed through to child actors.</param>
    /// <param name="serviceProvider">Optional service provider passed through to script actors.</param>
    public InstanceActor(
        string instanceUniqueName,
        string configJson,
        SiteStorageService storage,
        ScriptCompilationService compilationService,
        SharedScriptLibrary sharedScriptLibrary,
        SiteStreamManager? streamManager,
        SiteRuntimeOptions options,
        ILogger logger,
        IActorRef? dclManager = null,
        ISiteHealthCollector? healthCollector = null,
        IServiceProvider? serviceProvider = null)
    {
        _instanceUniqueName = instanceUniqueName;
        _storage = storage;
        _compilationService = compilationService;
        _sharedScriptLibrary = sharedScriptLibrary;
        _streamManager = streamManager;
        _options = options;
        _logger = logger;
        _dclManager = dclManager;
        _healthCollector = healthCollector;
        _serviceProvider = serviceProvider;

        // Deserialize the flattened configuration
        _configuration = JsonSerializer.Deserialize<FlattenedConfiguration>(configJson);

        // Load default attribute values from the flattened configuration.
        // Data-sourced attributes start with Uncertain quality until the first DCL value arrives.
        // Static attributes start with Good quality.
        if (_configuration is not null)
        {
            foreach (var attr in _configuration.Attributes)
            {
                _attributes[attr.CanonicalName] = attr.Value;
                _attributeQualities[attr.CanonicalName] =
                    string.IsNullOrEmpty(attr.DataSourceReference) ? "Good" : "Uncertain";
            }
        }

        // Handle attribute queries (Tell pattern -- sender gets response)
        Receive<GetAttributeRequest>(HandleGetAttribute);

        // Handle static attribute writes
        Receive<SetStaticAttributeCommand>(HandleSetStaticAttribute);

        // Handle lifecycle messages. Both handlers only log and acknowledge with success.
        // NOTE(review): the two command type names below were reconstructed from the
        // Lifecycle namespace and the log text — confirm against ScadaLink.Commons.
        Receive<DisableInstanceCommand>(command =>
        {
            _logger.LogInformation("Instance {Instance} received disable command", _instanceUniqueName);
            Sender.Tell(new InstanceLifecycleResponse(
                command.CommandId, _instanceUniqueName, true, null, DateTimeOffset.UtcNow));
        });

        Receive<EnableInstanceCommand>(command =>
        {
            _logger.LogInformation("Instance {Instance} received enable command", _instanceUniqueName);
            Sender.Tell(new InstanceLifecycleResponse(
                command.CommandId, _instanceUniqueName, true, null, DateTimeOffset.UtcNow));
        });

        // WP-15: Handle script call requests — route to appropriate Script Actor (Ask pattern)
        Receive<ScriptCallRequest>(HandleScriptCallRequest);

        // WP-22/23: Handle attribute value changes from DCL (Tell pattern)
        Receive<AttributeValueChanged>(HandleAttributeValueChanged);

        // Handle tag value updates from DCL — convert to AttributeValueChanged
        Receive<TagValueUpdate>(HandleTagValueUpdate);

        // Ack from DCL subscribe — no action needed.
        // NOTE(review): ack type name reconstructed as the counterpart of SubscribeTagsRequest — confirm.
        Receive<SubscribeTagsResponse>(_ => { });

        Receive<ConnectionQualityChanged>(HandleConnectionQualityChanged);

        // WP-16: Handle alarm state changes from Alarm Actors (Tell pattern)
        Receive<AlarmStateChanged>(HandleAlarmStateChanged);

        // WP-25: Debug view subscribe/unsubscribe (Ask pattern for snapshot)
        Receive<SubscribeDebugViewRequest>(HandleSubscribeDebugView);
        Receive<UnsubscribeDebugViewRequest>(HandleUnsubscribeDebugView);

        // Debug snapshot (one-shot, no subscription)
        Receive<DebugSnapshotRequest>(HandleDebugSnapshot);

        // Handle internal messages
        Receive<LoadOverridesResult>(HandleOverridesLoaded);
    }

    /// <summary>
    /// Starts the asynchronous load of static overrides, then creates the child actors and
    /// subscribes to the DCL for data-sourced attributes.
    /// </summary>
    protected override void PreStart()
    {
        base.PreStart();
        _logger.LogInformation("InstanceActor started for {Instance}", _instanceUniqueName);

        // Asynchronously load static overrides from SQLite and pipe the outcome to self, so the
        // result is applied inside the mailbox (HandleOverridesLoaded) rather than on a pool thread.
        var self = Self;
        _storage.GetStaticOverridesAsync(_instanceUniqueName)
            .ContinueWith(t => t.IsCompletedSuccessfully
                ? new LoadOverridesResult(t.Result, null)
                : new LoadOverridesResult(
                    new Dictionary<string, string>(),
                    t.Exception?.GetBaseException().Message))
            .PipeTo(self);

        // Create child Script Actors and Alarm Actors from configuration
        CreateChildActors();

        // Subscribe to DCL for data-sourced attributes
        SubscribeToDcl();
    }

    /// <summary>
    /// Supervision: Resume for child coordinator actors (Script/Alarm Actors preserve state).
    /// Every child failure is logged as a warning and answered with <see cref="Directive.Resume"/>.
    /// </summary>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: -1,
            withinTimeRange: TimeSpan.FromMinutes(1),
            decider: Decider.From(ex =>
            {
                _logger.LogWarning(
                    ex,
                    "Child actor on instance {Instance} threw exception, resuming",
                    _instanceUniqueName);
                return Directive.Resume;
            }));
    }

    /// <summary>
    /// Returns the current attribute value. Uses Tell pattern; sender gets the response.
    /// The response carries a found flag; quality falls back to "Good" when none is recorded.
    /// </summary>
    private void HandleGetAttribute(GetAttributeRequest request)
    {
        var found = _attributes.TryGetValue(request.AttributeName, out var value);
        _attributeQualities.TryGetValue(request.AttributeName, out var quality);

        Sender.Tell(new GetAttributeResponse(
            request.CorrelationId,
            _instanceUniqueName,
            request.AttributeName,
            value,
            found,
            quality ?? "Good",
            DateTimeOffset.UtcNow));
    }

    /// <summary>
    /// Updates a static attribute in memory and persists the override to SQLite.
    /// WP-24: State mutation serialized through this actor's mailbox.
    /// </summary>
    private void HandleSetStaticAttribute(SetStaticAttributeCommand command)
    {
        // Route through the unified change handler so value, quality AND timestamp are
        // recorded; later snapshots then report the same data that was published here.
        var changed = new AttributeValueChanged(
            _instanceUniqueName,
            command.AttributeName,
            command.AttributeName,
            command.Value,
            "Good",
            DateTimeOffset.UtcNow);
        HandleAttributeValueChanged(changed);

        // Persist asynchronously -- fire and forget since the actor is the source of truth
        // and SetAttribute is called from scripts via Tell (no response consumer).
        // Locals are captured so the continuation does not touch actor state off-thread.
        var instanceName = _instanceUniqueName;
        var attributeName = command.AttributeName;
        var logger = _logger;
        _storage.SetStaticOverrideAsync(_instanceUniqueName, command.AttributeName, command.Value)
            .ContinueWith(
                t =>
                {
                    logger.LogWarning(
                        t.Exception?.GetBaseException(),
                        "Failed to persist static override for {Instance}.{Attribute}; in-memory state is authoritative",
                        instanceName,
                        attributeName);
                },
                TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// WP-15: Routes script call requests to the appropriate Script Actor.
    /// Uses Ask pattern (WP-22). Unknown script names are answered with a failed result.
    /// </summary>
    private void HandleScriptCallRequest(ScriptCallRequest request)
    {
        if (!_scriptActors.TryGetValue(request.ScriptName, out var scriptActor))
        {
            Sender.Tell(new ScriptCallResult(
                request.CorrelationId,
                false,
                null,
                $"Script '{request.ScriptName}' not found on instance '{_instanceUniqueName}'."));
            return;
        }

        // Forward the request to the Script Actor, preserving the original sender
        scriptActor.Forward(request);
    }

    /// <summary>
    /// WP-22/23: Handles attribute value changes from DCL or static writes.
    /// Updates in-memory state, publishes to stream, and notifies children.
    /// </summary>
    private void HandleAttributeValueChanged(AttributeValueChanged changed)
    {
        // WP-24: State mutation serialized through this actor
        _attributes[changed.AttributeName] = changed.Value;
        _attributeQualities[changed.AttributeName] = changed.Quality;
        _attributeTimestamps[changed.AttributeName] = changed.Timestamp;

        PublishAndNotifyChildren(changed);
    }

    /// <summary>
    /// Handles tag value updates from DCL. Maps the tag path back to the attribute
    /// canonical name and converts to an AttributeValueChanged for unified processing.
    /// Updates for tag paths this instance did not subscribe to are ignored.
    /// </summary>
    private void HandleTagValueUpdate(TagValueUpdate update)
    {
        if (!_tagPathToAttribute.TryGetValue(update.TagPath, out var attrName))
        {
            return;
        }

        // Normalize array values to JSON strings so they survive Akka serialization
        var value = update.Value is Array
            ? JsonSerializer.Serialize(update.Value, update.Value.GetType())
            : update.Value;

        HandleAttributeValueChanged(new AttributeValueChanged(
            _instanceUniqueName,
            update.TagPath,
            attrName,
            value,
            update.Quality.ToString(),
            update.Timestamp));
    }

    /// <summary>
    /// Applies a connection-level quality change to every data-sourced attribute bound to
    /// that connection and publishes the new quality (with the current value) to the stream.
    /// </summary>
    private void HandleConnectionQualityChanged(ConnectionQualityChanged qualityChanged)
    {
        _logger.LogWarning(
            "Connection {Connection} quality changed to {Quality} for instance {Instance}",
            qualityChanged.ConnectionName,
            qualityChanged.Quality,
            _instanceUniqueName);

        if (_configuration is null)
        {
            return;
        }

        // Mark all attributes bound to this connection with the new quality
        // and publish to the site stream so the debug view updates in real-time.
        // We intentionally do NOT notify script/alarm actors here — the value
        // hasn't changed, only the quality, and firing scripts/alarms would
        // cause spurious evaluations.
        var qualityStr = qualityChanged.Quality.ToString();
        foreach (var attr in _configuration.Attributes)
        {
            if (attr.BoundDataConnectionName != qualityChanged.ConnectionName
                || string.IsNullOrEmpty(attr.DataSourceReference))
            {
                continue;
            }

            _attributeQualities[attr.CanonicalName] = qualityStr;
            _attributeTimestamps[attr.CanonicalName] = qualityChanged.Timestamp;

            // Publish quality change to stream (current value, new quality)
            _attributes.TryGetValue(attr.CanonicalName, out var currentValue);
            _streamManager?.PublishAttributeValueChanged(new AttributeValueChanged(
                _instanceUniqueName,
                attr.DataSourceReference,
                attr.CanonicalName,
                currentValue,
                qualityStr,
                qualityChanged.Timestamp));
        }
    }

    /// <summary>
    /// Subscribes to DCL for all data-sourced attributes. Groups tag paths by connection
    /// name and sends one SubscribeTagsRequest per connection to the DCL manager.
    /// Does nothing when no DCL manager or configuration is available.
    /// </summary>
    private void SubscribeToDcl()
    {
        if (_dclManager is null || _configuration is null)
        {
            return;
        }

        // Group attributes by their bound connection name
        var byConnection = new Dictionary<string, List<string>>();
        foreach (var attr in _configuration.Attributes)
        {
            if (string.IsNullOrEmpty(attr.DataSourceReference)
                || string.IsNullOrEmpty(attr.BoundDataConnectionName))
            {
                continue;
            }

            _tagPathToAttribute[attr.DataSourceReference] = attr.CanonicalName;

            // Single lookup instead of ContainsKey + indexer.
            if (!byConnection.TryGetValue(attr.BoundDataConnectionName, out var boundTags))
            {
                boundTags = new List<string>();
                byConnection[attr.BoundDataConnectionName] = boundTags;
            }

            boundTags.Add(attr.DataSourceReference);
        }

        // Send subscription requests to DCL for each connection
        foreach (var (connectionName, tagPaths) in byConnection)
        {
            var request = new SubscribeTagsRequest(
                Guid.NewGuid().ToString("N"),
                _instanceUniqueName,
                connectionName,
                tagPaths,
                DateTimeOffset.UtcNow);
            _dclManager.Tell(request, Self);

            _logger.LogInformation(
                "Instance {Instance} subscribed to {Count} tags on connection {Connection}",
                _instanceUniqueName,
                tagPaths.Count,
                connectionName);
        }
    }

    /// <summary>
    /// WP-16: Handles alarm state changes from Alarm Actors.
    /// Updates in-memory alarm state and publishes to stream.
    /// </summary>
    private void HandleAlarmStateChanged(AlarmStateChanged changed)
    {
        _alarmStates[changed.AlarmName] = changed.State;
        _alarmTimestamps[changed.AlarmName] = changed.Timestamp;

        // WP-23: Publish to site-wide stream
        _streamManager?.PublishAlarmStateChanged(changed);
    }

    /// <summary>
    /// WP-25: Debug view subscribe — replies to the sender with a snapshot of the current state.
    /// This handler itself registers no per-subscriber state.
    /// </summary>
    private void HandleSubscribeDebugView(SubscribeDebugViewRequest request)
    {
        Sender.Tell(BuildDebugSnapshot());

        _logger.LogDebug(
            "Debug view snapshot sent for {Instance}, correlationId={Id}",
            _instanceUniqueName,
            request.CorrelationId);
    }

    /// <summary>
    /// WP-25: Debug view unsubscribe — only logs the request; there is no subscription
    /// state held by this actor to remove.
    /// </summary>
    private void HandleUnsubscribeDebugView(UnsubscribeDebugViewRequest request)
    {
        _logger.LogDebug(
            "Debug view unsubscribe for {Instance}, correlationId={Id}",
            _instanceUniqueName,
            request.CorrelationId);
    }

    /// <summary>
    /// One-shot debug snapshot — returns current state without registering a subscriber.
    /// </summary>
    private void HandleDebugSnapshot(DebugSnapshotRequest request)
    {
        Sender.Tell(BuildDebugSnapshot());
    }

    /// <summary>
    /// Builds a snapshot of all attribute values and alarm states from the in-memory state.
    /// Shared by the subscribe and one-shot snapshot handlers.
    /// </summary>
    /// <returns>A snapshot stamped with the time it was built.</returns>
    private DebugViewSnapshot BuildDebugSnapshot()
    {
        // One clock read so the snapshot and every defaulted timestamp agree.
        var now = DateTimeOffset.UtcNow;

        // Attributes without a recorded quality/timestamp report "Good" / now.
        var attributeValues = _attributes
            .Select(kvp => new AttributeValueChanged(
                _instanceUniqueName,
                kvp.Key,
                kvp.Key,
                kvp.Value,
                _attributeQualities.GetValueOrDefault(kvp.Key, "Good"),
                _attributeTimestamps.GetValueOrDefault(kvp.Key, now)))
            .ToList();

        // One entry per alarm actor; alarms that never reported a state show as Normal.
        var alarmStates = _alarmActors.Keys
            .Select(name => new AlarmStateChanged(
                _instanceUniqueName,
                name,
                _alarmStates.GetValueOrDefault(name, AlarmState.Normal),
                _alarmPriorities.GetValueOrDefault(name, 0),
                _alarmTimestamps.GetValueOrDefault(name, now)))
            .ToList();

        return new DebugViewSnapshot(_instanceUniqueName, attributeValues, alarmStates, now);
    }

    /// <summary>
    /// Publishes attribute change to stream and notifies child Script/Alarm actors.
    /// WP-22: Tell for attribute notifications (fire-and-forget, never blocks).
    /// </summary>
    private void PublishAndNotifyChildren(AttributeValueChanged changed)
    {
        // WP-23: Publish to site-wide stream
        _streamManager?.PublishAttributeValueChanged(changed);

        // Notify Script Actors (for value-change and conditional triggers)
        foreach (var scriptActor in _scriptActors.Values)
        {
            scriptActor.Tell(changed);
        }

        // Notify Alarm Actors (for alarm evaluation)
        foreach (var alarmActor in _alarmActors.Values)
        {
            alarmActor.Tell(changed);
        }
    }

    /// <summary>
    /// Applies static overrides loaded from SQLite on top of default values.
    /// A failed load is logged and leaves the current values untouched.
    /// </summary>
    private void HandleOverridesLoaded(LoadOverridesResult result)
    {
        if (result.Error is not null)
        {
            _logger.LogWarning(
                "Failed to load static overrides for {Instance}: {Error}",
                _instanceUniqueName,
                result.Error);
            return;
        }

        foreach (var (attributeName, overrideValue) in result.Overrides)
        {
            _attributes[attributeName] = overrideValue;
        }

        _logger.LogDebug(
            "Loaded {Count} static overrides for {Instance}",
            result.Overrides.Count,
            _instanceUniqueName);
    }

    /// <summary>
    /// Creates child Script Actors and Alarm Actors from the flattened configuration.
    /// WP-15: Script Actors spawned per script definition.
    /// WP-16: Alarm Actors spawned per alarm definition, as peers to Script Actors.
    /// WP-32: Compilation errors are logged; the failing script gets no actor, but this
    /// actor and its remaining children still start.
    /// </summary>
    private void CreateChildActors()
    {
        if (_configuration is null)
        {
            return;
        }

        CreateScriptActors(_configuration);
        CreateAlarmActors(_configuration);

        _logger.LogInformation(
            "Instance {Instance}: created {Scripts} script actors and {Alarms} alarm actors",
            _instanceUniqueName,
            _scriptActors.Count,
            _alarmActors.Count);
    }

    /// <summary>
    /// Compiles every configured script and spawns one Script Actor per successful compilation.
    /// </summary>
    private void CreateScriptActors(FlattenedConfiguration configuration)
    {
        foreach (var script in configuration.Scripts)
        {
            var compilationResult = _compilationService.Compile(script.CanonicalName, script.Code);
            if (!compilationResult.IsSuccess)
            {
                _logger.LogError(
                    "Script '{Script}' on instance '{Instance}' failed to compile: {Errors}",
                    script.CanonicalName,
                    _instanceUniqueName,
                    string.Join("; ", compilationResult.Errors));
                continue;
            }

            var props = Props.Create(() => new ScriptActor(
                script.CanonicalName,
                _instanceUniqueName,
                Self,
                compilationResult.CompiledScript,
                script,
                _sharedScriptLibrary,
                _options,
                _logger,
                _healthCollector,
                _serviceProvider));

            _scriptActors[script.CanonicalName] =
                Context.ActorOf(props, $"script-{script.CanonicalName}");
        }
    }

    /// <summary>
    /// Spawns one Alarm Actor per configured alarm, compiling its on-trigger script when one
    /// is referenced, and records the alarm's priority and an initial timestamp.
    /// </summary>
    private void CreateAlarmActors(FlattenedConfiguration configuration)
    {
        foreach (var alarm in configuration.Alarms)
        {
            // NOTE(review): generic argument reconstructed as object? — confirm it matches
            // the CompiledScript type returned by ScriptCompilationService.
            Microsoft.CodeAnalysis.Scripting.Script<object?>? onTriggerScript = null;

            // Compile on-trigger script if defined; a missing definition or a failed
            // compilation leaves the alarm without a trigger script.
            if (!string.IsNullOrEmpty(alarm.OnTriggerScriptCanonicalName))
            {
                var triggerScriptDef = configuration.Scripts
                    .FirstOrDefault(s => s.CanonicalName == alarm.OnTriggerScriptCanonicalName);

                if (triggerScriptDef is not null)
                {
                    var result = _compilationService.Compile(
                        $"alarm-trigger-{alarm.CanonicalName}", triggerScriptDef.Code);

                    if (result.IsSuccess)
                    {
                        onTriggerScript = result.CompiledScript;
                    }
                    else
                    {
                        _logger.LogWarning(
                            "Alarm trigger script for {Alarm} on {Instance} failed to compile",
                            alarm.CanonicalName,
                            _instanceUniqueName);
                    }
                }
            }

            var props = Props.Create(() => new AlarmActor(
                alarm.CanonicalName,
                _instanceUniqueName,
                Self,
                alarm,
                onTriggerScript,
                _sharedScriptLibrary,
                _options,
                _logger,
                _healthCollector));

            _alarmActors[alarm.CanonicalName] =
                Context.ActorOf(props, $"alarm-{alarm.CanonicalName}");
            _alarmPriorities[alarm.CanonicalName] = alarm.PriorityLevel;
            _alarmTimestamps[alarm.CanonicalName] = DateTimeOffset.UtcNow;
        }
    }

    /// <summary>
    /// Read-only access to current attribute count (for testing/diagnostics).
    /// </summary>
    public int AttributeCount => _attributes.Count;

    /// <summary>
    /// Read-only access to script actor count (for testing/diagnostics).
    /// </summary>
    public int ScriptActorCount => _scriptActors.Count;

    /// <summary>
    /// Read-only access to alarm actor count (for testing/diagnostics).
    /// </summary>
    public int AlarmActorCount => _alarmActors.Count;

    /// <summary>
    /// Internal message for async override loading result.
    /// <c>Error</c> is null on success; on failure <c>Overrides</c> is empty.
    /// </summary>
    // NOTE(review): value type reconstructed as string — confirm against
    // SiteStorageService.GetStaticOverridesAsync.
    internal record LoadOverridesResult(Dictionary<string, string> Overrides, string? Error);
}