feat(site): DeploymentManagerActor fetches config then applies (notify-and-fetch)

This commit is contained in:
Joseph Doherty
2026-06-26 13:47:28 -04:00
parent 3955cb4f28
commit 631ce5bfce
3 changed files with 257 additions and 9 deletions
@@ -839,6 +839,12 @@ akka {{
_logger.LogInformation("SiteReplicationActor created and S&F replication handler wired");
// Notify-and-fetch (Task 10): the active singleton fetches a deployment's
// flattened config from central over HTTP when a RefreshDeploymentCommand
// arrives. Resolve the fetcher from the same provider the actor already uses.
var deploymentConfigFetcher =
_serviceProvider.GetService<ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment.IDeploymentConfigFetcher>();
// Create the Deployment Manager as a cluster singleton
var singletonProps = ClusterSingletonManager.Props(
singletonProps: Props.Create(() => new DeploymentManagerActor(
@@ -851,7 +857,9 @@ akka {{
dclManager,
replicationActor,
siteHealthCollector,
_serviceProvider)),
_serviceProvider,
null,
deploymentConfigFetcher)),
terminationMessage: PoisonPill.Instance,
settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
.WithRole(siteRole)
@@ -13,6 +13,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Messages.ScriptExecution;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
using ZB.MOM.WW.ScadaBridge.SiteEventLogging;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Messages;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts;
@@ -50,6 +51,12 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
private readonly IActorRef? _replicationActor;
private readonly ISiteHealthCollector? _healthCollector;
private readonly IServiceProvider? _serviceProvider;
/// <summary>
/// Notify-and-fetch (Task 10): fetches a deployment's flattened config from central
/// over HTTP when a <see cref="RefreshDeploymentCommand"/> arrives. Optional — null on
/// nodes/tests that never receive a refresh; the active site path supplies it via DI.
/// </summary>
private readonly IDeploymentConfigFetcher? _configFetcher;
private readonly Dictionary<string, IActorRef> _instanceActors = new();
/// <summary>
/// Tracks Instance Actors that are terminating as part of a redeployment, keyed by
@@ -97,6 +104,11 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
/// <param name="healthCollector">Optional site health collector.</param>
/// <param name="serviceProvider">Optional service provider for resolving per-instance services.</param>
/// <param name="loggerFactory">Optional logger factory for creating Instance Actor loggers.</param>
/// <param name="configFetcher">
/// Optional notify-and-fetch config fetcher (Task 10). Required for the
/// <see cref="RefreshDeploymentCommand"/> path; null on nodes/tests that never
/// receive a refresh.
/// </param>
public DeploymentManagerActor(
SiteStorageService storage,
ScriptCompilationService compilationService,
@@ -108,7 +120,8 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
IActorRef? replicationActor = null,
ISiteHealthCollector? healthCollector = null,
IServiceProvider? serviceProvider = null,
ILoggerFactory? loggerFactory = null)
ILoggerFactory? loggerFactory = null,
IDeploymentConfigFetcher? configFetcher = null)
{
_storage = storage;
_compilationService = compilationService;
@@ -119,6 +132,7 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
_replicationActor = replicationActor;
_healthCollector = healthCollector;
_serviceProvider = serviceProvider;
_configFetcher = configFetcher;
_logger = logger;
// SiteRuntime-015: reuse a single logger factory for all Instance Actors.
// Prefer an explicitly injected factory, fall back to one resolved from
@@ -129,11 +143,19 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
?? Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance;
// Lifecycle commands
Receive<DeployInstanceCommand>(HandleDeploy);
Receive<DeployInstanceCommand>(cmd => HandleDeploy(cmd, Sender));
Receive<DisableInstanceCommand>(HandleDisable);
Receive<EnableInstanceCommand>(HandleEnable);
Receive<DeleteInstanceCommand>(HandleDelete);
// Notify-and-fetch (Task 10): central sends a small RefreshDeploymentCommand;
// the active singleton fetches the flattened config over HTTP, then reuses the
// existing apply path. The two internal results carry the fetched config (or the
// fetch error) back onto the actor thread along with the captured original sender.
Receive<RefreshDeploymentCommand>(HandleRefreshDeployment);
Receive<RefreshFetched>(HandleRefreshFetched);
Receive<RefreshFetchFailed>(HandleRefreshFetchFailed);
// DeploymentManager-006: query-the-site-before-redeploy idempotency.
// Central asks for the instance's currently-applied deployment identity
// before re-sending a deployment whose prior record is stuck InProgress
@@ -356,7 +378,15 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
/// terminating child is watched and the in-flight command is buffered until the
/// <see cref="Terminated"/> signal arrives.
/// </summary>
private void HandleDeploy(DeployInstanceCommand command)
/// <param name="replyTo">
/// The actor to reply to with the eventual <see cref="DeploymentStatusResponse"/>.
/// Passed explicitly (rather than read from <see cref="ActorBase.Sender"/>) so the
/// notify-and-fetch path (<see cref="HandleRefreshFetched"/>) can supply the ORIGINAL
/// central sender after the async config fetch, where <c>Sender</c> is no longer valid.
/// The redeploy-buffer path carries it on <see cref="PendingRedeploy"/> so the buffered
/// apply still replies to the right actor.
/// </param>
private void HandleDeploy(DeployInstanceCommand command, IActorRef replyTo)
{
var instanceName = command.InstanceUniqueName;
_logger.LogInformation(
@@ -369,7 +399,7 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
if (_instanceActors.TryGetValue(instanceName, out var existing))
{
_instanceActors.Remove(instanceName);
_pendingRedeploys[existing] = new PendingRedeploy(command, Sender);
_pendingRedeploys[existing] = new PendingRedeploy(command, replyTo);
_terminatingActorsByName[instanceName] = existing;
Context.Watch(existing);
Context.Stop(existing);
@@ -396,12 +426,12 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
$"superseded by newer deployment {command.DeploymentId} before predecessor finished terminating",
DateTimeOffset.UtcNow));
}
_pendingRedeploys[terminatingRef] = new PendingRedeploy(command, Sender);
_pendingRedeploys[terminatingRef] = new PendingRedeploy(command, replyTo);
return;
}
// Fresh deployment — no existing actor to replace.
ApplyDeployment(command, Sender, isRedeploy: false);
ApplyDeployment(command, replyTo, isRedeploy: false);
}
/// <summary>
@@ -423,6 +453,72 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
ApplyDeployment(pending.Command, pending.OriginalSender, isRedeploy: true);
}
/// <summary>
/// Notify-and-fetch (Task 10): handles a small central→site
/// <see cref="RefreshDeploymentCommand"/>. Fetches the deployment's flattened config
/// from central over HTTP via <see cref="IDeploymentConfigFetcher"/>, then pipes the
/// result back to self so the existing apply path runs on the actor thread with the
/// ORIGINAL sender preserved (the central Ask's temp actor). The reply is the existing
/// <see cref="DeploymentStatusResponse"/>, so the central deploy completes unchanged.
/// </summary>
private void HandleRefreshDeployment(RefreshDeploymentCommand cmd)
{
// Capture the Ask temp-actor sender BEFORE the async continuation: Akka's Sender
// is only valid during synchronous message handling and is no longer the original
// sender once the ContinueWith/PipeTo continuation runs on a thread-pool thread.
var replyTo = Sender;
if (_configFetcher is null)
{
replyTo.Tell(new DeploymentStatusResponse(
cmd.DeploymentId, cmd.InstanceUniqueName, DeploymentStatus.Failed,
"Deployment config fetcher not available on this node.", DateTimeOffset.UtcNow));
return;
}
_logger.LogInformation(
"Fetching config for deployment {DeploymentId} instance {Instance} (notify-and-fetch)",
cmd.DeploymentId, cmd.InstanceUniqueName);
// CancellationToken.None: the fetch is bounded by HttpClient.Timeout. On a singleton
// handover mid-fetch the PipeTo lands in dead letters and the central Ask times out
// (then reconciles) — acceptable, rare.
_configFetcher.FetchAsync(cmd.CentralFetchBaseUrl, cmd.DeploymentId, cmd.FetchToken, CancellationToken.None)
.ContinueWith(t => t.IsCompletedSuccessfully
? (object)new RefreshFetched(cmd, t.Result, replyTo)
: new RefreshFetchFailed(cmd, t.Exception?.GetBaseException().Message ?? "fetch failed", replyTo))
.PipeTo(Self);
}
/// <summary>
/// Notify-and-fetch (Task 10): the config fetch succeeded — reconstruct the in-process
/// <see cref="DeployInstanceCommand"/> apply DTO and reuse the existing apply path,
/// threading the original central sender through so the
/// <see cref="DeploymentStatusResponse"/> reaches it.
/// </summary>
private void HandleRefreshFetched(RefreshFetched msg)
{
var command = new DeployInstanceCommand(
msg.Cmd.DeploymentId, msg.Cmd.InstanceUniqueName, msg.Cmd.RevisionHash,
msg.ConfigJson, msg.Cmd.DeployedBy, msg.Cmd.Timestamp);
HandleDeploy(command, msg.ReplyTo);
}
/// <summary>
/// Notify-and-fetch (Task 10): the config fetch failed — report
/// <see cref="DeploymentStatus.Failed"/> to the original central sender so the deploy
/// completes (rather than the central Ask hanging to timeout). Nothing is applied.
/// </summary>
private void HandleRefreshFetchFailed(RefreshFetchFailed msg)
{
_logger.LogError(
"Config fetch failed for deployment {DeploymentId} instance {Instance}: {Error}",
msg.Cmd.DeploymentId, msg.Cmd.InstanceUniqueName, msg.Error);
msg.ReplyTo.Tell(new DeploymentStatusResponse(
msg.Cmd.DeploymentId, msg.Cmd.InstanceUniqueName, DeploymentStatus.Failed,
$"Communication failure: {msg.Error}", DateTimeOffset.UtcNow));
}
/// <summary>
/// Creates the Instance Actor, persists the config, and replies to the deployer.
/// A redeployment is an update of an existing instance, so the deployed-instance
@@ -1590,6 +1686,19 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
/// </summary>
internal record PendingRedeploy(DeployInstanceCommand Command, IActorRef OriginalSender);
/// <summary>
/// Notify-and-fetch (Task 10): piped back to self when the deployment's flattened
/// config has been fetched from central. Carries the original central sender so the
/// reused apply path replies to it.
/// </summary>
private sealed record RefreshFetched(RefreshDeploymentCommand Cmd, string ConfigJson, IActorRef ReplyTo);
/// <summary>
/// Notify-and-fetch (Task 10): piped back to self when the config fetch failed.
/// Carries the original central sender so a Failed status is reported to it.
/// </summary>
private sealed record RefreshFetchFailed(RefreshDeploymentCommand Cmd, string Error, IActorRef ReplyTo);
/// <summary>
/// SiteRuntime-021: internal message dispatched from
/// <see cref="HandleDeployArtifacts"/>'s off-thread persistence task back
@@ -10,6 +10,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.TestSupport;
@@ -48,7 +49,8 @@ public class DeploymentManagerActorTests : TestKit, IDisposable
}
private IActorRef CreateDeploymentManager(
SiteRuntimeOptions? options = null, IServiceProvider? serviceProvider = null)
SiteRuntimeOptions? options = null, IServiceProvider? serviceProvider = null,
IDeploymentConfigFetcher? configFetcher = null)
{
options ??= new SiteRuntimeOptions();
return ActorOf(Props.Create(() => new DeploymentManagerActor(
@@ -62,7 +64,8 @@ public class DeploymentManagerActorTests : TestKit, IDisposable
null,
null,
serviceProvider,
null)));
null,
configFetcher)));
}
private static string MakeConfigJson(string instanceName)
@@ -675,4 +678,132 @@ public class DeploymentManagerActorTests : TestKit, IDisposable
Assert.False(snapshot.InstanceNotFound,
"A deployed (but empty) instance must NOT set InstanceNotFound.");
}
// ── Task 10: notify-and-fetch — RefreshDeploymentCommand → fetch → apply ──
[Fact]
public async Task RefreshDeployment_FetchSucceeds_AppliesConfigAndRepliesSuccess()
{
// The active singleton receives a small RefreshDeploymentCommand, fetches the
// flattened config over HTTP via IDeploymentConfigFetcher, then runs the existing
// apply path: the config is persisted to SQLite and a Success status is replied.
var fetcher = new FakeConfigFetcher(MakeConfigJson("FetchedPump"));
var actor = CreateDeploymentManager(configFetcher: fetcher);
await Task.Delay(500); // empty startup
actor.Tell(new RefreshDeploymentCommand(
"dep-fetch-1", "FetchedPump", "sha256:fetch", "admin", DateTimeOffset.UtcNow,
"http://central:9000", "tok-123"));
var response = ExpectMsg<DeploymentStatusResponse>(TimeSpan.FromSeconds(5));
Assert.Equal(DeploymentStatus.Success, response.Status);
Assert.Equal("FetchedPump", response.InstanceUniqueName);
Assert.Equal("dep-fetch-1", response.DeploymentId);
// The fetcher was called with the command's fetch coordinates.
Assert.Equal("http://central:9000", fetcher.LastBaseUrl);
Assert.Equal("dep-fetch-1", fetcher.LastDeploymentId);
Assert.Equal("tok-123", fetcher.LastToken);
// The existing apply path ran end-to-end: the fetched config is persisted.
var configs = await _storage.GetAllDeployedConfigsAsync();
Assert.Contains(configs, c => c.InstanceUniqueName == "FetchedPump");
}
[Fact]
public async Task RefreshDeployment_FetchFails_RepliesFailedAndDoesNotApply()
{
// A config fetch failure (DeploymentConfigFetchException surfaces as a faulted
// Task) must reply Failed with a "Communication failure:" message and must NOT
// apply anything — no config persisted, no Instance Actor created.
var fetcher = new FakeConfigFetcher(
new DeploymentConfigFetchException("central unreachable", isSuperseded: false));
var actor = CreateDeploymentManager(configFetcher: fetcher);
await Task.Delay(500);
actor.Tell(new RefreshDeploymentCommand(
"dep-fetch-fail", "UnfetchedPump", "sha256:x", "admin", DateTimeOffset.UtcNow,
"http://central:9000", "tok-x"));
var response = ExpectMsg<DeploymentStatusResponse>(TimeSpan.FromSeconds(5));
Assert.Equal(DeploymentStatus.Failed, response.Status);
Assert.Equal("UnfetchedPump", response.InstanceUniqueName);
Assert.NotNull(response.ErrorMessage);
Assert.StartsWith("Communication failure:", response.ErrorMessage!);
// No apply occurred: nothing was persisted for the instance.
await Task.Delay(500);
var configs = await _storage.GetAllDeployedConfigsAsync();
Assert.DoesNotContain(configs, c => c.InstanceUniqueName == "UnfetchedPump");
}
[Fact]
public void RefreshDeployment_NullFetcher_RepliesFailed()
{
// A node with no configured fetcher (the `_configFetcher is null` guard) must reply
// Failed rather than NRE-crashing or silently dropping the central Ask.
var actor = CreateDeploymentManager(); // no configFetcher
actor.Tell(new RefreshDeploymentCommand(
"dep-null", "Pump", "sha256:x", "admin", DateTimeOffset.UtcNow,
"http://central:9000", "tok"));
var response = ExpectMsg<DeploymentStatusResponse>(TimeSpan.FromSeconds(2));
Assert.Equal(DeploymentStatus.Failed, response.Status);
Assert.Equal("Pump", response.InstanceUniqueName);
}
[Fact]
public async Task RefreshDeployment_ReplyGoesToOriginalSender_AcrossAsyncFetch()
{
// The reply must reach the ORIGINAL sender (central's Ask temp actor), proving
// the captured replyTo survives the async fetch + PipeTo continuation — where
// Akka's Sender is no longer valid. Send with an explicit probe as the sender
// and assert the probe (not the default test actor) receives the response.
var fetcher = new FakeConfigFetcher(MakeConfigJson("SenderPump"));
var probe = CreateTestProbe();
var actor = CreateDeploymentManager(configFetcher: fetcher);
await Task.Delay(500);
actor.Tell(new RefreshDeploymentCommand(
"dep-sender", "SenderPump", "sha256:s", "admin", DateTimeOffset.UtcNow,
"http://central:9000", "tok-s"), probe.Ref);
var response = probe.ExpectMsg<DeploymentStatusResponse>(TimeSpan.FromSeconds(5));
Assert.Equal(DeploymentStatus.Success, response.Status);
Assert.Equal("SenderPump", response.InstanceUniqueName);
}
/// <summary>
/// In-test fake <see cref="IDeploymentConfigFetcher"/>: returns a canned config JSON
/// (notify-and-fetch success) or throws a canned exception (fetch failure), and records
/// the fetch coordinates it was called with. It is async so a throw surfaces as a faulted
/// <see cref="Task"/> — mirroring the real HttpDeploymentConfigFetcher; a synchronous
/// throw would instead crash the actor before the ContinueWith/PipeTo could produce a
/// RefreshFetchFailed.
/// </summary>
private sealed class FakeConfigFetcher : IDeploymentConfigFetcher
{
private readonly string? _result;
private readonly Exception? _error;
public FakeConfigFetcher(string result) => _result = result;
public FakeConfigFetcher(Exception error) => _error = error;
public string? LastBaseUrl { get; private set; }
public string? LastDeploymentId { get; private set; }
public string? LastToken { get; private set; }
public async Task<string> FetchAsync(
string centralFetchBaseUrl, string deploymentId, string token, CancellationToken ct)
{
LastBaseUrl = centralFetchBaseUrl;
LastDeploymentId = deploymentId;
LastToken = token;
await Task.Yield();
if (_error != null)
throw _error;
return _result!;
}
}
}