// File: scadalink-design/tests/ScadaLink.DeploymentManager.Tests/DeploymentServiceTests.cs
// (Source-viewer listing metadata: 927 lines, 40 KiB, C#.)

using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ScadaLink.Commons.Entities.Deployment;
using ScadaLink.Commons.Entities.Instances;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.Lifecycle;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Commons.Types.Flattening;
using ScadaLink.Communication;
using ScadaLink.TemplateEngine.Flattening;
namespace ScadaLink.DeploymentManager.Tests;
/// <summary>
/// WP-1/2/4/5/6/8/16: Tests for central-side DeploymentService.
/// </summary>
public class DeploymentServiceTests : TestKit
{
private readonly IDeploymentManagerRepository _repo;
private readonly IFlatteningPipeline _pipeline;
private readonly CommunicationService _comms;
private readonly OperationLockManager _lockManager;
private readonly IAuditService _audit;
private readonly DeploymentService _service;

/// <summary>
/// Builds the shared system under test. The deployment repository, flattening
/// pipeline, audit service and site repository are NSubstitute fakes. The
/// <see cref="CommunicationService"/> is real, but no communication actor is
/// ever assigned to it, so calls that need the site are expected to fail.
/// The operation lock timeout is 5 seconds.
/// </summary>
public DeploymentServiceTests()
{
    _repo = Substitute.For<IDeploymentManagerRepository>();
    _pipeline = Substitute.For<IFlatteningPipeline>();
    _audit = Substitute.For<IAuditService>();
    _lockManager = new OperationLockManager();
    _comms = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);

    var siteRepository = Substitute.For<ISiteRepository>();
    var managerOptions = Options.Create(new DeploymentManagerOptions
    {
        OperationLockTimeout = TimeSpan.FromSeconds(5)
    });

    _service = new DeploymentService(
        _repo,
        siteRepository,
        _pipeline,
        _comms,
        _lockManager,
        _audit,
        new DiffService(),
        managerOptions,
        NullLogger<DeploymentService>.Instance);
}
// ── WP-1: Deployment flow ──
[Fact]
public async Task DeployInstanceAsync_InstanceNotFound_ReturnsFailure()
{
    // Arrange: the repository knows no instance with this id.
    const int unknownInstanceId = 1;
    Instance? noInstance = null;
    _repo.GetInstanceByIdAsync(unknownInstanceId).Returns(noInstance);

    // Act
    var outcome = await _service.DeployInstanceAsync(unknownInstanceId, "admin");

    // Assert
    Assert.True(outcome.IsFailure);
    Assert.Contains("not found", outcome.Error);
}
[Fact]
public async Task DeployInstanceAsync_ValidationFails_ReturnsFailure()
{
    // Flattening itself succeeds, but the validation result carries a script
    // compilation error, so the deploy has to be rejected.
    var notDeployed = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed };
    _repo.GetInstanceByIdAsync(1).Returns(notDeployed);

    var compileError = ValidationEntry.Error(ValidationCategory.ScriptCompilation, "Compile error");
    var failedValidation = new ValidationResult { Errors = [compileError] };
    var flattened = new FlatteningPipelineResult(new FlattenedConfiguration(), "hash1", failedValidation);
    _pipeline.FlattenAndValidateAsync(1, Arg.Any<CancellationToken>())
        .Returns(Result<FlatteningPipelineResult>.Success(flattened));

    var outcome = await _service.DeployInstanceAsync(1, "admin");

    Assert.True(outcome.IsFailure);
    Assert.Contains("validation failed", outcome.Error);
}
[Fact]
public async Task DeployInstanceAsync_FlatteningFails_ReturnsFailure()
{
    var notDeployed = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed };
    _repo.GetInstanceByIdAsync(1).Returns(notDeployed);

    // The pipeline fails outright (empty template chain) instead of producing
    // a configuration that carries validation errors.
    var pipelineFailure = Result<FlatteningPipelineResult>.Failure("Template chain empty");
    _pipeline.FlattenAndValidateAsync(1, Arg.Any<CancellationToken>()).Returns(pipelineFailure);

    var outcome = await _service.DeployInstanceAsync(1, "admin");

    Assert.True(outcome.IsFailure);
    Assert.Contains("Validation failed", outcome.Error);
}
// ── WP-2: Deployment identity ──
[Fact]
public async Task DeployInstanceAsync_CreatesUniqueDeploymentId()
{
    var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed };
    _repo.GetInstanceByIdAsync(1).Returns(instance);

    // Pipeline succeeds
    var config = new FlattenedConfiguration { InstanceUniqueName = "TestInst" };
    var validResult = ValidationResult.Success();
    _pipeline.FlattenAndValidateAsync(1, Arg.Any<CancellationToken>())
        .Returns(Result<FlatteningPipelineResult>.Success(
            new FlatteningPipelineResult(config, "sha256:abc", validResult)));

    // Capture the deployment record the service creates.
    DeploymentRecord? captured = null;
    await _repo.AddDeploymentRecordAsync(Arg.Do<DeploymentRecord>(r => captured = r), Arg.Any<CancellationToken>());

    // _comms has no communication actor, so the site call fails after the
    // record has been created. Since DeploymentManager-001 the service reports
    // that as a failure result instead of letting the exception escape. The
    // call is therefore awaited directly: a leaked exception would be a
    // regression and should fail this test, not be swallowed by a try/catch.
    var result = await _service.DeployInstanceAsync(1, "admin");

    Assert.True(result.IsFailure);
    Assert.NotNull(captured);
    Assert.False(string.IsNullOrEmpty(captured!.DeploymentId));
    // The identifier is a GUID rendered without hyphens ("N" format, 32 hex digits).
    Assert.Equal(32, captured.DeploymentId.Length);
    Assert.True(Guid.TryParseExact(captured.DeploymentId, "N", out _));
    Assert.Equal("sha256:abc", captured.RevisionHash);
}
// ── WP-4: State transition validation ──
[Fact]
public async Task DeployInstanceAsync_EnabledInstance_AllowsDeploy()
{
    var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.Enabled };
    _repo.GetInstanceByIdAsync(1).Returns(instance);
    var config = new FlattenedConfiguration { InstanceUniqueName = "TestInst" };
    _pipeline.FlattenAndValidateAsync(1, Arg.Any<CancellationToken>())
        .Returns(Result<FlatteningPipelineResult>.Success(
            new FlatteningPipelineResult(config, "hash", ValidationResult.Success())));

    // The deploy still fails later at the communication layer (_comms has no
    // actor). Since DeploymentManager-001 that failure comes back as a failure
    // result rather than an exception. The former try/catch was dead code that
    // would also have hidden a regression, so the call is awaited directly.
    await _service.DeployInstanceAsync(1, "admin");

    // A deployment record is only created once state validation has passed.
    await _repo.Received().AddDeploymentRecordAsync(Arg.Any<DeploymentRecord>(), Arg.Any<CancellationToken>());
}
// ── DeploymentManager-001: unexpected exception must not leave record InProgress ──
[Fact]
public async Task DeployInstanceAsync_CommunicationThrowsUnexpectedException_RecordMarkedFailed()
{
    // Arrange: a deployable instance whose configuration flattens and validates cleanly.
    var notDeployed = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed };
    _repo.GetInstanceByIdAsync(1).Returns(notDeployed);
    var flattened = new FlatteningPipelineResult(
        new FlattenedConfiguration { InstanceUniqueName = "TestInst" },
        "sha256:abc",
        ValidationResult.Success());
    _pipeline.FlattenAndValidateAsync(1, Arg.Any<CancellationToken>())
        .Returns(Result<FlatteningPipelineResult>.Success(flattened));

    // Keep hold of the record the service creates so its final state can be inspected.
    DeploymentRecord? record = null;
    await _repo.AddDeploymentRecordAsync(
        Arg.Do<DeploymentRecord>(r => record = r), Arg.Any<CancellationToken>());

    // Act: _comms has no actor, so the site call raises InvalidOperationException,
    // which is neither a timeout nor a cancellation.
    var outcome = await _service.DeployInstanceAsync(1, "admin");

    // Assert: the exception was turned into a failure result...
    Assert.True(outcome.IsFailure);
    Assert.Contains("Deployment failed", outcome.Error);

    // ...and the record was finalised as Failed rather than left InProgress.
    Assert.NotNull(record);
    Assert.Equal(DeploymentStatus.Failed, record!.Status);
    Assert.NotNull(record.ErrorMessage);
    Assert.NotNull(record.CompletedAt);
}
// ── DeploymentManager-002: failure write must not use a cancelled token ──
[Fact]
public async Task DeployInstanceAsync_FailureWrite_UsesNonCancellableToken()
{
// Arrange: any instance id resolves to a NotDeployed instance whose
// configuration flattens and validates cleanly, so the flow reaches the
// (actor-less) communication layer and fails there.
var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed };
_repo.GetInstanceByIdAsync(Arg.Any<int>(), Arg.Any<CancellationToken>()).Returns(instance);
var config = new FlattenedConfiguration { InstanceUniqueName = "TestInst" };
_pipeline.FlattenAndValidateAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(Result<FlatteningPipelineResult>.Success(
new FlatteningPipelineResult(config, "sha256:abc", ValidationResult.Success())));
// Capture the record the service creates so its final status can be checked.
DeploymentRecord? captured = null;
await _repo.AddDeploymentRecordAsync(
Arg.Do<DeploymentRecord>(r => captured = r), Arg.Any<CancellationToken>());
// Simulate a repository that rejects already-cancelled tokens (the
// real EF Core behaviour when the operation token is cancelled). If the
// catch block passes the operation's cancelled token, the Failed-status
// write throws and the record stays InProgress -- the exact bug.
_repo.UpdateDeploymentRecordAsync(
Arg.Is<DeploymentRecord>(r => r.Status == DeploymentStatus.Failed),
Arg.Is<CancellationToken>(ct => ct.IsCancellationRequested))
.Returns<Task>(_ => throw new OperationCanceledException());
_repo.SaveChangesAsync(Arg.Is<CancellationToken>(ct => ct.IsCancellationRequested))
.Returns<Task<int>>(_ => throw new OperationCanceledException());
// The communication call fails (no actor set). The catch block must
// persist the Failed status with a non-cancellable token, so cleanup
// succeeds even when the caller's token is cancelled.
// NOTE(review): the call below passes no cancellation token, so the
// caller's token is never cancelled here. Unless the service cancels an
// internal token of its own before its catch block runs, the two
// "cancelled token" stubs above never fire. The final Received() check
// would then also pass if the catch block forwarded the caller's token,
// so this test does not yet reproduce the DeploymentManager-002 scenario.
// To exercise it, pass an already-cancelled token. TODO: confirm that
// DeployInstanceAsync accepts one and that nothing earlier in the flow
// (e.g. lock acquisition) throws on it first.
var result = await _service.DeployInstanceAsync(1, "admin");
Assert.True(result.IsFailure);
Assert.NotNull(captured);
Assert.Equal(DeploymentStatus.Failed, captured!.Status);
// The Failed-status write happened with a non-cancelled token.
await _repo.Received().UpdateDeploymentRecordAsync(
Arg.Is<DeploymentRecord>(r => r.Status == DeploymentStatus.Failed),
Arg.Is<CancellationToken>(ct => !ct.IsCancellationRequested));
}
// ── WP-6: Lifecycle commands ──
[Fact]
public async Task DisableInstanceAsync_InstanceNotFound_ReturnsFailure()
{
    // There is no instance to disable.
    const int unknownInstanceId = 1;
    Instance? noInstance = null;
    _repo.GetInstanceByIdAsync(unknownInstanceId).Returns(noInstance);

    var outcome = await _service.DisableInstanceAsync(unknownInstanceId, "admin");

    Assert.True(outcome.IsFailure);
    Assert.Contains("not found", outcome.Error);
}
[Fact]
public async Task DisableInstanceAsync_WhenDisabled_ReturnsTransitionError()
{
    // Disabling an instance that is already Disabled is an invalid transition.
    var alreadyDisabled = new Instance("TestInst")
    {
        Id = 1,
        SiteId = 1,
        State = InstanceState.Disabled
    };
    _repo.GetInstanceByIdAsync(1).Returns(alreadyDisabled);

    var outcome = await _service.DisableInstanceAsync(1, "admin");

    Assert.True(outcome.IsFailure);
    Assert.Contains("not allowed", outcome.Error);
}
[Fact]
public async Task EnableInstanceAsync_WhenEnabled_ReturnsTransitionError()
{
    // Enabling an instance that is already Enabled is an invalid transition.
    var alreadyEnabled = new Instance("TestInst")
    {
        Id = 1,
        SiteId = 1,
        State = InstanceState.Enabled
    };
    _repo.GetInstanceByIdAsync(1).Returns(alreadyEnabled);

    var outcome = await _service.EnableInstanceAsync(1, "admin");

    Assert.True(outcome.IsFailure);
    Assert.Contains("not allowed", outcome.Error);
}
[Fact]
public async Task DeleteInstanceAsync_InstanceNotFound_ReturnsFailure()
{
    // There is no instance to delete.
    const int unknownInstanceId = 1;
    Instance? noInstance = null;
    _repo.GetInstanceByIdAsync(unknownInstanceId).Returns(noInstance);

    var outcome = await _service.DeleteInstanceAsync(unknownInstanceId, "admin");

    Assert.True(outcome.IsFailure);
    Assert.Contains("not found", outcome.Error);
}
// ── DeploymentManager-004: site-success but central-delete-failure must not escape uncaught ──
[Fact]
public async Task DeleteInstanceAsync_SiteSucceeds_CentralDeleteFails_ReturnsDistinctFailure()
{
    // Scenario: the site destroys the Instance Actor and removes its config
    // (it replies Success), but deleting the central record then throws. The
    // exception must not escape. It has to be reported as a distinct failure
    // so an operator can reconcile the orphaned central record.
    const int instanceId = 30;
    var orphan = new Instance("OrphanInst") { Id = instanceId, SiteId = 1, State = InstanceState.Enabled };
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>()).Returns(orphan);
    _repo.DeleteInstanceAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns<Task>(_ => throw new InvalidOperationException("db unavailable"));

    var siteProbe = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:x", failQuery: false)));
    var sut = CreateServiceWithCommActor(siteProbe);

    var outcome = await sut.DeleteInstanceAsync(instanceId, "admin");

    // Returned rather than thrown, and the message names both the site and central sides.
    Assert.True(outcome.IsFailure);
    Assert.Contains("site", outcome.Error, StringComparison.OrdinalIgnoreCase);
    Assert.Contains("central", outcome.Error, StringComparison.OrdinalIgnoreCase);
}
// ── WP-8: Deployment comparison ──
[Fact]
public async Task GetDeploymentComparisonAsync_NoSnapshot_ReturnsFailure()
{
    // Nothing has been deployed for instance 1, so there is nothing to compare against.
    DeployedConfigSnapshot? noSnapshot = null;
    _repo.GetDeployedSnapshotByInstanceIdAsync(1).Returns(noSnapshot);

    var comparison = await _service.GetDeploymentComparisonAsync(1);

    Assert.True(comparison.IsFailure);
    Assert.Contains("No deployed snapshot", comparison.Error);
}
[Fact]
public async Task GetDeploymentComparisonAsync_SameHash_NotStale()
{
    // The deployed snapshot and the current template output share a revision hash.
    const string sharedHash = "sha256:abc";
    var deployed = new DeployedConfigSnapshot("dep1", sharedHash, "{}")
    {
        InstanceId = 1,
        DeployedAt = DateTimeOffset.UtcNow
    };
    _repo.GetDeployedSnapshotByInstanceIdAsync(1).Returns(deployed);

    var current = new FlatteningPipelineResult(
        new FlattenedConfiguration { InstanceUniqueName = "TestInst" },
        sharedHash,
        ValidationResult.Success());
    _pipeline.FlattenAndValidateAsync(1, Arg.Any<CancellationToken>())
        .Returns(Result<FlatteningPipelineResult>.Success(current));

    var comparison = await _service.GetDeploymentComparisonAsync(1);

    Assert.True(comparison.IsSuccess);
    Assert.False(comparison.Value.IsStale);
}
[Fact]
public async Task GetDeploymentComparisonAsync_DifferentHash_IsStale()
{
    // The template has moved on since the deployment: the two hashes differ.
    const string deployedHash = "sha256:abc";
    const string currentHash = "sha256:xyz";
    var deployed = new DeployedConfigSnapshot("dep1", deployedHash, "{}")
    {
        InstanceId = 1,
        DeployedAt = DateTimeOffset.UtcNow
    };
    _repo.GetDeployedSnapshotByInstanceIdAsync(1).Returns(deployed);

    var current = new FlatteningPipelineResult(
        new FlattenedConfiguration { InstanceUniqueName = "TestInst" },
        currentHash,
        ValidationResult.Success());
    _pipeline.FlattenAndValidateAsync(1, Arg.Any<CancellationToken>())
        .Returns(Result<FlatteningPipelineResult>.Success(current));

    var comparison = await _service.GetDeploymentComparisonAsync(1);

    Assert.True(comparison.IsSuccess);
    Assert.True(comparison.Value.IsStale);
}
// ── DeploymentManager-007: comparison must produce a structured diff ──
[Fact]
public async Task GetDeploymentComparisonAsync_ProducesStructuredDiff()
{
    // The deployed snapshot holds attribute "OldAttr". The current
    // template-derived configuration holds "NewAttr" instead. The comparison
    // must report a real Added/Removed diff (via the TemplateEngine
    // DiffService), not just a boolean staleness flag.
    const int instanceId = 40;
    var deployedJson = System.Text.Json.JsonSerializer.Serialize(ConfigWithAttribute("OldAttr"));
    _repo.GetDeployedSnapshotByInstanceIdAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns(new DeployedConfigSnapshot("dep1", "sha256:old", deployedJson)
        {
            InstanceId = instanceId,
            DeployedAt = DateTimeOffset.UtcNow
        });
    _pipeline.FlattenAndValidateAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns(Result<FlatteningPipelineResult>.Success(
            new FlatteningPipelineResult(ConfigWithAttribute("NewAttr"), "sha256:new", ValidationResult.Success())));

    var comparison = await _service.GetDeploymentComparisonAsync(instanceId);

    Assert.True(comparison.IsSuccess);
    Assert.True(comparison.Value.IsStale);

    // A structured diff is present, listing the added and the removed attribute.
    var diff = comparison.Value.Diff;
    Assert.NotNull(diff);
    Assert.True(diff!.HasChanges);
    Assert.Contains(diff.AttributeChanges,
        change => change.CanonicalName == "NewAttr" && change.ChangeType == DiffChangeType.Added);
    Assert.Contains(diff.AttributeChanges,
        change => change.CanonicalName == "OldAttr" && change.ChangeType == DiffChangeType.Removed);

    // Builds a "DiffInst" configuration with a single Int attribute of the given name.
    static FlattenedConfiguration ConfigWithAttribute(string canonicalName) => new()
    {
        InstanceUniqueName = "DiffInst",
        Attributes = [new ResolvedAttribute { CanonicalName = canonicalName, DataType = "Int" }]
    };
}
// ── WP-2: GetDeploymentStatusAsync ──
[Fact]
public async Task GetDeploymentStatusAsync_ReturnsRecordByDeploymentId()
{
    // The status lookup is keyed by deployment id and returns the stored record.
    const string deploymentId = "dep1";
    var stored = new DeploymentRecord(deploymentId, "admin") { Status = DeploymentStatus.Success };
    _repo.GetDeploymentByDeploymentIdAsync(deploymentId).Returns(stored);

    var found = await _service.GetDeploymentStatusAsync(deploymentId);

    Assert.NotNull(found);
    Assert.Equal(deploymentId, found!.DeploymentId);
    Assert.Equal(DeploymentStatus.Success, found.Status);
}
// ── Audit logging ──
[Fact]
public async Task DeployInstanceAsync_FlatteningFails_DoesNotReachAudit()
{
    // DeploymentManager-011: this test used to assert nothing. When flattening
    // fails, the service returns before any site communication, so no audit
    // entry may be written.
    var notDeployed = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed };
    _repo.GetInstanceByIdAsync(1).Returns(notDeployed);
    var pipelineFailure = Result<FlatteningPipelineResult>.Failure("Error");
    _pipeline.FlattenAndValidateAsync(1, Arg.Any<CancellationToken>()).Returns(pipelineFailure);

    await _service.DeployInstanceAsync(1, "admin");

    await _audit.DidNotReceive().LogAsync(
        Arg.Any<string>(),
        Arg.Any<string>(),
        Arg.Any<string>(),
        Arg.Any<string>(),
        Arg.Any<string>(),
        Arg.Any<object>(),
        Arg.Any<CancellationToken>());
}
[Fact]
public async Task DeployInstanceAsync_SiteSucceeds_WritesDeployAuditEntry()
{
    // DeploymentManager-011: once the site confirms the deployment, a "Deploy"
    // audit entry naming the instance must be written.
    const int instanceId = 50;
    var audited = new Instance("AuditInst") { Id = instanceId, SiteId = 1, State = InstanceState.NotDeployed };
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>()).Returns(audited);
    _repo.GetCurrentDeploymentStatusAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns((DeploymentRecord?)null);
    SetupValidPipeline(instanceId, "AuditInst", "sha256:target");

    var sut = CreateServiceWithCommActor(Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false))));

    var outcome = await sut.DeployInstanceAsync(instanceId, "admin");

    Assert.True(outcome.IsSuccess);
    await _audit.Received().LogAsync(
        "admin", "Deploy", "Instance", "50", "AuditInst",
        Arg.Any<object>(), Arg.Any<CancellationToken>());
}
// ── DeploymentManager-011: lifecycle success paths ──
[Fact]
public async Task DisableInstanceAsync_SiteSucceeds_SetsDisabledStateAndAudits()
{
    // Arrange: an Enabled instance and a site stand-in that acknowledges lifecycle commands.
    const int instanceId = 51;
    var target = new Instance("DisInst") { Id = instanceId, SiteId = 1, State = InstanceState.Enabled };
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>()).Returns(target);
    var sut = CreateServiceWithCommActor(Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "x", failQuery: false))));

    // Act
    var outcome = await sut.DisableInstanceAsync(instanceId, "admin");

    // Assert: the state changed, was persisted, and was audited.
    Assert.True(outcome.IsSuccess);
    Assert.Equal(InstanceState.Disabled, target.State);
    await _repo.Received().UpdateInstanceAsync(target, Arg.Any<CancellationToken>());
    await _audit.Received().LogAsync(
        "admin", "Disable", "Instance", "51", "DisInst",
        Arg.Any<object>(), Arg.Any<CancellationToken>());
}
[Fact]
public async Task EnableInstanceAsync_SiteSucceeds_SetsEnabledStateAndAudits()
{
    // Arrange: a Disabled instance and a site stand-in that acknowledges lifecycle commands.
    const int instanceId = 52;
    var target = new Instance("EnInst") { Id = instanceId, SiteId = 1, State = InstanceState.Disabled };
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>()).Returns(target);
    var sut = CreateServiceWithCommActor(Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "x", failQuery: false))));

    // Act
    var outcome = await sut.EnableInstanceAsync(instanceId, "admin");

    // Assert: the state changed, was persisted, and was audited.
    Assert.True(outcome.IsSuccess);
    Assert.Equal(InstanceState.Enabled, target.State);
    await _repo.Received().UpdateInstanceAsync(target, Arg.Any<CancellationToken>());
    await _audit.Received().LogAsync(
        "admin", "Enable", "Instance", "52", "EnInst",
        Arg.Any<object>(), Arg.Any<CancellationToken>());
}
[Fact]
public async Task DeleteInstanceAsync_SiteSucceeds_RemovesRecordAndAudits()
{
    // Arrange: an Enabled instance and a site stand-in that acknowledges lifecycle commands.
    const int instanceId = 53;
    var target = new Instance("DelInst") { Id = instanceId, SiteId = 1, State = InstanceState.Enabled };
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>()).Returns(target);
    var sut = CreateServiceWithCommActor(Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "x", failQuery: false))));

    // Act
    var outcome = await sut.DeleteInstanceAsync(instanceId, "admin");

    // Assert: the central record was removed and the delete was audited.
    Assert.True(outcome.IsSuccess);
    await _repo.Received().DeleteInstanceAsync(instanceId, Arg.Any<CancellationToken>());
    await _audit.Received().LogAsync(
        "admin", "Delete", "Instance", "53", "DelInst",
        Arg.Any<object>(), Arg.Any<CancellationToken>());
}
[Fact]
public async Task DeployInstanceAsync_SameInstance_OperationLockSerializesConcurrentDeploys()
{
    // DeploymentManager-011: two concurrent deploys of the SAME instance
    // must be serialized by the per-instance operation lock — the site sees
    // them one at a time, never overlapping.
    var instance = new Instance("LockInst") { Id = 54, SiteId = 1, State = InstanceState.Enabled };
    _repo.GetInstanceByIdAsync(54, Arg.Any<CancellationToken>()).Returns(instance);
    SetupValidPipeline(54, "LockInst", "sha256:target");
    _repo.GetCurrentDeploymentStatusAsync(54, Arg.Any<CancellationToken>())
        .Returns((DeploymentRecord?)null);
    var commActor = Sys.ActorOf(Props.Create(() =>
        new SerializationProbeActor()));
    var service = CreateServiceWithCommActor(commActor);

    // Start both deploys before awaiting either so they contend for the lock.
    var deploy1 = service.DeployInstanceAsync(54, "admin");
    var deploy2 = service.DeployInstanceAsync(54, "admin");
    var results = await Task.WhenAll(deploy1, deploy2);

    Assert.True(results[0].IsSuccess);
    Assert.True(results[1].IsSuccess);

    // Both deploys must actually have gone down the normal deploy path (each
    // creates its own record). Otherwise a peak concurrency of 1 would be
    // satisfied trivially by a single deploy ever reaching the site.
    await _repo.Received(2).AddDeploymentRecordAsync(
        Arg.Any<DeploymentRecord>(), Arg.Any<CancellationToken>());

    // The probe records the maximum concurrency observed; the lock must
    // keep it at 1 for a single instance.
    Assert.Equal(1, SerializationProbeActor.MaxConcurrent);
}
// ── DeploymentManager-006: query-the-site-before-redeploy idempotency ──
/// <summary>
/// Creates a <see cref="DeploymentService"/> whose <see cref="CommunicationService"/>
/// routes through <paramref name="commActor"/>, so a test can observe (and script
/// replies to) the site queries and commands. The service shares this fixture's
/// repository, pipeline, lock manager and audit fakes. Query, deployment and
/// operation-lock timeouts are all 5 seconds.
/// </summary>
/// <param name="commActor">Stand-in for the central communication actor.</param>
/// <returns>A service wired to <paramref name="commActor"/>.</returns>
private DeploymentService CreateServiceWithCommActor(IActorRef commActor)
{
    var fiveSeconds = TimeSpan.FromSeconds(5);

    var communicationOptions = Options.Create(new CommunicationOptions
    {
        QueryTimeout = fiveSeconds,
        DeploymentTimeout = fiveSeconds
    });
    var communication = new CommunicationService(communicationOptions, NullLogger<CommunicationService>.Instance);
    communication.SetCommunicationActor(commActor);

    var lockOptions = Options.Create(new DeploymentManagerOptions { OperationLockTimeout = fiveSeconds });

    return new DeploymentService(
        _repo,
        Substitute.For<ISiteRepository>(),
        _pipeline,
        communication,
        _lockManager,
        _audit,
        new DiffService(),
        lockOptions,
        NullLogger<DeploymentService>.Instance);
}
/// <summary>
/// Stubs the flattening pipeline so that <paramref name="instanceId"/> flattens
/// successfully to a configuration whose only set property is
/// <c>InstanceUniqueName = instanceName</c>. The result validates cleanly and
/// carries <paramref name="revisionHash"/>.
/// </summary>
private void SetupValidPipeline(int instanceId, string instanceName, string revisionHash)
{
    var flattened = new FlatteningPipelineResult(
        new FlattenedConfiguration { InstanceUniqueName = instanceName },
        revisionHash,
        ValidationResult.Success());

    _pipeline.FlattenAndValidateAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns(Result<FlatteningPipelineResult>.Success(flattened));
}
[Fact]
public async Task DeployInstanceAsync_PriorInProgressRecord_SiteHasTargetHash_MarksSuccessWithoutRedeploy()
{
    // The previous record is stuck InProgress, so the service asks the site
    // what it actually runs. The site already has the TARGET revision, so the
    // stuck record is promoted to Success and no new DeployInstanceCommand is sent.
    const int instanceId = 7;
    const string targetHash = "sha256:target";
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns(new Instance("RedeployInst") { Id = instanceId, SiteId = 1, State = InstanceState.Enabled });
    SetupValidPipeline(instanceId, "RedeployInst", targetHash);

    var stuck = new DeploymentRecord("dep-prior", "admin")
    {
        InstanceId = instanceId,
        Status = DeploymentStatus.InProgress,
        RevisionHash = targetHash
    };
    _repo.GetCurrentDeploymentStatusAsync(instanceId, Arg.Any<CancellationToken>()).Returns(stuck);

    var sut = CreateServiceWithCommActor(Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false))));

    var outcome = await sut.DeployInstanceAsync(instanceId, "admin");

    Assert.True(outcome.IsSuccess);
    Assert.Equal(DeploymentStatus.Success, stuck.Status);

    // Exactly one site query and no deploy command.
    Assert.Equal(1, ReconcileProbeActor.QueryCount);
    Assert.Equal(0, ReconcileProbeActor.DeployCount);

    // The stuck record was reconciled in place; no new record was created.
    await _repo.DidNotReceive().AddDeploymentRecordAsync(
        Arg.Any<DeploymentRecord>(), Arg.Any<CancellationToken>());
}
[Fact]
public async Task DeployInstanceAsync_PriorInProgressRecord_SiteHasDifferentHash_ProceedsWithDeploy()
{
    // A stuck InProgress record triggers a site query. The site reports an
    // OLD revision, so the service carries on with a normal deploy.
    const int instanceId = 8;
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns(new Instance("RedeployInst2") { Id = instanceId, SiteId = 1, State = InstanceState.Enabled });
    SetupValidPipeline(instanceId, "RedeployInst2", "sha256:target");
    _repo.GetCurrentDeploymentStatusAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns(new DeploymentRecord("dep-prior2", "admin")
        {
            InstanceId = instanceId,
            Status = DeploymentStatus.InProgress,
            RevisionHash = "sha256:old"
        });

    var sut = CreateServiceWithCommActor(Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:old", failQuery: false))));

    var outcome = await sut.DeployInstanceAsync(instanceId, "admin");

    Assert.True(outcome.IsSuccess);
    Assert.Equal(1, ReconcileProbeActor.QueryCount);

    // A fresh DeployInstanceCommand reached the site and a new record was created.
    Assert.Equal(1, ReconcileProbeActor.DeployCount);
    await _repo.Received().AddDeploymentRecordAsync(
        Arg.Any<DeploymentRecord>(), Arg.Any<CancellationToken>());
}
[Fact]
public async Task DeployInstanceAsync_PriorFailedTimeoutRecord_QueriesSite()
{
    // A prior record that Failed because of a timeout also triggers the site
    // query. Here the site already has the target revision.
    const int instanceId = 9;
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns(new Instance("TimedOutInst") { Id = instanceId, SiteId = 1, State = InstanceState.Enabled });
    SetupValidPipeline(instanceId, "TimedOutInst", "sha256:target");

    var timedOut = new DeploymentRecord("dep-prior3", "admin")
    {
        InstanceId = instanceId,
        Status = DeploymentStatus.Failed,
        RevisionHash = "sha256:target",
        ErrorMessage = "Communication failure: deployment Ask timed out"
    };
    _repo.GetCurrentDeploymentStatusAsync(instanceId, Arg.Any<CancellationToken>()).Returns(timedOut);

    var sut = CreateServiceWithCommActor(Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false))));

    var outcome = await sut.DeployInstanceAsync(instanceId, "admin");

    Assert.True(outcome.IsSuccess);
    Assert.Equal(1, ReconcileProbeActor.QueryCount);
    Assert.Equal(0, ReconcileProbeActor.DeployCount);
    Assert.Equal(DeploymentStatus.Success, timedOut.Status);
}
[Fact]
public async Task DeployInstanceAsync_PriorSuccessRecord_SkipsSiteQuery()
{
    // A prior deploy that completed cleanly must not cost an extra site round-trip.
    const int instanceId = 10;
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns(new Instance("CleanInst") { Id = instanceId, SiteId = 1, State = InstanceState.Enabled });
    SetupValidPipeline(instanceId, "CleanInst", "sha256:target");
    _repo.GetCurrentDeploymentStatusAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns(new DeploymentRecord("dep-clean", "admin")
        {
            InstanceId = instanceId,
            Status = DeploymentStatus.Success,
            RevisionHash = "sha256:old"
        });

    var sut = CreateServiceWithCommActor(Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false))));

    var outcome = await sut.DeployInstanceAsync(instanceId, "admin");

    Assert.True(outcome.IsSuccess);
    // No query was sent, only the deploy command.
    Assert.Equal(0, ReconcileProbeActor.QueryCount);
    Assert.Equal(1, ReconcileProbeActor.DeployCount);
}
[Fact]
public async Task DeployInstanceAsync_FreshFirstTimeDeploy_SkipsSiteQuery()
{
    // With no prior record at all this is a first-time deploy, and no query is needed.
    const int instanceId = 11;
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns(new Instance("FreshInst") { Id = instanceId, SiteId = 1, State = InstanceState.NotDeployed });
    SetupValidPipeline(instanceId, "FreshInst", "sha256:target");
    DeploymentRecord? noPriorRecord = null;
    _repo.GetCurrentDeploymentStatusAsync(instanceId, Arg.Any<CancellationToken>()).Returns(noPriorRecord);

    var sut = CreateServiceWithCommActor(Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false))));

    var outcome = await sut.DeployInstanceAsync(instanceId, "admin");

    Assert.True(outcome.IsSuccess);
    Assert.Equal(0, ReconcileProbeActor.QueryCount);
    Assert.Equal(1, ReconcileProbeActor.DeployCount);
}
[Fact]
public async Task DeployInstanceAsync_PriorInProgressRecord_QueryFails_FallsThroughToDeploy()
{
    // The site query fails (unreachable / times out). The deploy must NOT
    // abort — it falls through to a normal deploy and relies on site-side
    // stale-rejection as the safety net.
    var instance = new Instance("UnreachableInst") { Id = 12, SiteId = 1, State = InstanceState.Enabled };
    _repo.GetInstanceByIdAsync(12, Arg.Any<CancellationToken>()).Returns(instance);
    SetupValidPipeline(12, "UnreachableInst", "sha256:target");
    var prior = new DeploymentRecord("dep-prior5", "admin")
    {
        InstanceId = 12,
        Status = DeploymentStatus.InProgress,
        RevisionHash = "sha256:target"
    };
    _repo.GetCurrentDeploymentStatusAsync(12, Arg.Any<CancellationToken>()).Returns(prior);

    // The probe drops the query (no reply) -> the Ask times out.
    var commActor = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:target", failQuery: true)));

    // The query is never answered, so this test waits out the whole query
    // timeout. CreateServiceWithCommActor uses 5 s, so the service is built
    // here with a 1 s QueryTimeout instead. That is presumably the bound on
    // the site state query; TODO confirm. Deployment and lock timeouts stay
    // at 5 s.
    var comms = new CommunicationService(
        Options.Create(new CommunicationOptions
        {
            QueryTimeout = TimeSpan.FromSeconds(1),
            DeploymentTimeout = TimeSpan.FromSeconds(5)
        }),
        NullLogger<CommunicationService>.Instance);
    comms.SetCommunicationActor(commActor);
    var service = new DeploymentService(
        _repo, Substitute.For<ISiteRepository>(), _pipeline, comms, _lockManager, _audit,
        new DiffService(),
        Options.Create(new DeploymentManagerOptions { OperationLockTimeout = TimeSpan.FromSeconds(5) }),
        NullLogger<DeploymentService>.Instance);

    var result = await service.DeployInstanceAsync(12, "admin");

    // Did not abort — the deploy proceeded after the failed query.
    Assert.True(result.IsSuccess);
    Assert.Equal(1, ReconcileProbeActor.QueryCount);
    Assert.Equal(1, ReconcileProbeActor.DeployCount);
}
// ── DeploymentManager-003: post-success persistence must commit the Success status ──
[Fact]
public async Task DeployInstanceAsync_SiteSucceeds_SnapshotWriteFails_RecordStillCommittedSuccess()
{
    // The site applies the deployment (response Success), but storing the
    // deployed-config snapshot afterwards throws. The deployment record's
    // Success status MUST still be written. Otherwise central and site
    // diverge: the site runs the new config while central never shows a
    // Success record.
    var instance = new Instance("SnapFailInst") { Id = 20, SiteId = 1, State = InstanceState.NotDeployed };
    _repo.GetInstanceByIdAsync(20, Arg.Any<CancellationToken>()).Returns(instance);
    SetupValidPipeline(20, "SnapFailInst", "sha256:target");
    _repo.GetCurrentDeploymentStatusAsync(20, Arg.Any<CancellationToken>())
        .Returns((DeploymentRecord?)null);
    DeploymentRecord? captured = null;
    await _repo.AddDeploymentRecordAsync(
        Arg.Do<DeploymentRecord>(r => captured = r), Arg.Any<CancellationToken>());

    // Record the status each record update carried AT CALL TIME. NSubstitute
    // keeps argument references, so an Arg.Is predicate in Received() sees the
    // record's current (final) state. It would match even if Success was never
    // actually passed to UpdateDeploymentRecordAsync.
    var statusesWritten = new List<DeploymentStatus>();
    await _repo.UpdateDeploymentRecordAsync(
        Arg.Do<DeploymentRecord>(r => statusesWritten.Add(r.Status)), Arg.Any<CancellationToken>());

    // The snapshot store throws.
    _repo.GetDeployedSnapshotByInstanceIdAsync(20, Arg.Any<CancellationToken>())
        .Returns((DeployedConfigSnapshot?)null);
    _repo.AddDeployedSnapshotAsync(Arg.Any<DeployedConfigSnapshot>(), Arg.Any<CancellationToken>())
        .Returns<Task>(_ => throw new InvalidOperationException("snapshot store unavailable"));
    var commActor = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(siteHash: "sha256:target", failQuery: false)));
    var service = CreateServiceWithCommActor(commActor);

    var result = await service.DeployInstanceAsync(20, "admin");

    // The site succeeded -> the deployment is reported successful.
    Assert.True(result.IsSuccess);
    Assert.NotNull(captured);
    Assert.Equal(DeploymentStatus.Success, captured!.Status);

    // A record update was issued while the record was already in Success state.
    // This does not assert ordering relative to the snapshot write.
    Assert.Contains(DeploymentStatus.Success, statusesWritten);
}
/// <summary>
/// Stand-in CentralCommunicationActor that measures how many deploy commands are
/// "in flight" at once. Each <see cref="DeployInstanceCommand"/> holds its Success
/// reply back for 150 ms via a single-shot timer. If two deploys of the same
/// instance were not serialized by the operation lock, their windows would
/// overlap and <see cref="MaxConcurrent"/> would exceed 1. Deployment-state
/// queries are answered immediately with a not-applied response.
/// </summary>
private class SerializationProbeActor : ReceiveActor, IWithTimers
{
    // Peak number of simultaneously open deploy windows (read by the test).
    public static int MaxConcurrent;

    // Number of deploy windows currently open.
    private static int _current;

    private static readonly object Gate = new();

    private static readonly TimeSpan ReplyDelay = TimeSpan.FromMilliseconds(150);

    public ITimerScheduler Timers { get; set; } = null!;

    public SerializationProbeActor()
    {
        // The counters are static and shared across tests; each new probe starts from zero.
        MaxConcurrent = 0;
        _current = 0;

        Receive<SiteEnvelope>(OnEnvelope);
        Receive<DeployWindowElapsed>(OnWindowElapsed);
    }

    private void OnEnvelope(SiteEnvelope envelope)
    {
        switch (envelope.Message)
        {
            case DeployInstanceCommand deploy:
                OpenWindow();
                // Capture Sender now; it is only meaningful while this message is processed.
                Timers.StartSingleTimer(deploy.DeploymentId, new DeployWindowElapsed(deploy, Sender), ReplyDelay);
                break;
            case DeploymentStateQueryRequest query:
                Sender.Tell(new DeploymentStateQueryResponse(
                    query.CorrelationId, query.InstanceUniqueName, false, null, null, DateTimeOffset.UtcNow));
                break;
        }
    }

    private void OnWindowElapsed(DeployWindowElapsed elapsed)
    {
        lock (Gate)
        {
            _current--;
        }

        var command = elapsed.Command;
        elapsed.ReplyTo.Tell(new DeploymentStatusResponse(
            command.DeploymentId, command.InstanceUniqueName,
            DeploymentStatus.Success, null, DateTimeOffset.UtcNow));
    }

    // Marks one more deploy as in flight and updates the observed peak.
    private static void OpenWindow()
    {
        lock (Gate)
        {
            _current++;
            MaxConcurrent = Math.Max(MaxConcurrent, _current);
        }
    }

    private sealed record DeployWindowElapsed(DeployInstanceCommand Command, IActorRef ReplyTo);
}
/// <summary>
/// Stand-in CentralCommunicationActor for the reconciliation and lifecycle tests.
/// It counts the deployment-state queries and deploy commands it receives. It
/// answers queries as applied with the configured revision hash, or, when
/// <c>failQuery</c> is set, drops them so the central Ask times out. It
/// acknowledges deploy, disable, enable and delete commands as successful.
/// </summary>
private class ReconcileProbeActor : ReceiveActor
{
    // Shared across tests; every new probe resets them in its constructor.
    public static int QueryCount;
    public static int DeployCount;

    private readonly string _appliedHash;
    private readonly bool _dropQueries;

    public ReconcileProbeActor(string siteHash, bool failQuery)
    {
        _appliedHash = siteHash;
        _dropQueries = failQuery;
        QueryCount = 0;
        DeployCount = 0;

        Receive<SiteEnvelope>(envelope => Handle(envelope.Message));
    }

    private void Handle(object? message)
    {
        if (message is DeploymentStateQueryRequest query)
        {
            QueryCount++;
            // When dropping, reply with nothing: the caller's Ask runs into its timeout.
            if (_dropQueries)
            {
                return;
            }

            Sender.Tell(new DeploymentStateQueryResponse(
                query.CorrelationId, query.InstanceUniqueName, true,
                "dep-applied", _appliedHash, DateTimeOffset.UtcNow));
        }
        else if (message is DeployInstanceCommand deploy)
        {
            DeployCount++;
            Sender.Tell(new DeploymentStatusResponse(
                deploy.DeploymentId, deploy.InstanceUniqueName,
                DeploymentStatus.Success, null, DateTimeOffset.UtcNow));
        }
        else if (message is DisableInstanceCommand disable)
        {
            Sender.Tell(new InstanceLifecycleResponse(
                disable.CommandId, disable.InstanceUniqueName, true, null, DateTimeOffset.UtcNow));
        }
        else if (message is EnableInstanceCommand enable)
        {
            Sender.Tell(new InstanceLifecycleResponse(
                enable.CommandId, enable.InstanceUniqueName, true, null, DateTimeOffset.UtcNow));
        }
        else if (message is DeleteInstanceCommand delete)
        {
            Sender.Tell(new InstanceLifecycleResponse(
                delete.CommandId, delete.InstanceUniqueName, true, null, DateTimeOffset.UtcNow));
        }
    }
}
}