Files
scadalink-design/src/ScadaLink.ConfigurationDatabase/Repositories/AuditLogRepository.cs
Joseph Doherty 660fdc4e93 feat(auditlog): AuditLogPurgeActor daily partition-switch purge (#23 M6)
Central singleton (M6-T4 Bundle C) that drives the daily AuditLog partition
purge. On a configurable timer (default 24 hours) the actor:
  1. Queries IAuditLogRepository.GetPartitionBoundariesOlderThanAsync for
     monthly boundaries whose latest OccurredAtUtc is older than
     DateTime.UtcNow - AuditLogOptions.RetentionDays.
  2. For each eligible boundary calls SwitchOutPartitionAsync, which runs
     the drop-and-rebuild dance around UX_AuditLog_EventId.
  3. Publishes AuditLogPurgedEvent(boundary, rowsDeleted, durationMs) on
     the actor-system EventStream so the Bundle E central health collector
     and ops surfaces can subscribe without coupling to this actor.

Co-changes:
* SwitchOutPartitionAsync returns long (rows deleted) — sampled BEFORE the
  switch via COUNT_BIG over the per-partition  filter so the count
  reflects what the switch removed, not a post-purge scan of a table that
  no longer exists. All stub implementations updated.
* AuditLogPurgeOptions: IntervalHours (default 24), IntervalOverride for
  tests, Interval property resolving either.
* AuditLogPurgedEvent: record with MonthBoundary, RowsDeleted, DurationMs.

Behavior:
* Continue-on-error per boundary — one partition that throws does NOT
  abandon the rest of the tick.
* DI scope opened per tick (IAuditLogRepository is a SCOPED EF Core
  service); mirrors SiteAuditReconciliationActor and AuditLogIngestActor.
* SupervisorStrategy Resume keeps the singleton alive across leaked
  exceptions.
* EventStream capture BEFORE the first await — Context is unsafe after
  await in async receive handlers (same pattern as Sender-capture in
  AuditLogIngestActor.OnIngestAsync).

Tests:
* Tick_Fires_OnDailyInterval — visible timer side effect.
* Tick_OldPartitions_SwitchedOut — both seeded boundaries purged.
* Tick_NewerPartitions_Untouched — empty enumerator → no switches.
* Tick_PublishesPurgedEvent_WithRowCount — AuditLogPurgedEvent carries
  RowsDeleted and DurationMs.
* Tick_SwitchThrows_OtherPartitionsStillProcessed — continue-on-error.
* Threshold_UsesAuditLogOptionsRetentionDays — non-default 30-day window
  computed from UtcNow - RetentionDays.
* EndToEnd_RealPartition_RowsRemoved_PurgedEventPublished — TestKit +
  MsSqlMigrationFixture: real partitioned table, Jan-2026 row purged,
  Apr-2026 row kept, AuditLogPurgedEvent observed via probe.
2026-05-20 18:36:31 -04:00

425 lines
19 KiB
C#

using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Types.Audit;
namespace ScadaLink.ConfigurationDatabase.Repositories;
/// <summary>
/// EF Core implementation of <see cref="IAuditLogRepository"/>. See the interface
/// for the append-only contract; this class only adds notes on the data-access
/// strategy used by each method.
/// </summary>
public class AuditLogRepository : IAuditLogRepository
{
// SQL Server error numbers for duplicate-key violations on
// UX_AuditLog_EventId. 2601 is a unique-index violation; 2627 is a
// primary-key/unique-constraint violation. The IF NOT EXISTS … INSERT
// pattern has a check-then-act race window — two sessions can both pass
// the EXISTS check and then both attempt the INSERT — and the loser
// surfaces as one of these errors. Idempotency demands we swallow them.
private const int SqlErrorUniqueIndexViolation = 2601;
private const int SqlErrorPrimaryKeyViolation = 2627;
private readonly ScadaLinkDbContext _context;
private readonly ILogger<AuditLogRepository> _logger;
public AuditLogRepository(ScadaLinkDbContext context, ILogger<AuditLogRepository>? logger = null)
{
_context = context ?? throw new ArgumentNullException(nameof(context));
_logger = logger ?? NullLogger<AuditLogRepository>.Instance;
}
/// <summary>
/// Issues a single <c>IF NOT EXISTS … INSERT INTO dbo.AuditLog (…) VALUES (…)</c>
/// via <see cref="Microsoft.EntityFrameworkCore.RelationalDatabaseFacadeExtensions.ExecuteSqlInterpolatedAsync"/>.
/// Bypasses the EF change tracker so the row never enters a tracked state and
/// the enum-as-string conversion is done explicitly in C# (the columns are
/// declared <c>varchar(32)</c> via <c>HasConversion&lt;string&gt;()</c> in
/// <see cref="ScadaLink.ConfigurationDatabase.Configurations.AuditLogEntityTypeConfiguration"/>).
/// </summary>
public async Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default)
{
if (evt is null)
{
throw new ArgumentNullException(nameof(evt));
}
// Enum columns are stored as varchar(32) (HasConversion<string>()), so do
// the conversion in C# rather than relying on parameter type inference —
// SqlClient would otherwise bind enums as int by default.
var channel = evt.Channel.ToString();
var kind = evt.Kind.ToString();
var status = evt.Status.ToString();
var forwardState = evt.ForwardState?.ToString();
// FormattableString interpolation parameterises every value (no concatenation),
// so this is safe against injection even for the string columns.
try
{
await _context.Database.ExecuteSqlInterpolatedAsync(
$@"IF NOT EXISTS (SELECT 1 FROM dbo.AuditLog WHERE EventId = {evt.EventId})
INSERT INTO dbo.AuditLog
(EventId, OccurredAtUtc, IngestedAtUtc, Channel, Kind, CorrelationId,
SourceSiteId, SourceInstanceId, SourceScript, Actor, Target, Status,
HttpStatus, DurationMs, ErrorMessage, ErrorDetail, RequestSummary,
ResponseSummary, PayloadTruncated, Extra, ForwardState)
VALUES
({evt.EventId}, {evt.OccurredAtUtc}, {evt.IngestedAtUtc}, {channel}, {kind}, {evt.CorrelationId},
{evt.SourceSiteId}, {evt.SourceInstanceId}, {evt.SourceScript}, {evt.Actor}, {evt.Target}, {status},
{evt.HttpStatus}, {evt.DurationMs}, {evt.ErrorMessage}, {evt.ErrorDetail}, {evt.RequestSummary},
{evt.ResponseSummary}, {evt.PayloadTruncated}, {evt.Extra}, {forwardState});",
ct);
}
catch (SqlException ex) when (
ex.Number == SqlErrorUniqueIndexViolation
|| ex.Number == SqlErrorPrimaryKeyViolation)
{
// Two concurrent sessions both passed the IF NOT EXISTS check and
// both attempted the INSERT — the loser raises 2601/2627 against
// UX_AuditLog_EventId. First-write-wins idempotency is already the
// documented contract for this method, so the race outcome is
// semantically a no-op. Swallow at Debug; other SqlExceptions
// bubble.
_logger.LogDebug(
ex,
"InsertIfNotExistsAsync swallowed duplicate-key violation (error {SqlErrorNumber}) for EventId {EventId}; treating as no-op.",
ex.Number,
evt.EventId);
}
}
/// <summary>
/// Builds an <c>AsNoTracking</c> queryable over <see cref="AuditEvent"/>, applies
/// every non-null filter predicate, and pages by keyset on
/// <c>(OccurredAtUtc DESC, EventId DESC)</c>. The keyset clause is expressed
/// directly (<c>occurred &lt; after || (occurred == after &amp;&amp; eventId.CompareTo(afterId) &lt; 0)</c>)
/// — EF Core 10 translates <see cref="Guid.CompareTo(Guid)"/> against SQL Server's
/// <c>uniqueidentifier</c> sort order.
/// </summary>
public async Task<IReadOnlyList<AuditEvent>> QueryAsync(
AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default)
{
if (filter is null)
{
throw new ArgumentNullException(nameof(filter));
}
if (paging is null)
{
throw new ArgumentNullException(nameof(paging));
}
var query = _context.Set<AuditEvent>().AsNoTracking();
if (filter.Channel is { } channel)
{
query = query.Where(e => e.Channel == channel);
}
if (filter.Kind is { } kind)
{
query = query.Where(e => e.Kind == kind);
}
if (filter.Status is { } status)
{
query = query.Where(e => e.Status == status);
}
if (!string.IsNullOrEmpty(filter.SourceSiteId))
{
var siteId = filter.SourceSiteId;
query = query.Where(e => e.SourceSiteId == siteId);
}
if (!string.IsNullOrEmpty(filter.Target))
{
var target = filter.Target;
query = query.Where(e => e.Target == target);
}
if (!string.IsNullOrEmpty(filter.Actor))
{
var actor = filter.Actor;
query = query.Where(e => e.Actor == actor);
}
if (filter.CorrelationId is { } correlationId)
{
query = query.Where(e => e.CorrelationId == correlationId);
}
if (filter.FromUtc is { } fromUtc)
{
query = query.Where(e => e.OccurredAtUtc >= fromUtc);
}
if (filter.ToUtc is { } toUtc)
{
query = query.Where(e => e.OccurredAtUtc <= toUtc);
}
// Keyset cursor on (OccurredAtUtc desc, EventId desc).
if (paging.AfterOccurredAtUtc is { } afterOccurred && paging.AfterEventId is { } afterEventId)
{
query = query.Where(e =>
e.OccurredAtUtc < afterOccurred
|| (e.OccurredAtUtc == afterOccurred && e.EventId.CompareTo(afterEventId) < 0));
}
return await query
.OrderByDescending(e => e.OccurredAtUtc)
.ThenByDescending(e => e.EventId)
.Take(paging.PageSize)
.ToListAsync(ct);
}
/// <summary>
/// M6-T4 production implementation of the drop-and-rebuild dance documented
/// on <see cref="IAuditLogRepository.SwitchOutPartitionAsync"/>.
/// </summary>
/// <remarks>
/// <para>
/// The staging table name is GUID-suffixed so concurrent purge attempts on
/// different boundaries cannot collide. The staging schema is byte-identical
/// to the live <c>AuditLog</c> table (same column types, lengths,
/// nullability, and clustered-key shape) — SQL Server's
/// <c>ALTER TABLE … SWITCH PARTITION</c> rejects any drift. Keep this CREATE
/// in sync with both the migration that ships the live table
/// (<c>20260520142214_AddAuditLogTable</c>) and
/// <c>AuditLogEntityTypeConfiguration</c>.
/// </para>
/// <para>
/// All five steps run inside an explicit transaction so the SWITCH +
/// staging-DROP are atomic from the perspective of a consumer reading via
/// snapshot isolation; the CATCH rolls back and runs an idempotent
/// "rebuild UX_AuditLog_EventId if it doesn't exist" so a partial failure
/// never leaves the live table without its idempotency-supporting unique
/// index.
/// </para>
/// </remarks>
public async Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
{
// GUID-suffixed staging name: prevents collision with any concurrent
// purge attempt and avoids polluting the AuditLog object namespace with
// a predictable identifier.
var stagingTableName = $"AuditLog_Staging_{Guid.NewGuid():N}";
// ISO 8601 in UTC — SQL Server's datetime2 literal parser accepts this
// unambiguously and the value is round-trip-safe across SET DATEFORMAT
// settings.
var monthBoundaryStr = monthBoundary.ToUniversalTime().ToString("yyyy-MM-dd HH:mm:ss");
// Two-statement batch: the first SELECT samples the per-partition row
// count BEFORE the dance so we can report it back to the purge actor;
// the second batch performs the drop-and-rebuild. We use OUTPUT-style
// variables wired through @@ROWCOUNT after the SWITCH is not viable
// because SWITCH is a metadata-only operation that doesn't move rows in
// a way @@ROWCOUNT can observe.
var sampleSql = $@"
SELECT COUNT_BIG(*) FROM dbo.AuditLog
WHERE $PARTITION.pf_AuditLog_Month(OccurredAtUtc) =
$partition.pf_AuditLog_Month('{monthBoundaryStr}');";
var sql = $@"
BEGIN TRY
BEGIN TRANSACTION;
-- 1. Drop the non-aligned unique index. ALTER TABLE SWITCH refuses
-- to run while it exists.
IF EXISTS (SELECT 1 FROM sys.indexes WHERE name = 'UX_AuditLog_EventId' AND object_id = OBJECT_ID('dbo.AuditLog'))
DROP INDEX UX_AuditLog_EventId ON dbo.AuditLog;
-- 2. Staging table on [PRIMARY] (non-partitioned) with column shapes
-- byte-identical to dbo.AuditLog. Any drift here causes SWITCH to
-- reject the operation with msg 4904/4915.
CREATE TABLE dbo.[{stagingTableName}] (
EventId uniqueidentifier NOT NULL,
OccurredAtUtc datetime2(7) NOT NULL,
IngestedAtUtc datetime2(7) NULL,
Channel varchar(32) NOT NULL,
Kind varchar(32) NOT NULL,
CorrelationId uniqueidentifier NULL,
SourceSiteId varchar(64) NULL,
SourceInstanceId varchar(128) NULL,
SourceScript varchar(128) NULL,
Actor varchar(128) NULL,
Target varchar(256) NULL,
Status varchar(32) NOT NULL,
HttpStatus int NULL,
DurationMs int NULL,
ErrorMessage nvarchar(1024) NULL,
ErrorDetail nvarchar(max) NULL,
RequestSummary nvarchar(max) NULL,
ResponseSummary nvarchar(max) NULL,
PayloadTruncated bit NOT NULL,
Extra nvarchar(max) NULL,
ForwardState varchar(32) NULL,
CONSTRAINT PK_{stagingTableName} PRIMARY KEY CLUSTERED (EventId, OccurredAtUtc)
) ON [PRIMARY];
-- 3. Switch the partition out. $partition.pf_AuditLog_Month returns
-- the partition number that contains the supplied boundary value;
-- SWITCH PARTITION N moves that partition's pages to the staging
-- table (metadata-only, no row copying).
DECLARE @partitionNumber int = $partition.pf_AuditLog_Month('{monthBoundaryStr}');
DECLARE @sql nvarchar(max) = 'ALTER TABLE dbo.AuditLog SWITCH PARTITION ' + CAST(@partitionNumber AS nvarchar(10)) + ' TO dbo.[{stagingTableName}];';
EXEC sp_executesql @sql;
-- 4. Drop staging — the rows are discarded here. This is the purge.
DROP TABLE dbo.[{stagingTableName}];
-- 5. Rebuild the non-aligned unique index. Live traffic that hit the
-- table during steps 1-4 saw composite-PK uniqueness only; from
-- here on, single-column EventId uniqueness is restored.
CREATE UNIQUE NONCLUSTERED INDEX UX_AuditLog_EventId ON dbo.AuditLog (EventId) ON [PRIMARY];
COMMIT TRANSACTION;
END TRY
BEGIN CATCH
IF @@TRANCOUNT > 0 ROLLBACK TRANSACTION;
-- Best-effort staging cleanup. The DROP INDEX in step 1 is now
-- rolled back (so the index is back), but the staging table from
-- step 2 may or may not survive the rollback depending on the
-- failure point. Guard the DROP so a missing staging table doesn't
-- mask the original error.
IF OBJECT_ID('dbo.[{stagingTableName}]', 'U') IS NOT NULL DROP TABLE dbo.[{stagingTableName}];
-- Idempotent index rebuild — covers the niche case where ROLLBACK
-- failed to restore UX_AuditLog_EventId (or the failure happened
-- AFTER the COMMIT, which shouldn't be possible inside this TRY
-- but is cheap insurance). Without this, a failed switch could
-- leave the live table without its idempotency-supporting index.
IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name = 'UX_AuditLog_EventId' AND object_id = OBJECT_ID('dbo.AuditLog'))
CREATE UNIQUE NONCLUSTERED INDEX UX_AuditLog_EventId ON dbo.AuditLog (EventId) ON [PRIMARY];
-- Surface the original error to the caller — the purge actor logs
-- and continues with the next boundary.
THROW;
END CATCH;";
// Sample the row count before the switch. The sample is best-effort
// (no transaction wrapping the sample-then-switch pair) because the
// central singleton is the only writer to this RPC and a daily-purge
// tick doesn't compete with concurrent SwitchOut callers. A
// concurrent INSERT racing the sample under-reports by at most a
// few rows, which is acceptable for an "approximate" purged-row
// count surfaced via AuditLogPurgedEvent.
long rowsDeleted = 0;
var conn = _context.Database.GetDbConnection();
var openedHere = false;
if (conn.State != System.Data.ConnectionState.Open)
{
await conn.OpenAsync(ct).ConfigureAwait(false);
openedHere = true;
}
try
{
await using (var sampleCmd = conn.CreateCommand())
{
sampleCmd.CommandText = sampleSql;
var sampleResult = await sampleCmd.ExecuteScalarAsync(ct).ConfigureAwait(false);
if (sampleResult is not null && sampleResult is not DBNull)
{
rowsDeleted = Convert.ToInt64(sampleResult);
}
}
}
finally
{
if (openedHere)
{
await conn.CloseAsync().ConfigureAwait(false);
}
}
await _context.Database.ExecuteSqlRawAsync(sql, ct);
return rowsDeleted;
}
/// <summary>
/// Returns the set of <c>pf_AuditLog_Month</c> boundaries whose partition's
/// <c>MAX(OccurredAtUtc)</c> is strictly older than <paramref name="threshold"/>.
/// Boundaries with empty partitions are excluded — purging an empty
/// partition is wasted I/O.
/// </summary>
/// <remarks>
/// <para>
/// The CTE pulls every boundary value defined by the partition function and
/// joins it (via <c>$PARTITION.pf_AuditLog_Month</c>) to the live AuditLog
/// to compute per-partition <c>MAX(OccurredAtUtc)</c>. The outer filter
/// keeps only those whose MAX is non-NULL (partition has rows) AND strictly
/// less than the threshold (every row is past retention).
/// </para>
/// <para>
/// Note: the query scans the live <c>OccurredAtUtc</c> column to compute
/// the MAX per partition. With <c>IX_AuditLog_OccurredAtUtc</c> on the
/// partition-aligned scheme this is a single index seek per partition; for
/// 24 partitions and a daily purge cadence the cost is negligible.
/// </para>
/// </remarks>
public async Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
DateTime threshold,
CancellationToken ct = default)
{
var thresholdUtc = threshold.ToUniversalTime();
var thresholdStr = thresholdUtc.ToString("yyyy-MM-dd HH:mm:ss.fffffff");
// Per-partition MAX over the live table. We materialise the boundary
// list first (24 rows) then LEFT JOIN to the MAX aggregate so empty
// partitions surface as NULL and get filtered out by the WHERE clause.
var sql = $@"
WITH Boundaries AS (
SELECT CAST(rv.value AS datetime2(7)) AS BoundaryValue,
rv.boundary_id AS BoundaryId
FROM sys.partition_range_values rv
INNER JOIN sys.partition_functions pf ON rv.function_id = pf.function_id
WHERE pf.name = 'pf_AuditLog_Month'
)
SELECT b.BoundaryValue
FROM Boundaries b
CROSS APPLY (
SELECT MAX(a.OccurredAtUtc) AS MaxOccurredAt
FROM dbo.AuditLog a
WHERE $PARTITION.pf_AuditLog_Month(a.OccurredAtUtc) = b.BoundaryId + 1
) x
WHERE x.MaxOccurredAt IS NOT NULL
AND x.MaxOccurredAt < CAST('{thresholdStr}' AS datetime2(7))
ORDER BY b.BoundaryValue;";
var conn = _context.Database.GetDbConnection();
var openedHere = false;
if (conn.State != System.Data.ConnectionState.Open)
{
await conn.OpenAsync(ct).ConfigureAwait(false);
openedHere = true;
}
var results = new List<DateTime>();
try
{
await using var cmd = conn.CreateCommand();
cmd.CommandText = sql;
await using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
while (await reader.ReadAsync(ct).ConfigureAwait(false))
{
results.Add(reader.GetDateTime(0));
}
}
finally
{
if (openedHere)
{
await conn.CloseAsync().ConfigureAwait(false);
}
}
return results;
}
}