diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs index 20f11316..b89ae01a 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs @@ -420,31 +420,51 @@ public class SiteCallAuditActor : ReceiveActor return; } - var (scope, repository) = ResolveRepository(); - try + // AuditLog-003: open the scope INLINE with CreateAsyncScope + await using + // so the scoped EF Core repository (an IAsyncDisposable DbContext) disposes + // asynchronously at end of tick rather than blocking the Akka dispatcher + // thread on a synchronous Dispose() of pending connection cleanup — the tick + // holds the scope across many awaited UpsertAsync calls. Mirrors the sibling + // SiteAuditReconciliationActor.OnTickAsync. ResolveRepository() (sync Dispose) + // is retained for the synchronous message-handler paths. In the injected- + // repository test path there is no scope to open and the test repo is reused. + if (_injectedRepository is not null) { - foreach (var site in sites) - { - try - { - await ReconcileSiteAsync(site, client, repository).ConfigureAwait(false); - } - catch (Exception ex) - { - // Failure-isolation invariant: one site's fault (transport, - // repository write) must NOT sink the rest of the tick. The - // failing site's cursor is left at its previous value so the - // next tick retries the same window. - _logger.LogWarning( - ex, - "SiteCallAudit reconciliation pull failed for site {SiteId}; other sites continue.", - site.SiteId); - } - } + await ReconcileSitesAsync(sites, client, _injectedRepository).ConfigureAwait(false); + return; } - finally + + await using var scope = _serviceProvider!.CreateAsyncScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + await ReconcileSitesAsync(sites, client, repository).ConfigureAwait(false); + } + + /// + /// Reconciles every site in the tick against a single resolved repository, + /// isolating per-site faults so one bad site never sinks the rest of the + /// pass (the failing site's cursor is left at its previous value so the next + /// tick retries the same window). + /// + private async Task ReconcileSitesAsync( + IReadOnlyList sites, IPullSiteCallsClient client, ISiteCallAuditRepository repository) + { + foreach (var site in sites) { - scope?.Dispose(); + try + { + await ReconcileSiteAsync(site, client, repository).ConfigureAwait(false); + } + catch (Exception ex) + { + // Failure-isolation invariant: one site's fault (transport, + // repository write) must NOT sink the rest of the tick. The + // failing site's cursor is left at its previous value so the + // next tick retries the same window. + _logger.LogWarning( + ex, + "SiteCallAudit reconciliation pull failed for site {SiteId}; other sites continue.", + site.SiteId); + } } } @@ -456,6 +476,31 @@ public class SiteCallAuditActor : ReceiveActor /// site id, so the actor upserts them verbatim (re-stamping /// IngestedAtUtc at central persist time, as the telemetry path does). /// + /// + /// + /// Coarse per-site retry — a deliberate divergence from + /// SiteAuditReconciliationActor. That sibling (AuditLog-004) tracks + /// a per-EventId attempt counter and permanently abandons a row after a + /// threshold so a single un-insertable row cannot block a site's cursor + /// forever. This actor deliberately does NOT: any throw inside the loop + /// propagates to 's per-site catch, + /// which leaves the site's cursor at its previous value, so the next tick + /// re-pulls the whole batch from since. A persistently-bad row therefore + /// holds the site's cursor and re-pulls the batch every tick. This is + /// acceptable here because is + /// monotonic and idempotent — re-pulling already-ingested rows is a cheap + /// no-op — and the SiteCalls table is an eventually-consistent mirror, + /// not the source of truth, so a slow site simply lags rather than corrupts. + /// + /// + /// Inclusive cursor boundary. The cursor is advanced to the maximum + /// seen, and the pull asks for rows at or + /// after it (since is >=, not >). The row whose + /// timestamp equals the cursor is therefore re-pulled on the next tick and + /// deduplicated by the idempotent monotonic upsert — the same inclusive-boundary + /// contract as SiteAuditReconciliationActor's cursor. + /// + /// private async Task ReconcileSiteAsync( SiteEntry site, IPullSiteCallsClient client, ISiteCallAuditRepository repository) { @@ -505,7 +550,30 @@ public class SiteCallAuditActor : ReceiveActor { var threshold = DateTime.UtcNow - TimeSpan.FromDays(_options.RetentionDays); - var (scope, repository) = ResolveRepository(); + // AuditLog-003: open the scope INLINE with CreateAsyncScope + await using + // so the scoped EF Core repository (an IAsyncDisposable DbContext) disposes + // asynchronously rather than blocking the Akka dispatcher thread on a + // synchronous Dispose(). Mirrors SiteAuditReconciliationActor; the + // injected-repository test path reuses the test repo with no scope. + if (_injectedRepository is not null) + { + await PurgeWithRepositoryAsync(_injectedRepository, threshold).ConfigureAwait(false); + return; + } + + await using var scope = _serviceProvider!.CreateAsyncScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + await PurgeWithRepositoryAsync(repository, threshold).ConfigureAwait(false); + } + + /// + /// Runs one terminal-row purge against the resolved repository, logging and + /// swallowing any fault (continue-on-error) so a transient SQL failure or + /// contention never crashes the central singleton — the next tick retries + /// the same window. + /// + private async Task PurgeWithRepositoryAsync(ISiteCallAuditRepository repository, DateTime threshold) + { try { var rowsDeleted = await repository.PurgeTerminalAsync(threshold).ConfigureAwait(false); @@ -527,10 +595,6 @@ public class SiteCallAuditActor : ReceiveActor "SiteCallAudit terminal-row purge failed (threshold {ThresholdUtc:o}); will retry next tick.", threshold); } - finally - { - scope?.Dispose(); - } } // ── Task 4: read-side (query / detail / KPI) ── diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditOptionsTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditOptionsTests.cs index e9e4d950..703b366c 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditOptionsTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditOptionsTests.cs @@ -11,5 +11,108 @@ public class SiteCallAuditOptionsTests Assert.Equal(TimeSpan.FromMinutes(10), options.StuckAgeThreshold); // KPI interval mirrors NotificationOutboxOptions.DeliveredKpiWindow. Assert.Equal(TimeSpan.FromMinutes(1), options.KpiInterval); + + // Reconciliation tick cadence mirrors SiteAuditReconciliationOptions (#23). + Assert.Equal(TimeSpan.FromMinutes(5), options.ReconciliationInterval); + // Purge tick cadence mirrors AuditLogPurgeOptions. + Assert.Equal(TimeSpan.FromHours(24), options.PurgeInterval); + // Retention window mirrors the central audit-store retention policy. + Assert.Equal(365, options.RetentionDays); + } + + [Fact] + public void ResolvedReconciliationInterval_DefaultsToConfiguredValue() + { + var options = new SiteCallAuditOptions(); + + Assert.Equal(options.ReconciliationInterval, options.ResolvedReconciliationInterval); + } + + [Theory] + [InlineData(0)] + [InlineData(-5)] + public void ResolvedReconciliationInterval_ClampsZeroOrNegativeToMinimum(int configuredSeconds) + { + // A misconfigured 0 / negative interval must never resolve to TimeSpan.Zero + // (which would make Akka's ScheduleTellRepeatedlyCancelable spin). The + // documented floor is >= 1 second. + var options = new SiteCallAuditOptions + { + ReconciliationInterval = TimeSpan.FromSeconds(configuredSeconds), + }; + + Assert.True( + options.ResolvedReconciliationInterval >= TimeSpan.FromSeconds(1), + $"expected the resolved interval to clamp to >= 1s, got {options.ResolvedReconciliationInterval}"); + Assert.Equal(TimeSpan.FromSeconds(1), options.ResolvedReconciliationInterval); + } + + [Fact] + public void ResolvedReconciliationInterval_OverrideBypassesClamp() + { + // The test-only override drops the cadence below the clamp floor so unit + // tests can run the tick at millisecond cadence. + var sub1Second = TimeSpan.FromMilliseconds(50); + var options = new SiteCallAuditOptions + { + ReconciliationInterval = TimeSpan.FromMinutes(5), + ReconciliationIntervalOverride = sub1Second, + }; + + Assert.Equal(sub1Second, options.ResolvedReconciliationInterval); + } + + [Fact] + public void ResolvedPurgeInterval_DefaultsToConfiguredValue() + { + var options = new SiteCallAuditOptions(); + + Assert.Equal(options.PurgeInterval, options.ResolvedPurgeInterval); + } + + [Theory] + [InlineData(0)] + [InlineData(-30)] + public void ResolvedPurgeInterval_ClampsZeroOrNegativeToMinimum(int configuredSeconds) + { + // A misconfigured 0 / negative purge interval clamps to the documented + // >= 1 minute floor (the purge is daily, so a more generous floor than + // the reconciliation tick). + var options = new SiteCallAuditOptions + { + PurgeInterval = TimeSpan.FromSeconds(configuredSeconds), + }; + + Assert.True( + options.ResolvedPurgeInterval >= TimeSpan.FromMinutes(1), + $"expected the resolved interval to clamp to >= 1min, got {options.ResolvedPurgeInterval}"); + Assert.Equal(TimeSpan.FromMinutes(1), options.ResolvedPurgeInterval); + } + + [Fact] + public void ResolvedPurgeInterval_BelowMinuteFloorClampsToMinimum() + { + // A positive-but-sub-minute config value still clamps to the 1-minute floor. + var options = new SiteCallAuditOptions + { + PurgeInterval = TimeSpan.FromSeconds(5), + }; + + Assert.Equal(TimeSpan.FromMinutes(1), options.ResolvedPurgeInterval); + } + + [Fact] + public void ResolvedPurgeInterval_OverrideBypassesClamp() + { + // The test-only override drops the cadence below the clamp floor so unit + // tests can run the purge tick at millisecond cadence. + var subMinute = TimeSpan.FromMilliseconds(50); + var options = new SiteCallAuditOptions + { + PurgeInterval = TimeSpan.FromHours(24), + PurgeIntervalOverride = subMinute, + }; + + Assert.Equal(subMinute, options.ResolvedPurgeInterval); } }