From e675b3450092b642f119b6b4faa02f56f4b30344 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 12:03:49 -0400 Subject: [PATCH] feat(sitecallaudit): daily terminal-row purge scheduler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a daily purge tick to SiteCallAuditActor that drops terminal SiteCalls rows older than the retention window via ISiteCallAuditRepository.PurgeTerminalAsync. The threshold is computed each tick as UtcNow - RetentionDays so an operator who lowers RetentionDays sees it on the next purge without a restart. Mirrors AuditLogPurgeActor's daily cadence + continue-on-error posture: a purge fault is logged and swallowed so the central singleton stays alive and retries next tick. The purge timer is started in PreStart alongside the reconciliation timer and gates on the same collaborators (pull client + enumerator) being available — the repo-only test ctor injects neither, so neither background timer runs there. Options: PurgeInterval (default 24h, clamped >= 1 min so a zero config value can't spin the scheduler) + RetentionDays (default 365), plus a test-only override that bypasses the clamp for millisecond cadences. Tests (all in-memory, no live MSSQL): purge tick calls PurgeTerminalAsync with a UtcNow - RetentionDays threshold (non-default 30 days); default retention yields a 365-day threshold; a throwing repo does not kill the singleton (a second tick still arrives). --- .../SiteCallAuditActor.cs | 109 +++++++++-- .../SiteCallAuditOptions.cs | 57 +++++- .../SiteCallAuditPurgeTests.cs | 175 ++++++++++++++++++ 3 files changed, 323 insertions(+), 18 deletions(-) create mode 100644 tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditPurgeTests.cs diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs index a4a31cf2..20f11316 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs @@ -26,16 +26,16 @@ namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit; /// /// Implemented: direct telemetry ingest, /// query, detail and KPI handlers (Task 4), the central→site Retry/Discard -/// relay (Task 5 — the relay handlers live in this actor), and the periodic +/// relay (Task 5 — the relay handlers live in this actor), the periodic /// per-site reconciliation puller that backfills lost telemetry (Piece A — -/// , the documented self-heal pull). The -/// reconciliation timer is started in and gates on the +/// , the documented self-heal pull), and +/// the daily terminal-row purge scheduler (Piece B — +/// , which invokes +/// on a timer). Both +/// background timers are started in and gate on the /// reconciliation collaborators ( + /// ) being available — the repo-only test ctor -/// injects neither, so the timer does not run there. Deferred (next commit): -/// the daily terminal-row purge scheduler (the repository exposes -/// PurgeTerminalAsync but nothing in this module invokes it on a timer -/// yet). +/// injects neither, so neither timer runs there. /// /// /// Per CLAUDE.md "audit-write failure NEVER aborts the user-facing action" — @@ -81,7 +81,8 @@ public class SiteCallAuditActor : ReceiveActor /// singletons registered by AddAuditLogCentralReconciliationClient); /// in the test path they are injected directly. They are null when /// the actor was built via the repo-only test ctor — in that case the - /// reconciliation tick is NOT started (see ). + /// reconciliation tick is NOT started (see ); + /// the purge tick gates on the same collaborators (see ). /// private readonly IPullSiteCallsClient? _pullClient; private readonly ISiteEnumerator? _siteEnumerator; @@ -100,6 +101,7 @@ public class SiteCallAuditActor : ReceiveActor private readonly Dictionary _reconciliationCursors = new(); private ICancelable? _reconciliationTimer; + private ICancelable? _purgeTimer; /// /// Task 5 (#22): the central→site command transport — the @@ -149,8 +151,9 @@ public class SiteCallAuditActor : ReceiveActor /// concrete repository PLUS the two reconciliation collaborators directly, /// so the per-site self-heal pull is unit-testable in-memory without a DI /// container or a live gRPC channel. Because the client + enumerator are - /// present, the reconciliation tick IS started (it gates on the - /// collaborators being available — see ). + /// present, the reconciliation tick IS started; the purge tick is also + /// started (both gate on the collaborators being available — see + /// / ). /// /// Concrete repository instance used for upserts and purges. /// Enumerates the sites to reconcile each tick. @@ -246,11 +249,12 @@ public class SiteCallAuditActor : ReceiveActor Receive(HandleRetrySiteCall); Receive(HandleDiscardSiteCall); - // Piece A (#22): self-tick for the periodic reconciliation pull. The - // handler stays alive across faults via its own per-site try/catch - // (mirroring the ingest path); the timer is only started when the - // reconciliation collaborators are available. + // Piece A/B (#22): self-ticks for the periodic reconciliation pull and + // the daily terminal-row purge. Handlers stay alive across faults via + // their own per-site / per-tick try/catch (mirroring the ingest path); + // the timers are only started when their collaborators are available. ReceiveAsync(_ => OnReconciliationTickAsync()); + ReceiveAsync(_ => OnPurgeTickAsync()); } /// @@ -258,12 +262,14 @@ public class SiteCallAuditActor : ReceiveActor { base.PreStart(); StartReconciliationTimer(); + StartPurgeTimer(); } /// protected override void PostStop() { _reconciliationTimer?.Cancel(); + _purgeTimer?.Cancel(); base.PostStop(); } @@ -290,6 +296,29 @@ public class SiteCallAuditActor : ReceiveActor sender: Self); } + /// + /// Starts the daily purge tick — gated on the same collaborator presence as + /// the reconciliation tick. The purge itself only needs the repository, but + /// gating both schedulers together keeps the repo-only test ctor (no + /// client/enumerator) free of BOTH background timers, so the MSSQL read/ + /// upsert tests see no scheduled side effects. + /// + private void StartPurgeTimer() + { + if (_pullClient is null || _siteEnumerator is null) + { + return; + } + + var interval = _options.ResolvedPurgeInterval; + _purgeTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable( + initialDelay: interval, + interval: interval, + receiver: Self, + message: PurgeTick.Instance, + sender: Self); + } + /// protected override SupervisorStrategy SupervisorStrategy() { @@ -459,6 +488,51 @@ public class SiteCallAuditActor : ReceiveActor _reconciliationCursors[site.SiteId] = maxUpdated; } + // ── Piece B: daily terminal-row purge scheduler ── + + /// + /// One purge pass: drops terminal SiteCalls rows whose + /// is older than + /// UtcNow - RetentionDays via + /// . Non-terminal + /// rows are never purged (enforced in the repository). The threshold is + /// computed each tick so an operator who lowers RetentionDays sees it + /// applied on the next purge without an actor restart. Mirrors + /// AuditLogPurgeActor's daily cadence + continue-on-error posture: a + /// purge fault is logged and swallowed so the singleton stays alive. + /// + private async Task OnPurgeTickAsync() + { + var threshold = DateTime.UtcNow - TimeSpan.FromDays(_options.RetentionDays); + + var (scope, repository) = ResolveRepository(); + try + { + var rowsDeleted = await repository.PurgeTerminalAsync(threshold).ConfigureAwait(false); + if (rowsDeleted > 0) + { + _logger.LogInformation( + "SiteCallAudit purged {RowsDeleted} terminal SiteCalls rows older than {ThresholdUtc:o}.", + rowsDeleted, + threshold); + } + } + catch (Exception ex) + { + // Continue-on-error: a purge fault (transient SQL failure, + // contention) must NOT crash the central singleton. The next tick + // retries the same window. + _logger.LogError( + ex, + "SiteCallAudit terminal-row purge failed (threshold {ThresholdUtc:o}); will retry next tick.", + threshold); + } + finally + { + scope?.Dispose(); + } + } + // ── Task 4: read-side (query / detail / KPI) ── /// @@ -947,6 +1021,13 @@ public class SiteCallAuditActor : ReceiveActor public static readonly ReconciliationTick Instance = new(); private ReconciliationTick() { } } + + /// Self-tick triggering a terminal-row purge pass (Piece B). + internal sealed class PurgeTick + { + public static readonly PurgeTick Instance = new(); + private PurgeTick() { } + } } /// diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs index 134e606e..317b29f9 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs @@ -2,10 +2,12 @@ namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit; /// /// Configuration options for the Site Call Audit (#22): stuck-call detection + -/// KPI windowing for the read-side, plus the cadence knobs for the periodic -/// per-site reconciliation pull (self-heal for lost telemetry). Mirrors the -/// KPI-relevant subset of NotificationOutboxOptions and the -/// scheduler-cadence shape of SiteAuditReconciliationOptions. +/// KPI windowing for the read-side, plus the cadence/retention knobs for the +/// two central-singleton schedulers — the periodic per-site reconciliation +/// pull (self-heal for lost telemetry) and the daily terminal-row purge. +/// Mirrors the KPI-relevant subset of NotificationOutboxOptions and the +/// scheduler-cadence shape of SiteAuditReconciliationOptions / +/// AuditLogPurgeOptions. /// public class SiteCallAuditOptions { @@ -92,4 +94,51 @@ public class SiteCallAuditOptions : ReconciliationInterval < MinReconciliationInterval ? MinReconciliationInterval : ReconciliationInterval; + + // ── Purge scheduler (#22): daily terminal-row purge ── + + /// + /// Period of the purge tick. Each tick drops terminal SiteCalls rows + /// older than the retention window via + /// . + /// Default 24 hours, matching AuditLogPurgeOptions. Clamped to at + /// least via . + /// + public TimeSpan PurgeInterval { get; set; } = TimeSpan.FromHours(24); + + /// + /// Test-only override for the purge tick cadence — bypasses the + /// clamp so unit tests can drop the cadence + /// to milliseconds. Production config never sets this; leave null. + /// + public TimeSpan? PurgeIntervalOverride { get; set; } + + /// + /// Retention window for terminal rows. On each purge tick a row whose + /// TerminalAtUtc is older than UtcNow - RetentionDays is + /// deleted; non-terminal rows are never purged. Default 365 days, matching + /// the central audit-store retention policy. + /// + public int RetentionDays { get; set; } = 365; + + /// + /// Minimum interval the config-bound can resolve + /// to. Clamps a misconfigured 0 (or negative) value away from + /// for the same scheduler-spin reason as + /// ; the purge is daily so the floor + /// is a more generous 1 minute. + /// + private static readonly TimeSpan MinPurgeInterval = TimeSpan.FromMinutes(1); + + /// + /// Resolves the effective purge tick interval: the test override when set + /// (bypassing the clamp), otherwise clamped to at + /// least . + /// + public TimeSpan ResolvedPurgeInterval => + PurgeIntervalOverride is { } o + ? o + : PurgeInterval < MinPurgeInterval + ? MinPurgeInterval + : PurgeInterval; } diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditPurgeTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditPurgeTests.cs new file mode 100644 index 00000000..6352ddec --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditPurgeTests.cs @@ -0,0 +1,175 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.AuditLog.Central; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration; +using ZB.MOM.WW.ScadaBridge.Commons.Types; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit; + +namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests; + +/// +/// Purge-scheduler tests for (#22, Piece B). +/// Exercises the daily terminal-row purge tick in-memory — a recording +/// captures the +/// threshold the actor +/// computes, with no live MSSQL fixture. The reconciliation collaborators are +/// inert stubs (the purge tick doesn't use them, but they must be present to +/// arm the scheduler — both timers gate on the collaborators together). +/// +public class SiteCallAuditPurgeTests : TestKit +{ + private static SiteCallAuditOptions FastPurgeOptions(int retentionDays = 365) => new() + { + // Keep the reconciliation tick slow so it doesn't fight the purge tick + // for the test window; drop the purge tick to 100 ms via its override. + ReconciliationIntervalOverride = TimeSpan.FromMinutes(5), + PurgeIntervalOverride = TimeSpan.FromMilliseconds(100), + RetentionDays = retentionDays, + }; + + /// Empty enumerator — the purge path never touches it, but it must be present to arm the scheduler. + private sealed class EmptyEnumerator : ISiteEnumerator + { + public Task> EnumerateAsync(CancellationToken ct = default) => + Task.FromResult>(Array.Empty()); + } + + /// No-op pull client — present only to arm the scheduler. + private sealed class NoOpPullClient : IPullSiteCallsClient + { + public Task PullAsync( + string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct) => + Task.FromResult(new PullSiteCallsResponse(Array.Empty(), MoreAvailable: false)); + } + + /// + /// Recording repository capturing every + /// threshold (and the configured deleted-row count it returns). + /// + private sealed class RecordingRepo : ISiteCallAuditRepository + { + public List PurgeThresholds { get; } = new(); + public int RowsDeletedPerCall { get; set; } + + public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) + { + PurgeThresholds.Add(olderThanUtc); + return Task.FromResult(RowsDeletedPerCall); + } + + public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) => Task.CompletedTask; + + public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => + Task.FromResult(null); + + public Task> QueryAsync( + SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) => + Task.FromResult>(Array.Empty()); + + public Task ComputeKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + Task.FromResult(new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0)); + + public Task> ComputePerSiteKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + Task.FromResult>(Array.Empty()); + } + + /// Repository whose purge always throws — to prove continue-on-error keeps the singleton alive. + private sealed class PurgeThrowingRepo : ISiteCallAuditRepository + { + public int PurgeCallCount; + + public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) + { + Interlocked.Increment(ref PurgeCallCount); + throw new InvalidOperationException("simulated purge failure"); + } + + public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) => Task.CompletedTask; + public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => Task.FromResult(null); + public Task> QueryAsync(SiteCallQueryFilter f, SiteCallPaging p, CancellationToken ct = default) => Task.FromResult>(Array.Empty()); + public Task ComputeKpisAsync(DateTime a, DateTime b, CancellationToken ct = default) => Task.FromResult(new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0)); + public Task> ComputePerSiteKpisAsync(DateTime a, DateTime b, CancellationToken ct = default) => Task.FromResult>(Array.Empty()); + } + + private IActorRef CreateActor(ISiteCallAuditRepository repo, SiteCallAuditOptions options) => + Sys.ActorOf(Props.Create(() => new SiteCallAuditActor( + repo, + new EmptyEnumerator(), + new NoOpPullClient(), + NullLogger.Instance, + options))); + + // --------------------------------------------------------------------- + // 1. PurgeTick_CallsPurgeTerminal_WithRetentionThreshold + // --------------------------------------------------------------------- + + [Fact] + public void PurgeTick_CallsPurgeTerminalAsync_WithRetentionThreshold() + { + var repo = new RecordingRepo { RowsDeletedPerCall = 7 }; + // Non-default retention (30 days) so the assertion isn't accidentally + // satisfied by the 365-day default. + CreateActor(repo, FastPurgeOptions(retentionDays: 30)); + + AwaitAssert( + () => Assert.True(repo.PurgeThresholds.Count >= 1, + $"expected >= 1 PurgeTerminalAsync call, got {repo.PurgeThresholds.Count}"), + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + + // The threshold the actor passed must be ~UtcNow - 30 days. 1-minute + // slack covers scheduling jitter between the tick firing and the assert. + var threshold = repo.PurgeThresholds[0]; + var expected = DateTime.UtcNow - TimeSpan.FromDays(30); + Assert.True( + Math.Abs((threshold - expected).TotalMinutes) < 1.0, + $"purge threshold {threshold:o} should be within 1 minute of {expected:o}"); + } + + // --------------------------------------------------------------------- + // 2. PurgeTick_UsesDefaultRetention_365Days + // --------------------------------------------------------------------- + + [Fact] + public void PurgeTick_DefaultRetention_Uses365DayThreshold() + { + var repo = new RecordingRepo(); + CreateActor(repo, FastPurgeOptions()); // default 365 days + + AwaitAssert( + () => Assert.True(repo.PurgeThresholds.Count >= 1), + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + + var threshold = repo.PurgeThresholds[0]; + var expected = DateTime.UtcNow - TimeSpan.FromDays(365); + Assert.True( + Math.Abs((threshold - expected).TotalMinutes) < 1.0, + $"purge threshold {threshold:o} should be within 1 minute of {expected:o}"); + } + + // --------------------------------------------------------------------- + // 3. PurgeTick_RepoThrows_ActorStaysAlive_RetriesNextTick (continue-on-error) + // --------------------------------------------------------------------- + + [Fact] + public void PurgeTick_PurgeThrows_ActorStaysAlive_RetriesNextTick() + { + var repo = new PurgeThrowingRepo(); + CreateActor(repo, FastPurgeOptions()); + + // The singleton must NOT die on a purge fault — a second tick must still + // arrive (continue-on-error). Two purge calls prove the actor survived + // the first throw and the timer kept ticking. + AwaitAssert( + () => Assert.True(repo.PurgeCallCount >= 2, + $"expected >= 2 purge attempts (actor survived the throw), got {repo.PurgeCallCount}"), + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + } +}