Files
ScadaBridge/tests/ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests/SiteCallAuditReconciliationTests.cs
T
Joseph Doherty e427b38fb3 feat(sitecallaudit): periodic reconciliation pull back-fills lost telemetry
Add a periodic reconciliation tick to SiteCallAuditActor that, per site,
pulls changed SiteCall rows since a per-site UpdatedAtUtc cursor and upserts
them idempotently (monotonic UpsertAsync) — the documented self-heal for lost
best-effort gRPC telemetry. Mirrors SiteAuditReconciliationActor's structure
(per-site cursor, per-site try/catch failure isolation, advance cursor by max
observed UpdatedAtUtc) minus the stalled-detection EventStream machinery.

Dependency wiring: add an acyclic SiteCallAudit -> AuditLog project reference
and resolve IPullSiteCallsClient + ISiteEnumerator (central-only singletons
registered by AddAuditLogCentralReconciliationClient) from the IServiceProvider
the production ctor already holds — no Host Props.Create change needed. The
repo-only test ctor injects neither collaborator, so the tick is gated off
there. A new public test ctor injects fake client + enumerator + repo so the
tick is unit-testable in-memory (public, not internal: Akka's ActivatorProducer
uses public-only reflection binding).

Options: ReconciliationInterval (default 5 min, clamped >= 1s so a zero config
value can't spin the scheduler) + ReconciliationBatchSize (default 500), plus a
test-only override that bypasses the clamp for millisecond cadences.

Tests (all in-memory, no live MSSQL): absent row is upserted on a tick; second
tick advances the cursor past already-pulled rows; one failing site does not
sink other sites; repo-only ctor does not start the tick.
2026-06-15 12:01:22 -04:00

301 lines
13 KiB
C#

using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests;
/// <summary>
/// Reconciliation-tick tests for <see cref="SiteCallAuditActor"/> (#22, Piece A).
/// These exercise the periodic per-site self-heal pull entirely in-memory —
/// fake <see cref="IPullSiteCallsClient"/> + <see cref="ISiteEnumerator"/> + a
/// recording <see cref="ISiteCallAuditRepository"/> — so they run in
/// milliseconds and do NOT depend on a live MSSQL fixture (unlike the
/// MSSQL-backed <see cref="SiteCallAuditActorTests"/>). The actor is built via
/// the public test ctor that injects all three collaborators; the
/// repo-only test ctor used by the MSSQL tests passes no client/enumerator, so
/// the reconciliation tick is gated off there (see
/// <see cref="TestCtor_RepositoryOnly_DoesNotStartReconciliationTick"/>).
/// </summary>
public class SiteCallAuditReconciliationTests : TestKit
{
/// <summary>
/// Builds a <see cref="SiteCall"/> fixture row for the given operation id and
/// source site. A single timestamp (<paramref name="updatedAtUtc"/>, or the
/// current UTC time when omitted) is written to the created, updated and
/// ingested columns; the row is non-terminal with no retries or errors.
/// </summary>
/// <param name="id">Tracked operation id of the row.</param>
/// <param name="sourceSite">Site the row is attributed to.</param>
/// <param name="status">Status text; defaults to <c>"Submitted"</c>.</param>
/// <param name="updatedAtUtc">Timestamp to stamp on the row; defaults to <see cref="DateTime.UtcNow"/>.</param>
/// <returns>The populated fixture row.</returns>
private static SiteCall NewRow(
    TrackedOperationId id,
    string sourceSite,
    string status = "Submitted",
    DateTime? updatedAtUtc = null)
{
    // Resolve the timestamp once so all three time columns agree exactly.
    DateTime stamp = updatedAtUtc.HasValue ? updatedAtUtc.Value : DateTime.UtcNow;

    var fixture = new SiteCall
    {
        TrackedOperationId = id,
        Channel = "ApiOutbound",
        Target = "ERP.GetOrder",
        SourceSite = sourceSite,
        SourceNode = null,
        Status = status,
        RetryCount = 0,
        LastError = null,
        HttpStatus = null,
        CreatedAtUtc = stamp,
        UpdatedAtUtc = stamp,
        TerminalAtUtc = null,
        IngestedAtUtc = stamp,
    };

    return fixture;
}
/// <summary>
/// Options for a fast reconciliation cadence in tests: the regular interval
/// stays at 5 minutes while the override is set to 100 ms.
/// </summary>
/// <param name="batchSize">Value assigned to <c>ReconciliationBatchSize</c>.</param>
private static SiteCallAuditOptions FastTickOptions(int batchSize = 500)
{
    // A 100 ms override keeps each test short; the AwaitAssert calls in the
    // tests allow a few seconds of slack for scheduler jitter.
    var fastOptions = new SiteCallAuditOptions
    {
        ReconciliationInterval = TimeSpan.FromMinutes(5),
        ReconciliationIntervalOverride = TimeSpan.FromMilliseconds(100),
        ReconciliationBatchSize = batchSize,
    };

    return fastOptions;
}
/// <summary>
/// Fake <see cref="ISiteEnumerator"/> that always yields the fixed set of
/// sites it was constructed with.
/// </summary>
private sealed class StaticEnumerator : ISiteEnumerator
{
    private readonly IReadOnlyList<SiteEntry> _entries;

    /// <summary>Captures the sites to hand back from every enumeration.</summary>
    public StaticEnumerator(params SiteEntry[] sites)
    {
        _entries = sites;
    }

    /// <summary>Returns the captured sites as an already-completed task.</summary>
    public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default)
    {
        return Task.FromResult(_entries);
    }
}
/// <summary>
/// Scripted pull client — returns the next queued response for the site on
/// each call. Once a site's script is exhausted (or the site was never
/// scripted) it returns an empty, <c>MoreAvailable: false</c> response; it
/// does NOT replay the last entry. Every invocation is recorded so tests can
/// assert call counts + the <c>since</c> cursor.
/// </summary>
/// <remarks>
/// <see cref="PullAsync"/> is driven by the actor under test while the test
/// thread polls <see cref="Calls"/>, so the call log is guarded by a lock and
/// <see cref="Calls"/> hands out a point-in-time snapshot rather than the
/// live list. <see cref="Script"/> and <see cref="ThrowFor"/> are expected to
/// be called during test setup, before the client is handed to an actor.
/// </remarks>
private sealed class ScriptedPullClient : IPullSiteCallsClient
{
    private readonly object _gate = new();
    private readonly List<(string SiteId, DateTime SinceUtc, int BatchSize)> _calls = new();
    private readonly Dictionary<string, Queue<PullSiteCallsResponse>> _scripted = new();
    private readonly Dictionary<string, Exception> _throwOnSite = new();

    /// <summary>
    /// Snapshot of every <see cref="PullAsync"/> invocation so far, in call
    /// order. Each read returns a fresh copy, so callers that inspect several
    /// entries should read the property once into a local.
    /// </summary>
    public IReadOnlyList<(string SiteId, DateTime SinceUtc, int BatchSize)> Calls
    {
        get
        {
            lock (_gate)
            {
                return _calls.ToArray();
            }
        }
    }

    /// <summary>Queues the responses to return, in order, for <paramref name="siteId"/>.</summary>
    /// <returns>This instance, for fluent chaining.</returns>
    public ScriptedPullClient Script(string siteId, params PullSiteCallsResponse[] responses)
    {
        _scripted[siteId] = new Queue<PullSiteCallsResponse>(responses);
        return this;
    }

    /// <summary>Makes every pull for <paramref name="siteId"/> throw <paramref name="ex"/>.</summary>
    /// <returns>This instance, for fluent chaining.</returns>
    public ScriptedPullClient ThrowFor(string siteId, Exception ex)
    {
        _throwOnSite[siteId] = ex;
        return this;
    }

    /// <summary>
    /// Records the call, then throws the configured exception for the site,
    /// or returns the next scripted response, or an empty response when
    /// nothing (more) is scripted.
    /// </summary>
    public Task<PullSiteCallsResponse> PullAsync(
        string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct)
    {
        // Record first so a throwing site still shows up in the call log.
        lock (_gate)
        {
            _calls.Add((siteId, sinceUtc, batchSize));
        }

        if (_throwOnSite.TryGetValue(siteId, out var ex))
        {
            // Thrown synchronously (no faulted Task is returned); kept as-is
            // so the failure surfaces exactly as it did before.
            throw ex;
        }

        if (_scripted.TryGetValue(siteId, out var queue) && queue.Count > 0)
        {
            return Task.FromResult(queue.Dequeue());
        }

        return Task.FromResult(
            new PullSiteCallsResponse(Array.Empty<SiteCall>(), MoreAvailable: false));
    }
}
/// <summary>
/// Recording repository that captures every <see cref="UpsertAsync"/> call
/// (keyed by id, last-write-wins on the captured row). The reconciliation
/// tests only observe <see cref="UpsertAsync"/>; the query/purge/KPI members
/// are inert stubs returning empty or zero results.
/// </summary>
/// <remarks>
/// Upserts arrive from the actor under test while the test thread polls
/// <see cref="Upserted"/> and <see cref="UpsertCallCount"/>, so the captured
/// state uses a concurrent dictionary and an interlocked counter instead of
/// an unsynchronised <c>Dictionary</c> and <c>int</c>.
/// </remarks>
private sealed class RecordingRepo : ISiteCallAuditRepository
{
    private int _upsertCallCount;

    /// <summary>Latest row captured per tracked operation id.</summary>
    public System.Collections.Concurrent.ConcurrentDictionary<TrackedOperationId, SiteCall> Upserted { get; } = new();

    /// <summary>Total number of <see cref="UpsertAsync"/> invocations so far.</summary>
    public int UpsertCallCount => Volatile.Read(ref _upsertCallCount);

    /// <summary>Counts the call and stores the row, replacing any earlier row with the same id.</summary>
    public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default)
    {
        Interlocked.Increment(ref _upsertCallCount);
        Upserted[siteCall.TrackedOperationId] = siteCall;
        return Task.CompletedTask;
    }

    /// <summary>Returns the captured row for <paramref name="id"/>, or <c>null</c> when none was upserted.</summary>
    public Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default) =>
        Task.FromResult<SiteCall?>(Upserted.TryGetValue(id, out var row) ? row : null);

    /// <summary>Stub: always an empty result.</summary>
    public Task<IReadOnlyList<SiteCall>> QueryAsync(
        SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) =>
        Task.FromResult<IReadOnlyList<SiteCall>>(Array.Empty<SiteCall>());

    /// <summary>Stub: purges nothing and reports zero rows.</summary>
    public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) =>
        Task.FromResult(0);

    /// <summary>Stub: an all-zero snapshot.</summary>
    public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
        DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
        Task.FromResult(new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0));

    /// <summary>Stub: no per-site snapshots.</summary>
    public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
        DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
        Task.FromResult<IReadOnlyList<SiteCallSiteKpiSnapshot>>(Array.Empty<SiteCallSiteKpiSnapshot>());
}
/// <summary>
/// Spawns a <see cref="SiteCallAuditActor"/> in the test actor system through
/// the constructor that takes the repository, site enumerator and pull client,
/// with a null logger and the supplied options.
/// </summary>
/// <returns>Reference to the newly created actor.</returns>
private IActorRef CreateActor(
    ISiteEnumerator sites,
    IPullSiteCallsClient client,
    ISiteCallAuditRepository repo,
    SiteCallAuditOptions options)
{
    // Props.Create needs the constructor call itself as the lambda body.
    var props = Props.Create(() => new SiteCallAuditActor(
        repo,
        sites,
        client,
        NullLogger<SiteCallAuditActor>.Instance,
        options));

    return Sys.ActorOf(props);
}
// ---------------------------------------------------------------------
// 1. A row the site returns from a pull ends up upserted in the repository
// ---------------------------------------------------------------------
[Fact]
public void ReconciliationTick_AbsentRow_IsUpsertedFromSitePull()
{
    const string siteId = "siteA";
    var id = TrackedOperationId.New();
    var siteRow = NewRow(id, sourceSite: siteId, status: "Parked");

    var enumerator = new StaticEnumerator(new SiteEntry(siteId, "http://siteA:8083"));
    var pullClient = new ScriptedPullClient();
    pullClient.Script(siteId, new PullSiteCallsResponse(new[] { siteRow }, MoreAvailable: false));
    var repo = new RecordingRepo();

    CreateActor(enumerator, pullClient, repo, FastTickOptions());

    // Polled until the tick has delivered the row (or the 3 s budget runs out).
    void AssertRowLanded()
    {
        Assert.True(repo.Upserted.ContainsKey(id),
            "reconciliation tick should upsert the row present at the site but absent centrally");
        var landed = repo.Upserted[id];
        Assert.Equal("Parked", landed.Status);
        Assert.Equal(siteId, landed.SourceSite);
    }

    AwaitAssert(
        AssertRowLanded,
        duration: TimeSpan.FromSeconds(3),
        interval: TimeSpan.FromMilliseconds(50));
}
// ---------------------------------------------------------------------
// 2. The second pull's `since` equals the max UpdatedAtUtc of the first batch
// ---------------------------------------------------------------------
[Fact]
public void ReconciliationTick_SecondTick_AdvancesCursorPastAlreadyPulledRows()
{
    const string siteId = "siteA";

    // Three rows one minute apart; the newest timestamp is the expected cursor.
    var oldestStamp = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
    var middleStamp = oldestStamp.AddMinutes(1);
    var newestStamp = oldestStamp.AddMinutes(2);
    var firstBatch = new[]
    {
        NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: oldestStamp),
        NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: middleStamp),
        NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: newestStamp),
    };

    var enumerator = new StaticEnumerator(new SiteEntry(siteId, "http://siteA:8083"));

    // Only the first pull is scripted; every later pull gets the client's
    // empty response. The second pull must therefore ask for rows since the
    // newest timestamp, showing the cursor moved instead of restarting.
    var pullClient = new ScriptedPullClient();
    pullClient.Script(siteId, new PullSiteCallsResponse(firstBatch, MoreAvailable: false));
    var repo = new RecordingRepo();

    CreateActor(enumerator, pullClient, repo, FastTickOptions());

    AwaitAssert(
        () =>
        {
            var pullsSoFar = pullClient.Calls.Count;
            Assert.True(pullsSoFar >= 2,
                $"need at least 2 pulls to assert cursor advancement, got {pullsSoFar}");
        },
        duration: TimeSpan.FromSeconds(5),
        interval: TimeSpan.FromMilliseconds(50));

    // Read the call log once and inspect the first two entries from it.
    var recorded = pullClient.Calls;
    var firstPull = recorded[0];
    var secondPull = recorded[1];

    Assert.Equal(DateTime.MinValue, firstPull.SinceUtc);
    Assert.Equal(newestStamp, secondPull.SinceUtc);
    // The batch size from the options reaches the client unchanged.
    Assert.Equal(500, firstPull.BatchSize);
}
// ---------------------------------------------------------------------
// 3. A site whose pull throws does not stop the other site being reconciled
// ---------------------------------------------------------------------
[Fact]
public void ReconciliationTick_OneSiteThrows_OtherSitesStillReconciled()
{
    const string failingSite = "siteA";
    const string healthySite = "siteB";

    var healthyId = TrackedOperationId.New();
    var healthyRow = NewRow(healthyId, sourceSite: healthySite, status: "Delivered");

    var enumerator = new StaticEnumerator(
        new SiteEntry(failingSite, "http://siteA:8083"),
        new SiteEntry(healthySite, "http://siteB:8083"));

    var pullClient = new ScriptedPullClient();
    pullClient.ThrowFor(failingSite, new InvalidOperationException("simulated transport failure"));
    pullClient.Script(healthySite, new PullSiteCallsResponse(new[] { healthyRow }, MoreAvailable: false));
    var repo = new RecordingRepo();

    CreateActor(enumerator, pullClient, repo, FastTickOptions());

    // Both facts must hold together: the failing site was actually tried,
    // and the healthy site's row was stored regardless.
    void AssertIsolation()
    {
        Assert.Contains(pullClient.Calls, call => call.SiteId == failingSite);
        Assert.True(repo.Upserted.ContainsKey(healthyId),
            "siteB must be reconciled even though siteA threw");
    }

    AwaitAssert(
        AssertIsolation,
        duration: TimeSpan.FromSeconds(3),
        interval: TimeSpan.FromMilliseconds(50));
}
// ---------------------------------------------------------------------
// 4. RepoOnly test ctor does NOT start the reconciliation tick
// ---------------------------------------------------------------------
[Fact]
public void TestCtor_RepositoryOnly_DoesNotStartReconciliationTick()
{
    // The repo-only test ctor (used by the MSSQL-backed actor tests) injects
    // no client/enumerator, so the tick must be gated OFF — otherwise those
    // tests would fire phantom pulls. That actor has no pull client whose
    // calls could be inspected, so the only thing observable from here is
    // its repository: nothing may ever be upserted into it.
    var repo = new RecordingRepo();
    Sys.ActorOf(Props.Create(() => new SiteCallAuditActor(
        repo,
        NullLogger<SiteCallAuditActor>.Instance,
        FastTickOptions())));

    // A non-event can't be awaited directly. To bound the wait, run a second,
    // independent actor built with the full reconciliation ctor and the same
    // fast tick (its own client and repo); once IT has pulled we know enough
    // wall-clock elapsed that the repo-only actor would have ticked too, had
    // it been wired.
    var liveClient = new ScriptedPullClient();
    var liveRepo = new RecordingRepo();
    CreateActor(
        new StaticEnumerator(new SiteEntry("siteX", "http://siteX:8083")),
        liveClient,
        liveRepo,
        FastTickOptions());

    AwaitAssert(
        () => Assert.True(liveClient.Calls.Count >= 1),
        duration: TimeSpan.FromSeconds(3),
        interval: TimeSpan.FromMilliseconds(50));

    // The repo-only actor never reconciles, so its repository stays untouched.
    // NOTE(review): this is a necessary but weak check — with no client to
    // pull from, a tick that was wrongly started would presumably upsert
    // nothing either. Confirm against the actor whether a stronger
    // observable (e.g. a scheduled-tick flag or log event) is available.
    Assert.Equal(0, repo.UpsertCallCount);
    Assert.Empty(repo.Upserted);
}
}