diff --git a/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs b/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs index 20f669cb..585f42e0 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs @@ -9,6 +9,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle; using ZB.MOM.WW.ScadaBridge.Commons.Observability; using ZB.MOM.WW.ScadaBridge.Commons.Types; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening; using ZB.MOM.WW.ScadaBridge.Communication; @@ -25,7 +26,7 @@ namespace ZB.MOM.WW.ScadaBridge.DeploymentManager; /// 3. Flatten configuration via TemplateEngine (captures template state at time of flatten -- WP-16) /// 4. Validate flattened configuration /// 5. Compute revision hash and diff -/// 6. Send DeployInstanceCommand to site via CommunicationService +/// 6. Stage a PendingDeployment row + send RefreshDeploymentCommand (notify-and-fetch; site fetches the config over HTTP) /// 7. Track deployment status with optimistic concurrency (WP-4) /// 8. Store deployed config snapshot (WP-8) /// 9. Audit log all actions @@ -45,6 +46,7 @@ public class DeploymentService private readonly RevisionHashService _revisionHashService; private readonly IDeploymentStatusNotifier _statusNotifier; private readonly DeploymentManagerOptions _options; + private readonly CommunicationOptions _commOptions; private readonly ILogger _logger; /// @@ -72,6 +74,13 @@ public class DeploymentService /// /// Notifier for pushing deployment status changes to the UI. /// Deployment manager configuration options. + /// + /// Central-site communication options. The notify-and-fetch deploy path reads + /// (carried in the + /// RefreshDeploymentCommand so sites need no standing config) and + /// (the staged + /// PendingDeployment row's lifetime). + /// /// Logger instance. public DeploymentService( IDeploymentManagerRepository repository, @@ -84,6 +93,7 @@ public class DeploymentService RevisionHashService revisionHashService, IDeploymentStatusNotifier statusNotifier, IOptions options, + IOptions communicationOptions, ILogger logger) { _repository = repository; @@ -96,6 +106,7 @@ public class DeploymentService _revisionHashService = revisionHashService; _statusNotifier = statusNotifier; _options = options.Value; + _commOptions = communicationOptions.Value; _logger = logger; } @@ -209,6 +220,16 @@ public class DeploymentService if (reconciled != null) return Result.Success(reconciled); + // Notify-and-fetch: the site fetches the staged config from + // CentralFetchBaseUrl, so a deploy is impossible without it. Fail fast + // here — BEFORE creating an InProgress record or staging a pending row — + // so the operator sees a clear configuration error instead of a + // confusing downstream site-fetch failure (and no InProgress record is + // stranded). + if (string.IsNullOrEmpty(_commOptions.CentralFetchBaseUrl)) + return Result.Failure( + "CentralFetchBaseUrl is not configured — required for deployment (notify-and-fetch)."); + // WP-4: Create the deployment record directly in InProgress. // // DeploymentManager-022: the previous code wrote the record as Pending, @@ -236,16 +257,39 @@ public class DeploymentService try { - // WP-1: Send to site via CommunicationService + // Notify-and-fetch: instead of shipping the (potentially oversized, + // silently-dropped >128 KB) flattened config inline in a + // DeployInstanceCommand, stage it in a PendingDeployment row and send + // a small RefreshDeploymentCommand. The site fetches the config from + // CentralFetchBaseUrl over HTTP using the per-deployment fetch token. var siteId = await ResolveSiteIdentifierAsync(instance.SiteId, cancellationToken); - var command = new DeployInstanceCommand( - deploymentId, instance.UniqueName, revisionHash, configJson, user, DateTimeOffset.UtcNow); + + var token = DeploymentFetchToken.Generate(); + var stagedAt = DateTimeOffset.UtcNow; + await _repository.AddPendingDeploymentAsync(new PendingDeployment( + deploymentId, instanceId, revisionHash, configJson, token, + stagedAt, stagedAt + _commOptions.PendingDeploymentTtl), cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + + var command = new RefreshDeploymentCommand( + deploymentId, instance.UniqueName, revisionHash, user, stagedAt, + _commOptions.CentralFetchBaseUrl, token); _logger.LogInformation( - "Sending deployment {DeploymentId} for instance {Instance} to site {SiteId}", + "Sending deployment {DeploymentId} for instance {Instance} to site {SiteId} (notify-and-fetch)", deploymentId, instance.UniqueName, siteId); - var response = await _communicationService.DeployInstanceAsync(siteId, command, cancellationToken); + // Cleanup of the staged PendingDeployment is TTL-based ONLY — the row + // is deliberately NOT deleted on success or in the catch. On a + // central-side Ask timeout the site may have applied AND told the + // standby node to fetch; deleting now would 404 that in-flight + // standby fetch and break failover. Supersession bounds pending rows + // to ≤1 per instance and the fetch endpoint enforces the TTL, so + // leaving rows for TTL purge is safe. + // TODO(notify-and-fetch): wire PurgeExpiredPendingDeploymentsAsync + // into a central maintenance cadence (none exists in DeploymentManager + // today; deferred — supersession + endpoint TTL keep this safe). + var response = await _communicationService.RefreshDeploymentAsync(siteId, command, cancellationToken); // WP-1: Update status based on site response. record.Status = response.Status; @@ -320,6 +364,12 @@ public class DeploymentService try { await _repository.UpdateDeploymentRecordAsync(record, CancellationToken.None); + // Note: if the staging SaveChangesAsync above was interrupted, an + // Added PendingDeployment may still be tracked and will be + // committed by this cleanup save. That row is orphaned (no + // RefreshDeploymentCommand was sent, so no site holds its token) + // and is removed by TTL purge / superseded by the next deploy -- + // harmless. await _repository.SaveChangesAsync(CancellationToken.None); NotifyStatusChange(record); diff --git a/tests/ZB.MOM.WW.ScadaBridge.CentralUI.Tests/TopologyPageTests.cs b/tests/ZB.MOM.WW.ScadaBridge.CentralUI.Tests/TopologyPageTests.cs index 756fb69e..f55efa4d 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.CentralUI.Tests/TopologyPageTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.CentralUI.Tests/TopologyPageTests.cs @@ -58,6 +58,9 @@ public class TopologyPageTests : BunitContext { OperationLockTimeout = TimeSpan.FromSeconds(5) })); + // Notify-and-fetch (Task 6): DeploymentService now also depends on + // IOptions; register it so the page's DI graph resolves. + Services.AddSingleton(Options.Create(new CommunicationOptions())); Services.AddSingleton>(NullLogger.Instance); // DeploymentService gained a DiffService dependency (DeploymentManager // contract change); register it so the page's DI graph resolves. diff --git a/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs b/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs index c6331a8b..00a3e22e 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs @@ -54,9 +54,35 @@ public class DeploymentServiceTests : TestKit new RevisionHashService(), new DeploymentStatusNotifier(NullLogger.Instance), options, + DeployCommOptions(), NullLogger.Instance); } + /// + /// Notify-and-fetch (Task 6): the central HTTP base URL carried in the + /// RefreshDeploymentCommand. Must be non-empty for the deploy path to + /// clear the fail-fast configuration guard. + /// + private const string TestFetchBaseUrl = "https://central.test:9000"; + + /// How long a staged PendingDeployment stays valid in these tests. + private static readonly TimeSpan TestPendingTtl = TimeSpan.FromMinutes(5); + + /// + /// Builds the the notify-and-fetch deploy + /// path reads: a non-empty + /// (so the fail-fast guard passes) and a fixed + /// . Pass an empty base + /// URL to exercise the fail-fast path. + /// + private static IOptions DeployCommOptions( + string centralFetchBaseUrl = TestFetchBaseUrl) => + Options.Create(new CommunicationOptions + { + CentralFetchBaseUrl = centralFetchBaseUrl, + PendingDeploymentTtl = TestPendingTtl + }); + // ── WP-1: Deployment flow ── [Fact] @@ -139,6 +165,7 @@ public class DeploymentServiceTests : TestKit new RevisionHashService(), new DeploymentStatusNotifier(NullLogger.Instance), Options.Create(new DeploymentManagerOptions { OperationLockTimeout = TimeSpan.FromSeconds(5) }), + DeployCommOptions(), NullLogger.Instance); var result = await service.DeployInstanceAsync(99, "admin"); @@ -924,9 +951,13 @@ public class DeploymentServiceTests : TestKit /// /// Builds a DeploymentService whose CommunicationService is backed by the - /// supplied actor, so the site query and deploy commands can be observed. + /// supplied actor, so the site query and deploy notifies can be observed. + /// seeds the DeploymentService's own + /// ; pass "" to drive + /// the notify-and-fetch fail-fast guard. /// - private DeploymentService CreateServiceWithCommActor(IActorRef commActor) + private DeploymentService CreateServiceWithCommActor( + IActorRef commActor, string centralFetchBaseUrl = TestFetchBaseUrl) { var comms = new CommunicationService( Options.Create(new CommunicationOptions @@ -944,6 +975,7 @@ public class DeploymentServiceTests : TestKit new RevisionHashService(), new DeploymentStatusNotifier(NullLogger.Instance), Options.Create(new DeploymentManagerOptions { OperationLockTimeout = TimeSpan.FromSeconds(5) }), + DeployCommOptions(centralFetchBaseUrl), NullLogger.Instance); } @@ -1159,6 +1191,127 @@ public class DeploymentServiceTests : TestKit Assert.Equal(1, counters.DeployCount); } + // ── Task 6: notify-and-fetch send path (stage PendingDeployment + send RefreshDeploymentCommand) ── + + [Fact] + public async Task DeployInstanceAsync_Success_StagesPendingConfigThenSendsRefreshNotify() + { + // The central send path must stage the flattened config in a + // PendingDeployment row and then notify the site with a small + // RefreshDeploymentCommand (NOT ship the fat config inline), so the site + // fetches the config over HTTP. The staged token + fetch base URL + + // deployment id must travel in the notify. + var instance = new Instance("NotifyFetchInst") + { + Id = 80, SiteId = 1, State = InstanceState.NotDeployed + }; + _repo.GetInstanceByIdAsync(80, Arg.Any()).Returns(instance); + SetupValidPipeline(80, "NotifyFetchInst", "sha256:target"); + _repo.GetCurrentDeploymentStatusAsync(80, Arg.Any()) + .Returns((DeploymentRecord?)null); + + PendingDeployment? staged = null; + await _repo.AddPendingDeploymentAsync( + Arg.Do(p => staged = p), Arg.Any()); + + var counters = new ReconcileProbeCounters(); + var commActor = Sys.ActorOf(Props.Create(() => + new ReconcileProbeActor(counters, siteHash: "sha256:target", failQuery: false))); + var service = CreateServiceWithCommActor(commActor); + + var result = await service.DeployInstanceAsync(80, "admin"); + + Assert.True(result.IsSuccess); + + // A PendingDeployment was staged with the deploy's identity + config. + Assert.NotNull(staged); + Assert.Equal(80, staged!.InstanceId); + Assert.Equal("sha256:target", staged.RevisionHash); + Assert.False(string.IsNullOrEmpty(staged.DeploymentId)); + Assert.False(string.IsNullOrEmpty(staged.Token)); + Assert.False(string.IsNullOrEmpty(staged.ConfigurationJson)); + // The TTL window is createdAt + CommunicationOptions.PendingDeploymentTtl. + Assert.Equal(TestPendingTtl, staged.ExpiresAtUtc - staged.CreatedAtUtc); + + // The site was notified via RefreshDeploymentCommand (the deploy path no + // longer ships DeployInstanceCommand), carrying the SAME deployment id, + // the configured fetch base URL, and the staged fetch token. + Assert.Equal(1, counters.DeployCount); + Assert.NotNull(counters.LastRefresh); + Assert.Equal(staged.DeploymentId, counters.LastRefresh!.DeploymentId); + Assert.Equal("NotifyFetchInst", counters.LastRefresh.InstanceUniqueName); + Assert.Equal("sha256:target", counters.LastRefresh.RevisionHash); + Assert.Equal(TestFetchBaseUrl, counters.LastRefresh.CentralFetchBaseUrl); + Assert.Equal(staged.Token, counters.LastRefresh.FetchToken); + + // The staged config was committed before/with the notify. + await _repo.Received().AddPendingDeploymentAsync( + Arg.Any(), Arg.Any()); + } + + [Fact] + public async Task DeployInstanceAsync_Success_DoesNotDeletePendingRow() + { + // Cleanup is TTL-based only: deleting the pending row on success would + // 404 a standby node whose HTTP fetch is still in flight after a + // central-side Ask timeout. The row must be left for TTL purge. + var instance = new Instance("NoDeleteInst") + { + Id = 81, SiteId = 1, State = InstanceState.NotDeployed + }; + _repo.GetInstanceByIdAsync(81, Arg.Any()).Returns(instance); + SetupValidPipeline(81, "NoDeleteInst", "sha256:target"); + _repo.GetCurrentDeploymentStatusAsync(81, Arg.Any()) + .Returns((DeploymentRecord?)null); + + var counters = new ReconcileProbeCounters(); + var commActor = Sys.ActorOf(Props.Create(() => + new ReconcileProbeActor(counters, siteHash: "sha256:target", failQuery: false))); + var service = CreateServiceWithCommActor(commActor); + + var result = await service.DeployInstanceAsync(81, "admin"); + + Assert.True(result.IsSuccess); + // The pending row is NOT deleted on success — TTL purge owns cleanup. + await _repo.DidNotReceive().DeletePendingDeploymentByIdAsync( + Arg.Any(), Arg.Any()); + } + + [Fact] + public async Task DeployInstanceAsync_CentralFetchBaseUrlEmpty_FailsFastWithoutNotifyOrStaging() + { + // Notify-and-fetch requires CentralFetchBaseUrl. With it empty the deploy + // must fail fast with a clear configuration error BEFORE staging a + // pending row, creating a deployment record, or notifying the site — so + // the operator never sees a confusing downstream site-fetch error. + var instance = new Instance("NoFetchUrlInst") + { + Id = 82, SiteId = 1, State = InstanceState.NotDeployed + }; + _repo.GetInstanceByIdAsync(82, Arg.Any()).Returns(instance); + SetupValidPipeline(82, "NoFetchUrlInst", "sha256:target"); + _repo.GetCurrentDeploymentStatusAsync(82, Arg.Any()) + .Returns((DeploymentRecord?)null); + + var counters = new ReconcileProbeCounters(); + var commActor = Sys.ActorOf(Props.Create(() => + new ReconcileProbeActor(counters, siteHash: "sha256:target", failQuery: false))); + var service = CreateServiceWithCommActor(commActor, centralFetchBaseUrl: ""); + + var result = await service.DeployInstanceAsync(82, "admin"); + + Assert.True(result.IsFailure); + Assert.Contains("CentralFetchBaseUrl", result.Error); + + // No site notify, no staged config, no deployment record. + Assert.Equal(0, counters.DeployCount); + Assert.Null(counters.LastRefresh); + await _repo.DidNotReceive().AddPendingDeploymentAsync( + Arg.Any(), Arg.Any()); + await _repo.DidNotReceive().AddDeploymentRecordAsync( + Arg.Any(), Arg.Any()); + } + // ── DeploymentManager-015: reconciliation must perform the normal success side effects ── [Fact] @@ -1386,6 +1539,7 @@ public class DeploymentServiceTests : TestKit OperationLockTimeout = TimeSpan.FromSeconds(5), LifecycleCommandTimeout = TimeSpan.FromMilliseconds(300) }), + DeployCommOptions(), NullLogger.Instance); var sw = System.Diagnostics.Stopwatch.StartNew(); @@ -1443,6 +1597,7 @@ public class DeploymentServiceTests : TestKit OperationLockTimeout = TimeSpan.FromSeconds(5), LifecycleCommandTimeout = deadline, }), + DeployCommOptions(), NullLogger.Instance); var result = await service.DisableInstanceAsync(61, "operator-jane"); @@ -1492,6 +1647,7 @@ public class DeploymentServiceTests : TestKit OperationLockTimeout = TimeSpan.FromSeconds(5), LifecycleCommandTimeout = TimeSpan.FromMilliseconds(300), }), + DeployCommOptions(), NullLogger.Instance); var result = await service.EnableInstanceAsync(62, "operator-jane"); @@ -1538,6 +1694,7 @@ public class DeploymentServiceTests : TestKit OperationLockTimeout = TimeSpan.FromSeconds(5), LifecycleCommandTimeout = TimeSpan.FromMilliseconds(300), }), + DeployCommOptions(), NullLogger.Instance); var result = await service.DeleteInstanceAsync(63, "operator-jane"); @@ -1616,6 +1773,9 @@ public class DeploymentServiceTests : TestKit /// defers each deploy reply via the scheduler, so if two deploys for the /// same instance were NOT serialized by the operation lock their windows /// would overlap and MaxConcurrent would exceed 1. + /// + /// Notify-and-fetch (Task 6): the deploy path now sends a + /// instead of DeployInstanceCommand. /// private class SerializationProbeActor : ReceiveActor, IWithTimers { @@ -1625,7 +1785,7 @@ public class DeploymentServiceTests : TestKit { Receive(env => { - if (env.Message is DeployInstanceCommand d) + if (env.Message is RefreshDeploymentCommand d) { lock (counters.Gate) { @@ -1661,7 +1821,7 @@ public class DeploymentServiceTests : TestKit }); } - private sealed record CompleteDeploy(DeployInstanceCommand Command, IActorRef ReplyTo); + private sealed record CompleteDeploy(RefreshDeploymentCommand Command, IActorRef ReplyTo); } /// @@ -1673,14 +1833,27 @@ public class DeploymentServiceTests : TestKit { public int QueryCount; public int DeployCount; + + /// + /// Notify-and-fetch (Task 6): the most recent + /// the probe received, so a test + /// can assert the deploy path notified the site with the staged token, + /// fetch base URL, and deployment id. + /// + public RefreshDeploymentCommand? LastRefresh; } /// /// Stand-in CentralCommunicationActor for reconciliation tests. Counts the - /// site queries and deploy commands it receives (into a per-test + /// site queries and deploy notifies it receives (into a per-test /// instance), answers queries with a /// configurable applied revision hash, and (optionally) drops the query to /// simulate an unreachable site so the central Ask times out. + /// + /// Notify-and-fetch (Task 6): the deploy path now sends a + /// (not the fat + /// DeployInstanceCommand), so the probe counts that message as a + /// "deploy" and records it for assertion. /// private class ReconcileProbeActor : ReceiveActor { @@ -1701,8 +1874,9 @@ public class DeploymentServiceTests : TestKit // failQuery: drop the message -> caller's Ask times out. break; - case DeployInstanceCommand d: + case RefreshDeploymentCommand d: Interlocked.Increment(ref counters.DeployCount); + counters.LastRefresh = d; Sender.Tell(new DeploymentStatusResponse( d.DeploymentId, d.InstanceUniqueName, DeploymentStatus.Success, null, DateTimeOffset.UtcNow)); diff --git a/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentStatusNotifierTests.cs b/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentStatusNotifierTests.cs index 52d82f9a..ba975448 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentStatusNotifierTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentStatusNotifierTests.cs @@ -61,6 +61,10 @@ public class DeploymentStatusNotifierTests : TestKit _service = new DeploymentService( _repo, siteRepo, _pipeline, _comms, _lockManager, _audit, new DiffService(), new RevisionHashService(), _notifier, options, + // Notify-and-fetch (Task 6): a non-empty CentralFetchBaseUrl lets the + // deploy path reach the send (and, here, the catch block) rather than + // tripping the fail-fast configuration guard. + Options.Create(new CommunicationOptions { CentralFetchBaseUrl = "https://central.test:9000" }), NullLogger.Instance); }