diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs index 0d3aa801..be304872 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs @@ -839,6 +839,12 @@ akka {{ _logger.LogInformation("SiteReplicationActor created and S&F replication handler wired"); + // Notify-and-fetch (Task 10): the active singleton fetches a deployment's + // flattened config from central over HTTP when a RefreshDeploymentCommand + // arrives. Resolve the fetcher from the same provider the actor already uses. + var deploymentConfigFetcher = + _serviceProvider.GetService(); + // Create the Deployment Manager as a cluster singleton var singletonProps = ClusterSingletonManager.Props( singletonProps: Props.Create(() => new DeploymentManagerActor( @@ -851,7 +857,9 @@ akka {{ dclManager, replicationActor, siteHealthCollector, - _serviceProvider)), + _serviceProvider, + null, + deploymentConfigFetcher)), terminationMessage: PoisonPill.Instance, settings: ClusterSingletonManagerSettings.Create(_actorSystem!) .WithRole(siteRole) diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs index 4717e951..9927cc4c 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs @@ -13,6 +13,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Messages.ScriptExecution; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; using ZB.MOM.WW.ScadaBridge.HealthMonitoring; using ZB.MOM.WW.ScadaBridge.SiteEventLogging; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Messages; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts; @@ -50,6 +51,12 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers private readonly IActorRef? _replicationActor; private readonly ISiteHealthCollector? _healthCollector; private readonly IServiceProvider? _serviceProvider; + /// + /// Notify-and-fetch (Task 10): fetches a deployment's flattened config from central + /// over HTTP when a arrives. Optional — null on + /// nodes/tests that never receive a refresh; the active site path supplies it via DI. + /// + private readonly IDeploymentConfigFetcher? _configFetcher; private readonly Dictionary _instanceActors = new(); /// /// Tracks Instance Actors that are terminating as part of a redeployment, keyed by @@ -97,6 +104,11 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers /// Optional site health collector. /// Optional service provider for resolving per-instance services. /// Optional logger factory for creating Instance Actor loggers. + /// + /// Optional notify-and-fetch config fetcher (Task 10). Required for the + /// path; null on nodes/tests that never + /// receive a refresh. + /// public DeploymentManagerActor( SiteStorageService storage, ScriptCompilationService compilationService, @@ -108,7 +120,8 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers IActorRef? replicationActor = null, ISiteHealthCollector? healthCollector = null, IServiceProvider? serviceProvider = null, - ILoggerFactory? loggerFactory = null) + ILoggerFactory? loggerFactory = null, + IDeploymentConfigFetcher? configFetcher = null) { _storage = storage; _compilationService = compilationService; @@ -119,6 +132,7 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers _replicationActor = replicationActor; _healthCollector = healthCollector; _serviceProvider = serviceProvider; + _configFetcher = configFetcher; _logger = logger; // SiteRuntime-015: reuse a single logger factory for all Instance Actors. // Prefer an explicitly injected factory, fall back to one resolved from @@ -129,11 +143,19 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers ?? Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance; // Lifecycle commands - Receive(HandleDeploy); + Receive(cmd => HandleDeploy(cmd, Sender)); Receive(HandleDisable); Receive(HandleEnable); Receive(HandleDelete); + // Notify-and-fetch (Task 10): central sends a small RefreshDeploymentCommand; + // the active singleton fetches the flattened config over HTTP, then reuses the + // existing apply path. The two internal results carry the fetched config (or the + // fetch error) back onto the actor thread along with the captured original sender. + Receive(HandleRefreshDeployment); + Receive(HandleRefreshFetched); + Receive(HandleRefreshFetchFailed); + // DeploymentManager-006: query-the-site-before-redeploy idempotency. // Central asks for the instance's currently-applied deployment identity // before re-sending a deployment whose prior record is stuck InProgress @@ -356,7 +378,15 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers /// terminating child is watched and the in-flight command is buffered until the /// signal arrives. /// - private void HandleDeploy(DeployInstanceCommand command) + /// + /// The actor to reply to with the eventual . + /// Passed explicitly (rather than read from ) so the + /// notify-and-fetch path () can supply the ORIGINAL + /// central sender after the async config fetch, where Sender is no longer valid. + /// The redeploy-buffer path carries it on so the buffered + /// apply still replies to the right actor. + /// + private void HandleDeploy(DeployInstanceCommand command, IActorRef replyTo) { var instanceName = command.InstanceUniqueName; _logger.LogInformation( @@ -369,7 +399,7 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers if (_instanceActors.TryGetValue(instanceName, out var existing)) { _instanceActors.Remove(instanceName); - _pendingRedeploys[existing] = new PendingRedeploy(command, Sender); + _pendingRedeploys[existing] = new PendingRedeploy(command, replyTo); _terminatingActorsByName[instanceName] = existing; Context.Watch(existing); Context.Stop(existing); @@ -396,12 +426,12 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers $"superseded by newer deployment {command.DeploymentId} before predecessor finished terminating", DateTimeOffset.UtcNow)); } - _pendingRedeploys[terminatingRef] = new PendingRedeploy(command, Sender); + _pendingRedeploys[terminatingRef] = new PendingRedeploy(command, replyTo); return; } // Fresh deployment — no existing actor to replace. - ApplyDeployment(command, Sender, isRedeploy: false); + ApplyDeployment(command, replyTo, isRedeploy: false); } /// @@ -423,6 +453,72 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers ApplyDeployment(pending.Command, pending.OriginalSender, isRedeploy: true); } + /// + /// Notify-and-fetch (Task 10): handles a small central→site + /// . Fetches the deployment's flattened config + /// from central over HTTP via , then pipes the + /// result back to self so the existing apply path runs on the actor thread with the + /// ORIGINAL sender preserved (the central Ask's temp actor). The reply is the existing + /// , so the central deploy completes unchanged. + /// + private void HandleRefreshDeployment(RefreshDeploymentCommand cmd) + { + // Capture the Ask temp-actor sender BEFORE the async continuation: Akka's Sender + // is only valid during synchronous message handling and is no longer the original + // sender once the ContinueWith/PipeTo continuation runs on a thread-pool thread. + var replyTo = Sender; + + if (_configFetcher is null) + { + replyTo.Tell(new DeploymentStatusResponse( + cmd.DeploymentId, cmd.InstanceUniqueName, DeploymentStatus.Failed, + "Deployment config fetcher not available on this node.", DateTimeOffset.UtcNow)); + return; + } + + _logger.LogInformation( + "Fetching config for deployment {DeploymentId} instance {Instance} (notify-and-fetch)", + cmd.DeploymentId, cmd.InstanceUniqueName); + + // CancellationToken.None: the fetch is bounded by HttpClient.Timeout. On a singleton + // handover mid-fetch the PipeTo lands in dead letters and the central Ask times out + // (then reconciles) — acceptable, rare. + _configFetcher.FetchAsync(cmd.CentralFetchBaseUrl, cmd.DeploymentId, cmd.FetchToken, CancellationToken.None) + .ContinueWith(t => t.IsCompletedSuccessfully + ? (object)new RefreshFetched(cmd, t.Result, replyTo) + : new RefreshFetchFailed(cmd, t.Exception?.GetBaseException().Message ?? "fetch failed", replyTo)) + .PipeTo(Self); + } + + /// + /// Notify-and-fetch (Task 10): the config fetch succeeded — reconstruct the in-process + /// apply DTO and reuse the existing apply path, + /// threading the original central sender through so the + /// reaches it. + /// + private void HandleRefreshFetched(RefreshFetched msg) + { + var command = new DeployInstanceCommand( + msg.Cmd.DeploymentId, msg.Cmd.InstanceUniqueName, msg.Cmd.RevisionHash, + msg.ConfigJson, msg.Cmd.DeployedBy, msg.Cmd.Timestamp); + HandleDeploy(command, msg.ReplyTo); + } + + /// + /// Notify-and-fetch (Task 10): the config fetch failed — report + /// to the original central sender so the deploy + /// completes (rather than the central Ask hanging to timeout). Nothing is applied. + /// + private void HandleRefreshFetchFailed(RefreshFetchFailed msg) + { + _logger.LogError( + "Config fetch failed for deployment {DeploymentId} instance {Instance}: {Error}", + msg.Cmd.DeploymentId, msg.Cmd.InstanceUniqueName, msg.Error); + msg.ReplyTo.Tell(new DeploymentStatusResponse( + msg.Cmd.DeploymentId, msg.Cmd.InstanceUniqueName, DeploymentStatus.Failed, + $"Communication failure: {msg.Error}", DateTimeOffset.UtcNow)); + } + /// /// Creates the Instance Actor, persists the config, and replies to the deployer. /// A redeployment is an update of an existing instance, so the deployed-instance @@ -1590,6 +1686,19 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers /// internal record PendingRedeploy(DeployInstanceCommand Command, IActorRef OriginalSender); + /// + /// Notify-and-fetch (Task 10): piped back to self when the deployment's flattened + /// config has been fetched from central. Carries the original central sender so the + /// reused apply path replies to it. + /// + private sealed record RefreshFetched(RefreshDeploymentCommand Cmd, string ConfigJson, IActorRef ReplyTo); + + /// + /// Notify-and-fetch (Task 10): piped back to self when the config fetch failed. + /// Carries the original central sender so a Failed status is reported to it. + /// + private sealed record RefreshFetchFailed(RefreshDeploymentCommand Cmd, string Error, IActorRef ReplyTo); + /// /// SiteRuntime-021: internal message dispatched from /// 's off-thread persistence task back diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/DeploymentManagerActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/DeploymentManagerActorTests.cs index 9e822a4b..1c6925a4 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/DeploymentManagerActorTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/DeploymentManagerActorTests.cs @@ -10,6 +10,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Types; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.TestSupport; @@ -48,7 +49,8 @@ public class DeploymentManagerActorTests : TestKit, IDisposable } private IActorRef CreateDeploymentManager( - SiteRuntimeOptions? options = null, IServiceProvider? serviceProvider = null) + SiteRuntimeOptions? options = null, IServiceProvider? serviceProvider = null, + IDeploymentConfigFetcher? configFetcher = null) { options ??= new SiteRuntimeOptions(); return ActorOf(Props.Create(() => new DeploymentManagerActor( @@ -62,7 +64,8 @@ public class DeploymentManagerActorTests : TestKit, IDisposable null, null, serviceProvider, - null))); + null, + configFetcher))); } private static string MakeConfigJson(string instanceName) @@ -675,4 +678,132 @@ public class DeploymentManagerActorTests : TestKit, IDisposable Assert.False(snapshot.InstanceNotFound, "A deployed (but empty) instance must NOT set InstanceNotFound."); } + + // ── Task 10: notify-and-fetch — RefreshDeploymentCommand → fetch → apply ── + + [Fact] + public async Task RefreshDeployment_FetchSucceeds_AppliesConfigAndRepliesSuccess() + { + // The active singleton receives a small RefreshDeploymentCommand, fetches the + // flattened config over HTTP via IDeploymentConfigFetcher, then runs the existing + // apply path: the config is persisted to SQLite and a Success status is replied. + var fetcher = new FakeConfigFetcher(MakeConfigJson("FetchedPump")); + var actor = CreateDeploymentManager(configFetcher: fetcher); + await Task.Delay(500); // empty startup + + actor.Tell(new RefreshDeploymentCommand( + "dep-fetch-1", "FetchedPump", "sha256:fetch", "admin", DateTimeOffset.UtcNow, + "http://central:9000", "tok-123")); + + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal(DeploymentStatus.Success, response.Status); + Assert.Equal("FetchedPump", response.InstanceUniqueName); + Assert.Equal("dep-fetch-1", response.DeploymentId); + + // The fetcher was called with the command's fetch coordinates. + Assert.Equal("http://central:9000", fetcher.LastBaseUrl); + Assert.Equal("dep-fetch-1", fetcher.LastDeploymentId); + Assert.Equal("tok-123", fetcher.LastToken); + + // The existing apply path ran end-to-end: the fetched config is persisted. + var configs = await _storage.GetAllDeployedConfigsAsync(); + Assert.Contains(configs, c => c.InstanceUniqueName == "FetchedPump"); + } + + [Fact] + public async Task RefreshDeployment_FetchFails_RepliesFailedAndDoesNotApply() + { + // A config fetch failure (DeploymentConfigFetchException surfaces as a faulted + // Task) must reply Failed with a "Communication failure:" message and must NOT + // apply anything — no config persisted, no Instance Actor created. + var fetcher = new FakeConfigFetcher( + new DeploymentConfigFetchException("central unreachable", isSuperseded: false)); + var actor = CreateDeploymentManager(configFetcher: fetcher); + await Task.Delay(500); + + actor.Tell(new RefreshDeploymentCommand( + "dep-fetch-fail", "UnfetchedPump", "sha256:x", "admin", DateTimeOffset.UtcNow, + "http://central:9000", "tok-x")); + + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal(DeploymentStatus.Failed, response.Status); + Assert.Equal("UnfetchedPump", response.InstanceUniqueName); + Assert.NotNull(response.ErrorMessage); + Assert.StartsWith("Communication failure:", response.ErrorMessage!); + + // No apply occurred: nothing was persisted for the instance. + await Task.Delay(500); + var configs = await _storage.GetAllDeployedConfigsAsync(); + Assert.DoesNotContain(configs, c => c.InstanceUniqueName == "UnfetchedPump"); + } + + [Fact] + public void RefreshDeployment_NullFetcher_RepliesFailed() + { + // A node with no configured fetcher (the `_configFetcher is null` guard) must reply + // Failed rather than NRE-crashing or silently dropping the central Ask. + var actor = CreateDeploymentManager(); // no configFetcher + + actor.Tell(new RefreshDeploymentCommand( + "dep-null", "Pump", "sha256:x", "admin", DateTimeOffset.UtcNow, + "http://central:9000", "tok")); + + var response = ExpectMsg(TimeSpan.FromSeconds(2)); + Assert.Equal(DeploymentStatus.Failed, response.Status); + Assert.Equal("Pump", response.InstanceUniqueName); + } + + [Fact] + public async Task RefreshDeployment_ReplyGoesToOriginalSender_AcrossAsyncFetch() + { + // The reply must reach the ORIGINAL sender (central's Ask temp actor), proving + // the captured replyTo survives the async fetch + PipeTo continuation — where + // Akka's Sender is no longer valid. Send with an explicit probe as the sender + // and assert the probe (not the default test actor) receives the response. + var fetcher = new FakeConfigFetcher(MakeConfigJson("SenderPump")); + var probe = CreateTestProbe(); + var actor = CreateDeploymentManager(configFetcher: fetcher); + await Task.Delay(500); + + actor.Tell(new RefreshDeploymentCommand( + "dep-sender", "SenderPump", "sha256:s", "admin", DateTimeOffset.UtcNow, + "http://central:9000", "tok-s"), probe.Ref); + + var response = probe.ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal(DeploymentStatus.Success, response.Status); + Assert.Equal("SenderPump", response.InstanceUniqueName); + } + + /// + /// In-test fake : returns a canned config JSON + /// (notify-and-fetch success) or throws a canned exception (fetch failure), and records + /// the fetch coordinates it was called with. It is async so a throw surfaces as a faulted + /// — mirroring the real HttpDeploymentConfigFetcher; a synchronous + /// throw would instead crash the actor before the ContinueWith/PipeTo could produce a + /// RefreshFetchFailed. + /// + private sealed class FakeConfigFetcher : IDeploymentConfigFetcher + { + private readonly string? _result; + private readonly Exception? _error; + + public FakeConfigFetcher(string result) => _result = result; + public FakeConfigFetcher(Exception error) => _error = error; + + public string? LastBaseUrl { get; private set; } + public string? LastDeploymentId { get; private set; } + public string? LastToken { get; private set; } + + public async Task FetchAsync( + string centralFetchBaseUrl, string deploymentId, string token, CancellationToken ct) + { + LastBaseUrl = centralFetchBaseUrl; + LastDeploymentId = deploymentId; + LastToken = token; + await Task.Yield(); + if (_error != null) + throw _error; + return _result!; + } + } }