feat(deployment-manager): resolve DeploymentManager-006 — query site deployment state before redeploy and reconcile

Adds DeploymentStateQuery request/response contracts (Commons), a site-side
handler (SiteRuntime), a CommunicationService query method (Communication), and
reconciliation in DeploymentService: when a prior record is InProgress or
Failed-on-timeout, query the site; if it already holds the target revision hash
mark the record Success without re-sending; on query failure fall through to a
normal deploy (site-side stale-rejection is the safety net).
This commit is contained in:
Joseph Doherty
2026-05-16 20:12:24 -04:00
parent cac8aebe9f
commit bc548e1447
13 changed files with 662 additions and 19 deletions

View File

@@ -1,3 +1,5 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
@@ -17,7 +19,7 @@ namespace ScadaLink.DeploymentManager.Tests;
/// <summary>
/// WP-1/2/4/5/6/8/16: Tests for central-side DeploymentService.
/// </summary>
public class DeploymentServiceTests
public class DeploymentServiceTests : TestKit
{
private readonly IDeploymentManagerRepository _repo;
private readonly IFlatteningPipeline _pipeline;
@@ -363,4 +365,253 @@ public class DeploymentServiceTests
// Failure case does not reach audit (returns before communication)
// The audit is only logged after communication succeeds/fails
}
// ── DeploymentManager-006: query-the-site-before-redeploy idempotency ──
/// <summary>
/// Builds a DeploymentService whose CommunicationService is backed by the
/// supplied actor, so the site query and deploy commands can be observed.
/// </summary>
private DeploymentService CreateServiceWithCommActor(IActorRef commActor)
{
var comms = new CommunicationService(
Options.Create(new CommunicationOptions
{
QueryTimeout = TimeSpan.FromSeconds(5),
DeploymentTimeout = TimeSpan.FromSeconds(5)
}),
NullLogger<CommunicationService>.Instance);
comms.SetCommunicationActor(commActor);
var siteRepo = Substitute.For<ISiteRepository>();
return new DeploymentService(
_repo, siteRepo, _pipeline, comms, _lockManager, _audit,
Options.Create(new DeploymentManagerOptions { OperationLockTimeout = TimeSpan.FromSeconds(5) }),
NullLogger<DeploymentService>.Instance);
}
private void SetupValidPipeline(int instanceId, string instanceName, string revisionHash)
{
var config = new FlattenedConfiguration { InstanceUniqueName = instanceName };
_pipeline.FlattenAndValidateAsync(instanceId, Arg.Any<CancellationToken>())
.Returns(Result<FlatteningPipelineResult>.Success(
new FlatteningPipelineResult(config, revisionHash, ValidationResult.Success())));
}
[Fact]
public async Task DeployInstanceAsync_PriorInProgressRecord_SiteHasTargetHash_MarksSuccessWithoutRedeploy()
{
// Prior record stuck InProgress -> site is queried. The site reports it
// already has the TARGET revision hash, so the prior record is marked
// Success and NO new DeployInstanceCommand is sent.
var instance = new Instance("RedeployInst") { Id = 7, SiteId = 1, State = InstanceState.Enabled };
_repo.GetInstanceByIdAsync(7, Arg.Any<CancellationToken>()).Returns(instance);
SetupValidPipeline(7, "RedeployInst", "sha256:target");
var prior = new DeploymentRecord("dep-prior", "admin")
{
InstanceId = 7,
Status = DeploymentStatus.InProgress,
RevisionHash = "sha256:target"
};
_repo.GetCurrentDeploymentStatusAsync(7, Arg.Any<CancellationToken>()).Returns(prior);
var commActor = Sys.ActorOf(Props.Create(() =>
new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false)));
var service = CreateServiceWithCommActor(commActor);
var result = await service.DeployInstanceAsync(7, "admin");
Assert.True(result.IsSuccess);
Assert.Equal(DeploymentStatus.Success, prior.Status);
// The site query was issued, but no new deploy command was sent.
Assert.Equal(1, ReconcileProbeActor.QueryCount);
Assert.Equal(0, ReconcileProbeActor.DeployCount);
// No new deployment record was created — the prior one was reconciled.
await _repo.DidNotReceive().AddDeploymentRecordAsync(
Arg.Any<DeploymentRecord>(), Arg.Any<CancellationToken>());
}
[Fact]
public async Task DeployInstanceAsync_PriorInProgressRecord_SiteHasDifferentHash_ProceedsWithDeploy()
{
// Prior record stuck InProgress -> site is queried. The site has a
// DIFFERENT revision hash, so the normal deploy proceeds.
var instance = new Instance("RedeployInst2") { Id = 8, SiteId = 1, State = InstanceState.Enabled };
_repo.GetInstanceByIdAsync(8, Arg.Any<CancellationToken>()).Returns(instance);
SetupValidPipeline(8, "RedeployInst2", "sha256:target");
var prior = new DeploymentRecord("dep-prior2", "admin")
{
InstanceId = 8,
Status = DeploymentStatus.InProgress,
RevisionHash = "sha256:old"
};
_repo.GetCurrentDeploymentStatusAsync(8, Arg.Any<CancellationToken>()).Returns(prior);
var commActor = Sys.ActorOf(Props.Create(() =>
new ReconcileProbeActor(siteHash: "sha256:old", failQuery: false)));
var service = CreateServiceWithCommActor(commActor);
var result = await service.DeployInstanceAsync(8, "admin");
Assert.True(result.IsSuccess);
Assert.Equal(1, ReconcileProbeActor.QueryCount);
// The normal deploy proceeded — a new command was sent.
Assert.Equal(1, ReconcileProbeActor.DeployCount);
await _repo.Received().AddDeploymentRecordAsync(
Arg.Any<DeploymentRecord>(), Arg.Any<CancellationToken>());
}
[Fact]
public async Task DeployInstanceAsync_PriorFailedTimeoutRecord_QueriesSite()
{
// A prior record Failed due to a timeout also triggers the site query.
var instance = new Instance("TimedOutInst") { Id = 9, SiteId = 1, State = InstanceState.Enabled };
_repo.GetInstanceByIdAsync(9, Arg.Any<CancellationToken>()).Returns(instance);
SetupValidPipeline(9, "TimedOutInst", "sha256:target");
var prior = new DeploymentRecord("dep-prior3", "admin")
{
InstanceId = 9,
Status = DeploymentStatus.Failed,
RevisionHash = "sha256:target",
ErrorMessage = "Communication failure: deployment Ask timed out"
};
_repo.GetCurrentDeploymentStatusAsync(9, Arg.Any<CancellationToken>()).Returns(prior);
var commActor = Sys.ActorOf(Props.Create(() =>
new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false)));
var service = CreateServiceWithCommActor(commActor);
var result = await service.DeployInstanceAsync(9, "admin");
Assert.True(result.IsSuccess);
Assert.Equal(1, ReconcileProbeActor.QueryCount);
Assert.Equal(0, ReconcileProbeActor.DeployCount);
Assert.Equal(DeploymentStatus.Success, prior.Status);
}
[Fact]
public async Task DeployInstanceAsync_PriorSuccessRecord_SkipsSiteQuery()
{
// A clean prior Success record must NOT trigger the extra round-trip.
var instance = new Instance("CleanInst") { Id = 10, SiteId = 1, State = InstanceState.Enabled };
_repo.GetInstanceByIdAsync(10, Arg.Any<CancellationToken>()).Returns(instance);
SetupValidPipeline(10, "CleanInst", "sha256:target");
var prior = new DeploymentRecord("dep-clean", "admin")
{
InstanceId = 10,
Status = DeploymentStatus.Success,
RevisionHash = "sha256:old"
};
_repo.GetCurrentDeploymentStatusAsync(10, Arg.Any<CancellationToken>()).Returns(prior);
var commActor = Sys.ActorOf(Props.Create(() =>
new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false)));
var service = CreateServiceWithCommActor(commActor);
var result = await service.DeployInstanceAsync(10, "admin");
Assert.True(result.IsSuccess);
// No site query — the prior deploy completed cleanly.
Assert.Equal(0, ReconcileProbeActor.QueryCount);
Assert.Equal(1, ReconcileProbeActor.DeployCount);
}
[Fact]
public async Task DeployInstanceAsync_FreshFirstTimeDeploy_SkipsSiteQuery()
{
// No prior record at all -> fresh deploy, no extra round-trip.
var instance = new Instance("FreshInst") { Id = 11, SiteId = 1, State = InstanceState.NotDeployed };
_repo.GetInstanceByIdAsync(11, Arg.Any<CancellationToken>()).Returns(instance);
SetupValidPipeline(11, "FreshInst", "sha256:target");
_repo.GetCurrentDeploymentStatusAsync(11, Arg.Any<CancellationToken>())
.Returns((DeploymentRecord?)null);
var commActor = Sys.ActorOf(Props.Create(() =>
new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false)));
var service = CreateServiceWithCommActor(commActor);
var result = await service.DeployInstanceAsync(11, "admin");
Assert.True(result.IsSuccess);
Assert.Equal(0, ReconcileProbeActor.QueryCount);
Assert.Equal(1, ReconcileProbeActor.DeployCount);
}
[Fact]
public async Task DeployInstanceAsync_PriorInProgressRecord_QueryFails_FallsThroughToDeploy()
{
// The site query fails (unreachable / times out). The deploy must NOT
// abort — it falls through to a normal deploy and relies on site-side
// stale-rejection as the safety net.
var instance = new Instance("UnreachableInst") { Id = 12, SiteId = 1, State = InstanceState.Enabled };
_repo.GetInstanceByIdAsync(12, Arg.Any<CancellationToken>()).Returns(instance);
SetupValidPipeline(12, "UnreachableInst", "sha256:target");
var prior = new DeploymentRecord("dep-prior5", "admin")
{
InstanceId = 12,
Status = DeploymentStatus.InProgress,
RevisionHash = "sha256:target"
};
_repo.GetCurrentDeploymentStatusAsync(12, Arg.Any<CancellationToken>()).Returns(prior);
// The probe drops the query (no reply) -> the Ask times out.
var commActor = Sys.ActorOf(Props.Create(() =>
new ReconcileProbeActor(siteHash: "sha256:target", failQuery: true)));
var service = CreateServiceWithCommActor(commActor);
var result = await service.DeployInstanceAsync(12, "admin");
// Did not abort — the deploy proceeded after the failed query.
Assert.True(result.IsSuccess);
Assert.Equal(1, ReconcileProbeActor.QueryCount);
Assert.Equal(1, ReconcileProbeActor.DeployCount);
}
/// <summary>
/// Stand-in CentralCommunicationActor for reconciliation tests. Counts the
/// site queries and deploy commands it receives, answers queries with a
/// configurable applied revision hash, and (optionally) drops the query to
/// simulate an unreachable site so the central Ask times out.
/// </summary>
private class ReconcileProbeActor : ReceiveActor
{
public static int QueryCount;
public static int DeployCount;
public ReconcileProbeActor(string siteHash, bool failQuery)
{
// Each test creates a fresh actor; reset the shared counters.
QueryCount = 0;
DeployCount = 0;
Receive<SiteEnvelope>(env =>
{
switch (env.Message)
{
case DeploymentStateQueryRequest q:
QueryCount++;
if (!failQuery)
{
Sender.Tell(new DeploymentStateQueryResponse(
q.CorrelationId, q.InstanceUniqueName, true,
"dep-applied", siteHash, DateTimeOffset.UtcNow));
}
// failQuery: drop the message -> caller's Ask times out.
break;
case DeployInstanceCommand d:
DeployCount++;
Sender.Tell(new DeploymentStatusResponse(
d.DeploymentId, d.InstanceUniqueName,
DeploymentStatus.Success, null, DateTimeOffset.UtcNow));
break;
}
});
}
}
}

View File

@@ -9,6 +9,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka.TestKit.Xunit2" />
<PackageReference Include="coverlet.collector" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="NSubstitute" />