fix(sitecallaudit): async DI scope in tick paths + options clamp tests + cursor/retry docs (review)

This commit is contained in:
Joseph Doherty
2026-06-15 12:10:54 -04:00
parent e675b34500
commit f49ac51771
2 changed files with 194 additions and 27 deletions
@@ -420,31 +420,51 @@ public class SiteCallAuditActor : ReceiveActor
return;
}
var (scope, repository) = ResolveRepository();
try
// AuditLog-003: open the scope INLINE with CreateAsyncScope + await using
// so the scoped EF Core repository (an IAsyncDisposable DbContext) disposes
// asynchronously at end of tick rather than blocking the Akka dispatcher
// thread on a synchronous Dispose() of pending connection cleanup — the tick
// holds the scope across many awaited UpsertAsync calls. Mirrors the sibling
// SiteAuditReconciliationActor.OnTickAsync. ResolveRepository() (sync Dispose)
// is retained for the synchronous message-handler paths. In the injected-
// repository test path there is no scope to open and the test repo is reused.
if (_injectedRepository is not null)
{
foreach (var site in sites)
{
try
{
await ReconcileSiteAsync(site, client, repository).ConfigureAwait(false);
}
catch (Exception ex)
{
// Failure-isolation invariant: one site's fault (transport,
// repository write) must NOT sink the rest of the tick. The
// failing site's cursor is left at its previous value so the
// next tick retries the same window.
_logger.LogWarning(
ex,
"SiteCallAudit reconciliation pull failed for site {SiteId}; other sites continue.",
site.SiteId);
}
}
await ReconcileSitesAsync(sites, client, _injectedRepository).ConfigureAwait(false);
return;
}
finally
await using var scope = _serviceProvider!.CreateAsyncScope();
var repository = scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>();
await ReconcileSitesAsync(sites, client, repository).ConfigureAwait(false);
}
/// <summary>
/// Reconciles every site in the tick against a single resolved repository,
/// isolating per-site faults so one bad site never sinks the rest of the
/// pass (the failing site's cursor is left at its previous value so the next
/// tick retries the same window).
/// </summary>
private async Task ReconcileSitesAsync(
IReadOnlyList<SiteEntry> sites, IPullSiteCallsClient client, ISiteCallAuditRepository repository)
{
foreach (var site in sites)
{
scope?.Dispose();
try
{
await ReconcileSiteAsync(site, client, repository).ConfigureAwait(false);
}
catch (Exception ex)
{
// Failure-isolation invariant: one site's fault (transport,
// repository write) must NOT sink the rest of the tick. The
// failing site's cursor is left at its previous value so the
// next tick retries the same window.
_logger.LogWarning(
ex,
"SiteCallAudit reconciliation pull failed for site {SiteId}; other sites continue.",
site.SiteId);
}
}
}
@@ -456,6 +476,31 @@ public class SiteCallAuditActor : ReceiveActor
/// site id, so the actor upserts them verbatim (re-stamping
/// <c>IngestedAtUtc</c> at central persist time, as the telemetry path does).
/// </summary>
/// <remarks>
/// <para>
/// <b>Coarse per-site retry — a deliberate divergence from
/// <c>SiteAuditReconciliationActor</c>.</b> That sibling (AuditLog-004) tracks
/// a per-EventId attempt counter and permanently abandons a row after a
/// threshold so a single un-insertable row cannot block a site's cursor
/// forever. This actor deliberately does NOT: any throw inside the loop
/// propagates to <see cref="OnReconciliationTickAsync"/>'s per-site catch,
/// which leaves the site's cursor at its previous value, so the next tick
/// re-pulls the whole batch from <c>since</c>. A persistently-bad row therefore
/// holds the site's cursor and re-pulls the batch every tick. This is
/// acceptable here because <see cref="ISiteCallAuditRepository.UpsertAsync"/> is
/// monotonic and idempotent — re-pulling already-ingested rows is a cheap
/// no-op — and the <c>SiteCalls</c> table is an eventually-consistent mirror,
/// not the source of truth, so a slow site simply lags rather than corrupts.
/// </para>
/// <para>
/// <b>Inclusive cursor boundary.</b> The cursor is advanced to the maximum
/// <see cref="SiteCall.UpdatedAtUtc"/> seen, and the pull asks for rows at or
/// after it (<c>since</c> is <c>&gt;=</c>, not <c>&gt;</c>). The row whose
/// timestamp equals the cursor is therefore re-pulled on the next tick and
/// deduplicated by the idempotent monotonic upsert — the same inclusive-boundary
/// contract as <c>SiteAuditReconciliationActor</c>'s cursor.
/// </para>
/// </remarks>
private async Task ReconcileSiteAsync(
SiteEntry site, IPullSiteCallsClient client, ISiteCallAuditRepository repository)
{
@@ -505,7 +550,30 @@ public class SiteCallAuditActor : ReceiveActor
{
var threshold = DateTime.UtcNow - TimeSpan.FromDays(_options.RetentionDays);
var (scope, repository) = ResolveRepository();
// AuditLog-003: open the scope INLINE with CreateAsyncScope + await using
// so the scoped EF Core repository (an IAsyncDisposable DbContext) disposes
// asynchronously rather than blocking the Akka dispatcher thread on a
// synchronous Dispose(). Mirrors SiteAuditReconciliationActor; the
// injected-repository test path reuses the test repo with no scope.
if (_injectedRepository is not null)
{
await PurgeWithRepositoryAsync(_injectedRepository, threshold).ConfigureAwait(false);
return;
}
await using var scope = _serviceProvider!.CreateAsyncScope();
var repository = scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>();
await PurgeWithRepositoryAsync(repository, threshold).ConfigureAwait(false);
}
/// <summary>
/// Runs one terminal-row purge against the resolved repository, logging and
/// swallowing any fault (continue-on-error) so a transient SQL failure or
/// contention never crashes the central singleton — the next tick retries
/// the same window.
/// </summary>
private async Task PurgeWithRepositoryAsync(ISiteCallAuditRepository repository, DateTime threshold)
{
try
{
var rowsDeleted = await repository.PurgeTerminalAsync(threshold).ConfigureAwait(false);
@@ -527,10 +595,6 @@ public class SiteCallAuditActor : ReceiveActor
"SiteCallAudit terminal-row purge failed (threshold {ThresholdUtc:o}); will retry next tick.",
threshold);
}
finally
{
scope?.Dispose();
}
}
// ── Task 4: read-side (query / detail / KPI) ──
@@ -11,5 +11,108 @@ public class SiteCallAuditOptionsTests
Assert.Equal(TimeSpan.FromMinutes(10), options.StuckAgeThreshold);
// KPI interval mirrors NotificationOutboxOptions.DeliveredKpiWindow.
Assert.Equal(TimeSpan.FromMinutes(1), options.KpiInterval);
// Reconciliation tick cadence mirrors SiteAuditReconciliationOptions (#23).
Assert.Equal(TimeSpan.FromMinutes(5), options.ReconciliationInterval);
// Purge tick cadence mirrors AuditLogPurgeOptions.
Assert.Equal(TimeSpan.FromHours(24), options.PurgeInterval);
// Retention window mirrors the central audit-store retention policy.
Assert.Equal(365, options.RetentionDays);
}
[Fact]
public void ResolvedReconciliationInterval_DefaultsToConfiguredValue()
{
var options = new SiteCallAuditOptions();
Assert.Equal(options.ReconciliationInterval, options.ResolvedReconciliationInterval);
}
[Theory]
[InlineData(0)]
[InlineData(-5)]
public void ResolvedReconciliationInterval_ClampsZeroOrNegativeToMinimum(int configuredSeconds)
{
// A misconfigured 0 / negative interval must never resolve to TimeSpan.Zero
// (which would make Akka's ScheduleTellRepeatedlyCancelable spin). The
// documented floor is >= 1 second.
var options = new SiteCallAuditOptions
{
ReconciliationInterval = TimeSpan.FromSeconds(configuredSeconds),
};
Assert.True(
options.ResolvedReconciliationInterval >= TimeSpan.FromSeconds(1),
$"expected the resolved interval to clamp to >= 1s, got {options.ResolvedReconciliationInterval}");
Assert.Equal(TimeSpan.FromSeconds(1), options.ResolvedReconciliationInterval);
}
[Fact]
public void ResolvedReconciliationInterval_OverrideBypassesClamp()
{
// The test-only override drops the cadence below the clamp floor so unit
// tests can run the tick at millisecond cadence.
var sub1Second = TimeSpan.FromMilliseconds(50);
var options = new SiteCallAuditOptions
{
ReconciliationInterval = TimeSpan.FromMinutes(5),
ReconciliationIntervalOverride = sub1Second,
};
Assert.Equal(sub1Second, options.ResolvedReconciliationInterval);
}
[Fact]
public void ResolvedPurgeInterval_DefaultsToConfiguredValue()
{
var options = new SiteCallAuditOptions();
Assert.Equal(options.PurgeInterval, options.ResolvedPurgeInterval);
}
[Theory]
[InlineData(0)]
[InlineData(-30)]
public void ResolvedPurgeInterval_ClampsZeroOrNegativeToMinimum(int configuredSeconds)
{
// A misconfigured 0 / negative purge interval clamps to the documented
// >= 1 minute floor (the purge is daily, so a more generous floor than
// the reconciliation tick).
var options = new SiteCallAuditOptions
{
PurgeInterval = TimeSpan.FromSeconds(configuredSeconds),
};
Assert.True(
options.ResolvedPurgeInterval >= TimeSpan.FromMinutes(1),
$"expected the resolved interval to clamp to >= 1min, got {options.ResolvedPurgeInterval}");
Assert.Equal(TimeSpan.FromMinutes(1), options.ResolvedPurgeInterval);
}
[Fact]
public void ResolvedPurgeInterval_BelowMinuteFloorClampsToMinimum()
{
// A positive-but-sub-minute config value still clamps to the 1-minute floor.
var options = new SiteCallAuditOptions
{
PurgeInterval = TimeSpan.FromSeconds(5),
};
Assert.Equal(TimeSpan.FromMinutes(1), options.ResolvedPurgeInterval);
}
[Fact]
public void ResolvedPurgeInterval_OverrideBypassesClamp()
{
// The test-only override drops the cadence below the clamp floor so unit
// tests can run the purge tick at millisecond cadence.
var subMinute = TimeSpan.FromMilliseconds(50);
var options = new SiteCallAuditOptions
{
PurgeInterval = TimeSpan.FromHours(24),
PurgeIntervalOverride = subMinute,
};
Assert.Equal(subMinute, options.ResolvedPurgeInterval);
}
}