using Akka.Actor;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Messages.Artifacts;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.InboundApi;
using ScadaLink.Commons.Messages.Instance;
using ScadaLink.Commons.Messages.Lifecycle;
using ScadaLink.Commons.Messages.ScriptExecution;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.HealthMonitoring;
using ScadaLink.SiteRuntime.Messages;
using ScadaLink.SiteRuntime.Persistence;
using ScadaLink.SiteRuntime.Scripts;
using ScadaLink.SiteRuntime.Streaming;
namespace ScadaLink.SiteRuntime.Actors;
/// <summary>
/// Site-side Deployment Manager — runs as a cluster singleton within the site cluster.
/// On startup, reads all deployed configs from SQLite and creates Instance Actors
/// for enabled instances in staggered batches.
///
/// Handles: DeployInstance, DisableInstance, EnableInstance, DeleteInstance.
///
/// Supervision strategy: OneForOneStrategy with Resume for Instance Actors
/// so that a single instance failure does not cascade to siblings.
/// </summary>
public class DeploymentManagerActor : ReceiveActor, IWithTimers
{
// Collaborators handed in through the constructor.
private readonly SiteStorageService _storage;
private readonly ScriptCompilationService _compilationService;
private readonly SharedScriptLibrary _sharedScriptLibrary;
private readonly SiteStreamManager? _streamManager;
private readonly SiteRuntimeOptions _options;
private readonly ILogger _logger;

/// <summary>
/// Shared logger factory used to mint Instance Actor loggers (SiteRuntime-015).
/// Reused across every <see cref="CreateInstanceActor"/> call rather than newing a
/// per-instance factory that is never disposed. When the host injects its configured
/// factory the Instance Actor logs are routed through the application's logging
/// providers.
/// </summary>
private readonly ILoggerFactory _loggerFactory;

private readonly IActorRef? _dclManager;
private readonly IActorRef? _replicationActor;
private readonly ISiteHealthCollector? _healthCollector;
private readonly IServiceProvider? _serviceProvider;

/// <summary>
/// Running Instance Actors, keyed by instance unique name (which is also the child
/// actor name passed to <c>Context.ActorOf</c>).
/// </summary>
private readonly Dictionary<string, IActorRef> _instanceActors = new();

/// <summary>
/// Tracks Instance Actors that are terminating as part of a redeployment, keyed by
/// the terminating actor ref. The buffered command is applied once the
/// <see cref="Terminated"/> signal confirms the child has fully stopped
/// (SiteRuntime-003).
/// </summary>
private readonly Dictionary<IActorRef, PendingRedeploy> _pendingRedeploys = new();

/// <summary>
/// Number of deployed instances (enabled and disabled) as tracked in memory; reported
/// to the health collector by <see cref="UpdateInstanceCounts"/>.
/// </summary>
private int _totalDeployedCount;

/// <summary>Timer scheduler required by <see cref="IWithTimers"/>; assigned by Akka.</summary>
public ITimerScheduler Timers { get; set; } = null!;
/// <summary>
/// Stores the injected collaborators, resolves the shared logger factory, and
/// registers one message handler per supported message type.
/// </summary>
/// <param name="storage">Site storage used for deployed configs and artifacts.</param>
/// <param name="compilationService">Script compilation service passed to Instance Actors.</param>
/// <param name="sharedScriptLibrary">Shared script library passed to Instance Actors.</param>
/// <param name="streamManager">Optional stream manager passed to Instance Actors.</param>
/// <param name="options">Site runtime options (startup batch size / delay).</param>
/// <param name="logger">Logger for this actor.</param>
/// <param name="dclManager">Optional DCL manager that receives connection commands.</param>
/// <param name="replicationActor">Optional actor that receives replication messages.</param>
/// <param name="healthCollector">Optional health collector for node/instance counts.</param>
/// <param name="serviceProvider">Optional service provider; also a fallback source for the logger factory.</param>
/// <param name="loggerFactory">Optional logger factory for Instance Actor loggers.</param>
public DeploymentManagerActor(
    SiteStorageService storage,
    ScriptCompilationService compilationService,
    SharedScriptLibrary sharedScriptLibrary,
    SiteStreamManager? streamManager,
    SiteRuntimeOptions options,
    ILogger logger,
    IActorRef? dclManager = null,
    IActorRef? replicationActor = null,
    ISiteHealthCollector? healthCollector = null,
    IServiceProvider? serviceProvider = null,
    ILoggerFactory? loggerFactory = null)
{
    _storage = storage;
    _compilationService = compilationService;
    _sharedScriptLibrary = sharedScriptLibrary;
    _streamManager = streamManager;
    _options = options;
    _dclManager = dclManager;
    _replicationActor = replicationActor;
    _healthCollector = healthCollector;
    _serviceProvider = serviceProvider;
    _logger = logger;

    // SiteRuntime-015: reuse a single logger factory for all Instance Actors.
    // Prefer an explicitly injected factory, fall back to one resolved from
    // the service provider, and only as a last resort use NullLoggerFactory —
    // never a per-instance `new LoggerFactory()` that leaks undisposed.
    _loggerFactory = loggerFactory
        ?? serviceProvider?.GetService(typeof(ILoggerFactory)) as ILoggerFactory
        ?? Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance;

    // The message type must be spelled out: it cannot be inferred from a method group.

    // Lifecycle commands
    Receive<DeployInstanceCommand>(HandleDeploy);
    Receive<DisableInstanceCommand>(HandleDisable);
    Receive<EnableInstanceCommand>(HandleEnable);
    Receive<DeleteInstanceCommand>(HandleDelete);

    // DeploymentManager-006: query-the-site-before-redeploy idempotency.
    // Central asks for the instance's currently-applied deployment identity
    // before re-sending a deployment whose prior record is stuck InProgress
    // or Failed due to a timeout.
    Receive<DeploymentStateQueryRequest>(HandleDeploymentStateQuery);

    // WP-33: Handle system-wide artifact deployment
    Receive<DeployArtifactsCommand>(HandleDeployArtifacts);

    // Debug View — route to Instance Actors
    Receive<SubscribeDebugViewRequest>(RouteDebugViewSubscribe);
    Receive<UnsubscribeDebugViewRequest>(RouteDebugViewUnsubscribe);
    Receive<DebugSnapshotRequest>(RouteDebugSnapshot);

    // Inbound API Route.To().Call() — route to Instance Actors
    Receive<RouteToCallRequest>(RouteInboundApiCall);
    Receive<RouteToGetAttributesRequest>(RouteInboundApiGetAttributes);
    Receive<RouteToSetAttributesRequest>(RouteInboundApiSetAttributes);

    // Internal startup messages
    Receive<StartupConfigsLoaded>(HandleStartupConfigsLoaded);
    Receive<SharedScriptsLoaded>(HandleSharedScriptsLoaded);
    Receive<StartNextBatch>(HandleStartNextBatch);

    // Internal enable result
    Receive<EnableResult>(HandleEnableResult);

    // Internal deploy persistence result
    Receive<DeployPersistenceResult>(HandleDeployPersistenceResult);

    // Terminated signal — drains a buffered redeployment once the previous
    // Instance Actor has fully stopped (SiteRuntime-003).
    Receive<Terminated>(HandleTerminated);
}
/// <summary>
/// Marks this node active in the health collector (when one is configured) and starts
/// an asynchronous load of all deployed configs. The outcome is piped back to this
/// actor as a <see cref="StartupConfigsLoaded"/> message.
/// </summary>
protected override void PreStart()
{
    base.PreStart();
    _healthCollector?.SetActiveNode(true);
    _logger.LogInformation("DeploymentManagerActor starting — loading deployed configs from SQLite...");

    // Load all configs asynchronously and pipe to self
    _storage.GetAllDeployedConfigsAsync().ContinueWith(t =>
    {
        if (t.IsCompletedSuccessfully)
            return new StartupConfigsLoaded(t.Result, null);

        // A canceled task carries no exception. Always report a non-null error so the
        // handler does not mistake a failed load for "zero configs deployed".
        var error = t.Exception?.GetBaseException().Message
            ?? "Loading deployed configs was canceled";
        return new StartupConfigsLoaded([], error);
    }).PipeTo(Self);
}
/// <summary>
/// Marks this node inactive in the health collector (when one is configured) before
/// running the base stop logic.
/// </summary>
protected override void PostStop()
{
    if (_healthCollector is not null)
    {
        _healthCollector.SetActiveNode(false);
    }

    base.PostStop();
}
/// <summary>
/// OneForOneStrategy: Resume on exceptions to preserve Instance Actor state,
/// Stop only on ActorInitializationException (actor failed to start).
/// </summary>
/// <returns>The supervision strategy applied to child Instance Actors.</returns>
protected override SupervisorStrategy SupervisorStrategy()
{
    // Maps a child failure to a directive, logging the decision either way.
    Directive Decide(Exception failure)
    {
        if (failure is not ActorInitializationException)
        {
            _logger.LogWarning(failure, "Instance Actor threw exception, resuming");
            return Directive.Resume;
        }

        _logger.LogError(failure, "Instance Actor failed to initialize, stopping");
        return Directive.Stop;
    }

    return new OneForOneStrategy(
        maxNrOfRetries: -1,
        withinTimeRange: TimeSpan.FromMinutes(1),
        decider: Decider.From(Decide));
}
/// <summary>
/// Processes the configs loaded from SQLite at startup.
///
/// SiteRuntime-008: shared scripts must be compiled before Instance Actors are
/// created, but the SQLite read and Roslyn compilation must not block the
/// singleton's mailbox. The compilation is run on a background task and a
/// <see cref="SharedScriptsLoaded"/> message is piped back; only then does
/// staggered Instance Actor creation begin. The enabled configs travel with that
/// message in the meantime.
/// </summary>
/// <param name="msg">Load outcome: the configs, or an error message when the load failed.</param>
private void HandleStartupConfigsLoaded(StartupConfigsLoaded msg)
{
    if (msg.Error is not null)
    {
        // Nothing to start: the load failed, so only record the failure.
        _logger.LogError("Failed to load deployed configs: {Error}", msg.Error);
        return;
    }

    var loaded = msg.Configs;
    var toStart = loaded.Where(c => c.IsEnabled).ToList();
    _totalDeployedCount = loaded.Count;

    _logger.LogInformation(
        "Loaded {Total} deployed configs ({Enabled} enabled) from SQLite",
        loaded.Count, toStart.Count);

    UpdateInstanceCounts();

    // Load and compile shared scripts off the actor thread, then resume startup.
    LoadSharedScriptsFromStorage(toStart);
}
/// <summary>
/// SiteRuntime-008: once shared scripts have been compiled off-thread, begins
/// staggered Instance Actor creation for the enabled configs captured at startup.
/// </summary>
/// <param name="msg">Compilation counts plus the enabled configs to start.</param>
private void HandleSharedScriptsLoaded(SharedScriptsLoaded msg)
{
    _logger.LogInformation(
        "Loaded {Compiled}/{Total} shared scripts from SQLite",
        msg.CompiledCount, msg.TotalCount);

    var pending = msg.EnabledConfigs;
    if (pending.Count > 0)
    {
        // Kick off the first batch right away; later batches are timer-driven.
        Self.Tell(new StartNextBatch(new BatchState(pending, 0)));
    }
}
/// <summary>
/// Creates Instance Actors in batches with a configurable delay between batches
/// to prevent reconnection storms on failover.
/// </summary>
/// <param name="msg">Carries the full config list and the index of the next config to start.</param>
private void HandleStartNextBatch(StartNextBatch msg)
{
    var state = msg.State;

    // Guard against misconfiguration: a batch size of zero or less would make no
    // progress and re-arm the timer forever, so no instance would ever start.
    var batchSize = Math.Max(1, _options.StartupBatchSize);
    var startIdx = state.NextIndex;
    var endIdx = Math.Min(startIdx + batchSize, state.Configs.Count);

    _logger.LogDebug(
        "Creating Instance Actors batch [{Start}..{End}) of {Total}",
        startIdx, endIdx, state.Configs.Count);

    for (var i = startIdx; i < endIdx; i++)
    {
        var config = state.Configs[i];
        EnsureDclConnections(config.ConfigJson);
        CreateInstanceActor(config.InstanceUniqueName, config.ConfigJson);
    }

    UpdateInstanceCounts();

    // Schedule next batch if there are more, using Timers (IWithTimers)
    if (endIdx < state.Configs.Count)
    {
        var nextState = new BatchState(state.Configs, endIdx);

        // A negative delay is treated as "no delay" rather than handed to the timer.
        var delayMs = Math.Max(0, _options.StartupBatchDelayMs);
        Timers.StartSingleTimer(
            "startup-batch",
            new StartNextBatch(nextState),
            TimeSpan.FromMilliseconds(delayMs));
    }
    else
    {
        _logger.LogInformation(
            "All {Count} Instance Actors created", state.Configs.Count);
    }
}
/// <summary>
/// Handles a new deployment: stores config in SQLite, clears previous static overrides,
/// and creates or replaces the Instance Actor.
///
/// Redeployment of an already-running instance must wait for the previous Instance
/// Actor to fully terminate (including PostStop on its descendants) before the
/// replacement is created — otherwise <c>Context.ActorOf</c> can collide on
/// the still-registered child name. Instead of guessing with a fixed timer, the
/// terminating child is watched and the in-flight command is buffered until the
/// <see cref="Terminated"/> signal arrives.
///
/// If another deployment for the same instance arrives while its predecessor is
/// still terminating, the newer command replaces the buffered one and the sender of
/// the older command is told that its deployment failed because it was superseded.
/// </summary>
/// <param name="command">The deployment to apply.</param>
private void HandleDeploy(DeployInstanceCommand command)
{
    var instanceName = command.InstanceUniqueName;
    _logger.LogInformation(
        "Deploying instance {Instance}, deploymentId={DeploymentId}",
        instanceName, command.DeploymentId);

    // Redeployment replaces a running instance. Watch + stop the existing actor
    // and buffer this command until its Terminated signal confirms the child
    // (and its whole subtree) has fully stopped and freed its actor name.
    if (_instanceActors.TryGetValue(instanceName, out var existing))
    {
        _instanceActors.Remove(instanceName);
        _pendingRedeploys[existing] = new PendingRedeploy(command, Sender);
        Context.Watch(existing);
        Context.Stop(existing);
        UpdateInstanceCounts();
        return;
    }

    // The instance may be absent from _instanceActors only because an earlier
    // redeploy already stopped it and is waiting for Terminated. Treating this
    // command as a fresh deployment would call ActorOf on the still-registered child
    // name and bump the deployed counter a second time.
    IActorRef? terminating = null;
    PendingRedeploy? superseded = null;
    foreach (var entry in _pendingRedeploys)
    {
        if (string.Equals(
                entry.Value.Command.InstanceUniqueName, instanceName, StringComparison.Ordinal))
        {
            terminating = entry.Key;
            superseded = entry.Value;
            break;
        }
    }

    if (terminating is not null && superseded is not null)
    {
        _logger.LogWarning(
            "Deployment {Superseded} for {Instance} superseded by {DeploymentId} before it was applied",
            superseded.Command.DeploymentId, instanceName, command.DeploymentId);

        superseded.OriginalSender.Tell(new DeploymentStatusResponse(
            superseded.Command.DeploymentId,
            instanceName,
            DeploymentStatus.Failed,
            $"Superseded by deployment {command.DeploymentId} before it could be applied",
            DateTimeOffset.UtcNow));

        // Latest command wins; it is applied when the predecessor's Terminated arrives.
        _pendingRedeploys[terminating] = new PendingRedeploy(command, Sender);
        return;
    }

    // Fresh deployment — no existing actor to replace.
    ApplyDeployment(command, Sender, isRedeploy: false);
}
/// <summary>
/// Recreates an Instance Actor once its predecessor has fully terminated during a
/// redeployment, draining the buffered <see cref="PendingRedeploy"/>. Termination
/// signals for actors with no buffered redeployment are ignored.
/// </summary>
/// <param name="terminated">The termination signal of a watched child.</param>
private void HandleTerminated(Terminated terminated)
{
    var stopped = terminated.ActorRef;
    if (_pendingRedeploys.Remove(stopped, out var buffered))
    {
        ApplyDeployment(buffered.Command, buffered.OriginalSender, isRedeploy: true);
    }
}
/// <summary>
/// Creates the Instance Actor, persists the config, and replies to the deployer.
/// A redeployment is an update of an existing instance, so the deployed-instance
/// counter is only incremented for genuinely new deployments.
///
/// SiteRuntime-005: the deployer is not told <see cref="DeploymentStatus.Success"/>
/// until SQLite persistence has committed. The site's deployed-config store is the
/// durable source of truth — a config that was never persisted would be silently lost
/// on the next restart/failover, so reporting Success before the row is committed is
/// incorrect. The reply is sent from <see cref="HandleDeployPersistenceResult"/> once
/// the persistence outcome is known.
/// </summary>
/// <param name="command">The deployment being applied.</param>
/// <param name="sender">The actor that should receive the deployment status reply.</param>
/// <param name="isRedeploy">True when this replaces an instance that was already deployed.</param>
private void ApplyDeployment(DeployInstanceCommand command, IActorRef sender, bool isRedeploy)
{
    var instanceName = command.InstanceUniqueName;

    // DCL connections first, so data-sourced attributes have somewhere to connect.
    EnsureDclConnections(command.FlattenedConfigurationJson);

    // The actor is created optimistically, before persistence is known to succeed.
    CreateInstanceActor(instanceName, command.FlattenedConfigurationJson);

    if (!isRedeploy)
    {
        _totalDeployedCount++;
    }

    UpdateInstanceCounts();

    // Persists the config, resets static overrides, then notifies the standby node.
    async Task<DeployPersistenceResult> PersistAsync()
    {
        await _storage.StoreDeployedConfigAsync(
            instanceName,
            command.FlattenedConfigurationJson,
            command.DeploymentId,
            command.RevisionHash,
            isEnabled: true);

        // Static overrides are reset on redeployment per design decision
        await _storage.ClearStaticOverridesAsync(instanceName);

        _replicationActor?.Tell(new ReplicateConfigDeploy(
            instanceName, command.FlattenedConfigurationJson,
            command.DeploymentId, command.RevisionHash, true));

        return new DeployPersistenceResult(
            command.DeploymentId, instanceName, true, null, sender, isRedeploy);
    }

    Task.Run(() => PersistAsync())
        .ContinueWith(done => done.IsCompletedSuccessfully
            ? done.Result
            : new DeployPersistenceResult(
                command.DeploymentId, instanceName, false,
                done.Exception?.GetBaseException().Message, sender, isRedeploy))
        .PipeTo(Self);
}
/// <summary>
/// SiteRuntime-005: reports the deployment outcome to central only after the
/// persistence result is known. On a persistence failure the Instance Actor that was
/// created optimistically is stopped and the deployed-instance counter rolled back,
/// so the in-memory state stays consistent with durable storage, and central is told
/// the deployment <see cref="DeploymentStatus.Failed"/>.
/// </summary>
/// <param name="result">Outcome of persisting a deployment, including who to reply to.</param>
private void HandleDeployPersistenceResult(DeployPersistenceResult result)
{
    if (!result.Success)
    {
        _logger.LogError(
            "Failed to persist deployment {DeploymentId} for {Instance}: {Error}",
            result.DeploymentId, result.InstanceName, result.Error);

        // Persistence failed — undo the optimistic actor creation and counter bump so
        // the site does not advertise an instance it cannot durably recover.
        if (_instanceActors.Remove(result.InstanceName, out var orphan))
        {
            Context.Stop(orphan);
        }

        if (!result.IsRedeploy)
        {
            _totalDeployedCount = Math.Max(0, _totalDeployedCount - 1);
        }

        UpdateInstanceCounts();
    }

    var status = result.Success ? DeploymentStatus.Success : DeploymentStatus.Failed;
    var error = result.Success ? null : result.Error ?? "Deployment persistence failed";

    result.OriginalSender.Tell(new DeploymentStatusResponse(
        result.DeploymentId,
        result.InstanceName,
        status,
        error,
        DateTimeOffset.UtcNow));
}
/// <summary>
/// Disables an instance: stops the actor and marks as disabled in SQLite. The reply
/// to the sender reflects whether the storage update succeeded; the change is
/// replicated only on success.
/// </summary>
/// <param name="command">The disable command naming the instance.</param>
private void HandleDisable(DisableInstanceCommand command)
{
    var replyTo = Sender;
    var instanceName = command.InstanceUniqueName;

    if (_instanceActors.TryGetValue(instanceName, out var running))
    {
        Context.Stop(running);
        _instanceActors.Remove(instanceName);
    }

    UpdateInstanceCounts();

    _storage.SetInstanceEnabledAsync(instanceName, false)
        .ContinueWith(done =>
        {
            var persisted = done.IsCompletedSuccessfully;
            if (persisted)
            {
                _replicationActor?.Tell(new ReplicateConfigSetEnabled(instanceName, false));
            }

            return new InstanceLifecycleResponse(
                command.CommandId,
                instanceName,
                persisted,
                done.Exception?.GetBaseException().Message,
                DateTimeOffset.UtcNow);
        })
        .PipeTo(replyTo);

    _logger.LogInformation("Instance {Instance} disabled", instanceName);
}
/// <summary>
/// Enables an instance: marks as enabled in SQLite and re-creates the Instance Actor
/// from the stored config. The storage work runs off the actor thread; its outcome is
/// piped back as an <see cref="EnableResult"/> and finished in
/// <see cref="HandleEnableResult"/>.
/// </summary>
/// <param name="command">The enable command naming the instance.</param>
private void HandleEnable(EnableInstanceCommand command)
{
    var replyTo = Sender;
    var instanceName = command.InstanceUniqueName;

    // Flags the instance enabled, replicates the flag, then looks up its stored config.
    async Task<EnableResult> EnableAndLoadAsync()
    {
        await _storage.SetInstanceEnabledAsync(instanceName, true);
        _replicationActor?.Tell(new ReplicateConfigSetEnabled(instanceName, true));

        var stored = await _storage.GetAllDeployedConfigsAsync();
        var match = stored.FirstOrDefault(c => c.InstanceUniqueName == instanceName);
        return new EnableResult(command, match, null, replyTo);
    }

    Task.Run(() => EnableAndLoadAsync())
        .ContinueWith(done => done.IsCompletedSuccessfully
            ? done.Result
            : new EnableResult(command, null, done.Exception?.GetBaseException().Message, replyTo))
        .PipeTo(Self);
}
/// <summary>
/// Processes enable result in the actor context (thread-safe). On failure, or when no
/// stored config exists for the instance, the sender is told the enable failed.
/// Otherwise the Instance Actor is created if it is not already running.
/// </summary>
/// <param name="result">Outcome of the storage work started by <see cref="HandleEnable"/>.</param>
private void HandleEnableResult(EnableResult result)
{
    var instanceName = result.Command.InstanceUniqueName;

    if (result.Error != null || result.Config == null)
    {
        var error = result.Error ?? $"No deployed config found for {instanceName}";
        result.OriginalSender.Tell(new InstanceLifecycleResponse(
            result.Command.CommandId, instanceName, false, error, DateTimeOffset.UtcNow));
        return;
    }

    if (!_instanceActors.ContainsKey(instanceName))
    {
        // Same order as the startup and deploy paths: make sure the DCL connections
        // exist before the actor is created. Startup only does this for enabled
        // configs, so an instance that was disabled at startup has not had its
        // connections created yet. The call skips connections whose configuration is
        // unchanged.
        EnsureDclConnections(result.Config.ConfigJson);
        CreateInstanceActor(instanceName, result.Config.ConfigJson);
    }

    UpdateInstanceCounts();

    result.OriginalSender.Tell(new InstanceLifecycleResponse(
        result.Command.CommandId, instanceName, true, null, DateTimeOffset.UtcNow));

    _logger.LogInformation("Instance {Instance} enabled", instanceName);
}
/// <summary>
/// Deletes an instance: stops the actor and removes config from SQLite.
/// Note: store-and-forward messages are NOT cleared per design decision.
/// The deployed-instance counter is decremented (never below zero) before the storage
/// removal completes; the reply reflects whether the removal succeeded.
/// </summary>
/// <param name="command">The delete command naming the instance.</param>
private void HandleDelete(DeleteInstanceCommand command)
{
    var replyTo = Sender;
    var instanceName = command.InstanceUniqueName;

    if (_instanceActors.TryGetValue(instanceName, out var running))
    {
        Context.Stop(running);
        _instanceActors.Remove(instanceName);
    }

    _totalDeployedCount = Math.Max(0, _totalDeployedCount - 1);
    UpdateInstanceCounts();

    _storage.RemoveDeployedConfigAsync(instanceName)
        .ContinueWith(done =>
        {
            var removed = done.IsCompletedSuccessfully;
            if (removed)
            {
                _replicationActor?.Tell(new ReplicateConfigRemove(instanceName));
            }

            return new InstanceLifecycleResponse(
                command.CommandId,
                instanceName,
                removed,
                done.Exception?.GetBaseException().Message,
                DateTimeOffset.UtcNow);
        })
        .PipeTo(replyTo);

    _logger.LogInformation("Instance {Instance} deleted", instanceName);
}
/// <summary>
/// DeploymentManager-006: answers a central query for the instance's
/// currently-applied deployment identity. The site's deployed-config store
/// (SQLite) is the authoritative record — it covers both enabled and
/// disabled instances, and survives node restart/failover. If the instance
/// has no stored config, the response reports IsDeployed = false with
/// null identity so central falls through to a normal deploy.
/// </summary>
/// <param name="request">The query naming the instance and carrying the correlation id.</param>
private void HandleDeploymentStateQuery(DeploymentStateQueryRequest request)
{
    var replyTo = Sender;
    var instanceName = request.InstanceUniqueName;

    // Response used both for "no stored config" and for "storage read failed".
    DeploymentStateQueryResponse NotDeployed() => new DeploymentStateQueryResponse(
        request.CorrelationId, instanceName, false, null, null, DateTimeOffset.UtcNow);

    _storage.GetAllDeployedConfigsAsync()
        .ContinueWith(done =>
        {
            if (done.IsCompletedSuccessfully)
            {
                var stored = done.Result.FirstOrDefault(c => c.InstanceUniqueName == instanceName);
                if (stored == null)
                {
                    return NotDeployed();
                }

                return new DeploymentStateQueryResponse(
                    request.CorrelationId, instanceName, true,
                    stored.DeploymentId, stored.RevisionHash, DateTimeOffset.UtcNow);
            }

            _logger.LogError(
                done.Exception?.GetBaseException(),
                "Failed to read deployed configs for deployment state query of {Instance}",
                instanceName);

            // Treat a storage read failure as "unknown" — central falls
            // through to a normal deploy and relies on site-side
            // stale-rejection as the safety net.
            return NotDeployed();
        })
        .PipeTo(replyTo);
}
// ── DCL connection management ──
/// <summary>
/// Tracks the configuration last sent to the DCL for each connection name: the key is
/// the connection name, the value a hash of the connection's
/// protocol/endpoints/credentials/failover count (SiteRuntime-010). A name whose hash
/// is unchanged is skipped; a name whose config changed re-issues a
/// CreateConnectionCommand so the DCL adopts the new configuration instead of keeping
/// a stale connection after a redeployment.
/// </summary>
private readonly Dictionary<string, string> _createdConnections = new();
/// <summary>
/// Sets up DCL connections from the flattened config. Idempotent on unchanged
/// configuration, but re-issues the create command when a connection's endpoint,
/// credentials, backup endpoint, or failover retry count has changed since it was
/// last sent (SiteRuntime-010). Does nothing when no DCL manager is configured.
/// </summary>
/// <param name="configJson">Flattened instance configuration as JSON.</param>
private void EnsureDclConnections(string configJson)
{
if (_dclManager == null) return;
try
{
// NOTE(review): the target type of this Deserialize call is not visible here — confirm.
var config = System.Text.Json.JsonSerializer.Deserialize(configJson);
if (config?.Connections == null) return;
foreach (var (name, connConfig) in config.Connections)
{
// Skip connections whose configuration hash matches what was last sent.
var configHash = ComputeConnectionConfigHash(connConfig);
if (_createdConnections.TryGetValue(name, out var lastHash) && lastHash == configHash)
continue;
var primaryDetails = FlattenConnectionConfig(connConfig.Protocol, connConfig.ConfigurationJson);
// No backup details are sent when the backup configuration is empty.
var backupDetails = string.IsNullOrEmpty(connConfig.BackupConfigurationJson)
? null
: FlattenConnectionConfig(connConfig.Protocol, connConfig.BackupConfigurationJson);
_dclManager.Tell(new Commons.Messages.DataConnection.CreateConnectionCommand(
name, connConfig.Protocol, primaryDetails, backupDetails, connConfig.FailoverRetryCount));
// Must be read before the hash is stored: a name already present means "Updated".
var changed = _createdConnections.ContainsKey(name);
_createdConnections[name] = configHash;
_logger.LogInformation(
"{Action} DCL connection {Connection} (protocol={Protocol})",
changed ? "Updated" : "Created", name, connConfig.Protocol);
}
}
catch (Exception ex)
{
// Any exception, including one raised for a single connection, ends the loop here.
_logger.LogWarning(ex, "Failed to parse flattened config for DCL connections");
}
}
/// <summary>
/// Computes a stable hash over the configuration fields that affect how the DCL
/// connects, so a changed endpoint/credential/backup/failover count is detected
/// (SiteRuntime-010).
/// </summary>
/// <param name="connConfig">The connection configuration to fingerprint.</param>
/// <returns>Upper-case hexadecimal SHA-256 of the length-prefixed fields.</returns>
private static string ComputeConnectionConfigHash(
    Commons.Types.Flattening.ConnectionConfig connConfig)
{
    string[] fields =
    [
        connConfig.Protocol ?? string.Empty,
        connConfig.ConfigurationJson ?? string.Empty,
        connConfig.BackupConfigurationJson ?? string.Empty,
        connConfig.FailoverRetryCount.ToString(),
    ];

    // Each field is written as "<length>:<value>". Plain concatenation would make
    // field boundaries ambiguous: the same text moved from the primary to the backup
    // configuration would hash identically, and the change would be missed.
    var material = new System.Text.StringBuilder();
    foreach (var field in fields)
    {
        material.Append(field.Length).Append(':').Append(field);
    }

    var bytes = System.Security.Cryptography.SHA256.HashData(
        System.Text.Encoding.UTF8.GetBytes(material.ToString()));
    return Convert.ToHexString(bytes);
}
/// <summary>
/// Converts a connection's configuration JSON into a flat name/value dictionary.
/// The "OpcUa" protocol (case-insensitive) goes through the OPC UA endpoint config
/// serializer; any other protocol is treated as a JSON object whose top-level
/// properties become the entries.
/// </summary>
/// <param name="protocol">Connection protocol name.</param>
/// <param name="json">Configuration JSON; null or empty yields an empty dictionary.</param>
/// <returns>
/// The flattened settings. Empty when <paramref name="json"/> is null/empty, or when
/// the fallback path is given text that is not a JSON object.
/// </returns>
private static IDictionary<string, string> FlattenConnectionConfig(string protocol, string? json)
{
    if (string.IsNullOrEmpty(json))
        return new Dictionary<string, string>();

    if (string.Equals(protocol, "OpcUa", StringComparison.OrdinalIgnoreCase))
    {
        var (config, _) = Commons.Serialization.OpcUaEndpointConfigSerializer.Deserialize(json);
        return Commons.Serialization.OpcUaEndpointConfigSerializer.ToFlatDict(config);
    }

    // Fallback: assume legacy flat-dict shape for any future / unknown protocol.
    try
    {
        using var doc = System.Text.Json.JsonDocument.Parse(json);
        var flat = new Dictionary<string, string>();
        foreach (var prop in doc.RootElement.EnumerateObject())
        {
            flat[prop.Name] = prop.Value.ToString();
        }

        return flat;
    }
    catch (Exception ex) when (ex is System.Text.Json.JsonException or InvalidOperationException)
    {
        // Best effort: malformed JSON (JsonException) or a root that is not an object
        // (InvalidOperationException) yields no settings. Other exceptions propagate.
        return new Dictionary<string, string>();
    }
}
// ── Shared Script Loading ──
/// <summary>
/// SiteRuntime-008: reads and compiles all shared scripts on a background task so the
/// SQLite read and Roslyn compilation never block the singleton's mailbox thread. The
/// result is piped back as a <see cref="SharedScriptsLoaded"/> message, carrying the
/// enabled configs to resume staggered Instance Actor creation on the actor thread.
/// </summary>
/// <param name="enabledConfigs">Enabled configs to hand back once loading has finished.</param>
private void LoadSharedScriptsFromStorage(List<DeployedInstance> enabledConfigs)
{
    Task.Run(async () =>
    {
        var scripts = await _storage.GetAllSharedScriptsAsync();

        var compiled = 0;
        foreach (var script in scripts)
        {
            if (_sharedScriptLibrary.CompileAndRegister(script.Name, script.Code))
                compiled++;
        }

        return new SharedScriptsLoaded(enabledConfigs, compiled, scripts.Count);
    }).ContinueWith(t =>
    {
        if (t.IsCompletedSuccessfully)
            return t.Result;

        _logger.LogError(
            t.Exception?.GetBaseException(), "Failed to load shared scripts from SQLite");

        // A shared-script load failure must not abandon startup — proceed with
        // Instance Actor creation; scripts that need a missing shared script fail
        // at execution time and are recorded to the site event log.
        return new SharedScriptsLoaded(enabledConfigs, 0, 0);
    }).PipeTo(Self);
}
// ── Debug View routing ──
/// <summary>
/// Forwards a debug view subscription to the named Instance Actor. When no such
/// instance is running, logs a warning and replies to the sender with a
/// DebugViewSnapshot built from two empty arrays.
/// </summary>
/// <param name="request">Subscription request naming the target instance.</param>
private void RouteDebugViewSubscribe(SubscribeDebugViewRequest request)
{
if (_instanceActors.TryGetValue(request.InstanceUniqueName, out var instanceActor))
{
// Forward keeps the original sender, so the Instance Actor replies to the requester.
instanceActor.Forward(request);
}
else
{
_logger.LogWarning(
"Debug view subscribe for unknown instance {Instance}", request.InstanceUniqueName);
// NOTE(review): the element types of the two empty arrays are not visible here — confirm.
Sender.Tell(new DebugViewSnapshot(
request.InstanceUniqueName, Array.Empty(),
Array.Empty(), DateTimeOffset.UtcNow));
}
}
/// <summary>
/// Forwards a debug view unsubscription to the named Instance Actor. Requests for
/// instances that are not running are dropped without a reply.
/// </summary>
/// <param name="request">Unsubscription request naming the target instance.</param>
private void RouteDebugViewUnsubscribe(UnsubscribeDebugViewRequest request)
{
    if (!_instanceActors.TryGetValue(request.InstanceUniqueName, out var target))
    {
        return;
    }

    target.Forward(request);
}
/// <summary>
/// Forwards a debug snapshot request to the named Instance Actor. When no such
/// instance is running, logs a warning and replies to the sender with a
/// DebugViewSnapshot built from two empty arrays.
/// </summary>
/// <param name="request">Snapshot request naming the target instance.</param>
private void RouteDebugSnapshot(DebugSnapshotRequest request)
{
if (_instanceActors.TryGetValue(request.InstanceUniqueName, out var instanceActor))
{
// Forward keeps the original sender, so the Instance Actor replies to the requester.
instanceActor.Forward(request);
}
else
{
_logger.LogWarning(
"Debug snapshot for unknown instance {Instance}", request.InstanceUniqueName);
// NOTE(review): the element types of the two empty arrays are not visible here — confirm.
Sender.Tell(new DebugViewSnapshot(
request.InstanceUniqueName, Array.Empty(),
Array.Empty(), DateTimeOffset.UtcNow));
}
}
// ── Inbound API routing ──
/// <summary>
/// Routes an inbound API script call to the named Instance Actor and pipes the
/// outcome back to the sender as a RouteToCallResponse. When the instance is not
/// running, replies immediately with a "not found" failure.
/// </summary>
/// <param name="request">The call request: instance, script name, parameters, correlation id.</param>
private void RouteInboundApiCall(RouteToCallRequest request)
{
if (_instanceActors.TryGetValue(request.InstanceUniqueName, out var instanceActor))
{
// Convert to ScriptCallRequest and Ask the Instance Actor.
// Audit Log #23 (ParentExecutionId): carry the inbound request's
// ExecutionId down as ParentExecutionId so the routed script
// execution can record its spawner.
// NOTE(review): the meaning of the literal 0 (third argument) is not visible here — confirm.
var scriptCall = new ScriptCallRequest(
request.ScriptName, request.Parameters, 0, request.CorrelationId,
ParentExecutionId: request.ParentExecutionId);
// Capture Sender now: the continuation below does not run in the actor context.
var sender = Sender;
// NOTE(review): the reply type of this Ask is not visible here — confirm.
instanceActor.Ask(scriptCall, TimeSpan.FromSeconds(30))
.ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
{
var result = t.Result;
return new RouteToCallResponse(
request.CorrelationId, result.Success, result.ReturnValue,
result.ErrorMessage, DateTimeOffset.UtcNow);
}
// Ask failed or timed out: report the exception message, or a timeout text if none.
return new RouteToCallResponse(
request.CorrelationId, false, null,
t.Exception?.GetBaseException().Message ?? "Script call timed out",
DateTimeOffset.UtcNow);
}).PipeTo(sender);
}
else
{
Sender.Tell(new RouteToCallResponse(
request.CorrelationId, false, null,
$"Instance '{request.InstanceUniqueName}' not found on this site.",
DateTimeOffset.UtcNow));
}
}
/// <summary>
/// Reads attribute values from a deployed instance for a Route.To().GetAttribute(s)
/// call (or a central Test Run bound to the instance). Asks the Instance Actor
/// per attribute and combines the results. When the instance is not running, replies
/// immediately with a "not found" failure.
/// </summary>
/// <param name="request">The read request: instance, attribute names, correlation id.</param>
private void RouteInboundApiGetAttributes(RouteToGetAttributesRequest request)
{
if (!_instanceActors.TryGetValue(request.InstanceUniqueName, out var instanceActor))
{
Sender.Tell(new RouteToGetAttributesResponse(
request.CorrelationId, new Dictionary(), false,
$"Instance '{request.InstanceUniqueName}' not found on this site.",
DateTimeOffset.UtcNow));
return;
}
// Capture Sender now: the continuation below does not run in the actor context.
var sender = Sender;
var names = request.AttributeNames;
// One Ask per attribute, each with a 30-second timeout; asks[i] pairs with names[i].
// NOTE(review): the reply type of these Asks is not visible here — confirm.
var asks = names
.Select(name => instanceActor.Ask(
new GetAttributeRequest(
request.CorrelationId, request.InstanceUniqueName, name, DateTimeOffset.UtcNow),
TimeSpan.FromSeconds(30)))
.ToArray();
Task.WhenAll(asks).ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
{
var values = new Dictionary();
// An attribute the instance reports as not found is returned with a null value.
for (var i = 0; i < names.Count; i++)
values[names[i]] = t.Result[i].Found ? t.Result[i].Value : null;
return new RouteToGetAttributesResponse(
request.CorrelationId, values, true, null, DateTimeOffset.UtcNow);
}
// If the combined task fails, the whole read is reported as failed with no values.
return new RouteToGetAttributesResponse(
request.CorrelationId, new Dictionary(), false,
t.Exception?.GetBaseException().Message ?? "Attribute read timed out",
DateTimeOffset.UtcNow);
}).PipeTo(sender);
}
/// <summary>
/// Writes attribute values on a deployed instance for a Route.To().SetAttribute(s)
/// call (or a central Test Run bound to the instance). Each write is Ask'd to the
/// Instance Actor as a SetStaticAttributeCommand. The response reflects the real
/// per-attribute outcome (any write reported as unsuccessful makes the whole response
/// a failure listing each failed attribute), rather than an unconditional optimistic
/// ack. When the instance is not running, replies immediately with a "not found"
/// failure.
/// </summary>
/// <remarks>
/// NOTE(review): how the Instance Actor handles the command (data-sourced attributes
/// through the DCL, static attributes to a persisted override) is not visible in this
/// method — confirm against InstanceActor.
/// </remarks>
/// <param name="request">The write request: instance, attribute name/value pairs, correlation id.</param>
private void RouteInboundApiSetAttributes(RouteToSetAttributesRequest request)
{
if (!_instanceActors.TryGetValue(request.InstanceUniqueName, out var instanceActor))
{
Sender.Tell(new RouteToSetAttributesResponse(
request.CorrelationId, false,
$"Instance '{request.InstanceUniqueName}' not found on this site.",
DateTimeOffset.UtcNow));
return;
}
// Capture Sender now: the continuation below does not run in the actor context.
var sender = Sender;
var correlationId = request.CorrelationId;
// One Ask per attribute, each with a 30-second timeout.
// NOTE(review): the reply type of these Asks is not visible here — confirm.
var asks = request.AttributeValues
.Select(kvp => instanceActor.Ask(
new SetStaticAttributeCommand(
correlationId, request.InstanceUniqueName, kvp.Key, kvp.Value, DateTimeOffset.UtcNow),
TimeSpan.FromSeconds(30)))
.ToArray();
Task.WhenAll(asks).ContinueWith(t =>
{
// If the combined task fails, the whole write is reported as failed.
if (!t.IsCompletedSuccessfully)
return new RouteToSetAttributesResponse(
correlationId, false,
t.Exception?.GetBaseException().Message ?? "Attribute write timed out",
DateTimeOffset.UtcNow);
// Collect "name: error" for every write the Instance Actor reported as unsuccessful.
var failures = t.Result
.Where(r => !r.Success)
.Select(r => $"{r.AttributeName}: {r.ErrorMessage}")
.ToArray();
return failures.Length == 0
? new RouteToSetAttributesResponse(correlationId, true, null, DateTimeOffset.UtcNow)
: new RouteToSetAttributesResponse(
correlationId, false, string.Join("; ", failures), DateTimeOffset.UtcNow);
}).PipeTo(sender);
}
/// <summary>
/// WP-33: Handles system-wide artifact deployment (shared scripts, external systems, etc.).
/// Persists artifacts to SiteStorageService and recompiles shared scripts. The work
/// runs on a background task; the sender receives an ArtifactDeploymentResponse whose
/// success flag is false when any step throws.
/// </summary>
/// <param name="command">The artifact sets to store; each set may be null.</param>
private void HandleDeployArtifacts(DeployArtifactsCommand command)
{
    _logger.LogInformation(
        "Deploying system artifacts, deploymentId={DeploymentId}", command.DeploymentId);

    // Capture Sender now: the task below does not run in the actor context.
    var sender = Sender;

    Task.Run(async () =>
    {
        try
        {
            // WP-33: Store shared scripts and recompile
            if (command.SharedScripts is not null)
            {
                foreach (var script in command.SharedScripts)
                {
                    await _storage.StoreSharedScriptAsync(script.Name, script.Code,
                        script.ParameterDefinitions, script.ReturnDefinition);

                    // WP-33: Shared scripts recompiled on update. The script is already
                    // stored at this point, so a failed compile is logged rather than
                    // failing the whole deployment.
                    if (!_sharedScriptLibrary.CompileAndRegister(script.Name, script.Code))
                    {
                        _logger.LogWarning(
                            "Shared script {Script} was stored but not compiled, deploymentId={DeploymentId}",
                            script.Name, command.DeploymentId);
                    }
                }
            }

            // WP-33: Store external system definitions
            if (command.ExternalSystems is not null)
            {
                foreach (var es in command.ExternalSystems)
                {
                    await _storage.StoreExternalSystemAsync(es.Name, es.EndpointUrl,
                        es.AuthType, es.AuthConfiguration, es.MethodDefinitionsJson);
                }
            }

            // WP-33: Store database connection definitions
            if (command.DatabaseConnections is not null)
            {
                foreach (var db in command.DatabaseConnections)
                {
                    await _storage.StoreDatabaseConnectionAsync(db.Name, db.ConnectionString,
                        db.MaxRetries, db.RetryDelay);
                }
            }

            // WP-33: Store notification lists
            if (command.NotificationLists is not null)
            {
                foreach (var nl in command.NotificationLists)
                {
                    await _storage.StoreNotificationListAsync(nl.Name, nl.RecipientEmails);
                }
            }

            // Store data connection definitions (OPC UA endpoints, etc.)
            if (command.DataConnections is not null)
            {
                foreach (var dc in command.DataConnections)
                {
                    await _storage.StoreDataConnectionDefinitionAsync(
                        dc.Name, dc.Protocol, dc.PrimaryConfigurationJson,
                        dc.BackupConfigurationJson, dc.FailoverRetryCount);
                }
            }

            // Store SMTP configurations
            if (command.SmtpConfigurations is not null)
            {
                foreach (var smtp in command.SmtpConfigurations)
                {
                    await _storage.StoreSmtpConfigurationAsync(
                        smtp.Name, smtp.Server, smtp.Port, smtp.AuthMode,
                        smtp.FromAddress, smtp.Username, smtp.Password, smtp.OAuthConfig);
                }
            }

            // Replicate artifacts to standby node
            _replicationActor?.Tell(new ReplicateArtifacts(command));

            return new ArtifactDeploymentResponse(
                command.DeploymentId, "", true, null, DateTimeOffset.UtcNow);
        }
        catch (Exception ex)
        {
            // The reply carries only the message, so log the full exception here;
            // otherwise the stack trace is lost.
            _logger.LogError(
                ex, "Failed to deploy system artifacts, deploymentId={DeploymentId}",
                command.DeploymentId);

            return new ArtifactDeploymentResponse(
                command.DeploymentId, "", false, ex.Message, DateTimeOffset.UtcNow);
        }
    }).PipeTo(sender);
}
/// <summary>
/// Creates a child Instance Actor with the given name and configuration JSON and
/// records it under that name. Does nothing except log a warning when an actor with
/// the same name is already tracked.
/// </summary>
/// <param name="instanceName">Instance unique name; also used as the child actor name.</param>
/// <param name="configJson">Flattened configuration JSON handed to the Instance Actor.</param>
internal void CreateInstanceActor(string instanceName, string configJson)
{
    if (_instanceActors.ContainsKey(instanceName))
    {
        _logger.LogWarning("Instance Actor {Instance} already exists, skipping creation", instanceName);
        return;
    }

    // SiteRuntime-015: reuse the shared, host-configured logger factory
    // instead of allocating (and leaking) a fresh LoggerFactory per instance.
    var props = Props.Create(() => new InstanceActor(
        instanceName,
        configJson,
        _storage,
        _compilationService,
        _sharedScriptLibrary,
        _streamManager,
        _options,
        _loggerFactory.CreateLogger<InstanceActor>(),
        _dclManager,
        _healthCollector,
        _serviceProvider));

    var actorRef = Context.ActorOf(props, instanceName);
    _instanceActors[instanceName] = actorRef;

    _logger.LogDebug("Created Instance Actor for {Instance}", instanceName);
}
/// <summary>
/// Number of Instance Actors currently tracked as running (for testing/diagnostics).
/// </summary>
internal int InstanceActorCount
{
    get { return _instanceActors.Count; }
}
/// <summary>
/// Pushes the current instance counts to the health collector, if one is configured.
/// Deployed is the tracked total, enabled is the number of running actors, and
/// disabled is the difference between the two.
/// </summary>
private void UpdateInstanceCounts()
{
    if (_healthCollector is null)
    {
        return;
    }

    var deployedTotal = _totalDeployedCount;
    var runningCount = _instanceActors.Count;

    _healthCollector.SetInstanceCounts(
        deployed: deployedTotal,
        enabled: runningCount,
        disabled: deployedTotal - runningCount);
}
// ── Internal messages ──
/// <summary>
/// Result of the startup config load: the deployed configs, or an error message when
/// the load failed.
/// </summary>
internal record StartupConfigsLoaded(List<DeployedInstance> Configs, string? Error);

/// <summary>
/// Internal message piped back once shared scripts have been compiled off-thread
/// (SiteRuntime-008). Carries the enabled configs so staggered Instance Actor
/// creation resumes on the actor thread.
/// </summary>
internal record SharedScriptsLoaded(
    List<DeployedInstance> EnabledConfigs, int CompiledCount, int TotalCount);

/// <summary>Triggers creation of the next batch of Instance Actors.</summary>
internal record StartNextBatch(BatchState State);

/// <summary>
/// Progress of staggered startup: the configs to start and the index of the next one.
/// </summary>
internal record BatchState(List<DeployedInstance> Configs, int NextIndex);

/// <summary>
/// Outcome of the storage work for an enable command: the stored config (null when
/// none was found or the work failed), an error message on failure, and who to reply to.
/// </summary>
internal record EnableResult(
    EnableInstanceCommand Command, DeployedInstance? Config, string? Error, IActorRef OriginalSender);

/// <summary>
/// Outcome of persisting a deployment, including who to reply to and whether the
/// deployment replaced an already-deployed instance.
/// </summary>
internal record DeployPersistenceResult(
    string DeploymentId, string InstanceName, bool Success, string? Error,
    IActorRef OriginalSender, bool IsRedeploy);

/// <summary>
/// A redeployment command buffered until the previous Instance Actor terminates.
/// </summary>
internal record PendingRedeploy(DeployInstanceCommand Command, IActorRef OriginalSender);
}