using Akka.Actor; using Akka.TestKit.Xunit2; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging.Abstractions; using ScadaLink.Commons.Entities.Audit; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Messages.Audit; using ScadaLink.Commons.Types; using ScadaLink.Commons.Types.Audit; using ScadaLink.ConfigurationDatabase; using ScadaLink.ConfigurationDatabase.Repositories; using ScadaLink.ConfigurationDatabase.Tests.Migrations; namespace ScadaLink.SiteCallAudit.Tests; /// /// Bundle C1 (#22, #23 M3) tests for . Uses the /// same as the Bundle B3 repository tests /// so the actor exercises the real monotonic-upsert SQL end to end against the /// SiteCalls schema. Each test scopes its data by minting a fresh /// (and a per-test SourceSite suffix) /// so tests neither collide nor require teardown. /// public class SiteCallAuditActorTests : TestKit, IClassFixture { private readonly MsSqlMigrationFixture _fixture; public SiteCallAuditActorTests(MsSqlMigrationFixture fixture) { _fixture = fixture; } private ScadaLinkDbContext CreateContext() { var options = new DbContextOptionsBuilder() .UseSqlServer(_fixture.ConnectionString) .Options; return new ScadaLinkDbContext(options); } private static string NewSiteId() => "test-bundle-c1-" + Guid.NewGuid().ToString("N").Substring(0, 8); private static SiteCall NewRow( TrackedOperationId id, string sourceSite, string status = "Submitted", int retryCount = 0, string? lastError = null, DateTime? createdAtUtc = null, DateTime? updatedAtUtc = null, bool terminal = false) { var created = createdAtUtc ?? DateTime.UtcNow; var updated = updatedAtUtc ?? created; return new SiteCall { TrackedOperationId = id, Channel = "ApiOutbound", Target = "ERP.GetOrder", SourceSite = sourceSite, Status = status, RetryCount = retryCount, LastError = lastError, HttpStatus = null, CreatedAtUtc = created, UpdatedAtUtc = updated, TerminalAtUtc = terminal ? updated : null, IngestedAtUtc = DateTime.UtcNow, }; } private IActorRef CreateActor( ISiteCallAuditRepository repository, SiteCallAuditOptions? options = null) => Sys.ActorOf(Props.Create(() => new SiteCallAuditActor( repository, NullLogger.Instance, options))); [SkippableFact] public async Task Receive_UpsertSiteCallCommand_Persists_Replies_Accepted() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); var siteId = NewSiteId(); var id = TrackedOperationId.New(); var row = NewRow(id, siteId, status: "Submitted", retryCount: 0); await using var context = CreateContext(); var repo = new SiteCallAuditRepository(context); var actor = CreateActor(repo); actor.Tell(new UpsertSiteCallCommand(row), TestActor); var reply = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.True(reply.Accepted, "Actor should reply Accepted=true on a successful upsert."); Assert.Equal(id, reply.TrackedOperationId); // Verify the row landed in MSSQL via a fresh context (separate from the // actor's repository context). await using var readContext = CreateContext(); var rows = await readContext.Set() .Where(s => s.SourceSite == siteId) .ToListAsync(); Assert.Single(rows); Assert.Equal("Submitted", rows[0].Status); } [SkippableFact] public async Task Receive_DuplicateUpsert_OlderStatus_NoOp_StillRepliesAccepted() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); // Idempotency contract: a stale/duplicate packet (lower rank than the // stored status) is a silent no-op at the repository — the actor must // still reply Accepted=true so the site is free to consider its // packet acked. Storage state is consistent either way. var siteId = NewSiteId(); var id = TrackedOperationId.New(); await using var context = CreateContext(); var repo = new SiteCallAuditRepository(context); var actor = CreateActor(repo); // Land Attempted (rank 2) first. actor.Tell(new UpsertSiteCallCommand(NewRow(id, siteId, status: "Attempted", retryCount: 1, lastError: "first")), TestActor); var firstReply = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.True(firstReply.Accepted); // Late-arriving Submitted (rank 0) — must be no-op in storage and // still acked by the actor. actor.Tell(new UpsertSiteCallCommand(NewRow(id, siteId, status: "Submitted", retryCount: 0)), TestActor); var secondReply = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.True(secondReply.Accepted, "Stale upsert must still be acked (idempotent contract)."); // Storage must still show the rank-2 row, not rolled back. await using var readContext = CreateContext(); var stored = await readContext.Set() .Where(s => s.TrackedOperationId == id) .ToListAsync(); Assert.Single(stored); Assert.Equal("Attempted", stored[0].Status); Assert.Equal(1, stored[0].RetryCount); Assert.Equal("first", stored[0].LastError); } [SkippableFact] public async Task Receive_RepoThrowsTransient_RepliesAccepted_False_ActorStaysAlive() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); // Per CLAUDE.md: audit-write failure NEVER aborts the user-facing // action. The actor must catch the throw, reply Accepted=false, and // stay alive — a follow-up message on the same actor must still be // processed (the singleton cannot die on a transient repo error). var siteId = NewSiteId(); var poisonId = TrackedOperationId.New(); var healthyId = TrackedOperationId.New(); await using var context = CreateContext(); var realRepo = new SiteCallAuditRepository(context); var wrappedRepo = new ThrowingRepository(realRepo, poisonId); var actor = CreateActor(wrappedRepo); // Poison row — the wrapper throws when this id arrives. actor.Tell(new UpsertSiteCallCommand(NewRow(poisonId, siteId, status: "Submitted")), TestActor); var poisonReply = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.False(poisonReply.Accepted, "Actor should reply Accepted=false when the repo throws."); Assert.Equal(poisonId, poisonReply.TrackedOperationId); // Healthy follow-up on the SAME actor — must still be processed // (singleton staying alive proves the actor did not crash). actor.Tell(new UpsertSiteCallCommand(NewRow(healthyId, siteId, status: "Submitted")), TestActor); var healthyReply = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.True(healthyReply.Accepted, "Actor must stay alive after a transient repo failure."); Assert.Equal(healthyId, healthyReply.TrackedOperationId); // Verify storage: healthy row landed, poison row did not. await using var readContext = CreateContext(); var rows = await readContext.Set() .Where(s => s.SourceSite == siteId) .ToListAsync(); Assert.Single(rows); Assert.Equal(healthyId, rows[0].TrackedOperationId); } // ── Task 4: read-side (query / detail / KPI) handlers ── [SkippableFact] public async Task SiteCallQueryRequest_FilterBySourceSite_ReturnsMatchingSummaries() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); var siteId = NewSiteId(); await using var context = CreateContext(); var repo = new SiteCallAuditRepository(context); var actor = CreateActor(repo); var t0 = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc); await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: t0)); await repo.UpsertAsync(NewRow( TrackedOperationId.New(), siteId, status: "Delivered", createdAtUtc: t0.AddMinutes(1), terminal: true)); actor.Tell( new SiteCallQueryRequest( "corr-q1", StatusFilter: null, SourceSiteFilter: siteId, ChannelFilter: null, TargetKeyword: null, StuckOnly: false, FromUtc: null, ToUtc: null, AfterCreatedAtUtc: null, AfterId: null, PageSize: 50), TestActor); var response = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.True(response.Success); Assert.Equal("corr-q1", response.CorrelationId); Assert.Equal(2, response.SiteCalls.Count); Assert.All(response.SiteCalls, s => Assert.Equal(siteId, s.SourceSite)); // Newest first — ordered (CreatedAtUtc DESC). Assert.Equal("Delivered", response.SiteCalls[0].Status); // Cursor echoes the last (oldest) row of the page. Assert.Equal(t0, response.NextAfterCreatedAtUtc); Assert.Equal(response.SiteCalls[^1].TrackedOperationId, response.NextAfterId); } [SkippableFact] public async Task SiteCallQueryRequest_KeysetPaging_AdvancesViaCursor() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); var siteId = NewSiteId(); await using var context = CreateContext(); var repo = new SiteCallAuditRepository(context); var actor = CreateActor(repo); var t0 = new DateTime(2026, 5, 20, 13, 0, 0, DateTimeKind.Utc); for (var i = 0; i < 3; i++) { await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, createdAtUtc: t0.AddMinutes(i))); } actor.Tell( new SiteCallQueryRequest( "corr-q2", null, siteId, null, null, false, null, null, null, null, PageSize: 2), TestActor); var page1 = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.Equal(2, page1.SiteCalls.Count); actor.Tell( new SiteCallQueryRequest( "corr-q3", null, siteId, null, null, false, null, null, page1.NextAfterCreatedAtUtc, page1.NextAfterId, PageSize: 2), TestActor); var page2 = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.Single(page2.SiteCalls); // No overlap across the two pages. var allIds = page1.SiteCalls.Concat(page2.SiteCalls) .Select(s => s.TrackedOperationId).ToHashSet(); Assert.Equal(3, allIds.Count); } [SkippableFact] public async Task SiteCallQueryRequest_StuckOnly_ReturnsOnlyOldNonTerminalRows() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); var siteId = NewSiteId(); await using var context = CreateContext(); var repo = new SiteCallAuditRepository(context); // 10-minute stuck threshold (the production default). var actor = CreateActor(repo, new SiteCallAuditOptions { StuckAgeThreshold = TimeSpan.FromMinutes(10) }); var now = DateTime.UtcNow; // Stuck: non-terminal (Attempted, TerminalAtUtc null), created 30 min ago. var stuckId = TrackedOperationId.New(); await repo.UpsertAsync(NewRow(stuckId, siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30))); // Not stuck: non-terminal but recent. await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-2))); // Not stuck: old but terminal (Delivered, TerminalAtUtc set). await repo.UpsertAsync(NewRow( TrackedOperationId.New(), siteId, status: "Delivered", createdAtUtc: now.AddMinutes(-40), terminal: true)); actor.Tell( new SiteCallQueryRequest( "corr-stuck", null, siteId, null, null, StuckOnly: true, null, null, null, null, PageSize: 50), TestActor); var response = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.True(response.Success); Assert.Single(response.SiteCalls); Assert.Equal(stuckId.Value, response.SiteCalls[0].TrackedOperationId); Assert.True(response.SiteCalls[0].IsStuck); } [SkippableFact] public async Task SiteCallQueryRequest_StuckOnly_PagesAreFull_NoEmptyPagesWithCursor() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); var siteId = NewSiteId(); await using var context = CreateContext(); var repo = new SiteCallAuditRepository(context); var actor = CreateActor(repo, new SiteCallAuditOptions { StuckAgeThreshold = TimeSpan.FromMinutes(10) }); var now = DateTime.UtcNow; // Three stuck rows interleaved (by CreatedAtUtc) with three non-stuck // rows: recent non-terminal and old-but-terminal. With the StuckOnly // filter pushed into SQL, a page-size-2 query must return exactly the // stuck rows two-per-page — never an under-filled page with a non-null // next cursor caused by post-filtering. for (var i = 0; i < 3; i++) { await repo.UpsertAsync(NewRow( TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30 - i))); await repo.UpsertAsync(NewRow( TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-2 - i))); await repo.UpsertAsync(NewRow( TrackedOperationId.New(), siteId, status: "Delivered", createdAtUtc: now.AddMinutes(-40 - i), terminal: true)); } actor.Tell( new SiteCallQueryRequest( "corr-stuck-p1", null, siteId, null, null, StuckOnly: true, null, null, null, null, PageSize: 2), TestActor); var page1 = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.True(page1.Success); // Page is full — two stuck rows, both honestly stuck. Assert.Equal(2, page1.SiteCalls.Count); Assert.All(page1.SiteCalls, s => Assert.True(s.IsStuck)); Assert.NotNull(page1.NextAfterCreatedAtUtc); actor.Tell( new SiteCallQueryRequest( "corr-stuck-p2", null, siteId, null, null, StuckOnly: true, null, null, page1.NextAfterCreatedAtUtc, page1.NextAfterId, PageSize: 2), TestActor); var page2 = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.True(page2.Success); // Final page — the third stuck row, the only remaining match. Assert.Single(page2.SiteCalls); Assert.All(page2.SiteCalls, s => Assert.True(s.IsStuck)); // No overlap, exactly the three stuck rows across both pages. var allIds = page1.SiteCalls.Concat(page2.SiteCalls) .Select(s => s.TrackedOperationId).ToHashSet(); Assert.Equal(3, allIds.Count); } [SkippableFact] public async Task SiteCallDetailRequest_KnownId_ReturnsFullDetail() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); var siteId = NewSiteId(); var id = TrackedOperationId.New(); await using var context = CreateContext(); var repo = new SiteCallAuditRepository(context); var actor = CreateActor(repo); await repo.UpsertAsync(NewRow(id, siteId, status: "Attempted", retryCount: 2, lastError: "503")); actor.Tell(new SiteCallDetailRequest("corr-d1", id.Value), TestActor); var response = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.True(response.Success); Assert.NotNull(response.Detail); Assert.Equal(id.Value, response.Detail!.TrackedOperationId); Assert.Equal("Attempted", response.Detail.Status); Assert.Equal(2, response.Detail.RetryCount); Assert.Equal("503", response.Detail.LastError); Assert.Equal(siteId, response.Detail.SourceSite); } [SkippableFact] public async Task SiteCallDetailRequest_UnknownId_RepliesNotFound() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); await using var context = CreateContext(); var repo = new SiteCallAuditRepository(context); var actor = CreateActor(repo); actor.Tell(new SiteCallDetailRequest("corr-d2", Guid.NewGuid()), TestActor); var response = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.False(response.Success); Assert.Null(response.Detail); Assert.NotNull(response.ErrorMessage); } [SkippableFact] public async Task SiteCallKpiRequest_ComputesPointInTimeCounts() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); var siteId = NewSiteId(); await using var context = CreateContext(); var repo = new SiteCallAuditRepository(context); var actor = CreateActor(repo, new SiteCallAuditOptions { StuckAgeThreshold = TimeSpan.FromMinutes(10), KpiInterval = TimeSpan.FromHours(1), }); var now = DateTime.UtcNow; // Buffered (non-terminal Attempted) + stuck (created 30 min ago). await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30))); // Buffered (non-terminal Attempted), not stuck. await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-2))); // Parked (terminal). await repo.UpsertAsync(NewRow( TrackedOperationId.New(), siteId, status: "Parked", createdAtUtc: now.AddMinutes(-5), terminal: true)); // Delivered within the interval. await repo.UpsertAsync(NewRow( TrackedOperationId.New(), siteId, status: "Delivered", createdAtUtc: now.AddMinutes(-3), updatedAtUtc: now.AddMinutes(-1), terminal: true)); actor.Tell(new SiteCallKpiRequest("corr-kpi"), TestActor); var response = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.True(response.Success); // Per-site rows are isolated by the unique siteId — but KPIs are global, // so assert the floor (>=) rather than exact counts: other tests' rows // may share the table. Assert.True(response.BufferedCount >= 2); Assert.True(response.ParkedCount >= 1); Assert.True(response.DeliveredLastInterval >= 1); Assert.True(response.StuckCount >= 1); Assert.NotNull(response.OldestPendingAge); } [SkippableFact] public async Task PerSiteSiteCallKpiRequest_ScopesCountsToEachSite() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); var siteId = NewSiteId(); await using var context = CreateContext(); var repo = new SiteCallAuditRepository(context); var actor = CreateActor(repo, new SiteCallAuditOptions { StuckAgeThreshold = TimeSpan.FromMinutes(10), KpiInterval = TimeSpan.FromHours(1), }); var now = DateTime.UtcNow; // Non-terminal Attempted, created 30 min ago — buffered + stuck. await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30))); // Terminal Parked. await repo.UpsertAsync(NewRow( TrackedOperationId.New(), siteId, status: "Parked", createdAtUtc: now.AddMinutes(-5), terminal: true)); actor.Tell(new PerSiteSiteCallKpiRequest("corr-psk"), TestActor); var response = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.True(response.Success); var mySite = Assert.Single(response.Sites, s => s.SourceSite == siteId); Assert.Equal(1, mySite.BufferedCount); Assert.Equal(1, mySite.ParkedCount); Assert.Equal(1, mySite.StuckCount); Assert.NotNull(mySite.OldestPendingAge); } [SkippableFact] public async Task SiteCallQueryRequest_RepoThrows_RepliesFailure_ActorStaysAlive() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); var siteId = NewSiteId(); await using var context = CreateContext(); var realRepo = new SiteCallAuditRepository(context); var actor = CreateActor(new QueryThrowingRepository(realRepo)); actor.Tell( new SiteCallQueryRequest( "corr-fault", null, siteId, null, null, false, null, null, null, null, 50), TestActor); var response = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.False(response.Success); Assert.Empty(response.SiteCalls); Assert.NotNull(response.ErrorMessage); Assert.Equal("corr-fault", response.CorrelationId); } /// /// Test double whose always /// throws — used to verify the query handler's failure projection produces a /// Success=false response without crashing the actor. /// private sealed class QueryThrowingRepository : ISiteCallAuditRepository { private readonly ISiteCallAuditRepository _inner; public QueryThrowingRepository(ISiteCallAuditRepository inner) { _inner = inner; } public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) => _inner.UpsertAsync(siteCall, ct); public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => _inner.GetAsync(id, ct); public Task> QueryAsync( SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) => throw new InvalidOperationException("simulated query failure"); public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) => _inner.PurgeTerminalAsync(olderThanUtc, ct); public Task ComputeKpisAsync( DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => _inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct); public Task> ComputePerSiteKpisAsync( DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => _inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct); } /// /// Tiny test double that delegates to a real repository but throws on a /// specified . Used to verify the actor's /// fault-isolation behaviour: a transient repository failure must produce /// Accepted=false without crashing the singleton. /// private sealed class ThrowingRepository : ISiteCallAuditRepository { private readonly ISiteCallAuditRepository _inner; private readonly TrackedOperationId _poisonId; public ThrowingRepository(ISiteCallAuditRepository inner, TrackedOperationId poisonId) { _inner = inner; _poisonId = poisonId; } public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) { if (siteCall.TrackedOperationId == _poisonId) { throw new InvalidOperationException("simulated transient repo failure for poison row"); } return _inner.UpsertAsync(siteCall, ct); } public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => _inner.GetAsync(id, ct); public Task> QueryAsync( SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) => _inner.QueryAsync(filter, paging, ct); public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) => _inner.PurgeTerminalAsync(olderThanUtc, ct); public Task ComputeKpisAsync( DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => _inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct); public Task> ComputePerSiteKpisAsync( DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => _inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct); } }