Phase 3A: Site runtime foundation — Akka cluster, SQLite persistence, Deployment Manager singleton, Instance Actor
- WP-1: Site cluster config (keep-oldest SBR, down-if-alone, 2s/10s failure detection)
- WP-2: Site-role host bootstrap (no Kestrel, SQLite paths)
- WP-3: SiteStorageService with deployed_configurations + static_attribute_overrides tables
- WP-4: DeploymentManagerActor as cluster singleton with staggered Instance Actor creation, OneForOneStrategy/Resume supervision, deploy/disable/enable/delete lifecycle
- WP-5: InstanceActor with attribute state, GetAttribute/SetAttribute, SQLite override persistence
- WP-6: CoordinatedShutdown verified for graceful singleton handover
- WP-7: Dual-node recovery (both seed nodes, min-nr-of-members=1)
- WP-8: 31 tests (storage CRUD, actor lifecycle, supervision, negative checks)

389 total tests pass, zero warnings.
This commit is contained in:
358
src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs
Normal file
358
src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs
Normal file
@@ -0,0 +1,358 @@
|
||||
using Akka.Actor;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ScadaLink.Commons.Messages.Deployment;
|
||||
using ScadaLink.Commons.Messages.Lifecycle;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.SiteRuntime.Persistence;
|
||||
|
||||
namespace ScadaLink.SiteRuntime.Actors;
|
||||
|
||||
/// <summary>
/// Site-side Deployment Manager — runs as a cluster singleton within the site cluster.
/// On startup, reads all deployed configs from SQLite and creates Instance Actors
/// for enabled instances in staggered batches.
///
/// Handles: DeployInstance, DisableInstance, EnableInstance, DeleteInstance.
///
/// Supervision strategy: OneForOneStrategy with Resume for Instance Actors
/// so that a single instance failure does not cascade to siblings.
///
/// Every Instance Actor is death-watched. A child's name stays reserved until the
/// child has fully terminated, so a replacement with the same name (redeploy, or
/// enable right after disable) is parked and only created once the matching
/// <see cref="Terminated"/> message has been received.
/// </summary>
public class DeploymentManagerActor : ReceiveActor, IWithTimers
{
    /// <summary>Timer key used for the single "create next startup batch" timer.</summary>
    private const string StartupBatchTimerKey = "startup-batch";

    private readonly SiteStorageService _storage;
    private readonly SiteRuntimeOptions _options;
    private readonly ILogger<DeploymentManagerActor> _logger;

    /// <summary>Running Instance Actors, keyed by instance unique name (which is also the child actor name).</summary>
    private readonly Dictionary<string, IActorRef> _instanceActors = new();

    /// <summary>Children that have been told to stop but whose <see cref="Terminated"/> has not arrived yet.</summary>
    private readonly Dictionary<string, IActorRef> _stoppingActors = new();

    /// <summary>Config JSON of replacements waiting for the same-named stopping child to terminate.</summary>
    private readonly Dictionary<string, string> _pendingStarts = new();

    /// <summary>
    /// Single factory for all Instance Actor loggers, disposed in <see cref="PostStop"/>.
    /// NOTE(review): it has no providers registered, so Instance Actor log output goes nowhere —
    /// same as before this change; fixing that needs an ILoggerFactory injected by the host.
    /// </summary>
    private readonly LoggerFactory _instanceLoggerFactory = new();

    /// <summary>Timer scheduler required by <see cref="IWithTimers"/>; used to stagger startup batches.</summary>
    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>
    /// Stores the collaborators and registers all message handlers.
    /// </summary>
    /// <param name="storage">SQLite-backed store for deployed configs and static overrides.</param>
    /// <param name="options">Runtime options (startup batch size and delay).</param>
    /// <param name="logger">Logger for this actor.</param>
    public DeploymentManagerActor(
        SiteStorageService storage,
        SiteRuntimeOptions options,
        ILogger<DeploymentManagerActor> logger)
    {
        _storage = storage;
        _options = options;
        _logger = logger;

        // Lifecycle commands
        Receive<DeployInstanceCommand>(HandleDeploy);
        Receive<DisableInstanceCommand>(HandleDisable);
        Receive<EnableInstanceCommand>(HandleEnable);
        Receive<DeleteInstanceCommand>(HandleDelete);

        // Internal startup messages
        Receive<StartupConfigsLoaded>(HandleStartupConfigsLoaded);
        Receive<StartNextBatch>(HandleStartNextBatch);

        // Internal enable result
        Receive<EnableResult>(HandleEnableResult);

        // Internal deploy persistence result
        Receive<DeployPersistenceResult>(HandleDeployPersistenceResult);

        // Death watch for Instance Actors (must be handled, otherwise a watched
        // child's termination would surface as a DeathPactException).
        Receive<Terminated>(HandleInstanceTerminated);
    }

    /// <summary>
    /// Kicks off the asynchronous load of all deployed configs and pipes the outcome
    /// back to this actor as a <see cref="StartupConfigsLoaded"/> message.
    /// </summary>
    protected override void PreStart()
    {
        base.PreStart();
        _logger.LogInformation("DeploymentManagerActor starting — loading deployed configs from SQLite...");

        // Load all configs asynchronously and pipe to self. A failed OR cancelled load
        // always carries a non-null error so it cannot be mistaken for "zero configs".
        _storage.GetAllDeployedConfigsAsync().ContinueWith(t =>
            t.IsCompletedSuccessfully
                ? new StartupConfigsLoaded(t.Result, null)
                : new StartupConfigsLoaded([], DescribeFailure(t))
        ).PipeTo(Self);
    }

    /// <summary>
    /// Releases the logger factory shared by the Instance Actors.
    /// </summary>
    protected override void PostStop()
    {
        _instanceLoggerFactory.Dispose();
        base.PostStop();
    }

    /// <summary>
    /// OneForOneStrategy: Resume on exceptions to preserve Instance Actor state,
    /// Stop only on ActorInitializationException (actor failed to start).
    /// </summary>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: -1,
            withinTimeRange: TimeSpan.FromMinutes(1),
            decider: Decider.From(ex =>
            {
                if (ex is ActorInitializationException)
                {
                    _logger.LogError(ex, "Instance Actor failed to initialize, stopping");
                    return Directive.Stop;
                }

                _logger.LogWarning(ex, "Instance Actor threw exception, resuming");
                return Directive.Resume;
            }));
    }

    /// <summary>
    /// Processes the loaded configs from SQLite and begins staggered Instance Actor creation.
    /// A load error is logged and no Instance Actors are created.
    /// </summary>
    private void HandleStartupConfigsLoaded(StartupConfigsLoaded msg)
    {
        if (msg.Error is not null)
        {
            _logger.LogError("Failed to load deployed configs: {Error}", msg.Error);
            return;
        }

        var enabledConfigs = msg.Configs.Where(c => c.IsEnabled).ToList();
        _logger.LogInformation(
            "Loaded {Total} deployed configs ({Enabled} enabled) from SQLite",
            msg.Configs.Count, enabledConfigs.Count);

        if (enabledConfigs.Count == 0)
        {
            return;
        }

        // Start the first batch immediately
        Self.Tell(new StartNextBatch(new BatchState(enabledConfigs, 0)));
    }

    /// <summary>
    /// Creates Instance Actors in batches with a configurable delay between batches
    /// to prevent reconnection storms on failover.
    /// </summary>
    private void HandleStartNextBatch(StartNextBatch msg)
    {
        var state = msg.State;

        // A batch size of zero or less would never advance the index and would
        // reschedule the same empty batch forever; always make progress.
        var batchSize = Math.Max(1, _options.StartupBatchSize);
        var startIdx = state.NextIndex;
        var endIdx = Math.Min(startIdx + batchSize, state.Configs.Count);

        _logger.LogDebug(
            "Creating Instance Actors batch [{Start}..{End}) of {Total}",
            startIdx, endIdx, state.Configs.Count);

        for (var i = startIdx; i < endIdx; i++)
        {
            var config = state.Configs[i];
            CreateInstanceActor(config.InstanceUniqueName, config.ConfigJson);
        }

        if (endIdx >= state.Configs.Count)
        {
            _logger.LogInformation(
                "All {Count} Instance Actors created", state.Configs.Count);
            return;
        }

        // Schedule next batch, using Timers (IWithTimers)
        Timers.StartSingleTimer(
            StartupBatchTimerKey,
            new StartNextBatch(new BatchState(state.Configs, endIdx)),
            TimeSpan.FromMilliseconds(_options.StartupBatchDelayMs));
    }

    /// <summary>
    /// Handles a new deployment: creates or replaces the Instance Actor, stores the config
    /// in SQLite and clears previous static overrides.
    /// The sender is answered immediately with <c>DeploymentStatus.Success</c>; a persistence
    /// failure is reported back to this actor and logged, not sent to the sender.
    /// </summary>
    private void HandleDeploy(DeployInstanceCommand command)
    {
        var instanceName = command.InstanceUniqueName;
        _logger.LogInformation(
            "Deploying instance {Instance}, deploymentId={DeploymentId}",
            instanceName, command.DeploymentId);

        // Redeployment replaces: stop the current actor (if any). The replacement is
        // created right away when the name is free, otherwise as soon as the old
        // child has terminated.
        StopInstanceActor(instanceName);
        CreateInstanceActor(instanceName, command.FlattenedConfigurationJson);

        // Persist to SQLite and clear static overrides asynchronously
        var sender = Sender;
        Task.Run(async () =>
        {
            await _storage.StoreDeployedConfigAsync(
                instanceName,
                command.FlattenedConfigurationJson,
                command.DeploymentId,
                command.RevisionHash,
                isEnabled: true);

            // Static overrides are reset on redeployment per design decision
            await _storage.ClearStaticOverridesAsync(instanceName);

            return new DeployPersistenceResult(command.DeploymentId, instanceName, true, null, sender);
        }).ContinueWith(t =>
            t.IsCompletedSuccessfully
                ? t.Result
                : new DeployPersistenceResult(
                    command.DeploymentId, instanceName, false, DescribeFailure(t), sender)
        ).PipeTo(Self);

        // Reply immediately — the deployment has been applied to the actor hierarchy
        sender.Tell(new DeploymentStatusResponse(
            command.DeploymentId,
            instanceName,
            DeploymentStatus.Success,
            null,
            DateTimeOffset.UtcNow));
    }

    /// <summary>
    /// Logs a failed deployment persistence; successful results need no action.
    /// </summary>
    private void HandleDeployPersistenceResult(DeployPersistenceResult result)
    {
        if (result.Success)
        {
            return;
        }

        _logger.LogError(
            "Failed to persist deployment {DeploymentId} for {Instance}: {Error}",
            result.DeploymentId, result.InstanceName, result.Error);
    }

    /// <summary>
    /// Disables an instance: stops the actor and marks as disabled in SQLite.
    /// The storage outcome is piped straight to the sender as an InstanceLifecycleResponse.
    /// </summary>
    private void HandleDisable(DisableInstanceCommand command)
    {
        var instanceName = command.InstanceUniqueName;

        StopInstanceActor(instanceName);

        var sender = Sender;
        _storage.SetInstanceEnabledAsync(instanceName, false).ContinueWith(t =>
            new InstanceLifecycleResponse(
                command.CommandId,
                instanceName,
                t.IsCompletedSuccessfully,
                t.IsCompletedSuccessfully ? null : DescribeFailure(t),
                DateTimeOffset.UtcNow)
        ).PipeTo(sender);

        _logger.LogInformation("Instance {Instance} disabled", instanceName);
    }

    /// <summary>
    /// Enables an instance: marks as enabled in SQLite and re-creates the Instance Actor
    /// from the stored config. The storage work runs off the actor thread; its outcome
    /// comes back as an <see cref="EnableResult"/>.
    /// </summary>
    private void HandleEnable(EnableInstanceCommand command)
    {
        var instanceName = command.InstanceUniqueName;
        var sender = Sender;

        Task.Run(async () =>
        {
            await _storage.SetInstanceEnabledAsync(instanceName, true);
            var configs = await _storage.GetAllDeployedConfigsAsync();
            var config = configs.FirstOrDefault(c => c.InstanceUniqueName == instanceName);
            return new EnableResult(command, config, null, sender);
        }).ContinueWith(t =>
            t.IsCompletedSuccessfully
                ? t.Result
                : new EnableResult(command, null, DescribeFailure(t), sender)
        ).PipeTo(Self);
    }

    /// <summary>
    /// Processes enable result in the actor context (thread-safe).
    /// </summary>
    private void HandleEnableResult(EnableResult result)
    {
        var instanceName = result.Command.InstanceUniqueName;

        if (result.Error is not null || result.Config is null)
        {
            var error = result.Error ?? $"No deployed config found for {instanceName}";
            result.OriginalSender.Tell(new InstanceLifecycleResponse(
                result.Command.CommandId, instanceName, false, error, DateTimeOffset.UtcNow));
            return;
        }

        // Nothing to do when the instance is already running or already queued to start.
        if (!_instanceActors.ContainsKey(instanceName) && !_pendingStarts.ContainsKey(instanceName))
        {
            CreateInstanceActor(instanceName, result.Config.ConfigJson);
        }

        result.OriginalSender.Tell(new InstanceLifecycleResponse(
            result.Command.CommandId, instanceName, true, null, DateTimeOffset.UtcNow));

        _logger.LogInformation("Instance {Instance} enabled", instanceName);
    }

    /// <summary>
    /// Deletes an instance: stops the actor and removes config from SQLite.
    /// Note: store-and-forward messages are NOT cleared per design decision.
    /// </summary>
    private void HandleDelete(DeleteInstanceCommand command)
    {
        var instanceName = command.InstanceUniqueName;

        StopInstanceActor(instanceName);

        var sender = Sender;
        _storage.RemoveDeployedConfigAsync(instanceName).ContinueWith(t =>
            new InstanceLifecycleResponse(
                command.CommandId,
                instanceName,
                t.IsCompletedSuccessfully,
                t.IsCompletedSuccessfully ? null : DescribeFailure(t),
                DateTimeOffset.UtcNow)
        ).PipeTo(sender);

        _logger.LogInformation("Instance {Instance} deleted", instanceName);
    }

    /// <summary>
    /// Reacts to the termination of a watched Instance Actor: frees the name of a child
    /// that was stopped on purpose (starting a queued replacement, if any), or forgets
    /// a child that stopped for another reason (e.g. the supervisor's Stop directive).
    /// </summary>
    private void HandleInstanceTerminated(Terminated msg)
    {
        var instanceName = msg.ActorRef.Path.Name;

        if (_stoppingActors.TryGetValue(instanceName, out var stopping) && stopping.Equals(msg.ActorRef))
        {
            _stoppingActors.Remove(instanceName);

            // The name is free again — start the replacement that was waiting for it.
            if (_pendingStarts.Remove(instanceName, out var pendingConfigJson))
            {
                CreateInstanceActor(instanceName, pendingConfigJson);
            }

            return;
        }

        // Only drop the entry if it still refers to the actor that terminated.
        if (_instanceActors.TryGetValue(instanceName, out var active) && active.Equals(msg.ActorRef))
        {
            _instanceActors.Remove(instanceName);
            _logger.LogWarning(
                "Instance Actor {Instance} terminated unexpectedly and was removed", instanceName);
        }
    }

    /// <summary>
    /// Stops the running Instance Actor with the given name (if any) and cancels any
    /// replacement that was queued for it. The stopped child is remembered until its
    /// <see cref="Terminated"/> message arrives, because its name cannot be reused before then.
    /// </summary>
    private void StopInstanceActor(string instanceName)
    {
        // A stop always supersedes a replacement that has not been started yet.
        _pendingStarts.Remove(instanceName);

        if (!_instanceActors.Remove(instanceName, out var actor))
        {
            return;
        }

        _stoppingActors[instanceName] = actor;
        Context.Stop(actor);
    }

    /// <summary>
    /// Creates a child Instance Actor with the given name and configuration JSON.
    /// Does nothing (with a warning) when the instance is already running or queued.
    /// When a previous child with the same name is still terminating, the creation is
    /// queued and performed once that child has terminated.
    /// </summary>
    /// <param name="instanceName">Instance unique name; also used as the child actor name.</param>
    /// <param name="configJson">Configuration JSON handed to the Instance Actor.</param>
    internal void CreateInstanceActor(string instanceName, string configJson)
    {
        if (_instanceActors.ContainsKey(instanceName) || _pendingStarts.ContainsKey(instanceName))
        {
            _logger.LogWarning("Instance Actor {Instance} already exists, skipping creation", instanceName);
            return;
        }

        if (_stoppingActors.ContainsKey(instanceName))
        {
            // Context.ActorOf would throw InvalidActorNameException while the old
            // child still owns the name; defer until Terminated is received.
            _pendingStarts[instanceName] = configJson;
            _logger.LogDebug(
                "Instance Actor {Instance} is still stopping, deferring creation", instanceName);
            return;
        }

        var instanceLogger = _instanceLoggerFactory.CreateLogger<InstanceActor>();
        var props = Props.Create(() => new InstanceActor(
            instanceName,
            configJson,
            _storage,
            instanceLogger));

        var actorRef = Context.ActorOf(props, instanceName);
        Context.Watch(actorRef);
        _instanceActors[instanceName] = actorRef;

        _logger.LogDebug("Created Instance Actor for {Instance}", instanceName);
    }

    /// <summary>
    /// Gets the count of active Instance Actors (for testing/diagnostics).
    /// Replacements that are queued behind a terminating predecessor are included.
    /// </summary>
    internal int InstanceActorCount => _instanceActors.Count + _pendingStarts.Count;

    /// <summary>
    /// Produces a non-null description for a task that did not complete successfully
    /// (a cancelled task has no exception attached).
    /// </summary>
    private static string DescribeFailure(Task task) =>
        task.Exception?.GetBaseException().Message
        ?? (task.IsCanceled ? "Operation was cancelled" : "Unknown error");

    // ── Internal messages ──

    /// <summary>Outcome of the startup config load; <c>Error</c> is null on success.</summary>
    internal record StartupConfigsLoaded(List<DeployedInstance> Configs, string? Error);

    /// <summary>Request to create the next startup batch described by <c>State</c>.</summary>
    internal record StartNextBatch(BatchState State);

    /// <summary>Enabled configs to start and the index of the first one not yet started.</summary>
    internal record BatchState(List<DeployedInstance> Configs, int NextIndex);

    /// <summary>Outcome of the storage work for an enable command, plus who to answer.</summary>
    internal record EnableResult(
        EnableInstanceCommand Command, DeployedInstance? Config, string? Error, IActorRef OriginalSender);

    /// <summary>Outcome of persisting a deployment; <c>Error</c> is set when <c>Success</c> is false.</summary>
    internal record DeployPersistenceResult(
        string DeploymentId, string InstanceName, bool Success, string? Error, IActorRef OriginalSender);
}
|
||||
Reference in New Issue
Block a user