using Akka.Actor;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Messages.Artifacts;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.InboundApi;
using ScadaLink.Commons.Messages.Lifecycle;
using ScadaLink.Commons.Messages.ScriptExecution;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.HealthMonitoring;
using ScadaLink.SiteRuntime.Messages;
using ScadaLink.SiteRuntime.Persistence;
using ScadaLink.SiteRuntime.Scripts;
using ScadaLink.SiteRuntime.Streaming;

namespace ScadaLink.SiteRuntime.Actors;

/// <summary>
/// Site-side Deployment Manager — runs as a cluster singleton within the site cluster.
/// On startup, reads all deployed configs from SQLite and creates Instance Actors
/// for enabled instances in staggered batches.
/// </summary>
/// <remarks>
/// <para>Handles: DeployInstance, DisableInstance, EnableInstance, DeleteInstance.</para>
/// <para>
/// Supervision strategy: OneForOneStrategy with Resume for Instance Actors
/// so that a single instance failure does not cascade to siblings.
/// </para>
/// <para>
/// Every Instance Actor is death-watched. While an actor is being stopped (redeploy,
/// disable, delete) its child name stays reserved, so lifecycle commands for that
/// instance are deferred and replayed, in arrival order, once its
/// <see cref="Terminated"/> message has been received.
/// </para>
/// </remarks>
public class DeploymentManagerActor : ReceiveActor, IWithTimers
{
    /// <summary>How long an inbound API call waits for the Instance Actor's reply.</summary>
    private static readonly TimeSpan InboundApiCallTimeout = TimeSpan.FromSeconds(30);

    private readonly SiteStorageService _storage;
    private readonly ScriptCompilationService _compilationService;
    private readonly SharedScriptLibrary _sharedScriptLibrary;
    private readonly SiteStreamManager? _streamManager;
    private readonly SiteRuntimeOptions _options;
    private readonly ILogger _logger;
    private readonly IActorRef? _dclManager;
    private readonly IActorRef? _replicationActor;
    private readonly ISiteHealthCollector? _healthCollector;
    private readonly IServiceProvider? _serviceProvider;

    // Running Instance Actors, keyed by instance unique name.
    private readonly Dictionary<string, IActorRef> _instanceActors = new();

    // Every instance with a deployed config (enabled or disabled). A set rather than a
    // counter so that redeploying an existing instance, or deleting an unknown one,
    // cannot skew the health counts.
    private readonly HashSet<string> _deployedInstances = new();

    // Instances whose actor has been told to stop but has not yet terminated.
    private readonly Dictionary<string, StoppingInstance> _stoppingInstances = new();

    // DCL connection names already requested from the DCL Manager.
    private readonly HashSet<string> _createdConnections = new();

    // Logger factory handed to Instance Actors; resolved lazily by ChildLoggerFactory.
    private ILoggerFactory? _childLoggerFactory;

    // Only set when no container-provided factory exists; disposed in PostStop.
    private LoggerFactory? _ownedLoggerFactory;

    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>
    /// Creates the Deployment Manager.
    /// </summary>
    /// <param name="storage">SQLite-backed storage for deployed configs and artifacts.</param>
    /// <param name="compilationService">Script compiler passed to Instance Actors.</param>
    /// <param name="sharedScriptLibrary">Shared script registry, (re)compiled on startup and artifact deployment.</param>
    /// <param name="streamManager">Optional site stream manager passed to Instance Actors.</param>
    /// <param name="options">Site runtime options (startup batching, etc.).</param>
    /// <param name="logger">Logger for this actor.</param>
    /// <param name="dclManager">Optional DCL Manager that receives connection-creation commands.</param>
    /// <param name="replicationActor">Optional actor that replicates changes to the standby node.</param>
    /// <param name="healthCollector">Optional health collector updated with active-node state and instance counts.</param>
    /// <param name="serviceProvider">Optional service provider passed to Instance Actors and used to resolve an <see cref="ILoggerFactory"/>.</param>
    public DeploymentManagerActor(
        SiteStorageService storage,
        ScriptCompilationService compilationService,
        SharedScriptLibrary sharedScriptLibrary,
        SiteStreamManager? streamManager,
        SiteRuntimeOptions options,
        ILogger logger,
        IActorRef? dclManager = null,
        IActorRef? replicationActor = null,
        ISiteHealthCollector? healthCollector = null,
        IServiceProvider? serviceProvider = null)
    {
        _storage = storage;
        _compilationService = compilationService;
        _sharedScriptLibrary = sharedScriptLibrary;
        _streamManager = streamManager;
        _options = options;
        _dclManager = dclManager;
        _replicationActor = replicationActor;
        _healthCollector = healthCollector;
        _serviceProvider = serviceProvider;
        _logger = logger;

        // Lifecycle commands
        Receive<DeployInstanceCommand>(HandleDeploy);
        Receive<DisableInstanceCommand>(HandleDisable);
        Receive<EnableInstanceCommand>(HandleEnable);
        Receive<DeleteInstanceCommand>(HandleDelete);

        // WP-33: Handle system-wide artifact deployment
        Receive<DeployArtifactsCommand>(HandleDeployArtifacts);

        // Debug View — route to Instance Actors
        Receive<SubscribeDebugViewRequest>(RouteDebugViewSubscribe);
        Receive<UnsubscribeDebugViewRequest>(RouteDebugViewUnsubscribe);
        Receive<DebugSnapshotRequest>(RouteDebugSnapshot);

        // Inbound API Route.To().Call() — route to Instance Actors
        Receive<RouteToCallRequest>(RouteInboundApiCall);

        // Internal startup messages
        Receive<StartupConfigsLoaded>(HandleStartupConfigsLoaded);
        Receive<StartNextBatch>(HandleStartNextBatch);

        // Internal async results
        Receive<EnableResult>(HandleEnableResult);
        Receive<DeployPersistenceResult>(HandleDeployPersistenceResult);

        // Death-watch notifications for Instance Actors (must be handled, otherwise
        // an unhandled Terminated raises a DeathPactException).
        Receive<Terminated>(HandleInstanceTerminated);
    }

    protected override void PreStart()
    {
        base.PreStart();
        _healthCollector?.SetActiveNode(true);
        _logger.LogInformation("DeploymentManagerActor starting — loading deployed configs from SQLite...");

        // Load all configs asynchronously and pipe the outcome back to self; a cancelled
        // load is reported as an error rather than as an empty successful load.
        _storage.GetAllDeployedConfigsAsync()
            .ContinueWith(t => t.IsCompletedSuccessfully
                ? new StartupConfigsLoaded(t.Result, null)
                : new StartupConfigsLoaded([], FailureMessage(t)))
            .PipeTo(Self);
    }

    protected override void PostStop()
    {
        _healthCollector?.SetActiveNode(false);
        // Children have already terminated by the time PostStop runs.
        _ownedLoggerFactory?.Dispose();
        base.PostStop();
    }

    /// <summary>
    /// OneForOneStrategy: Resume on exceptions to preserve Instance Actor state,
    /// Stop only on ActorInitializationException (actor failed to start).
    /// </summary>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: -1,
            withinTimeRange: TimeSpan.FromMinutes(1),
            decider: Decider.From(ex =>
            {
                if (ex is ActorInitializationException)
                {
                    _logger.LogError(ex, "Instance Actor failed to initialize, stopping");
                    return Directive.Stop;
                }

                _logger.LogWarning(ex, "Instance Actor threw exception, resuming");
                return Directive.Resume;
            }));
    }

    /// <summary>
    /// Processes the loaded configs from SQLite and begins staggered Instance Actor creation.
    /// </summary>
    private void HandleStartupConfigsLoaded(StartupConfigsLoaded msg)
    {
        if (msg.Error != null)
        {
            _logger.LogError("Failed to load deployed configs: {Error}", msg.Error);
            return;
        }

        // Load and compile shared scripts from SQLite before creating Instance Actors
        LoadSharedScriptsFromStorage();

        var enabledConfigs = msg.Configs.Where(c => c.IsEnabled).ToList();

        // Union rather than overwrite: deployments may have arrived before this load completed.
        _deployedInstances.UnionWith(msg.Configs.Select(c => c.InstanceUniqueName));

        _logger.LogInformation(
            "Loaded {Total} deployed configs ({Enabled} enabled) from SQLite",
            msg.Configs.Count, enabledConfigs.Count);

        UpdateInstanceCounts();

        if (enabledConfigs.Count == 0)
            return;

        // Start the first batch immediately
        Self.Tell(new StartNextBatch(new BatchState(enabledConfigs, 0)));
    }

    /// <summary>
    /// Creates Instance Actors in batches with a configurable delay between batches
    /// to prevent reconnection storms on failover.
    /// </summary>
    private void HandleStartNextBatch(StartNextBatch msg)
    {
        var state = msg.State;

        // Clamp: a non-positive batch size would reschedule forever without making progress.
        var batchSize = Math.Max(1, _options.StartupBatchSize);
        var startIdx = state.NextIndex;
        var endIdx = Math.Min(startIdx + batchSize, state.Configs.Count);

        _logger.LogDebug(
            "Creating Instance Actors batch [{Start}..{End}) of {Total}",
            startIdx, endIdx, state.Configs.Count);

        for (var i = startIdx; i < endIdx; i++)
        {
            var config = state.Configs[i];
            EnsureDclConnections(config.ConfigJson);
            CreateInstanceActor(config.InstanceUniqueName, config.ConfigJson);
        }

        UpdateInstanceCounts();

        // Schedule next batch if there are more, using Timers (IWithTimers)
        if (endIdx < state.Configs.Count)
        {
            Timers.StartSingleTimer(
                "startup-batch",
                new StartNextBatch(new BatchState(state.Configs, endIdx)),
                TimeSpan.FromMilliseconds(Math.Max(0, _options.StartupBatchDelayMs)));
        }
        else
        {
            _logger.LogInformation("All {Count} Instance Actors created", state.Configs.Count);
        }
    }

    /// <summary>
    /// Handles a new deployment: creates or replaces the Instance Actor, then stores the
    /// config in SQLite and clears previous static overrides in the background.
    /// Replies <see cref="DeploymentStatus.Success"/> as soon as the actor is running;
    /// persistence failures are only logged (see <see cref="HandleDeployPersistenceResult"/>).
    /// </summary>
    private void HandleDeploy(DeployInstanceCommand command)
    {
        var instanceName = command.InstanceUniqueName;

        _logger.LogInformation(
            "Deploying instance {Instance}, deploymentId={DeploymentId}",
            instanceName, command.DeploymentId);

        if (DeferWhileStopping(instanceName, command, Sender))
            return;

        // Redeployment replaces the running actor. The child name stays reserved until the
        // old actor has terminated, so stop it and replay this command once it is gone.
        if (_instanceActors.ContainsKey(instanceName))
        {
            StopInstanceActor(instanceName);
            DeferWhileStopping(instanceName, command, Sender);
            return;
        }

        // Ensure DCL connections exist for any data-sourced attributes
        EnsureDclConnections(command.FlattenedConfigurationJson);

        // Create the Instance Actor immediately (no existing actor to replace)
        CreateInstanceActor(instanceName, command.FlattenedConfigurationJson);
        _deployedInstances.Add(instanceName);
        UpdateInstanceCounts();

        // Persist to SQLite and clear static overrides off the actor thread
        var sender = Sender;
        Task.Run(() => PersistDeploymentAsync(command))
            .ContinueWith(t => new DeployPersistenceResult(
                command.DeploymentId,
                instanceName,
                t.IsCompletedSuccessfully,
                FailureMessage(t),
                sender))
            .PipeTo(Self);

        // Reply immediately — deployment is applied (actor is running)
        sender.Tell(new DeploymentStatusResponse(
            command.DeploymentId,
            instanceName,
            DeploymentStatus.Success,
            null,
            DateTimeOffset.UtcNow));
    }

    /// <summary>
    /// Stores the deployed config (enabled), clears static overrides and replicates the
    /// deployment to the standby node. Touches only immutable fields, so it is safe to run
    /// off the actor thread.
    /// </summary>
    private async Task PersistDeploymentAsync(DeployInstanceCommand command)
    {
        await _storage.StoreDeployedConfigAsync(
            command.InstanceUniqueName,
            command.FlattenedConfigurationJson,
            command.DeploymentId,
            command.RevisionHash,
            isEnabled: true);

        // Static overrides are reset on redeployment per design decision
        await _storage.ClearStaticOverridesAsync(command.InstanceUniqueName);

        // Replicate to standby node
        _replicationActor?.Tell(new ReplicateConfigDeploy(
            command.InstanceUniqueName,
            command.FlattenedConfigurationJson,
            command.DeploymentId,
            command.RevisionHash,
            true));
    }

    private void HandleDeployPersistenceResult(DeployPersistenceResult result)
    {
        if (!result.Success)
        {
            _logger.LogError(
                "Failed to persist deployment {DeploymentId} for {Instance}: {Error}",
                result.DeploymentId, result.InstanceName, result.Error);
        }
    }

    /// <summary>
    /// Disables an instance: stops the actor and marks it as disabled in SQLite.
    /// The <see cref="InstanceLifecycleResponse"/> is sent once the SQLite update completes.
    /// </summary>
    private void HandleDisable(DisableInstanceCommand command)
    {
        var instanceName = command.InstanceUniqueName;

        if (DeferWhileStopping(instanceName, command, Sender))
            return;

        StopInstanceActor(instanceName);
        UpdateInstanceCounts();

        var sender = Sender;
        _storage.SetInstanceEnabledAsync(instanceName, false).ContinueWith(t =>
        {
            if (t.IsCompletedSuccessfully)
                _replicationActor?.Tell(new ReplicateConfigSetEnabled(instanceName, false));

            return new InstanceLifecycleResponse(
                command.CommandId,
                instanceName,
                t.IsCompletedSuccessfully,
                FailureMessage(t),
                DateTimeOffset.UtcNow);
        }).PipeTo(sender);

        _logger.LogInformation("Instance {Instance} disabled", instanceName);
    }

    /// <summary>
    /// Enables an instance: marks it as enabled in SQLite and re-creates the Instance Actor
    /// from the stored config. Storage work runs in the background; the actor is created in
    /// <see cref="HandleEnableResult"/> on the actor thread.
    /// </summary>
    private void HandleEnable(EnableInstanceCommand command)
    {
        var instanceName = command.InstanceUniqueName;

        if (DeferWhileStopping(instanceName, command, Sender))
            return;

        var sender = Sender;
        Task.Run(async () =>
            {
                await _storage.SetInstanceEnabledAsync(instanceName, true);
                _replicationActor?.Tell(new ReplicateConfigSetEnabled(instanceName, true));

                var configs = await _storage.GetAllDeployedConfigsAsync();
                var config = configs.FirstOrDefault(c => c.InstanceUniqueName == instanceName);

                return new EnableResult(command, config, null, sender);
            })
            .ContinueWith(t => t.IsCompletedSuccessfully
                ? t.Result
                : new EnableResult(command, null, FailureMessage(t), sender))
            .PipeTo(Self);
    }

    /// <summary>
    /// Processes enable result in the actor context (thread-safe).
    /// </summary>
    private void HandleEnableResult(EnableResult result)
    {
        var instanceName = result.Command.InstanceUniqueName;

        if (result.Error != null || result.Config == null)
        {
            var error = result.Error ?? $"No deployed config found for {instanceName}";
            result.OriginalSender.Tell(new InstanceLifecycleResponse(
                result.Command.CommandId, instanceName, false, error, DateTimeOffset.UtcNow));
            return;
        }

        // The actor may have started stopping while the config was loading; retry afterwards.
        if (DeferWhileStopping(instanceName, result, Self))
            return;

        _deployedInstances.Add(instanceName);

        if (!_instanceActors.ContainsKey(instanceName))
        {
            // Disabled instances are skipped by the startup batches, so their DCL
            // connections may never have been requested.
            EnsureDclConnections(result.Config.ConfigJson);
            CreateInstanceActor(instanceName, result.Config.ConfigJson);
        }

        UpdateInstanceCounts();

        result.OriginalSender.Tell(new InstanceLifecycleResponse(
            result.Command.CommandId, instanceName, true, null, DateTimeOffset.UtcNow));

        _logger.LogInformation("Instance {Instance} enabled", instanceName);
    }

    /// <summary>
    /// Deletes an instance: stops the actor and removes its config from SQLite.
    /// Note: store-and-forward messages are NOT cleared per design decision.
    /// </summary>
    private void HandleDelete(DeleteInstanceCommand command)
    {
        var instanceName = command.InstanceUniqueName;

        if (DeferWhileStopping(instanceName, command, Sender))
            return;

        StopInstanceActor(instanceName);
        _deployedInstances.Remove(instanceName);
        UpdateInstanceCounts();

        var sender = Sender;
        _storage.RemoveDeployedConfigAsync(instanceName).ContinueWith(t =>
        {
            if (t.IsCompletedSuccessfully)
                _replicationActor?.Tell(new ReplicateConfigRemove(instanceName));

            return new InstanceLifecycleResponse(
                command.CommandId,
                instanceName,
                t.IsCompletedSuccessfully,
                FailureMessage(t),
                DateTimeOffset.UtcNow);
        }).PipeTo(sender);

        _logger.LogInformation("Instance {Instance} deleted", instanceName);
    }

    // ── Instance Actor stop / death-watch bookkeeping ──

    /// <summary>
    /// Stops the named Instance Actor, if running, and records it as stopping until its
    /// <see cref="Terminated"/> arrives.
    /// </summary>
    private void StopInstanceActor(string instanceName)
    {
        if (!_instanceActors.Remove(instanceName, out var actor))
            return;

        // Normally already watched by CreateInstanceActor; watching again is harmless.
        Context.Watch(actor);
        _stoppingInstances[instanceName] = new StoppingInstance(actor);
        Context.Stop(actor);
    }

    /// <summary>
    /// If <paramref name="instanceName"/>'s actor is still stopping, queues
    /// <paramref name="message"/> (with its sender) for replay after termination.
    /// </summary>
    /// <returns><c>true</c> when the message was deferred and must not be processed now.</returns>
    private bool DeferWhileStopping(string instanceName, object message, IActorRef sender)
    {
        if (!_stoppingInstances.TryGetValue(instanceName, out var stopping))
            return false;

        stopping.Deferred.Add((message, sender));
        _logger.LogDebug(
            "Deferring {Message} for {Instance} until its Instance Actor has stopped",
            message.GetType().Name, instanceName);
        return true;
    }

    /// <summary>
    /// Handles termination of a watched Instance Actor. When we stopped it, the deferred
    /// commands are replayed in arrival order. Otherwise it died on its own, for example
    /// when the supervisor stopped it after an initialization failure, and it is deregistered.
    /// </summary>
    private void HandleInstanceTerminated(Terminated terminated)
    {
        var deadActor = terminated.ActorRef;

        string? stoppingName = _stoppingInstances
            .FirstOrDefault(kv => kv.Value.Actor.Equals(deadActor)).Key;
        if (stoppingName is not null)
        {
            var deferred = _stoppingInstances[stoppingName].Deferred;
            _stoppingInstances.Remove(stoppingName);

            // Replay with the original senders so replies still reach the requesters.
            foreach (var (message, sender) in deferred)
                Self.Tell(message, sender);
            return;
        }

        string? crashedName = _instanceActors
            .FirstOrDefault(kv => kv.Value.Equals(deadActor)).Key;
        if (crashedName is not null)
        {
            _instanceActors.Remove(crashedName);
            UpdateInstanceCounts();
            _logger.LogWarning("Instance Actor {Instance} terminated unexpectedly", crashedName);
        }
    }

    // ── DCL connection management ──

    /// <summary>
    /// Parses the flattened config to find bound data connections and ensures
    /// the DCL Manager has corresponding connection actors created.
    /// Each connection name is requested at most once per DeploymentManager lifetime.
    /// </summary>
    private void EnsureDclConnections(string configJson)
    {
        if (_dclManager == null)
            return;

        try
        {
            // NOTE(review): the generic argument was lost in the reviewed source; reconstructed
            // as FlattenedConfiguration — confirm the type name and namespace.
            var config = System.Text.Json.JsonSerializer
                .Deserialize<Commons.Types.Flattening.FlattenedConfiguration>(configJson);
            if (config?.Connections == null)
                return;

            foreach (var (name, connConfig) in config.Connections)
            {
                // NOTE(review): already-requested names are skipped, so a later deployment that
                // changes a connection's settings does not update the DCL connection.
                if (_createdConnections.Contains(name))
                    continue;

                var primaryDetails =
                    ParseConnectionDetails(connConfig.ConfigurationJson, name, "primary")
                    ?? new Dictionary<string, string>();
                var backupDetails =
                    ParseConnectionDetails(connConfig.BackupConfigurationJson, name, "backup");

                _dclManager.Tell(new Commons.Messages.DataConnection.CreateConnectionCommand(
                    name, connConfig.Protocol, primaryDetails, backupDetails, connConfig.FailoverRetryCount));

                _createdConnections.Add(name);

                _logger.LogInformation(
                    "Created DCL connection {Connection} (protocol={Protocol})",
                    name, connConfig.Protocol);
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to parse flattened config for DCL connections");
        }
    }

    /// <summary>
    /// Flattens a connection-configuration JSON object into name → string pairs.
    /// Parsing is best-effort: it returns <c>null</c> for empty input, and for unparseable
    /// input after logging a warning.
    /// </summary>
    private Dictionary<string, string>? ParseConnectionDetails(string? json, string connectionName, string role)
    {
        if (string.IsNullOrEmpty(json))
            return null;

        try
        {
            // Parse as JsonElement to handle mixed value types (string, int, bool)
            using var doc = System.Text.Json.JsonDocument.Parse(json);
            var details = new Dictionary<string, string>();
            foreach (var prop in doc.RootElement.EnumerateObject())
                details[prop.Name] = prop.Value.ToString();
            return details;
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Ignoring unparseable {Role} configuration for DCL connection {Connection}",
                role, connectionName);
            return null;
        }
    }

    // ── Shared Script Loading ──

    /// <summary>
    /// Compiles and registers every shared script stored in SQLite. Failures are logged,
    /// not thrown.
    /// </summary>
    private void LoadSharedScriptsFromStorage()
    {
        try
        {
            // NOTE(review): blocks the actor thread on async storage I/O; consider loading the
            // scripts together with the deployed configs in PreStart.
            var scripts = _storage.GetAllSharedScriptsAsync().GetAwaiter().GetResult();
            var compiled = 0;

            foreach (var script in scripts)
            {
                if (_sharedScriptLibrary.CompileAndRegister(script.Name, script.Code))
                    compiled++;
            }

            _logger.LogInformation(
                "Loaded {Compiled}/{Total} shared scripts from SQLite", compiled, scripts.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to load shared scripts from SQLite");
        }
    }

    // ── Debug View routing ──

    private void RouteDebugViewSubscribe(SubscribeDebugViewRequest request)
    {
        if (_instanceActors.TryGetValue(request.InstanceUniqueName, out var instanceActor))
        {
            instanceActor.Forward(request);
            return;
        }

        _logger.LogWarning(
            "Debug view subscribe for unknown instance {Instance}", request.InstanceUniqueName);
        // Reply with an empty snapshot so the requester is not left waiting.
        Sender.Tell(new DebugViewSnapshot(
            request.InstanceUniqueName, [], [], DateTimeOffset.UtcNow));
    }

    private void RouteDebugViewUnsubscribe(UnsubscribeDebugViewRequest request)
    {
        if (_instanceActors.TryGetValue(request.InstanceUniqueName, out var instanceActor))
            instanceActor.Forward(request);
    }

    private void RouteDebugSnapshot(DebugSnapshotRequest request)
    {
        if (_instanceActors.TryGetValue(request.InstanceUniqueName, out var instanceActor))
        {
            instanceActor.Forward(request);
            return;
        }

        _logger.LogWarning(
            "Debug snapshot for unknown instance {Instance}", request.InstanceUniqueName);
        // Reply with an empty snapshot so the requester is not left waiting.
        Sender.Tell(new DebugViewSnapshot(
            request.InstanceUniqueName, [], [], DateTimeOffset.UtcNow));
    }

    // ── Inbound API routing ──

    /// <summary>
    /// Converts a <see cref="RouteToCallRequest"/> into a script call on the target Instance
    /// Actor and pipes the converted reply (or failure) back to the original sender.
    /// </summary>
    private void RouteInboundApiCall(RouteToCallRequest request)
    {
        if (!_instanceActors.TryGetValue(request.InstanceUniqueName, out var instanceActor))
        {
            Sender.Tell(new RouteToCallResponse(
                request.CorrelationId,
                false,
                null,
                $"Instance '{request.InstanceUniqueName}' not found on this site.",
                DateTimeOffset.UtcNow));
            return;
        }

        // Convert to ScriptCallRequest and Ask the Instance Actor
        var scriptCall = new ScriptCallRequest(
            request.ScriptName, request.Parameters, 0, request.CorrelationId);

        var sender = Sender;
        // NOTE(review): the reply type argument was lost in the reviewed source; reconstructed
        // as ScriptCallResult — confirm.
        instanceActor.Ask<ScriptCallResult>(scriptCall, InboundApiCallTimeout)
            .ContinueWith(t =>
            {
                if (t.IsCompletedSuccessfully)
                {
                    var result = t.Result;
                    return new RouteToCallResponse(
                        request.CorrelationId,
                        result.Success,
                        result.ReturnValue,
                        result.ErrorMessage,
                        DateTimeOffset.UtcNow);
                }

                return new RouteToCallResponse(
                    request.CorrelationId,
                    false,
                    null,
                    t.Exception?.GetBaseException().Message ?? "Script call timed out",
                    DateTimeOffset.UtcNow);
            })
            .PipeTo(sender);
    }

    /// <summary>
    /// WP-33: Handles system-wide artifact deployment (shared scripts, external systems, etc.).
    /// Persists artifacts to SiteStorageService, recompiles shared scripts and replicates the
    /// command to the standby node; replies with an <see cref="ArtifactDeploymentResponse"/>.
    /// </summary>
    private void HandleDeployArtifacts(DeployArtifactsCommand command)
    {
        _logger.LogInformation(
            "Deploying system artifacts, deploymentId={DeploymentId}", command.DeploymentId);

        var sender = Sender;
        Task.Run(async () =>
        {
            try
            {
                // WP-33: Store shared scripts and recompile
                if (command.SharedScripts != null)
                {
                    foreach (var script in command.SharedScripts)
                    {
                        await _storage.StoreSharedScriptAsync(
                            script.Name, script.Code, script.ParameterDefinitions, script.ReturnDefinition);

                        // WP-33: Shared scripts recompiled on update.
                        // NOTE(review): runs on a thread-pool thread — confirm SharedScriptLibrary is thread-safe.
                        _sharedScriptLibrary.CompileAndRegister(script.Name, script.Code);
                    }
                }

                // WP-33: Store external system definitions
                if (command.ExternalSystems != null)
                {
                    foreach (var es in command.ExternalSystems)
                    {
                        await _storage.StoreExternalSystemAsync(
                            es.Name, es.EndpointUrl, es.AuthType, es.AuthConfiguration, es.MethodDefinitionsJson);
                    }
                }

                // WP-33: Store database connection definitions
                if (command.DatabaseConnections != null)
                {
                    foreach (var db in command.DatabaseConnections)
                    {
                        await _storage.StoreDatabaseConnectionAsync(
                            db.Name, db.ConnectionString, db.MaxRetries, db.RetryDelay);
                    }
                }

                // WP-33: Store notification lists
                if (command.NotificationLists != null)
                {
                    foreach (var nl in command.NotificationLists)
                    {
                        await _storage.StoreNotificationListAsync(nl.Name, nl.RecipientEmails);
                    }
                }

                // Store data connection definitions (OPC UA endpoints, etc.)
                if (command.DataConnections != null)
                {
                    foreach (var dc in command.DataConnections)
                    {
                        await _storage.StoreDataConnectionDefinitionAsync(
                            dc.Name, dc.Protocol, dc.PrimaryConfigurationJson,
                            dc.BackupConfigurationJson, dc.FailoverRetryCount);
                    }
                }

                // Store SMTP configurations
                if (command.SmtpConfigurations != null)
                {
                    foreach (var smtp in command.SmtpConfigurations)
                    {
                        await _storage.StoreSmtpConfigurationAsync(
                            smtp.Name, smtp.Server, smtp.Port, smtp.AuthMode,
                            smtp.FromAddress, smtp.Username, smtp.Password, smtp.OAuthConfig);
                    }
                }

                // Replicate artifacts to standby node
                _replicationActor?.Tell(new ReplicateArtifacts(command));

                return new ArtifactDeploymentResponse(
                    command.DeploymentId, "", true, null, DateTimeOffset.UtcNow);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex,
                    "Failed to deploy system artifacts, deploymentId={DeploymentId}", command.DeploymentId);
                return new ArtifactDeploymentResponse(
                    command.DeploymentId, "", false, ex.Message, DateTimeOffset.UtcNow);
            }
        }).PipeTo(sender);
    }

    /// <summary>
    /// Creates and death-watches a child Instance Actor with the given name and configuration
    /// JSON. Does nothing if an actor with that name is running or still stopping.
    /// </summary>
    internal void CreateInstanceActor(string instanceName, string configJson)
    {
        if (_instanceActors.ContainsKey(instanceName))
        {
            _logger.LogWarning("Instance Actor {Instance} already exists, skipping creation", instanceName);
            return;
        }

        if (_stoppingInstances.ContainsKey(instanceName))
        {
            // The child name is still reserved; ActorOf would throw InvalidActorNameException.
            _logger.LogWarning("Instance Actor {Instance} is still stopping, skipping creation", instanceName);
            return;
        }

        var childLogger = ChildLoggerFactory.CreateLogger<InstanceActor>();
        var props = Props.Create(() => new InstanceActor(
            instanceName,
            configJson,
            _storage,
            _compilationService,
            _sharedScriptLibrary,
            _streamManager,
            _options,
            childLogger,
            _dclManager,
            _healthCollector,
            _serviceProvider));

        var actorRef = Context.ActorOf(props, instanceName);
        // Watch so we learn when the name is free again, or when the actor dies on its own.
        Context.Watch(actorRef);
        _instanceActors[instanceName] = actorRef;

        _logger.LogDebug("Created Instance Actor for {Instance}", instanceName);
    }

    /// <summary>
    /// Logger factory for Instance Actors. It prefers the container's factory so child logs
    /// reach the configured providers. Otherwise it falls back to one owned, provider-less
    /// factory instead of allocating a new one per actor.
    /// </summary>
    private ILoggerFactory ChildLoggerFactory => _childLoggerFactory ??= ResolveChildLoggerFactory();

    private ILoggerFactory ResolveChildLoggerFactory()
    {
        if (_serviceProvider?.GetService(typeof(ILoggerFactory)) is ILoggerFactory containerFactory)
            return containerFactory;

        _ownedLoggerFactory = new LoggerFactory();
        return _ownedLoggerFactory;
    }

    /// <summary>
    /// Gets the count of active Instance Actors (for testing/diagnostics).
    /// </summary>
    internal int InstanceActorCount => _instanceActors.Count;

    /// <summary>
    /// Updates the health collector with current instance counts.
    /// Deployed = all instances with a deployed config, enabled = running actors,
    /// disabled = the difference (never negative).
    /// </summary>
    private void UpdateInstanceCounts()
    {
        var deployed = _deployedInstances.Count;
        var enabled = _instanceActors.Count;
        _healthCollector?.SetInstanceCounts(
            deployed: deployed,
            enabled: enabled,
            disabled: Math.Max(0, deployed - enabled));
    }

    /// <summary>
    /// Error text for a task that did not complete successfully; <c>null</c> on success.
    /// Unlike reading <see cref="Task.Exception"/> alone, cancellation is reported too.
    /// </summary>
    private static string? FailureMessage(Task task) =>
        task.Exception?.GetBaseException().Message
        ?? (task.IsCanceled ? "Operation was cancelled" : null);

    /// <summary>An Instance Actor that was told to stop, plus the messages waiting for it.</summary>
    private sealed class StoppingInstance
    {
        public StoppingInstance(IActorRef actor) => Actor = actor;

        public IActorRef Actor { get; }

        public List<(object Message, IActorRef Sender)> Deferred { get; } = new();
    }

    // ── Internal messages ──

    internal record StartupConfigsLoaded(List<DeployedInstance> Configs, string? Error);

    internal record StartNextBatch(BatchState State);

    internal record BatchState(List<DeployedInstance> Configs, int NextIndex);

    internal record EnableResult(
        EnableInstanceCommand Command,
        DeployedInstance? Config,
        string? Error,
        IActorRef OriginalSender);

    internal record DeployPersistenceResult(
        string DeploymentId,
        string InstanceName,
        bool Success,
        string? Error,
        IActorRef OriginalSender);
}