using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;

namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests;

/// <summary>
/// Reconciliation-tick tests for <see cref="SiteCallAuditActor"/> (#22, Piece A).
/// These exercise the periodic per-site self-heal pull entirely in-memory —
/// fake <see cref="ISiteEnumerator"/> + <see cref="IPullSiteCallsClient"/> + a
/// recording <see cref="ISiteCallAuditRepository"/> — so they run in
/// milliseconds and do NOT depend on a live MSSQL fixture (unlike the
/// MSSQL-backed actor tests). The actor is built via
/// the internal test ctor that injects all three collaborators; the
/// repo-only test ctor used by the MSSQL tests passes no client/enumerator, so
/// the reconciliation tick is gated off there (see
/// <see cref="TestCtor_RepositoryOnly_DoesNotStartReconciliationTick"/>).
/// </summary>
/// <remarks>
/// The fakes are invoked by the actor while the test thread polls them from
/// <c>AwaitAssert</c>, so every piece of recorded state is guarded by a lock
/// and exposed to tests only as a point-in-time snapshot.
/// </remarks>
public class SiteCallAuditReconciliationTests : TestKit
{
    /// <summary>
    /// Builds a <see cref="SiteCall"/> row with fixed channel/target values.
    /// </summary>
    /// <param name="id">Tracked operation id assigned to the row.</param>
    /// <param name="sourceSite">Value stored in <c>SourceSite</c>.</param>
    /// <param name="status">Value stored in <c>Status</c>; defaults to "Submitted".</param>
    /// <param name="updatedAtUtc">
    /// Timestamp used for the created/updated/ingested fields; defaults to
    /// <see cref="DateTime.UtcNow"/> when omitted.
    /// </param>
    private static SiteCall NewRow(
        TrackedOperationId id,
        string sourceSite,
        string status = "Submitted",
        DateTime? updatedAtUtc = null)
    {
        var now = updatedAtUtc ?? DateTime.UtcNow;
        return new SiteCall
        {
            TrackedOperationId = id,
            Channel = "ApiOutbound",
            Target = "ERP.GetOrder",
            SourceSite = sourceSite,
            SourceNode = null,
            Status = status,
            RetryCount = 0,
            LastError = null,
            HttpStatus = null,
            CreatedAtUtc = now,
            UpdatedAtUtc = now,
            TerminalAtUtc = null,
            IngestedAtUtc = now,
        };
    }

    /// <summary>
    /// Options with a 100 ms reconciliation interval override so a tick fires
    /// quickly inside a test.
    /// </summary>
    /// <param name="batchSize">Value assigned to <c>ReconciliationBatchSize</c>.</param>
    private static SiteCallAuditOptions FastTickOptions(int batchSize = 500) => new()
    {
        // 100 ms tick keeps each test under a second; AwaitAssert covers
        // scheduler jitter so the tick has up to a few seconds to fire.
        ReconciliationInterval = TimeSpan.FromMinutes(5),
        ReconciliationIntervalOverride = TimeSpan.FromMilliseconds(100),
        ReconciliationBatchSize = batchSize,
    };

    /// <summary>In-memory enumerator returning a static list of sites.</summary>
    private sealed class StaticEnumerator : ISiteEnumerator
    {
        private readonly IReadOnlyList<SiteEntry> _sites;

        public StaticEnumerator(params SiteEntry[] sites) => _sites = sites;

        public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default) =>
            Task.FromResult(_sites);
    }

    /// <summary>
    /// Scripted pull client — returns the next queued response for the site on
    /// each call (an empty, <c>MoreAvailable: false</c> page once the queue is
    /// exhausted or when nothing was scripted) and records every invocation so
    /// tests can assert call counts + the <c>since</c> cursor.
    /// </summary>
    private sealed class ScriptedPullClient : IPullSiteCallsClient
    {
        // Guards all mutable state: PullAsync is not called on the test thread
        // that reads Calls.
        private readonly object _gate = new();
        private readonly List<(string SiteId, DateTime SinceUtc, int BatchSize)> _calls = new();
        private readonly Dictionary<string, Queue<PullSiteCallsResponse>> _scripted = new();
        private readonly Dictionary<string, Exception> _throwOnSite = new();

        /// <summary>
        /// Snapshot of every <see cref="PullAsync"/> invocation so far, in call
        /// order. Each read returns a fresh copy, so it is safe to index or
        /// enumerate while the actor keeps pulling.
        /// </summary>
        public IReadOnlyList<(string SiteId, DateTime SinceUtc, int BatchSize)> Calls
        {
            get
            {
                lock (_gate)
                {
                    return _calls.ToArray();
                }
            }
        }

        /// <summary>Queues the responses handed out, in order, for <paramref name="siteId"/>.</summary>
        public ScriptedPullClient Script(string siteId, params PullSiteCallsResponse[] responses)
        {
            lock (_gate)
            {
                _scripted[siteId] = new Queue<PullSiteCallsResponse>(responses);
            }

            return this;
        }

        /// <summary>Makes every pull for <paramref name="siteId"/> throw <paramref name="ex"/>.</summary>
        public ScriptedPullClient ThrowFor(string siteId, Exception ex)
        {
            lock (_gate)
            {
                _throwOnSite[siteId] = ex;
            }

            return this;
        }

        /// <summary>
        /// Records the call, then throws the configured exception (synchronously,
        /// not as a faulted task), returns the next scripted response, or
        /// returns an empty page.
        /// </summary>
        public Task<PullSiteCallsResponse> PullAsync(
            string siteId,
            DateTime sinceUtc,
            int batchSize,
            CancellationToken ct)
        {
            lock (_gate)
            {
                // Record first so a throwing site still shows up in Calls.
                _calls.Add((siteId, sinceUtc, batchSize));

                if (_throwOnSite.TryGetValue(siteId, out var ex))
                {
                    throw ex;
                }

                if (_scripted.TryGetValue(siteId, out var queue) && queue.Count > 0)
                {
                    return Task.FromResult(queue.Dequeue());
                }
            }

            return Task.FromResult(
                new PullSiteCallsResponse(Array.Empty<SiteCall>(), MoreAvailable: false));
        }
    }

    /// <summary>
    /// Recording repository that captures every <see cref="UpsertAsync"/> call
    /// (keyed by id, last-write-wins on the captured row). The query, purge and
    /// KPI members are inert stubs that return empty/zero results.
    /// </summary>
    private sealed class RecordingRepo : ISiteCallAuditRepository
    {
        // Guards the captured rows and the counter: upserts arrive from the
        // actor while the test thread polls.
        private readonly object _gate = new();
        private readonly Dictionary<TrackedOperationId, SiteCall> _upserted = new();
        private int _upsertCallCount;

        /// <summary>
        /// Snapshot of the most recently upserted row per id. Each read returns
        /// a fresh copy.
        /// </summary>
        public IReadOnlyDictionary<TrackedOperationId, SiteCall> Upserted
        {
            get
            {
                lock (_gate)
                {
                    return new Dictionary<TrackedOperationId, SiteCall>(_upserted);
                }
            }
        }

        /// <summary>Total number of <see cref="UpsertAsync"/> invocations.</summary>
        public int UpsertCallCount
        {
            get
            {
                lock (_gate)
                {
                    return _upsertCallCount;
                }
            }
        }

        public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default)
        {
            lock (_gate)
            {
                _upsertCallCount++;
                _upserted[siteCall.TrackedOperationId] = siteCall;
            }

            return Task.CompletedTask;
        }

        public Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default)
        {
            lock (_gate)
            {
                return Task.FromResult<SiteCall?>(
                    _upserted.TryGetValue(id, out var row) ? row : null);
            }
        }

        // NOTE(review): element type reconstructed as SiteCall — confirm
        // against ISiteCallAuditRepository.
        public Task<IReadOnlyList<SiteCall>> QueryAsync(
            SiteCallQueryFilter filter,
            SiteCallPaging paging,
            CancellationToken ct = default) =>
            Task.FromResult<IReadOnlyList<SiteCall>>(Array.Empty<SiteCall>());

        public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) =>
            Task.FromResult(0);

        public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
            DateTime stuckCutoff,
            DateTime intervalSince,
            CancellationToken ct = default) =>
            Task.FromResult(new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0));

        // NOTE(review): the per-site KPI element type name is reconstructed —
        // confirm against ISiteCallAuditRepository.ComputePerSiteKpisAsync.
        public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
            DateTime stuckCutoff,
            DateTime intervalSince,
            CancellationToken ct = default) =>
            Task.FromResult<IReadOnlyList<SiteCallSiteKpiSnapshot>>(
                Array.Empty<SiteCallSiteKpiSnapshot>());
    }

    /// <summary>
    /// Spawns a <see cref="SiteCallAuditActor"/> through the constructor that
    /// takes the repository, site enumerator and pull client.
    /// </summary>
    private IActorRef CreateActor(
        ISiteEnumerator sites,
        IPullSiteCallsClient client,
        ISiteCallAuditRepository repo,
        SiteCallAuditOptions options) =>
        Sys.ActorOf(Props.Create(() => new SiteCallAuditActor(
            repo,
            sites,
            client,
            NullLogger<SiteCallAuditActor>.Instance,
            options)));

    // ---------------------------------------------------------------------
    // 1. AbsentRow_PulledFromSite_IsUpserted
    // ---------------------------------------------------------------------

    [Fact]
    public void ReconciliationTick_AbsentRow_IsUpsertedFromSitePull()
    {
        var siteId = "siteA";
        var id = TrackedOperationId.New();
        var row = NewRow(id, sourceSite: siteId, status: "Parked");

        var sites = new StaticEnumerator(new SiteEntry(siteId, "http://siteA:8083"));
        var client = new ScriptedPullClient().Script(siteId,
            new PullSiteCallsResponse(new[] { row }, MoreAvailable: false));
        var repo = new RecordingRepo();

        CreateActor(sites, client, repo, FastTickOptions());

        AwaitAssert(
            () =>
            {
                // One snapshot per poll so all three assertions see the same state.
                var upserted = repo.Upserted;
                Assert.True(upserted.TryGetValue(id, out var stored),
                    "reconciliation tick should upsert the row present at the site but absent centrally");
                Assert.Equal("Parked", stored!.Status);
                Assert.Equal(siteId, stored.SourceSite);
            },
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));
    }

    // ---------------------------------------------------------------------
    // 2. Cursor_Advances_ToMaxUpdatedAtUtc_NoRePullOfOldRows
    // ---------------------------------------------------------------------

    [Fact]
    public void ReconciliationTick_SecondTick_AdvancesCursorPastAlreadyPulledRows()
    {
        var siteId = "siteA";
        var t1 = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
        var t2 = new DateTime(2026, 5, 20, 10, 1, 0, DateTimeKind.Utc);
        var t3 = new DateTime(2026, 5, 20, 10, 2, 0, DateTimeKind.Utc);

        var r1 = NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: t1);
        var r2 = NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: t2);
        var r3 = NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: t3);

        var sites = new StaticEnumerator(new SiteEntry(siteId, "http://siteA:8083"));
        // First pull returns three rows (max UpdatedAtUtc = t3); subsequent
        // pulls return empty. The second pull's `since` must be t3, proving the
        // cursor advanced and old rows are not re-pulled from the start.
        var client = new ScriptedPullClient().Script(siteId,
            new PullSiteCallsResponse(new[] { r1, r2, r3 }, MoreAvailable: false));
        var repo = new RecordingRepo();

        CreateActor(sites, client, repo, FastTickOptions());

        AwaitAssert(
            () =>
            {
                var pullCount = client.Calls.Count;
                Assert.True(pullCount >= 2,
                    $"need at least 2 pulls to assert cursor advancement, got {pullCount}");
            },
            duration: TimeSpan.FromSeconds(5),
            interval: TimeSpan.FromMilliseconds(50));

        // Take one snapshot: the actor keeps ticking while we assert.
        var calls = client.Calls;
        Assert.Equal(DateTime.MinValue, calls[0].SinceUtc);
        Assert.Equal(t3, calls[1].SinceUtc);
        // The batch size flows through from options.
        Assert.Equal(500, calls[0].BatchSize);
    }

    // ---------------------------------------------------------------------
    // 3. OneSiteThrows_OtherSitesStillProcessed (failure isolation)
    // ---------------------------------------------------------------------

    [Fact]
    public void ReconciliationTick_OneSiteThrows_OtherSitesStillReconciled()
    {
        var siteB = "siteB";
        var bId = TrackedOperationId.New();
        var bRow = NewRow(bId, sourceSite: siteB, status: "Delivered");

        var sites = new StaticEnumerator(
            new SiteEntry("siteA", "http://siteA:8083"),
            new SiteEntry(siteB, "http://siteB:8083"));
        var client = new ScriptedPullClient()
            .ThrowFor("siteA", new InvalidOperationException("simulated transport failure"))
            .Script(siteB, new PullSiteCallsResponse(new[] { bRow }, MoreAvailable: false));
        var repo = new RecordingRepo();

        CreateActor(sites, client, repo, FastTickOptions());

        AwaitAssert(
            () =>
            {
                // siteA was attempted (and threw) yet siteB's row still landed —
                // one offline site must not sink the rest of the tick.
                Assert.Contains(client.Calls, c => c.SiteId == "siteA");
                Assert.True(repo.Upserted.ContainsKey(bId),
                    "siteB must be reconciled even though siteA threw");
            },
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));
    }

    // ---------------------------------------------------------------------
    // 4. RepoOnly test ctor does NOT start the reconciliation tick
    // ---------------------------------------------------------------------

    [Fact]
    public void TestCtor_RepositoryOnly_DoesNotStartReconciliationTick()
    {
        // The repo-only test ctor (used by the MSSQL-backed actor tests) injects
        // no client/enumerator, so the tick must be gated OFF — otherwise those
        // tests would fire phantom pulls. We can't observe a non-event directly,
        // so a second, fully wired actor with the same fast tick is used purely
        // as a clock: once it has pulled, the repo-only actor has had the same
        // wall-clock time to tick. The repo-only actor has no client to inspect,
        // so the observable is its repository's upsert count.
        var repo = new RecordingRepo();
        Sys.ActorOf(Props.Create(() => new SiteCallAuditActor(
            repo,
            NullLogger<SiteCallAuditActor>.Instance,
            FastTickOptions())));

        // Run a parallel actor with the full reconciliation ctor and a fast
        // tick; once IT has pulled we know enough wall-clock elapsed that the
        // repo-only actor would have ticked too, had it been wired.
        var liveClient = new ScriptedPullClient();
        var liveRepo = new RecordingRepo();
        CreateActor(
            new StaticEnumerator(new SiteEntry("siteX", "http://siteX:8083")),
            liveClient,
            liveRepo,
            FastTickOptions());

        AwaitAssert(
            () => Assert.True(liveClient.Calls.Count >= 1),
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));

        // The repo-only actor never reconciles: it has no client to pull with,
        // so it upserts nothing on its own.
        Assert.Equal(0, repo.UpsertCallCount);
    }
}