diff --git a/code-reviews/DeploymentManager/findings.md b/code-reviews/DeploymentManager/findings.md index 6ba548f..00d2d0d 100644 --- a/code-reviews/DeploymentManager/findings.md +++ b/code-reviews/DeploymentManager/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 12 | +| Open findings | 11 | ## Summary @@ -231,7 +231,7 @@ _Unresolved._ |--|--| | Severity | High | | Category | Design-document adherence | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DeploymentManager/DeploymentService.cs:84-200,363-368` | **Description** @@ -261,19 +261,32 @@ stale-rejection. **Resolution** -_Unresolved._ Finding confirmed valid against the source — `GetDeploymentStatusAsync` -only reads the local `DeploymentRecord` via `GetDeploymentByDeploymentIdAsync`, -and `DeployInstanceAsync` unconditionally generates a new deployment ID with no -site reconciliation. Left Open: a proper fix is a cross-module new feature, not -a bug fix scoped to `ScadaLink.DeploymentManager`. It requires (1) a new -request/response message contract in `ScadaLink.Commons`, (2) a new -`CommunicationService` query method in `ScadaLink.Communication`, and (3) -site-side handling of the query — all outside the DeploymentManager module — plus -a design decision on the query protocol. The reconciliation logic in -`DeploymentService` cannot be implemented without those. Recommend tracking as a -dedicated cross-module feature work item (or, alternatively, amending the design -doc to delegate reconciliation entirely to site-side stale-rejection — also -outside this module's editable scope). 
+Resolved 2026-05-16 (commit ``): implemented the cross-module +query-the-site-before-redeploy idempotency feature across Commons, SiteRuntime, +Communication, and DeploymentManager — new `DeploymentStateQueryRequest` / +`DeploymentStateQueryResponse` contracts, a `DeploymentManagerActor` handler +answering from the site's deployed-config store, a +`CommunicationService.QueryDeploymentStateAsync` method routed over the +ClusterClient command/control transport, and reconciliation in +`DeployInstanceAsync` (`TryReconcileWithSiteAsync`) that queries the site only +when a prior record is `InProgress` or `Failed` due to a timeout, marks the +prior record `Success` without re-sending if the site already has the target +revision hash, and falls through to a normal deploy (relying on site-side +stale-rejection) when the query fails. Regression tests: +`RoundTrip_DeploymentStateQueryRequest_Succeeds`, +`RoundTrip_DeploymentStateQueryResponse_Deployed_Succeeds`, +`RoundTrip_DeploymentStateQueryResponse_NotDeployed_NullApplied`, +`DeploymentStateQuery_DeployedInstance_ReturnsAppliedIdentity`, +`DeploymentStateQuery_UnknownInstance_ReturnsNotDeployed`, +`DeploymentStateQuery_ForwardedToDeploymentManager`, +`QueryDeploymentStateAsync_BeforeInitialization_Throws`, +`QueryDeploymentStateAsync_SendsEnvelopeAndReturnsResponse`, +`DeployInstanceAsync_PriorInProgressRecord_SiteHasTargetHash_MarksSuccessWithoutRedeploy`, +`DeployInstanceAsync_PriorInProgressRecord_SiteHasDifferentHash_ProceedsWithDeploy`, +`DeployInstanceAsync_PriorFailedTimeoutRecord_QueriesSite`, +`DeployInstanceAsync_PriorSuccessRecord_SkipsSiteQuery`, +`DeployInstanceAsync_FreshFirstTimeDeploy_SkipsSiteQuery`, +`DeployInstanceAsync_PriorInProgressRecord_QueryFails_FallsThroughToDeploy`. 
### DeploymentManager-007 — "Diff View" reduced to a hash comparison with no diff detail diff --git a/src/ScadaLink.Commons/Messages/Deployment/DeploymentStateQueryRequest.cs b/src/ScadaLink.Commons/Messages/Deployment/DeploymentStateQueryRequest.cs new file mode 100644 index 0000000..0144ee2 --- /dev/null +++ b/src/ScadaLink.Commons/Messages/Deployment/DeploymentStateQueryRequest.cs @@ -0,0 +1,13 @@ +namespace ScadaLink.Commons.Messages.Deployment; + +/// +/// Central→site query for the currently-applied deployment state of a single +/// instance. Issued by the Deployment Manager before a re-deploy when a prior +/// deployment record is stuck InProgress or Failed due to a +/// timeout, so the site's actual state can be reconciled against the target +/// revision before re-sending a deployment ("Deployment Identity & Idempotency"). +/// +public record DeploymentStateQueryRequest( + string CorrelationId, + string InstanceUniqueName, + DateTimeOffset Timestamp); diff --git a/src/ScadaLink.Commons/Messages/Deployment/DeploymentStateQueryResponse.cs b/src/ScadaLink.Commons/Messages/Deployment/DeploymentStateQueryResponse.cs new file mode 100644 index 0000000..59021b6 --- /dev/null +++ b/src/ScadaLink.Commons/Messages/Deployment/DeploymentStateQueryResponse.cs @@ -0,0 +1,15 @@ +namespace ScadaLink.Commons.Messages.Deployment; + +/// +/// Site→central response carrying the instance's currently-applied deployment +/// state. If <see cref="IsDeployed"/> is false the instance has no +/// deployed configuration at the site and <see cref="AppliedDeploymentId"/> / +/// <see cref="AppliedRevisionHash"/> are null. +/// +public record DeploymentStateQueryResponse( + string CorrelationId, + string InstanceUniqueName, + bool IsDeployed, + string? AppliedDeploymentId, + string? 
AppliedRevisionHash, + DateTimeOffset Timestamp); diff --git a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs index 1e8f0c3..0038e8f 100644 --- a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs @@ -76,6 +76,11 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers Receive(msg => _deploymentManagerProxy.Forward(msg)); Receive(msg => _deploymentManagerProxy.Forward(msg)); + // DeploymentManager-006: query-the-site-before-redeploy — forward to + // the Deployment Manager, which owns the deployed-config store and + // answers with the instance's currently-applied deployment identity. + Receive(msg => _deploymentManagerProxy.Forward(msg)); + // Pattern 3: Artifact Deployment — forward to artifact handler if registered Receive(msg => { diff --git a/src/ScadaLink.Communication/CommunicationService.cs b/src/ScadaLink.Communication/CommunicationService.cs index 38359ac..fdba5d4 100644 --- a/src/ScadaLink.Communication/CommunicationService.cs +++ b/src/ScadaLink.Communication/CommunicationService.cs @@ -73,6 +73,26 @@ public class CommunicationService envelope, _options.DeploymentTimeout, cancellationToken); } + /// + /// DeploymentManager-006: queries a site for the currently-applied deployment + /// identity of a single instance. Used by the Deployment Manager before a + /// re-deploy to reconcile against the site's actual state. Sent over the + /// existing ClusterClient command/control transport; the Ask times out (no + /// central buffering) if the site is unreachable, and the caller falls + /// through to a normal deploy. 
+ /// + public async Task QueryDeploymentStateAsync( + string siteId, DeploymentStateQueryRequest request, CancellationToken cancellationToken = default) + { + _logger.LogDebug( + "Sending DeploymentStateQueryRequest to site {SiteId}, instance={Instance}, correlationId={CorrelationId}", + siteId, request.InstanceUniqueName, request.CorrelationId); + + var envelope = new SiteEnvelope(siteId, request); + return await GetActor().Ask( + envelope, _options.QueryTimeout, cancellationToken); + } + // ── Pattern 2: Lifecycle ── public async Task DisableInstanceAsync( diff --git a/src/ScadaLink.DeploymentManager/DeploymentService.cs b/src/ScadaLink.DeploymentManager/DeploymentService.cs index 683bacf..9b8f031 100644 --- a/src/ScadaLink.DeploymentManager/DeploymentService.cs +++ b/src/ScadaLink.DeploymentManager/DeploymentService.cs @@ -43,6 +43,14 @@ public class DeploymentService private readonly DeploymentManagerOptions _options; private readonly ILogger _logger; + /// + /// Prefix written to <see cref="DeploymentRecord.ErrorMessage"/> when a + /// deployment fails because the site command timed out or was cancelled. + /// Used by the query-before-redeploy trigger (DeploymentManager-006) to tell + /// a timeout-induced failure apart from other deployment errors. + /// + private const string TimeoutFailurePrefix = "Communication failure:"; + public DeploymentService( IDeploymentManagerRepository repository, ISiteRepository siteRepository, @@ -118,6 +126,18 @@ public class DeploymentService return Result.Failure($"Pre-deployment validation failed: {errors}"); } + // DeploymentManager-006: query-the-site-before-redeploy idempotency. + // If a prior deployment for this instance is stuck InProgress or Failed + // due to a timeout, the site may have actually applied the config. Query + // the site for its currently-applied revision before re-sending so a + // duplicate deployment is not produced (design: "Deployment Identity & + // Idempotency"). 
A clean prior Success or a fresh first-time deploy + // skips this extra round-trip. + var reconciled = await TryReconcileWithSiteAsync( + instance, revisionHash, cancellationToken); + if (reconciled != null) + return Result.Success(reconciled); + // Serialize for transmission var configJson = JsonSerializer.Serialize(flattenedConfig); @@ -199,7 +219,7 @@ public class DeploymentService record.Status = DeploymentStatus.Failed; record.ErrorMessage = isTimeout - ? $"Communication failure: {ex.Message}" + ? $"{TimeoutFailurePrefix} {ex.Message}" : $"Deployment error: {ex.Message}"; record.CompletedAt = DateTimeOffset.UtcNow; @@ -401,6 +421,105 @@ public class DeploymentService return await _repository.GetDeploymentByDeploymentIdAsync(deploymentId, cancellationToken); } + /// + /// DeploymentManager-006: query-the-site-before-redeploy reconciliation. + /// + /// The site query is issued ONLY when a prior <see cref="DeploymentRecord"/> + /// for this instance is stuck <see cref="DeploymentStatus.InProgress"/>, or + /// is <see cref="DeploymentStatus.Failed"/> due to a timeout — the only + /// cases where the site may have applied the config without central + /// learning of it. Fresh first-time deploys and redeploys after a clean + /// prior <see cref="DeploymentStatus.Success"/> skip the extra round-trip. + /// + /// Reconciliation: if the site already has the TARGET revision hash, the + /// prior record is marked <see cref="DeploymentStatus.Success"/> and + /// returned (the caller must NOT re-send the deploy). Otherwise null + /// is returned and the normal deploy proceeds. + /// + /// Query failure: if the site is unreachable or the query times out, this + /// returns null (fall through to a normal deploy) — site-side + /// stale-rejection of an older revision hash is the safety net. The deploy + /// is never aborted on a failed query. 
+ /// + private async Task TryReconcileWithSiteAsync( + Instance instance, + string targetRevisionHash, + CancellationToken cancellationToken) + { + var prior = await _repository.GetCurrentDeploymentStatusAsync(instance.Id, cancellationToken); + if (prior == null || !ShouldQuerySiteBeforeRedeploy(prior)) + return null; + + DeploymentStateQueryResponse response; + try + { + var siteId = await ResolveSiteIdentifierAsync(instance.SiteId, cancellationToken); + var query = new DeploymentStateQueryRequest( + Guid.NewGuid().ToString("N"), instance.UniqueName, DateTimeOffset.UtcNow); + + _logger.LogInformation( + "Querying site {SiteId} for applied deployment state of instance {Instance} " + + "before re-deploy (prior record {DeploymentId} is {Status})", + siteId, instance.UniqueName, prior.DeploymentId, prior.Status); + + response = await _communicationService.QueryDeploymentStateAsync( + siteId, query, cancellationToken); + } + catch (Exception ex) + { + // Query failure (site unreachable / timeout): do NOT abort. Fall + // through to a normal deploy; site-side stale-rejection of an older + // revision hash is the safety net. + _logger.LogWarning(ex, + "Site query before re-deploy of instance {Instance} failed; " + + "proceeding with normal deploy (site-side stale-rejection is the safety net)", + instance.UniqueName); + return null; + } + + if (response.IsDeployed && + string.Equals(response.AppliedRevisionHash, targetRevisionHash, StringComparison.Ordinal)) + { + // The site already has the target revision — the prior deployment + // actually succeeded. Reconcile the stale record instead of + // re-sending the deploy. 
+ _logger.LogInformation( + "Site already has target revision {RevisionHash} for instance {Instance}; " + + "marking prior deployment record {DeploymentId} Success without re-deploying", + targetRevisionHash, instance.UniqueName, prior.DeploymentId); + + prior.Status = DeploymentStatus.Success; + prior.ErrorMessage = null; + prior.CompletedAt = DateTimeOffset.UtcNow; + await _repository.UpdateDeploymentRecordAsync(prior, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + + await _auditService.LogAsync(prior.DeployedBy, "DeployReconciled", "Instance", + instance.Id.ToString(), instance.UniqueName, + new { DeploymentId = prior.DeploymentId, RevisionHash = targetRevisionHash }, + cancellationToken); + + return prior; + } + + // Site does not have the target revision (or is not deployed) — proceed + // with the normal deploy. + return null; + } + + /// + /// DeploymentManager-006: the site is queried before a re-deploy only when a + /// prior record is stuck <see cref="DeploymentStatus.InProgress"/>, or is + /// <see cref="DeploymentStatus.Failed"/> because the site command timed out + /// (detected via the <see cref="TimeoutFailurePrefix"/> error-message + /// marker). All other prior states skip the query. 
+ /// + private static bool ShouldQuerySiteBeforeRedeploy(DeploymentRecord prior) => + prior.Status == DeploymentStatus.InProgress + || (prior.Status == DeploymentStatus.Failed + && prior.ErrorMessage != null + && prior.ErrorMessage.StartsWith(TimeoutFailurePrefix, StringComparison.Ordinal)); + private async Task StoreDeployedSnapshotAsync( int instanceId, string deploymentId, diff --git a/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs b/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs index 1248a0d..f414ca4 100644 --- a/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs +++ b/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs @@ -78,6 +78,12 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers Receive(HandleEnable); Receive(HandleDelete); + // DeploymentManager-006: query-the-site-before-redeploy idempotency. + // Central asks for the instance's currently-applied deployment identity + // before re-sending a deployment whose prior record is stuck InProgress + // or Failed due to a timeout. + Receive(HandleDeploymentStateQuery); + // WP-33: Handle system-wide artifact deployment Receive(HandleDeployArtifacts); @@ -446,6 +452,44 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers _logger.LogInformation("Instance {Instance} deleted", instanceName); } + /// + /// DeploymentManager-006: answers a central query for the instance's + /// currently-applied deployment identity. The site's deployed-config store + /// (SQLite) is the authoritative record — it covers both enabled and + /// disabled instances, and survives node restart/failover. If the instance + /// has no stored config, the response reports IsDeployed = false with + /// null identity so central falls through to a normal deploy. 
+ /// + private void HandleDeploymentStateQuery(DeploymentStateQueryRequest request) + { + var sender = Sender; + var instanceName = request.InstanceUniqueName; + + _storage.GetAllDeployedConfigsAsync().ContinueWith(t => + { + if (!t.IsCompletedSuccessfully) + { + _logger.LogError( + t.Exception?.GetBaseException(), + "Failed to read deployed configs for deployment state query of {Instance}", + instanceName); + // Treat a storage read failure as "unknown" — central falls + // through to a normal deploy and relies on site-side + // stale-rejection as the safety net. + return new DeploymentStateQueryResponse( + request.CorrelationId, instanceName, false, null, null, DateTimeOffset.UtcNow); + } + + var config = t.Result.FirstOrDefault(c => c.InstanceUniqueName == instanceName); + return config == null + ? new DeploymentStateQueryResponse( + request.CorrelationId, instanceName, false, null, null, DateTimeOffset.UtcNow) + : new DeploymentStateQueryResponse( + request.CorrelationId, instanceName, true, + config.DeploymentId, config.RevisionHash, DateTimeOffset.UtcNow); + }).PipeTo(sender); + } + // ── DCL connection management ── private readonly HashSet _createdConnections = new(); diff --git a/tests/ScadaLink.Commons.Tests/Messages/CompatibilityTests.cs b/tests/ScadaLink.Commons.Tests/Messages/CompatibilityTests.cs index cf61708..36287b4 100644 --- a/tests/ScadaLink.Commons.Tests/Messages/CompatibilityTests.cs +++ b/tests/ScadaLink.Commons.Tests/Messages/CompatibilityTests.cs @@ -326,4 +326,48 @@ public class CompatibilityTests Assert.NotNull(msg); Assert.Equal((DeploymentStatus)99, msg!.Status); } + + // ── DeploymentManager-006: query-the-site-before-redeploy contracts ── + + [Fact] + public void RoundTrip_DeploymentStateQueryRequest_Succeeds() + { + var msg = new DeploymentStateQueryRequest("corr-1", "inst-1", DateTimeOffset.UtcNow); + var json = JsonSerializer.Serialize(msg); + var deserialized = JsonSerializer.Deserialize(json, Options); + + 
Assert.NotNull(deserialized); + Assert.Equal("corr-1", deserialized!.CorrelationId); + Assert.Equal("inst-1", deserialized.InstanceUniqueName); + } + + [Fact] + public void RoundTrip_DeploymentStateQueryResponse_Deployed_Succeeds() + { + var msg = new DeploymentStateQueryResponse( + "corr-1", "inst-1", true, "dep-9", "sha256:abc", DateTimeOffset.UtcNow); + var json = JsonSerializer.Serialize(msg); + var deserialized = JsonSerializer.Deserialize(json, Options); + + Assert.NotNull(deserialized); + Assert.True(deserialized!.IsDeployed); + Assert.Equal("dep-9", deserialized.AppliedDeploymentId); + Assert.Equal("sha256:abc", deserialized.AppliedRevisionHash); + } + + [Fact] + public void RoundTrip_DeploymentStateQueryResponse_NotDeployed_NullApplied() + { + // When the instance is not deployed at the site, the applied identity + // fields are null — verified to survive a JSON round-trip. + var msg = new DeploymentStateQueryResponse( + "corr-1", "inst-1", false, null, null, DateTimeOffset.UtcNow); + var json = JsonSerializer.Serialize(msg); + var deserialized = JsonSerializer.Deserialize(json, Options); + + Assert.NotNull(deserialized); + Assert.False(deserialized!.IsDeployed); + Assert.Null(deserialized.AppliedDeploymentId); + Assert.Null(deserialized.AppliedRevisionHash); + } } diff --git a/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs b/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs index dca586b..6585794 100644 --- a/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs +++ b/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs @@ -1,12 +1,15 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; +using ScadaLink.Commons.Messages.Deployment; namespace ScadaLink.Communication.Tests; /// /// WP-2: Tests for CommunicationService initialization and state. 
/// -public class CommunicationServiceTests +public class CommunicationServiceTests : TestKit { [Fact] public async Task BeforeInitialization_ThrowsOnUsage() @@ -18,7 +21,7 @@ public class CommunicationServiceTests // CommunicationService requires SetCommunicationActor before use await Assert.ThrowsAsync(() => service.DeployInstanceAsync("site1", - new Commons.Messages.Deployment.DeployInstanceCommand( + new DeployInstanceCommand( "dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow))); } @@ -30,4 +33,63 @@ public class CommunicationServiceTests Assert.NotNull(method); Assert.Equal(typeof(void), method!.ReturnType); } + + // ── DeploymentManager-006: query-the-site-before-redeploy ── + + [Fact] + public async Task QueryDeploymentStateAsync_BeforeInitialization_Throws() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + await Assert.ThrowsAsync(() => + service.QueryDeploymentStateAsync("site1", + new DeploymentStateQueryRequest("corr-1", "inst1", DateTimeOffset.UtcNow))); + } + + [Fact] + public async Task QueryDeploymentStateAsync_SendsEnvelopeAndReturnsResponse() + { + // The query must be dispatched as a SiteEnvelope over the existing + // command/control transport, exactly like other site-directed commands, + // and the typed response returned to the caller. + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + // A probe stands in for CentralCommunicationActor: it asserts the + // envelope shape and replies with a typed response. 
+ var commActor = Sys.ActorOf(Props.Create(() => new EchoStateQueryActor())); + service.SetCommunicationActor(commActor); + + var request = new DeploymentStateQueryRequest("corr-9", "QueriedInst", DateTimeOffset.UtcNow); + var response = await service.QueryDeploymentStateAsync("site-a", request); + + Assert.Equal("corr-9", response.CorrelationId); + Assert.Equal("QueriedInst", response.InstanceUniqueName); + Assert.True(response.IsDeployed); + Assert.Equal("sha256:applied", response.AppliedRevisionHash); + } + + /// + /// Stand-in for CentralCommunicationActor: verifies the message is wrapped + /// in a SiteEnvelope targeting the requested site and replies with a typed + /// DeploymentStateQueryResponse. + /// + private class EchoStateQueryActor : ReceiveActor + { + public EchoStateQueryActor() + { + Receive(env => + { + if (env is { SiteId: "site-a", Message: DeploymentStateQueryRequest req }) + { + Sender.Tell(new DeploymentStateQueryResponse( + req.CorrelationId, req.InstanceUniqueName, true, + "dep-applied", "sha256:applied", DateTimeOffset.UtcNow)); + } + }); + } + } } diff --git a/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs b/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs index d7c48c1..37a6b1a 100644 --- a/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs +++ b/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs @@ -51,6 +51,22 @@ public class SiteCommunicationActorTests : TestKit dmProbe.ExpectMsg(); } + [Fact] + public void DeploymentStateQuery_ForwardedToDeploymentManager() + { + // DeploymentManager-006: the site-before-redeploy query travels over the + // ClusterClient command/control transport and is routed to the local + // Deployment Manager, which owns the deployed-config store. 
+ var dmProbe = CreateTestProbe(); + var siteActor = Sys.ActorOf(Props.Create(() => + new SiteCommunicationActor("site1", _options, dmProbe.Ref))); + + var request = new DeploymentStateQueryRequest("corr-q", "inst1", DateTimeOffset.UtcNow); + siteActor.Tell(request); + + dmProbe.ExpectMsg(msg => msg.CorrelationId == "corr-q"); + } + [Fact] public void IntegrationCall_WithoutHandler_ReturnsFailure() { diff --git a/tests/ScadaLink.DeploymentManager.Tests/DeploymentServiceTests.cs b/tests/ScadaLink.DeploymentManager.Tests/DeploymentServiceTests.cs index 3adbbd6..9b382e0 100644 --- a/tests/ScadaLink.DeploymentManager.Tests/DeploymentServiceTests.cs +++ b/tests/ScadaLink.DeploymentManager.Tests/DeploymentServiceTests.cs @@ -1,3 +1,5 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using NSubstitute; @@ -17,7 +19,7 @@ namespace ScadaLink.DeploymentManager.Tests; /// /// WP-1/2/4/5/6/8/16: Tests for central-side DeploymentService. /// -public class DeploymentServiceTests +public class DeploymentServiceTests : TestKit { private readonly IDeploymentManagerRepository _repo; private readonly IFlatteningPipeline _pipeline; @@ -363,4 +365,253 @@ public class DeploymentServiceTests // Failure case does not reach audit (returns before communication) // The audit is only logged after communication succeeds/fails } + + // ── DeploymentManager-006: query-the-site-before-redeploy idempotency ── + + /// + /// Builds a DeploymentService whose CommunicationService is backed by the + /// supplied actor, so the site query and deploy commands can be observed. 
+ /// + private DeploymentService CreateServiceWithCommActor(IActorRef commActor) + { + var comms = new CommunicationService( + Options.Create(new CommunicationOptions + { + QueryTimeout = TimeSpan.FromSeconds(5), + DeploymentTimeout = TimeSpan.FromSeconds(5) + }), + NullLogger.Instance); + comms.SetCommunicationActor(commActor); + + var siteRepo = Substitute.For(); + return new DeploymentService( + _repo, siteRepo, _pipeline, comms, _lockManager, _audit, + Options.Create(new DeploymentManagerOptions { OperationLockTimeout = TimeSpan.FromSeconds(5) }), + NullLogger.Instance); + } + + private void SetupValidPipeline(int instanceId, string instanceName, string revisionHash) + { + var config = new FlattenedConfiguration { InstanceUniqueName = instanceName }; + _pipeline.FlattenAndValidateAsync(instanceId, Arg.Any()) + .Returns(Result.Success( + new FlatteningPipelineResult(config, revisionHash, ValidationResult.Success()))); + } + + [Fact] + public async Task DeployInstanceAsync_PriorInProgressRecord_SiteHasTargetHash_MarksSuccessWithoutRedeploy() + { + // Prior record stuck InProgress -> site is queried. The site reports it + // already has the TARGET revision hash, so the prior record is marked + // Success and NO new DeployInstanceCommand is sent. 
+ var instance = new Instance("RedeployInst") { Id = 7, SiteId = 1, State = InstanceState.Enabled }; + _repo.GetInstanceByIdAsync(7, Arg.Any()).Returns(instance); + SetupValidPipeline(7, "RedeployInst", "sha256:target"); + + var prior = new DeploymentRecord("dep-prior", "admin") + { + InstanceId = 7, + Status = DeploymentStatus.InProgress, + RevisionHash = "sha256:target" + }; + _repo.GetCurrentDeploymentStatusAsync(7, Arg.Any()).Returns(prior); + + var commActor = Sys.ActorOf(Props.Create(() => + new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false))); + var service = CreateServiceWithCommActor(commActor); + + var result = await service.DeployInstanceAsync(7, "admin"); + + Assert.True(result.IsSuccess); + Assert.Equal(DeploymentStatus.Success, prior.Status); + // The site query was issued, but no new deploy command was sent. + Assert.Equal(1, ReconcileProbeActor.QueryCount); + Assert.Equal(0, ReconcileProbeActor.DeployCount); + // No new deployment record was created — the prior one was reconciled. + await _repo.DidNotReceive().AddDeploymentRecordAsync( + Arg.Any(), Arg.Any()); + } + + [Fact] + public async Task DeployInstanceAsync_PriorInProgressRecord_SiteHasDifferentHash_ProceedsWithDeploy() + { + // Prior record stuck InProgress -> site is queried. The site has a + // DIFFERENT revision hash, so the normal deploy proceeds. 
+ var instance = new Instance("RedeployInst2") { Id = 8, SiteId = 1, State = InstanceState.Enabled }; + _repo.GetInstanceByIdAsync(8, Arg.Any()).Returns(instance); + SetupValidPipeline(8, "RedeployInst2", "sha256:target"); + + var prior = new DeploymentRecord("dep-prior2", "admin") + { + InstanceId = 8, + Status = DeploymentStatus.InProgress, + RevisionHash = "sha256:old" + }; + _repo.GetCurrentDeploymentStatusAsync(8, Arg.Any()).Returns(prior); + + var commActor = Sys.ActorOf(Props.Create(() => + new ReconcileProbeActor(siteHash: "sha256:old", failQuery: false))); + var service = CreateServiceWithCommActor(commActor); + + var result = await service.DeployInstanceAsync(8, "admin"); + + Assert.True(result.IsSuccess); + Assert.Equal(1, ReconcileProbeActor.QueryCount); + // The normal deploy proceeded — a new command was sent. + Assert.Equal(1, ReconcileProbeActor.DeployCount); + await _repo.Received().AddDeploymentRecordAsync( + Arg.Any(), Arg.Any()); + } + + [Fact] + public async Task DeployInstanceAsync_PriorFailedTimeoutRecord_QueriesSite() + { + // A prior record Failed due to a timeout also triggers the site query. 
+ var instance = new Instance("TimedOutInst") { Id = 9, SiteId = 1, State = InstanceState.Enabled }; + _repo.GetInstanceByIdAsync(9, Arg.Any()).Returns(instance); + SetupValidPipeline(9, "TimedOutInst", "sha256:target"); + + var prior = new DeploymentRecord("dep-prior3", "admin") + { + InstanceId = 9, + Status = DeploymentStatus.Failed, + RevisionHash = "sha256:target", + ErrorMessage = "Communication failure: deployment Ask timed out" + }; + _repo.GetCurrentDeploymentStatusAsync(9, Arg.Any()).Returns(prior); + + var commActor = Sys.ActorOf(Props.Create(() => + new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false))); + var service = CreateServiceWithCommActor(commActor); + + var result = await service.DeployInstanceAsync(9, "admin"); + + Assert.True(result.IsSuccess); + Assert.Equal(1, ReconcileProbeActor.QueryCount); + Assert.Equal(0, ReconcileProbeActor.DeployCount); + Assert.Equal(DeploymentStatus.Success, prior.Status); + } + + [Fact] + public async Task DeployInstanceAsync_PriorSuccessRecord_SkipsSiteQuery() + { + // A clean prior Success record must NOT trigger the extra round-trip. + var instance = new Instance("CleanInst") { Id = 10, SiteId = 1, State = InstanceState.Enabled }; + _repo.GetInstanceByIdAsync(10, Arg.Any()).Returns(instance); + SetupValidPipeline(10, "CleanInst", "sha256:target"); + + var prior = new DeploymentRecord("dep-clean", "admin") + { + InstanceId = 10, + Status = DeploymentStatus.Success, + RevisionHash = "sha256:old" + }; + _repo.GetCurrentDeploymentStatusAsync(10, Arg.Any()).Returns(prior); + + var commActor = Sys.ActorOf(Props.Create(() => + new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false))); + var service = CreateServiceWithCommActor(commActor); + + var result = await service.DeployInstanceAsync(10, "admin"); + + Assert.True(result.IsSuccess); + // No site query — the prior deploy completed cleanly. 
+ Assert.Equal(0, ReconcileProbeActor.QueryCount); + Assert.Equal(1, ReconcileProbeActor.DeployCount); + } + + [Fact] + public async Task DeployInstanceAsync_FreshFirstTimeDeploy_SkipsSiteQuery() + { + // No prior record at all -> fresh deploy, no extra round-trip. + var instance = new Instance("FreshInst") { Id = 11, SiteId = 1, State = InstanceState.NotDeployed }; + _repo.GetInstanceByIdAsync(11, Arg.Any()).Returns(instance); + SetupValidPipeline(11, "FreshInst", "sha256:target"); + _repo.GetCurrentDeploymentStatusAsync(11, Arg.Any()) + .Returns((DeploymentRecord?)null); + + var commActor = Sys.ActorOf(Props.Create(() => + new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false))); + var service = CreateServiceWithCommActor(commActor); + + var result = await service.DeployInstanceAsync(11, "admin"); + + Assert.True(result.IsSuccess); + Assert.Equal(0, ReconcileProbeActor.QueryCount); + Assert.Equal(1, ReconcileProbeActor.DeployCount); + } + + [Fact] + public async Task DeployInstanceAsync_PriorInProgressRecord_QueryFails_FallsThroughToDeploy() + { + // The site query fails (unreachable / times out). The deploy must NOT + // abort — it falls through to a normal deploy and relies on site-side + // stale-rejection as the safety net. + var instance = new Instance("UnreachableInst") { Id = 12, SiteId = 1, State = InstanceState.Enabled }; + _repo.GetInstanceByIdAsync(12, Arg.Any()).Returns(instance); + SetupValidPipeline(12, "UnreachableInst", "sha256:target"); + + var prior = new DeploymentRecord("dep-prior5", "admin") + { + InstanceId = 12, + Status = DeploymentStatus.InProgress, + RevisionHash = "sha256:target" + }; + _repo.GetCurrentDeploymentStatusAsync(12, Arg.Any()).Returns(prior); + + // The probe drops the query (no reply) -> the Ask times out. 
+ var commActor = Sys.ActorOf(Props.Create(() => + new ReconcileProbeActor(siteHash: "sha256:target", failQuery: true))); + var service = CreateServiceWithCommActor(commActor); + + var result = await service.DeployInstanceAsync(12, "admin"); + + // Did not abort — the deploy proceeded after the failed query. + Assert.True(result.IsSuccess); + Assert.Equal(1, ReconcileProbeActor.QueryCount); + Assert.Equal(1, ReconcileProbeActor.DeployCount); + } + + /// + /// Stand-in CentralCommunicationActor for reconciliation tests. Counts the + /// site queries and deploy commands it receives, answers queries with a + /// configurable applied revision hash, and (optionally) drops the query to + /// simulate an unreachable site so the central Ask times out. + /// + private class ReconcileProbeActor : ReceiveActor + { + public static int QueryCount; + public static int DeployCount; + + public ReconcileProbeActor(string siteHash, bool failQuery) + { + // Each test creates a fresh actor; reset the shared counters. + QueryCount = 0; + DeployCount = 0; + + Receive(env => + { + switch (env.Message) + { + case DeploymentStateQueryRequest q: + QueryCount++; + if (!failQuery) + { + Sender.Tell(new DeploymentStateQueryResponse( + q.CorrelationId, q.InstanceUniqueName, true, + "dep-applied", siteHash, DateTimeOffset.UtcNow)); + } + // failQuery: drop the message -> caller's Ask times out. 
+ break; + + case DeployInstanceCommand d: + DeployCount++; + Sender.Tell(new DeploymentStatusResponse( + d.DeploymentId, d.InstanceUniqueName, + DeploymentStatus.Success, null, DateTimeOffset.UtcNow)); + break; + } + }); + } + } } diff --git a/tests/ScadaLink.DeploymentManager.Tests/ScadaLink.DeploymentManager.Tests.csproj b/tests/ScadaLink.DeploymentManager.Tests/ScadaLink.DeploymentManager.Tests.csproj index 23f2b4f..754f7cf 100644 --- a/tests/ScadaLink.DeploymentManager.Tests/ScadaLink.DeploymentManager.Tests.csproj +++ b/tests/ScadaLink.DeploymentManager.Tests/ScadaLink.DeploymentManager.Tests.csproj @@ -9,6 +9,7 @@ + diff --git a/tests/ScadaLink.SiteRuntime.Tests/Actors/DeploymentManagerActorTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Actors/DeploymentManagerActorTests.cs index 09bf0e9..f9c8513 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Actors/DeploymentManagerActorTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Actors/DeploymentManagerActorTests.cs @@ -187,6 +187,46 @@ public class DeploymentManagerActorTests : TestKit, IDisposable Assert.DoesNotContain(configs, c => c.InstanceUniqueName == "LifecyclePump"); } + // ── DeploymentManager-006: query-the-site-before-redeploy ── + + [Fact] + public async Task DeploymentStateQuery_DeployedInstance_ReturnsAppliedIdentity() + { + // A deployed instance must report its currently-applied deployment ID + // and revision hash so central can reconcile before a re-deploy. 
+ await _storage.StoreDeployedConfigAsync( + "QueriedPump", MakeConfigJson("QueriedPump"), "dep-applied", "sha256:applied", true); + + var actor = CreateDeploymentManager(); + await Task.Delay(2000); // allow startup to load configs + + actor.Tell(new DeploymentStateQueryRequest("corr-q1", "QueriedPump", DateTimeOffset.UtcNow)); + + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal("corr-q1", response.CorrelationId); + Assert.Equal("QueriedPump", response.InstanceUniqueName); + Assert.True(response.IsDeployed); + Assert.Equal("dep-applied", response.AppliedDeploymentId); + Assert.Equal("sha256:applied", response.AppliedRevisionHash); + } + + [Fact] + public async Task DeploymentStateQuery_UnknownInstance_ReturnsNotDeployed() + { + // An instance the site has never received a deployment for must report + // IsDeployed=false with null applied identity. + var actor = CreateDeploymentManager(); + await Task.Delay(500); + + actor.Tell(new DeploymentStateQueryRequest("corr-q2", "NeverDeployed", DateTimeOffset.UtcNow)); + + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal("corr-q2", response.CorrelationId); + Assert.False(response.IsDeployed); + Assert.Null(response.AppliedDeploymentId); + Assert.Null(response.AppliedRevisionHash); + } + [Fact] public void DeploymentManager_SupervisionStrategy_ResumesOnException() {