From e427b38fb3794aa4f7704c13d74a95d23961b7fb Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 12:01:22 -0400 Subject: [PATCH] feat(sitecallaudit): periodic reconciliation pull back-fills lost telemetry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a periodic reconciliation tick to SiteCallAuditActor that, per site, pulls changed SiteCall rows since a per-site UpdatedAtUtc cursor and upserts them idempotently (monotonic UpsertAsync) — the documented self-heal for lost best-effort gRPC telemetry. Mirrors SiteAuditReconciliationActor's structure (per-site cursor, per-site try/catch failure isolation, advance cursor by max observed UpdatedAtUtc) minus the stalled-detection EventStream machinery. Dependency wiring: add an acyclic SiteCallAudit -> AuditLog project reference and resolve IPullSiteCallsClient + ISiteEnumerator (central-only singletons registered by AddAuditLogCentralReconciliationClient) from the IServiceProvider the production ctor already holds — no Host Props.Create change needed. The repo-only test ctor injects neither collaborator, so the tick is gated off there. A new public test ctor injects fake client + enumerator + repo so the tick is unit-testable in-memory (public, not internal: Akka's ActivatorProducer uses public-only reflection binding). Options: ReconciliationInterval (default 5 min, clamped >= 1s so a zero config value can't spin the scheduler) + ReconciliationBatchSize (default 500), plus a test-only override that bypasses the clamp for millisecond cadences. Tests (all in-memory, no live MSSQL): absent row is upserted on a tick; second tick advances the cursor past already-pulled rows; one failing site does not sink other sites; repo-only ctor does not start the tick. 
--- .../SiteCallAuditActor.cs | 268 +++++++++++++++- .../SiteCallAuditOptions.cs | 58 +++- ...ZB.MOM.WW.ScadaBridge.SiteCallAudit.csproj | 9 + .../SiteCallAuditReconciliationTests.cs | 300 ++++++++++++++++++ 4 files changed, 623 insertions(+), 12 deletions(-) create mode 100644 tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditReconciliationTests.cs diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs index 320a7227..a4a31cf2 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs @@ -1,6 +1,7 @@ using Akka.Actor; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using ZB.MOM.WW.ScadaBridge.AuditLog.Central; using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit; @@ -24,13 +25,17 @@ namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit; /// /// /// Implemented: direct telemetry ingest, -/// query, detail and KPI handlers (Task 4), and the central→site Retry/Discard -/// relay (Task 5 — the relay handlers live in this actor). Deferred (per -/// CLAUDE.md scope discipline — both land in a later follow-up): the periodic -/// per-site reconciliation puller that backfills lost telemetry, and the daily -/// terminal-row purge scheduler (the repository exposes -/// PurgeTerminalAsync but nothing in this module currently invokes it -/// on a schedule). +/// query, detail and KPI handlers (Task 4), the central→site Retry/Discard +/// relay (Task 5 — the relay handlers live in this actor), and the periodic +/// per-site reconciliation puller that backfills lost telemetry (Piece A — +/// , the documented self-heal pull). 
The +/// reconciliation timer is started in and gates on the +/// reconciliation collaborators ( + +/// ) being available — the repo-only test ctor +/// injects neither, so the timer does not run there. Deferred (next commit): +/// the daily terminal-row purge scheduler (the repository exposes +/// PurgeTerminalAsync but nothing in this module invokes it on a timer +/// yet). /// /// /// Per CLAUDE.md "audit-write failure NEVER aborts the user-facing action" — @@ -68,6 +73,34 @@ public class SiteCallAuditActor : ReceiveActor private readonly SiteCallAuditOptions _options; private readonly ILogger _logger; + /// + /// Reconciliation collaborators (Piece A). The per-site self-heal pull + /// () and the site list + /// (). On the production path these are + /// resolved once from the root (central + /// singletons registered by AddAuditLogCentralReconciliationClient); + /// in the test path they are injected directly. They are null when + /// the actor was built via the repo-only test ctor — in that case the + /// reconciliation tick is NOT started (see ). + /// + private readonly IPullSiteCallsClient? _pullClient; + private readonly ISiteEnumerator? _siteEnumerator; + + /// + /// Per-site reconciliation watermark — the highest + /// seen for that site on a previous + /// tick. The next tick asks for rows at or after this cursor; idempotent + /// monotonic swallows any + /// duplicate-with-same-timestamp rows. In-memory for the singleton's + /// lifetime — a failover / restart resets every cursor to + /// , which is conservative but correct + /// (the next tick re-pulls and idempotent upsert dedupes). Mirrors + /// SiteAuditReconciliationActor. + /// + private readonly Dictionary _reconciliationCursors = new(); + + private ICancelable? _reconciliationTimer; + /// /// Task 5 (#22): the central→site command transport — the /// CentralCommunicationActor, which owns the per-site @@ -87,6 +120,11 @@ public class SiteCallAuditActor : ReceiveActor /// across every message. 
Used by Bundle C's MSSQL-backed TestKit fixture. /// An optional lets a test pin the stuck/KPI /// windows; when omitted the production defaults apply. + /// + /// This ctor injects NO reconciliation client/enumerator, so the + /// reconciliation tick is gated off (see ) + /// — the MSSQL-backed read/upsert tests must not fire phantom pulls. + /// /// /// Concrete repository instance to use for all messages. /// Logger for diagnostics and error reporting. @@ -106,6 +144,48 @@ public class SiteCallAuditActor : ReceiveActor RegisterHandlers(); } + /// + /// Test-mode constructor for the reconciliation tick (Piece A) — injects a + /// concrete repository PLUS the two reconciliation collaborators directly, + /// so the per-site self-heal pull is unit-testable in-memory without a DI + /// container or a live gRPC channel. Because the client + enumerator are + /// present, the reconciliation tick IS started (it gates on the + /// collaborators being available — see ). + /// + /// Concrete repository instance used for upserts and purges. + /// Enumerates the sites to reconcile each tick. + /// Pull client used to fetch changed rows from each site. + /// Logger for diagnostics and error reporting. + /// Optional configuration overrides; production defaults apply when null. + /// + /// Public (not internal) because Akka's default ActivatorProducer + /// instantiates the actor via reflection with public-only binding flags — + /// an internal ctor yields a MissingMethodException at actor + /// creation. Distinguished from the production + /// ctor by its concrete-collaborator parameter list; only the test project + /// (or a host that hand-resolves the collaborators) constructs it this way. + /// + public SiteCallAuditActor( + ISiteCallAuditRepository repository, + ISiteEnumerator siteEnumerator, + IPullSiteCallsClient pullClient, + ILogger logger, + SiteCallAuditOptions? 
options = null) + { + ArgumentNullException.ThrowIfNull(repository); + ArgumentNullException.ThrowIfNull(siteEnumerator); + ArgumentNullException.ThrowIfNull(pullClient); + ArgumentNullException.ThrowIfNull(logger); + + _injectedRepository = repository; + _siteEnumerator = siteEnumerator; + _pullClient = pullClient; + _logger = logger; + _options = options ?? new SiteCallAuditOptions(); + + RegisterHandlers(); + } + /// /// Production constructor — resolves /// from a fresh DI scope per message because the repository is a scoped EF @@ -129,6 +209,17 @@ public class SiteCallAuditActor : ReceiveActor _options = options; _logger = logger; + // Reconciliation collaborators (Piece A) are central-only singletons + // registered by AddAuditLogCentralReconciliationClient — always on the + // central composition root (Program.cs). Resolve them once here (the + // actor itself is a long-lived singleton; the repository is the only + // scoped service and is still resolved per-tick/per-message). GetService + // (not GetRequiredService) so a host that somehow omits the helper + // degrades to "no reconciliation tick" rather than a startup crash — + // the tick startup gates on both being non-null. + _pullClient = serviceProvider.GetService(); + _siteEnumerator = serviceProvider.GetService(); + RegisterHandlers(); } @@ -154,6 +245,49 @@ public class SiteCallAuditActor : ReceiveActor }); Receive(HandleRetrySiteCall); Receive(HandleDiscardSiteCall); + + // Piece A (#22): self-tick for the periodic reconciliation pull. The + // handler stays alive across faults via its own per-site try/catch + // (mirroring the ingest path); the timer is only started when the + // reconciliation collaborators are available. 
+ ReceiveAsync(_ => OnReconciliationTickAsync()); + } + + /// + protected override void PreStart() + { + base.PreStart(); + StartReconciliationTimer(); + } + + /// + protected override void PostStop() + { + _reconciliationTimer?.Cancel(); + base.PostStop(); + } + + /// + /// Starts the periodic reconciliation tick — but ONLY when both the pull + /// client and the site enumerator are available. The repo-only test ctor + /// injects neither, so the tick is gated off there (the MSSQL read/upsert + /// tests must not fire phantom pulls); the reconciliation test ctor and the + /// production ctor (which resolves both from the SP) start it. + /// + private void StartReconciliationTimer() + { + if (_pullClient is null || _siteEnumerator is null) + { + return; + } + + var interval = _options.ResolvedReconciliationInterval; + _reconciliationTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable( + initialDelay: interval, + interval: interval, + receiver: Self, + message: ReconciliationTick.Instance, + sender: Self); } /// @@ -212,6 +346,119 @@ public class SiteCallAuditActor : ReceiveActor } } + // ── Piece A: periodic per-site reconciliation pull (self-heal) ── + + /// + /// One reconciliation pass: enumerate every known site and, per site, pull + /// changed rows since that site's cursor and upsert + /// them idempotently — the documented self-heal when best-effort gRPC push + /// telemetry is lost. This is a mirror, NOT a dispatcher: cached-call + /// delivery stays site-local; upserting reconciled rows only refreshes the + /// eventually-consistent central SiteCalls mirror. + /// + /// + /// Mirrors SiteAuditReconciliationActor's structure (per-site cursor, + /// per-site try/catch failure isolation, advance the cursor by the max + /// observed ) but is deliberately simpler: + /// no stalled-detection EventStream machinery — just cursor + pull + upsert + /// + advance. 
One DI scope per tick is opened and the same repository reused + /// across every site in that tick. + /// + private async Task OnReconciliationTickAsync() + { + // The collaborators are guaranteed non-null: the tick is only scheduled + // when both are present (StartReconciliationTimer). Assert via the + // local copies so a future refactor that drops the gate fails loudly. + var enumerator = _siteEnumerator!; + var client = _pullClient!; + + IReadOnlyList sites; + try + { + // No ambient CancellationToken in a ReceiveActor handler — None is + // intentional; the work is bounded by the reconciliation interval + // plus the singleton's graceful-stop drain on PhaseClusterLeave. + sites = await enumerator.EnumerateAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, "SiteCallAudit site enumeration failed; skipping reconciliation tick."); + return; + } + + if (sites.Count == 0) + { + return; + } + + var (scope, repository) = ResolveRepository(); + try + { + foreach (var site in sites) + { + try + { + await ReconcileSiteAsync(site, client, repository).ConfigureAwait(false); + } + catch (Exception ex) + { + // Failure-isolation invariant: one site's fault (transport, + // repository write) must NOT sink the rest of the tick. The + // failing site's cursor is left at its previous value so the + // next tick retries the same window. + _logger.LogWarning( + ex, + "SiteCallAudit reconciliation pull failed for site {SiteId}; other sites continue.", + site.SiteId); + } + } + } + finally + { + scope?.Dispose(); + } + } + + /// + /// Issues one PullSiteCalls RPC against the site, upserts the + /// returned rows idempotently, and advances the site's cursor to the maximum + /// observed. The pull client returns rows + /// oldest-first with SourceSite already re-stamped from the dialed + /// site id, so the actor upserts them verbatim (re-stamping + /// IngestedAtUtc at central persist time, as the telemetry path does). 
+ /// + private async Task ReconcileSiteAsync( + SiteEntry site, IPullSiteCallsClient client, ISiteCallAuditRepository repository) + { + var since = _reconciliationCursors.TryGetValue(site.SiteId, out var c) ? c : DateTime.MinValue; + var response = await client + .PullAsync(site.SiteId, since, _options.ReconciliationBatchSize, CancellationToken.None) + .ConfigureAwait(false); + + var maxUpdated = since; + var nowUtc = DateTime.UtcNow; + foreach (var row in response.SiteCalls) + { + // IngestedAtUtc is the "central ingested (or last refreshed) this + // row" stamp — owned by the central actor, exactly as OnUpsertAsync + // does for the telemetry path. Monotonic UpsertAsync makes a row + // already present (from a prior push) a silent no-op. + var siteCall = row with { IngestedAtUtc = nowUtc }; + await repository.UpsertAsync(siteCall).ConfigureAwait(false); + + if (row.UpdatedAtUtc > maxUpdated) + { + maxUpdated = row.UpdatedAtUtc; + } + } + + // Advance the cursor to the newest row seen. A MoreAvailable response + // means the site saturated the batch; the next tick continues draining + // from the advanced cursor (no immediate re-pull loop — the natural + // tick cadence drains the backlog, matching SiteAuditReconciliationActor). + _reconciliationCursors[site.SiteId] = maxUpdated; + } + // ── Task 4: read-side (query / detail / KPI) ── /// @@ -693,6 +940,13 @@ public class SiteCallAuditActor : ReceiveActor { return string.IsNullOrWhiteSpace(value) ? null : value; } + + /// Self-tick triggering a reconciliation pass across all sites (Piece A). 
+ internal sealed class ReconciliationTick + { + public static readonly ReconciliationTick Instance = new(); + private ReconciliationTick() { } + } } /// diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs index a5db3102..134e606e 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs @@ -1,11 +1,11 @@ namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit; /// -/// Configuration options for the Site Call Audit (#22) read-side: stuck-call -/// detection and KPI windowing. Mirrors the KPI-relevant subset of -/// NotificationOutboxOptions — the reconciliation, purge and dispatch -/// cadence options the Notification Outbox carries are not part of the Site -/// Call Audit read-side backend and are deliberately omitted here. +/// Configuration options for the Site Call Audit (#22): stuck-call detection + +/// KPI windowing for the read-side, plus the cadence knobs for the periodic +/// per-site reconciliation pull (self-heal for lost telemetry). Mirrors the +/// KPI-relevant subset of NotificationOutboxOptions and the +/// scheduler-cadence shape of SiteAuditReconciliationOptions. /// public class SiteCallAuditOptions { @@ -44,4 +44,52 @@ public class SiteCallAuditOptions /// /// public TimeSpan RelayTimeout { get; set; } = TimeSpan.FromSeconds(10); + + // ── Reconciliation tick (#22): periodic per-site self-heal pull ── + + /// + /// Period of the reconciliation tick. Each tick visits every known site + /// once, pulls changed SiteCall rows since a per-site cursor, and + /// upserts them idempotently — the documented self-heal when best-effort + /// push telemetry is lost. Default 5 minutes, matching the sibling + /// SiteAuditReconciliationOptions (#23) cadence. Clamped to at least + /// via . 
+ /// + public TimeSpan ReconciliationInterval { get; set; } = TimeSpan.FromMinutes(5); + + /// + /// Test-only override for the reconciliation tick cadence — bypasses the + /// clamp so unit tests can drop the + /// cadence to milliseconds. Production config never sets this; leave null. + /// + public TimeSpan? ReconciliationIntervalOverride { get; set; } + + /// + /// Maximum number of SiteCall rows requested per PullSiteCalls + /// RPC. Default 500. A MoreAvailable=true response signals the cursor + /// advanced and the next tick should keep draining the backlog. + /// + public int ReconciliationBatchSize { get; set; } = 500; + + /// + /// Minimum interval the config-bound can + /// resolve to. Clamps a misconfigured 0 (or negative) value away from + /// , which would make Akka's + /// ScheduleTellRepeatedlyCancelable spin — the exact footgun flagged in + /// a prior review of the sibling reconciliation options. + /// + private static readonly TimeSpan MinReconciliationInterval = TimeSpan.FromSeconds(1); + + /// + /// Resolves the effective reconciliation tick interval: the test override + /// when set (bypassing the clamp), otherwise + /// clamped to at least so a + /// zero/negative config value can never yield . + /// + public TimeSpan ResolvedReconciliationInterval => + ReconciliationIntervalOverride is { } o + ? o + : ReconciliationInterval < MinReconciliationInterval + ? MinReconciliationInterval + : ReconciliationInterval; } diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/ZB.MOM.WW.ScadaBridge.SiteCallAudit.csproj b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/ZB.MOM.WW.ScadaBridge.SiteCallAudit.csproj index cca4f3eb..c2de6728 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/ZB.MOM.WW.ScadaBridge.SiteCallAudit.csproj +++ b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/ZB.MOM.WW.ScadaBridge.SiteCallAudit.csproj @@ -29,6 +29,15 @@ the same transport every other central→site command uses. 
SiteEnvelope is defined in ZB.MOM.WW.ScadaBridge.Communication (no cycle: Communication does not reference SiteCallAudit). --> + + diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditReconciliationTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditReconciliationTests.cs new file mode 100644 index 00000000..ac2f86b0 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditReconciliationTests.cs @@ -0,0 +1,300 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.AuditLog.Central; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration; +using ZB.MOM.WW.ScadaBridge.Commons.Types; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit; + +namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests; + +/// +/// Reconciliation-tick tests for (#22, Piece A). +/// These exercise the periodic per-site self-heal pull entirely in-memory — +/// fake + + a +/// recording — so they run in +/// milliseconds and do NOT depend on a live MSSQL fixture (unlike the +/// MSSQL-backed ). The actor is built via +/// the public reconciliation test ctor that injects all three collaborators; the +/// repo-only test ctor used by the MSSQL tests passes no client/enumerator, so +/// the reconciliation tick is gated off there (see +/// ). +/// +public class SiteCallAuditReconciliationTests : TestKit +{ + private static SiteCall NewRow( + TrackedOperationId id, + string sourceSite, + string status = "Submitted", + DateTime? updatedAtUtc = null) + { + var now = updatedAtUtc ?? 
DateTime.UtcNow; + return new SiteCall + { + TrackedOperationId = id, + Channel = "ApiOutbound", + Target = "ERP.GetOrder", + SourceSite = sourceSite, + SourceNode = null, + Status = status, + RetryCount = 0, + LastError = null, + HttpStatus = null, + CreatedAtUtc = now, + UpdatedAtUtc = now, + TerminalAtUtc = null, + IngestedAtUtc = now, + }; + } + + private static SiteCallAuditOptions FastTickOptions(int batchSize = 500) => new() + { + // 100 ms tick keeps each test under a second; AwaitAssert covers + // scheduler jitter so the tick has up to a few seconds to fire. + ReconciliationInterval = TimeSpan.FromMinutes(5), + ReconciliationIntervalOverride = TimeSpan.FromMilliseconds(100), + ReconciliationBatchSize = batchSize, + }; + + /// In-memory enumerator returning a static list of sites. + private sealed class StaticEnumerator : ISiteEnumerator + { + private readonly IReadOnlyList _sites; + public StaticEnumerator(params SiteEntry[] sites) => _sites = sites; + public Task> EnumerateAsync(CancellationToken ct = default) => + Task.FromResult(_sites); + } + + /// + /// Scripted pull client — returns the next queued response for the site on + /// each call (an empty response once the queue is exhausted) and records every + /// invocation so tests can assert call counts + the since cursor. 
+ /// + private sealed class ScriptedPullClient : IPullSiteCallsClient + { + public List<(string SiteId, DateTime SinceUtc, int BatchSize)> Calls { get; } = new(); + private readonly Dictionary> _scripted = new(); + private readonly Dictionary _throwOnSite = new(); + + public ScriptedPullClient Script(string siteId, params PullSiteCallsResponse[] responses) + { + _scripted[siteId] = new Queue(responses); + return this; + } + + public ScriptedPullClient ThrowFor(string siteId, Exception ex) + { + _throwOnSite[siteId] = ex; + return this; + } + + public Task PullAsync( + string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct) + { + Calls.Add((siteId, sinceUtc, batchSize)); + if (_throwOnSite.TryGetValue(siteId, out var ex)) + { + throw ex; + } + if (_scripted.TryGetValue(siteId, out var queue) && queue.Count > 0) + { + return Task.FromResult(queue.Dequeue()); + } + return Task.FromResult( + new PullSiteCallsResponse(Array.Empty(), MoreAvailable: false)); + } + } + + /// + /// Recording repository that captures every call + /// (keyed by id, last-write-wins on the captured row). The reconciliation + /// tick only ever calls ; the read/KPI members are + /// inert stubs. + /// + private sealed class RecordingRepo : ISiteCallAuditRepository + { + public Dictionary Upserted { get; } = new(); + public int UpsertCallCount { get; private set; } + + public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) + { + UpsertCallCount++; + Upserted[siteCall.TrackedOperationId] = siteCall; + return Task.CompletedTask; + } + + public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => + Task.FromResult(Upserted.TryGetValue(id, out var row) ? 
row : null); + + public Task> QueryAsync( + SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) => + Task.FromResult>(Array.Empty()); + + public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) => + Task.FromResult(0); + + public Task ComputeKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + Task.FromResult(new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0)); + + public Task> ComputePerSiteKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + Task.FromResult>(Array.Empty()); + } + + private IActorRef CreateActor( + ISiteEnumerator sites, + IPullSiteCallsClient client, + ISiteCallAuditRepository repo, + SiteCallAuditOptions options) => + Sys.ActorOf(Props.Create(() => new SiteCallAuditActor( + repo, + sites, + client, + NullLogger.Instance, + options))); + + // --------------------------------------------------------------------- + // 1. AbsentRow_PulledFromSite_IsUpserted + // --------------------------------------------------------------------- + + [Fact] + public void ReconciliationTick_AbsentRow_IsUpsertedFromSitePull() + { + var siteId = "siteA"; + var id = TrackedOperationId.New(); + var row = NewRow(id, sourceSite: siteId, status: "Parked"); + + var sites = new StaticEnumerator(new SiteEntry(siteId, "http://siteA:8083")); + var client = new ScriptedPullClient().Script(siteId, + new PullSiteCallsResponse(new[] { row }, MoreAvailable: false)); + var repo = new RecordingRepo(); + + CreateActor(sites, client, repo, FastTickOptions()); + + AwaitAssert( + () => + { + Assert.True(repo.Upserted.ContainsKey(id), + "reconciliation tick should upsert the row present at the site but absent centrally"); + Assert.Equal("Parked", repo.Upserted[id].Status); + Assert.Equal(siteId, repo.Upserted[id].SourceSite); + }, + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + } + + // 
--------------------------------------------------------------------- + // 2. Cursor_Advances_ToMaxUpdatedAtUtc_NoRePullOfOldRows + // --------------------------------------------------------------------- + + [Fact] + public void ReconciliationTick_SecondTick_AdvancesCursorPastAlreadyPulledRows() + { + var siteId = "siteA"; + var t1 = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc); + var t2 = new DateTime(2026, 5, 20, 10, 1, 0, DateTimeKind.Utc); + var t3 = new DateTime(2026, 5, 20, 10, 2, 0, DateTimeKind.Utc); + var r1 = NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: t1); + var r2 = NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: t2); + var r3 = NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: t3); + + var sites = new StaticEnumerator(new SiteEntry(siteId, "http://siteA:8083")); + // First pull returns three rows (max UpdatedAtUtc = t3); subsequent + // pulls return empty. The second pull's `since` must be t3, proving the + // cursor advanced and old rows are not re-pulled from the start. + var client = new ScriptedPullClient().Script(siteId, + new PullSiteCallsResponse(new[] { r1, r2, r3 }, MoreAvailable: false)); + var repo = new RecordingRepo(); + + CreateActor(sites, client, repo, FastTickOptions()); + + AwaitAssert( + () => Assert.True(client.Calls.Count >= 2, + $"need at least 2 pulls to assert cursor advancement, got {client.Calls.Count}"), + duration: TimeSpan.FromSeconds(5), + interval: TimeSpan.FromMilliseconds(50)); + + Assert.Equal(DateTime.MinValue, client.Calls[0].SinceUtc); + Assert.Equal(t3, client.Calls[1].SinceUtc); + // The batch size flows through from options. + Assert.Equal(500, client.Calls[0].BatchSize); + } + + // --------------------------------------------------------------------- + // 3. 
OneSiteThrows_OtherSitesStillProcessed (failure isolation) + // --------------------------------------------------------------------- + + [Fact] + public void ReconciliationTick_OneSiteThrows_OtherSitesStillReconciled() + { + var siteB = "siteB"; + var bId = TrackedOperationId.New(); + var bRow = NewRow(bId, sourceSite: siteB, status: "Delivered"); + + var sites = new StaticEnumerator( + new SiteEntry("siteA", "http://siteA:8083"), + new SiteEntry(siteB, "http://siteB:8083")); + var client = new ScriptedPullClient() + .ThrowFor("siteA", new InvalidOperationException("simulated transport failure")) + .Script(siteB, new PullSiteCallsResponse(new[] { bRow }, MoreAvailable: false)); + var repo = new RecordingRepo(); + + CreateActor(sites, client, repo, FastTickOptions()); + + AwaitAssert( + () => + { + // siteA was attempted (and threw) yet siteB's row still landed — + // one offline site must not sink the rest of the tick. + Assert.Contains(client.Calls, c => c.SiteId == "siteA"); + Assert.True(repo.Upserted.ContainsKey(bId), + "siteB must be reconciled even though siteA threw"); + }, + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + } + + // --------------------------------------------------------------------- + // 4. RepoOnly test ctor does NOT start the reconciliation tick + // --------------------------------------------------------------------- + + [Fact] + public void TestCtor_RepositoryOnly_DoesNotStartReconciliationTick() + { + // The repo-only test ctor (used by the MSSQL-backed actor tests) injects + // no client/enumerator, so the tick must be gated OFF — otherwise those + // tests would fire phantom pulls. Build the actor via that ctor and + // confirm no pull ever happens. We can't observe a non-event directly, + // so we run a second, fully wired actor on the same fast cadence purely + // to bound the wait, then assert the repo-only actor's repository + // recorded no upserts. 
+ var repo = new RecordingRepo(); + Sys.ActorOf(Props.Create(() => new SiteCallAuditActor( + repo, + NullLogger.Instance, + FastTickOptions()))); + + // Run a parallel actor with the full reconciliation ctor and a fast + // tick; once IT has pulled we know enough wall-clock elapsed that the + // repo-only actor would have ticked too, had it been wired. + var liveClient = new ScriptedPullClient(); + var liveRepo = new RecordingRepo(); + CreateActor( + new StaticEnumerator(new SiteEntry("siteX", "http://siteX:8083")), + liveClient, + liveRepo, + FastTickOptions()); + + AwaitAssert( + () => Assert.True(liveClient.Calls.Count >= 1), + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + + // The repo-only actor never reconciles: it has no client to pull with, + // so it upserts nothing on its own. + Assert.Equal(0, repo.UpsertCallCount); + } +}