fix(deployment-manager): resolve DeploymentManager-003..011 — atomic status commit, orphan-delete handling, semaphore reclamation, structured diff, options binding, lifecycle test coverage
This commit is contained in:
@@ -13,6 +13,7 @@ using ScadaLink.Commons.Types;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.Commons.Types.Flattening;
|
||||
using ScadaLink.Communication;
|
||||
using ScadaLink.TemplateEngine.Flattening;
|
||||
|
||||
namespace ScadaLink.DeploymentManager.Tests;
|
||||
|
||||
@@ -45,7 +46,8 @@ public class DeploymentServiceTests : TestKit
|
||||
|
||||
var siteRepo = Substitute.For<ISiteRepository>();
|
||||
_service = new DeploymentService(
|
||||
_repo, siteRepo, _pipeline, _comms, _lockManager, _audit, options,
|
||||
_repo, siteRepo, _pipeline, _comms, _lockManager, _audit,
|
||||
new DiffService(), options,
|
||||
NullLogger<DeploymentService>.Instance);
|
||||
}
|
||||
|
||||
@@ -276,6 +278,34 @@ public class DeploymentServiceTests : TestKit
|
||||
Assert.Contains("not found", result.Error);
|
||||
}
|
||||
|
||||
// ── DeploymentManager-004: site-success but central-delete-failure must not escape uncaught ──
|
||||
|
||||
[Fact]
public async Task DeleteInstanceAsync_SiteSucceeds_CentralDeleteFails_ReturnsDistinctFailure()
{
    // Scenario: the stand-in site actor acknowledges the delete command, but
    // removing the central record throws. The exception must NOT propagate to
    // the caller -- it has to come back as a failed result whose message lets
    // an operator see that the site succeeded while the central record remains.
    var instance = new Instance("OrphanInst") { Id = 30, SiteId = 1, State = InstanceState.Enabled };
    _repo.GetInstanceByIdAsync(30, Arg.Any<CancellationToken>()).Returns(instance);

    // Central record removal fails.
    _repo.DeleteInstanceAsync(30, Arg.Any<CancellationToken>())
        .Returns<Task>(_ => throw new InvalidOperationException("db unavailable"));

    var commActor = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:x", failQuery: false)));
    var service = CreateServiceWithCommActor(commActor);

    // Awaiting here doubles as the "does not throw" check: an escaping
    // exception would fail the test before any assertion runs.
    var result = await service.DeleteInstanceAsync(30, "admin");

    // The failure is surfaced (not thrown) and mentions both sides of the
    // divergence: the site and the central record.
    Assert.True(result.IsFailure);
    Assert.Contains("site", result.Error, StringComparison.OrdinalIgnoreCase);
    Assert.Contains("central", result.Error, StringComparison.OrdinalIgnoreCase);

    // Guard against a vacuous pass: the failure must originate from the
    // central delete actually being attempted, not from an earlier step that
    // merely happens to mention "site" and "central" in its message.
    await _repo.Received().DeleteInstanceAsync(30, Arg.Any<CancellationToken>());
}
|
||||
|
||||
// ── WP-8: Deployment comparison ──
|
||||
|
||||
[Fact]
|
||||
@@ -331,6 +361,51 @@ public class DeploymentServiceTests : TestKit
|
||||
Assert.True(result.Value.IsStale);
|
||||
}
|
||||
|
||||
// ── DeploymentManager-007: comparison must produce a structured diff ──
|
||||
|
||||
[Fact]
public async Task GetDeploymentComparisonAsync_ProducesStructuredDiff()
{
    // The pipeline yields a configuration containing only "NewAttr", while the
    // stored snapshot was serialized from a configuration containing only
    // "OldAttr". The comparison is expected to expose that difference as
    // Added/Removed attribute changes rather than just a staleness flag.

    // Arrange: current (template-derived) configuration.
    var liveConfig = new FlattenedConfiguration
    {
        InstanceUniqueName = "DiffInst",
        Attributes = [new ResolvedAttribute { CanonicalName = "NewAttr", DataType = "Int" }]
    };
    var pipelineOutput = new FlatteningPipelineResult(liveConfig, "sha256:new", ValidationResult.Success());
    _pipeline.FlattenAndValidateAsync(40, Arg.Any<CancellationToken>())
        .Returns(Result<FlatteningPipelineResult>.Success(pipelineOutput));

    // Arrange: previously deployed snapshot.
    var deployedConfig = new FlattenedConfiguration
    {
        InstanceUniqueName = "DiffInst",
        Attributes = [new ResolvedAttribute { CanonicalName = "OldAttr", DataType = "Int" }]
    };
    var deployedJson = System.Text.Json.JsonSerializer.Serialize(deployedConfig);
    var storedSnapshot = new DeployedConfigSnapshot("dep1", "sha256:old", deployedJson)
    {
        InstanceId = 40,
        DeployedAt = DateTimeOffset.UtcNow
    };
    _repo.GetDeployedSnapshotByInstanceIdAsync(40, Arg.Any<CancellationToken>()).Returns(storedSnapshot);

    // Act
    var result = await _service.GetDeploymentComparisonAsync(40);

    // Assert: the comparison succeeded and reports staleness.
    Assert.True(result.IsSuccess);
    Assert.True(result.Value.IsStale);

    // Assert: a structured diff lists the added and the removed attribute.
    var diff = result.Value.Diff;
    Assert.NotNull(diff);
    Assert.True(diff!.HasChanges);
    Assert.Contains(
        diff.AttributeChanges,
        change => change.CanonicalName == "NewAttr" && change.ChangeType == DiffChangeType.Added);
    Assert.Contains(
        diff.AttributeChanges,
        change => change.CanonicalName == "OldAttr" && change.ChangeType == DiffChangeType.Removed);
}
|
||||
|
||||
// ── WP-2: GetDeploymentStatusAsync ──
|
||||
|
||||
[Fact]
|
||||
@@ -352,8 +427,11 @@ public class DeploymentServiceTests : TestKit
|
||||
// ── Audit logging ──
|
||||
|
||||
[Fact]
|
||||
public async Task DeployInstanceAsync_AuditLogs()
|
||||
public async Task DeployInstanceAsync_FlatteningFails_DoesNotReachAudit()
|
||||
{
|
||||
// DeploymentManager-011: this test previously asserted nothing. A
|
||||
// flatten failure returns before any site communication, so no audit
|
||||
// entry is written.
|
||||
var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed };
|
||||
_repo.GetInstanceByIdAsync(1).Returns(instance);
|
||||
|
||||
@@ -362,8 +440,120 @@ public class DeploymentServiceTests : TestKit
|
||||
|
||||
await _service.DeployInstanceAsync(1, "admin");
|
||||
|
||||
// Failure case does not reach audit (returns before communication)
|
||||
// The audit is only logged after communication succeeds/fails
|
||||
await _audit.DidNotReceive().LogAsync(
|
||||
Arg.Any<string>(), Arg.Any<string>(), Arg.Any<string>(), Arg.Any<string>(),
|
||||
Arg.Any<string>(), Arg.Any<object>(), Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
[Fact]
public async Task DeployInstanceAsync_SiteSucceeds_WritesDeployAuditEntry()
{
    // DeploymentManager-011: when the deployment succeeds, a "Deploy" audit
    // entry naming the deployed instance must be written.

    // Arrange: no prior deployment record, a valid pipeline, and an instance
    // that has never been deployed.
    _repo.GetCurrentDeploymentStatusAsync(50, Arg.Any<CancellationToken>())
        .Returns((DeploymentRecord?)null);
    SetupValidPipeline(50, "AuditInst", "sha256:target");
    var target = new Instance("AuditInst") { Id = 50, SiteId = 1, State = InstanceState.NotDeployed };
    _repo.GetInstanceByIdAsync(50, Arg.Any<CancellationToken>()).Returns(target);

    var siteProbe = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false)));
    var sut = CreateServiceWithCommActor(siteProbe);

    // Act
    var outcome = await sut.DeployInstanceAsync(50, "admin");

    // Assert
    Assert.True(outcome.IsSuccess);
    await _audit.Received().LogAsync(
        "admin", "Deploy", "Instance", "50", "AuditInst",
        Arg.Any<object>(), Arg.Any<CancellationToken>());
}
|
||||
|
||||
// ── DeploymentManager-011: lifecycle success paths ──
|
||||
|
||||
[Fact]
public async Task DisableInstanceAsync_SiteSucceeds_SetsDisabledStateAndAudits()
{
    // Arrange: an enabled instance and a stand-in communication actor.
    var target = new Instance("DisInst") { Id = 51, SiteId = 1, State = InstanceState.Enabled };
    _repo.GetInstanceByIdAsync(51, Arg.Any<CancellationToken>()).Returns(target);

    var siteProbe = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "x", failQuery: false)));
    var sut = CreateServiceWithCommActor(siteProbe);

    // Act
    var outcome = await sut.DisableInstanceAsync(51, "admin");

    // Assert: success, the state flipped to Disabled on the same instance
    // object, that object was passed to the repository update, and a
    // "Disable" audit entry was logged.
    Assert.True(outcome.IsSuccess);
    Assert.Equal(InstanceState.Disabled, target.State);
    await _repo.Received().UpdateInstanceAsync(target, Arg.Any<CancellationToken>());
    await _audit.Received().LogAsync(
        "admin", "Disable", "Instance", "51", "DisInst",
        Arg.Any<object>(), Arg.Any<CancellationToken>());
}
|
||||
|
||||
[Fact]
public async Task EnableInstanceAsync_SiteSucceeds_SetsEnabledStateAndAudits()
{
    // Arrange: a disabled instance and a stand-in communication actor.
    var target = new Instance("EnInst") { Id = 52, SiteId = 1, State = InstanceState.Disabled };
    _repo.GetInstanceByIdAsync(52, Arg.Any<CancellationToken>()).Returns(target);

    var siteProbe = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "x", failQuery: false)));
    var sut = CreateServiceWithCommActor(siteProbe);

    // Act
    var outcome = await sut.EnableInstanceAsync(52, "admin");

    // Assert: success, the state flipped to Enabled on the same instance
    // object, that object was passed to the repository update, and an
    // "Enable" audit entry was logged.
    Assert.True(outcome.IsSuccess);
    Assert.Equal(InstanceState.Enabled, target.State);
    await _repo.Received().UpdateInstanceAsync(target, Arg.Any<CancellationToken>());
    await _audit.Received().LogAsync(
        "admin", "Enable", "Instance", "52", "EnInst",
        Arg.Any<object>(), Arg.Any<CancellationToken>());
}
|
||||
|
||||
[Fact]
public async Task DeleteInstanceAsync_SiteSucceeds_RemovesRecordAndAudits()
{
    // Arrange: an existing instance and a stand-in communication actor.
    var target = new Instance("DelInst") { Id = 53, SiteId = 1, State = InstanceState.Enabled };
    _repo.GetInstanceByIdAsync(53, Arg.Any<CancellationToken>()).Returns(target);

    var siteProbe = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "x", failQuery: false)));
    var sut = CreateServiceWithCommActor(siteProbe);

    // Act
    var outcome = await sut.DeleteInstanceAsync(53, "admin");

    // Assert: success, the repository delete was invoked for this id, and a
    // "Delete" audit entry was logged.
    Assert.True(outcome.IsSuccess);
    await _repo.Received().DeleteInstanceAsync(53, Arg.Any<CancellationToken>());
    await _audit.Received().LogAsync(
        "admin", "Delete", "Instance", "53", "DelInst",
        Arg.Any<object>(), Arg.Any<CancellationToken>());
}
|
||||
|
||||
[Fact]
public async Task DeployInstanceAsync_SameInstance_OperationLockSerializesConcurrentDeploys()
{
    // DeploymentManager-011: two deploys of the SAME instance started
    // back-to-back must reach the site one at a time. SerializationProbeActor
    // keeps each deploy "open" for a short interval and records the highest
    // number of deploys it saw open simultaneously.

    // Arrange
    _repo.GetCurrentDeploymentStatusAsync(54, Arg.Any<CancellationToken>())
        .Returns((DeploymentRecord?)null);
    SetupValidPipeline(54, "LockInst", "sha256:target");
    var target = new Instance("LockInst") { Id = 54, SiteId = 1, State = InstanceState.Enabled };
    _repo.GetInstanceByIdAsync(54, Arg.Any<CancellationToken>()).Returns(target);

    var concurrencyProbe = Sys.ActorOf(Props.Create(() =>
        new SerializationProbeActor()));
    var sut = CreateServiceWithCommActor(concurrencyProbe);

    // Act: start both deploys before awaiting either one.
    var firstDeploy = sut.DeployInstanceAsync(54, "admin");
    var secondDeploy = sut.DeployInstanceAsync(54, "admin");
    var outcomes = await Task.WhenAll(firstDeploy, secondDeploy);

    // Assert: both deploys succeed ...
    Assert.All(outcomes, outcome => Assert.True(outcome.IsSuccess));
    // ... and the probe never observed more than one deploy in flight.
    Assert.Equal(1, SerializationProbeActor.MaxConcurrent);
}
|
||||
|
||||
// ── DeploymentManager-006: query-the-site-before-redeploy idempotency ──
|
||||
@@ -386,6 +576,7 @@ public class DeploymentServiceTests : TestKit
|
||||
var siteRepo = Substitute.For<ISiteRepository>();
|
||||
return new DeploymentService(
|
||||
_repo, siteRepo, _pipeline, comms, _lockManager, _audit,
|
||||
new DiffService(),
|
||||
Options.Create(new DeploymentManagerOptions { OperationLockTimeout = TimeSpan.FromSeconds(5) }),
|
||||
NullLogger<DeploymentService>.Instance);
|
||||
}
|
||||
@@ -572,6 +763,109 @@ public class DeploymentServiceTests : TestKit
|
||||
Assert.Equal(1, ReconcileProbeActor.DeployCount);
|
||||
}
|
||||
|
||||
// ── DeploymentManager-003: post-success persistence must commit the Success status ──
|
||||
|
||||
[Fact]
public async Task DeployInstanceAsync_SiteSucceeds_SnapshotWriteFails_RecordStillCommittedSuccess()
{
    // Scenario: the stand-in site actor reports the deployment as successful,
    // but adding the deployed-config snapshot afterwards throws. The
    // deployment record must still end up persisted with Success status;
    // otherwise the central view and the site would disagree about what is
    // running.

    // Arrange: instance, pipeline, and no prior deployment record.
    var target = new Instance("SnapFailInst") { Id = 20, SiteId = 1, State = InstanceState.NotDeployed };
    _repo.GetInstanceByIdAsync(20, Arg.Any<CancellationToken>()).Returns(target);
    SetupValidPipeline(20, "SnapFailInst", "sha256:target");
    _repo.GetCurrentDeploymentStatusAsync(20, Arg.Any<CancellationToken>())
        .Returns((DeploymentRecord?)null);

    // Capture the record handed to AddDeploymentRecordAsync so its status can
    // be inspected once the deployment has finished.
    DeploymentRecord? addedRecord = null;
    await _repo.AddDeploymentRecordAsync(
        Arg.Do<DeploymentRecord>(record => addedRecord = record), Arg.Any<CancellationToken>());

    // No snapshot exists yet, and adding one fails.
    _repo.GetDeployedSnapshotByInstanceIdAsync(20, Arg.Any<CancellationToken>())
        .Returns((DeployedConfigSnapshot?)null);
    _repo.AddDeployedSnapshotAsync(Arg.Any<DeployedConfigSnapshot>(), Arg.Any<CancellationToken>())
        .Returns<Task>(_ => throw new InvalidOperationException("snapshot store unavailable"));

    var siteProbe = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false)));
    var sut = CreateServiceWithCommActor(siteProbe);

    // Act
    var outcome = await sut.DeployInstanceAsync(20, "admin");

    // Assert: the site succeeded, so the deployment is reported successful
    // and the captured record carries Success status.
    Assert.True(outcome.IsSuccess);
    Assert.NotNull(addedRecord);
    Assert.Equal(DeploymentStatus.Success, addedRecord!.Status);

    // Assert: an update was issued for a record in Success state. The intent
    // is that this commit happens before the snapshot write is attempted;
    // this check confirms the update call itself, not its ordering.
    await _repo.Received().UpdateDeploymentRecordAsync(
        Arg.Is<DeploymentRecord>(record => record.Status == DeploymentStatus.Success),
        Arg.Any<CancellationToken>());
}
|
||||
|
||||
/// <summary>
/// Test double standing in for the CentralCommunicationActor that tracks how
/// many deploy commands are "in flight" at once. Each deploy reply is delayed
/// with a single-shot timer; a deploy counts as in flight from the moment its
/// command arrives until its delayed reply is sent. If two deploys overlapped,
/// <see cref="MaxConcurrent"/> would end up greater than 1.
/// </summary>
private class SerializationProbeActor : ReceiveActor, IWithTimers
{
    /// <summary>Highest number of simultaneously in-flight deploys observed.</summary>
    public static int MaxConcurrent;
    private static int _current;
    private static readonly object Gate = new();

    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>
    /// Resets the static counters and registers the message handlers.
    /// </summary>
    public SerializationProbeActor()
    {
        _current = 0;
        MaxConcurrent = 0;

        Receive<SiteEnvelope>(envelope => HandleEnvelope(envelope));
        Receive<CompleteDeploy>(done => FinishDeploy(done));
    }

    /// <summary>
    /// Dispatches on the envelope payload: deploy commands open a delayed
    /// deploy window, state queries are answered immediately, and any other
    /// payload is ignored.
    /// </summary>
    private void HandleEnvelope(SiteEnvelope envelope)
    {
        switch (envelope.Message)
        {
            case DeployInstanceCommand deploy:
                BeginDeploy(deploy);
                break;

            case DeploymentStateQueryRequest query:
                Sender.Tell(new DeploymentStateQueryResponse(
                    query.CorrelationId, query.InstanceUniqueName, false, null, null, DateTimeOffset.UtcNow));
                break;
        }
    }

    /// <summary>
    /// Marks a deploy as in flight, updates the observed maximum, and
    /// schedules the delayed completion message.
    /// </summary>
    private void BeginDeploy(DeployInstanceCommand deploy)
    {
        lock (Gate)
        {
            _current++;
            MaxConcurrent = Math.Max(MaxConcurrent, _current);
        }

        // Remember the sender now; the reply is sent later from the timer
        // message handler.
        var requester = Sender;

        // Hold the deploy "window" open long enough that a second,
        // non-serialized deploy would overlap with this one.
        var completion = new CompleteDeploy(deploy, requester);
        Timers.StartSingleTimer(deploy.DeploymentId, completion, TimeSpan.FromMilliseconds(150));
    }

    /// <summary>
    /// Closes the deploy window and sends the Success status to the original
    /// requester.
    /// </summary>
    private void FinishDeploy(CompleteDeploy done)
    {
        lock (Gate)
        {
            _current--;
        }

        var command = done.Command;
        done.ReplyTo.Tell(new DeploymentStatusResponse(
            command.DeploymentId, command.InstanceUniqueName,
            DeploymentStatus.Success, null, DateTimeOffset.UtcNow));
    }

    /// <summary>Timer message carrying the deploy to complete and who to answer.</summary>
    private sealed record CompleteDeploy(DeployInstanceCommand Command, IActorRef ReplyTo);
}
|
||||
|
||||
/// <summary>
|
||||
/// Stand-in CentralCommunicationActor for reconciliation tests. Counts the
|
||||
/// site queries and deploy commands it receives, answers queries with a
|
||||
@@ -610,6 +904,21 @@ public class DeploymentServiceTests : TestKit
|
||||
d.DeploymentId, d.InstanceUniqueName,
|
||||
DeploymentStatus.Success, null, DateTimeOffset.UtcNow));
|
||||
break;
|
||||
|
||||
case DisableInstanceCommand dis:
|
||||
Sender.Tell(new InstanceLifecycleResponse(
|
||||
dis.CommandId, dis.InstanceUniqueName, true, null, DateTimeOffset.UtcNow));
|
||||
break;
|
||||
|
||||
case EnableInstanceCommand en:
|
||||
Sender.Tell(new InstanceLifecycleResponse(
|
||||
en.CommandId, en.InstanceUniqueName, true, null, DateTimeOffset.UtcNow));
|
||||
break;
|
||||
|
||||
case DeleteInstanceCommand del:
|
||||
Sender.Tell(new InstanceLifecycleResponse(
|
||||
del.CommandId, del.InstanceUniqueName, true, null, DateTimeOffset.UtcNow));
|
||||
break;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user