diff --git a/ScadaLink.slnx b/ScadaLink.slnx index 21b4f3a..3a70a13 100644 --- a/ScadaLink.slnx +++ b/ScadaLink.slnx @@ -12,6 +12,7 @@ + @@ -35,6 +36,7 @@ + diff --git a/src/ScadaLink.Commons/Messages/Audit/UpsertSiteCallCommand.cs b/src/ScadaLink.Commons/Messages/Audit/UpsertSiteCallCommand.cs new file mode 100644 index 0000000..ec0da24 --- /dev/null +++ b/src/ScadaLink.Commons/Messages/Audit/UpsertSiteCallCommand.cs @@ -0,0 +1,19 @@ +using ScadaLink.Commons.Entities.Audit; + +namespace ScadaLink.Commons.Messages.Audit; + +/// +/// Akka message sent to the central SiteCallAuditActor (Site Call Audit +/// #22, Audit Log #23 M3 Bundle C) carrying one row to +/// be persisted via ISiteCallAuditRepository.UpsertAsync. The repository +/// performs an insert-if-not-exists then monotonic update — duplicate gRPC +/// packets and reconciliation pulls can both feed the actor without rolling +/// state back. +/// +/// +/// Lives in ScadaLink.Commons rather than ScadaLink.SiteCallAudit +/// so the gRPC server in ScadaLink.Communication can construct it +/// without taking a project reference on the actor's host project (Bundle D +/// adds the IngestCachedTelemetry RPC that will Tell this command). +/// +public sealed record UpsertSiteCallCommand(SiteCall SiteCall); diff --git a/src/ScadaLink.Commons/Messages/Audit/UpsertSiteCallReply.cs b/src/ScadaLink.Commons/Messages/Audit/UpsertSiteCallReply.cs new file mode 100644 index 0000000..de31cec --- /dev/null +++ b/src/ScadaLink.Commons/Messages/Audit/UpsertSiteCallReply.cs @@ -0,0 +1,14 @@ +using ScadaLink.Commons.Types; + +namespace ScadaLink.Commons.Messages.Audit; + +/// +/// Reply from the central SiteCallAuditActor for an +/// . is true +/// when the upsert reached the repository without throwing (including the +/// monotonic-no-op case where the stored status' rank wins) and false +/// when persistence raised an exception. The actor itself stays alive in +/// either case — audit-write failures must NEVER abort the user-facing action +/// (Audit Log #23 §13). +/// +public sealed record UpsertSiteCallReply(TrackedOperationId TrackedOperationId, bool Accepted); diff --git a/src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj b/src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj new file mode 100644 index 0000000..d8b0e7a --- /dev/null +++ b/src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj @@ -0,0 +1,31 @@ + + + + net10.0 + enable + enable + true + + + + + + + + + + + + + + + + + + + + + diff --git a/src/ScadaLink.SiteCallAudit/ServiceCollectionExtensions.cs b/src/ScadaLink.SiteCallAudit/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..6eb0d80 --- /dev/null +++ b/src/ScadaLink.SiteCallAudit/ServiceCollectionExtensions.cs @@ -0,0 +1,39 @@ +using Microsoft.Extensions.DependencyInjection; + +namespace ScadaLink.SiteCallAudit; + +/// +/// Composition root for the Site Call Audit (#22) component. +/// +/// +/// +/// M3 Bundle C ships the ingest-only minimum surface (the actor itself); the +/// full DI surface — reconciliation puller, KPI projector, central→site +/// Retry/Discard relay, options + validators — is deferred to a follow-up. +/// +/// +/// The repository (ISiteCallAuditRepository) is registered by +/// ScadaLink.ConfigurationDatabase.ServiceCollectionExtensions.AddConfigurationDatabase, +/// so callers (the Host on the central node) must also call that. The actor's +/// Props are wired up in Host registration (Bundle F); this extension +/// is currently a no-op placeholder kept for symmetry with the AuditLog and +/// NotificationOutbox composition roots — adding it now means consumers can +/// reference the method without re-touching the Host project later. +/// +/// +public static class ServiceCollectionExtensions +{ + /// + /// Registers Site Call Audit (#22) services. Currently a no-op + /// placeholder — Bundle F will populate this with the actor's Props + /// factory + options bindings. The method is exposed now so the Host + /// wiring call already exists at the API boundary. + /// + public static IServiceCollection AddSiteCallAudit(this IServiceCollection services) + { + ArgumentNullException.ThrowIfNull(services); + // Actor props are constructed in Host wiring (Bundle F). This + // extension is a placeholder for future config + DI. + return services; + } +} diff --git a/src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs b/src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs new file mode 100644 index 0000000..7506681 --- /dev/null +++ b/src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs @@ -0,0 +1,140 @@ +using Akka.Actor; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Audit; + +namespace ScadaLink.SiteCallAudit; + +/// +/// Central singleton for Site Call Audit (#22). Receives +/// messages and persists each +/// row via +/// — idempotent monotonic +/// upsert. Out-of-order or duplicate updates are silent no-ops at the +/// repository layer; the actor always replies +/// with Accepted=true in that case because storage state is consistent +/// and the site is free to consider its packet acked. +/// +/// +/// +/// M3 ships the minimum surface: ingest only. Reconciliation, KPIs, and +/// central→site Retry/Discard relay are deferred (per CLAUDE.md scope +/// discipline — Site Call Audit's KPIs and the Retry/Discard relay land in a +/// follow-up). +/// +/// +/// Per CLAUDE.md "audit-write failure NEVER aborts the user-facing action" — +/// the actor catches every exception from the repository call and replies +/// Accepted=false without rethrowing, so the central singleton stays +/// alive. The uses Resume so an +/// unexpected throw before the catch (defence in depth) does not restart the +/// actor and reset in-flight state. +/// +/// +/// Two constructors exist for the same reason as +/// AuditLogIngestActor: production wiring (Bundle F) resolves the +/// scoped EF repository from a fresh DI scope per message because the actor +/// is a long-lived cluster singleton, while tests inject a concrete +/// against a per-test MSSQL fixture +/// so the actor exercises the real monotonic upsert SQL end to end. +/// +/// +public class SiteCallAuditActor : ReceiveActor +{ + private readonly IServiceProvider? _serviceProvider; + private readonly ISiteCallAuditRepository? _injectedRepository; + private readonly ILogger _logger; + + /// + /// Test-mode constructor — injects a concrete repository instance whose + /// lifetime exceeds the test, so the actor reuses the same instance + /// across every message. Used by Bundle C's MSSQL-backed TestKit fixture. + /// + public SiteCallAuditActor( + ISiteCallAuditRepository repository, + ILogger logger) + { + ArgumentNullException.ThrowIfNull(repository); + ArgumentNullException.ThrowIfNull(logger); + + _injectedRepository = repository; + _logger = logger; + + ReceiveAsync(OnUpsertAsync); + } + + /// + /// Production constructor — resolves + /// from a fresh DI scope per message because the repository is a scoped EF + /// Core service registered by AddConfigurationDatabase. The actor + /// itself is a long-lived cluster singleton, so it cannot hold a scope + /// across messages. + /// + public SiteCallAuditActor( + IServiceProvider serviceProvider, + ILogger logger) + { + ArgumentNullException.ThrowIfNull(serviceProvider); + ArgumentNullException.ThrowIfNull(logger); + + _serviceProvider = serviceProvider; + _logger = logger; + + ReceiveAsync(OnUpsertAsync); + } + + /// + /// Audit-write failures are best-effort by design (CLAUDE.md §Audit): a + /// thrown exception in the upsert pipeline must not crash the actor. + /// Resume keeps the actor's state intact so the next packet is processed + /// against the same repository instance. + /// + protected override SupervisorStrategy SupervisorStrategy() + { + return new OneForOneStrategy(maxNrOfRetries: 0, withinTimeRange: TimeSpan.Zero, decider: + Akka.Actor.SupervisorStrategy.DefaultDecider); + } + + private async Task OnUpsertAsync(UpsertSiteCallCommand cmd) + { + // Sender is captured before the first await — Akka resets Sender + // between message dispatches, so a post-await Tell would go to + // DeadLetters. + var replyTo = Sender; + var id = cmd.SiteCall.TrackedOperationId; + + // Scope-per-message mirrors AuditLogIngestActor — production EF + // repository is scoped; the injected-repository mode (tests) skips + // the scope entirely. + IServiceScope? scope = null; + ISiteCallAuditRepository repository; + if (_injectedRepository is not null) + { + repository = _injectedRepository; + } + else + { + scope = _serviceProvider!.CreateScope(); + repository = scope.ServiceProvider.GetRequiredService(); + } + + try + { + await repository.UpsertAsync(cmd.SiteCall).ConfigureAwait(false); + replyTo.Tell(new UpsertSiteCallReply(id, Accepted: true)); + } + catch (Exception ex) + { + // Per CLAUDE.md: audit-write failure NEVER aborts the user-facing + // action — log and reply Accepted=false; do NOT rethrow (the + // central singleton MUST stay alive). + _logger.LogError(ex, "SiteCallAudit upsert failed for {TrackedOperationId}", id); + replyTo.Tell(new UpsertSiteCallReply(id, Accepted: false)); + } + finally + { + scope?.Dispose(); + } + } +} diff --git a/tests/ScadaLink.SiteCallAudit.Tests/ScadaLink.SiteCallAudit.Tests.csproj b/tests/ScadaLink.SiteCallAudit.Tests/ScadaLink.SiteCallAudit.Tests.csproj new file mode 100644 index 0000000..772b7fe --- /dev/null +++ b/tests/ScadaLink.SiteCallAudit.Tests/ScadaLink.SiteCallAudit.Tests.csproj @@ -0,0 +1,51 @@ + + + + net10.0 + enable + enable + true + false + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs b/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs new file mode 100644 index 0000000..e9ef807 --- /dev/null +++ b/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs @@ -0,0 +1,221 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; + +namespace ScadaLink.SiteCallAudit.Tests; + +/// +/// Bundle C1 (#22, #23 M3) tests for . Uses the +/// same as the Bundle B3 repository tests +/// so the actor exercises the real monotonic-upsert SQL end to end against the +/// SiteCalls schema. Each test scopes its data by minting a fresh +/// (and a per-test SourceSite suffix) +/// so tests neither collide nor require teardown. +/// +public class SiteCallAuditActorTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public SiteCallAuditActorTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + private ScadaLinkDbContext CreateContext() + { + var options = new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString) + .Options; + return new ScadaLinkDbContext(options); + } + + private static string NewSiteId() => + "test-bundle-c1-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + private static SiteCall NewRow( + TrackedOperationId id, + string sourceSite, + string status = "Submitted", + int retryCount = 0, + string? lastError = null, + DateTime? createdAtUtc = null, + DateTime? updatedAtUtc = null, + bool terminal = false) + { + var created = createdAtUtc ?? DateTime.UtcNow; + var updated = updatedAtUtc ?? created; + return new SiteCall + { + TrackedOperationId = id, + Channel = "ApiOutbound", + Target = "ERP.GetOrder", + SourceSite = sourceSite, + Status = status, + RetryCount = retryCount, + LastError = lastError, + HttpStatus = null, + CreatedAtUtc = created, + UpdatedAtUtc = updated, + TerminalAtUtc = terminal ? updated : null, + IngestedAtUtc = DateTime.UtcNow, + }; + } + + private IActorRef CreateActor(ISiteCallAuditRepository repository) => + Sys.ActorOf(Props.Create(() => new SiteCallAuditActor( + repository, + NullLogger.Instance))); + + [SkippableFact] + public async Task Receive_UpsertSiteCallCommand_Persists_Replies_Accepted() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var id = TrackedOperationId.New(); + var row = NewRow(id, siteId, status: "Submitted", retryCount: 0); + + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + var actor = CreateActor(repo); + + actor.Tell(new UpsertSiteCallCommand(row), TestActor); + + var reply = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.True(reply.Accepted, "Actor should reply Accepted=true on a successful upsert."); + Assert.Equal(id, reply.TrackedOperationId); + + // Verify the row landed in MSSQL via a fresh context (separate from the + // actor's repository context). + await using var readContext = CreateContext(); + var rows = await readContext.Set() + .Where(s => s.SourceSite == siteId) + .ToListAsync(); + Assert.Single(rows); + Assert.Equal("Submitted", rows[0].Status); + } + + [SkippableFact] + public async Task Receive_DuplicateUpsert_OlderStatus_NoOp_StillRepliesAccepted() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + // Idempotency contract: a stale/duplicate packet (lower rank than the + // stored status) is a silent no-op at the repository — the actor must + // still reply Accepted=true so the site is free to consider its + // packet acked. Storage state is consistent either way. + var siteId = NewSiteId(); + var id = TrackedOperationId.New(); + + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + var actor = CreateActor(repo); + + // Land Attempted (rank 2) first. + actor.Tell(new UpsertSiteCallCommand(NewRow(id, siteId, status: "Attempted", retryCount: 1, lastError: "first")), TestActor); + var firstReply = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.True(firstReply.Accepted); + + // Late-arriving Submitted (rank 0) — must be no-op in storage and + // still acked by the actor. + actor.Tell(new UpsertSiteCallCommand(NewRow(id, siteId, status: "Submitted", retryCount: 0)), TestActor); + var secondReply = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.True(secondReply.Accepted, "Stale upsert must still be acked (idempotent contract)."); + + // Storage must still show the rank-2 row, not rolled back. + await using var readContext = CreateContext(); + var stored = await readContext.Set() + .Where(s => s.TrackedOperationId == id) + .ToListAsync(); + Assert.Single(stored); + Assert.Equal("Attempted", stored[0].Status); + Assert.Equal(1, stored[0].RetryCount); + Assert.Equal("first", stored[0].LastError); + } + + [SkippableFact] + public async Task Receive_RepoThrowsTransient_RepliesAccepted_False_ActorStaysAlive() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + // Per CLAUDE.md: audit-write failure NEVER aborts the user-facing + // action. The actor must catch the throw, reply Accepted=false, and + // stay alive — a follow-up message on the same actor must still be + // processed (the singleton cannot die on a transient repo error). + var siteId = NewSiteId(); + var poisonId = TrackedOperationId.New(); + var healthyId = TrackedOperationId.New(); + + await using var context = CreateContext(); + var realRepo = new SiteCallAuditRepository(context); + var wrappedRepo = new ThrowingRepository(realRepo, poisonId); + var actor = CreateActor(wrappedRepo); + + // Poison row — the wrapper throws when this id arrives. + actor.Tell(new UpsertSiteCallCommand(NewRow(poisonId, siteId, status: "Submitted")), TestActor); + var poisonReply = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.False(poisonReply.Accepted, "Actor should reply Accepted=false when the repo throws."); + Assert.Equal(poisonId, poisonReply.TrackedOperationId); + + // Healthy follow-up on the SAME actor — must still be processed + // (singleton staying alive proves the actor did not crash). + actor.Tell(new UpsertSiteCallCommand(NewRow(healthyId, siteId, status: "Submitted")), TestActor); + var healthyReply = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.True(healthyReply.Accepted, "Actor must stay alive after a transient repo failure."); + Assert.Equal(healthyId, healthyReply.TrackedOperationId); + + // Verify storage: healthy row landed, poison row did not. + await using var readContext = CreateContext(); + var rows = await readContext.Set() + .Where(s => s.SourceSite == siteId) + .ToListAsync(); + Assert.Single(rows); + Assert.Equal(healthyId, rows[0].TrackedOperationId); + } + + /// + /// Tiny test double that delegates to a real repository but throws on a + /// specified . Used to verify the actor's + /// fault-isolation behaviour: a transient repository failure must produce + /// Accepted=false without crashing the singleton. + /// + private sealed class ThrowingRepository : ISiteCallAuditRepository + { + private readonly ISiteCallAuditRepository _inner; + private readonly TrackedOperationId _poisonId; + + public ThrowingRepository(ISiteCallAuditRepository inner, TrackedOperationId poisonId) + { + _inner = inner; + _poisonId = poisonId; + } + + public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) + { + if (siteCall.TrackedOperationId == _poisonId) + { + throw new InvalidOperationException("simulated transient repo failure for poison row"); + } + return _inner.UpsertAsync(siteCall, ct); + } + + public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => + _inner.GetAsync(id, ct); + + public Task> QueryAsync( + SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) => + _inner.QueryAsync(filter, paging, ct); + + public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) => + _inner.PurgeTerminalAsync(olderThanUtc, ct); + } +}