feat(sitecallaudit): periodic reconciliation pull back-fills lost telemetry

Add a periodic reconciliation tick to SiteCallAuditActor that, per site,
pulls changed SiteCall rows since a per-site UpdatedAtUtc cursor and upserts
them idempotently (monotonic UpsertAsync) — the documented self-heal for lost
best-effort gRPC telemetry. Mirrors SiteAuditReconciliationActor's structure
(per-site cursor, per-site try/catch failure isolation, advance cursor by max
observed UpdatedAtUtc) minus the stalled-detection EventStream machinery.
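
The cursor mechanics above can be sketched in memory. This is an illustration under stated assumptions, not the repository's code: `Row`, `InMemoryMirror` and `CursorSketch` are hypothetical names, and "monotonic" is assumed here to mean that an incoming row only replaces a stored row when it is strictly newer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for illustration only; not the real SiteCall types.
public sealed record Row(string Id, DateTime UpdatedAtUtc, string Status);

public sealed class InMemoryMirror
{
    private readonly Dictionary<string, Row> _rows = new();

    public int Count => _rows.Count;

    // Assumed meaning of "monotonic": an incoming row replaces the stored row
    // only when it is strictly newer, so replaying a row is a no-op.
    public bool Upsert(Row incoming)
    {
        if (_rows.TryGetValue(incoming.Id, out var existing)
            && existing.UpdatedAtUtc >= incoming.UpdatedAtUtc)
        {
            return false;
        }

        _rows[incoming.Id] = incoming;
        return true;
    }
}

public static class CursorSketch
{
    // One pass for one site: take rows at or after the cursor (oldest first,
    // capped at batchSize), upsert them, and return the advanced cursor.
    public static DateTime ReconcileOnce(
        IEnumerable<Row> siteRows, InMemoryMirror mirror, DateTime since, int batchSize)
    {
        var max = since;
        foreach (var row in siteRows
                     .Where(r => r.UpdatedAtUtc >= since)
                     .OrderBy(r => r.UpdatedAtUtc)
                     .Take(batchSize))
        {
            mirror.Upsert(row);
            if (row.UpdatedAtUtc > max)
            {
                max = row.UpdatedAtUtc;
            }
        }

        return max;
    }
}
```

Because the filter is "at or after", a second pass re-reads the row that sits exactly on the cursor; the upsert rejects it, so the mirror and the cursor are unchanged.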

Dependency wiring: add an acyclic SiteCallAudit -> AuditLog project reference
and resolve IPullSiteCallsClient + ISiteEnumerator (central-only singletons
registered by AddAuditLogCentralReconciliationClient) from the IServiceProvider
the production ctor already holds — no Host Props.Create change needed. The
repo-only test ctor injects neither collaborator, so the tick is gated off
there. A new public test ctor injects fake client + enumerator + repo so the
tick is unit-testable in-memory (public, not internal: Akka's ActivatorProducer
uses public-only reflection binding).
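
The public-versus-internal point can be reproduced without Akka. The sketch below assumes the producer ultimately goes through `Activator.CreateInstance(Type, object[])`, which only considers public constructors; `PublicCtor`, `InternalCtor` and `CtorBindingSketch` are hypothetical names used for illustration.

```csharp
using System;

public sealed class PublicCtor
{
    public PublicCtor(string name) { Name = name; }
    public string Name { get; }
}

public sealed class InternalCtor
{
    internal InternalCtor(string name) { Name = name; }
    public string Name { get; }
}

public static class CtorBindingSketch
{
    // Returns true when reflection-based activation with the default
    // (public-only) binding finds a matching constructor.
    public static bool CanActivate(Type type, params object[] args)
    {
        try
        {
            return Activator.CreateInstance(type, args) is not null;
        }
        catch (MissingMethodException)
        {
            // No public constructor matched the supplied arguments.
            return false;
        }
    }
}
```

`CanActivate(typeof(PublicCtor), "x")` succeeds, while the same call for `InternalCtor` hits the `MissingMethodException` branch, which is the failure mode the test ctor avoids by being public.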

Options: ReconciliationInterval (default 5 min, clamped >= 1s so a zero config
value can't spin the scheduler) + ReconciliationBatchSize (default 500), plus a
test-only override that bypasses the clamp for millisecond cadences.
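
A minimal sketch of how such a clamp could look. Only `ReconciliationInterval`, `ReconciliationBatchSize` and `ResolvedReconciliationInterval` are names that appear in this change; the class name, the `ReconciliationIntervalOverride` property and the body of the resolver are assumptions made for illustration.

```csharp
using System;

// Hypothetical sketch; not the real SiteCallAuditOptions.
public sealed class SiteCallAuditOptionsSketch
{
    private static readonly TimeSpan MinimumInterval = TimeSpan.FromSeconds(1);

    public TimeSpan ReconciliationInterval { get; set; } = TimeSpan.FromMinutes(5);

    public int ReconciliationBatchSize { get; set; } = 500;

    // Assumed name for the test-only override; when set it bypasses the clamp.
    public TimeSpan? ReconciliationIntervalOverride { get; set; }

    // The value the timer would use: the override if present, otherwise the
    // configured interval raised to at least one second.
    public TimeSpan ResolvedReconciliationInterval =>
        ReconciliationIntervalOverride
        ?? (ReconciliationInterval < MinimumInterval ? MinimumInterval : ReconciliationInterval);
}
```

Keeping the clamp in a computed property leaves the raw configured value intact for diagnostics while the scheduler only ever sees a safe interval.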

Tests (all in-memory, no live MSSQL): absent row is upserted on a tick; second
tick advances the cursor past already-pulled rows; one failing site does not
sink other sites; repo-only ctor does not start the tick.
Joseph Doherty
2026-06-15 12:01:22 -04:00
parent 6b0140dd62
commit e427b38fb3
4 changed files with 623 additions and 12 deletions
@@ -1,6 +1,7 @@
using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
@@ -24,13 +25,17 @@ namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit;
/// <remarks>
/// <para>
/// Implemented: direct <see cref="UpsertSiteCallCommand"/> telemetry ingest,
/// query, detail and KPI handlers (Task 4), and the central→site Retry/Discard
/// relay (Task 5 — the relay handlers live in this actor). Deferred (per
/// CLAUDE.md scope discipline — both land in a later follow-up): the periodic
/// per-site reconciliation puller that backfills lost telemetry, and the daily
/// terminal-row purge scheduler (the repository exposes
/// <c>PurgeTerminalAsync</c> but nothing in this module currently invokes it
/// on a schedule).
/// query, detail and KPI handlers (Task 4), the central→site Retry/Discard
/// relay (Task 5 — the relay handlers live in this actor), and the periodic
/// per-site reconciliation puller that backfills lost telemetry (Piece A —
/// <see cref="OnReconciliationTickAsync"/>, the documented self-heal pull). The
/// reconciliation timer is started in <see cref="PreStart"/> and gates on the
/// reconciliation collaborators (<see cref="IPullSiteCallsClient"/> +
/// <see cref="ISiteEnumerator"/>) being available — the repo-only test ctor
/// injects neither, so the timer does not run there. Deferred (next commit):
/// the daily terminal-row purge scheduler (the repository exposes
/// <c>PurgeTerminalAsync</c> but nothing in this module invokes it on a timer
/// yet).
/// </para>
/// <para>
/// Per CLAUDE.md "audit-write failure NEVER aborts the user-facing action" —
@@ -68,6 +73,34 @@ public class SiteCallAuditActor : ReceiveActor
private readonly SiteCallAuditOptions _options;
private readonly ILogger<SiteCallAuditActor> _logger;
/// <summary>
/// Reconciliation collaborators (Piece A). The per-site self-heal pull
/// (<see cref="IPullSiteCallsClient"/>) and the site list
/// (<see cref="ISiteEnumerator"/>). On the production path these are
/// resolved once from the root <see cref="IServiceProvider"/> (central
/// singletons registered by <c>AddAuditLogCentralReconciliationClient</c>);
/// in the test path they are injected directly. They are <c>null</c> when
/// the actor was built via the repo-only test ctor — in that case the
/// reconciliation tick is NOT started (see <see cref="StartReconciliationTimer"/>).
/// </summary>
private readonly IPullSiteCallsClient? _pullClient;
private readonly ISiteEnumerator? _siteEnumerator;
/// <summary>
/// Per-site reconciliation watermark: the highest
/// <see cref="SiteCall.UpdatedAtUtc"/> seen for that site on a previous
/// tick. The next tick asks for rows at or after this cursor, so rows that
/// share the cursor's timestamp are pulled again; the idempotent, monotonic
/// <see cref="ISiteCallAuditRepository.UpsertAsync"/> turns those repeats
/// into no-ops. The cursors live in memory for the singleton's lifetime, so
/// a failover or restart resets every cursor to
/// <see cref="DateTime.MinValue"/>. That is conservative but correct: the
/// next tick re-pulls and the idempotent upsert dedupes. Mirrors
/// <c>SiteAuditReconciliationActor</c>.
/// </summary>
private readonly Dictionary<string, DateTime> _reconciliationCursors = new();
private ICancelable? _reconciliationTimer;
/// <summary>
/// Task 5 (#22): the central→site command transport — the
/// <c>CentralCommunicationActor</c>, which owns the per-site
@@ -87,6 +120,11 @@ public class SiteCallAuditActor : ReceiveActor
/// across every message. Used by Bundle C's MSSQL-backed TestKit fixture.
/// An optional <paramref name="options"/> lets a test pin the stuck/KPI
/// windows; when omitted the production defaults apply.
/// <para>
/// This ctor injects NO reconciliation client/enumerator, so the
/// reconciliation tick is gated off (see <see cref="StartReconciliationTimer"/>)
/// — the MSSQL-backed read/upsert tests must not fire phantom pulls.
/// </para>
/// </summary>
/// <param name="repository">Concrete repository instance to use for all messages.</param>
/// <param name="logger">Logger for diagnostics and error reporting.</param>
@@ -106,6 +144,48 @@ public class SiteCallAuditActor : ReceiveActor
RegisterHandlers();
}
/// <summary>
/// Test-mode constructor for the reconciliation tick (Piece A) — injects a
/// concrete repository PLUS the two reconciliation collaborators directly,
/// so the per-site self-heal pull is unit-testable in-memory without a DI
/// container or a live gRPC channel. Because the client + enumerator are
/// present, the reconciliation tick IS started (it gates on the
/// collaborators being available — see <see cref="StartReconciliationTimer"/>).
/// </summary>
/// <param name="repository">Concrete repository instance used for every message and for reconciliation upserts.</param>
/// <param name="siteEnumerator">Enumerates the sites to reconcile each tick.</param>
/// <param name="pullClient">Pull client used to fetch changed rows from each site.</param>
/// <param name="logger">Logger for diagnostics and error reporting.</param>
/// <param name="options">Optional configuration overrides; production defaults apply when null.</param>
/// <remarks>
/// Public (not internal) because Akka's default <c>ActivatorProducer</c>
/// instantiates the actor via reflection with public-only binding flags —
/// an internal ctor yields a <c>MissingMethodException</c> at actor
/// creation. Distinguished from the production <see cref="IServiceProvider"/>
/// ctor by its concrete-collaborator parameter list; only the test project
/// (or a host that hand-resolves the collaborators) constructs it this way.
/// </remarks>
public SiteCallAuditActor(
ISiteCallAuditRepository repository,
ISiteEnumerator siteEnumerator,
IPullSiteCallsClient pullClient,
ILogger<SiteCallAuditActor> logger,
SiteCallAuditOptions? options = null)
{
ArgumentNullException.ThrowIfNull(repository);
ArgumentNullException.ThrowIfNull(siteEnumerator);
ArgumentNullException.ThrowIfNull(pullClient);
ArgumentNullException.ThrowIfNull(logger);
_injectedRepository = repository;
_siteEnumerator = siteEnumerator;
_pullClient = pullClient;
_logger = logger;
_options = options ?? new SiteCallAuditOptions();
RegisterHandlers();
}
/// <summary>
/// Production constructor — resolves <see cref="ISiteCallAuditRepository"/>
/// from a fresh DI scope per message because the repository is a scoped EF
@@ -129,6 +209,17 @@ public class SiteCallAuditActor : ReceiveActor
_options = options;
_logger = logger;
// Reconciliation collaborators (Piece A) are central-only singletons
// registered by AddAuditLogCentralReconciliationClient — always on the
// central composition root (Program.cs). Resolve them once here (the
// actor itself is a long-lived singleton; the repository is the only
// scoped service and is still resolved per-tick/per-message). GetService
// (not GetRequiredService) so a host that somehow omits the helper
// degrades to "no reconciliation tick" rather than a startup crash —
// the tick startup gates on both being non-null.
_pullClient = serviceProvider.GetService<IPullSiteCallsClient>();
_siteEnumerator = serviceProvider.GetService<ISiteEnumerator>();
RegisterHandlers();
}
@@ -154,6 +245,49 @@ public class SiteCallAuditActor : ReceiveActor
});
Receive<RetrySiteCallRequest>(HandleRetrySiteCall);
Receive<DiscardSiteCallRequest>(HandleDiscardSiteCall);
// Piece A (#22): self-tick for the periodic reconciliation pull. The
// handler stays alive across faults via its own per-site try/catch
// (mirroring the ingest path); the timer is only started when the
// reconciliation collaborators are available.
ReceiveAsync<ReconciliationTick>(_ => OnReconciliationTickAsync());
}
/// <inheritdoc />
protected override void PreStart()
{
base.PreStart();
StartReconciliationTimer();
}
/// <inheritdoc />
protected override void PostStop()
{
_reconciliationTimer?.Cancel();
base.PostStop();
}
/// <summary>
/// Starts the periodic reconciliation tick — but ONLY when both the pull
/// client and the site enumerator are available. The repo-only test ctor
/// injects neither, so the tick is gated off there (the MSSQL read/upsert
/// tests must not fire phantom pulls); the reconciliation test ctor and the
/// production ctor (which resolves both from the SP) start it.
/// </summary>
private void StartReconciliationTimer()
{
if (_pullClient is null || _siteEnumerator is null)
{
return;
}
var interval = _options.ResolvedReconciliationInterval;
_reconciliationTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
initialDelay: interval,
interval: interval,
receiver: Self,
message: ReconciliationTick.Instance,
sender: Self);
}
/// <inheritdoc />
@@ -212,6 +346,119 @@ public class SiteCallAuditActor : ReceiveActor
}
}
// ── Piece A: periodic per-site reconciliation pull (self-heal) ──
/// <summary>
/// One reconciliation pass: enumerate every known site and, per site, pull
/// changed <see cref="SiteCall"/> rows since that site's cursor and upsert
/// them idempotently — the documented self-heal when best-effort gRPC push
/// telemetry is lost. This is a mirror, NOT a dispatcher: cached-call
/// delivery stays site-local; upserting reconciled rows only refreshes the
/// eventually-consistent central <c>SiteCalls</c> mirror.
/// </summary>
/// <remarks>
/// Mirrors <c>SiteAuditReconciliationActor</c>'s structure (per-site cursor,
/// per-site try/catch failure isolation, advance the cursor by the max
/// observed <see cref="SiteCall.UpdatedAtUtc"/>) but is deliberately simpler:
/// no stalled-detection EventStream machinery — just cursor + pull + upsert
/// + advance. One DI scope per tick is opened and the same repository reused
/// across every site in that tick.
/// </remarks>
private async Task OnReconciliationTickAsync()
{
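// The tick is only scheduled when both collaborators are present
// (StartReconciliationTimer). The null-forgiving operator performs no
// runtime check, so throw explicitly: a future refactor that drops the
// gate then fails loudly instead of being logged as an enumeration fault.
var enumerator = _siteEnumerator
?? throw new InvalidOperationException("Reconciliation tick received without a site enumerator.");
var client = _pullClient
?? throw new InvalidOperationException("Reconciliation tick received without a pull client.");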
IReadOnlyList<SiteEntry> sites;
try
{
// A ReceiveActor handler has no ambient CancellationToken, so none is
// passed here (the per-site pull uses CancellationToken.None for the
// same reason). The work is bounded by the reconciliation interval
// plus the singleton's graceful-stop drain on PhaseClusterLeave.
sites = await enumerator.EnumerateAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogError(ex, "SiteCallAudit site enumeration failed; skipping reconciliation tick.");
return;
}
if (sites.Count == 0)
{
return;
}
var (scope, repository) = ResolveRepository();
try
{
foreach (var site in sites)
{
try
{
await ReconcileSiteAsync(site, client, repository).ConfigureAwait(false);
}
catch (Exception ex)
{
// Failure-isolation invariant: one site's fault (transport,
// repository write) must NOT sink the rest of the tick. The
// failing site's cursor is left at its previous value so the
// next tick retries the same window.
_logger.LogWarning(
ex,
"SiteCallAudit reconciliation pull failed for site {SiteId}; other sites continue.",
site.SiteId);
}
}
}
finally
{
scope?.Dispose();
}
}
/// <summary>
/// Issues one <c>PullSiteCalls</c> RPC against the site, upserts the
/// returned rows idempotently, and advances the site's cursor to the maximum
/// <see cref="SiteCall.UpdatedAtUtc"/> observed. The pull client returns rows
/// oldest-first with <c>SourceSite</c> already re-stamped from the dialed
/// site id, so the actor upserts them verbatim (re-stamping
/// <c>IngestedAtUtc</c> at central persist time, as the telemetry path does).
/// </summary>
private async Task ReconcileSiteAsync(
SiteEntry site, IPullSiteCallsClient client, ISiteCallAuditRepository repository)
{
var since = _reconciliationCursors.TryGetValue(site.SiteId, out var c) ? c : DateTime.MinValue;
var response = await client
.PullAsync(site.SiteId, since, _options.ReconciliationBatchSize, CancellationToken.None)
.ConfigureAwait(false);
var maxUpdated = since;
var nowUtc = DateTime.UtcNow;
foreach (var row in response.SiteCalls)
{
// IngestedAtUtc records when central ingested (or last refreshed) this
// row. The central actor owns the stamp and sets it here exactly as
// OnUpsertAsync does for the telemetry path. Monotonic UpsertAsync makes
// a row that is already present (from a prior push) a silent no-op.
var siteCall = row with { IngestedAtUtc = nowUtc };
await repository.UpsertAsync(siteCall).ConfigureAwait(false);
if (row.UpdatedAtUtc > maxUpdated)
{
maxUpdated = row.UpdatedAtUtc;
}
}
// Advance the cursor to the newest row seen. A MoreAvailable response
// means the site saturated the batch; the next tick continues draining
// from the advanced cursor (no immediate re-pull loop — the natural
// tick cadence drains the backlog, matching SiteAuditReconciliationActor).
_reconciliationCursors[site.SiteId] = maxUpdated;
}
// ── Task 4: read-side (query / detail / KPI) ──
/// <summary>
@@ -693,6 +940,13 @@ public class SiteCallAuditActor : ReceiveActor
{
return string.IsNullOrWhiteSpace(value) ? null : value;
}
/// <summary>Self-tick triggering a reconciliation pass across all sites (Piece A).</summary>
internal sealed class ReconciliationTick
{
public static readonly ReconciliationTick Instance = new();
private ReconciliationTick() { }
}
}
/// <summary>