e427b38fb3
Add a periodic reconciliation tick to SiteCallAuditActor that, per site, pulls changed SiteCall rows since a per-site UpdatedAtUtc cursor and upserts them idempotently (monotonic UpsertAsync) — the documented self-heal for lost best-effort gRPC telemetry. Mirrors SiteAuditReconciliationActor's structure (per-site cursor, per-site try/catch failure isolation, advance cursor by max observed UpdatedAtUtc) minus the stalled-detection EventStream machinery. Dependency wiring: add an acyclic SiteCallAudit -> AuditLog project reference and resolve IPullSiteCallsClient + ISiteEnumerator (central-only singletons registered by AddAuditLogCentralReconciliationClient) from the IServiceProvider the production ctor already holds — no Host Props.Create change needed. The repo-only test ctor injects neither collaborator, so the tick is gated off there. A new public test ctor injects fake client + enumerator + repo so the tick is unit-testable in-memory (public, not internal: Akka's ActivatorProducer uses public-only reflection binding). Options: ReconciliationInterval (default 5 min, clamped >= 1s so a zero config value can't spin the scheduler) + ReconciliationBatchSize (default 500), plus a test-only override that bypasses the clamp for millisecond cadences. Tests (all in-memory, no live MSSQL): absent row is upserted on a tick; second tick advances the cursor past already-pulled rows; one failing site does not sink other sites; repo-only ctor does not start the tick.
301 lines
13 KiB
C#
301 lines
13 KiB
C#
using Akka.Actor;
|
|
using Akka.TestKit.Xunit2;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
|
|
|
|
namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit.Tests;
|
|
|
|
/// <summary>
|
|
/// Reconciliation-tick tests for <see cref="SiteCallAuditActor"/> (#22, Piece A).
|
|
/// These exercise the periodic per-site self-heal pull entirely in-memory —
|
|
/// fake <see cref="IPullSiteCallsClient"/> + <see cref="ISiteEnumerator"/> + a
|
|
/// recording <see cref="ISiteCallAuditRepository"/> — so they run in
|
|
/// milliseconds and do NOT depend on a live MSSQL fixture (unlike the
|
|
/// MSSQL-backed <see cref="SiteCallAuditActorTests"/>). The actor is built via
|
|
/// the internal test ctor that injects all three collaborators; the
|
|
/// repo-only test ctor used by the MSSQL tests passes no client/enumerator, so
|
|
/// the reconciliation tick is gated off there (see
|
|
/// <see cref="TestCtor_RepositoryOnly_DoesNotStartReconciliationTick"/>).
|
|
/// </summary>
|
|
public class SiteCallAuditReconciliationTests : TestKit
|
|
{
|
|
private static SiteCall NewRow(
|
|
TrackedOperationId id,
|
|
string sourceSite,
|
|
string status = "Submitted",
|
|
DateTime? updatedAtUtc = null)
|
|
{
|
|
var now = updatedAtUtc ?? DateTime.UtcNow;
|
|
return new SiteCall
|
|
{
|
|
TrackedOperationId = id,
|
|
Channel = "ApiOutbound",
|
|
Target = "ERP.GetOrder",
|
|
SourceSite = sourceSite,
|
|
SourceNode = null,
|
|
Status = status,
|
|
RetryCount = 0,
|
|
LastError = null,
|
|
HttpStatus = null,
|
|
CreatedAtUtc = now,
|
|
UpdatedAtUtc = now,
|
|
TerminalAtUtc = null,
|
|
IngestedAtUtc = now,
|
|
};
|
|
}
|
|
|
|
private static SiteCallAuditOptions FastTickOptions(int batchSize = 500) => new()
|
|
{
|
|
// 100 ms tick keeps each test under a second; AwaitAssert covers
|
|
// scheduler jitter so the tick has up to a few seconds to fire.
|
|
ReconciliationInterval = TimeSpan.FromMinutes(5),
|
|
ReconciliationIntervalOverride = TimeSpan.FromMilliseconds(100),
|
|
ReconciliationBatchSize = batchSize,
|
|
};
|
|
|
|
/// <summary>In-memory enumerator returning a static list of sites.</summary>
|
|
private sealed class StaticEnumerator : ISiteEnumerator
|
|
{
|
|
private readonly IReadOnlyList<SiteEntry> _sites;
|
|
public StaticEnumerator(params SiteEntry[] sites) => _sites = sites;
|
|
public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default) =>
|
|
Task.FromResult(_sites);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Scripted pull client — returns the next queued response for the site on
|
|
/// each call (looping the last entry once exhausted) and records every
|
|
/// invocation so tests can assert call counts + the <c>since</c> cursor.
|
|
/// </summary>
|
|
private sealed class ScriptedPullClient : IPullSiteCallsClient
|
|
{
|
|
public List<(string SiteId, DateTime SinceUtc, int BatchSize)> Calls { get; } = new();
|
|
private readonly Dictionary<string, Queue<PullSiteCallsResponse>> _scripted = new();
|
|
private readonly Dictionary<string, Exception> _throwOnSite = new();
|
|
|
|
public ScriptedPullClient Script(string siteId, params PullSiteCallsResponse[] responses)
|
|
{
|
|
_scripted[siteId] = new Queue<PullSiteCallsResponse>(responses);
|
|
return this;
|
|
}
|
|
|
|
public ScriptedPullClient ThrowFor(string siteId, Exception ex)
|
|
{
|
|
_throwOnSite[siteId] = ex;
|
|
return this;
|
|
}
|
|
|
|
public Task<PullSiteCallsResponse> PullAsync(
|
|
string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct)
|
|
{
|
|
Calls.Add((siteId, sinceUtc, batchSize));
|
|
if (_throwOnSite.TryGetValue(siteId, out var ex))
|
|
{
|
|
throw ex;
|
|
}
|
|
if (_scripted.TryGetValue(siteId, out var queue) && queue.Count > 0)
|
|
{
|
|
return Task.FromResult(queue.Dequeue());
|
|
}
|
|
return Task.FromResult(
|
|
new PullSiteCallsResponse(Array.Empty<SiteCall>(), MoreAvailable: false));
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Recording repository that captures every <see cref="UpsertAsync"/> call
|
|
/// (keyed by id, last-write-wins on the captured row). The reconciliation
|
|
/// tick only ever calls <see cref="UpsertAsync"/>; the read/KPI members are
|
|
/// inert stubs.
|
|
/// </summary>
|
|
private sealed class RecordingRepo : ISiteCallAuditRepository
|
|
{
|
|
public Dictionary<TrackedOperationId, SiteCall> Upserted { get; } = new();
|
|
public int UpsertCallCount { get; private set; }
|
|
|
|
public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default)
|
|
{
|
|
UpsertCallCount++;
|
|
Upserted[siteCall.TrackedOperationId] = siteCall;
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
public Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default) =>
|
|
Task.FromResult(Upserted.TryGetValue(id, out var row) ? row : null);
|
|
|
|
public Task<IReadOnlyList<SiteCall>> QueryAsync(
|
|
SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) =>
|
|
Task.FromResult<IReadOnlyList<SiteCall>>(Array.Empty<SiteCall>());
|
|
|
|
public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) =>
|
|
Task.FromResult(0);
|
|
|
|
public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
|
|
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
|
|
Task.FromResult(new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0));
|
|
|
|
public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
|
|
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
|
|
Task.FromResult<IReadOnlyList<SiteCallSiteKpiSnapshot>>(Array.Empty<SiteCallSiteKpiSnapshot>());
|
|
}
|
|
|
|
private IActorRef CreateActor(
|
|
ISiteEnumerator sites,
|
|
IPullSiteCallsClient client,
|
|
ISiteCallAuditRepository repo,
|
|
SiteCallAuditOptions options) =>
|
|
Sys.ActorOf(Props.Create(() => new SiteCallAuditActor(
|
|
repo,
|
|
sites,
|
|
client,
|
|
NullLogger<SiteCallAuditActor>.Instance,
|
|
options)));
|
|
|
|
// ---------------------------------------------------------------------
|
|
// 1. AbsentRow_PulledFromSite_IsUpserted
|
|
// ---------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
public void ReconciliationTick_AbsentRow_IsUpsertedFromSitePull()
|
|
{
|
|
var siteId = "siteA";
|
|
var id = TrackedOperationId.New();
|
|
var row = NewRow(id, sourceSite: siteId, status: "Parked");
|
|
|
|
var sites = new StaticEnumerator(new SiteEntry(siteId, "http://siteA:8083"));
|
|
var client = new ScriptedPullClient().Script(siteId,
|
|
new PullSiteCallsResponse(new[] { row }, MoreAvailable: false));
|
|
var repo = new RecordingRepo();
|
|
|
|
CreateActor(sites, client, repo, FastTickOptions());
|
|
|
|
AwaitAssert(
|
|
() =>
|
|
{
|
|
Assert.True(repo.Upserted.ContainsKey(id),
|
|
"reconciliation tick should upsert the row present at the site but absent centrally");
|
|
Assert.Equal("Parked", repo.Upserted[id].Status);
|
|
Assert.Equal(siteId, repo.Upserted[id].SourceSite);
|
|
},
|
|
duration: TimeSpan.FromSeconds(3),
|
|
interval: TimeSpan.FromMilliseconds(50));
|
|
}
|
|
|
|
// ---------------------------------------------------------------------
|
|
// 2. Cursor_Advances_ToMaxUpdatedAtUtc_NoRePullOfOldRows
|
|
// ---------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
public void ReconciliationTick_SecondTick_AdvancesCursorPastAlreadyPulledRows()
|
|
{
|
|
var siteId = "siteA";
|
|
var t1 = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
|
|
var t2 = new DateTime(2026, 5, 20, 10, 1, 0, DateTimeKind.Utc);
|
|
var t3 = new DateTime(2026, 5, 20, 10, 2, 0, DateTimeKind.Utc);
|
|
var r1 = NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: t1);
|
|
var r2 = NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: t2);
|
|
var r3 = NewRow(TrackedOperationId.New(), siteId, updatedAtUtc: t3);
|
|
|
|
var sites = new StaticEnumerator(new SiteEntry(siteId, "http://siteA:8083"));
|
|
// First pull returns three rows (max UpdatedAtUtc = t3); subsequent
|
|
// pulls return empty. The second pull's `since` must be t3, proving the
|
|
// cursor advanced and old rows are not re-pulled from the start.
|
|
var client = new ScriptedPullClient().Script(siteId,
|
|
new PullSiteCallsResponse(new[] { r1, r2, r3 }, MoreAvailable: false));
|
|
var repo = new RecordingRepo();
|
|
|
|
CreateActor(sites, client, repo, FastTickOptions());
|
|
|
|
AwaitAssert(
|
|
() => Assert.True(client.Calls.Count >= 2,
|
|
$"need at least 2 pulls to assert cursor advancement, got {client.Calls.Count}"),
|
|
duration: TimeSpan.FromSeconds(5),
|
|
interval: TimeSpan.FromMilliseconds(50));
|
|
|
|
Assert.Equal(DateTime.MinValue, client.Calls[0].SinceUtc);
|
|
Assert.Equal(t3, client.Calls[1].SinceUtc);
|
|
// The batch size flows through from options.
|
|
Assert.Equal(500, client.Calls[0].BatchSize);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------
|
|
// 3. OneSiteThrows_OtherSitesStillProcessed (failure isolation)
|
|
// ---------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
public void ReconciliationTick_OneSiteThrows_OtherSitesStillReconciled()
|
|
{
|
|
var siteB = "siteB";
|
|
var bId = TrackedOperationId.New();
|
|
var bRow = NewRow(bId, sourceSite: siteB, status: "Delivered");
|
|
|
|
var sites = new StaticEnumerator(
|
|
new SiteEntry("siteA", "http://siteA:8083"),
|
|
new SiteEntry(siteB, "http://siteB:8083"));
|
|
var client = new ScriptedPullClient()
|
|
.ThrowFor("siteA", new InvalidOperationException("simulated transport failure"))
|
|
.Script(siteB, new PullSiteCallsResponse(new[] { bRow }, MoreAvailable: false));
|
|
var repo = new RecordingRepo();
|
|
|
|
CreateActor(sites, client, repo, FastTickOptions());
|
|
|
|
AwaitAssert(
|
|
() =>
|
|
{
|
|
// siteA was attempted (and threw) yet siteB's row still landed —
|
|
// one offline site must not sink the rest of the tick.
|
|
Assert.Contains(client.Calls, c => c.SiteId == "siteA");
|
|
Assert.True(repo.Upserted.ContainsKey(bId),
|
|
"siteB must be reconciled even though siteA threw");
|
|
},
|
|
duration: TimeSpan.FromSeconds(3),
|
|
interval: TimeSpan.FromMilliseconds(50));
|
|
}
|
|
|
|
// ---------------------------------------------------------------------
|
|
// 4. RepoOnly test ctor does NOT start the reconciliation tick
|
|
// ---------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
public void TestCtor_RepositoryOnly_DoesNotStartReconciliationTick()
|
|
{
|
|
// The repo-only test ctor (used by the MSSQL-backed actor tests) injects
|
|
// no client/enumerator, so the tick must be gated OFF — otherwise those
|
|
// tests would fire phantom pulls. Build the actor via that ctor and
|
|
// confirm no pull ever happens. We can't observe a non-event directly,
|
|
// so we share a ScriptedPullClient with an isolated actor that DOES run
|
|
// the tick to bound the wait, then assert the repo-only actor's client
|
|
// (a separate instance) recorded nothing.
|
|
var repo = new RecordingRepo();
|
|
Sys.ActorOf(Props.Create(() => new SiteCallAuditActor(
|
|
repo,
|
|
NullLogger<SiteCallAuditActor>.Instance,
|
|
FastTickOptions())));
|
|
|
|
// Run a parallel actor with the full reconciliation ctor and a fast
|
|
// tick; once IT has pulled we know enough wall-clock elapsed that the
|
|
// repo-only actor would have ticked too, had it been wired.
|
|
var liveClient = new ScriptedPullClient();
|
|
var liveRepo = new RecordingRepo();
|
|
CreateActor(
|
|
new StaticEnumerator(new SiteEntry("siteX", "http://siteX:8083")),
|
|
liveClient,
|
|
liveRepo,
|
|
FastTickOptions());
|
|
|
|
AwaitAssert(
|
|
() => Assert.True(liveClient.Calls.Count >= 1),
|
|
duration: TimeSpan.FromSeconds(3),
|
|
interval: TimeSpan.FromMilliseconds(50));
|
|
|
|
// The repo-only actor never reconciles: it has no client to pull with,
|
|
// so it upserts nothing on its own.
|
|
Assert.Equal(0, repo.UpsertCallCount);
|
|
}
|
|
}
|