diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs index a4a31cf2..20f11316 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs @@ -26,16 +26,16 @@ namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit; /// /// Implemented: direct telemetry ingest, /// query, detail and KPI handlers (Task 4), the central→site Retry/Discard -/// relay (Task 5 — the relay handlers live in this actor), and the periodic +/// relay (Task 5 — the relay handlers live in this actor), the periodic /// per-site reconciliation puller that backfills lost telemetry (Piece A — -/// , the documented self-heal pull). The -/// reconciliation timer is started in and gates on the +/// , the documented self-heal pull), and +/// the daily terminal-row purge scheduler (Piece B — +/// , which invokes +/// on a timer). Both +/// background timers are started in and gate on the /// reconciliation collaborators ( + /// ) being available — the repo-only test ctor -/// injects neither, so the timer does not run there. Deferred (next commit): -/// the daily terminal-row purge scheduler (the repository exposes -/// PurgeTerminalAsync but nothing in this module invokes it on a timer -/// yet). +/// injects neither, so neither timer runs there. /// /// /// Per CLAUDE.md "audit-write failure NEVER aborts the user-facing action" — @@ -81,7 +81,8 @@ public class SiteCallAuditActor : ReceiveActor /// singletons registered by AddAuditLogCentralReconciliationClient); /// in the test path they are injected directly. They are null when /// the actor was built via the repo-only test ctor — in that case the - /// reconciliation tick is NOT started (see ). + /// reconciliation tick is NOT started (see ); + /// the purge tick gates on the same collaborators (see ). /// private readonly IPullSiteCallsClient? _pullClient; private readonly ISiteEnumerator? _siteEnumerator; @@ -100,6 +101,7 @@ public class SiteCallAuditActor : ReceiveActor private readonly Dictionary _reconciliationCursors = new(); private ICancelable? _reconciliationTimer; + private ICancelable? _purgeTimer; /// /// Task 5 (#22): the central→site command transport — the @@ -149,8 +151,9 @@ public class SiteCallAuditActor : ReceiveActor /// concrete repository PLUS the two reconciliation collaborators directly, /// so the per-site self-heal pull is unit-testable in-memory without a DI /// container or a live gRPC channel. Because the client + enumerator are - /// present, the reconciliation tick IS started (it gates on the - /// collaborators being available — see ). + /// present, the reconciliation tick IS started; the purge tick is also + /// started (both gate on the collaborators being available — see + /// / ). /// /// Concrete repository instance used for upserts and purges. /// Enumerates the sites to reconcile each tick. @@ -246,11 +249,12 @@ public class SiteCallAuditActor : ReceiveActor Receive(HandleRetrySiteCall); Receive(HandleDiscardSiteCall); - // Piece A (#22): self-tick for the periodic reconciliation pull. The - // handler stays alive across faults via its own per-site try/catch - // (mirroring the ingest path); the timer is only started when the - // reconciliation collaborators are available. + // Piece A/B (#22): self-ticks for the periodic reconciliation pull and + // the daily terminal-row purge. Handlers stay alive across faults via + // their own per-site / per-tick try/catch (mirroring the ingest path); + // the timers are only started when their collaborators are available. ReceiveAsync(_ => OnReconciliationTickAsync()); + ReceiveAsync(_ => OnPurgeTickAsync()); } /// @@ -258,12 +262,14 @@ public class SiteCallAuditActor : ReceiveActor { base.PreStart(); StartReconciliationTimer(); + StartPurgeTimer(); } /// protected override void PostStop() { _reconciliationTimer?.Cancel(); + _purgeTimer?.Cancel(); base.PostStop(); } @@ -290,6 +296,29 @@ public class SiteCallAuditActor : ReceiveActor sender: Self); } + /// + /// Starts the daily purge tick — gated on the same collaborator presence as + /// the reconciliation tick. The purge itself only needs the repository, but + /// gating both schedulers together keeps the repo-only test ctor (no + /// client/enumerator) free of BOTH background timers, so the MSSQL read/ + /// upsert tests see no scheduled side effects. + /// + private void StartPurgeTimer() + { + if (_pullClient is null || _siteEnumerator is null) + { + return; + } + + var interval = _options.ResolvedPurgeInterval; + _purgeTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable( + initialDelay: interval, + interval: interval, + receiver: Self, + message: PurgeTick.Instance, + sender: Self); + } + /// protected override SupervisorStrategy SupervisorStrategy() { @@ -459,6 +488,51 @@ public class SiteCallAuditActor : ReceiveActor _reconciliationCursors[site.SiteId] = maxUpdated; } + // ── Piece B: daily terminal-row purge scheduler ── + + /// + /// One purge pass: drops terminal SiteCalls rows whose + /// is older than + /// UtcNow - RetentionDays via + /// . Non-terminal + /// rows are never purged (enforced in the repository). The threshold is + /// computed each tick so an operator who lowers RetentionDays sees it + /// applied on the next purge without an actor restart. Mirrors + /// AuditLogPurgeActor's daily cadence + continue-on-error posture: a + /// purge fault is logged and swallowed so the singleton stays alive. + /// + private async Task OnPurgeTickAsync() + { + var threshold = DateTime.UtcNow - TimeSpan.FromDays(_options.RetentionDays); + + var (scope, repository) = ResolveRepository(); + try + { + var rowsDeleted = await repository.PurgeTerminalAsync(threshold).ConfigureAwait(false); + if (rowsDeleted > 0) + { + _logger.LogInformation( + "SiteCallAudit purged {RowsDeleted} terminal SiteCalls rows older than {ThresholdUtc:o}.", + rowsDeleted, + threshold); + } + } + catch (Exception ex) + { + // Continue-on-error: a purge fault (transient SQL failure, + // contention) must NOT crash the central singleton. The next tick + // retries the same window. + _logger.LogError( + ex, + "SiteCallAudit terminal-row purge failed (threshold {ThresholdUtc:o}); will retry next tick.", + threshold); + } + finally + { + scope?.Dispose(); + } + } + // ── Task 4: read-side (query / detail / KPI) ── /// @@ -947,6 +1021,13 @@ public class SiteCallAuditActor : ReceiveActor public static readonly ReconciliationTick Instance = new(); private ReconciliationTick() { } } + + /// Self-tick triggering a terminal-row purge pass (Piece B). + internal sealed class PurgeTick + { + public static readonly PurgeTick Instance = new(); + private PurgeTick() { } + } } /// diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs index 134e606e..317b29f9 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs @@ -2,10 +2,12 @@ namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit; /// /// Configuration options for the Site Call Audit (#22): stuck-call detection + -/// KPI windowing for the read-side, plus the cadence knobs for the periodic -/// per-site reconciliation pull (self-heal for lost telemetry). Mirrors the -/// KPI-relevant subset of NotificationOutboxOptions and the -/// scheduler-cadence shape of SiteAuditReconciliationOptions. +/// KPI windowing for the read-side, plus the cadence/retention knobs for the +/// two central-singleton schedulers — the periodic per-site reconciliation +/// pull (self-heal for lost telemetry) and the daily terminal-row purge. +/// Mirrors the KPI-relevant subset of NotificationOutboxOptions and the +/// scheduler-cadence shape of SiteAuditReconciliationOptions / +/// AuditLogPurgeOptions. /// public class SiteCallAuditOptions { @@ -92,4 +94,51 @@ public class SiteCallAuditOptions : ReconciliationInterval < MinReconciliationInterval ? MinReconciliationInterval : ReconciliationInterval; + + // ── Purge scheduler (#22): daily terminal-row purge ── + + /// + /// Period of the purge tick. Each tick drops terminal SiteCalls rows + /// older than the retention window via + /// . + /// Default 24 hours, matching AuditLogPurgeOptions. Clamped to at + /// least via . + /// + public TimeSpan PurgeInterval { get; set; } = TimeSpan.FromHours(24); + + /// + /// Test-only override for the purge tick cadence — bypasses the + /// clamp so unit tests can drop the cadence + /// to milliseconds. Production config never sets this; leave null. + /// + public TimeSpan? PurgeIntervalOverride { get; set; } + + /// + /// Retention window for terminal rows. On each purge tick a row whose + /// TerminalAtUtc is older than UtcNow - RetentionDays is + /// deleted; non-terminal rows are never purged. Default 365 days, matching + /// the central audit-store retention policy. + /// + public int RetentionDays { get; set; } = 365; + + /// + /// Minimum interval the config-bound can resolve + /// to. Clamps a misconfigured 0 (or negative) value away from + /// for the same scheduler-spin reason as + /// ; the purge is daily so the floor + /// is a more generous 1 minute. + /// + private static readonly TimeSpan MinPurgeInterval = TimeSpan.FromMinutes(1); + + /// + /// Resolves the effective purge tick interval: the test override when set + /// (bypassing the clamp), otherwise clamped to at + /// least . + /// + public TimeSpan ResolvedPurgeInterval => + PurgeIntervalOverride is { } o + ? o + : PurgeInterval < MinPurgeInterval + ? MinPurgeInterval + : PurgeInterval; } diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditPurgeTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditPurgeTests.cs new file mode 100644 index 00000000..6352ddec --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditPurgeTests.cs @@ -0,0 +1,175 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.AuditLog.Central; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration; +using ZB.MOM.WW.ScadaBridge.Commons.Types; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit; + +namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests; + +/// +/// Purge-scheduler tests for (#22, Piece B). +/// Exercises the daily terminal-row purge tick in-memory — a recording +/// captures the +/// threshold the actor +/// computes, with no live MSSQL fixture. The reconciliation collaborators are +/// inert stubs (the purge tick doesn't use them, but they must be present to +/// arm the scheduler — both timers gate on the collaborators together). +/// +public class SiteCallAuditPurgeTests : TestKit +{ + private static SiteCallAuditOptions FastPurgeOptions(int retentionDays = 365) => new() + { + // Keep the reconciliation tick slow so it doesn't fight the purge tick + // for the test window; drop the purge tick to 100 ms via its override. + ReconciliationIntervalOverride = TimeSpan.FromMinutes(5), + PurgeIntervalOverride = TimeSpan.FromMilliseconds(100), + RetentionDays = retentionDays, + }; + + /// Empty enumerator — the purge path never touches it, but it must be present to arm the scheduler. + private sealed class EmptyEnumerator : ISiteEnumerator + { + public Task> EnumerateAsync(CancellationToken ct = default) => + Task.FromResult>(Array.Empty()); + } + + /// No-op pull client — present only to arm the scheduler. + private sealed class NoOpPullClient : IPullSiteCallsClient + { + public Task PullAsync( + string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct) => + Task.FromResult(new PullSiteCallsResponse(Array.Empty(), MoreAvailable: false)); + } + + /// + /// Recording repository capturing every + /// threshold (and the configured deleted-row count it returns). + /// + private sealed class RecordingRepo : ISiteCallAuditRepository + { + public List PurgeThresholds { get; } = new(); + public int RowsDeletedPerCall { get; set; } + + public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) + { + PurgeThresholds.Add(olderThanUtc); + return Task.FromResult(RowsDeletedPerCall); + } + + public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) => Task.CompletedTask; + + public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => + Task.FromResult(null); + + public Task> QueryAsync( + SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) => + Task.FromResult>(Array.Empty()); + + public Task ComputeKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + Task.FromResult(new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0)); + + public Task> ComputePerSiteKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + Task.FromResult>(Array.Empty()); + } + + /// Repository whose purge always throws — to prove continue-on-error keeps the singleton alive. + private sealed class PurgeThrowingRepo : ISiteCallAuditRepository + { + public int PurgeCallCount; + + public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) + { + Interlocked.Increment(ref PurgeCallCount); + throw new InvalidOperationException("simulated purge failure"); + } + + public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) => Task.CompletedTask; + public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => Task.FromResult(null); + public Task> QueryAsync(SiteCallQueryFilter f, SiteCallPaging p, CancellationToken ct = default) => Task.FromResult>(Array.Empty()); + public Task ComputeKpisAsync(DateTime a, DateTime b, CancellationToken ct = default) => Task.FromResult(new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0)); + public Task> ComputePerSiteKpisAsync(DateTime a, DateTime b, CancellationToken ct = default) => Task.FromResult>(Array.Empty()); + } + + private IActorRef CreateActor(ISiteCallAuditRepository repo, SiteCallAuditOptions options) => + Sys.ActorOf(Props.Create(() => new SiteCallAuditActor( + repo, + new EmptyEnumerator(), + new NoOpPullClient(), + NullLogger.Instance, + options))); + + // --------------------------------------------------------------------- + // 1. PurgeTick_CallsPurgeTerminal_WithRetentionThreshold + // --------------------------------------------------------------------- + + [Fact] + public void PurgeTick_CallsPurgeTerminalAsync_WithRetentionThreshold() + { + var repo = new RecordingRepo { RowsDeletedPerCall = 7 }; + // Non-default retention (30 days) so the assertion isn't accidentally + // satisfied by the 365-day default. + CreateActor(repo, FastPurgeOptions(retentionDays: 30)); + + AwaitAssert( + () => Assert.True(repo.PurgeThresholds.Count >= 1, + $"expected >= 1 PurgeTerminalAsync call, got {repo.PurgeThresholds.Count}"), + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + + // The threshold the actor passed must be ~UtcNow - 30 days. 1-minute + // slack covers scheduling jitter between the tick firing and the assert. + var threshold = repo.PurgeThresholds[0]; + var expected = DateTime.UtcNow - TimeSpan.FromDays(30); + Assert.True( + Math.Abs((threshold - expected).TotalMinutes) < 1.0, + $"purge threshold {threshold:o} should be within 1 minute of {expected:o}"); + } + + // --------------------------------------------------------------------- + // 2. PurgeTick_UsesDefaultRetention_365Days + // --------------------------------------------------------------------- + + [Fact] + public void PurgeTick_DefaultRetention_Uses365DayThreshold() + { + var repo = new RecordingRepo(); + CreateActor(repo, FastPurgeOptions()); // default 365 days + + AwaitAssert( + () => Assert.True(repo.PurgeThresholds.Count >= 1), + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + + var threshold = repo.PurgeThresholds[0]; + var expected = DateTime.UtcNow - TimeSpan.FromDays(365); + Assert.True( + Math.Abs((threshold - expected).TotalMinutes) < 1.0, + $"purge threshold {threshold:o} should be within 1 minute of {expected:o}"); + } + + // --------------------------------------------------------------------- + // 3. PurgeTick_RepoThrows_ActorStaysAlive_RetriesNextTick (continue-on-error) + // --------------------------------------------------------------------- + + [Fact] + public void PurgeTick_PurgeThrows_ActorStaysAlive_RetriesNextTick() + { + var repo = new PurgeThrowingRepo(); + CreateActor(repo, FastPurgeOptions()); + + // The singleton must NOT die on a purge fault — a second tick must still + // arrive (continue-on-error). Two purge calls prove the actor survived + // the first throw and the timer kept ticking. + AwaitAssert( + () => Assert.True(repo.PurgeCallCount >= 2, + $"expected >= 2 purge attempts (actor survived the throw), got {repo.PurgeCallCount}"), + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + } +}