diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/PendingDeploymentPurgeActor.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/PendingDeploymentPurgeActor.cs new file mode 100644 index 00000000..0a4768aa --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/PendingDeploymentPurgeActor.cs @@ -0,0 +1,126 @@ +using Akka.Actor; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; + +namespace ZB.MOM.WW.ScadaBridge.Communication.Actors; + +/// +/// Central cluster singleton that periodically reclaims expired (TTL-elapsed) +/// PendingDeployment staging rows from the central configuration database — +/// the notify-and-fetch deploy transport stages each deploy's flattened config in a +/// PendingDeployment row that the site fetches over HTTP using a per-deployment +/// token, and this actor is the maintenance cadence that sweeps rows whose TTL has +/// elapsed without a re-deploy superseding them. +/// +/// +/// +/// Best-effort, not readiness-gated. This purge is pure hygiene: supersession +/// already bounds the table to ≤1 pending row per instance, and the config-fetch +/// endpoint enforces the TTL on read (expired rows return 404), so correctness never +/// depends on this purge running. It exists only to reclaim rows left behind by +/// instances that are deployed once and never re-deployed. Like +/// KpiHistoryRecorderActor, it is deliberately absent from +/// RequiredSingletonsHealthCheck — it must never gate /health/ready. +/// +/// +/// Continue-on-error. The tick handler swallows every exception and logs it; a +/// transient SQL failure on one tick must not crash the singleton — the next tick +/// retries. The per-tick try/catch (not the supervisor) is what keeps the actor alive. +/// +/// +/// DI scopes. is a scoped EF Core +/// service registered by AddConfigurationDatabase. The singleton opens one DI +/// scope per tick and resolves the repository there, mirroring the +/// AuditLogPurgeActor / KpiHistoryRecorderActor pattern. +/// +/// +public class PendingDeploymentPurgeActor : ReceiveActor +{ + private readonly IServiceProvider _services; + private readonly TimeSpan _interval; + private readonly ILogger _logger; + private ICancelable? _timer; + + /// Initializes a new instance of and registers the tick handler. + /// Root DI provider used to create a scoped repository per tick. + /// Communication options supplying the purge interval. + /// Logger instance. + public PendingDeploymentPurgeActor( + IServiceProvider services, + IOptions options, + ILogger logger) + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(options); + ArgumentNullException.ThrowIfNull(logger); + + _services = services; + _interval = options.Value.PendingDeploymentPurgeInterval; + _logger = logger; + + ReceiveAsync(_ => OnTickAsync()); + } + + /// + protected override void PreStart() + { + base.PreStart(); + _timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable( + initialDelay: _interval, + interval: _interval, + receiver: Self, + message: PurgeTick.Instance, + sender: Self); + } + + /// + protected override void PostStop() + { + _timer?.Cancel(); + base.PostStop(); + } + + private async Task OnTickAsync() + { + // CreateAsyncScope + await using so the scoped EF Core DbContext (IAsyncDisposable) + // disposes asynchronously without blocking on connection cleanup. + await using var scope = _services.CreateAsyncScope(); + IDeploymentManagerRepository repository; + try + { + repository = scope.ServiceProvider.GetRequiredService(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to resolve IDeploymentManagerRepository for PendingDeployment purge tick."); + return; + } + + try + { + var purged = await repository + .PurgeExpiredPendingDeploymentsAsync(DateTimeOffset.UtcNow) + .ConfigureAwait(false); + if (purged > 0) + { + _logger.LogInformation( + "Purged {Count} expired PendingDeployment staging row(s).", purged); + } + } + catch (Exception ex) + { + // Best-effort: a failed tick must not crash the singleton. The next + // interval retries; correctness does not depend on any single tick. + _logger.LogError(ex, "PendingDeployment purge tick failed; retrying on the next interval."); + } + } + + /// Self-tick triggering one expired-row purge pass. + internal sealed class PurgeTick + { + public static readonly PurgeTick Instance = new(); + private PurgeTick() { } + } +} diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationOptions.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationOptions.cs index 78c6cc41..f794f9cc 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationOptions.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationOptions.cs @@ -71,4 +71,13 @@ public class CommunicationOptions /// comfortably cover both site nodes' fetches within one deploy window. /// public TimeSpan PendingDeploymentTtl { get; set; } = TimeSpan.FromMinutes(5); + + /// + /// How often the central PendingDeploymentPurgeActor singleton reclaims + /// expired (TTL-elapsed) PendingDeployment staging rows. Best-effort hygiene only: + /// supersession bounds pending rows to ≤1 per instance and the config-fetch endpoint + /// already enforces the TTL, so this purge merely sweeps rows left behind by instances + /// that are deployed once and never re-deployed. Default 1 hour ≫ . + /// + public TimeSpan PendingDeploymentPurgeInterval { get; set; } = TimeSpan.FromHours(1); } diff --git a/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs b/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs index 4f410ea9..7cc5c4b7 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs @@ -285,10 +285,9 @@ public class DeploymentService // standby node to fetch; deleting now would 404 that in-flight // standby fetch and break failover. Supersession bounds pending rows // to ≤1 per instance and the fetch endpoint enforces the TTL, so - // leaving rows for TTL purge is safe. - // TODO(notify-and-fetch): wire PurgeExpiredPendingDeploymentsAsync - // into a central maintenance cadence (none exists in DeploymentManager - // today; deferred — supersession + endpoint TTL keep this safe). + // leaving rows for TTL purge is safe. Expired rows are swept by the + // central PendingDeploymentPurgeActor singleton on its + // CommunicationOptions.PendingDeploymentPurgeInterval cadence. var response = await _communicationService.RefreshDeploymentAsync(siteId, command, cancellationToken); // WP-1: Update status based on site response. diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs index 5012583c..1b04ebd9 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs @@ -756,6 +756,60 @@ akka {{ _actorSystem.ActorOf(kpiHistoryProxyProps, "kpi-history-recorder-proxy"); _logger.LogInformation("KpiHistoryRecorderActor singleton created (not readiness-gated)"); + // Notify-and-fetch deploy transport — central singleton that periodically + // reclaims expired (TTL-elapsed) PendingDeployment staging rows. Mirrors the + // kpi-history-recorder singleton pattern above: a ClusterSingletonManager pins + // it to the active central node, a ClusterSingletonProxy gives a stable address, + // and a PhaseClusterLeave graceful-stop task drains the in-flight tick before + // handover. The purge timer self-schedules in PreStart. NOT readiness-gated by + // design: the purge is best-effort hygiene — supersession bounds the table to ≤1 + // pending row per instance and the config-fetch endpoint enforces the TTL on read, + // so correctness never depends on it; pending-deployment-purge is therefore + // deliberately absent from RequiredSingletonsHealthCheck. The actor takes the root + // IServiceProvider and opens its own per-tick DI scope (IDeploymentManagerRepository + // is a scoped EF Core service). + var pendingPurgeLogger = _serviceProvider.GetRequiredService() + .CreateLogger(); + var pendingPurgeCommunicationOptions = + _serviceProvider.GetRequiredService>(); + + var pendingPurgeSingletonProps = ClusterSingletonManager.Props( + singletonProps: Props.Create(() => new PendingDeploymentPurgeActor( + _serviceProvider, + pendingPurgeCommunicationOptions, + pendingPurgeLogger)), + terminationMessage: PoisonPill.Instance, + settings: ClusterSingletonManagerSettings.Create(_actorSystem!) + .WithSingletonName("pending-deployment-purge")); + var pendingPurgeSingletonManager = + _actorSystem!.ActorOf(pendingPurgeSingletonProps, "pending-deployment-purge-singleton"); + + var pendingPurgeShutdown = Akka.Actor.CoordinatedShutdown.Get(_actorSystem); + pendingPurgeShutdown.AddTask( + Akka.Actor.CoordinatedShutdown.PhaseClusterLeave, + "drain-pending-deployment-purge-singleton", + async () => + { + try + { + await pendingPurgeSingletonManager.GracefulStop(TimeSpan.FromSeconds(10)); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "PendingDeploymentPurge singleton did not drain within the graceful-stop " + + "timeout; falling through to PoisonPill handover"); + } + return Akka.Done.Instance; + }); + + var pendingPurgeProxyProps = ClusterSingletonProxy.Props( + singletonManagerPath: "/user/pending-deployment-purge-singleton", + settings: ClusterSingletonProxySettings.Create(_actorSystem) + .WithSingletonName("pending-deployment-purge")); + _actorSystem.ActorOf(pendingPurgeProxyProps, "pending-deployment-purge-proxy"); + _logger.LogInformation("PendingDeploymentPurgeActor singleton created (not readiness-gated)"); + _logger.LogInformation("Central actors registered. CentralCommunicationActor created."); } diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/PendingDeploymentPurgeActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/PendingDeploymentPurgeActorTests.cs new file mode 100644 index 00000000..6f3f69aa --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/PendingDeploymentPurgeActorTests.cs @@ -0,0 +1,98 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; +using ZB.MOM.WW.ScadaBridge.Communication.Actors; + +namespace ZB.MOM.WW.ScadaBridge.Communication.Tests; + +/// +/// Tests for — the notify-and-fetch central +/// singleton that sweeps expired PendingDeployment staging rows. The actor is thin glue +/// over IDeploymentManagerRepository.PurgeExpiredPendingDeploymentsAsync (whose +/// row-level semantics are covered by the repository tests); these tests pin the actor's +/// own policy: it opens a per-tick DI scope, calls the purge with a now-ish threshold, +/// and — being best-effort hygiene — survives a throwing purge and keeps ticking. +/// +/// +/// The substitute's return is driven by a closure with an +/// counter rather than NSubstitute's ReceivedCalls(), because the repeating +/// scheduler timer records calls on a background thread concurrently with the test +/// thread's assertions — querying NSubstitute's call list under that concurrency can +/// throw "collection was modified". A fenced counter is safe to read while the timer runs. +/// +public class PendingDeploymentPurgeActorTests : TestKit +{ + private IActorRef CreateActor(IDeploymentManagerRepository repo, TimeSpan interval) + { + var services = new ServiceCollection(); + // Mirror AddConfigurationDatabase: IDeploymentManagerRepository is scoped, so the + // actor opens a fresh scope per tick and resolves the repository there. + services.AddScoped(_ => repo); + var sp = services.BuildServiceProvider(); + + var options = Options.Create(new CommunicationOptions { PendingDeploymentPurgeInterval = interval }); + return Sys.ActorOf(Props.Create(() => new PendingDeploymentPurgeActor( + sp, options, NullLogger.Instance))); + } + + [Fact] + public void Tick_PurgesExpiredRows_WithNowThreshold() + { + var calls = 0; + DateTimeOffset captured = default; + var repo = Substitute.For(); + repo.PurgeExpiredPendingDeploymentsAsync(Arg.Any(), Arg.Any()) + .Returns(ci => + { + captured = ci.Arg(); + Interlocked.Increment(ref calls); + return 3; + }); + + // Large interval so the PreStart timer never fires during the test — only the + // single manual tick below drives a purge, making the count deterministic. + var actor = CreateActor(repo, TimeSpan.FromHours(1)); + + var before = DateTimeOffset.UtcNow; + actor.Tell(PendingDeploymentPurgeActor.PurgeTick.Instance); + + AwaitAssert( + () => Assert.Equal(1, Volatile.Read(ref calls)), + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + + // The purge threshold is "now" — bounded by the window around the tick. Reading + // `captured` after the fenced counter read is safe (Interlocked establishes the + // happens-before with the write that precedes the increment). + var after = DateTimeOffset.UtcNow; + Assert.InRange(captured, before, after); + } + + [Fact] + public void Tick_PurgeThrows_ActorSurvives_AndRetriesOnNextInterval() + { + var calls = 0; + var repo = Substitute.For(); + repo.PurgeExpiredPendingDeploymentsAsync(Arg.Any(), Arg.Any()) + .Returns(_ => + { + Interlocked.Increment(ref calls); + throw new InvalidOperationException("simulated purge failure"); + }); + + // Fast interval so the self-scheduling timer fires repeatedly; the actor must + // swallow each per-tick exception and keep ticking (best-effort hygiene — a + // crash here would drop the singleton until failover). + CreateActor(repo, TimeSpan.FromMilliseconds(100)); + + AwaitAssert( + () => Assert.True(Volatile.Read(ref calls) >= 2, + $"expected >= 2 ticks despite failures, got {Volatile.Read(ref calls)}"), + duration: TimeSpan.FromSeconds(5), + interval: TimeSpan.FromMilliseconds(50)); + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/Repositories/PendingDeploymentRepositoryIntegrationTests.cs b/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/Repositories/PendingDeploymentRepositoryIntegrationTests.cs new file mode 100644 index 00000000..ce76e3bf --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/Repositories/PendingDeploymentRepositoryIntegrationTests.cs @@ -0,0 +1,141 @@ +using Microsoft.EntityFrameworkCore; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Instances; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Templates; +using ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Repositories; +using ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests.Migrations; +using Xunit; + +namespace ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests.Repositories; + +/// +/// SQL Server integration coverage for the notify-and-fetch +/// staging store's same-DeploymentId re-stage path. The SQLite in-memory fixture +/// (PendingDeploymentRepositoryTests) covers the logic, but the delete-before-insert +/// ordering that lets +/// re-stage an instance's OWN DeploymentId over an expired row depends on EF emitting +/// the DELETE before the INSERT within a single SaveChanges — against the production +/// UNIQUE index on DeploymentId. SQLite's constraint timing differs from SQL Server's +/// (SQLite defers/relaxes within a transaction where SQL Server enforces per-statement), so +/// this class asserts the behaviour against the real migrated schema via +/// rather than the SQLite provider. +/// +public class PendingDeploymentRepositoryIntegrationTests : IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public PendingDeploymentRepositoryIntegrationTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + [SkippableFact] + public async Task StagePendingIfAbsent_ExpiredRowSameDeploymentId_ReStages_NoUniqueViolation() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var instanceId = await SeedInstanceAsync("ReStageInst"); + var now = DateTimeOffset.UtcNow; + + // Seed an EXPIRED pending row carrying DeploymentId "D1". A startup reconcile re-stages + // the DeployedConfigSnapshot's OWN DeploymentId, so the fresh row reuses "D1" — the exact + // collision the unique index would reject if the DELETE didn't precede the INSERT. + await using (var seedContext = CreateContext()) + { + seedContext.Set().Add(new PendingDeployment( + "D1", instanceId, "rev-old", "{\"old\":true}", "tok-old", + now.AddMinutes(-20), now.AddMinutes(-10))); + await seedContext.SaveChangesAsync(); + } + + // Re-stage "D1" over the expired row. Against the real UNIQUE index on DeploymentId this + // MUST succeed (EF orders delete-before-insert in the single SaveChanges) rather than + // throwing SqlException 2627/2601. + await using (var context = CreateContext()) + { + var repo = new DeploymentManagerRepository(context); + var staged = await repo.StagePendingIfAbsentAsync( + instanceId, "D1", "rev-new", "{\"new\":true}", "tok-new", + now, now.AddMinutes(10)); + Assert.True(staged); + } + + // Exactly one row for "D1" survives — the fresh one. + await using (var readContext = CreateContext()) + { + var rows = await readContext.Set() + .Where(p => p.DeploymentId == "D1") + .ToListAsync(); + Assert.Single(rows); + Assert.Equal("tok-new", rows[0].Token); + Assert.Equal("{\"new\":true}", rows[0].ConfigurationJson); + Assert.True(rows[0].ExpiresAtUtc > now); + } + } + + [SkippableFact] + public async Task AddPendingDeployment_NewDeployDifferentId_SupersedesPriorRow_OnRealSqlServer() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var instanceId = await SeedInstanceAsync("SupersedeInst"); + var now = DateTimeOffset.UtcNow; + + await using (var context = CreateContext()) + { + var repo = new DeploymentManagerRepository(context); + await repo.AddPendingDeploymentAsync(new PendingDeployment( + "dep-A", instanceId, "rev-a", "{\"v\":1}", "tok-a", now, now.AddMinutes(5))); + await repo.SaveChangesAsync(); + } + + // A newer deploy for the SAME instance with a DIFFERENT DeploymentId must replace the + // prior row (delete-then-insert under the per-instance operation lock) with no FK or + // unique conflict on real SQL Server. + await using (var context = CreateContext()) + { + var repo = new DeploymentManagerRepository(context); + await repo.AddPendingDeploymentAsync(new PendingDeployment( + "dep-B", instanceId, "rev-b", "{\"v\":2}", "tok-b", now, now.AddMinutes(5))); + await repo.SaveChangesAsync(); + } + + await using (var readContext = CreateContext()) + { + var rows = await readContext.Set() + .Where(p => p.InstanceId == instanceId) + .ToListAsync(); + Assert.Single(rows); + Assert.Equal("dep-B", rows[0].DeploymentId); + Assert.Equal("{\"v\":2}", rows[0].ConfigurationJson); + } + } + + /// + /// Seeds an Instance (with its required Site + Template) against the fixture database and + /// returns its generated id. The PendingDeployment → Instance FK requires a real parent row. + /// + private async Task SeedInstanceAsync(string uniqueName) + { + await using var context = CreateContext(); + var site = new Site($"Site-{uniqueName}", $"S-{uniqueName}"); + var template = new Template($"T-{uniqueName}"); + context.Sites.Add(site); + context.Templates.Add(template); + await context.SaveChangesAsync(); + + var instance = new Instance(uniqueName) { SiteId = site.Id, TemplateId = template.Id }; + context.Instances.Add(instance); + await context.SaveChangesAsync(); + return instance.Id; + } + + private ScadaBridgeDbContext CreateContext() + { + var options = new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString) + .Options; + return new ScadaBridgeDbContext(options); + } +}