feat(deploy): wire periodic PendingDeployment purge + SQL Server same-id re-stage test

Notify-and-fetch follow-ups:

- PendingDeploymentPurgeActor: a central cluster singleton (not
  readiness-gated, best-effort) that sweeps expired PendingDeployment
  staging rows on CommunicationOptions.PendingDeploymentPurgeInterval
  (default 1h). Modeled on the kpi-history-recorder pattern: self-scheduling
  timer, per-tick DI scope -> IDeploymentManagerRepository, continue-on-error.
  Wired in AkkaHostedService.RegisterCentralActors (manager + proxy + drain);
  resolves the deferred TODO in DeploymentService. Correctness never depends
  on it (supersession bounds rows to <=1/instance; the fetch endpoint enforces
  the TTL), so it is deliberately absent from RequiredSingletonsHealthCheck.

- SQL Server integration test for StagePendingIfAbsentAsync re-staging an
  instance's OWN DeploymentId over an expired row against the real UNIQUE
  index on DeploymentId — confirms EF orders DELETE before INSERT in one
  SaveChanges (SQLite's constraint timing differs from SQL Server's). Plus
  a same-instance supersession variant on real SQL Server.

Tests: 2 TestKit actor tests + 2 SQL Server integration tests (both ran
green against the infra MSSQL container); 235 Communication + 15
PendingDeployment tests pass; Host builds 0 warnings.
This commit is contained in:
Joseph Doherty
2026-06-26 23:19:29 -04:00
parent d9f5fbb664
commit 06f2df4f89
6 changed files with 431 additions and 4 deletions
@@ -0,0 +1,126 @@
using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
namespace ZB.MOM.WW.ScadaBridge.Communication.Actors;
/// <summary>
/// Central cluster singleton that periodically reclaims expired (TTL-elapsed)
/// <c>PendingDeployment</c> staging rows from the central configuration database —
/// the notify-and-fetch deploy transport stages each deploy's flattened config in a
/// <c>PendingDeployment</c> row that the site fetches over HTTP using a per-deployment
/// token, and this actor is the maintenance cadence that sweeps rows whose TTL has
/// elapsed without a re-deploy superseding them.
/// </summary>
/// <remarks>
/// <para>
/// <b>Best-effort, not readiness-gated.</b> This purge is pure hygiene: supersession
/// already bounds the table to ≤1 pending row per instance, and the config-fetch
/// endpoint enforces the TTL on read (expired rows return 404), so correctness never
/// depends on this purge running. It exists only to reclaim rows left behind by
/// instances that are deployed once and never re-deployed. Like
/// <c>KpiHistoryRecorderActor</c>, it is deliberately absent from
/// <c>RequiredSingletonsHealthCheck</c> — it must never gate <c>/health/ready</c>.
/// </para>
/// <para>
/// <b>Continue-on-error.</b> The tick handler swallows every exception and logs it; a
/// transient SQL failure on one tick must not crash the singleton — the next tick
/// retries. The per-tick try/catch (not the supervisor) is what keeps the actor alive.
/// </para>
/// <para>
/// <b>DI scopes.</b> <see cref="IDeploymentManagerRepository"/> is a scoped EF Core
/// service registered by <c>AddConfigurationDatabase</c>. The singleton opens one DI
/// scope per tick and resolves the repository there, mirroring the
/// <c>AuditLogPurgeActor</c> / <c>KpiHistoryRecorderActor</c> pattern.
/// </para>
/// </remarks>
public class PendingDeploymentPurgeActor : ReceiveActor
{
private readonly IServiceProvider _services;
private readonly TimeSpan _interval;
private readonly ILogger<PendingDeploymentPurgeActor> _logger;
private ICancelable? _timer;
/// <summary>Initializes a new instance of <see cref="PendingDeploymentPurgeActor"/> and registers the tick handler.</summary>
/// <param name="services">Root DI provider used to create a scoped repository per tick.</param>
/// <param name="options">Communication options supplying the purge interval.</param>
/// <param name="logger">Logger instance.</param>
public PendingDeploymentPurgeActor(
IServiceProvider services,
IOptions<CommunicationOptions> options,
ILogger<PendingDeploymentPurgeActor> logger)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(logger);
_services = services;
_interval = options.Value.PendingDeploymentPurgeInterval;
_logger = logger;
ReceiveAsync<PurgeTick>(_ => OnTickAsync());
}
/// <inheritdoc />
protected override void PreStart()
{
base.PreStart();
_timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
initialDelay: _interval,
interval: _interval,
receiver: Self,
message: PurgeTick.Instance,
sender: Self);
}
/// <inheritdoc />
protected override void PostStop()
{
_timer?.Cancel();
base.PostStop();
}
private async Task OnTickAsync()
{
// CreateAsyncScope + await using so the scoped EF Core DbContext (IAsyncDisposable)
// disposes asynchronously without blocking on connection cleanup.
await using var scope = _services.CreateAsyncScope();
IDeploymentManagerRepository repository;
try
{
repository = scope.ServiceProvider.GetRequiredService<IDeploymentManagerRepository>();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to resolve IDeploymentManagerRepository for PendingDeployment purge tick.");
return;
}
try
{
var purged = await repository
.PurgeExpiredPendingDeploymentsAsync(DateTimeOffset.UtcNow)
.ConfigureAwait(false);
if (purged > 0)
{
_logger.LogInformation(
"Purged {Count} expired PendingDeployment staging row(s).", purged);
}
}
catch (Exception ex)
{
// Best-effort: a failed tick must not crash the singleton. The next
// interval retries; correctness does not depend on any single tick.
_logger.LogError(ex, "PendingDeployment purge tick failed; retrying on the next interval.");
}
}
/// <summary>Self-tick triggering one expired-row purge pass.</summary>
internal sealed class PurgeTick
{
public static readonly PurgeTick Instance = new();
private PurgeTick() { }
}
}
@@ -71,4 +71,13 @@ public class CommunicationOptions
/// comfortably cover both site nodes' fetches within one deploy window.
/// </summary>
public TimeSpan PendingDeploymentTtl { get; set; } = TimeSpan.FromMinutes(5);
/// <summary>
/// How often the central <c>PendingDeploymentPurgeActor</c> singleton reclaims
/// expired (TTL-elapsed) PendingDeployment staging rows. Best-effort hygiene only:
/// supersession bounds pending rows to ≤1 per instance and the config-fetch endpoint
/// already enforces the TTL, so this purge merely sweeps rows left behind by instances
/// that are deployed once and never re-deployed. Default 1 hour ≫ <see cref="PendingDeploymentTtl"/>.
/// </summary>
public TimeSpan PendingDeploymentPurgeInterval { get; set; } = TimeSpan.FromHours(1);
}
@@ -285,10 +285,9 @@ public class DeploymentService
// standby node to fetch; deleting now would 404 that in-flight
// standby fetch and break failover. Supersession bounds pending rows
// to ≤1 per instance and the fetch endpoint enforces the TTL, so
// leaving rows for TTL purge is safe.
// TODO(notify-and-fetch): wire PurgeExpiredPendingDeploymentsAsync
// into a central maintenance cadence (none exists in DeploymentManager
// today; deferred — supersession + endpoint TTL keep this safe).
// leaving rows for TTL purge is safe. Expired rows are swept by the
// central PendingDeploymentPurgeActor singleton on its
// CommunicationOptions.PendingDeploymentPurgeInterval cadence.
var response = await _communicationService.RefreshDeploymentAsync(siteId, command, cancellationToken);
// WP-1: Update status based on site response.
@@ -756,6 +756,60 @@ akka {{
_actorSystem.ActorOf(kpiHistoryProxyProps, "kpi-history-recorder-proxy");
_logger.LogInformation("KpiHistoryRecorderActor singleton created (not readiness-gated)");
// Notify-and-fetch deploy transport — central singleton that periodically
// reclaims expired (TTL-elapsed) PendingDeployment staging rows. Mirrors the
// kpi-history-recorder singleton pattern above: a ClusterSingletonManager pins
// it to the active central node, a ClusterSingletonProxy gives a stable address,
// and a PhaseClusterLeave graceful-stop task drains the in-flight tick before
// handover. The purge timer self-schedules in PreStart. NOT readiness-gated by
// design: the purge is best-effort hygiene — supersession bounds the table to ≤1
// pending row per instance and the config-fetch endpoint enforces the TTL on read,
// so correctness never depends on it; pending-deployment-purge is therefore
// deliberately absent from RequiredSingletonsHealthCheck. The actor takes the root
// IServiceProvider and opens its own per-tick DI scope (IDeploymentManagerRepository
// is a scoped EF Core service).
var pendingPurgeLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
.CreateLogger<PendingDeploymentPurgeActor>();
var pendingPurgeCommunicationOptions =
_serviceProvider.GetRequiredService<IOptions<CommunicationOptions>>();
var pendingPurgeSingletonProps = ClusterSingletonManager.Props(
singletonProps: Props.Create(() => new PendingDeploymentPurgeActor(
_serviceProvider,
pendingPurgeCommunicationOptions,
pendingPurgeLogger)),
terminationMessage: PoisonPill.Instance,
settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
.WithSingletonName("pending-deployment-purge"));
var pendingPurgeSingletonManager =
_actorSystem!.ActorOf(pendingPurgeSingletonProps, "pending-deployment-purge-singleton");
var pendingPurgeShutdown = Akka.Actor.CoordinatedShutdown.Get(_actorSystem);
pendingPurgeShutdown.AddTask(
Akka.Actor.CoordinatedShutdown.PhaseClusterLeave,
"drain-pending-deployment-purge-singleton",
async () =>
{
try
{
await pendingPurgeSingletonManager.GracefulStop(TimeSpan.FromSeconds(10));
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"PendingDeploymentPurge singleton did not drain within the graceful-stop "
+ "timeout; falling through to PoisonPill handover");
}
return Akka.Done.Instance;
});
var pendingPurgeProxyProps = ClusterSingletonProxy.Props(
singletonManagerPath: "/user/pending-deployment-purge-singleton",
settings: ClusterSingletonProxySettings.Create(_actorSystem)
.WithSingletonName("pending-deployment-purge"));
_actorSystem.ActorOf(pendingPurgeProxyProps, "pending-deployment-purge-proxy");
_logger.LogInformation("PendingDeploymentPurgeActor singleton created (not readiness-gated)");
_logger.LogInformation("Central actors registered. CentralCommunicationActor created.");
}
@@ -0,0 +1,98 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Communication.Actors;
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests;
/// <summary>
/// Tests for <see cref="PendingDeploymentPurgeActor"/> — the notify-and-fetch central
/// singleton that sweeps expired PendingDeployment staging rows. The actor is thin glue
/// over <c>IDeploymentManagerRepository.PurgeExpiredPendingDeploymentsAsync</c> (whose
/// row-level semantics are covered by the repository tests); these tests pin the actor's
/// own policy: it opens a per-tick DI scope, calls the purge with a now-ish threshold,
/// and — being best-effort hygiene — survives a throwing purge and keeps ticking.
/// </summary>
/// <remarks>
/// The substitute's return is driven by a closure with an <see cref="System.Threading.Interlocked"/>
/// counter rather than NSubstitute's <c>ReceivedCalls()</c>, because the repeating
/// scheduler timer records calls on a background thread concurrently with the test
/// thread's assertions — querying NSubstitute's call list under that concurrency can
/// throw "collection was modified". A fenced counter is safe to read while the timer runs.
/// </remarks>
public class PendingDeploymentPurgeActorTests : TestKit
{
private IActorRef CreateActor(IDeploymentManagerRepository repo, TimeSpan interval)
{
var services = new ServiceCollection();
// Mirror AddConfigurationDatabase: IDeploymentManagerRepository is scoped, so the
// actor opens a fresh scope per tick and resolves the repository there.
services.AddScoped(_ => repo);
var sp = services.BuildServiceProvider();
var options = Options.Create(new CommunicationOptions { PendingDeploymentPurgeInterval = interval });
return Sys.ActorOf(Props.Create(() => new PendingDeploymentPurgeActor(
sp, options, NullLogger<PendingDeploymentPurgeActor>.Instance)));
}
[Fact]
public void Tick_PurgesExpiredRows_WithNowThreshold()
{
var calls = 0;
DateTimeOffset captured = default;
var repo = Substitute.For<IDeploymentManagerRepository>();
repo.PurgeExpiredPendingDeploymentsAsync(Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
.Returns(ci =>
{
captured = ci.Arg<DateTimeOffset>();
Interlocked.Increment(ref calls);
return 3;
});
// Large interval so the PreStart timer never fires during the test — only the
// single manual tick below drives a purge, making the count deterministic.
var actor = CreateActor(repo, TimeSpan.FromHours(1));
var before = DateTimeOffset.UtcNow;
actor.Tell(PendingDeploymentPurgeActor.PurgeTick.Instance);
AwaitAssert(
() => Assert.Equal(1, Volatile.Read(ref calls)),
duration: TimeSpan.FromSeconds(3),
interval: TimeSpan.FromMilliseconds(50));
// The purge threshold is "now" — bounded by the window around the tick. Reading
// `captured` after the fenced counter read is safe (Interlocked establishes the
// happens-before with the write that precedes the increment).
var after = DateTimeOffset.UtcNow;
Assert.InRange(captured, before, after);
}
[Fact]
public void Tick_PurgeThrows_ActorSurvives_AndRetriesOnNextInterval()
{
var calls = 0;
var repo = Substitute.For<IDeploymentManagerRepository>();
repo.PurgeExpiredPendingDeploymentsAsync(Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
.Returns<int>(_ =>
{
Interlocked.Increment(ref calls);
throw new InvalidOperationException("simulated purge failure");
});
// Fast interval so the self-scheduling timer fires repeatedly; the actor must
// swallow each per-tick exception and keep ticking (best-effort hygiene — a
// crash here would drop the singleton until failover).
CreateActor(repo, TimeSpan.FromMilliseconds(100));
AwaitAssert(
() => Assert.True(Volatile.Read(ref calls) >= 2,
$"expected >= 2 ticks despite failures, got {Volatile.Read(ref calls)}"),
duration: TimeSpan.FromSeconds(5),
interval: TimeSpan.FromMilliseconds(50));
}
}
@@ -0,0 +1,141 @@
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Instances;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Templates;
using ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Repositories;
using ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests.Migrations;
using Xunit;
namespace ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests.Repositories;
/// <summary>
/// SQL Server integration coverage for the notify-and-fetch <see cref="PendingDeployment"/>
/// staging store's <b>same-DeploymentId re-stage</b> path. The SQLite in-memory fixture
/// (<c>PendingDeploymentRepositoryTests</c>) covers the logic, but the delete-before-insert
/// ordering that lets <see cref="DeploymentManagerRepository.StagePendingIfAbsentAsync"/>
/// re-stage an instance's OWN <c>DeploymentId</c> over an expired row depends on EF emitting
/// the DELETE before the INSERT within a single <c>SaveChanges</c> — against the production
/// UNIQUE index on <c>DeploymentId</c>. SQLite's constraint timing differs from SQL Server's
/// (SQLite defers/relaxes within a transaction where SQL Server enforces per-statement), so
/// this class asserts the behaviour against the real migrated schema via
/// <see cref="MsSqlMigrationFixture"/> rather than the SQLite provider.
/// </summary>
public class PendingDeploymentRepositoryIntegrationTests : IClassFixture<MsSqlMigrationFixture>
{
private readonly MsSqlMigrationFixture _fixture;
public PendingDeploymentRepositoryIntegrationTests(MsSqlMigrationFixture fixture)
{
_fixture = fixture;
}
[SkippableFact]
public async Task StagePendingIfAbsent_ExpiredRowSameDeploymentId_ReStages_NoUniqueViolation()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var instanceId = await SeedInstanceAsync("ReStageInst");
var now = DateTimeOffset.UtcNow;
// Seed an EXPIRED pending row carrying DeploymentId "D1". A startup reconcile re-stages
// the DeployedConfigSnapshot's OWN DeploymentId, so the fresh row reuses "D1" — the exact
// collision the unique index would reject if the DELETE didn't precede the INSERT.
await using (var seedContext = CreateContext())
{
seedContext.Set<PendingDeployment>().Add(new PendingDeployment(
"D1", instanceId, "rev-old", "{\"old\":true}", "tok-old",
now.AddMinutes(-20), now.AddMinutes(-10)));
await seedContext.SaveChangesAsync();
}
// Re-stage "D1" over the expired row. Against the real UNIQUE index on DeploymentId this
// MUST succeed (EF orders delete-before-insert in the single SaveChanges) rather than
// throwing SqlException 2627/2601.
await using (var context = CreateContext())
{
var repo = new DeploymentManagerRepository(context);
var staged = await repo.StagePendingIfAbsentAsync(
instanceId, "D1", "rev-new", "{\"new\":true}", "tok-new",
now, now.AddMinutes(10));
Assert.True(staged);
}
// Exactly one row for "D1" survives — the fresh one.
await using (var readContext = CreateContext())
{
var rows = await readContext.Set<PendingDeployment>()
.Where(p => p.DeploymentId == "D1")
.ToListAsync();
Assert.Single(rows);
Assert.Equal("tok-new", rows[0].Token);
Assert.Equal("{\"new\":true}", rows[0].ConfigurationJson);
Assert.True(rows[0].ExpiresAtUtc > now);
}
}
[SkippableFact]
public async Task AddPendingDeployment_NewDeployDifferentId_SupersedesPriorRow_OnRealSqlServer()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var instanceId = await SeedInstanceAsync("SupersedeInst");
var now = DateTimeOffset.UtcNow;
await using (var context = CreateContext())
{
var repo = new DeploymentManagerRepository(context);
await repo.AddPendingDeploymentAsync(new PendingDeployment(
"dep-A", instanceId, "rev-a", "{\"v\":1}", "tok-a", now, now.AddMinutes(5)));
await repo.SaveChangesAsync();
}
// A newer deploy for the SAME instance with a DIFFERENT DeploymentId must replace the
// prior row (delete-then-insert under the per-instance operation lock) with no FK or
// unique conflict on real SQL Server.
await using (var context = CreateContext())
{
var repo = new DeploymentManagerRepository(context);
await repo.AddPendingDeploymentAsync(new PendingDeployment(
"dep-B", instanceId, "rev-b", "{\"v\":2}", "tok-b", now, now.AddMinutes(5)));
await repo.SaveChangesAsync();
}
await using (var readContext = CreateContext())
{
var rows = await readContext.Set<PendingDeployment>()
.Where(p => p.InstanceId == instanceId)
.ToListAsync();
Assert.Single(rows);
Assert.Equal("dep-B", rows[0].DeploymentId);
Assert.Equal("{\"v\":2}", rows[0].ConfigurationJson);
}
}
/// <summary>
/// Seeds an Instance (with its required Site + Template) against the fixture database and
/// returns its generated id. The PendingDeployment → Instance FK requires a real parent row.
/// </summary>
private async Task<int> SeedInstanceAsync(string uniqueName)
{
await using var context = CreateContext();
var site = new Site($"Site-{uniqueName}", $"S-{uniqueName}");
var template = new Template($"T-{uniqueName}");
context.Sites.Add(site);
context.Templates.Add(template);
await context.SaveChangesAsync();
var instance = new Instance(uniqueName) { SiteId = site.Id, TemplateId = template.Id };
context.Instances.Add(instance);
await context.SaveChangesAsync();
return instance.Id;
}
private ScadaBridgeDbContext CreateContext()
{
var options = new DbContextOptionsBuilder<ScadaBridgeDbContext>()
.UseSqlServer(_fixture.ConnectionString)
.Options;
return new ScadaBridgeDbContext(options);
}
}