Files
scadalink-design/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs
Joseph Doherty c754666a3d fix(ui): carry SourceNode on SiteCallDetail + NotificationDetail records
The Site Calls and Notifications detail modals were reading SourceNode from
the summary record (d.SourceNode) while every other field read from the
detail record (det.X). The pattern works today because the modal always
opens via a row click that pre-loads the summary, but a future drill-in
from a deep link or refresh path could leave the summary stale or null and
the field would render blank or wrong.

Add SourceNode to both detail records, project it through the actor's
ToDetail mapping, and switch the razor markup to read det.SourceNode. Now
the modal binds uniformly to the detail record across all fields.
2026-05-23 18:37:53 -04:00

680 lines
29 KiB
C#

using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
namespace ScadaLink.SiteCallAudit.Tests;
/// <summary>
/// Bundle C1 (#22, #23 M3) tests for <see cref="SiteCallAuditActor"/>. Uses the
/// same <see cref="MsSqlMigrationFixture"/> as the Bundle B3 repository tests
/// so the actor exercises the real monotonic-upsert SQL end to end against the
/// <c>SiteCalls</c> schema. Each test scopes its data by minting a fresh
/// <see cref="TrackedOperationId"/> (and a per-test <c>SourceSite</c> suffix)
/// so tests neither collide nor require teardown.
/// </summary>
public class SiteCallAuditActorTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
// Shared SQL Server fixture: supplies the connection string plus the
// Available / SkipReason pair every test checks before doing any work.
private readonly MsSqlMigrationFixture _fixture;

/// <summary>
/// Stores the class fixture handed in by xUnit so the tests can reach the
/// database it exposes.
/// </summary>
/// <param name="fixture">The shared migration fixture for this test class.</param>
public SiteCallAuditActorTests(MsSqlMigrationFixture fixture) => _fixture = fixture;
/// <summary>
/// Builds a fresh <see cref="ScadaLinkDbContext"/> pointed at the fixture's
/// SQL Server connection string. Each call returns a new instance; the tests
/// dispose it via <c>await using</c>.
/// </summary>
/// <returns>A new context configured with the SQL Server provider.</returns>
private ScadaLinkDbContext CreateContext() =>
    new ScadaLinkDbContext(
        new DbContextOptionsBuilder<ScadaLinkDbContext>()
            .UseSqlServer(_fixture.ConnectionString)
            .Options);
/// <summary>
/// Mints a per-test <c>SourceSite</c> value: the fixed prefix
/// <c>test-bundle-c1-</c> followed by the first eight hex characters of a
/// newly generated GUID.
/// </summary>
/// <returns>A site id string that differs from call to call.</returns>
private static string NewSiteId()
{
    // "N" formats the GUID as 32 hex digits with no separators; keep the first 8.
    var suffix = Guid.NewGuid().ToString("N")[..8];
    return $"test-bundle-c1-{suffix}";
}
/// <summary>
/// Builds a <see cref="SiteCall"/> entity for seeding tests. Channel and
/// target are fixed to <c>ApiOutbound</c> / <c>ERP.GetOrder</c>, and
/// <c>HttpStatus</c> is always null; everything else comes from the arguments.
/// </summary>
/// <param name="id">Tracked operation id of the row.</param>
/// <param name="sourceSite">Value for the <c>SourceSite</c> column.</param>
/// <param name="status">Status string; defaults to <c>Submitted</c>.</param>
/// <param name="retryCount">Retry counter; defaults to 0.</param>
/// <param name="lastError">Optional last-error text.</param>
/// <param name="createdAtUtc">Creation timestamp; the current UTC time when omitted.</param>
/// <param name="updatedAtUtc">Update timestamp; falls back to the creation timestamp when omitted.</param>
/// <param name="terminal">When true, <c>TerminalAtUtc</c> is set to the update timestamp; otherwise it is null.</param>
/// <param name="sourceNode">Optional value for the <c>SourceNode</c> column.</param>
/// <returns>The populated entity, with <c>IngestedAtUtc</c> stamped at the current UTC time.</returns>
private static SiteCall NewRow(
    TrackedOperationId id,
    string sourceSite,
    string status = "Submitted",
    int retryCount = 0,
    string? lastError = null,
    DateTime? createdAtUtc = null,
    DateTime? updatedAtUtc = null,
    bool terminal = false,
    string? sourceNode = null)
{
    DateTime createdStamp = createdAtUtc.HasValue ? createdAtUtc.Value : DateTime.UtcNow;
    DateTime updatedStamp = updatedAtUtc.GetValueOrDefault(createdStamp);

    // Only terminal rows carry a TerminalAtUtc; it mirrors the update stamp.
    DateTime? terminalStamp = null;
    if (terminal)
    {
        terminalStamp = updatedStamp;
    }

    return new SiteCall
    {
        // Identity and routing.
        TrackedOperationId = id,
        Channel = "ApiOutbound",
        Target = "ERP.GetOrder",
        SourceSite = sourceSite,
        SourceNode = sourceNode,

        // Delivery state.
        Status = status,
        RetryCount = retryCount,
        LastError = lastError,
        HttpStatus = null,

        // Timestamps.
        CreatedAtUtc = createdStamp,
        UpdatedAtUtc = updatedStamp,
        TerminalAtUtc = terminalStamp,
        IngestedAtUtc = DateTime.UtcNow,
    };
}
/// <summary>
/// Spawns a <see cref="SiteCallAuditActor"/> in the TestKit actor system,
/// wired to the given repository, a null logger, and the supplied options.
/// </summary>
/// <param name="repository">Repository the actor is constructed with.</param>
/// <param name="options">Optional actor options; null is passed straight through to the actor.</param>
/// <returns>A reference to the newly created actor.</returns>
private IActorRef CreateActor(
    ISiteCallAuditRepository repository, SiteCallAuditOptions? options = null)
{
    var silentLogger = NullLogger<SiteCallAuditActor>.Instance;
    var actorProps = Props.Create(() => new SiteCallAuditActor(repository, silentLogger, options));
    return Sys.ActorOf(actorProps);
}
/// <summary>
/// Happy path: an upsert command sent to the actor is acknowledged with
/// <c>Accepted=true</c> and the row becomes visible through a separate context.
/// </summary>
[SkippableFact]
public async Task Receive_UpsertSiteCallCommand_Persists_Replies_Accepted()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // Arrange
    var sourceSite = NewSiteId();
    var operationId = TrackedOperationId.New();
    var command = new UpsertSiteCallCommand(
        NewRow(operationId, sourceSite, status: "Submitted", retryCount: 0));
    await using var writeContext = CreateContext();
    var auditActor = CreateActor(new SiteCallAuditRepository(writeContext));

    // Act
    auditActor.Tell(command, TestActor);
    var ack = ExpectMsg<UpsertSiteCallReply>(TimeSpan.FromSeconds(10));

    // Assert: the reply acknowledges the same operation id.
    Assert.True(ack.Accepted, "Actor should reply Accepted=true on a successful upsert.");
    Assert.Equal(operationId, ack.TrackedOperationId);

    // Assert: read back through an independent context, not the one the
    // actor's repository is using.
    await using var verifyContext = CreateContext();
    var persisted = await verifyContext.Set<SiteCall>()
        .Where(call => call.SourceSite == sourceSite)
        .ToListAsync();
    var onlyRow = Assert.Single(persisted);
    Assert.Equal("Submitted", onlyRow.Status);
}
/// <summary>
/// Idempotency: a late, lower-ranked upsert for an already-stored operation
/// is still acknowledged, and the stored row keeps its higher-ranked state.
/// </summary>
[SkippableFact]
public async Task Receive_DuplicateUpsert_OlderStatus_NoOp_StillRepliesAccepted()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // A stale or duplicate packet (lower rank than what is stored) is a
    // silent no-op in the repository. The actor must nevertheless answer
    // Accepted=true so the sending site can treat the packet as acked.
    var sourceSite = NewSiteId();
    var operationId = TrackedOperationId.New();
    var replyTimeout = TimeSpan.FromSeconds(10);
    await using var writeContext = CreateContext();
    var auditActor = CreateActor(new SiteCallAuditRepository(writeContext));

    // Step 1: store Attempted (rank 2).
    var attemptedRow = NewRow(
        operationId, sourceSite, status: "Attempted", retryCount: 1, lastError: "first");
    auditActor.Tell(new UpsertSiteCallCommand(attemptedRow), TestActor);
    var attemptedAck = ExpectMsg<UpsertSiteCallReply>(replyTimeout);
    Assert.True(attemptedAck.Accepted);

    // Step 2: a Submitted packet (rank 0) arrives afterwards for the same id.
    var lateSubmittedRow = NewRow(operationId, sourceSite, status: "Submitted", retryCount: 0);
    auditActor.Tell(new UpsertSiteCallCommand(lateSubmittedRow), TestActor);
    var lateAck = ExpectMsg<UpsertSiteCallReply>(replyTimeout);
    Assert.True(lateAck.Accepted, "Stale upsert must still be acked (idempotent contract).");

    // Step 3: the persisted row must still be the rank-2 version.
    await using var verifyContext = CreateContext();
    var persisted = await verifyContext.Set<SiteCall>()
        .Where(call => call.TrackedOperationId == operationId)
        .ToListAsync();
    var survivor = Assert.Single(persisted);
    Assert.Equal("Attempted", survivor.Status);
    Assert.Equal(1, survivor.RetryCount);
    Assert.Equal("first", survivor.LastError);
}
/// <summary>
/// Fault isolation: when the repository throws for one row the actor replies
/// <c>Accepted=false</c> and keeps serving later messages.
/// </summary>
[SkippableFact]
public async Task Receive_RepoThrowsTransient_RepliesAccepted_False_ActorStaysAlive()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // Project rule (CLAUDE.md): an audit-write failure never aborts the
    // user-facing action. So the actor has to swallow the throw, answer
    // Accepted=false, and remain alive for the next message.
    var sourceSite = NewSiteId();
    var failingId = TrackedOperationId.New();
    var workingId = TrackedOperationId.New();
    var replyTimeout = TimeSpan.FromSeconds(10);
    await using var writeContext = CreateContext();
    var faultyRepository = new ThrowingRepository(
        new SiteCallAuditRepository(writeContext), failingId);
    var auditActor = CreateActor(faultyRepository);

    // First message: the wrapper throws as soon as it sees failingId.
    auditActor.Tell(
        new UpsertSiteCallCommand(NewRow(failingId, sourceSite, status: "Submitted")),
        TestActor);
    var failedAck = ExpectMsg<UpsertSiteCallReply>(replyTimeout);
    Assert.False(failedAck.Accepted, "Actor should reply Accepted=false when the repo throws.");
    Assert.Equal(failingId, failedAck.TrackedOperationId);

    // Second message to the very same actor reference: a reply here shows
    // the actor survived the earlier failure.
    auditActor.Tell(
        new UpsertSiteCallCommand(NewRow(workingId, sourceSite, status: "Submitted")),
        TestActor);
    var workingAck = ExpectMsg<UpsertSiteCallReply>(replyTimeout);
    Assert.True(workingAck.Accepted, "Actor must stay alive after a transient repo failure.");
    Assert.Equal(workingId, workingAck.TrackedOperationId);

    // Storage check: only the second row exists for this site.
    await using var verifyContext = CreateContext();
    var persisted = await verifyContext.Set<SiteCall>()
        .Where(call => call.SourceSite == sourceSite)
        .ToListAsync();
    var onlyRow = Assert.Single(persisted);
    Assert.Equal(workingId, onlyRow.TrackedOperationId);
}
// ── Task 4: read-side (query / detail / KPI) handlers ──
/// <summary>
/// A query filtered by source site returns only that site's rows, newest
/// first, and reports a cursor taken from the last row of the page.
/// </summary>
[SkippableFact]
public async Task SiteCallQueryRequest_FilterBySourceSite_ReturnsMatchingSummaries()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // Arrange: two rows for a fresh site, one minute apart.
    var sourceSite = NewSiteId();
    await using var dbContext = CreateContext();
    var repository = new SiteCallAuditRepository(dbContext);
    var auditActor = CreateActor(repository);
    var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);

    await repository.UpsertAsync(
        NewRow(TrackedOperationId.New(), sourceSite, status: "Attempted", createdAtUtc: baseTime));
    await repository.UpsertAsync(
        NewRow(
            TrackedOperationId.New(), sourceSite, status: "Delivered",
            createdAtUtc: baseTime.AddMinutes(1), terminal: true));

    // Act
    var query = new SiteCallQueryRequest(
        "corr-q1", StatusFilter: null, SourceSiteFilter: sourceSite, ChannelFilter: null,
        TargetKeyword: null, StuckOnly: false, FromUtc: null, ToUtc: null,
        AfterCreatedAtUtc: null, AfterId: null, PageSize: 50);
    auditActor.Tell(query, TestActor);
    var page = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));

    // Assert: envelope.
    Assert.True(page.Success);
    Assert.Equal("corr-q1", page.CorrelationId);

    // Assert: both rows, all from the requested site.
    Assert.Equal(2, page.SiteCalls.Count);
    Assert.All(page.SiteCalls, summary => Assert.Equal(sourceSite, summary.SourceSite));

    // Assert: ordering is CreatedAtUtc descending, so the later Delivered row leads.
    Assert.Equal("Delivered", page.SiteCalls[0].Status);

    // Assert: the cursor echoes the final (oldest) row on the page.
    var lastOnPage = page.SiteCalls[^1];
    Assert.Equal(baseTime, page.NextAfterCreatedAtUtc);
    Assert.Equal(lastOnPage.TrackedOperationId, page.NextAfterId);
}
/// <summary>
/// A query carrying <c>SourceNodeFilter</c> returns only the row stamped with
/// that node, and the summary exposes the row's <c>SourceNode</c>.
/// </summary>
[SkippableFact]
public async Task SiteCallQueryRequest_FilterBySourceNode_ReturnsMatchingSummaries()
{
    // Task 17: the Node filter input supplies SiteCallQueryRequest.SourceNodeFilter.
    // The actor is expected to carry it onto SiteCallQueryFilter.SourceNode, and
    // the returned summaries should mirror the SourceNode column.
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    const string nodeA = "site-plant-a-node-a";
    const string nodeB = "site-plant-a-node-b";

    // Arrange: same site, two different nodes.
    var sourceSite = NewSiteId();
    await using var dbContext = CreateContext();
    var repository = new SiteCallAuditRepository(dbContext);
    var auditActor = CreateActor(repository);
    var baseTime = new DateTime(2026, 5, 20, 14, 0, 0, DateTimeKind.Utc);

    await repository.UpsertAsync(
        NewRow(
            TrackedOperationId.New(), sourceSite, status: "Attempted",
            createdAtUtc: baseTime, sourceNode: nodeA));
    await repository.UpsertAsync(
        NewRow(
            TrackedOperationId.New(), sourceSite, status: "Delivered",
            createdAtUtc: baseTime.AddMinutes(1), terminal: true, sourceNode: nodeB));

    // Act: ask for node A only.
    var query = new SiteCallQueryRequest(
        "corr-node", StatusFilter: null, SourceSiteFilter: sourceSite, ChannelFilter: null,
        TargetKeyword: null, StuckOnly: false, FromUtc: null, ToUtc: null,
        AfterCreatedAtUtc: null, AfterId: null, PageSize: 50,
        SourceNodeFilter: nodeA);
    auditActor.Tell(query, TestActor);
    var page = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));

    // Assert: exactly the node-A row comes back.
    Assert.True(page.Success);
    Assert.Single(page.SiteCalls);
    var match = page.SiteCalls[0];
    Assert.Equal("site-plant-a-node-a", match.SourceNode);
    Assert.Equal("Attempted", match.Status);
}
/// <summary>
/// Keyset paging: three rows read with a page size of two arrive as a full
/// first page plus a one-row second page, with no row repeated.
/// </summary>
/// <remarks>
/// Each page's <c>Success</c> flag and the first page's cursor are asserted
/// explicitly, matching the StuckOnly paging test. Without them a failed
/// query would surface only as a confusing count mismatch, and a missing
/// cursor would make the second request silently re-read the first page.
/// </remarks>
[SkippableFact]
public async Task SiteCallQueryRequest_KeysetPaging_AdvancesViaCursor()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // Arrange: three rows for a fresh site, one minute apart.
    var siteId = NewSiteId();
    await using var context = CreateContext();
    var repo = new SiteCallAuditRepository(context);
    var actor = CreateActor(repo);
    var t0 = new DateTime(2026, 5, 20, 13, 0, 0, DateTimeKind.Utc);
    for (var i = 0; i < 3; i++)
    {
        await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, createdAtUtc: t0.AddMinutes(i)));
    }

    // Act + assert: first page, requested without a cursor.
    actor.Tell(
        new SiteCallQueryRequest(
            "corr-q2", null, siteId, null, null, false, null, null, null, null, PageSize: 2),
        TestActor);
    var page1 = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
    Assert.True(page1.Success);
    Assert.Equal(2, page1.SiteCalls.Count);
    // The cursor must be present before it is fed into the next request.
    Assert.NotNull(page1.NextAfterCreatedAtUtc);
    Assert.NotNull(page1.NextAfterId);

    // Act + assert: second page, resumed from the first page's cursor.
    actor.Tell(
        new SiteCallQueryRequest(
            "corr-q3", null, siteId, null, null, false, null, null,
            page1.NextAfterCreatedAtUtc, page1.NextAfterId, PageSize: 2),
        TestActor);
    var page2 = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
    Assert.True(page2.Success);
    Assert.Single(page2.SiteCalls);

    // No overlap across the two pages: three distinct ids in total.
    var allIds = page1.SiteCalls.Concat(page2.SiteCalls)
        .Select(s => s.TrackedOperationId).ToHashSet();
    Assert.Equal(3, allIds.Count);
}
/// <summary>
/// With <c>StuckOnly</c> set, only the non-terminal row older than the stuck
/// threshold is returned; recent rows and terminal rows are excluded.
/// </summary>
[SkippableFact]
public async Task SiteCallQueryRequest_StuckOnly_ReturnsOnlyOldNonTerminalRows()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // Arrange: actor configured with a 10-minute stuck threshold (the
    // production default).
    var sourceSite = NewSiteId();
    await using var dbContext = CreateContext();
    var repository = new SiteCallAuditRepository(dbContext);
    var actorOptions = new SiteCallAuditOptions { StuckAgeThreshold = TimeSpan.FromMinutes(10) };
    var auditActor = CreateActor(repository, actorOptions);
    var referenceTime = DateTime.UtcNow;

    // Row 1 is the expected hit: Attempted, no TerminalAtUtc, 30 minutes old.
    var expectedStuckId = TrackedOperationId.New();
    await repository.UpsertAsync(
        NewRow(expectedStuckId, sourceSite, status: "Attempted", createdAtUtc: referenceTime.AddMinutes(-30)));

    // Row 2 is non-terminal but only 2 minutes old.
    await repository.UpsertAsync(
        NewRow(TrackedOperationId.New(), sourceSite, status: "Attempted", createdAtUtc: referenceTime.AddMinutes(-2)));

    // Row 3 is 40 minutes old but terminal (Delivered with TerminalAtUtc set).
    await repository.UpsertAsync(
        NewRow(
            TrackedOperationId.New(), sourceSite, status: "Delivered",
            createdAtUtc: referenceTime.AddMinutes(-40), terminal: true));

    // Act
    var query = new SiteCallQueryRequest(
        "corr-stuck", null, sourceSite, null, null, StuckOnly: true,
        null, null, null, null, PageSize: 50);
    auditActor.Tell(query, TestActor);
    var page = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));

    // Assert
    Assert.True(page.Success);
    Assert.Single(page.SiteCalls);
    var hit = page.SiteCalls[0];
    Assert.Equal(expectedStuckId.Value, hit.TrackedOperationId);
    Assert.True(hit.IsStuck);
}
/// <summary>
/// Paging a <c>StuckOnly</c> query with page size two over three stuck rows
/// (mixed in with non-stuck rows) yields a full page followed by a one-row
/// page, every row stuck and none repeated.
/// </summary>
[SkippableFact]
public async Task SiteCallQueryRequest_StuckOnly_PagesAreFull_NoEmptyPagesWithCursor()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    var sourceSite = NewSiteId();
    await using var dbContext = CreateContext();
    var repository = new SiteCallAuditRepository(dbContext);
    var actorOptions = new SiteCallAuditOptions { StuckAgeThreshold = TimeSpan.FromMinutes(10) };
    var auditActor = CreateActor(repository, actorOptions);
    var referenceTime = DateTime.UtcNow;
    var replyTimeout = TimeSpan.FromSeconds(10);

    // Seed three groups. Each group adds one stuck row plus two decoys
    // (a recent non-terminal row and an old terminal row), so stuck and
    // non-stuck rows interleave by CreatedAtUtc. Because the StuckOnly
    // filter is applied in SQL, a page of size 2 must contain two stuck
    // rows. Post-filtering would instead produce an under-filled page
    // that still carries a next cursor.
    foreach (var offset in Enumerable.Range(0, 3))
    {
        // Stuck: non-terminal and older than the threshold.
        await repository.UpsertAsync(
            NewRow(
                TrackedOperationId.New(), sourceSite, status: "Attempted",
                createdAtUtc: referenceTime.AddMinutes(-30 - offset)));

        // Decoy: non-terminal but recent.
        await repository.UpsertAsync(
            NewRow(
                TrackedOperationId.New(), sourceSite, status: "Attempted",
                createdAtUtc: referenceTime.AddMinutes(-2 - offset)));

        // Decoy: old but terminal.
        await repository.UpsertAsync(
            NewRow(
                TrackedOperationId.New(), sourceSite, status: "Delivered",
                createdAtUtc: referenceTime.AddMinutes(-40 - offset), terminal: true));
    }

    // First page.
    var firstQuery = new SiteCallQueryRequest(
        "corr-stuck-p1", null, sourceSite, null, null, StuckOnly: true,
        null, null, null, null, PageSize: 2);
    auditActor.Tell(firstQuery, TestActor);
    var firstPage = ExpectMsg<SiteCallQueryResponse>(replyTimeout);
    Assert.True(firstPage.Success);

    // The page is full: two rows, both flagged stuck, and a cursor to continue from.
    Assert.Equal(2, firstPage.SiteCalls.Count);
    Assert.All(firstPage.SiteCalls, summary => Assert.True(summary.IsStuck));
    Assert.NotNull(firstPage.NextAfterCreatedAtUtc);

    // Second page, resumed from the first page's cursor.
    var secondQuery = new SiteCallQueryRequest(
        "corr-stuck-p2", null, sourceSite, null, null, StuckOnly: true,
        null, null, firstPage.NextAfterCreatedAtUtc, firstPage.NextAfterId,
        PageSize: 2);
    auditActor.Tell(secondQuery, TestActor);
    var secondPage = ExpectMsg<SiteCallQueryResponse>(replyTimeout);
    Assert.True(secondPage.Success);

    // Only the third stuck row is left.
    Assert.Single(secondPage.SiteCalls);
    Assert.All(secondPage.SiteCalls, summary => Assert.True(summary.IsStuck));

    // Across both pages: three distinct ids, so no overlap.
    var distinctIds = firstPage.SiteCalls
        .Concat(secondPage.SiteCalls)
        .Select(summary => summary.TrackedOperationId)
        .ToHashSet();
    Assert.Equal(3, distinctIds.Count);
}
/// <summary>
/// A detail request for a stored operation id returns a populated detail
/// record, including <c>SourceNode</c>.
/// </summary>
[SkippableFact]
public async Task SiteCallDetailRequest_KnownId_ReturnsFullDetail()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // Arrange: one row with every asserted field set to a distinctive value.
    var sourceSite = NewSiteId();
    var operationId = TrackedOperationId.New();
    await using var dbContext = CreateContext();
    var repository = new SiteCallAuditRepository(dbContext);
    var auditActor = CreateActor(repository);
    var seededRow = NewRow(
        operationId, sourceSite, status: "Attempted", retryCount: 2, lastError: "503", sourceNode: "node-a");
    await repository.UpsertAsync(seededRow);

    // Act
    auditActor.Tell(new SiteCallDetailRequest("corr-d1", operationId.Value), TestActor);
    var reply = ExpectMsg<SiteCallDetailResponse>(TimeSpan.FromSeconds(10));

    // Assert
    Assert.True(reply.Success);
    Assert.NotNull(reply.Detail);
    var detail = reply.Detail!;
    Assert.Equal(operationId.Value, detail.TrackedOperationId);
    Assert.Equal("Attempted", detail.Status);
    Assert.Equal(2, detail.RetryCount);
    Assert.Equal("503", detail.LastError);
    Assert.Equal(sourceSite, detail.SourceSite);

    // SourceNode is carried on the detail record itself, so the report's
    // detail modal can bind every field to the detail record instead of
    // reading this one from the summary as it used to.
    Assert.Equal("node-a", detail.SourceNode);
}
/// <summary>
/// A detail request for an id that was never stored yields a failure reply
/// with no detail and a non-null error message.
/// </summary>
[SkippableFact]
public async Task SiteCallDetailRequest_UnknownId_RepliesNotFound()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // Arrange: a fresh GUID that no test has inserted.
    await using var dbContext = CreateContext();
    var auditActor = CreateActor(new SiteCallAuditRepository(dbContext));
    var lookup = new SiteCallDetailRequest("corr-d2", Guid.NewGuid());

    // Act
    auditActor.Tell(lookup, TestActor);
    var reply = ExpectMsg<SiteCallDetailResponse>(TimeSpan.FromSeconds(10));

    // Assert
    Assert.False(reply.Success);
    Assert.Null(reply.Detail);
    Assert.NotNull(reply.ErrorMessage);
}
/// <summary>
/// The global KPI request reports at least the buffered, parked, delivered
/// and stuck rows this test seeds, plus a non-null oldest-pending age.
/// </summary>
[SkippableFact]
public async Task SiteCallKpiRequest_ComputesPointInTimeCounts()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    var sourceSite = NewSiteId();
    await using var dbContext = CreateContext();
    var repository = new SiteCallAuditRepository(dbContext);
    var actorOptions = new SiteCallAuditOptions
    {
        StuckAgeThreshold = TimeSpan.FromMinutes(10),
        KpiInterval = TimeSpan.FromHours(1),
    };
    var auditActor = CreateActor(repository, actorOptions);
    var referenceTime = DateTime.UtcNow;

    // Seed 1: non-terminal Attempted, 30 minutes old — buffered and stuck.
    await repository.UpsertAsync(
        NewRow(TrackedOperationId.New(), sourceSite, status: "Attempted", createdAtUtc: referenceTime.AddMinutes(-30)));

    // Seed 2: non-terminal Attempted, 2 minutes old — buffered, not stuck.
    await repository.UpsertAsync(
        NewRow(TrackedOperationId.New(), sourceSite, status: "Attempted", createdAtUtc: referenceTime.AddMinutes(-2)));

    // Seed 3: terminal Parked.
    await repository.UpsertAsync(
        NewRow(
            TrackedOperationId.New(), sourceSite, status: "Parked",
            createdAtUtc: referenceTime.AddMinutes(-5), terminal: true));

    // Seed 4: Delivered, updated one minute ago — inside the KPI interval.
    await repository.UpsertAsync(
        NewRow(
            TrackedOperationId.New(), sourceSite, status: "Delivered",
            createdAtUtc: referenceTime.AddMinutes(-3), updatedAtUtc: referenceTime.AddMinutes(-1), terminal: true));

    // Act
    auditActor.Tell(new SiteCallKpiRequest("corr-kpi"), TestActor);
    var kpis = ExpectMsg<SiteCallKpiResponse>(TimeSpan.FromSeconds(10));

    // Assert: the seeded rows are isolated by the unique site id, but these
    // KPIs are computed across the whole table and other tests may have rows
    // in it. Hence lower bounds (>=) rather than exact counts.
    Assert.True(kpis.Success);
    Assert.True(kpis.BufferedCount >= 2);
    Assert.True(kpis.ParkedCount >= 1);
    Assert.True(kpis.DeliveredLastInterval >= 1);
    Assert.True(kpis.StuckCount >= 1);
    Assert.NotNull(kpis.OldestPendingAge);
}
/// <summary>
/// The per-site KPI request returns exactly one entry for this test's site,
/// with counts that match the two rows seeded for it.
/// </summary>
[SkippableFact]
public async Task PerSiteSiteCallKpiRequest_ScopesCountsToEachSite()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    var sourceSite = NewSiteId();
    await using var dbContext = CreateContext();
    var repository = new SiteCallAuditRepository(dbContext);
    var actorOptions = new SiteCallAuditOptions
    {
        StuckAgeThreshold = TimeSpan.FromMinutes(10),
        KpiInterval = TimeSpan.FromHours(1),
    };
    var auditActor = CreateActor(repository, actorOptions);
    var referenceTime = DateTime.UtcNow;

    // Seed 1: non-terminal Attempted, 30 minutes old — counts as buffered and stuck.
    await repository.UpsertAsync(
        NewRow(TrackedOperationId.New(), sourceSite, status: "Attempted", createdAtUtc: referenceTime.AddMinutes(-30)));

    // Seed 2: terminal Parked.
    await repository.UpsertAsync(
        NewRow(
            TrackedOperationId.New(), sourceSite, status: "Parked",
            createdAtUtc: referenceTime.AddMinutes(-5), terminal: true));

    // Act
    auditActor.Tell(new PerSiteSiteCallKpiRequest("corr-psk"), TestActor);
    var reply = ExpectMsg<PerSiteSiteCallKpiResponse>(TimeSpan.FromSeconds(10));

    // Assert: the site id is unique to this test, so its counts are exact.
    Assert.True(reply.Success);
    var siteKpis = Assert.Single(reply.Sites, entry => entry.SourceSite == sourceSite);
    Assert.Equal(1, siteKpis.BufferedCount);
    Assert.Equal(1, siteKpis.ParkedCount);
    Assert.Equal(1, siteKpis.StuckCount);
    Assert.NotNull(siteKpis.OldestPendingAge);
}
/// <summary>
/// When the repository's query throws, the actor still answers: a failure
/// response with no rows, an error message, and the request's correlation id.
/// </summary>
[SkippableFact]
public async Task SiteCallQueryRequest_RepoThrows_RepliesFailure_ActorStaysAlive()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // Arrange: wrap the real repository so that QueryAsync always throws.
    var sourceSite = NewSiteId();
    await using var dbContext = CreateContext();
    var brokenRepository = new QueryThrowingRepository(new SiteCallAuditRepository(dbContext));
    var auditActor = CreateActor(brokenRepository);
    var query = new SiteCallQueryRequest(
        "corr-fault", null, sourceSite, null, null, false, null, null, null, null, 50);

    // Act
    auditActor.Tell(query, TestActor);
    var reply = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));

    // Assert
    Assert.False(reply.Success);
    Assert.Empty(reply.SiteCalls);
    Assert.NotNull(reply.ErrorMessage);
    Assert.Equal("corr-fault", reply.CorrelationId);
}
// ── SourceNode-stamping (Task 14): end-to-end actor → repo persistence ──
/// <summary>
/// An upsert command whose row carries <c>SourceNode = "node-a"</c> is stored
/// with that exact value in the <c>SourceNode</c> column.
/// </summary>
[SkippableFact]
public async Task UpsertSiteCallCommand_PersistsSourceNode_EndToEnd()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // SourceNode-stamping (Task 14): the value must survive both the
    // actor's mapping path and the repository INSERT unchanged.
    var sourceSite = NewSiteId();
    var operationId = TrackedOperationId.New();
    var stampedRow = NewRow(operationId, sourceSite, status: "Submitted", sourceNode: "node-a");
    await using var writeContext = CreateContext();
    var auditActor = CreateActor(new SiteCallAuditRepository(writeContext));

    // Act
    auditActor.Tell(new UpsertSiteCallCommand(stampedRow), TestActor);
    var ack = ExpectMsg<UpsertSiteCallReply>(TimeSpan.FromSeconds(10));
    Assert.True(ack.Accepted);

    // Assert: read the row back through a separate context.
    await using var verifyContext = CreateContext();
    var persisted = await verifyContext.Set<SiteCall>()
        .Where(call => call.TrackedOperationId == operationId)
        .ToListAsync();
    var onlyRow = Assert.Single(persisted);
    Assert.Equal("node-a", onlyRow.SourceNode);
}
/// <summary>
/// An upsert command whose row has no <c>SourceNode</c> is stored with a NULL
/// <c>SourceNode</c> column.
/// </summary>
[SkippableFact]
public async Task UpsertSiteCallCommand_NullSourceNode_PersistsNull()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // Counterpart of the stamped-packet test. An unstamped packet — for
    // example the first audit packet from a legacy host, or from a node
    // with no INodeIdentityProvider wired — must be stored as NULL. The
    // central side must not substitute a placeholder or invented value.
    var sourceSite = NewSiteId();
    var operationId = TrackedOperationId.New();
    var unstampedRow = NewRow(operationId, sourceSite, status: "Submitted", sourceNode: null);
    await using var writeContext = CreateContext();
    var auditActor = CreateActor(new SiteCallAuditRepository(writeContext));

    // Act
    auditActor.Tell(new UpsertSiteCallCommand(unstampedRow), TestActor);
    var ack = ExpectMsg<UpsertSiteCallReply>(TimeSpan.FromSeconds(10));
    Assert.True(ack.Accepted);

    // Assert: read the row back through a separate context.
    await using var verifyContext = CreateContext();
    var persisted = await verifyContext.Set<SiteCall>()
        .Where(call => call.TrackedOperationId == operationId)
        .ToListAsync();
    var onlyRow = Assert.Single(persisted);
    Assert.Null(onlyRow.SourceNode);
}
/// <summary>
/// Repository wrapper for the query-failure test. Its
/// <see cref="ISiteCallAuditRepository.QueryAsync"/> throws on every call,
/// while all other members forward to the wrapped repository. It lets a test
/// confirm that the query handler answers <c>Success=false</c> instead of
/// taking the actor down.
/// </summary>
private sealed class QueryThrowingRepository : ISiteCallAuditRepository
{
    private readonly ISiteCallAuditRepository _wrapped;

    /// <param name="inner">Repository that receives every non-query call.</param>
    public QueryThrowingRepository(ISiteCallAuditRepository inner) => _wrapped = inner;

    /// <summary>Always fails; the exception is thrown directly from the call.</summary>
    public Task<IReadOnlyList<SiteCall>> QueryAsync(
        SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default)
    {
        throw new InvalidOperationException("simulated query failure");
    }

    // Everything below is plain pass-through to the wrapped repository.

    public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default)
    {
        return _wrapped.UpsertAsync(siteCall, ct);
    }

    public Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default)
    {
        return _wrapped.GetAsync(id, ct);
    }

    public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default)
    {
        return _wrapped.PurgeTerminalAsync(olderThanUtc, ct);
    }

    public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
        DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default)
    {
        return _wrapped.ComputeKpisAsync(stuckCutoff, intervalSince, ct);
    }

    public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
        DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default)
    {
        return _wrapped.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct);
    }
}
/// <summary>
/// Repository wrapper for the fault-isolation test. It forwards every call to
/// a real repository, except that an upsert for one designated
/// <see cref="TrackedOperationId"/> throws. This simulates a transient
/// repository failure, which the actor is expected to turn into
/// <c>Accepted=false</c> without crashing the singleton.
/// </summary>
private sealed class ThrowingRepository : ISiteCallAuditRepository
{
    private readonly ISiteCallAuditRepository _wrapped;
    private readonly TrackedOperationId _failFor;

    /// <param name="inner">Repository that receives every call that is not poisoned.</param>
    /// <param name="poisonId">Operation id whose upsert must throw.</param>
    public ThrowingRepository(ISiteCallAuditRepository inner, TrackedOperationId poisonId)
    {
        _wrapped = inner;
        _failFor = poisonId;
    }

    /// <summary>
    /// Throws directly from the call for the poison id; forwards otherwise.
    /// </summary>
    public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) =>
        siteCall.TrackedOperationId == _failFor
            ? throw new InvalidOperationException("simulated transient repo failure for poison row")
            : _wrapped.UpsertAsync(siteCall, ct);

    // Everything below is plain pass-through to the wrapped repository.

    public Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default)
    {
        return _wrapped.GetAsync(id, ct);
    }

    public Task<IReadOnlyList<SiteCall>> QueryAsync(
        SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default)
    {
        return _wrapped.QueryAsync(filter, paging, ct);
    }

    public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default)
    {
        return _wrapped.PurgeTerminalAsync(olderThanUtc, ct);
    }

    public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
        DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default)
    {
        return _wrapped.ComputeKpisAsync(stuckCutoff, intervalSince, ct);
    }

    public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
        DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default)
    {
        return _wrapped.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct);
    }
}
}