576 lines
25 KiB
C#
576 lines
25 KiB
C#
using Akka.Actor;
|
|
using Akka.TestKit.Xunit2;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ScadaLink.Commons.Entities.Audit;
|
|
using ScadaLink.Commons.Interfaces.Repositories;
|
|
using ScadaLink.Commons.Messages.Audit;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Audit;
|
|
using ScadaLink.ConfigurationDatabase;
|
|
using ScadaLink.ConfigurationDatabase.Repositories;
|
|
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
|
|
|
|
namespace ScadaLink.SiteCallAudit.Tests;
|
|
|
|
/// <summary>
|
|
/// Bundle C1 (#22, #23 M3) tests for <see cref="SiteCallAuditActor"/>. Uses the
|
|
/// same <see cref="MsSqlMigrationFixture"/> as the Bundle B3 repository tests
|
|
/// so the actor exercises the real monotonic-upsert SQL end to end against the
|
|
/// <c>SiteCalls</c> schema. Each test scopes its data by minting a fresh
|
|
/// <see cref="TrackedOperationId"/> (and a per-test <c>SourceSite</c> suffix)
|
|
/// so tests neither collide nor require teardown.
|
|
/// </summary>
|
|
public class SiteCallAuditActorTests : TestKit, IClassFixture<MsSqlMigrationFixture>
|
|
{
|
|
private readonly MsSqlMigrationFixture _fixture;
|
|
|
|
public SiteCallAuditActorTests(MsSqlMigrationFixture fixture)
|
|
{
|
|
_fixture = fixture;
|
|
}
|
|
|
|
private ScadaLinkDbContext CreateContext()
|
|
{
|
|
var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
|
|
.UseSqlServer(_fixture.ConnectionString)
|
|
.Options;
|
|
return new ScadaLinkDbContext(options);
|
|
}
|
|
|
|
private static string NewSiteId() =>
|
|
"test-bundle-c1-" + Guid.NewGuid().ToString("N").Substring(0, 8);
|
|
|
|
private static SiteCall NewRow(
|
|
TrackedOperationId id,
|
|
string sourceSite,
|
|
string status = "Submitted",
|
|
int retryCount = 0,
|
|
string? lastError = null,
|
|
DateTime? createdAtUtc = null,
|
|
DateTime? updatedAtUtc = null,
|
|
bool terminal = false)
|
|
{
|
|
var created = createdAtUtc ?? DateTime.UtcNow;
|
|
var updated = updatedAtUtc ?? created;
|
|
return new SiteCall
|
|
{
|
|
TrackedOperationId = id,
|
|
Channel = "ApiOutbound",
|
|
Target = "ERP.GetOrder",
|
|
SourceSite = sourceSite,
|
|
Status = status,
|
|
RetryCount = retryCount,
|
|
LastError = lastError,
|
|
HttpStatus = null,
|
|
CreatedAtUtc = created,
|
|
UpdatedAtUtc = updated,
|
|
TerminalAtUtc = terminal ? updated : null,
|
|
IngestedAtUtc = DateTime.UtcNow,
|
|
};
|
|
}
|
|
|
|
private IActorRef CreateActor(
|
|
ISiteCallAuditRepository repository, SiteCallAuditOptions? options = null) =>
|
|
Sys.ActorOf(Props.Create(() => new SiteCallAuditActor(
|
|
repository,
|
|
NullLogger<SiteCallAuditActor>.Instance,
|
|
options)));
|
|
|
|
[SkippableFact]
|
|
public async Task Receive_UpsertSiteCallCommand_Persists_Replies_Accepted()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
var siteId = NewSiteId();
|
|
var id = TrackedOperationId.New();
|
|
var row = NewRow(id, siteId, status: "Submitted", retryCount: 0);
|
|
|
|
await using var context = CreateContext();
|
|
var repo = new SiteCallAuditRepository(context);
|
|
var actor = CreateActor(repo);
|
|
|
|
actor.Tell(new UpsertSiteCallCommand(row), TestActor);
|
|
|
|
var reply = ExpectMsg<UpsertSiteCallReply>(TimeSpan.FromSeconds(10));
|
|
Assert.True(reply.Accepted, "Actor should reply Accepted=true on a successful upsert.");
|
|
Assert.Equal(id, reply.TrackedOperationId);
|
|
|
|
// Verify the row landed in MSSQL via a fresh context (separate from the
|
|
// actor's repository context).
|
|
await using var readContext = CreateContext();
|
|
var rows = await readContext.Set<SiteCall>()
|
|
.Where(s => s.SourceSite == siteId)
|
|
.ToListAsync();
|
|
Assert.Single(rows);
|
|
Assert.Equal("Submitted", rows[0].Status);
|
|
}
|
|
|
|
[SkippableFact]
|
|
public async Task Receive_DuplicateUpsert_OlderStatus_NoOp_StillRepliesAccepted()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
// Idempotency contract: a stale/duplicate packet (lower rank than the
|
|
// stored status) is a silent no-op at the repository — the actor must
|
|
// still reply Accepted=true so the site is free to consider its
|
|
// packet acked. Storage state is consistent either way.
|
|
var siteId = NewSiteId();
|
|
var id = TrackedOperationId.New();
|
|
|
|
await using var context = CreateContext();
|
|
var repo = new SiteCallAuditRepository(context);
|
|
var actor = CreateActor(repo);
|
|
|
|
// Land Attempted (rank 2) first.
|
|
actor.Tell(new UpsertSiteCallCommand(NewRow(id, siteId, status: "Attempted", retryCount: 1, lastError: "first")), TestActor);
|
|
var firstReply = ExpectMsg<UpsertSiteCallReply>(TimeSpan.FromSeconds(10));
|
|
Assert.True(firstReply.Accepted);
|
|
|
|
// Late-arriving Submitted (rank 0) — must be no-op in storage and
|
|
// still acked by the actor.
|
|
actor.Tell(new UpsertSiteCallCommand(NewRow(id, siteId, status: "Submitted", retryCount: 0)), TestActor);
|
|
var secondReply = ExpectMsg<UpsertSiteCallReply>(TimeSpan.FromSeconds(10));
|
|
Assert.True(secondReply.Accepted, "Stale upsert must still be acked (idempotent contract).");
|
|
|
|
// Storage must still show the rank-2 row, not rolled back.
|
|
await using var readContext = CreateContext();
|
|
var stored = await readContext.Set<SiteCall>()
|
|
.Where(s => s.TrackedOperationId == id)
|
|
.ToListAsync();
|
|
Assert.Single(stored);
|
|
Assert.Equal("Attempted", stored[0].Status);
|
|
Assert.Equal(1, stored[0].RetryCount);
|
|
Assert.Equal("first", stored[0].LastError);
|
|
}
|
|
|
|
[SkippableFact]
|
|
public async Task Receive_RepoThrowsTransient_RepliesAccepted_False_ActorStaysAlive()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
// Per CLAUDE.md: audit-write failure NEVER aborts the user-facing
|
|
// action. The actor must catch the throw, reply Accepted=false, and
|
|
// stay alive — a follow-up message on the same actor must still be
|
|
// processed (the singleton cannot die on a transient repo error).
|
|
var siteId = NewSiteId();
|
|
var poisonId = TrackedOperationId.New();
|
|
var healthyId = TrackedOperationId.New();
|
|
|
|
await using var context = CreateContext();
|
|
var realRepo = new SiteCallAuditRepository(context);
|
|
var wrappedRepo = new ThrowingRepository(realRepo, poisonId);
|
|
var actor = CreateActor(wrappedRepo);
|
|
|
|
// Poison row — the wrapper throws when this id arrives.
|
|
actor.Tell(new UpsertSiteCallCommand(NewRow(poisonId, siteId, status: "Submitted")), TestActor);
|
|
var poisonReply = ExpectMsg<UpsertSiteCallReply>(TimeSpan.FromSeconds(10));
|
|
Assert.False(poisonReply.Accepted, "Actor should reply Accepted=false when the repo throws.");
|
|
Assert.Equal(poisonId, poisonReply.TrackedOperationId);
|
|
|
|
// Healthy follow-up on the SAME actor — must still be processed
|
|
// (singleton staying alive proves the actor did not crash).
|
|
actor.Tell(new UpsertSiteCallCommand(NewRow(healthyId, siteId, status: "Submitted")), TestActor);
|
|
var healthyReply = ExpectMsg<UpsertSiteCallReply>(TimeSpan.FromSeconds(10));
|
|
Assert.True(healthyReply.Accepted, "Actor must stay alive after a transient repo failure.");
|
|
Assert.Equal(healthyId, healthyReply.TrackedOperationId);
|
|
|
|
// Verify storage: healthy row landed, poison row did not.
|
|
await using var readContext = CreateContext();
|
|
var rows = await readContext.Set<SiteCall>()
|
|
.Where(s => s.SourceSite == siteId)
|
|
.ToListAsync();
|
|
Assert.Single(rows);
|
|
Assert.Equal(healthyId, rows[0].TrackedOperationId);
|
|
}
|
|
|
|
// ── Task 4: read-side (query / detail / KPI) handlers ──
|
|
|
|
[SkippableFact]
|
|
public async Task SiteCallQueryRequest_FilterBySourceSite_ReturnsMatchingSummaries()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
var siteId = NewSiteId();
|
|
await using var context = CreateContext();
|
|
var repo = new SiteCallAuditRepository(context);
|
|
var actor = CreateActor(repo);
|
|
|
|
var t0 = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
|
|
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: t0));
|
|
await repo.UpsertAsync(NewRow(
|
|
TrackedOperationId.New(), siteId, status: "Delivered",
|
|
createdAtUtc: t0.AddMinutes(1), terminal: true));
|
|
|
|
actor.Tell(
|
|
new SiteCallQueryRequest(
|
|
"corr-q1", StatusFilter: null, SourceSiteFilter: siteId, ChannelFilter: null,
|
|
TargetKeyword: null, StuckOnly: false, FromUtc: null, ToUtc: null,
|
|
AfterCreatedAtUtc: null, AfterId: null, PageSize: 50),
|
|
TestActor);
|
|
|
|
var response = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
|
|
Assert.True(response.Success);
|
|
Assert.Equal("corr-q1", response.CorrelationId);
|
|
Assert.Equal(2, response.SiteCalls.Count);
|
|
Assert.All(response.SiteCalls, s => Assert.Equal(siteId, s.SourceSite));
|
|
// Newest first — ordered (CreatedAtUtc DESC).
|
|
Assert.Equal("Delivered", response.SiteCalls[0].Status);
|
|
// Cursor echoes the last (oldest) row of the page.
|
|
Assert.Equal(t0, response.NextAfterCreatedAtUtc);
|
|
Assert.Equal(response.SiteCalls[^1].TrackedOperationId, response.NextAfterId);
|
|
}
|
|
|
|
[SkippableFact]
|
|
public async Task SiteCallQueryRequest_KeysetPaging_AdvancesViaCursor()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
var siteId = NewSiteId();
|
|
await using var context = CreateContext();
|
|
var repo = new SiteCallAuditRepository(context);
|
|
var actor = CreateActor(repo);
|
|
|
|
var t0 = new DateTime(2026, 5, 20, 13, 0, 0, DateTimeKind.Utc);
|
|
for (var i = 0; i < 3; i++)
|
|
{
|
|
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, createdAtUtc: t0.AddMinutes(i)));
|
|
}
|
|
|
|
actor.Tell(
|
|
new SiteCallQueryRequest(
|
|
"corr-q2", null, siteId, null, null, false, null, null, null, null, PageSize: 2),
|
|
TestActor);
|
|
var page1 = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
|
|
Assert.Equal(2, page1.SiteCalls.Count);
|
|
|
|
actor.Tell(
|
|
new SiteCallQueryRequest(
|
|
"corr-q3", null, siteId, null, null, false, null, null,
|
|
page1.NextAfterCreatedAtUtc, page1.NextAfterId, PageSize: 2),
|
|
TestActor);
|
|
var page2 = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
|
|
Assert.Single(page2.SiteCalls);
|
|
|
|
// No overlap across the two pages.
|
|
var allIds = page1.SiteCalls.Concat(page2.SiteCalls)
|
|
.Select(s => s.TrackedOperationId).ToHashSet();
|
|
Assert.Equal(3, allIds.Count);
|
|
}
|
|
|
|
[SkippableFact]
|
|
public async Task SiteCallQueryRequest_StuckOnly_ReturnsOnlyOldNonTerminalRows()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
var siteId = NewSiteId();
|
|
await using var context = CreateContext();
|
|
var repo = new SiteCallAuditRepository(context);
|
|
// 10-minute stuck threshold (the production default).
|
|
var actor = CreateActor(repo, new SiteCallAuditOptions { StuckAgeThreshold = TimeSpan.FromMinutes(10) });
|
|
|
|
var now = DateTime.UtcNow;
|
|
// Stuck: non-terminal (Attempted, TerminalAtUtc null), created 30 min ago.
|
|
var stuckId = TrackedOperationId.New();
|
|
await repo.UpsertAsync(NewRow(stuckId, siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30)));
|
|
// Not stuck: non-terminal but recent.
|
|
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-2)));
|
|
// Not stuck: old but terminal (Delivered, TerminalAtUtc set).
|
|
await repo.UpsertAsync(NewRow(
|
|
TrackedOperationId.New(), siteId, status: "Delivered",
|
|
createdAtUtc: now.AddMinutes(-40), terminal: true));
|
|
|
|
actor.Tell(
|
|
new SiteCallQueryRequest(
|
|
"corr-stuck", null, siteId, null, null, StuckOnly: true,
|
|
null, null, null, null, PageSize: 50),
|
|
TestActor);
|
|
|
|
var response = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
|
|
Assert.True(response.Success);
|
|
Assert.Single(response.SiteCalls);
|
|
Assert.Equal(stuckId.Value, response.SiteCalls[0].TrackedOperationId);
|
|
Assert.True(response.SiteCalls[0].IsStuck);
|
|
}
|
|
|
|
[SkippableFact]
|
|
public async Task SiteCallQueryRequest_StuckOnly_PagesAreFull_NoEmptyPagesWithCursor()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
var siteId = NewSiteId();
|
|
await using var context = CreateContext();
|
|
var repo = new SiteCallAuditRepository(context);
|
|
var actor = CreateActor(repo, new SiteCallAuditOptions { StuckAgeThreshold = TimeSpan.FromMinutes(10) });
|
|
|
|
var now = DateTime.UtcNow;
|
|
// Three stuck rows interleaved (by CreatedAtUtc) with three non-stuck
|
|
// rows: recent non-terminal and old-but-terminal. With the StuckOnly
|
|
// filter pushed into SQL, a page-size-2 query must return exactly the
|
|
// stuck rows two-per-page — never an under-filled page with a non-null
|
|
// next cursor caused by post-filtering.
|
|
for (var i = 0; i < 3; i++)
|
|
{
|
|
await repo.UpsertAsync(NewRow(
|
|
TrackedOperationId.New(), siteId, status: "Attempted",
|
|
createdAtUtc: now.AddMinutes(-30 - i)));
|
|
await repo.UpsertAsync(NewRow(
|
|
TrackedOperationId.New(), siteId, status: "Attempted",
|
|
createdAtUtc: now.AddMinutes(-2 - i)));
|
|
await repo.UpsertAsync(NewRow(
|
|
TrackedOperationId.New(), siteId, status: "Delivered",
|
|
createdAtUtc: now.AddMinutes(-40 - i), terminal: true));
|
|
}
|
|
|
|
actor.Tell(
|
|
new SiteCallQueryRequest(
|
|
"corr-stuck-p1", null, siteId, null, null, StuckOnly: true,
|
|
null, null, null, null, PageSize: 2),
|
|
TestActor);
|
|
var page1 = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
|
|
Assert.True(page1.Success);
|
|
// Page is full — two stuck rows, both honestly stuck.
|
|
Assert.Equal(2, page1.SiteCalls.Count);
|
|
Assert.All(page1.SiteCalls, s => Assert.True(s.IsStuck));
|
|
Assert.NotNull(page1.NextAfterCreatedAtUtc);
|
|
|
|
actor.Tell(
|
|
new SiteCallQueryRequest(
|
|
"corr-stuck-p2", null, siteId, null, null, StuckOnly: true,
|
|
null, null, page1.NextAfterCreatedAtUtc, page1.NextAfterId,
|
|
PageSize: 2),
|
|
TestActor);
|
|
var page2 = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
|
|
Assert.True(page2.Success);
|
|
// Final page — the third stuck row, the only remaining match.
|
|
Assert.Single(page2.SiteCalls);
|
|
Assert.All(page2.SiteCalls, s => Assert.True(s.IsStuck));
|
|
|
|
// No overlap, exactly the three stuck rows across both pages.
|
|
var allIds = page1.SiteCalls.Concat(page2.SiteCalls)
|
|
.Select(s => s.TrackedOperationId).ToHashSet();
|
|
Assert.Equal(3, allIds.Count);
|
|
}
|
|
|
|
[SkippableFact]
|
|
public async Task SiteCallDetailRequest_KnownId_ReturnsFullDetail()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
var siteId = NewSiteId();
|
|
var id = TrackedOperationId.New();
|
|
await using var context = CreateContext();
|
|
var repo = new SiteCallAuditRepository(context);
|
|
var actor = CreateActor(repo);
|
|
|
|
await repo.UpsertAsync(NewRow(id, siteId, status: "Attempted", retryCount: 2, lastError: "503"));
|
|
|
|
actor.Tell(new SiteCallDetailRequest("corr-d1", id.Value), TestActor);
|
|
|
|
var response = ExpectMsg<SiteCallDetailResponse>(TimeSpan.FromSeconds(10));
|
|
Assert.True(response.Success);
|
|
Assert.NotNull(response.Detail);
|
|
Assert.Equal(id.Value, response.Detail!.TrackedOperationId);
|
|
Assert.Equal("Attempted", response.Detail.Status);
|
|
Assert.Equal(2, response.Detail.RetryCount);
|
|
Assert.Equal("503", response.Detail.LastError);
|
|
Assert.Equal(siteId, response.Detail.SourceSite);
|
|
}
|
|
|
|
[SkippableFact]
|
|
public async Task SiteCallDetailRequest_UnknownId_RepliesNotFound()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
await using var context = CreateContext();
|
|
var repo = new SiteCallAuditRepository(context);
|
|
var actor = CreateActor(repo);
|
|
|
|
actor.Tell(new SiteCallDetailRequest("corr-d2", Guid.NewGuid()), TestActor);
|
|
|
|
var response = ExpectMsg<SiteCallDetailResponse>(TimeSpan.FromSeconds(10));
|
|
Assert.False(response.Success);
|
|
Assert.Null(response.Detail);
|
|
Assert.NotNull(response.ErrorMessage);
|
|
}
|
|
|
|
[SkippableFact]
|
|
public async Task SiteCallKpiRequest_ComputesPointInTimeCounts()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
var siteId = NewSiteId();
|
|
await using var context = CreateContext();
|
|
var repo = new SiteCallAuditRepository(context);
|
|
var actor = CreateActor(repo, new SiteCallAuditOptions
|
|
{
|
|
StuckAgeThreshold = TimeSpan.FromMinutes(10),
|
|
KpiInterval = TimeSpan.FromHours(1),
|
|
});
|
|
|
|
var now = DateTime.UtcNow;
|
|
// Buffered (non-terminal Attempted) + stuck (created 30 min ago).
|
|
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30)));
|
|
// Buffered (non-terminal Attempted), not stuck.
|
|
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-2)));
|
|
// Parked (terminal).
|
|
await repo.UpsertAsync(NewRow(
|
|
TrackedOperationId.New(), siteId, status: "Parked",
|
|
createdAtUtc: now.AddMinutes(-5), terminal: true));
|
|
// Delivered within the interval.
|
|
await repo.UpsertAsync(NewRow(
|
|
TrackedOperationId.New(), siteId, status: "Delivered",
|
|
createdAtUtc: now.AddMinutes(-3), updatedAtUtc: now.AddMinutes(-1), terminal: true));
|
|
|
|
actor.Tell(new SiteCallKpiRequest("corr-kpi"), TestActor);
|
|
|
|
var response = ExpectMsg<SiteCallKpiResponse>(TimeSpan.FromSeconds(10));
|
|
Assert.True(response.Success);
|
|
// Per-site rows are isolated by the unique siteId — but KPIs are global,
|
|
// so assert the floor (>=) rather than exact counts: other tests' rows
|
|
// may share the table.
|
|
Assert.True(response.BufferedCount >= 2);
|
|
Assert.True(response.ParkedCount >= 1);
|
|
Assert.True(response.DeliveredLastInterval >= 1);
|
|
Assert.True(response.StuckCount >= 1);
|
|
Assert.NotNull(response.OldestPendingAge);
|
|
}
|
|
|
|
[SkippableFact]
|
|
public async Task PerSiteSiteCallKpiRequest_ScopesCountsToEachSite()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
var siteId = NewSiteId();
|
|
await using var context = CreateContext();
|
|
var repo = new SiteCallAuditRepository(context);
|
|
var actor = CreateActor(repo, new SiteCallAuditOptions
|
|
{
|
|
StuckAgeThreshold = TimeSpan.FromMinutes(10),
|
|
KpiInterval = TimeSpan.FromHours(1),
|
|
});
|
|
|
|
var now = DateTime.UtcNow;
|
|
// Non-terminal Attempted, created 30 min ago — buffered + stuck.
|
|
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30)));
|
|
// Terminal Parked.
|
|
await repo.UpsertAsync(NewRow(
|
|
TrackedOperationId.New(), siteId, status: "Parked",
|
|
createdAtUtc: now.AddMinutes(-5), terminal: true));
|
|
|
|
actor.Tell(new PerSiteSiteCallKpiRequest("corr-psk"), TestActor);
|
|
|
|
var response = ExpectMsg<PerSiteSiteCallKpiResponse>(TimeSpan.FromSeconds(10));
|
|
Assert.True(response.Success);
|
|
|
|
var mySite = Assert.Single(response.Sites, s => s.SourceSite == siteId);
|
|
Assert.Equal(1, mySite.BufferedCount);
|
|
Assert.Equal(1, mySite.ParkedCount);
|
|
Assert.Equal(1, mySite.StuckCount);
|
|
Assert.NotNull(mySite.OldestPendingAge);
|
|
}
|
|
|
|
[SkippableFact]
|
|
public async Task SiteCallQueryRequest_RepoThrows_RepliesFailure_ActorStaysAlive()
|
|
{
|
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
|
|
|
var siteId = NewSiteId();
|
|
await using var context = CreateContext();
|
|
var realRepo = new SiteCallAuditRepository(context);
|
|
var actor = CreateActor(new QueryThrowingRepository(realRepo));
|
|
|
|
actor.Tell(
|
|
new SiteCallQueryRequest(
|
|
"corr-fault", null, siteId, null, null, false, null, null, null, null, 50),
|
|
TestActor);
|
|
|
|
var response = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
|
|
Assert.False(response.Success);
|
|
Assert.Empty(response.SiteCalls);
|
|
Assert.NotNull(response.ErrorMessage);
|
|
Assert.Equal("corr-fault", response.CorrelationId);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Test double whose <see cref="ISiteCallAuditRepository.QueryAsync"/> always
|
|
/// throws — used to verify the query handler's failure projection produces a
|
|
/// <c>Success=false</c> response without crashing the actor.
|
|
/// </summary>
|
|
private sealed class QueryThrowingRepository : ISiteCallAuditRepository
|
|
{
|
|
private readonly ISiteCallAuditRepository _inner;
|
|
|
|
public QueryThrowingRepository(ISiteCallAuditRepository inner)
|
|
{
|
|
_inner = inner;
|
|
}
|
|
|
|
public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) =>
|
|
_inner.UpsertAsync(siteCall, ct);
|
|
|
|
public Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default) =>
|
|
_inner.GetAsync(id, ct);
|
|
|
|
public Task<IReadOnlyList<SiteCall>> QueryAsync(
|
|
SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) =>
|
|
throw new InvalidOperationException("simulated query failure");
|
|
|
|
public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) =>
|
|
_inner.PurgeTerminalAsync(olderThanUtc, ct);
|
|
|
|
public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
|
|
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
|
|
_inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct);
|
|
|
|
public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
|
|
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
|
|
_inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Tiny test double that delegates to a real repository but throws on a
|
|
/// specified <see cref="TrackedOperationId"/>. Used to verify the actor's
|
|
/// fault-isolation behaviour: a transient repository failure must produce
|
|
/// <c>Accepted=false</c> without crashing the singleton.
|
|
/// </summary>
|
|
private sealed class ThrowingRepository : ISiteCallAuditRepository
|
|
{
|
|
private readonly ISiteCallAuditRepository _inner;
|
|
private readonly TrackedOperationId _poisonId;
|
|
|
|
public ThrowingRepository(ISiteCallAuditRepository inner, TrackedOperationId poisonId)
|
|
{
|
|
_inner = inner;
|
|
_poisonId = poisonId;
|
|
}
|
|
|
|
public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default)
|
|
{
|
|
if (siteCall.TrackedOperationId == _poisonId)
|
|
{
|
|
throw new InvalidOperationException("simulated transient repo failure for poison row");
|
|
}
|
|
return _inner.UpsertAsync(siteCall, ct);
|
|
}
|
|
|
|
public Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default) =>
|
|
_inner.GetAsync(id, ct);
|
|
|
|
public Task<IReadOnlyList<SiteCall>> QueryAsync(
|
|
SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) =>
|
|
_inner.QueryAsync(filter, paging, ct);
|
|
|
|
public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) =>
|
|
_inner.PurgeTerminalAsync(olderThanUtc, ct);
|
|
|
|
public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
|
|
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
|
|
_inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct);
|
|
|
|
public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
|
|
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
|
|
_inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct);
|
|
}
|
|
}
|