809 lines
38 KiB
C#
809 lines
38 KiB
C#
using Microsoft.Data.SqlClient;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ScadaLink.Commons.Entities.Audit;
|
|
using ScadaLink.Commons.Interfaces.Repositories;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Audit;
|
|
|
|
namespace ScadaLink.ConfigurationDatabase.Repositories;
|
|
|
|
/// <summary>
|
|
/// EF Core implementation of <see cref="IAuditLogRepository"/>. See the interface
|
|
/// for the append-only contract; this class only adds notes on the data-access
|
|
/// strategy used by each method.
|
|
/// </summary>
|
|
public class AuditLogRepository : IAuditLogRepository
|
|
{
|
|
// Duplicate-key error numbers SQL Server can raise against
// UX_AuditLog_EventId: 2601 is reported for a unique-index violation and
// 2627 for a primary-key/unique-constraint violation. IF NOT EXISTS … INSERT
// is check-then-act, so two sessions may both pass the EXISTS probe and both
// try to INSERT; the session that loses surfaces one of these numbers.
// The insert is meant to be idempotent, so both are swallowed.
private const int SqlErrorUniqueIndexViolation = 2601;
private const int SqlErrorPrimaryKeyViolation = 2627;

private readonly ScadaLinkDbContext _context;
private readonly ILogger<AuditLogRepository> _logger;

/// <summary>
/// Creates the repository over the supplied EF Core context.
/// </summary>
/// <param name="context">Database context used for every query; must not be null.</param>
/// <param name="logger">Optional logger; a no-op logger is substituted when omitted.</param>
/// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception>
public AuditLogRepository(ScadaLinkDbContext context, ILogger<AuditLogRepository>? logger = null)
{
    ArgumentNullException.ThrowIfNull(context);

    _context = context;
    _logger = logger is null ? NullLogger<AuditLogRepository>.Instance : logger;
}
|
|
|
|
/// <summary>
/// Idempotent append. Sends one
/// <c>IF NOT EXISTS … INSERT INTO dbo.AuditLog (…) VALUES (…)</c> statement through
/// <see cref="Microsoft.EntityFrameworkCore.RelationalDatabaseFacadeExtensions.ExecuteSqlInterpolatedAsync"/>,
/// so the row never passes through the EF change tracker. Enum values are
/// turned into their string names here in C# because the columns are declared
/// <c>varchar(32)</c> via <c>HasConversion&lt;string&gt;()</c> in
/// <see cref="ScadaLink.ConfigurationDatabase.Configurations.AuditLogEntityTypeConfiguration"/>.
/// </summary>
/// <param name="evt">The audit event to persist; must not be null.</param>
/// <param name="ct">Cancellation token forwarded to the database call.</param>
/// <exception cref="ArgumentNullException"><paramref name="evt"/> is null.</exception>
public async Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(evt);

    // Convert the enums explicitly: left to parameter type inference,
    // SqlClient would bind them as int, not as the varchar(32) text the
    // columns store.
    var channelText = evt.Channel.ToString();
    var kindText = evt.Kind.ToString();
    var statusText = evt.Status.ToString();
    var forwardStateText = evt.ForwardState?.ToString();

    // Every hole in the FormattableString becomes a DbParameter (nothing is
    // concatenated into the SQL text), which keeps the string columns safe
    // from injection.
    FormattableString insertStatement =
        $@"IF NOT EXISTS (SELECT 1 FROM dbo.AuditLog WHERE EventId = {evt.EventId})
INSERT INTO dbo.AuditLog
(EventId, OccurredAtUtc, IngestedAtUtc, Channel, Kind, CorrelationId, ExecutionId, ParentExecutionId,
SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target, Status,
HttpStatus, DurationMs, ErrorMessage, ErrorDetail, RequestSummary,
ResponseSummary, PayloadTruncated, Extra, ForwardState)
VALUES
({evt.EventId}, {evt.OccurredAtUtc}, {evt.IngestedAtUtc}, {channelText}, {kindText}, {evt.CorrelationId}, {evt.ExecutionId}, {evt.ParentExecutionId},
{evt.SourceSiteId}, {evt.SourceNode}, {evt.SourceInstanceId}, {evt.SourceScript}, {evt.Actor}, {evt.Target}, {statusText},
{evt.HttpStatus}, {evt.DurationMs}, {evt.ErrorMessage}, {evt.ErrorDetail}, {evt.RequestSummary},
{evt.ResponseSummary}, {evt.PayloadTruncated}, {evt.Extra}, {forwardStateText});";

    try
    {
        await _context.Database.ExecuteSqlInterpolatedAsync(insertStatement, ct);
    }
    catch (SqlException ex) when (ex.Number is SqlErrorUniqueIndexViolation or SqlErrorPrimaryKeyViolation)
    {
        // Lost the check-then-insert race: another session inserted the
        // same EventId between our EXISTS probe and our INSERT, so
        // UX_AuditLog_EventId rejected this one with 2601/2627. The method
        // is first-write-wins, which makes that outcome a no-op. It is
        // logged at Debug and swallowed; any other SqlException propagates.
        _logger.LogDebug(
            ex,
            "InsertIfNotExistsAsync swallowed duplicate-key violation (error {SqlErrorNumber}) for EventId {EventId}; treating as no-op.",
            ex.Number,
            evt.EventId);
    }
}
|
|
|
|
/// <summary>
/// Runs a filtered, keyset-paged read over <see cref="AuditEvent"/> using an
/// <c>AsNoTracking</c> queryable. Each filter dimension that is set adds one
/// predicate; the page is ordered by <c>(OccurredAtUtc DESC, EventId DESC)</c>
/// and, when a cursor is supplied, restricted with
/// <c>occurred &lt; after || (occurred == after &amp;&amp; eventId.CompareTo(afterId) &lt; 0)</c>.
/// EF Core 10 translates <see cref="Guid.CompareTo(Guid)"/> against SQL Server's
/// <c>uniqueidentifier</c> sort order.
/// </summary>
/// <param name="filter">Filter dimensions; unset members impose no constraint.</param>
/// <param name="paging">Page size plus the optional keyset cursor.</param>
/// <param name="ct">Cancellation token forwarded to the database call.</param>
/// <returns>At most <c>paging.PageSize</c> events, newest first.</returns>
/// <exception cref="ArgumentNullException"><paramref name="filter"/> or <paramref name="paging"/> is null.</exception>
public async Task<IReadOnlyList<AuditEvent>> QueryAsync(
    AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(filter);
    ArgumentNullException.ThrowIfNull(paging);

    IQueryable<AuditEvent> events = _context.Set<AuditEvent>().AsNoTracking();

    // List-valued dimensions: null and empty both mean "unconstrained". The
    // { Count: > 0 } pattern keeps an empty list from turning into WHERE 1=0.
    // A populated list becomes a server-side SQL IN (…) through EF Core's
    // Contains translation.
    if (filter.Channels is { Count: > 0 } wantedChannels)
    {
        events = events.Where(e => wantedChannels.Contains(e.Channel));
    }

    if (filter.Kinds is { Count: > 0 } wantedKinds)
    {
        events = events.Where(e => wantedKinds.Contains(e.Kind));
    }

    if (filter.Statuses is { Count: > 0 } wantedStatuses)
    {
        events = events.Where(e => wantedStatuses.Contains(e.Status));
    }

    // Rows whose SourceSiteId / SourceNode is NULL drop out once the
    // corresponding filter is active.
    if (filter.SourceSiteIds is { Count: > 0 } wantedSites)
    {
        events = events.Where(e => e.SourceSiteId != null && wantedSites.Contains(e.SourceSiteId));
    }

    if (filter.SourceNodes is { Count: > 0 } wantedNodes)
    {
        events = events.Where(e => e.SourceNode != null && wantedNodes.Contains(e.SourceNode));
    }

    // Exact-match text dimensions; null and "" are both treated as unset.
    if (filter.Target is { Length: > 0 } wantedTarget)
    {
        events = events.Where(e => e.Target == wantedTarget);
    }

    if (filter.Actor is { Length: > 0 } wantedActor)
    {
        events = events.Where(e => e.Actor == wantedActor);
    }

    if (filter.CorrelationId is { } wantedCorrelation)
    {
        events = events.Where(e => e.CorrelationId == wantedCorrelation);
    }

    if (filter.ExecutionId is { } wantedExecution)
    {
        events = events.Where(e => e.ExecutionId == wantedExecution);
    }

    if (filter.ParentExecutionId is { } wantedParentExecution)
    {
        events = events.Where(e => e.ParentExecutionId == wantedParentExecution);
    }

    // Inclusive time window on both ends.
    if (filter.FromUtc is { } windowStart)
    {
        events = events.Where(e => e.OccurredAtUtc >= windowStart);
    }

    if (filter.ToUtc is { } windowEnd)
    {
        events = events.Where(e => e.OccurredAtUtc <= windowEnd);
    }

    // Keyset cursor over (OccurredAtUtc desc, EventId desc). It is applied
    // only when BOTH halves of the cursor are present.
    if (paging is { AfterOccurredAtUtc: { } cursorOccurred, AfterEventId: { } cursorEventId })
    {
        events = events.Where(e =>
            e.OccurredAtUtc < cursorOccurred
            || (e.OccurredAtUtc == cursorOccurred && e.EventId.CompareTo(cursorEventId) < 0));
    }

    var page = events
        .OrderByDescending(e => e.OccurredAtUtc)
        .ThenByDescending(e => e.EventId)
        .Take(paging.PageSize);

    return await page.ToListAsync(ct);
}
|
|
|
|
/// <summary>
/// M6-T4 production implementation of the drop-and-rebuild dance documented
/// on <see cref="IAuditLogRepository.SwitchOutPartitionAsync"/>.
/// </summary>
/// <remarks>
/// <para>
/// The staging table name is GUID-suffixed so concurrent purge attempts on
/// different boundaries cannot collide. The staging schema is byte-identical
/// to the live <c>AuditLog</c> table (same column types, lengths,
/// nullability, and clustered-key shape) — SQL Server's
/// <c>ALTER TABLE … SWITCH PARTITION</c> rejects any drift. Keep this CREATE
/// in sync with both the migration that ships the live table
/// (<c>20260520142214_AddAuditLogTable</c>) and
/// <c>AuditLogEntityTypeConfiguration</c>.
/// </para>
/// <para>
/// All five steps run inside an explicit transaction so the SWITCH +
/// staging-DROP are atomic from the perspective of a consumer reading via
/// snapshot isolation; the CATCH rolls back and runs an idempotent
/// "rebuild UX_AuditLog_EventId if it doesn't exist" so a partial failure
/// never leaves the live table without its idempotency-supporting unique
/// index.
/// </para>
/// <para>
/// <paramref name="monthBoundary"/> is interpreted as UTC unless its
/// <see cref="DateTime.Kind"/> is <see cref="DateTimeKind.Local"/>, in which
/// case it is converted. Values read back from SQL Server carry
/// <see cref="DateTimeKind.Unspecified"/>; treating those as local time would
/// shift the boundary by the host's UTC offset and could resolve to a
/// different partition than the caller intended.
/// </para>
/// </remarks>
/// <param name="monthBoundary">A value inside the partition to purge (normally the partition's boundary value).</param>
/// <param name="ct">Cancellation token forwarded to the database calls.</param>
/// <returns>
/// The row count of the target partition sampled immediately before the
/// switch. The sample and the switch are not in one transaction, so the
/// figure is approximate.
/// </returns>
public async Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
{
    // GUID-suffixed staging name: prevents collision with any concurrent
    // purge attempt and avoids polluting the AuditLog object namespace with
    // a predictable identifier.
    var stagingTableName = $"AuditLog_Staging_{Guid.NewGuid():N}";

    // Only a Local value needs converting. Utc is already right, and
    // Unspecified (what SqlDataReader.GetDateTime returns) must be taken
    // as-is; ToUniversalTime() would treat it as local time and shift it.
    var boundaryUtc = monthBoundary.Kind == DateTimeKind.Local
        ? monthBoundary.ToUniversalTime()
        : monthBoundary;

    // Formatted with the invariant culture so the literal embedded in the
    // T-SQL below always uses Gregorian digits and ':' separators, whatever
    // the host's current culture is. SQL Server parses this shape for
    // datetime2 independently of SET DATEFORMAT.
    var monthBoundaryStr = boundaryUtc.ToString(
        "yyyy-MM-dd HH:mm:ss",
        System.Globalization.CultureInfo.InvariantCulture);

    // Two separate commands. The first samples the partition's row count
    // BEFORE the dance so it can be reported back to the purge actor; the
    // second performs the drop-and-rebuild. Reading @@ROWCOUNT after the
    // SWITCH is not an option: SWITCH is a metadata-only operation and
    // does not move rows in a way @@ROWCOUNT can observe.
    var sampleSql = $@"
        SELECT COUNT_BIG(*) FROM dbo.AuditLog
        WHERE $PARTITION.pf_AuditLog_Month(OccurredAtUtc) =
              $partition.pf_AuditLog_Month('{monthBoundaryStr}');";

    var sql = $@"
        BEGIN TRY
            BEGIN TRANSACTION;

            -- 1. Drop the non-aligned unique index. ALTER TABLE SWITCH refuses
            --    to run while it exists.
            IF EXISTS (SELECT 1 FROM sys.indexes WHERE name = 'UX_AuditLog_EventId' AND object_id = OBJECT_ID('dbo.AuditLog'))
                DROP INDEX UX_AuditLog_EventId ON dbo.AuditLog;

            -- 2. Staging table on [PRIMARY] (non-partitioned) with column shapes
            --    byte-identical to dbo.AuditLog. Any drift here causes SWITCH to
            --    reject the operation with msg 4904/4915.
            CREATE TABLE dbo.[{stagingTableName}] (
                EventId          uniqueidentifier NOT NULL,
                OccurredAtUtc    datetime2(7)     NOT NULL,
                IngestedAtUtc    datetime2(7)     NULL,
                Channel          varchar(32)      NOT NULL,
                Kind             varchar(32)      NOT NULL,
                CorrelationId    uniqueidentifier NULL,
                SourceSiteId     varchar(64)      NULL,
                SourceInstanceId varchar(128)     NULL,
                SourceScript     varchar(128)     NULL,
                Actor            varchar(128)     NULL,
                Target           varchar(256)     NULL,
                Status           varchar(32)      NOT NULL,
                HttpStatus       int              NULL,
                DurationMs       int              NULL,
                ErrorMessage     nvarchar(1024)   NULL,
                ErrorDetail      nvarchar(max)    NULL,
                RequestSummary   nvarchar(max)    NULL,
                ResponseSummary  nvarchar(max)    NULL,
                PayloadTruncated bit              NOT NULL,
                Extra            nvarchar(max)    NULL,
                ForwardState     varchar(32)      NULL,
                -- ExecutionId, ParentExecutionId, and SourceNode are last (in this
                -- ordinal order) because each was added to the live AuditLog table
                -- by a later ALTER TABLE ADD migration; the staging table must
                -- match the live table column shape ordinal-for-ordinal or
                -- ALTER TABLE ... SWITCH PARTITION fails (msg 4904/4915).
                ExecutionId      uniqueidentifier NULL,
                ParentExecutionId uniqueidentifier NULL,
                SourceNode       varchar(64)      NULL,
                CONSTRAINT PK_{stagingTableName} PRIMARY KEY CLUSTERED (EventId, OccurredAtUtc)
            ) ON [PRIMARY];

            -- 3. Switch the partition out. $partition.pf_AuditLog_Month returns
            --    the partition number that contains the supplied boundary value;
            --    SWITCH PARTITION N moves that partition's pages to the staging
            --    table (metadata-only, no row copying).
            DECLARE @partitionNumber int = $partition.pf_AuditLog_Month('{monthBoundaryStr}');
            DECLARE @sql nvarchar(max) = 'ALTER TABLE dbo.AuditLog SWITCH PARTITION ' + CAST(@partitionNumber AS nvarchar(10)) + ' TO dbo.[{stagingTableName}];';
            EXEC sp_executesql @sql;

            -- 4. Drop staging — the rows are discarded here. This is the purge.
            DROP TABLE dbo.[{stagingTableName}];

            -- 5. Rebuild the non-aligned unique index. Live traffic that hit the
            --    table during steps 1-4 saw composite-PK uniqueness only; from
            --    here on, single-column EventId uniqueness is restored.
            CREATE UNIQUE NONCLUSTERED INDEX UX_AuditLog_EventId ON dbo.AuditLog (EventId) ON [PRIMARY];

            COMMIT TRANSACTION;
        END TRY
        BEGIN CATCH
            IF @@TRANCOUNT > 0 ROLLBACK TRANSACTION;

            -- Best-effort staging cleanup. The DROP INDEX in step 1 is now
            -- rolled back (so the index is back), but the staging table from
            -- step 2 may or may not survive the rollback depending on the
            -- failure point. Guard the DROP so a missing staging table doesn't
            -- mask the original error.
            IF OBJECT_ID('dbo.[{stagingTableName}]', 'U') IS NOT NULL DROP TABLE dbo.[{stagingTableName}];

            -- Idempotent index rebuild — covers the niche case where ROLLBACK
            -- failed to restore UX_AuditLog_EventId (or the failure happened
            -- AFTER the COMMIT, which shouldn't be possible inside this TRY
            -- but is cheap insurance). Without this, a failed switch could
            -- leave the live table without its idempotency-supporting index.
            IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name = 'UX_AuditLog_EventId' AND object_id = OBJECT_ID('dbo.AuditLog'))
                CREATE UNIQUE NONCLUSTERED INDEX UX_AuditLog_EventId ON dbo.AuditLog (EventId) ON [PRIMARY];

            -- Surface the original error to the caller — the purge actor logs
            -- and continues with the next boundary.
            THROW;
        END CATCH;";

    // Sample the row count before the switch. The sample is best-effort
    // (no transaction wraps the sample-then-switch pair) because the
    // central singleton is the only caller of this operation and a
    // daily-purge tick doesn't compete with concurrent SwitchOut callers.
    // A concurrent INSERT racing the sample under-reports by at most a few
    // rows, which is acceptable for the "approximate" purged-row count
    // surfaced via AuditLogPurgedEvent.
    long rowsDeleted = 0;
    var conn = _context.Database.GetDbConnection();
    var openedHere = false;
    if (conn.State != System.Data.ConnectionState.Open)
    {
        await conn.OpenAsync(ct).ConfigureAwait(false);
        openedHere = true;
    }

    try
    {
        await using var sampleCmd = conn.CreateCommand();
        sampleCmd.CommandText = sampleSql;

        var sampleResult = await sampleCmd.ExecuteScalarAsync(ct).ConfigureAwait(false);
        if (sampleResult is not null and not DBNull)
        {
            rowsDeleted = Convert.ToInt64(sampleResult);
        }
    }
    finally
    {
        // Only close a connection this method opened; a connection that was
        // already open belongs to the caller / EF.
        if (openedHere)
        {
            await conn.CloseAsync().ConfigureAwait(false);
        }
    }

    await _context.Database.ExecuteSqlRawAsync(sql, ct);
    return rowsDeleted;
}
|
|
|
|
/// <summary>
/// Returns the set of <c>pf_AuditLog_Month</c> boundaries whose partition's
/// <c>MAX(OccurredAtUtc)</c> is strictly older than <paramref name="threshold"/>.
/// Boundaries with empty partitions are excluded — purging an empty
/// partition is wasted I/O.
/// </summary>
/// <remarks>
/// <para>
/// The CTE pulls every boundary value defined by the partition function; a
/// <c>CROSS APPLY</c> then computes, per boundary, the <c>MAX(OccurredAtUtc)</c>
/// of the live AuditLog rows that <c>$PARTITION.pf_AuditLog_Month</c> places in
/// partition <c>boundary_id + 1</c>. The outer filter keeps only boundaries
/// whose MAX is non-NULL (partition has rows) AND strictly less than the
/// threshold (every row is past retention).
/// </para>
/// <para>
/// The threshold is sent as a typed <c>datetime2</c> parameter rather than
/// a formatted literal, so its value does not depend on the host's current
/// culture. Returned boundaries are stamped <see cref="DateTimeKind.Utc"/>
/// (SqlClient yields <see cref="DateTimeKind.Unspecified"/>), so a later
/// <see cref="DateTime.ToUniversalTime"/> on them is a no-op instead of a
/// local-offset shift.
/// </para>
/// <para>
/// NOTE(review): the <c>boundary_id + 1</c> mapping presumes
/// <c>pf_AuditLog_Month</c> is declared <c>RANGE RIGHT</c> — confirm against
/// the migration that creates the partition function.
/// </para>
/// </remarks>
/// <param name="threshold">Retention cut-off; converted with <see cref="DateTime.ToUniversalTime"/> before use.</param>
/// <param name="ct">Cancellation token forwarded to the database calls.</param>
/// <returns>Matching boundary values in ascending order.</returns>
public async Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
    DateTime threshold,
    CancellationToken ct = default)
{
    var thresholdUtc = threshold.ToUniversalTime();

    // Per-partition MAX over the live table. An aggregate without GROUP BY
    // always yields one row, so an empty partition surfaces as a NULL
    // MaxOccurredAt and is removed by the WHERE clause.
    const string sql = @"
        WITH Boundaries AS (
            SELECT CAST(rv.value AS datetime2(7)) AS BoundaryValue,
                   rv.boundary_id AS BoundaryId
            FROM sys.partition_range_values rv
            INNER JOIN sys.partition_functions pf ON rv.function_id = pf.function_id
            WHERE pf.name = 'pf_AuditLog_Month'
        )
        SELECT b.BoundaryValue
        FROM Boundaries b
        CROSS APPLY (
            SELECT MAX(a.OccurredAtUtc) AS MaxOccurredAt
            FROM dbo.AuditLog a
            WHERE $PARTITION.pf_AuditLog_Month(a.OccurredAtUtc) = b.BoundaryId + 1
        ) x
        WHERE x.MaxOccurredAt IS NOT NULL
          AND x.MaxOccurredAt < @threshold
        ORDER BY b.BoundaryValue;";

    var conn = _context.Database.GetDbConnection();
    var openedHere = false;
    if (conn.State != System.Data.ConnectionState.Open)
    {
        await conn.OpenAsync(ct).ConfigureAwait(false);
        openedHere = true;
    }

    var results = new List<DateTime>();
    try
    {
        await using var cmd = conn.CreateCommand();
        cmd.CommandText = sql;

        // Explicit DateTime2: without it SqlClient infers the legacy
        // datetime type for a System.DateTime value, which rounds to
        // roughly 3.33 ms.
        var pThreshold = cmd.CreateParameter();
        pThreshold.ParameterName = "@threshold";
        pThreshold.DbType = System.Data.DbType.DateTime2;
        pThreshold.Value = thresholdUtc;
        cmd.Parameters.Add(pThreshold);

        await using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
        while (await reader.ReadAsync(ct).ConfigureAwait(false))
        {
            results.Add(DateTime.SpecifyKind(reader.GetDateTime(0), DateTimeKind.Utc));
        }
    }
    finally
    {
        // Only close a connection this method opened.
        if (openedHere)
        {
            await conn.CloseAsync().ConfigureAwait(false);
        }
    }

    return results;
}
|
|
|
|
/// <summary>
/// M7-T13 Bundle E — Health-dashboard Audit KPI tiles aggregate query.
/// Single round-trip
/// (<c>SELECT COUNT_BIG(*) AS Total, SUM(CASE WHEN Status IN (...) THEN 1 ELSE 0 END) AS Errors</c>)
/// over the trailing <paramref name="window"/> anchored at
/// <paramref name="nowUtc"/>. Returns a snapshot with
/// <see cref="AuditLogKpiSnapshot.BacklogTotal"/> left at zero — the service
/// layer composes that in from
/// <see cref="ScadaLink.HealthMonitoring.ICentralHealthAggregator"/>.
/// </summary>
/// <remarks>
/// <para>
/// Why one query, not two: keeping the numerator + denominator in the same
/// scan means the error rate the UI displays is computed from a consistent
/// snapshot. With two separate queries a row could be inserted between
/// them, inflating the denominator past the numerator (or vice-versa) and
/// briefly producing a misleading percentage.
/// </para>
/// <para>
/// "Error" rows are <c>Failed</c>, <c>Parked</c>, or <c>Discarded</c> — see
/// <see cref="IAuditLogRepository.GetKpiSnapshotAsync"/> for the rationale.
/// The three discriminator strings are passed as separate parameters rather
/// than as a built IN-list, so the command text is identical on every call.
/// </para>
/// <para>
/// Every parameter carries an explicit <see cref="System.Data.DbType"/> that
/// matches its column: <c>DateTime2</c> for the <c>datetime2(7)</c> bounds
/// (SqlClient would otherwise infer the legacy, lower-precision
/// <c>datetime</c>) and <c>AnsiString</c>/32 for the <c>varchar(32)</c>
/// status values (otherwise inferred as <c>nvarchar</c>).
/// </para>
/// </remarks>
/// <param name="window">Length of the trailing window that ends at the anchor.</param>
/// <param name="nowUtc">Anchor instant; defaults to <see cref="DateTime.UtcNow"/> when null.</param>
/// <param name="ct">Cancellation token forwarded to the database calls.</param>
/// <returns>Total and error counts inside the window, stamped with the anchor time.</returns>
public async Task<AuditLogKpiSnapshot> GetKpiSnapshotAsync(
    TimeSpan window,
    DateTime? nowUtc = null,
    CancellationToken ct = default)
{
    var anchorUtc = (nowUtc ?? DateTime.UtcNow).ToUniversalTime();
    var thresholdUtc = anchorUtc - window;

    // Status is stored as the enum member's name (HasConversion<string>()),
    // so the discriminators are the member names themselves.
    var failedStr = nameof(Commons.Types.Enums.AuditStatus.Failed);
    var parkedStr = nameof(Commons.Types.Enums.AuditStatus.Parked);
    var discardedStr = nameof(Commons.Types.Enums.AuditStatus.Discarded);

    long total = 0;
    long errors = 0;

    var conn = _context.Database.GetDbConnection();
    var openedHere = false;
    if (conn.State != System.Data.ConnectionState.Open)
    {
        await conn.OpenAsync(ct).ConfigureAwait(false);
        openedHere = true;
    }

    try
    {
        await using var cmd = conn.CreateCommand();

        // Constant command text with named parameters — only the values
        // change between calls. The window is inclusive on both ends.
        cmd.CommandText = @"
            SELECT
                COUNT_BIG(*) AS Total,
                SUM(CASE WHEN Status IN (@failed, @parked, @discarded) THEN 1 ELSE 0 END) AS Errors
            FROM dbo.AuditLog
            WHERE OccurredAtUtc >= @threshold
              AND OccurredAtUtc <= @anchor;";

        AddParameter(cmd, "@threshold", System.Data.DbType.DateTime2, thresholdUtc);
        AddParameter(cmd, "@anchor", System.Data.DbType.DateTime2, anchorUtc);
        AddParameter(cmd, "@failed", System.Data.DbType.AnsiString, failedStr, size: 32);
        AddParameter(cmd, "@parked", System.Data.DbType.AnsiString, parkedStr, size: 32);
        AddParameter(cmd, "@discarded", System.Data.DbType.AnsiString, discardedStr, size: 32);

        await using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
        if (await reader.ReadAsync(ct).ConfigureAwait(false))
        {
            // SUM over an empty set is NULL; COUNT_BIG over an empty set is 0.
            // COUNT_BIG is a bigint, hence GetInt64; SUM of int literals is an
            // int, hence the widening Convert.
            total = reader.IsDBNull(0) ? 0L : reader.GetInt64(0);
            errors = reader.IsDBNull(1) ? 0L : Convert.ToInt64(reader.GetValue(1));
        }
    }
    finally
    {
        // Only close a connection this method opened.
        if (openedHere)
        {
            await conn.CloseAsync().ConfigureAwait(false);
        }
    }

    return new AuditLogKpiSnapshot(
        TotalEventsLastHour: total,
        ErrorEventsLastHour: errors,
        BacklogTotal: 0L,
        AsOfUtc: anchorUtc);

    // Adds one explicitly-typed named parameter to the command. The type is
    // assigned before the value so the provider does not infer it.
    static void AddParameter(
        System.Data.Common.DbCommand command,
        string name,
        System.Data.DbType type,
        object value,
        int size = 0)
    {
        var parameter = command.CreateParameter();
        parameter.ParameterName = name;
        parameter.DbType = type;
        if (size > 0)
        {
            parameter.Size = size;
        }

        parameter.Value = value;
        command.Parameters.Add(parameter);
    }
}
|
|
|
|
// Upper bound on chain depth, shared by the upward walk and the downward
// recursive CTE. The ParentExecutionId graph is expected to be a tree
// (every execution is minted fresh and its parent already exists), so the
// bound only matters for corrupt data: a cycle has to end in a bounded
// failure instead of hanging the server. Real chains are 1-2 levels deep
// and never get near it.
private const int ExecutionChainMaxDepth = 32;

/// <summary>
/// Audit Log ParentExecutionId (Task 8) — returns every node of the execution
/// chain that contains <paramref name="executionId"/>, whichever node of the
/// chain the caller starts from.
/// </summary>
/// <remarks>
/// <para>
/// <b>Walk up.</b> Repeated
/// <c>SELECT TOP 1 ParentExecutionId … WHERE ExecutionId = @cur AND ParentExecutionId IS NOT NULL</c>
/// lookups climb from the supplied id until a lookup finds no parent; the id
/// reached at that point is used as the root. The climb is limited to
/// <see cref="ExecutionChainMaxDepth"/> lookups, and a parent with no rows
/// of its own simply stops it early.
/// </para>
/// <para>
/// <b>Walk down.</b> A recursive CTE runs over the DISTINCT
/// <c>(ExecutionId, ParentExecutionId)</c> edge set. It is anchored on the
/// root id as a literal row and joins <c>edge.ParentExecutionId = chain.ExecutionId</c>
/// to reach every descendant. <c>OPTION (MAXRECURSION ...)</c> is set to
/// <see cref="ExecutionChainMaxDepth"/>, so cyclic data raises a
/// <see cref="SqlException"/> (msg 530) rather than recursing forever.
/// </para>
/// <para>
/// The id set is each chain row's <c>ExecutionId</c> unioned with its
/// non-null <c>ParentExecutionId</c>. That set is LEFT JOINed to
/// <c>AuditLog</c> and grouped, so an execution that has no rows of its own
/// still comes back as a node with <c>RowCount = 0</c> and null/empty
/// aggregates. Both phases issue SELECT statements only.
/// </para>
/// </remarks>
/// <param name="executionId">Any execution id belonging to the chain of interest.</param>
/// <param name="ct">Cancellation token forwarded to the database calls.</param>
/// <returns>One <see cref="ExecutionTreeNode"/> per execution id in the chain.</returns>
public async Task<IReadOnlyList<ExecutionTreeNode>> GetExecutionTreeAsync(
    Guid executionId,
    CancellationToken ct = default)
{
    var conn = _context.Database.GetDbConnection();
    var openedHere = false;
    if (conn.State != System.Data.ConnectionState.Open)
    {
        await conn.OpenAsync(ct).ConfigureAwait(false);
        openedHere = true;
    }

    try
    {
        // --- Phase 1: climb to the root -----------------------------------
        // Follow ParentExecutionId until the current id has no recorded
        // parent. That is either a genuine root or an id with no rows of
        // its own, past which nothing can be looked up. The hop limit only
        // matters if the data contains a cycle.
        var rootExecutionId = executionId;
        var hops = 0;
        while (hops < ExecutionChainMaxDepth)
        {
            var parentOfCurrent = await FindParentAsync(rootExecutionId).ConfigureAwait(false);
            if (parentOfCurrent is not { } parentId)
            {
                break;
            }

            rootExecutionId = parentId;
            hops++;
        }

        // --- Phase 2: descend from the root with a recursive CTE ----------
        // Edges    : DISTINCT (ExecutionId, ParentExecutionId) pairs taken
        //            from AuditLog. Recursing over edges rather than rows
        //            means an execution with N audit rows adds one path,
        //            not N; MAXRECURSION limits depth, not width.
        // Chain    : anchored on the root id as a literal row, then extended
        //            with every edge whose ParentExecutionId is already in
        //            the chain.
        // ChainIds : every ExecutionId in Chain plus every non-null
        //            ParentExecutionId in Chain.
        // Final    : ChainIds LEFT JOINed to AuditLog and grouped, so an id
        //            with no audit rows yields RowCount 0 and NULL
        //            aggregates instead of disappearing.
        var nodes = new List<ExecutionTreeNode>();
        await using (var treeCmd = conn.CreateCommand())
        {
            treeCmd.CommandText = $@"
                WITH Edges AS (
                    SELECT DISTINCT ExecutionId, ParentExecutionId
                    FROM dbo.AuditLog
                    WHERE ExecutionId IS NOT NULL
                ),
                Chain AS (
                    -- Anchor: the root execution id, seeded as a literal so
                    -- it is present even when the root is a row-less stub
                    -- (a purged/no-action parent owns no edge of its own).
                    -- The root is parentless by construction — the upward
                    -- walk stopped there — so its ParentExecutionId is NULL.
                    SELECT CAST(@root AS uniqueidentifier) AS ExecutionId,
                           CAST(NULL AS uniqueidentifier) AS ParentExecutionId
                    UNION ALL
                    SELECT e.ExecutionId, e.ParentExecutionId
                    FROM Edges e
                    INNER JOIN Chain c ON e.ParentExecutionId = c.ExecutionId
                ),
                ChainIds AS (
                    SELECT ExecutionId FROM Chain
                    UNION
                    SELECT ParentExecutionId FROM Chain
                    WHERE ParentExecutionId IS NOT NULL
                )
                -- ParentExecutionId / SourceSiteId / SourceInstanceId are
                -- derived via MIN: every audit row of one execution carries
                -- the SAME ParentExecutionId (and source identity) — it is
                -- stamped once per script run / inbound request — so MIN
                -- simply picks that one shared value, it is not collapsing a
                -- genuine disagreement across rows.
                SELECT
                    ids.ExecutionId AS [ExecutionId],
                    MIN(a.ParentExecutionId) AS [ParentExecutionId],
                    COUNT(a.EventId) AS [RowCount],
                    (SELECT STRING_AGG(d.Channel, ',')
                     FROM (SELECT DISTINCT a2.Channel FROM dbo.AuditLog a2
                           WHERE a2.ExecutionId = ids.ExecutionId) d) AS [Channels],
                    (SELECT STRING_AGG(d.Status, ',')
                     FROM (SELECT DISTINCT a2.Status FROM dbo.AuditLog a2
                           WHERE a2.ExecutionId = ids.ExecutionId) d) AS [Statuses],
                    MIN(a.SourceSiteId) AS [SourceSiteId],
                    MIN(a.SourceInstanceId) AS [SourceInstanceId],
                    MIN(a.OccurredAtUtc) AS [FirstOccurredAtUtc],
                    MAX(a.OccurredAtUtc) AS [LastOccurredAtUtc]
                FROM ChainIds ids
                LEFT JOIN dbo.AuditLog a ON a.ExecutionId = ids.ExecutionId
                GROUP BY ids.ExecutionId
                OPTION (MAXRECURSION {ExecutionChainMaxDepth});";

            var rootParam = treeCmd.CreateParameter();
            rootParam.ParameterName = "@root";
            rootParam.Value = rootExecutionId;
            treeCmd.Parameters.Add(rootParam);

            await using var row = await treeCmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
            while (await row.ReadAsync(ct).ConfigureAwait(false))
            {
                // Ordinals follow the SELECT list above.
                nodes.Add(new ExecutionTreeNode(
                    ExecutionId: row.GetGuid(0),
                    ParentExecutionId: GuidOrNull(row, 1),
                    RowCount: row.GetInt32(2),
                    Channels: SplitAggregate(StringOrNull(row, 3)),
                    Statuses: SplitAggregate(StringOrNull(row, 4)),
                    SourceSiteId: StringOrNull(row, 5),
                    SourceInstanceId: StringOrNull(row, 6),
                    FirstOccurredAtUtc: DateTimeOrNull(row, 7),
                    LastOccurredAtUtc: DateTimeOrNull(row, 8)));
            }
        }

        return nodes;
    }
    finally
    {
        // Only close a connection this method opened.
        if (openedHere)
        {
            await conn.CloseAsync().ConfigureAwait(false);
        }
    }

    // Looks up the recorded parent of one execution id; null when no row of
    // that execution carries a ParentExecutionId.
    async Task<Guid?> FindParentAsync(Guid current)
    {
        await using var lookup = conn.CreateCommand();
        lookup.CommandText =
            "SELECT TOP 1 ParentExecutionId FROM dbo.AuditLog WHERE ExecutionId = @cur AND ParentExecutionId IS NOT NULL;";

        var currentParam = lookup.CreateParameter();
        currentParam.ParameterName = "@cur";
        currentParam.Value = current;
        lookup.Parameters.Add(currentParam);

        var scalar = await lookup.ExecuteScalarAsync(ct).ConfigureAwait(false);
        return scalar is null or DBNull ? null : (Guid)scalar;
    }

    // NULL-tolerant column readers.
    static Guid? GuidOrNull(System.Data.Common.DbDataReader r, int ordinal) =>
        r.IsDBNull(ordinal) ? null : r.GetGuid(ordinal);

    static string? StringOrNull(System.Data.Common.DbDataReader r, int ordinal) =>
        r.IsDBNull(ordinal) ? null : r.GetString(ordinal);

    static DateTime? DateTimeOrNull(System.Data.Common.DbDataReader r, int ordinal) =>
        r.IsDBNull(ordinal) ? null : r.GetDateTime(ordinal);
}
|
|
|
|
/// <summary>
/// Lists the distinct, non-null <c>SourceNode</c> values in ascending order;
/// used to populate the Audit Log page's Node filter dropdown. EF Core turns
/// the query into
/// <c>SELECT DISTINCT SourceNode FROM AuditLog WHERE SourceNode IS NOT NULL ORDER BY SourceNode</c>.
/// That is a scan without index support, but the column holds one value per
/// cluster node (currently &lt;10) and the Central UI caches the result for
/// ~60s, so an occasional scan is acceptable.
/// </summary>
/// <param name="ct">Cancellation token forwarded to the database call.</param>
/// <returns>The distinct node names, sorted.</returns>
public async Task<IReadOnlyList<string>> GetDistinctSourceNodesAsync(CancellationToken ct = default)
{
    var recordedNodes =
        from auditRow in _context.Set<AuditEvent>().AsNoTracking()
        where auditRow.SourceNode != null
        select auditRow.SourceNode!;

    var sortedUnique = recordedNodes
        .Distinct()
        .OrderBy(nodeName => nodeName);

    return await sortedUnique.ToListAsync(ct);
}
|
|
|
|
/// <summary>
/// Turns a comma-joined <c>STRING_AGG</c> value into a de-duplicated list
/// sorted by ordinal comparison. Entries are trimmed and blank entries are
/// dropped. A null or empty input (a stub node with no rows) produces an
/// empty list, not a list holding one empty string.
/// </summary>
/// <param name="aggregate">The comma-separated aggregate, or null.</param>
/// <returns>The distinct entries in ordinal order; never null.</returns>
private static IReadOnlyList<string> SplitAggregate(string? aggregate)
{
    if (aggregate is null || aggregate.Length == 0)
    {
        return Array.Empty<string>();
    }

    var pieces = aggregate.Split(
        ',',
        StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);

    // An ordinal SortedSet de-duplicates and orders in one step.
    var uniqueSorted = new SortedSet<string>(pieces, StringComparer.Ordinal);

    var result = new string[uniqueSorted.Count];
    uniqueSorted.CopyTo(result);
    return result;
}
|
|
}
|