using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.AuditLog.Central;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;

namespace ScadaLink.AuditLog.Tests.Central;

// NOTE(review): this file had lost its line breaks and every angle-bracketed
// span (generic type arguments, XML doc tags). They were restored from usage
// inside the file. The following names are inferred rather than visible in the
// surviving text and must be confirmed against the project:
//   - IngestAuditEventsReply            (type argument of ExpectMsg)
//   - IReadOnlyList<AuditEvent>         (QueryAsync return)
//   - IReadOnlyList<DateTime>           (GetPartitionBoundariesOlderThanAsync return)
//   - AuditLogKpiSnapshot               (GetKpiSnapshotAsync return)
// Also confirm that InsertIfNotExistsAsync / SwitchOutPartitionAsync really
// return a plain Task in IAuditLogRepository.

/// <summary>
/// Bundle D D2 tests for <see cref="AuditLogIngestActor"/>. Uses the same
/// <see cref="MsSqlMigrationFixture"/> as the M1 repository tests so the actor
/// exercises real <see cref="AuditLogRepository.InsertIfNotExistsAsync"/>
/// against a partitioned MSSQL schema (the only way to verify the
/// IngestedAtUtc stamp + duplicate-key idempotency end to end).
/// </summary>
public class AuditLogIngestActorTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
    /// <summary>Upper bound on how long each test waits for the actor's reply.</summary>
    private static readonly TimeSpan ReplyTimeout = TimeSpan.FromSeconds(10);

    private readonly MsSqlMigrationFixture _fixture;

    /// <summary>Captures the shared fixture that supplies the connection string.</summary>
    public AuditLogIngestActorTests(MsSqlMigrationFixture fixture)
    {
        _fixture = fixture;
    }

    /// <summary>
    /// Builds a fresh <see cref="ScadaLinkDbContext"/> bound to the fixture's
    /// SQL Server connection string. Callers own (and dispose) the context.
    /// </summary>
    private ScadaLinkDbContext CreateContext()
    {
        var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
            .UseSqlServer(_fixture.ConnectionString)
            .Options;
        return new ScadaLinkDbContext(options);
    }

    /// <summary>
    /// Produces a unique site id per test so row-count assertions filtered by
    /// <c>SourceSiteId</c> only see rows written by that test.
    /// </summary>
    private static string NewSiteId() =>
        "test-bundle-d2-" + Guid.NewGuid().ToString("N").Substring(0, 8);

    /// <summary>
    /// Creates an audit event for <paramref name="siteId"/> with a fixed UTC
    /// timestamp; <paramref name="id"/> defaults to a new random Guid.
    /// </summary>
    private static AuditEvent NewEvent(string siteId, Guid? id = null) => new()
    {
        EventId = id ?? Guid.NewGuid(),
        OccurredAtUtc = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
        Channel = AuditChannel.ApiOutbound,
        Kind = AuditKind.ApiCall,
        Status = AuditStatus.Delivered,
        SourceSiteId = siteId,
    };

    /// <summary>
    /// Spawns an <see cref="AuditLogIngestActor"/> in the TestKit actor system,
    /// wired to <paramref name="repository"/> and a no-op logger.
    /// </summary>
    private IActorRef CreateActor(IAuditLogRepository repository) =>
        Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
            repository,
            NullLogger<AuditLogIngestActor>.Instance)));

    /// <summary>
    /// A batch of five new events is fully acknowledged and all five rows are
    /// present in the database afterwards.
    /// </summary>
    [SkippableFact]
    public async Task Receive_BatchOf5_Calls_Repo_5Times_Acks_All_5()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var siteId = NewSiteId();
        var events = Enumerable.Range(0, 5).Select(_ => NewEvent(siteId)).ToList();

        await using var context = CreateContext();
        var repo = new AuditLogRepository(context);
        var actor = CreateActor(repo);

        actor.Tell(new IngestAuditEventsCommand(events), TestActor);
        var reply = ExpectMsg<IngestAuditEventsReply>(ReplyTimeout);

        Assert.Equal(5, reply.AcceptedEventIds.Count);
        Assert.True(events.Select(e => e.EventId).ToHashSet().SetEquals(reply.AcceptedEventIds));

        // Verify rows landed in MSSQL.
        await using var readContext = CreateContext();
        var rows = await readContext.Set<AuditEvent>()
            .Where(e => e.SourceSiteId == siteId)
            .ToListAsync();
        Assert.Equal(5, rows.Count);
    }

    /// <summary>
    /// A batch containing one already-persisted event plus two new ones is
    /// fully acknowledged and results in exactly three rows (no duplicate).
    /// </summary>
    [SkippableFact]
    public async Task Receive_BatchWith_AlreadyExistingEvent_AcksAll_NoDoubleInsert()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var siteId = NewSiteId();
        var pre = NewEvent(siteId);

        // Pre-insert one event directly via the repo so the actor sees it
        // already present when it processes the batch.
        await using (var seedContext = CreateContext())
        {
            var seedRepo = new AuditLogRepository(seedContext);
            await seedRepo.InsertIfNotExistsAsync(pre);
        }

        // Build the batch including the pre-existing event plus 2 new ones.
        var fresh1 = NewEvent(siteId);
        var fresh2 = NewEvent(siteId);
        var batch = new List<AuditEvent> { pre, fresh1, fresh2 };

        await using var context = CreateContext();
        var repo = new AuditLogRepository(context);
        var actor = CreateActor(repo);

        actor.Tell(new IngestAuditEventsCommand(batch), TestActor);
        var reply = ExpectMsg<IngestAuditEventsReply>(ReplyTimeout);

        // All 3 acked under idempotent first-write-wins.
        Assert.Equal(3, reply.AcceptedEventIds.Count);

        // Verify no double-insert.
        await using var readContext = CreateContext();
        var count = await readContext.Set<AuditEvent>()
            .Where(e => e.SourceSiteId == siteId)
            .CountAsync();
        Assert.Equal(3, count);
    }

    /// <summary>
    /// Every persisted row carries an <c>IngestedAtUtc</c> value that falls
    /// inside the wall-clock window bracketing the ingest call.
    /// </summary>
    [SkippableFact]
    public async Task Receive_Sets_IngestedAtUtc_Before_Insert()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var siteId = NewSiteId();
        var events = Enumerable.Range(0, 3).Select(_ => NewEvent(siteId)).ToList();

        // One second of slack on each side of the window.
        var before = DateTime.UtcNow.AddSeconds(-1);

        await using var context = CreateContext();
        var repo = new AuditLogRepository(context);
        var actor = CreateActor(repo);

        actor.Tell(new IngestAuditEventsCommand(events), TestActor);
        ExpectMsg<IngestAuditEventsReply>(ReplyTimeout);

        var after = DateTime.UtcNow.AddSeconds(1);

        await using var readContext = CreateContext();
        var rows = await readContext.Set<AuditEvent>()
            .Where(e => e.SourceSiteId == siteId)
            .ToListAsync();

        Assert.Equal(3, rows.Count);
        Assert.All(rows, r =>
        {
            Assert.NotNull(r.IngestedAtUtc);
            Assert.InRange(r.IngestedAtUtc!.Value, before, after);
        });
    }

    /// <summary>
    /// When the repository throws for one event in a batch of five, the other
    /// four are still acknowledged and persisted, and the failing id is absent
    /// from both the reply and the database.
    /// </summary>
    [SkippableFact]
    public async Task Receive_RepoThrowsForOneEvent_Other4StillPersisted()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var siteId = NewSiteId();
        var events = Enumerable.Range(0, 5).Select(_ => NewEvent(siteId)).ToList();
        var poisonId = events[2].EventId;

        // Wrapper repo that throws only when the poison EventId is being
        // inserted. The four neighbours must still land in MSSQL.
        await using var context = CreateContext();
        var realRepo = new AuditLogRepository(context);
        var wrappedRepo = new ThrowingRepository(realRepo, poisonId);
        var actor = CreateActor(wrappedRepo);

        actor.Tell(new IngestAuditEventsCommand(events), TestActor);
        var reply = ExpectMsg<IngestAuditEventsReply>(ReplyTimeout);

        // The actor catches the throw per-row, so 4 ids are accepted and 1 is
        // left out.
        Assert.Equal(4, reply.AcceptedEventIds.Count);
        Assert.DoesNotContain(poisonId, reply.AcceptedEventIds);

        await using var readContext = CreateContext();
        var rows = await readContext.Set<AuditEvent>()
            .Where(e => e.SourceSiteId == siteId)
            .ToListAsync();
        Assert.Equal(4, rows.Count);
        Assert.DoesNotContain(rows, r => r.EventId == poisonId);
    }

    /// <summary>
    /// Tiny test double that delegates to a real repository but throws on a
    /// specified EventId. Used to verify per-row failure isolation: one bad
    /// row must not cause the rest of the batch to be lost.
    /// </summary>
    private sealed class ThrowingRepository : IAuditLogRepository
    {
        private readonly IAuditLogRepository _inner;
        private readonly Guid _poisonId;

        /// <summary>
        /// Wraps <paramref name="inner"/>; inserts of the event whose id equals
        /// <paramref name="poisonId"/> will throw instead of being forwarded.
        /// </summary>
        public ThrowingRepository(IAuditLogRepository inner, Guid poisonId)
        {
            _inner = inner;
            _poisonId = poisonId;
        }

        /// <summary>
        /// Throws <see cref="InvalidOperationException"/> synchronously for the
        /// poison event; otherwise forwards to the wrapped repository.
        /// </summary>
        public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default)
        {
            if (evt.EventId == _poisonId)
            {
                throw new InvalidOperationException("simulated repo failure for poison row");
            }

            return _inner.InsertIfNotExistsAsync(evt, ct);
        }

        /// <summary>Pure pass-through to the wrapped repository.</summary>
        public Task<IReadOnlyList<AuditEvent>> QueryAsync(
            AuditLogQueryFilter filter,
            AuditLogPaging paging,
            CancellationToken ct = default) =>
            _inner.QueryAsync(filter, paging, ct);

        /// <summary>Pure pass-through to the wrapped repository.</summary>
        public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
            _inner.SwitchOutPartitionAsync(monthBoundary, ct);

        /// <summary>Pure pass-through to the wrapped repository.</summary>
        public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
            DateTime threshold,
            CancellationToken ct = default) =>
            _inner.GetPartitionBoundariesOlderThanAsync(threshold, ct);

        /// <summary>Pure pass-through to the wrapped repository.</summary>
        public Task<AuditLogKpiSnapshot> GetKpiSnapshotAsync(
            TimeSpan window,
            DateTime? nowUtc = null,
            CancellationToken ct = default) =>
            _inner.GetKpiSnapshotAsync(window, nowUtc, ct);
    }
}