test(auditlog): partition-switch purge end-to-end (#23 M6)
This commit is contained in:
@@ -0,0 +1,354 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.TestKit.Xunit2;
|
||||||
|
using Microsoft.Data.SqlClient;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using ScadaLink.AuditLog.Central;
|
||||||
|
using ScadaLink.AuditLog.Configuration;
|
||||||
|
using ScadaLink.Commons.Entities.Audit;
|
||||||
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
using ScadaLink.ConfigurationDatabase;
|
||||||
|
using ScadaLink.ConfigurationDatabase.Repositories;
|
||||||
|
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
|
||||||
|
|
||||||
|
namespace ScadaLink.AuditLog.Tests.Integration;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Bundle F (#23 M6-T11) end-to-end test for the daily partition-switch
|
||||||
|
/// purge: seeds three monthly partitions (Jan / Feb / Mar 2026) with direct
|
||||||
|
/// INSERTs that bypass the standard repository ingest path (so the seed
|
||||||
|
/// timestamps are explicit), drives <see cref="AuditLogPurgeActor"/> against
|
||||||
|
/// the real <see cref="AuditLogRepository"/> + per-test
|
||||||
|
/// <see cref="MsSqlMigrationFixture"/> database, and asserts:
|
||||||
|
/// <list type="number">
|
||||||
|
/// <item>The oldest partition (Jan) is removed.</item>
|
||||||
|
/// <item>Newer partitions (Feb + Mar) are untouched.</item>
|
||||||
|
/// <item>The <c>UX_AuditLog_EventId</c> unique index survives the
|
||||||
|
/// drop-and-rebuild dance.</item>
|
||||||
|
/// <item><see cref="IAuditLogRepository.InsertIfNotExistsAsync"/> remains
|
||||||
|
/// idempotent against the rebuilt index after the purge.</item>
|
||||||
|
/// </list>
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// The brief calls out that direct INSERTs bypass the writer role's INSERT-only
|
||||||
|
/// grant; the fixture connects as <c>sa</c> (see
|
||||||
|
/// <see cref="MsSqlMigrationFixture"/>'s default admin connection string), so
|
||||||
|
/// the seed step does not need the writer role at all. The drop-and-rebuild
|
||||||
|
/// dance itself runs under the same admin connection because the test owns
|
||||||
|
/// the database — the role granularity is exercised in the repository tests,
|
||||||
|
/// not here.
|
||||||
|
/// </remarks>
|
||||||
|
public class PartitionPurgeTests : TestKit, IClassFixture<MsSqlMigrationFixture>
|
||||||
|
{
|
||||||
|
private readonly MsSqlMigrationFixture _fixture;
|
||||||
|
|
||||||
|
public PartitionPurgeTests(MsSqlMigrationFixture fixture)
|
||||||
|
{
|
||||||
|
_fixture = fixture;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ScadaLinkDbContext CreateContext() =>
|
||||||
|
new(new DbContextOptionsBuilder<ScadaLinkDbContext>()
|
||||||
|
.UseSqlServer(_fixture.ConnectionString).Options);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Direct INSERT into <c>dbo.AuditLog</c> bypassing
|
||||||
|
/// <see cref="IAuditLogRepository.InsertIfNotExistsAsync"/>. Used by the
|
||||||
|
/// seed step so the test can place rows in arbitrary partitions without
|
||||||
|
/// the repository's idempotency wrapper or ingest-stamping behaviour
|
||||||
|
/// affecting the seed payload.
|
||||||
|
/// </summary>
|
||||||
|
private async Task DirectInsertAsync(
|
||||||
|
SqlConnection conn,
|
||||||
|
Guid eventId,
|
||||||
|
DateTime occurredAtUtc,
|
||||||
|
string siteId)
|
||||||
|
{
|
||||||
|
await using var cmd = conn.CreateCommand();
|
||||||
|
cmd.CommandText = @"
|
||||||
|
INSERT INTO dbo.AuditLog
|
||||||
|
(EventId, OccurredAtUtc, IngestedAtUtc, Channel, Kind, CorrelationId,
|
||||||
|
SourceSiteId, SourceInstanceId, SourceScript, Actor, Target, Status,
|
||||||
|
HttpStatus, DurationMs, ErrorMessage, ErrorDetail, RequestSummary,
|
||||||
|
ResponseSummary, PayloadTruncated, Extra, ForwardState)
|
||||||
|
VALUES
|
||||||
|
(@EventId, @OccurredAtUtc, @IngestedAtUtc, 'ApiOutbound', 'ApiCall', NULL,
|
||||||
|
@SourceSiteId, NULL, NULL, NULL, NULL, 'Delivered',
|
||||||
|
NULL, NULL, NULL, NULL, NULL,
|
||||||
|
NULL, 0, NULL, NULL);";
|
||||||
|
cmd.Parameters.Add("@EventId", System.Data.SqlDbType.UniqueIdentifier).Value = eventId;
|
||||||
|
// SqlDbType.DateTime2 with explicit Scale 7 matches the
|
||||||
|
// OccurredAtUtc column shape (datetime2(7)) and avoids the implicit
|
||||||
|
// narrowing that SqlClient's default DateTime → datetime applies via
|
||||||
|
// AddWithValue. Critical for partition assignment: the partition
|
||||||
|
// function key column is datetime2(7); a narrowed value would still
|
||||||
|
// land in the correct partition for first-of-month seeds, but
|
||||||
|
// explicit typing here documents the intent and matches how the
|
||||||
|
// production repository INSERT shapes its parameters.
|
||||||
|
var occurredParam = cmd.Parameters.Add("@OccurredAtUtc", System.Data.SqlDbType.DateTime2);
|
||||||
|
occurredParam.Scale = 7;
|
||||||
|
occurredParam.Value = occurredAtUtc;
|
||||||
|
var ingestedParam = cmd.Parameters.Add("@IngestedAtUtc", System.Data.SqlDbType.DateTime2);
|
||||||
|
ingestedParam.Scale = 7;
|
||||||
|
ingestedParam.Value = DateTime.UtcNow;
|
||||||
|
cmd.Parameters.Add("@SourceSiteId", System.Data.SqlDbType.VarChar, 64).Value = siteId;
|
||||||
|
await cmd.ExecuteNonQueryAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Asserts that <c>UX_AuditLog_EventId</c> exists in
|
||||||
|
/// <c>sys.indexes</c>. The drop-and-rebuild dance briefly removes the
|
||||||
|
/// index inside its transaction; this check is meant to fire AFTER the
|
||||||
|
/// actor's purge tick has committed so the rebuilt index is observable.
|
||||||
|
/// </summary>
|
||||||
|
private static async Task AssertUxIndexExistsAsync(SqlConnection conn)
|
||||||
|
{
|
||||||
|
await using var cmd = conn.CreateCommand();
|
||||||
|
cmd.CommandText = @"
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM sys.indexes
|
||||||
|
WHERE name = 'UX_AuditLog_EventId'
|
||||||
|
AND object_id = OBJECT_ID('dbo.AuditLog');";
|
||||||
|
var raw = await cmd.ExecuteScalarAsync();
|
||||||
|
var count = Convert.ToInt32(raw);
|
||||||
|
Assert.True(count == 1, $"UX_AuditLog_EventId should be present post-purge; sys.indexes count was {count}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private IActorRef CreateActor(
|
||||||
|
IServiceProvider sp,
|
||||||
|
AuditLogPurgeOptions purgeOptions,
|
||||||
|
AuditLogOptions auditOptions)
|
||||||
|
{
|
||||||
|
return Sys.ActorOf(Props.Create(() => new AuditLogPurgeActor(
|
||||||
|
sp,
|
||||||
|
Options.Create(purgeOptions),
|
||||||
|
Options.Create(auditOptions),
|
||||||
|
NullLogger<AuditLogPurgeActor>.Instance)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static (DateTime Jan, DateTime Feb, DateTime Mar) SeedOccurredAt() => (
|
||||||
|
new DateTime(2026, 1, 15, 0, 0, 0, DateTimeKind.Utc),
|
||||||
|
new DateTime(2026, 2, 15, 0, 0, 0, DateTimeKind.Utc),
|
||||||
|
new DateTime(2026, 3, 15, 0, 0, 0, DateTimeKind.Utc));
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------
|
||||||
|
// 1. EndToEnd_OldestPartition_PurgedViaActor_NewerKept
|
||||||
|
// ---------------------------------------------------------------------
|
||||||
|
|
||||||
|
[SkippableFact]
|
||||||
|
public async Task EndToEnd_OldestPartition_PurgedViaActor_NewerKept()
|
||||||
|
{
|
||||||
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||||
|
|
||||||
|
// Test date is ~2026-05-20 per environment. We want a threshold that
|
||||||
|
// sits strictly between Jan 15 (the Jan partition's MAX) and Feb 15
|
||||||
|
// (the Feb partition's MAX) so only the Jan-2026 partition is
|
||||||
|
// eligible for purge. RetentionDays = 100 gives a threshold of
|
||||||
|
// ~2026-02-09 — Jan 15 is older (purged), Feb 15 and Mar 15 are
|
||||||
|
// newer (kept). The window between Jan 15 and Feb 15 is wide enough
|
||||||
|
// (~30 days) to tolerate any plausible test-clock drift in CI.
|
||||||
|
var siteId = "purge-e2e-" + Guid.NewGuid().ToString("N").Substring(0, 8);
|
||||||
|
var janEventId = Guid.NewGuid();
|
||||||
|
var febEventId = Guid.NewGuid();
|
||||||
|
var marEventId = Guid.NewGuid();
|
||||||
|
var (janOccurred, febOccurred, marOccurred) = SeedOccurredAt();
|
||||||
|
|
||||||
|
await using (var seedConn = _fixture.OpenConnection())
|
||||||
|
{
|
||||||
|
await DirectInsertAsync(seedConn, janEventId, janOccurred, siteId);
|
||||||
|
await DirectInsertAsync(seedConn, febEventId, febOccurred, siteId);
|
||||||
|
await DirectInsertAsync(seedConn, marEventId, marOccurred, siteId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wire the actor with a real EF context against the fixture DB.
|
||||||
|
var services = new ServiceCollection();
|
||||||
|
services.AddDbContext<ScadaLinkDbContext>(
|
||||||
|
opts => opts.UseSqlServer(_fixture.ConnectionString),
|
||||||
|
ServiceLifetime.Scoped);
|
||||||
|
services.AddScoped<IAuditLogRepository, AuditLogRepository>();
|
||||||
|
var sp = services.BuildServiceProvider();
|
||||||
|
|
||||||
|
var probe = CreateTestProbe();
|
||||||
|
Sys.EventStream.Subscribe(probe.Ref, typeof(AuditLogPurgedEvent));
|
||||||
|
|
||||||
|
var purgeOptions = new AuditLogPurgeOptions
|
||||||
|
{
|
||||||
|
IntervalHours = 24,
|
||||||
|
IntervalOverride = TimeSpan.FromMilliseconds(100),
|
||||||
|
};
|
||||||
|
var auditOptions = new AuditLogOptions { RetentionDays = 100 };
|
||||||
|
|
||||||
|
CreateActor(sp, purgeOptions, auditOptions);
|
||||||
|
|
||||||
|
// Wait for the actor's tick to purge the Jan-2026 partition.
|
||||||
|
// Concurrent test runs against the same fixture might also create
|
||||||
|
// eligible partitions, but each test class owns its own fixture DB
|
||||||
|
// (MsSqlMigrationFixture seeds a guid-named DB per class), so the
|
||||||
|
// Jan-2026 boundary is the only one this test can have produced.
|
||||||
|
var janBoundary = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
|
||||||
|
var matched = probe.FishForMessage<AuditLogPurgedEvent>(
|
||||||
|
isMessage: m => m.MonthBoundary == janBoundary,
|
||||||
|
max: TimeSpan.FromSeconds(30));
|
||||||
|
Assert.True(matched.RowsDeleted >= 1,
|
||||||
|
$"Expected RowsDeleted >= 1 for Jan-2026 boundary; got {matched.RowsDeleted}.");
|
||||||
|
|
||||||
|
// Allow a brief settle in case the actor is mid-tick on Feb/Mar
|
||||||
|
// (it shouldn't be, since RetentionDays = 90 means only Jan is
|
||||||
|
// eligible, but the actor MAY re-enumerate quickly while we read).
|
||||||
|
await Task.Delay(TimeSpan.FromMilliseconds(500));
|
||||||
|
|
||||||
|
await using var verify = CreateContext();
|
||||||
|
var rows = await verify.Set<AuditEvent>()
|
||||||
|
.Where(e => e.SourceSiteId == siteId)
|
||||||
|
.ToListAsync();
|
||||||
|
|
||||||
|
// Jan removed; Feb + Mar untouched. Because the test owns the site
|
||||||
|
// id and the fixture DB, exact set membership is observable.
|
||||||
|
Assert.DoesNotContain(rows, r => r.EventId == janEventId);
|
||||||
|
Assert.Contains(rows, r => r.EventId == febEventId);
|
||||||
|
Assert.Contains(rows, r => r.EventId == marEventId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------
|
||||||
|
// 2. EndToEnd_UxIndexRebuilt_AfterPurge
|
||||||
|
// ---------------------------------------------------------------------
|
||||||
|
|
||||||
|
[SkippableFact]
|
||||||
|
public async Task EndToEnd_UxIndexRebuilt_AfterPurge()
|
||||||
|
{
|
||||||
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||||
|
|
||||||
|
// Same shape as test 1 — purge the Jan-2026 partition and then
|
||||||
|
// assert the UX_AuditLog_EventId index is still present. The
|
||||||
|
// drop-and-rebuild dance briefly removes it inside its transaction
|
||||||
|
// (the SWITCH PARTITION step requires the non-aligned unique index
|
||||||
|
// to be absent), but step 5 rebuilds it before committing. Sanity-
|
||||||
|
// checking the post-COMMIT shape here documents the invariant in an
|
||||||
|
// assertable way.
|
||||||
|
var siteId = "purge-uxidx-" + Guid.NewGuid().ToString("N").Substring(0, 8);
|
||||||
|
var janEventId = Guid.NewGuid();
|
||||||
|
var (janOccurred, _, _) = SeedOccurredAt();
|
||||||
|
|
||||||
|
await using (var seedConn = _fixture.OpenConnection())
|
||||||
|
{
|
||||||
|
await DirectInsertAsync(seedConn, janEventId, janOccurred, siteId);
|
||||||
|
}
|
||||||
|
|
||||||
|
var services = new ServiceCollection();
|
||||||
|
services.AddDbContext<ScadaLinkDbContext>(
|
||||||
|
opts => opts.UseSqlServer(_fixture.ConnectionString),
|
||||||
|
ServiceLifetime.Scoped);
|
||||||
|
services.AddScoped<IAuditLogRepository, AuditLogRepository>();
|
||||||
|
var sp = services.BuildServiceProvider();
|
||||||
|
|
||||||
|
var probe = CreateTestProbe();
|
||||||
|
Sys.EventStream.Subscribe(probe.Ref, typeof(AuditLogPurgedEvent));
|
||||||
|
|
||||||
|
CreateActor(
|
||||||
|
sp,
|
||||||
|
new AuditLogPurgeOptions
|
||||||
|
{
|
||||||
|
IntervalHours = 24,
|
||||||
|
IntervalOverride = TimeSpan.FromMilliseconds(100),
|
||||||
|
},
|
||||||
|
new AuditLogOptions { RetentionDays = 90 });
|
||||||
|
|
||||||
|
var janBoundary = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
|
||||||
|
probe.FishForMessage<AuditLogPurgedEvent>(
|
||||||
|
isMessage: m => m.MonthBoundary == janBoundary,
|
||||||
|
max: TimeSpan.FromSeconds(30));
|
||||||
|
|
||||||
|
// Open a fresh connection (the actor's pool is owned by EF) and
|
||||||
|
// assert the index is present post-purge.
|
||||||
|
await using var check = _fixture.OpenConnection();
|
||||||
|
await AssertUxIndexExistsAsync(check);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------
|
||||||
|
// 3. EndToEnd_InsertIfNotExistsAsync_StillIdempotent_AfterPurge
|
||||||
|
// ---------------------------------------------------------------------
|
||||||
|
|
||||||
|
[SkippableFact]
|
||||||
|
public async Task EndToEnd_InsertIfNotExistsAsync_StillIdempotent_AfterPurge()
|
||||||
|
{
|
||||||
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||||
|
|
||||||
|
// Seed + purge a Jan-2026 row, THEN exercise InsertIfNotExistsAsync
|
||||||
|
// twice for a fresh (May-2026) EventId. The second call must be a
|
||||||
|
// no-op (duplicate-key collision swallowed by the repository, per
|
||||||
|
// M2 Bundle A's race-fix) — which means the rebuilt
|
||||||
|
// UX_AuditLog_EventId unique index is functioning as intended.
|
||||||
|
var siteId = "purge-idem-" + Guid.NewGuid().ToString("N").Substring(0, 8);
|
||||||
|
var janEventId = Guid.NewGuid();
|
||||||
|
var (janOccurred, _, _) = SeedOccurredAt();
|
||||||
|
|
||||||
|
await using (var seedConn = _fixture.OpenConnection())
|
||||||
|
{
|
||||||
|
await DirectInsertAsync(seedConn, janEventId, janOccurred, siteId);
|
||||||
|
}
|
||||||
|
|
||||||
|
var services = new ServiceCollection();
|
||||||
|
services.AddDbContext<ScadaLinkDbContext>(
|
||||||
|
opts => opts.UseSqlServer(_fixture.ConnectionString),
|
||||||
|
ServiceLifetime.Scoped);
|
||||||
|
services.AddScoped<IAuditLogRepository, AuditLogRepository>();
|
||||||
|
var sp = services.BuildServiceProvider();
|
||||||
|
|
||||||
|
var probe = CreateTestProbe();
|
||||||
|
Sys.EventStream.Subscribe(probe.Ref, typeof(AuditLogPurgedEvent));
|
||||||
|
|
||||||
|
CreateActor(
|
||||||
|
sp,
|
||||||
|
new AuditLogPurgeOptions
|
||||||
|
{
|
||||||
|
IntervalHours = 24,
|
||||||
|
IntervalOverride = TimeSpan.FromMilliseconds(100),
|
||||||
|
},
|
||||||
|
new AuditLogOptions { RetentionDays = 90 });
|
||||||
|
|
||||||
|
var janBoundary = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
|
||||||
|
probe.FishForMessage<AuditLogPurgedEvent>(
|
||||||
|
isMessage: m => m.MonthBoundary == janBoundary,
|
||||||
|
max: TimeSpan.FromSeconds(30));
|
||||||
|
|
||||||
|
// Settle then exercise InsertIfNotExistsAsync twice for the same
|
||||||
|
// EventId. The repository's idempotency relies on
|
||||||
|
// UX_AuditLog_EventId being present so the IF NOT EXISTS … INSERT
|
||||||
|
// race window resolves to a duplicate-key violation the repo
|
||||||
|
// swallows. If the index were missing here, two rows would land
|
||||||
|
// and the second InsertIfNotExistsAsync would silently double-insert.
|
||||||
|
await Task.Delay(TimeSpan.FromMilliseconds(500));
|
||||||
|
|
||||||
|
var freshEventId = Guid.NewGuid();
|
||||||
|
var freshOccurred = new DateTime(2026, 5, 15, 12, 0, 0, DateTimeKind.Utc);
|
||||||
|
var freshSite = "purge-idem-fresh-" + Guid.NewGuid().ToString("N").Substring(0, 8);
|
||||||
|
var freshEvt = new AuditEvent
|
||||||
|
{
|
||||||
|
EventId = freshEventId,
|
||||||
|
OccurredAtUtc = freshOccurred,
|
||||||
|
Channel = AuditChannel.ApiOutbound,
|
||||||
|
Kind = AuditKind.ApiCall,
|
||||||
|
Status = AuditStatus.Delivered,
|
||||||
|
SourceSiteId = freshSite,
|
||||||
|
Target = "system-x/method",
|
||||||
|
};
|
||||||
|
|
||||||
|
await using (var ctx = CreateContext())
|
||||||
|
{
|
||||||
|
var repo = new AuditLogRepository(ctx);
|
||||||
|
await repo.InsertIfNotExistsAsync(freshEvt);
|
||||||
|
// Same row a second time — must be a silent no-op.
|
||||||
|
await repo.InsertIfNotExistsAsync(freshEvt);
|
||||||
|
}
|
||||||
|
|
||||||
|
await using var verify = CreateContext();
|
||||||
|
var rows = await verify.Set<AuditEvent>()
|
||||||
|
.Where(e => e.SourceSiteId == freshSite)
|
||||||
|
.ToListAsync();
|
||||||
|
Assert.Single(rows);
|
||||||
|
Assert.Equal(freshEventId, rows[0].EventId);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user