Files
scadalink-design/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorTests.cs
Joseph Doherty 660fdc4e93 feat(auditlog): AuditLogPurgeActor daily partition-switch purge (#23 M6)
Central singleton (M6-T4 Bundle C) that drives the daily AuditLog partition
purge. On a configurable timer (default 24 hours) the actor:
  1. Queries IAuditLogRepository.GetPartitionBoundariesOlderThanAsync for
     monthly boundaries whose latest OccurredAtUtc is older than
     DateTime.UtcNow - AuditLogOptions.RetentionDays.
  2. For each eligible boundary calls SwitchOutPartitionAsync, which runs
     the drop-and-rebuild dance around UX_AuditLog_EventId.
  3. Publishes AuditLogPurgedEvent(boundary, rowsDeleted, durationMs) on
     the actor-system EventStream so the Bundle E central health collector
     and ops surfaces can subscribe without coupling to this actor.

Co-changes:
* SwitchOutPartitionAsync returns long (rows deleted) — sampled BEFORE the
  switch via COUNT_BIG over the per-partition  filter so the count
  reflects what the switch removed, not a post-purge scan of a table that
  no longer exists. All stub implementations updated.
* AuditLogPurgeOptions: IntervalHours (default 24), IntervalOverride for
  tests, Interval property resolving either.
* AuditLogPurgedEvent: record with MonthBoundary, RowsDeleted, DurationMs.

Behavior:
* Continue-on-error per boundary — one partition that throws does NOT
  abandon the rest of the tick.
* DI scope opened per tick (IAuditLogRepository is a SCOPED EF Core
  service); mirrors SiteAuditReconciliationActor and AuditLogIngestActor.
* SupervisorStrategy Resume keeps the singleton alive across leaked
  exceptions.
* EventStream capture BEFORE the first await — Context is unsafe after
  await in async receive handlers (same pattern as Sender-capture in
  AuditLogIngestActor.OnIngestAsync).

Tests:
* Tick_Fires_OnDailyInterval — visible timer side effect.
* Tick_OldPartitions_SwitchedOut — both seeded boundaries purged.
* Tick_NewerPartitions_Untouched — empty enumerator → no switches.
* Tick_PublishesPurgedEvent_WithRowCount — AuditLogPurgedEvent carries
  RowsDeleted and DurationMs.
* Tick_SwitchThrows_OtherPartitionsStillProcessed — continue-on-error.
* Threshold_UsesAuditLogOptionsRetentionDays — non-default 30-day window
  computed from UtcNow - RetentionDays.
* EndToEnd_RealPartition_RowsRemoved_PurgedEventPublished — TestKit +
  MsSqlMigrationFixture: real partitioned table, Jan-2026 row purged,
  Apr-2026 row kept, AuditLogPurgedEvent observed via probe.
2026-05-20 18:36:31 -04:00

225 lines
8.4 KiB
C#

using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.AuditLog.Central;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
namespace ScadaLink.AuditLog.Tests.Central;
/// <summary>
/// Bundle D D2 tests for <see cref="AuditLogIngestActor"/>. Uses the same
/// <see cref="MsSqlMigrationFixture"/> as the M1 repository tests so the actor
/// exercises real <see cref="AuditLogRepository.InsertIfNotExistsAsync"/>
/// against a partitioned MSSQL schema (the only way to verify the
/// IngestedAtUtc stamp + duplicate-key idempotency end to end).
/// </summary>
public class AuditLogIngestActorTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
private readonly MsSqlMigrationFixture _fixture;
public AuditLogIngestActorTests(MsSqlMigrationFixture fixture)
{
_fixture = fixture;
}
private ScadaLinkDbContext CreateContext()
{
var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
.UseSqlServer(_fixture.ConnectionString)
.Options;
return new ScadaLinkDbContext(options);
}
private static string NewSiteId() =>
"test-bundle-d2-" + Guid.NewGuid().ToString("N").Substring(0, 8);
private static AuditEvent NewEvent(string siteId, Guid? id = null) => new()
{
EventId = id ?? Guid.NewGuid(),
OccurredAtUtc = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
Channel = AuditChannel.ApiOutbound,
Kind = AuditKind.ApiCall,
Status = AuditStatus.Delivered,
SourceSiteId = siteId,
};
private IActorRef CreateActor(IAuditLogRepository repository) =>
Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
repository,
NullLogger<AuditLogIngestActor>.Instance)));
[SkippableFact]
public async Task Receive_BatchOf5_Calls_Repo_5Times_Acks_All_5()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
var events = Enumerable.Range(0, 5).Select(_ => NewEvent(siteId)).ToList();
await using var context = CreateContext();
var repo = new AuditLogRepository(context);
var actor = CreateActor(repo);
actor.Tell(new IngestAuditEventsCommand(events), TestActor);
var reply = ExpectMsg<IngestAuditEventsReply>(TimeSpan.FromSeconds(10));
Assert.Equal(5, reply.AcceptedEventIds.Count);
Assert.True(events.Select(e => e.EventId).ToHashSet().SetEquals(reply.AcceptedEventIds.ToHashSet()));
// Verify rows landed in MSSQL.
await using var readContext = CreateContext();
var rows = await readContext.Set<AuditEvent>()
.Where(e => e.SourceSiteId == siteId)
.ToListAsync();
Assert.Equal(5, rows.Count);
}
[SkippableFact]
public async Task Receive_BatchWith_AlreadyExistingEvent_AcksAll_NoDoubleInsert()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
var pre = NewEvent(siteId);
// Pre-insert one event directly via the repo so the actor sees it
// already present when it processes the batch.
await using (var seedContext = CreateContext())
{
var seedRepo = new AuditLogRepository(seedContext);
await seedRepo.InsertIfNotExistsAsync(pre);
}
// Build the batch including the pre-existing event plus 2 new ones.
var fresh1 = NewEvent(siteId);
var fresh2 = NewEvent(siteId);
var batch = new List<AuditEvent> { pre, fresh1, fresh2 };
await using var context = CreateContext();
var repo = new AuditLogRepository(context);
var actor = CreateActor(repo);
actor.Tell(new IngestAuditEventsCommand(batch), TestActor);
var reply = ExpectMsg<IngestAuditEventsReply>(TimeSpan.FromSeconds(10));
// All 3 acked under idempotent first-write-wins.
Assert.Equal(3, reply.AcceptedEventIds.Count);
// Verify no double-insert.
await using var readContext = CreateContext();
var count = await readContext.Set<AuditEvent>()
.Where(e => e.SourceSiteId == siteId)
.CountAsync();
Assert.Equal(3, count);
}
[SkippableFact]
public async Task Receive_Sets_IngestedAtUtc_Before_Insert()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
var events = Enumerable.Range(0, 3).Select(_ => NewEvent(siteId)).ToList();
var before = DateTime.UtcNow.AddSeconds(-1);
await using var context = CreateContext();
var repo = new AuditLogRepository(context);
var actor = CreateActor(repo);
actor.Tell(new IngestAuditEventsCommand(events), TestActor);
ExpectMsg<IngestAuditEventsReply>(TimeSpan.FromSeconds(10));
var after = DateTime.UtcNow.AddSeconds(1);
await using var readContext = CreateContext();
var rows = await readContext.Set<AuditEvent>()
.Where(e => e.SourceSiteId == siteId)
.ToListAsync();
Assert.Equal(3, rows.Count);
Assert.All(rows, r =>
{
Assert.NotNull(r.IngestedAtUtc);
Assert.InRange(r.IngestedAtUtc!.Value, before, after);
});
}
[SkippableFact]
public async Task Receive_RepoThrowsForOneEvent_Other4StillPersisted()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
var events = Enumerable.Range(0, 5).Select(_ => NewEvent(siteId)).ToList();
var poisonId = events[2].EventId;
// Wrapper repo that throws only when the poison EventId is being
// inserted. The four neighbours must still land in MSSQL.
await using var context = CreateContext();
var realRepo = new AuditLogRepository(context);
var wrappedRepo = new ThrowingRepository(realRepo, poisonId);
var actor = CreateActor(wrappedRepo);
actor.Tell(new IngestAuditEventsCommand(events), TestActor);
var reply = ExpectMsg<IngestAuditEventsReply>(TimeSpan.FromSeconds(10));
// The actor catches the throw per-row, so 4 ids are accepted and 1 is
// left out.
Assert.Equal(4, reply.AcceptedEventIds.Count);
Assert.DoesNotContain(poisonId, reply.AcceptedEventIds);
await using var readContext = CreateContext();
var rows = await readContext.Set<AuditEvent>()
.Where(e => e.SourceSiteId == siteId)
.ToListAsync();
Assert.Equal(4, rows.Count);
Assert.DoesNotContain(rows, r => r.EventId == poisonId);
}
/// <summary>
/// Tiny test double that delegates to a real repository but throws on a
/// specified EventId. Used to verify per-row failure isolation: one bad
/// row must not cause the rest of the batch to be lost.
/// </summary>
private sealed class ThrowingRepository : IAuditLogRepository
{
private readonly IAuditLogRepository _inner;
private readonly Guid _poisonId;
public ThrowingRepository(IAuditLogRepository inner, Guid poisonId)
{
_inner = inner;
_poisonId = poisonId;
}
public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default)
{
if (evt.EventId == _poisonId)
{
throw new InvalidOperationException("simulated repo failure for poison row");
}
return _inner.InsertIfNotExistsAsync(evt, ct);
}
public Task<IReadOnlyList<AuditEvent>> QueryAsync(
AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) =>
_inner.QueryAsync(filter, paging, ct);
public Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
_inner.SwitchOutPartitionAsync(monthBoundary, ct);
public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
DateTime threshold, CancellationToken ct = default) =>
_inner.GetPartitionBoundariesOlderThanAsync(threshold, ct);
}
}