using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.AuditLog.Central;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;

namespace ScadaLink.AuditLog.Tests.Central;

// NOTE(review): generic type arguments appear to be missing throughout this
// file as captured (e.g. `IClassFixture`, `DbContextOptionsBuilder()`,
// `AddDbContext(...)`, `AddScoped(...)`, `GetRequiredService()`,
// `ExpectMsg(...)`, `read.Set()`, `Func?`, `Task> QueryAsync`). The code
// tokens below are left exactly as found; restore the type arguments from the
// original source before building. Several of them (the reply message type
// passed to ExpectMsg, the logger category, the repository return types)
// cannot be determined from this file alone.

/// <summary>
/// Bundle D D2 tests for <see cref="AuditLogIngestActor"/>'s M3 combined-
/// telemetry dual-write transaction. Uses the same <see cref="MsSqlMigrationFixture"/>
/// as the M1 + M2 repository tests so the actor exercises real
/// <see cref="AuditLogRepository"/> + <see cref="SiteCallAuditRepository"/>
/// against the MSSQL database supplied by the fixture. The tests below expect
/// each entry's audit insert and SiteCalls upsert to commit or roll back
/// together.
/// </summary>
/// <remarks>
/// Every test is a <c>[SkippableFact]</c> and starts with
/// <c>Skip.IfNot(_fixture.Available, _fixture.SkipReason)</c>, so the suite is
/// skipped rather than failed when the fixture reports it is unavailable.
/// </remarks>
public class AuditLogIngestActorCombinedTelemetryTests : TestKit, IClassFixture
{
    // Shared fixture injected by xUnit; supplies ConnectionString, Available
    // and SkipReason, which are the only members this class reads from it.
    private readonly MsSqlMigrationFixture _fixture;

    /// <summary>
    /// Stores the class fixture handed in by the test framework.
    /// </summary>
    /// <param name="fixture">Fixture exposing the SQL Server connection string and availability flag.</param>
    public AuditLogIngestActorCombinedTelemetryTests(MsSqlMigrationFixture fixture)
    {
        _fixture = fixture;
    }

    /// <summary>
    /// Creates a fresh <see cref="ScadaLinkDbContext"/> pointed at the fixture's
    /// connection string. Tests use it (via <c>await using</c>) to read rows back
    /// through a context that is separate from the DI container given to the actor.
    /// </summary>
    /// <returns>A new context; the caller is responsible for disposing it.</returns>
    private ScadaLinkDbContext CreateReadContext()
    {
        var options = new DbContextOptionsBuilder()
            .UseSqlServer(_fixture.ConnectionString)
            .Options;
        return new ScadaLinkDbContext(options);
    }

    /// <summary>
    /// Produces a site id of the form <c>test-bundle-d2-cached-</c> followed by
    /// the first 8 characters of a new GUID formatted with the "N" specifier.
    /// Tests filter rows by this id, so each test only sees the rows it wrote.
    /// </summary>
    private static string NewSiteId() =>
        "test-bundle-d2-cached-" + Guid.NewGuid().ToString("N").Substring(0, 8);

    /// <summary>
    /// Builds an <see cref="AuditEvent"/> / <see cref="SiteCall"/> pair that
    /// refer to the same tracked operation: the audit row's CorrelationId is
    /// set to the tracked id's <c>Value</c> and the site-call row's
    /// TrackedOperationId to the tracked id itself. Both rows use the same
    /// fixed UTC timestamp (2026-05-20 10:00:00).
    /// </summary>
    /// <param name="siteId">Written to <c>AuditEvent.SourceSiteId</c> and <c>SiteCall.SourceSite</c>.</param>
    /// <param name="trackedOperationId">Tracked operation id to use; a new one is generated when null.</param>
    /// <param name="eventId">Audit event id to use; a new GUID is generated when null.</param>
    /// <param name="status">Value for <c>SiteCall.Status</c>.</param>
    /// <param name="auditStatus">Value for <c>AuditEvent.Status</c>.</param>
    /// <returns>The audit event and site-call entities as a tuple.</returns>
    private static (AuditEvent audit, SiteCall siteCall) NewEntry(
        string siteId,
        TrackedOperationId? trackedOperationId = null,
        Guid? eventId = null,
        string status = "Submitted",
        AuditStatus auditStatus = AuditStatus.Submitted)
    {
        var trackedId = trackedOperationId ?? TrackedOperationId.New();
        // Fixed timestamp keeps the generated entities deterministic apart from their ids.
        var now = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
        var audit = new AuditEvent
        {
            EventId = eventId ?? Guid.NewGuid(),
            OccurredAtUtc = now,
            Channel = AuditChannel.ApiOutbound,
            Kind = AuditKind.CachedSubmit,
            Status = auditStatus,
            SourceSiteId = siteId,
            CorrelationId = trackedId.Value,
        };
        var siteCall = new SiteCall
        {
            TrackedOperationId = trackedId,
            Channel = "ApiOutbound",
            Target = "ERP.GetOrder",
            SourceSite = siteId,
            Status = status,
            RetryCount = 0,
            CreatedAtUtc = now,
            UpdatedAtUtc = now,
            IngestedAtUtc = now, // overwritten by the actor
        };
        return (audit, siteCall);
    }

    /// <summary>
    /// Builds a minimal DI container around the fixture's connection string —
    /// DbContext + the two repositories the dual-write handler resolves.
    /// Mirrors AddConfigurationDatabase without the DataProtection wiring
    /// (we never write secret columns in these tests).
    /// </summary>
    /// <param name="siteCallRepoFactory">
    /// Optional factory for the site-call repository registration. When null a
    /// plain <see cref="SiteCallAuditRepository"/> is registered; otherwise the
    /// factory's result is registered instead, which is how tests inject the
    /// throwing test doubles declared at the bottom of this class.
    /// </param>
    /// <returns>The built service provider.</returns>
    private IServiceProvider BuildServiceProvider(
        Func? siteCallRepoFactory = null)
    {
        var services = new ServiceCollection();
        // PendingModelChangesWarning is explicitly ignored for this context.
        services.AddDbContext(opts =>
            opts.UseSqlServer(_fixture.ConnectionString)
                .ConfigureWarnings(w => w.Ignore(
                    Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning)));
        services.AddScoped(sp => new AuditLogRepository(sp.GetRequiredService()));
        if (siteCallRepoFactory is null)
        {
            services.AddScoped(sp => new SiteCallAuditRepository(sp.GetRequiredService()));
        }
        else
        {
            services.AddScoped(sp => siteCallRepoFactory(sp.GetRequiredService()));
        }
        // NOTE(review): the returned provider is never disposed by the tests
        // in this file — confirm whether that is intentional.
        return services.BuildServiceProvider();
    }

    /// <summary>
    /// Spawns an <see cref="AuditLogIngestActor"/> in the TestKit actor system
    /// (<c>Sys</c>), constructed with the given service provider and a null logger.
    /// </summary>
    /// <param name="serviceProvider">Container the actor receives as its first constructor argument.</param>
    /// <returns>Reference to the newly created actor.</returns>
    private IActorRef CreateActor(IServiceProvider serviceProvider) =>
        Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
            serviceProvider,
            NullLogger.Instance)));

    /// <summary>
    /// Sends a single cached-telemetry entry and checks that the reply
    /// acknowledges exactly that audit EventId, that the audit row exists with
    /// a non-null IngestedAtUtc, and that the site-call row exists with the
    /// status that was sent.
    /// </summary>
    [SkippableFact]
    public async Task Receive_OneCachedPacket_WritesAuditRow_AND_SiteCallRow_AcksId()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);
        var siteId = NewSiteId();
        var (audit, siteCall) = NewEntry(siteId);
        var sp = BuildServiceProvider();
        var actor = CreateActor(sp);
        // TestActor is passed as the sender so the reply lands in the TestKit mailbox.
        actor.Tell(
            new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(audit, siteCall) }),
            TestActor);
        // Wait up to 15 seconds for the actor's reply.
        var reply = ExpectMsg(TimeSpan.FromSeconds(15));
        Assert.Single(reply.AcceptedEventIds);
        Assert.Equal(audit.EventId, reply.AcceptedEventIds[0]);
        // Verify rows landed in both tables.
        await using var read = CreateReadContext();
        var auditRow = await read.Set().SingleOrDefaultAsync(e => e.EventId == audit.EventId);
        Assert.NotNull(auditRow);
        Assert.NotNull(auditRow!.IngestedAtUtc);
        var siteCallRow = await read.Set()
            .SingleOrDefaultAsync(s => s.TrackedOperationId == siteCall.TrackedOperationId);
        Assert.NotNull(siteCallRow);
        Assert.Equal(siteCall.Status, siteCallRow!.Status);
    }

    /// <summary>
    /// Sends the identical entry twice and checks that the second reply still
    /// acknowledges the EventId while each table holds exactly one matching row.
    /// </summary>
    [SkippableFact]
    public async Task Receive_DuplicateEventId_SameStatus_NoOp_RowCountUnchanged_AcksId()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);
        var siteId = NewSiteId();
        var trackedId = TrackedOperationId.New();
        var eventId = Guid.NewGuid();
        var (audit, siteCall) = NewEntry(siteId, trackedId, eventId);
        var sp = BuildServiceProvider();
        var actor = CreateActor(sp);
        // First write
        actor.Tell(
            new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(audit, siteCall) }),
            TestActor);
        // Drain the first reply so the next ExpectMsg sees the second one.
        ExpectMsg(TimeSpan.FromSeconds(15));
        // Second write — same EventId and TrackedOperationId, same status. Both
        // the audit insert (idempotent) and the SiteCalls upsert (monotonic
        // same-rank → no-op) should silently do nothing while still acking.
        actor.Tell(
            new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(audit, siteCall) }),
            TestActor);
        var reply = ExpectMsg(TimeSpan.FromSeconds(15));
        Assert.Single(reply.AcceptedEventIds);
        Assert.Equal(eventId, reply.AcceptedEventIds[0]);
        await using var read = CreateReadContext();
        var auditCount = await read.Set().CountAsync(e => e.EventId == eventId);
        Assert.Equal(1, auditCount);
        var siteCallCount = await read.Set()
            .CountAsync(s => s.TrackedOperationId == trackedId);
        Assert.Equal(1, siteCallCount);
    }

    /// <summary>
    /// Sends two packets for the same TrackedOperationId — first "Submitted",
    /// then "Attempted" with RetryCount 1 and a later UpdatedAtUtc — and checks
    /// that two audit rows exist for the site while the single site-call row
    /// now reads "Attempted" with retry count 1.
    /// </summary>
    /// <remarks>
    /// NOTE(review): despite "DuplicateEventId" in the name, the two packets
    /// carry different EventIds (NewEntry is called without an eventId, so it
    /// generates a fresh GUID each time); only the TrackedOperationId is shared.
    /// </remarks>
    [SkippableFact]
    public async Task Receive_DuplicateEventId_AdvancedSiteCallStatus_UpdatesSiteCall()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);
        var siteId = NewSiteId();
        var trackedId = TrackedOperationId.New();
        var sp = BuildServiceProvider();
        var actor = CreateActor(sp);
        // 1st packet — Submitted (audit EventId #1, SiteCalls Status=Submitted).
        var (auditSubmit, siteCallSubmit) = NewEntry(
            siteId, trackedId, status: "Submitted", auditStatus: AuditStatus.Submitted);
        actor.Tell(
            new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(auditSubmit, siteCallSubmit) }),
            TestActor);
        ExpectMsg(TimeSpan.FromSeconds(15));
        // 2nd packet — Attempted with retry count 1 (audit EventId #2,
        // SiteCalls Status=Attempted — monotonic upsert wins). Same
        // TrackedOperationId throughout.
        var (auditAttempt, siteCallAttempt) = NewEntry(
            siteId, trackedId, status: "Attempted", auditStatus: AuditStatus.Attempted);
        // Copy of the second site-call with the retry count bumped and the
        // update timestamp moved one minute forward.
        var advanced = siteCallAttempt with
        {
            RetryCount = 1,
            UpdatedAtUtc = siteCallAttempt.UpdatedAtUtc.AddMinutes(1)
        };
        actor.Tell(
            new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(auditAttempt, advanced) }),
            TestActor);
        var reply = ExpectMsg(TimeSpan.FromSeconds(15));
        Assert.Single(reply.AcceptedEventIds);
        Assert.Equal(auditAttempt.EventId, reply.AcceptedEventIds[0]);
        // Both audit rows exist.
        await using var read = CreateReadContext();
        var auditRows = await read.Set()
            .Where(e => e.SourceSiteId == siteId)
            .ToListAsync();
        Assert.Equal(2, auditRows.Count);
        // SiteCalls row advanced to Attempted with retry count 1.
        var siteCallRow = await read.Set()
            .SingleAsync(s => s.TrackedOperationId == trackedId);
        Assert.Equal("Attempted", siteCallRow.Status);
        Assert.Equal(1, siteCallRow.RetryCount);
    }

    /// <summary>
    /// Registers a site-call repository whose UpsertAsync always throws, sends
    /// one entry, and checks that the reply acknowledges nothing and that
    /// neither the audit row nor the site-call row is present afterwards.
    /// </summary>
    [SkippableFact]
    public async Task Receive_AuditInsertSucceeds_SiteCallThrows_BothRolledBack_NoOrphanRow()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);
        var siteId = NewSiteId();
        var (audit, siteCall) = NewEntry(siteId);
        // Wrap the SiteCalls repo so UpsertAsync always throws — the dual-write
        // transaction must roll back the AuditLog INSERT done in the same
        // transaction, leaving no orphan row.
        var sp = BuildServiceProvider(
            ctx => new ThrowingSiteCallRepo(new SiteCallAuditRepository(ctx)));
        var actor = CreateActor(sp);
        actor.Tell(
            new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(audit, siteCall) }),
            TestActor);
        var reply = ExpectMsg(TimeSpan.FromSeconds(15));
        Assert.Empty(reply.AcceptedEventIds);
        await using var read = CreateReadContext();
        var auditRow = await read.Set().SingleOrDefaultAsync(e => e.EventId == audit.EventId);
        Assert.Null(auditRow);
        var siteCallRow = await read.Set()
            .SingleOrDefaultAsync(s => s.TrackedOperationId == siteCall.TrackedOperationId);
        Assert.Null(siteCallRow);
    }

    /// <summary>
    /// Sends five independent entries in one command and checks that all five
    /// EventIds are acknowledged (as a set) and that five audit rows and five
    /// site-call rows exist for the test's site id.
    /// </summary>
    [SkippableFact]
    public async Task Receive_FiveCachedPackets_AllPersistedSeparately_AllAcked()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);
        var siteId = NewSiteId();
        // Each entry gets its own generated EventId and TrackedOperationId.
        var entries = Enumerable.Range(0, 5).Select(_ =>
        {
            var (audit, siteCall) = NewEntry(siteId);
            return new CachedTelemetryEntry(audit, siteCall);
        }).ToList();
        var sp = BuildServiceProvider();
        var actor = CreateActor(sp);
        actor.Tell(new IngestCachedTelemetryCommand(entries), TestActor);
        var reply = ExpectMsg(TimeSpan.FromSeconds(15));
        Assert.Equal(5, reply.AcceptedEventIds.Count);
        // Compare as sets so the order of acknowledgements does not matter.
        Assert.True(entries.Select(e => e.Audit.EventId).ToHashSet()
            .SetEquals(reply.AcceptedEventIds.ToHashSet()));
        await using var read = CreateReadContext();
        var auditCount = await read.Set().CountAsync(e => e.SourceSiteId == siteId);
        Assert.Equal(5, auditCount);
        var siteCallCount = await read.Set().CountAsync(s => s.SourceSite == siteId);
        Assert.Equal(5, siteCallCount);
    }

    /// <summary>
    /// Sends three entries where the middle one's upsert is made to throw, and
    /// checks that only the first and third EventIds are acknowledged and that
    /// only their audit and site-call rows exist for the test's site id.
    /// </summary>
    [SkippableFact]
    public async Task Receive_OnePacketSucceeds_NextPacketThrows_FirstStillCommitted_BatchContinues()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);
        var siteId = NewSiteId();
        var (audit1, siteCall1) = NewEntry(siteId);
        var (audit2, siteCall2) = NewEntry(siteId);
        var (audit3, siteCall3) = NewEntry(siteId);
        var poisonTrackedId = siteCall2.TrackedOperationId;
        // Throw only for the middle entry's TrackedOperationId — entries on
        // either side must commit their own transactions independently.
        var sp = BuildServiceProvider(
            ctx => new PoisonOnIdSiteCallRepo(new SiteCallAuditRepository(ctx), poisonTrackedId));
        var actor = CreateActor(sp);
        actor.Tell(
            new IngestCachedTelemetryCommand(new[]
            {
                new CachedTelemetryEntry(audit1, siteCall1),
                new CachedTelemetryEntry(audit2, siteCall2),
                new CachedTelemetryEntry(audit3, siteCall3),
            }),
            TestActor);
        var reply = ExpectMsg(TimeSpan.FromSeconds(15));
        // Two entries committed; poison entry rolled back.
        Assert.Equal(2, reply.AcceptedEventIds.Count);
        Assert.Contains(audit1.EventId, reply.AcceptedEventIds);
        Assert.Contains(audit3.EventId, reply.AcceptedEventIds);
        Assert.DoesNotContain(audit2.EventId, reply.AcceptedEventIds);
        await using var read = CreateReadContext();
        var auditRows = await read.Set().Where(e => e.SourceSiteId == siteId).ToListAsync();
        Assert.Equal(2, auditRows.Count);
        Assert.DoesNotContain(auditRows, r => r.EventId == audit2.EventId);
        var siteCallRows = await read.Set().Where(s => s.SourceSite == siteId).ToListAsync();
        Assert.Equal(2, siteCallRows.Count);
        Assert.DoesNotContain(siteCallRows, r => r.TrackedOperationId == poisonTrackedId);
    }

    /// <summary>
    /// Test double — throws unconditionally from <see cref="UpsertAsync"/> so
    /// the dual-write transaction is forced to roll back. Lets the AuditLog
    /// row insert succeed in-transaction; the rollback must remove it.
    /// Every other member forwards to the wrapped repository.
    /// </summary>
    private sealed class ThrowingSiteCallRepo : ISiteCallAuditRepository
    {
        // Real repository that receives every call except UpsertAsync.
        private readonly ISiteCallAuditRepository _inner;

        /// <summary>Wraps <paramref name="inner"/>.</summary>
        public ThrowingSiteCallRepo(ISiteCallAuditRepository inner)
        {
            _inner = inner;
        }

        /// <summary>
        /// Always throws <see cref="InvalidOperationException"/>. The throw is
        /// synchronous (the member is not async), so no Task is returned.
        /// </summary>
        public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) =>
            throw new InvalidOperationException("simulated SiteCalls upsert failure");

        /// <summary>Pass-through to the wrapped repository.</summary>
        public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) =>
            _inner.GetAsync(id, ct);

        /// <summary>Pass-through to the wrapped repository.</summary>
        public Task> QueryAsync(
            SiteCallQueryFilter filter,
            SiteCallPaging paging,
            CancellationToken ct = default) =>
            _inner.QueryAsync(filter, paging, ct);

        /// <summary>Pass-through to the wrapped repository.</summary>
        public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) =>
            _inner.PurgeTerminalAsync(olderThanUtc, ct);

        /// <summary>Pass-through to the wrapped repository.</summary>
        public Task ComputeKpisAsync(
            DateTime stuckCutoff,
            DateTime intervalSince,
            CancellationToken ct = default) =>
            _inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct);

        /// <summary>Pass-through to the wrapped repository.</summary>
        public Task> ComputePerSiteKpisAsync(
            DateTime stuckCutoff,
            DateTime intervalSince,
            CancellationToken ct = default) =>
            _inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct);
    }

    /// <summary>
    /// Test double — throws only when the supplied poison TrackedOperationId
    /// is the one being upserted. Demonstrates per-entry transaction isolation:
    /// one entry's failed transaction must not abort the batch's other entries.
    /// Every other member forwards to the wrapped repository.
    /// </summary>
    private sealed class PoisonOnIdSiteCallRepo : ISiteCallAuditRepository
    {
        // Real repository that receives every non-poisoned call.
        private readonly ISiteCallAuditRepository _inner;
        // The one TrackedOperationId whose upsert is made to fail.
        private readonly TrackedOperationId _poisonId;

        /// <summary>Wraps <paramref name="inner"/> and records the id to fail on.</summary>
        public PoisonOnIdSiteCallRepo(ISiteCallAuditRepository inner, TrackedOperationId poisonId)
        {
            _inner = inner;
            _poisonId = poisonId;
        }

        /// <summary>
        /// Throws <see cref="InvalidOperationException"/> when the site call's
        /// TrackedOperationId equals the poison id; otherwise forwards to the
        /// wrapped repository.
        /// </summary>
        public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default)
        {
            if (siteCall.TrackedOperationId == _poisonId)
            {
                throw new InvalidOperationException("simulated SiteCalls upsert failure for poison id");
            }
            return _inner.UpsertAsync(siteCall, ct);
        }

        /// <summary>Pass-through to the wrapped repository.</summary>
        public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) =>
            _inner.GetAsync(id, ct);

        /// <summary>Pass-through to the wrapped repository.</summary>
        public Task> QueryAsync(
            SiteCallQueryFilter filter,
            SiteCallPaging paging,
            CancellationToken ct = default) =>
            _inner.QueryAsync(filter, paging, ct);

        /// <summary>Pass-through to the wrapped repository.</summary>
        public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) =>
            _inner.PurgeTerminalAsync(olderThanUtc, ct);

        /// <summary>Pass-through to the wrapped repository.</summary>
        public Task ComputeKpisAsync(
            DateTime stuckCutoff,
            DateTime intervalSince,
            CancellationToken ct = default) =>
            _inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct);

        /// <summary>Pass-through to the wrapped repository.</summary>
        public Task> ComputePerSiteKpisAsync(
            DateTime stuckCutoff,
            DateTime intervalSince,
            CancellationToken ct = default) =>
            _inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct);
    }
}