using Akka.Actor;
using Microsoft.CodeAnalysis.Scripting;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Instance;
using ScadaLink.Commons.Messages.ScriptExecution;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Commons.Types.Flattening;
using ScadaLink.HealthMonitoring;
using ScadaLink.SiteRuntime.Persistence;
using ScadaLink.SiteRuntime.Scripts;
using ScadaLink.SiteRuntime.Streaming;
using System.Text.Json;

namespace ScadaLink.SiteRuntime.Actors;

/// <summary>
/// Represents a single deployed instance at runtime. Holds the in-memory attribute state
/// (loaded from FlattenedConfiguration + static overrides from SQLite).
///
/// The Instance Actor is the single source of truth for runtime instance state.
/// WP-24: All state mutations are serialized through the actor mailbox.
/// Multiple Script Execution Actors run concurrently; state mutations go through this actor.
///
/// WP-15/16: Creates child Script Actors and Alarm Actors on startup.
/// WP-22: Tell for tag value updates, attribute notifications, stream publishing.
///        Ask for CallScript, debug snapshot.
/// WP-25: Debug view backend — snapshot + stream subscription.
/// </summary>
public class InstanceActor : ReceiveActor
{
    /// <summary>Upper bound on how long a device write forwarded to the DCL may take.</summary>
    private static readonly TimeSpan DclWriteTimeout = TimeSpan.FromSeconds(30);

    private readonly string _instanceUniqueName;
    private readonly SiteStorageService _storage;
    private readonly ScriptCompilationService _compilationService;
    private readonly SharedScriptLibrary _sharedScriptLibrary;
    private readonly SiteStreamManager? _streamManager;
    private readonly SiteRuntimeOptions _options;
    private readonly ILogger _logger;
    private readonly ISiteHealthCollector? _healthCollector;
    private readonly IServiceProvider? _serviceProvider;

    // Runtime state, keyed by attribute / alarm / script canonical name.
    private readonly Dictionary<string, object?> _attributes = new();
    private readonly Dictionary<string, string> _attributeQualities = new();
    private readonly Dictionary<string, DateTimeOffset> _attributeTimestamps = new();
    private readonly Dictionary<string, AlarmState> _alarmStates = new();
    private readonly Dictionary<string, DateTimeOffset> _alarmTimestamps = new();
    private readonly Dictionary<string, int> _alarmPriorities = new();
    private readonly Dictionary<string, IActorRef> _scriptActors = new();
    private readonly Dictionary<string, IActorRef> _alarmActors = new();

    private FlattenedConfiguration? _configuration;

    // DCL manager actor reference for subscribing to tag values
    private readonly IActorRef? _dclManager;

    // Maps each tag path to every attribute canonical name that references it.
    // A tag path can back more than one attribute (e.g. two composed modules
    // whose members reference the same PLC node), so a tag value update must
    // fan out to all of them — not just the last one registered.
    private readonly Dictionary<string, List<string>> _tagPathToAttributes = new();

    /// <summary>
    /// Creates the actor, deserializes the flattened configuration, seeds the
    /// in-memory attribute state from it and registers all message handlers.
    /// </summary>
    /// <param name="instanceUniqueName">Unique name of the deployed instance this actor represents.</param>
    /// <param name="configJson">JSON-serialized <see cref="FlattenedConfiguration"/>.</param>
    /// <param name="storage">Site storage used to load and persist static overrides.</param>
    /// <param name="compilationService">Compiles scripts and trigger expressions for child actors.</param>
    /// <param name="sharedScriptLibrary">Shared script library handed to child actors.</param>
    /// <param name="streamManager">Optional site-wide stream; when null nothing is published.</param>
    /// <param name="options">Site runtime options handed to child actors.</param>
    /// <param name="logger">Logger used by this actor and passed to its children.</param>
    /// <param name="dclManager">Optional DCL manager; when null no tags are subscribed and device writes fail.</param>
    /// <param name="healthCollector">Optional health collector handed to child actors.</param>
    /// <param name="serviceProvider">Optional service provider handed to Script Actors.</param>
    public InstanceActor(
        string instanceUniqueName,
        string configJson,
        SiteStorageService storage,
        ScriptCompilationService compilationService,
        SharedScriptLibrary sharedScriptLibrary,
        SiteStreamManager? streamManager,
        SiteRuntimeOptions options,
        ILogger logger,
        IActorRef? dclManager = null,
        ISiteHealthCollector? healthCollector = null,
        IServiceProvider? serviceProvider = null)
    {
        _instanceUniqueName = instanceUniqueName;
        _storage = storage;
        _compilationService = compilationService;
        _sharedScriptLibrary = sharedScriptLibrary;
        _streamManager = streamManager;
        _options = options;
        _logger = logger;
        _dclManager = dclManager;
        _healthCollector = healthCollector;
        _serviceProvider = serviceProvider;

        // Deserialize the flattened configuration
        _configuration = JsonSerializer.Deserialize<FlattenedConfiguration>(configJson);

        // Load default attribute values from the flattened configuration.
        // Data-sourced attributes start with Uncertain quality until the first DCL value arrives.
        // Static attributes start with Good quality.
        if (_configuration != null)
        {
            foreach (var attr in _configuration.Attributes)
            {
                _attributes[attr.CanonicalName] = attr.Value;
                _attributeQualities[attr.CanonicalName] =
                    string.IsNullOrEmpty(attr.DataSourceReference) ? "Good" : "Uncertain";
            }
        }

        // Handle attribute queries (Tell pattern -- sender gets response)
        Receive<GetAttributeRequest>(HandleGetAttribute);

        // Handle static attribute writes
        Receive<SetStaticAttributeCommand>(HandleSetStaticAttribute);

        // SiteRuntime-019: the disable/enable lifecycle is owned entirely by the
        // Deployment Manager — DeploymentManagerActor.HandleDisable/HandleEnable
        // stop or re-create the Instance Actor directly and reply to the caller.
        // DisableInstanceCommand / EnableInstanceCommand are never routed to the
        // Instance Actor, so no handlers are registered here. (The previous no-op
        // handlers were dead code that implied a non-existent instance-side
        // acknowledgement contract.)

        // WP-15: Handle script call requests — route to appropriate Script Actor (Ask pattern)
        Receive<ScriptCallRequest>(HandleScriptCallRequest);

        // WP-22/23: Handle attribute value changes from DCL (Tell pattern)
        Receive<AttributeValueChanged>(HandleAttributeValueChanged);

        // Handle tag value updates from DCL — convert to AttributeValueChanged
        Receive<TagValueUpdate>(HandleTagValueUpdate);

        // Ack from DCL subscribe — no action needed.
        // NOTE(review): the acknowledged message type is assumed to be
        // SubscribeTagsResponse (the counterpart of SubscribeTagsRequest) — confirm.
        Receive<SubscribeTagsResponse>(_ => { });

        Receive<ConnectionQualityChanged>(HandleConnectionQualityChanged);

        // WP-16: Handle alarm state changes from Alarm Actors (Tell pattern)
        Receive<AlarmStateChanged>(HandleAlarmStateChanged);

        // WP-25: Debug view subscribe/unsubscribe (Ask pattern for snapshot)
        Receive<SubscribeDebugViewRequest>(HandleSubscribeDebugView);
        Receive<UnsubscribeDebugViewRequest>(HandleUnsubscribeDebugView);

        // Debug snapshot (one-shot, no subscription)
        Receive<DebugSnapshotRequest>(HandleDebugSnapshot);

        // Handle internal messages
        Receive<LoadOverridesResult>(HandleOverridesLoaded);
    }

    /// <summary>
    /// Starts the asynchronous load of static overrides (result is piped back to
    /// this actor), then creates the child actors and subscribes to the DCL.
    /// </summary>
    protected override void PreStart()
    {
        base.PreStart();
        _logger.LogInformation("InstanceActor started for {Instance}", _instanceUniqueName);

        // Asynchronously load static overrides from SQLite and pipe to self.
        // Self is captured up front because the continuation runs off the actor thread.
        var self = Self;
        _storage.GetStaticOverridesAsync(_instanceUniqueName).ContinueWith(t =>
        {
            if (t.IsCompletedSuccessfully)
                return new LoadOverridesResult(t.Result, null);

            // A cancelled task carries no exception; supply a message so the
            // failure is not mistaken for a successful load of zero overrides.
            return new LoadOverridesResult(
                new Dictionary<string, string>(),
                t.Exception?.GetBaseException().Message ?? "Static override load was cancelled");
        }).PipeTo(self);

        // Create child Script Actors and Alarm Actors from configuration
        CreateChildActors();

        // Subscribe to DCL for data-sourced attributes
        SubscribeToDcl();
    }

    /// <summary>
    /// Supervision: Resume for child coordinator actors (Script/Alarm Actors preserve state).
    /// </summary>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: -1,
            withinTimeRange: TimeSpan.FromMinutes(1),
            decider: Decider.From(ex =>
            {
                _logger.LogWarning(ex,
                    "Child actor on instance {Instance} threw exception, resuming",
                    _instanceUniqueName);
                return Directive.Resume;
            }));
    }

    /// <summary>
    /// Returns the current attribute value. Uses Tell pattern; sender gets the response.
    /// An unknown attribute yields a response with the found flag set to false.
    /// </summary>
    private void HandleGetAttribute(GetAttributeRequest request)
    {
        var found = _attributes.TryGetValue(request.AttributeName, out var value);
        _attributeQualities.TryGetValue(request.AttributeName, out var quality);

        Sender.Tell(new GetAttributeResponse(
            request.CorrelationId,
            _instanceUniqueName,
            request.AttributeName,
            value,
            found,
            quality ?? "Good",
            DateTimeOffset.UtcNow));
    }

    /// <summary>
    /// Handles an attribute write (Instance.SetAttribute / Inbound API).
    /// WP-24: State mutation serialized through this actor's mailbox.
    ///
    /// The write is routed by the attribute's data binding:
    ///  * Data-sourced attribute → forwards a <see cref="WriteTagRequest"/> to the
    ///    DCL, which writes the physical device. The in-memory value is NOT
    ///    optimistically updated and NO static override is persisted — the
    ///    confirmed device value arrives later via the subscription. Success or
    ///    failure of the device write is returned to the caller.
    ///  * Static attribute → updates the in-memory value and persists the override
    ///    to SQLite.
    ///
    /// Either way the caller receives a <see cref="SetStaticAttributeResponse"/>.
    /// </summary>
    private void HandleSetStaticAttribute(SetStaticAttributeCommand command)
    {
        // Resolve the target attribute's data binding from the flattened config.
        var resolved = _configuration?.Attributes
            .FirstOrDefault(a => a.CanonicalName == command.AttributeName);

        // Only an attribute with BOTH a tag reference and a bound connection can
        // be written through the DCL; anything else is treated as static.
        var isDataSourced = resolved != null
            && !string.IsNullOrEmpty(resolved.DataSourceReference)
            && !string.IsNullOrEmpty(resolved.BoundDataConnectionName);

        if (isDataSourced)
        {
            HandleSetDataAttribute(command, resolved!);
            return;
        }

        HandleSetStaticAttributeCore(command);
    }

    /// <summary>
    /// Static attribute write: updates in-memory state (value, quality and
    /// timestamp), publishes the change, persists the override to SQLite, and
    /// replies with success.
    /// </summary>
    private void HandleSetStaticAttributeCore(SetStaticAttributeCommand command)
    {
        var changed = new AttributeValueChanged(
            _instanceUniqueName,
            command.AttributeName,
            command.AttributeName,
            command.Value,
            "Good",
            DateTimeOffset.UtcNow);

        // Route through the common handler so the stored quality/timestamp match
        // exactly what is published to the stream (WP-23) and sent to children;
        // otherwise a later debug snapshot would report a different timestamp.
        HandleAttributeValueChanged(changed);

        // Persist asynchronously -- fire and forget since the actor is the source of truth.
        // Locals are captured because the continuation runs off the actor thread.
        var instanceName = _instanceUniqueName;
        var attributeName = command.AttributeName;
        var logger = _logger;
        _storage.SetStaticOverrideAsync(_instanceUniqueName, command.AttributeName, command.Value)
            .ContinueWith(t =>
            {
                logger.LogWarning(
                    t.Exception?.GetBaseException(),
                    "Failed to persist static override for {Instance}.{Attribute}; in-memory state is authoritative",
                    instanceName,
                    attributeName);
            }, TaskContinuationOptions.OnlyOnFaulted);

        Sender.Tell(new SetStaticAttributeResponse(
            command.CorrelationId,
            _instanceUniqueName,
            command.AttributeName,
            true,
            null,
            DateTimeOffset.UtcNow));
    }

    /// <summary>
    /// Data-sourced attribute write: forwards a write request to the DCL and pipes
    /// the device write result back to the caller. The in-memory value is left
    /// untouched (it is refreshed by the subscription when the device confirms);
    /// no static override is persisted for a data-sourced attribute.
    /// </summary>
    private void HandleSetDataAttribute(SetStaticAttributeCommand command, ResolvedAttribute resolved)
    {
        // Capture everything the continuation needs: Sender and actor state must
        // not be touched once we leave the actor thread.
        var caller = Sender;
        var correlationId = command.CorrelationId;
        var attributeName = command.AttributeName;
        var instanceName = _instanceUniqueName;

        if (_dclManager == null)
        {
            _logger.LogWarning(
                "SetAttribute on data-sourced attribute {Instance}.{Attribute} cannot be routed — no DCL manager configured",
                instanceName,
                attributeName);
            caller.Tell(new SetStaticAttributeResponse(
                correlationId,
                instanceName,
                attributeName,
                false,
                "Data Connection Layer not available for write.",
                DateTimeOffset.UtcNow));
            return;
        }

        var writeRequest = new WriteTagRequest(
            correlationId,
            resolved.BoundDataConnectionName!,
            resolved.DataSourceReference!,
            command.Value,
            DateTimeOffset.UtcNow);

        // Ask the DCL and pipe the result back to the original caller. The DCL
        // returns the failure synchronously so the script can handle it.
        // NOTE(review): the reply type is assumed to be WriteTagResponse (exposing
        // Success and ErrorMessage, as read below) — confirm against the DCL contract.
        _dclManager.Ask<WriteTagResponse>(writeRequest, DclWriteTimeout)
            .ContinueWith(t =>
            {
                if (t.IsCompletedSuccessfully)
                {
                    return new SetStaticAttributeResponse(
                        correlationId,
                        instanceName,
                        attributeName,
                        t.Result.Success,
                        t.Result.ErrorMessage,
                        DateTimeOffset.UtcNow);
                }

                // Faulted or cancelled: a cancelled Ask has no exception attached.
                return new SetStaticAttributeResponse(
                    correlationId,
                    instanceName,
                    attributeName,
                    false,
                    t.Exception?.GetBaseException().Message ?? "DCL write timed out",
                    DateTimeOffset.UtcNow);
            }).PipeTo(caller);
    }

    /// <summary>
    /// WP-15: Routes script call requests to the appropriate Script Actor.
    /// Uses Ask pattern (WP-22).
    /// </summary>
    private void HandleScriptCallRequest(ScriptCallRequest request)
    {
        if (_scriptActors.TryGetValue(request.ScriptName, out var scriptActor))
        {
            // Forward the request to the Script Actor, preserving the original
            // sender. The whole record is forwarded unchanged, so any
            // ParentExecutionId (Audit Log #23) set by an inbound-API-routed
            // call is carried through to the Script Actor verbatim.
            scriptActor.Forward(request);
        }
        else
        {
            Sender.Tell(new ScriptCallResult(
                request.CorrelationId,
                false,
                null,
                $"Script '{request.ScriptName}' not found on instance '{_instanceUniqueName}'."));
        }
    }

    /// <summary>
    /// WP-22/23: Handles attribute value changes from DCL or static writes.
    /// Updates in-memory state, publishes to stream, and notifies children.
    /// </summary>
    private void HandleAttributeValueChanged(AttributeValueChanged changed)
    {
        // WP-24: State mutation serialized through this actor
        _attributes[changed.AttributeName] = changed.Value;
        _attributeQualities[changed.AttributeName] = changed.Quality;
        _attributeTimestamps[changed.AttributeName] = changed.Timestamp;

        PublishAndNotifyChildren(changed);
    }

    /// <summary>
    /// Handles tag value updates from DCL. Maps the tag path back to the attribute
    /// canonical name(s) and converts to an AttributeValueChanged for unified processing.
    /// Updates for tag paths this instance did not subscribe to are ignored.
    /// </summary>
    private void HandleTagValueUpdate(TagValueUpdate update)
    {
        if (!_tagPathToAttributes.TryGetValue(update.TagPath, out var attrNames))
            return;

        // Normalize array values to JSON strings so they survive Akka serialization
        object? value = update.Value is Array array
            ? JsonSerializer.Serialize(array, array.GetType())
            : update.Value;

        // One tag path may back several attributes — update every one of them.
        foreach (var attrName in attrNames)
        {
            var changed = new AttributeValueChanged(
                _instanceUniqueName,
                update.TagPath,
                attrName,
                value,
                update.Quality.ToString(),
                update.Timestamp);

            HandleAttributeValueChanged(changed);
        }
    }

    /// <summary>
    /// Applies a connection-level quality change to every attribute bound to that
    /// connection and publishes the new quality to the site stream.
    /// </summary>
    private void HandleConnectionQualityChanged(ConnectionQualityChanged qualityChanged)
    {
        _logger.LogWarning(
            "Connection {Connection} quality changed to {Quality} for instance {Instance}",
            qualityChanged.ConnectionName,
            qualityChanged.Quality,
            _instanceUniqueName);

        if (_configuration == null) return;

        // Mark all attributes bound to this connection with the new quality
        // and publish to the site stream so the debug view updates in real-time.
        // We intentionally do NOT notify script/alarm actors here — the value
        // hasn't changed, only the quality, and firing scripts/alarms would
        // cause spurious evaluations.
        var qualityStr = qualityChanged.Quality.ToString();
        foreach (var attr in _configuration.Attributes)
        {
            if (attr.BoundDataConnectionName == qualityChanged.ConnectionName
                && !string.IsNullOrEmpty(attr.DataSourceReference))
            {
                _attributeQualities[attr.CanonicalName] = qualityStr;
                _attributeTimestamps[attr.CanonicalName] = qualityChanged.Timestamp;

                // Publish quality change to stream (current value, new quality)
                _attributes.TryGetValue(attr.CanonicalName, out var currentValue);
                _streamManager?.PublishAttributeValueChanged(new AttributeValueChanged(
                    _instanceUniqueName,
                    attr.DataSourceReference,
                    attr.CanonicalName,
                    currentValue,
                    qualityStr,
                    qualityChanged.Timestamp));
            }
        }
    }

    /// <summary>
    /// Subscribes to DCL for all data-sourced attributes. Groups tag paths by connection
    /// name and sends SubscribeTagsRequest to the DCL manager.
    /// </summary>
    private void SubscribeToDcl()
    {
        if (_dclManager == null || _configuration == null) return;

        // Group attributes by their bound connection name
        var byConnection = new Dictionary<string, List<string>>();

        foreach (var attr in _configuration.Attributes)
        {
            if (string.IsNullOrEmpty(attr.DataSourceReference) ||
                string.IsNullOrEmpty(attr.BoundDataConnectionName))
                continue;

            // Record every attribute that references this tag path so a single
            // tag value update fans out to all of them.
            // NOTE(review): this map is keyed by tag path alone, so the same path
            // on two different connections shares one entry — confirm tag paths
            // are unique across connections for an instance.
            if (!_tagPathToAttributes.TryGetValue(attr.DataSourceReference, out var attrs))
            {
                attrs = new List<string>();
                _tagPathToAttributes[attr.DataSourceReference] = attrs;
            }
            attrs.Add(attr.CanonicalName);

            if (!byConnection.TryGetValue(attr.BoundDataConnectionName, out var connTags))
            {
                connTags = new List<string>();
                byConnection[attr.BoundDataConnectionName] = connTags;
            }

            // Subscribe each distinct tag path once per connection — a tag shared
            // by several attributes still needs only one DCL subscription.
            if (!connTags.Contains(attr.DataSourceReference))
                connTags.Add(attr.DataSourceReference);
        }

        // Send subscription requests to DCL for each connection
        foreach (var (connectionName, tagPaths) in byConnection)
        {
            var request = new SubscribeTagsRequest(
                Guid.NewGuid().ToString("N"),
                _instanceUniqueName,
                connectionName,
                tagPaths,
                DateTimeOffset.UtcNow);

            _dclManager.Tell(request, Self);

            _logger.LogInformation(
                "Instance {Instance} subscribed to {Count} tags on connection {Connection}",
                _instanceUniqueName,
                tagPaths.Count,
                connectionName);
        }
    }

    /// <summary>
    /// WP-16: Handles alarm state changes from Alarm Actors.
    /// Updates in-memory alarm state and publishes to stream.
    /// </summary>
    private void HandleAlarmStateChanged(AlarmStateChanged changed)
    {
        _alarmStates[changed.AlarmName] = changed.State;
        _alarmTimestamps[changed.AlarmName] = changed.Timestamp;

        // WP-23: Publish to site-wide stream
        _streamManager?.PublishAlarmStateChanged(changed);
    }

    /// <summary>
    /// WP-25: Debug view subscribe — returns a snapshot of the current state to
    /// the sender. Ongoing streaming is not managed by this actor.
    /// </summary>
    private void HandleSubscribeDebugView(SubscribeDebugViewRequest request)
    {
        Sender.Tell(BuildDebugSnapshot());

        _logger.LogDebug(
            "Debug view snapshot sent for {Instance}, correlationId={Id}",
            _instanceUniqueName,
            request.CorrelationId);
    }

    /// <summary>
    /// WP-25: Debug view unsubscribe (SiteRuntime-013).
    /// This handler is a deliberate no-op acknowledgement: the Instance Actor holds
    /// no per-subscriber state. The real debug-stream subscription lifecycle lives in
    /// <see cref="SiteStreamManager"/>
    /// (Subscribe / Unsubscribe / RemoveSubscriber); the gRPC stream is torn down
    /// there when the central side cancels the call. Nothing is removed here.
    /// </summary>
    private void HandleUnsubscribeDebugView(UnsubscribeDebugViewRequest request)
    {
        // No subscription state in the Instance Actor — see the XML doc above.
        _logger.LogDebug(
            "Debug view unsubscribe for {Instance}, correlationId={Id} " +
            "(no-op; subscription teardown handled by SiteStreamManager)",
            _instanceUniqueName,
            request.CorrelationId);
    }

    /// <summary>
    /// One-shot debug snapshot — returns current state without registering a subscriber.
    /// </summary>
    private void HandleDebugSnapshot(DebugSnapshotRequest request)
    {
        Sender.Tell(BuildDebugSnapshot());
    }

    /// <summary>
    /// Builds a point-in-time snapshot of every attribute and alarm held by this
    /// actor. Attributes with no recorded quality report "Good"; attributes and
    /// alarms with no recorded timestamp report the snapshot time.
    /// </summary>
    private DebugViewSnapshot BuildDebugSnapshot()
    {
        var now = DateTimeOffset.UtcNow;

        var attributeValues = _attributes.Select(kvp => new AttributeValueChanged(
            _instanceUniqueName,
            kvp.Key,
            kvp.Key,
            kvp.Value,
            _attributeQualities.GetValueOrDefault(kvp.Key, "Good"),
            _attributeTimestamps.GetValueOrDefault(kvp.Key, now))).ToList();

        // Alarms are enumerated from the actor map so alarms that have never
        // changed state still appear (as Normal).
        var alarmStates = _alarmActors.Keys.Select(name => new AlarmStateChanged(
            _instanceUniqueName,
            name,
            _alarmStates.GetValueOrDefault(name, AlarmState.Normal),
            _alarmPriorities.GetValueOrDefault(name, 0),
            _alarmTimestamps.GetValueOrDefault(name, now))).ToList();

        return new DebugViewSnapshot(_instanceUniqueName, attributeValues, alarmStates, now);
    }

    /// <summary>
    /// Publishes attribute change to stream and notifies child Script/Alarm actors.
    /// WP-22: Tell for attribute notifications (fire-and-forget, never blocks).
    /// </summary>
    private void PublishAndNotifyChildren(AttributeValueChanged changed)
    {
        // WP-23: Publish to site-wide stream
        _streamManager?.PublishAttributeValueChanged(changed);

        // Notify Script Actors (for value-change and conditional triggers)
        foreach (var scriptActor in _scriptActors.Values)
        {
            scriptActor.Tell(changed);
        }

        // Notify Alarm Actors (for alarm evaluation)
        foreach (var alarmActor in _alarmActors.Values)
        {
            alarmActor.Tell(changed);
        }
    }

    /// <summary>
    /// Applies static overrides loaded from SQLite on top of default values.
    /// A failed load is logged and leaves the configured defaults in place.
    /// </summary>
    private void HandleOverridesLoaded(LoadOverridesResult result)
    {
        if (result.Error != null)
        {
            _logger.LogWarning(
                "Failed to load static overrides for {Instance}: {Error}",
                _instanceUniqueName,
                result.Error);
            return;
        }

        // Values are applied directly; nothing is published and children are
        // not notified by this handler.
        foreach (var kvp in result.Overrides)
        {
            _attributes[kvp.Key] = kvp.Value;
        }

        _logger.LogDebug(
            "Loaded {Count} static overrides for {Instance}",
            result.Overrides.Count,
            _instanceUniqueName);
    }

    /// <summary>
    /// Creates child Script Actors and Alarm Actors from the flattened configuration.
    /// WP-15: Script Actors spawned per script definition.
    /// WP-16: Alarm Actors spawned per alarm definition, as peers to Script Actors.
    /// WP-32: Compilation errors reject entire instance deployment (logged but actor still starts).
    ///
    /// SiteRuntime-017: each child is seeded from a private point-in-time snapshot
    /// of <c>_attributes</c>, NOT the live dictionary. The snapshot is taken here on
    /// the Instance Actor thread, so it is race-free; handing the live mutable
    /// <see cref="Dictionary{TKey, TValue}"/> by reference
    /// would let a child constructor enumerate it on the child's mailbox thread while
    /// this actor mutates it in HandleAttributeValueChanged.
    /// </summary>
    private void CreateChildActors()
    {
        if (_configuration == null) return;

        // SiteRuntime-017: snapshot the live attribute dictionary once, on the
        // Instance Actor thread, before any child is constructed. Each child
        // Props closure captures this copy instead of the mutable
        // _attributes field, so no child constructor ever enumerates a
        // dictionary this actor is concurrently mutating.
        var attributeSnapshot = new Dictionary<string, object?>(_attributes);

        // Create Script Actors
        foreach (var script in _configuration.Scripts)
        {
            var compilationResult = _compilationService.Compile(script.CanonicalName, script.Code);
            if (!compilationResult.IsSuccess)
            {
                // A script that fails to compile gets no actor; the rest still start.
                _logger.LogError(
                    "Script '{Script}' on instance '{Instance}' failed to compile: {Errors}",
                    script.CanonicalName,
                    _instanceUniqueName,
                    string.Join("; ", compilationResult.Errors));
                continue;
            }

            // Compile the trigger expression for Expression-triggered scripts.
            var triggerExpression = CompileTriggerExpression(
                script.TriggerType,
                script.TriggerConfiguration,
                $"script-trigger-{script.CanonicalName}");

            var props = Props.Create(() => new ScriptActor(
                script.CanonicalName,
                _instanceUniqueName,
                Self,
                compilationResult.CompiledScript,
                script,
                _sharedScriptLibrary,
                _options,
                _logger,
                triggerExpression,
                attributeSnapshot,
                _healthCollector,
                _serviceProvider));

            var actorRef = Context.ActorOf(props, $"script-{script.CanonicalName}");
            _scriptActors[script.CanonicalName] = actorRef;
        }

        // Create Alarm Actors
        foreach (var alarm in _configuration.Alarms)
        {
            // NOTE(review): declared as non-generic Script here and on
            // CompileTriggerExpression exactly as in the reviewed text; if the child
            // actor constructors take a generic Script<T>, restore the type argument.
            Script? onTriggerScript = null;

            // Compile on-trigger script if defined
            if (!string.IsNullOrEmpty(alarm.OnTriggerScriptCanonicalName))
            {
                var triggerScriptDef = _configuration.Scripts
                    .FirstOrDefault(s => s.CanonicalName == alarm.OnTriggerScriptCanonicalName);

                if (triggerScriptDef != null)
                {
                    var result = _compilationService.Compile(
                        $"alarm-trigger-{alarm.CanonicalName}",
                        triggerScriptDef.Code);

                    if (result.IsSuccess)
                    {
                        onTriggerScript = result.CompiledScript;
                    }
                    else
                    {
                        // The alarm actor is still created, just without an on-trigger script.
                        _logger.LogWarning(
                            "Alarm trigger script for {Alarm} on {Instance} failed to compile",
                            alarm.CanonicalName,
                            _instanceUniqueName);
                    }
                }
            }

            // Compile the trigger expression for Expression-triggered alarms.
            var triggerExpression = CompileTriggerExpression(
                alarm.TriggerType,
                alarm.TriggerConfiguration,
                $"alarm-trigger-expr-{alarm.CanonicalName}");

            var props = Props.Create(() => new AlarmActor(
                alarm.CanonicalName,
                _instanceUniqueName,
                Self,
                alarm,
                onTriggerScript,
                _sharedScriptLibrary,
                _options,
                _logger,
                triggerExpression,
                attributeSnapshot,
                _healthCollector));

            var actorRef = Context.ActorOf(props, $"alarm-{alarm.CanonicalName}");
            _alarmActors[alarm.CanonicalName] = actorRef;
            _alarmPriorities[alarm.CanonicalName] = alarm.PriorityLevel;

            // Seed a timestamp so snapshots have one before the first state change.
            _alarmTimestamps[alarm.CanonicalName] = DateTimeOffset.UtcNow;
        }

        _logger.LogInformation(
            "Instance {Instance}: created {Scripts} script actors and {Alarms} alarm actors",
            _instanceUniqueName,
            _scriptActors.Count,
            _alarmActors.Count);
    }

    /// <summary>
    /// Compiles the boolean trigger expression for an Expression-triggered
    /// script or alarm. Returns null for non-Expression triggers, a blank
    /// expression, or a compilation failure (logged) — in which case the
    /// trigger is inert and the actor still starts.
    /// </summary>
    /// <param name="triggerType">Trigger type; only "Expression" (case-insensitive) is compiled.</param>
    /// <param name="triggerConfigJson">Trigger configuration JSON the expression is extracted from.</param>
    /// <param name="compileName">Name passed to the compilation service and used in the error log.</param>
    /// <returns>The compiled expression, or null when there is nothing usable to compile.</returns>
    private Script? CompileTriggerExpression(
        string? triggerType,
        string? triggerConfigJson,
        string compileName)
    {
        if (!string.Equals(triggerType, "Expression", StringComparison.OrdinalIgnoreCase))
            return null;

        var expression = TriggerExpressionGlobals.ExtractExpression(triggerConfigJson);
        if (expression == null)
            return null;

        var result = _compilationService.CompileTriggerExpression(compileName, expression);
        if (result.IsSuccess)
            return result.CompiledScript;

        _logger.LogError(
            "Trigger expression for {Name} on {Instance} failed to compile: {Errors}",
            compileName,
            _instanceUniqueName,
            string.Join("; ", result.Errors));
        return null;
    }

    /// <summary>
    /// Read-only access to current attribute count (for testing/diagnostics).
    /// </summary>
    public int AttributeCount => _attributes.Count;

    /// <summary>
    /// Read-only access to script actor count (for testing/diagnostics).
    /// </summary>
    public int ScriptActorCount => _scriptActors.Count;

    /// <summary>
    /// Read-only access to alarm actor count (for testing/diagnostics).
    /// </summary>
    public int AlarmActorCount => _alarmActors.Count;

    /// <summary>
    /// Internal message for async override loading result.
    /// NOTE(review): the dictionary's type arguments are assumed to be
    /// string → string (attribute name → persisted value) — confirm against
    /// SiteStorageService.GetStaticOverridesAsync.
    /// </summary>
    internal record LoadOverridesResult(Dictionary<string, string> Overrides, string? Error);
}