feat(sitecallaudit): daily terminal-row purge scheduler

Add a daily purge tick to SiteCallAuditActor that drops terminal SiteCalls
rows older than the retention window via ISiteCallAuditRepository.PurgeTerminalAsync.
The threshold is computed each tick as UtcNow - RetentionDays so an operator who
lowers RetentionDays sees it on the next purge without a restart. Mirrors
AuditLogPurgeActor's daily cadence + continue-on-error posture: a purge fault is
logged and swallowed so the central singleton stays alive and retries next tick.

The purge timer is started in PreStart alongside the reconciliation timer and
gates on the same collaborators (pull client + enumerator) being available — the
repo-only test ctor injects neither, so neither background timer runs there.

Options: PurgeInterval (default 24h, clamped >= 1 min so a zero config value
can't spin the scheduler) + RetentionDays (default 365), plus a test-only
override that bypasses the clamp for millisecond cadences.

Tests (all in-memory, no live MSSQL): purge tick calls PurgeTerminalAsync with a
UtcNow - RetentionDays threshold (non-default 30 days); default retention yields
a 365-day threshold; a throwing repo does not kill the singleton (a second tick
still arrives).
This commit is contained in:
Joseph Doherty
2026-06-15 12:03:49 -04:00
parent e427b38fb3
commit e675b34500
3 changed files with 323 additions and 18 deletions
@@ -26,16 +26,16 @@ namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit;
/// <para>
/// Implemented: direct <see cref="UpsertSiteCallCommand"/> telemetry ingest,
/// query, detail and KPI handlers (Task 4), the central→site Retry/Discard
/// relay (Task 5 — the relay handlers live in this actor), and the periodic
/// relay (Task 5 — the relay handlers live in this actor), the periodic
/// per-site reconciliation puller that backfills lost telemetry (Piece A —
/// <see cref="OnReconciliationTickAsync"/>, the documented self-heal pull). The
/// reconciliation timer is started in <see cref="PreStart"/> and gates on the
/// <see cref="OnReconciliationTickAsync"/>, the documented self-heal pull), and
/// the daily terminal-row purge scheduler (Piece B —
/// <see cref="OnPurgeTickAsync"/>, which invokes
/// <see cref="ISiteCallAuditRepository.PurgeTerminalAsync"/> on a timer). Both
/// background timers are started in <see cref="PreStart"/> and gate on the
/// reconciliation collaborators (<see cref="IPullSiteCallsClient"/> +
/// <see cref="ISiteEnumerator"/>) being available — the repo-only test ctor
/// injects neither, so the timer does not run there. Deferred (next commit):
/// the daily terminal-row purge scheduler (the repository exposes
/// <c>PurgeTerminalAsync</c> but nothing in this module invokes it on a timer
/// yet).
/// injects neither, so neither timer runs there.
/// </para>
/// <para>
/// Per CLAUDE.md "audit-write failure NEVER aborts the user-facing action" —
@@ -81,7 +81,8 @@ public class SiteCallAuditActor : ReceiveActor
/// singletons registered by <c>AddAuditLogCentralReconciliationClient</c>);
/// in the test path they are injected directly. They are <c>null</c> when
/// the actor was built via the repo-only test ctor — in that case the
/// reconciliation tick is NOT started (see <see cref="StartReconciliationTimer"/>).
/// reconciliation tick is NOT started (see <see cref="StartReconciliationTimer"/>);
/// the purge tick gates on the same collaborators (see <see cref="StartPurgeTimer"/>).
/// </summary>
private readonly IPullSiteCallsClient? _pullClient;
private readonly ISiteEnumerator? _siteEnumerator;
@@ -100,6 +101,7 @@ public class SiteCallAuditActor : ReceiveActor
private readonly Dictionary<string, DateTime> _reconciliationCursors = new();
private ICancelable? _reconciliationTimer;
private ICancelable? _purgeTimer;
/// <summary>
/// Task 5 (#22): the central→site command transport — the
@@ -149,8 +151,9 @@ public class SiteCallAuditActor : ReceiveActor
/// concrete repository PLUS the two reconciliation collaborators directly,
/// so the per-site self-heal pull is unit-testable in-memory without a DI
/// container or a live gRPC channel. Because the client + enumerator are
/// present, the reconciliation tick IS started (it gates on the
/// collaborators being available — see <see cref="StartReconciliationTimer"/>).
/// present, the reconciliation tick IS started; the purge tick is also
/// started (both gate on the collaborators being available — see
/// <see cref="StartReconciliationTimer"/> / <see cref="StartPurgeTimer"/>).
/// </summary>
/// <param name="repository">Concrete repository instance used for upserts and purges.</param>
/// <param name="siteEnumerator">Enumerates the sites to reconcile each tick.</param>
@@ -246,11 +249,12 @@ public class SiteCallAuditActor : ReceiveActor
Receive<RetrySiteCallRequest>(HandleRetrySiteCall);
Receive<DiscardSiteCallRequest>(HandleDiscardSiteCall);
// Piece A (#22): self-tick for the periodic reconciliation pull. The
// handler stays alive across faults via its own per-site try/catch
// (mirroring the ingest path); the timer is only started when the
// reconciliation collaborators are available.
// Piece A/B (#22): self-ticks for the periodic reconciliation pull and
// the daily terminal-row purge. Handlers stay alive across faults via
// their own per-site / per-tick try/catch (mirroring the ingest path);
// the timers are only started when their collaborators are available.
ReceiveAsync<ReconciliationTick>(_ => OnReconciliationTickAsync());
ReceiveAsync<PurgeTick>(_ => OnPurgeTickAsync());
}
/// <inheritdoc />
@@ -258,12 +262,14 @@ public class SiteCallAuditActor : ReceiveActor
{
base.PreStart();
StartReconciliationTimer();
StartPurgeTimer();
}
/// <inheritdoc />
protected override void PostStop()
{
_reconciliationTimer?.Cancel();
_purgeTimer?.Cancel();
base.PostStop();
}
@@ -290,6 +296,29 @@ public class SiteCallAuditActor : ReceiveActor
sender: Self);
}
/// <summary>
/// Starts the daily purge tick — gated on the same collaborator presence as
/// the reconciliation tick. The purge itself only needs the repository, but
/// gating both schedulers together keeps the repo-only test ctor (no
/// client/enumerator) free of BOTH background timers, so the MSSQL read/
/// upsert tests see no scheduled side effects.
/// </summary>
private void StartPurgeTimer()
{
if (_pullClient is null || _siteEnumerator is null)
{
return;
}
var interval = _options.ResolvedPurgeInterval;
_purgeTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
initialDelay: interval,
interval: interval,
receiver: Self,
message: PurgeTick.Instance,
sender: Self);
}
/// <inheritdoc />
protected override SupervisorStrategy SupervisorStrategy()
{
@@ -459,6 +488,51 @@ public class SiteCallAuditActor : ReceiveActor
_reconciliationCursors[site.SiteId] = maxUpdated;
}
// ── Piece B: daily terminal-row purge scheduler ──
/// <summary>
/// One purge pass: drops terminal <c>SiteCalls</c> rows whose
/// <see cref="SiteCall.TerminalAtUtc"/> is older than
/// <c>UtcNow - RetentionDays</c> via
/// <see cref="ISiteCallAuditRepository.PurgeTerminalAsync"/>. Non-terminal
/// rows are never purged (enforced in the repository). The threshold is
/// computed each tick so an operator who lowers <c>RetentionDays</c> sees it
/// applied on the next purge without an actor restart. Mirrors
/// <c>AuditLogPurgeActor</c>'s daily cadence + continue-on-error posture: a
/// purge fault is logged and swallowed so the singleton stays alive.
/// </summary>
private async Task OnPurgeTickAsync()
{
var threshold = DateTime.UtcNow - TimeSpan.FromDays(_options.RetentionDays);
var (scope, repository) = ResolveRepository();
try
{
var rowsDeleted = await repository.PurgeTerminalAsync(threshold).ConfigureAwait(false);
if (rowsDeleted > 0)
{
_logger.LogInformation(
"SiteCallAudit purged {RowsDeleted} terminal SiteCalls rows older than {ThresholdUtc:o}.",
rowsDeleted,
threshold);
}
}
catch (Exception ex)
{
// Continue-on-error: a purge fault (transient SQL failure,
// contention) must NOT crash the central singleton. The next tick
// retries the same window.
_logger.LogError(
ex,
"SiteCallAudit terminal-row purge failed (threshold {ThresholdUtc:o}); will retry next tick.",
threshold);
}
finally
{
scope?.Dispose();
}
}
// ── Task 4: read-side (query / detail / KPI) ──
/// <summary>
@@ -947,6 +1021,13 @@ public class SiteCallAuditActor : ReceiveActor
public static readonly ReconciliationTick Instance = new();
private ReconciliationTick() { }
}
/// <summary>Self-tick triggering a terminal-row purge pass (Piece B).</summary>
internal sealed class PurgeTick
{
public static readonly PurgeTick Instance = new();
private PurgeTick() { }
}
}
/// <summary>
@@ -2,10 +2,12 @@ namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit;
/// <summary>
/// Configuration options for the Site Call Audit (#22): stuck-call detection +
/// KPI windowing for the read-side, plus the cadence knobs for the periodic
/// per-site reconciliation pull (self-heal for lost telemetry). Mirrors the
/// KPI-relevant subset of <c>NotificationOutboxOptions</c> and the
/// scheduler-cadence shape of <c>SiteAuditReconciliationOptions</c>.
/// KPI windowing for the read-side, plus the cadence/retention knobs for the
/// two central-singleton schedulers — the periodic per-site reconciliation
/// pull (self-heal for lost telemetry) and the daily terminal-row purge.
/// Mirrors the KPI-relevant subset of <c>NotificationOutboxOptions</c> and the
/// scheduler-cadence shape of <c>SiteAuditReconciliationOptions</c> /
/// <c>AuditLogPurgeOptions</c>.
/// </summary>
public class SiteCallAuditOptions
{
@@ -92,4 +94,51 @@ public class SiteCallAuditOptions
: ReconciliationInterval < MinReconciliationInterval
? MinReconciliationInterval
: ReconciliationInterval;
// ── Purge scheduler (#22): daily terminal-row purge ──
/// <summary>
/// Period of the purge tick. Each tick drops terminal <c>SiteCalls</c> rows
/// older than the retention window via
/// <see cref="ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories.ISiteCallAuditRepository.PurgeTerminalAsync"/>.
/// Default 24 hours, matching <c>AuditLogPurgeOptions</c>. Clamped to at
/// least <see cref="MinPurgeInterval"/> via <see cref="ResolvedPurgeInterval"/>.
/// </summary>
public TimeSpan PurgeInterval { get; set; } = TimeSpan.FromHours(24);
/// <summary>
/// Test-only override for the purge tick cadence — bypasses the
/// <see cref="MinPurgeInterval"/> clamp so unit tests can drop the cadence
/// to milliseconds. Production config never sets this; leave null.
/// </summary>
public TimeSpan? PurgeIntervalOverride { get; set; }
/// <summary>
/// Retention window for terminal rows. On each purge tick a row whose
/// <c>TerminalAtUtc</c> is older than <c>UtcNow - RetentionDays</c> is
/// deleted; non-terminal rows are never purged. Default 365 days, matching
/// the central audit-store retention policy.
/// </summary>
public int RetentionDays { get; set; } = 365;
/// <summary>
/// Minimum interval the config-bound <see cref="PurgeInterval"/> can resolve
/// to. Clamps a misconfigured <c>0</c> (or negative) value away from
/// <see cref="TimeSpan.Zero"/> for the same scheduler-spin reason as
/// <see cref="MinReconciliationInterval"/>; the purge is daily so the floor
/// is a more generous 1 minute.
/// </summary>
private static readonly TimeSpan MinPurgeInterval = TimeSpan.FromMinutes(1);
/// <summary>
/// Resolves the effective purge tick interval: the test override when set
/// (bypassing the clamp), otherwise <see cref="PurgeInterval"/> clamped to at
/// least <see cref="MinPurgeInterval"/>.
/// </summary>
public TimeSpan ResolvedPurgeInterval =>
PurgeIntervalOverride is { } o
? o
: PurgeInterval < MinPurgeInterval
? MinPurgeInterval
: PurgeInterval;
}
@@ -0,0 +1,175 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests;
/// <summary>
/// Purge-scheduler tests for <see cref="SiteCallAuditActor"/> (#22, Piece B).
/// Exercises the daily terminal-row purge tick in-memory — a recording
/// <see cref="ISiteCallAuditRepository"/> captures the
/// <see cref="ISiteCallAuditRepository.PurgeTerminalAsync"/> threshold the actor
/// computes, with no live MSSQL fixture. The reconciliation collaborators are
/// inert stubs (the purge tick doesn't use them, but they must be present to
/// arm the scheduler — both timers gate on the collaborators together).
/// </summary>
public class SiteCallAuditPurgeTests : TestKit
{
private static SiteCallAuditOptions FastPurgeOptions(int retentionDays = 365) => new()
{
// Keep the reconciliation tick slow so it doesn't fight the purge tick
// for the test window; drop the purge tick to 100 ms via its override.
ReconciliationIntervalOverride = TimeSpan.FromMinutes(5),
PurgeIntervalOverride = TimeSpan.FromMilliseconds(100),
RetentionDays = retentionDays,
};
/// <summary>Empty enumerator — the purge path never touches it, but it must be present to arm the scheduler.</summary>
private sealed class EmptyEnumerator : ISiteEnumerator
{
public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default) =>
Task.FromResult<IReadOnlyList<SiteEntry>>(Array.Empty<SiteEntry>());
}
/// <summary>No-op pull client — present only to arm the scheduler.</summary>
private sealed class NoOpPullClient : IPullSiteCallsClient
{
public Task<PullSiteCallsResponse> PullAsync(
string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct) =>
Task.FromResult(new PullSiteCallsResponse(Array.Empty<SiteCall>(), MoreAvailable: false));
}
/// <summary>
/// Recording repository capturing every <see cref="PurgeTerminalAsync"/>
/// threshold (and the configured deleted-row count it returns).
/// </summary>
private sealed class RecordingRepo : ISiteCallAuditRepository
{
public List<DateTime> PurgeThresholds { get; } = new();
public int RowsDeletedPerCall { get; set; }
public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default)
{
PurgeThresholds.Add(olderThanUtc);
return Task.FromResult(RowsDeletedPerCall);
}
public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) => Task.CompletedTask;
public Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default) =>
Task.FromResult<SiteCall?>(null);
public Task<IReadOnlyList<SiteCall>> QueryAsync(
SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) =>
Task.FromResult<IReadOnlyList<SiteCall>>(Array.Empty<SiteCall>());
public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
Task.FromResult(new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0));
public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
Task.FromResult<IReadOnlyList<SiteCallSiteKpiSnapshot>>(Array.Empty<SiteCallSiteKpiSnapshot>());
}
/// <summary>Repository whose purge always throws — to prove continue-on-error keeps the singleton alive.</summary>
private sealed class PurgeThrowingRepo : ISiteCallAuditRepository
{
public int PurgeCallCount;
public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default)
{
Interlocked.Increment(ref PurgeCallCount);
throw new InvalidOperationException("simulated purge failure");
}
public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) => Task.CompletedTask;
public Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default) => Task.FromResult<SiteCall?>(null);
public Task<IReadOnlyList<SiteCall>> QueryAsync(SiteCallQueryFilter f, SiteCallPaging p, CancellationToken ct = default) => Task.FromResult<IReadOnlyList<SiteCall>>(Array.Empty<SiteCall>());
public Task<SiteCallKpiSnapshot> ComputeKpisAsync(DateTime a, DateTime b, CancellationToken ct = default) => Task.FromResult(new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0));
public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(DateTime a, DateTime b, CancellationToken ct = default) => Task.FromResult<IReadOnlyList<SiteCallSiteKpiSnapshot>>(Array.Empty<SiteCallSiteKpiSnapshot>());
}
private IActorRef CreateActor(ISiteCallAuditRepository repo, SiteCallAuditOptions options) =>
Sys.ActorOf(Props.Create(() => new SiteCallAuditActor(
repo,
new EmptyEnumerator(),
new NoOpPullClient(),
NullLogger<SiteCallAuditActor>.Instance,
options)));
// ---------------------------------------------------------------------
// 1. PurgeTick_CallsPurgeTerminal_WithRetentionThreshold
// ---------------------------------------------------------------------
[Fact]
public void PurgeTick_CallsPurgeTerminalAsync_WithRetentionThreshold()
{
var repo = new RecordingRepo { RowsDeletedPerCall = 7 };
// Non-default retention (30 days) so the assertion isn't accidentally
// satisfied by the 365-day default.
CreateActor(repo, FastPurgeOptions(retentionDays: 30));
AwaitAssert(
() => Assert.True(repo.PurgeThresholds.Count >= 1,
$"expected >= 1 PurgeTerminalAsync call, got {repo.PurgeThresholds.Count}"),
duration: TimeSpan.FromSeconds(3),
interval: TimeSpan.FromMilliseconds(50));
// The threshold the actor passed must be ~UtcNow - 30 days. 1-minute
// slack covers scheduling jitter between the tick firing and the assert.
var threshold = repo.PurgeThresholds[0];
var expected = DateTime.UtcNow - TimeSpan.FromDays(30);
Assert.True(
Math.Abs((threshold - expected).TotalMinutes) < 1.0,
$"purge threshold {threshold:o} should be within 1 minute of {expected:o}");
}
// ---------------------------------------------------------------------
// 2. PurgeTick_UsesDefaultRetention_365Days
// ---------------------------------------------------------------------
[Fact]
public void PurgeTick_DefaultRetention_Uses365DayThreshold()
{
var repo = new RecordingRepo();
CreateActor(repo, FastPurgeOptions()); // default 365 days
AwaitAssert(
() => Assert.True(repo.PurgeThresholds.Count >= 1),
duration: TimeSpan.FromSeconds(3),
interval: TimeSpan.FromMilliseconds(50));
var threshold = repo.PurgeThresholds[0];
var expected = DateTime.UtcNow - TimeSpan.FromDays(365);
Assert.True(
Math.Abs((threshold - expected).TotalMinutes) < 1.0,
$"purge threshold {threshold:o} should be within 1 minute of {expected:o}");
}
// ---------------------------------------------------------------------
// 3. PurgeTick_RepoThrows_ActorStaysAlive_RetriesNextTick (continue-on-error)
// ---------------------------------------------------------------------
[Fact]
public void PurgeTick_PurgeThrows_ActorStaysAlive_RetriesNextTick()
{
var repo = new PurgeThrowingRepo();
CreateActor(repo, FastPurgeOptions());
// The singleton must NOT die on a purge fault — a second tick must still
// arrive (continue-on-error). Two purge calls prove the actor survived
// the first throw and the timer kept ticking.
AwaitAssert(
() => Assert.True(repo.PurgeCallCount >= 2,
$"expected >= 2 purge attempts (actor survived the throw), got {repo.PurgeCallCount}"),
duration: TimeSpan.FromSeconds(3),
interval: TimeSpan.FromMilliseconds(50));
}
}