780 lines
36 KiB
C#
780 lines
36 KiB
C#
using System.Threading.Channels;
|
|
using Microsoft.Data.Sqlite;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using ScadaLink.Commons.Entities.Audit;
|
|
using ScadaLink.Commons.Interfaces.Services;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
|
|
namespace ScadaLink.AuditLog.Site;
|
|
|
|
/// <summary>
/// Site-side SQLite hot-path writer for Audit Log (#23) events. Mirrors the
/// <see cref="ScadaLink.SiteEventLogging.SiteEventLogger"/> design — a single
/// owned <see cref="SqliteConnection"/> serialised behind a write lock, fed by a
/// bounded <see cref="Channel{T}"/> drained on a dedicated background writer
/// task — so script-thread callers never block on disk I/O.
/// </summary>
/// <remarks>
/// <para>
/// The schema is bootstrapped in the constructor (Bundle B-T1). The
/// Channel-based <see cref="WriteAsync"/> hot-path + Bundle D
/// <see cref="ReadPendingAsync"/> / <see cref="MarkForwardedAsync"/> support
/// surface are wired in Bundle B-T2.
/// </para>
/// <para>
/// Site rows always carry <see cref="AuditForwardState.Pending"/> on first
/// insert; the central row-shape's <c>IngestedAtUtc</c> column does NOT live in
/// the site SQLite schema — central stamps it on ingest.
/// </para>
/// <para>
/// The mark methods only ever move a row forward:
/// Pending → Forwarded (<see cref="MarkForwardedAsync"/>) and
/// Pending/Forwarded → Reconciled (<see cref="MarkReconciledAsync"/>).
/// </para>
/// </remarks>
public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable, IDisposable
{
    // Microsoft.Data.Sqlite reports a generic SQLITE_CONSTRAINT (error code 19)
    // on a PRIMARY KEY violation; the extended subcode 1555 (SQLITE_CONSTRAINT_PRIMARYKEY)
    // is exposed via SqliteException.SqliteExtendedErrorCode but isn't reliably
    // surfaced across all SQLite builds. We treat any constraint error on insert
    // as a duplicate-eventid race and swallow it (first-write-wins). The PRIMARY
    // KEY on EventId is the only constraint an insert can realistically trip:
    // every NOT NULL column is bound from a non-null value in InsertBatch.
    private const int SqliteErrorConstraint = 19;

    // Upper bound on EventIds bound into one UPDATE ... IN (...) statement, so a
    // large mark batch stays well below SQLite's host-parameter limit
    // (SQLITE_MAX_VARIABLE_NUMBER; 999 on builds older than 3.32).
    private const int MaxIdsPerStatement = 500;

    // Column list shared by every read path. MapRow reads these by ordinal, so
    // the order here must match MapRow exactly.
    private const string SelectEventsSql = """
        SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
               SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
               Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
               RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
               ExecutionId, ParentExecutionId
        FROM AuditLog
        """;

    // How long DisposeAsync waits for the background writer to drain the queue.
    private static readonly TimeSpan DisposeDrainTimeout = TimeSpan.FromSeconds(5);

    private readonly SqliteConnection _connection;
    private readonly SqliteAuditWriterOptions _options;
    private readonly ILogger<SqliteAuditWriter> _logger;
    private readonly object _writeLock = new();
    private readonly Channel<PendingAuditEvent> _writeQueue;
    private readonly Task _writerLoop;
    private bool _disposed;

    /// <summary>
    /// Opens the SQLite database, bootstraps/migrates the schema and starts the
    /// background writer loop.
    /// </summary>
    /// <param name="options">Writer options (database path, channel capacity, batch size).</param>
    /// <param name="logger">Logger for diagnostics.</param>
    /// <param name="connectionStringOverride">
    /// Optional full connection string; when null the connection string is built
    /// from <c>DatabasePath</c> with <c>Cache=Shared</c>.
    /// </param>
    public SqliteAuditWriter(
        IOptions<SqliteAuditWriterOptions> options,
        ILogger<SqliteAuditWriter> logger,
        string? connectionStringOverride = null)
    {
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(logger);

        _options = options.Value;
        _logger = logger;

        var connectionString = connectionStringOverride
            ?? $"Data Source={_options.DatabasePath};Cache=Shared";
        _connection = new SqliteConnection(connectionString);

        try
        {
            _connection.Open();
            InitializeSchema();

            _writeQueue = Channel.CreateBounded<PendingAuditEvent>(
                new BoundedChannelOptions(_options.ChannelCapacity)
                {
                    // The hot-path enqueue must back-pressure if the background
                    // writer falls behind; a higher-level fallback (Bundle B-T4)
                    // handles truly catastrophic primary failure with a drop-oldest
                    // ring buffer.
                    FullMode = BoundedChannelFullMode.Wait,
                    SingleReader = true,
                    SingleWriter = false,
                });
        }
        catch
        {
            // The caller never receives an instance it could dispose, so release
            // the connection (and its file handle) here before propagating.
            _connection.Dispose();
            throw;
        }

        _writerLoop = Task.Run(ProcessWriteQueueAsync);
    }

    private void InitializeSchema()
    {
        // auto_vacuum must be set before any table is created for it to take
        // effect on a fresh database. INCREMENTAL lets a future
        // `PRAGMA incremental_vacuum` shrink the file after the 7-day retention
        // purge — see alog.md §10.
        using (var pragmaCmd = _connection.CreateCommand())
        {
            pragmaCmd.CommandText = "PRAGMA auto_vacuum = INCREMENTAL";
            pragmaCmd.ExecuteNonQuery();
        }

        using var cmd = _connection.CreateCommand();
        cmd.CommandText = """
            CREATE TABLE IF NOT EXISTS AuditLog (
                EventId TEXT NOT NULL,
                OccurredAtUtc TEXT NOT NULL,
                Channel TEXT NOT NULL,
                Kind TEXT NOT NULL,
                CorrelationId TEXT NULL,
                SourceSiteId TEXT NULL,
                SourceNode TEXT NULL,
                SourceInstanceId TEXT NULL,
                SourceScript TEXT NULL,
                Actor TEXT NULL,
                Target TEXT NULL,
                Status TEXT NOT NULL,
                HttpStatus INTEGER NULL,
                DurationMs INTEGER NULL,
                ErrorMessage TEXT NULL,
                ErrorDetail TEXT NULL,
                RequestSummary TEXT NULL,
                ResponseSummary TEXT NULL,
                PayloadTruncated INTEGER NOT NULL,
                Extra TEXT NULL,
                ForwardState TEXT NOT NULL,
                ExecutionId TEXT NULL,
                ParentExecutionId TEXT NULL,
                PRIMARY KEY (EventId)
            );
            CREATE INDEX IF NOT EXISTS IX_SiteAuditLog_ForwardState_Occurred
                ON AuditLog (ForwardState, OccurredAtUtc);
            """;
        cmd.ExecuteNonQuery();

        // Audit Log #23 (ExecutionId): additively add the ExecutionId column.
        // CREATE TABLE IF NOT EXISTS above does NOT add columns to an AuditLog
        // table that already exists from a pre-ExecutionId build, so an
        // auditlog.db created by an older build needs the column ALTER-ed in.
        // The file is durable across restart/failover by design (7-day
        // retention), so without this step every WriteAsync on an upgraded
        // deployment would bind $ExecutionId against a missing column and the
        // best-effort write path would silently drop every site audit row.
        // SQLite has no "ADD COLUMN IF NOT EXISTS"; the column presence is
        // probed first and the ALTER skipped when already there. The column is
        // nullable with no default, so any row written before this migration
        // reads back ExecutionId = null (back-compat).
        AddColumnIfMissing("ExecutionId", "TEXT NULL");

        // Audit Log #23 (ParentExecutionId): same idempotent upgrade path as
        // ExecutionId above. A deployment that already ran the ExecutionId
        // branch has an auditlog.db with the 21-column schema and no
        // ParentExecutionId column; CREATE TABLE IF NOT EXISTS cannot add it,
        // so it is ALTER-ed in here. Nullable with no default — rows written
        // before this migration read back ParentExecutionId = null.
        AddColumnIfMissing("ParentExecutionId", "TEXT NULL");

        // SourceNode stamping: same idempotent upgrade path as ExecutionId /
        // ParentExecutionId above. A deployment that already ran the
        // ParentExecutionId branch has an auditlog.db with the 22-column
        // schema and no SourceNode column; CREATE TABLE IF NOT EXISTS cannot
        // add it, so it is ALTER-ed in here. Nullable with no default — rows
        // written before this migration read back SourceNode = null.
        AddColumnIfMissing("SourceNode", "TEXT NULL");
    }

    /// <summary>
    /// Audit Log #23: additively adds a column to <c>AuditLog</c> only when
    /// it is not already present (used for <c>ExecutionId</c>,
    /// <c>ParentExecutionId</c> and <c>SourceNode</c>). SQLite lacks
    /// <c>ADD COLUMN IF NOT EXISTS</c>, so the schema is probed via
    /// <c>PRAGMA table_info</c> first. Idempotent — safe to run on every
    /// <see cref="InitializeSchema"/>. Mirrors
    /// <c>StoreAndForwardStorage.AddColumnIfMissingAsync</c>; kept synchronous
    /// here to match the rest of this writer's bootstrap DDL.
    /// </summary>
    private void AddColumnIfMissing(string columnName, string columnDefinition)
    {
        using var probe = _connection.CreateCommand();
        probe.CommandText = "SELECT COUNT(*) FROM pragma_table_info('AuditLog') WHERE name = $name";
        probe.Parameters.AddWithValue("$name", columnName);
        var exists = Convert.ToInt32(probe.ExecuteScalar()) > 0;
        if (exists)
        {
            return;
        }

        using var alter = _connection.CreateCommand();
        // Column name + definition are caller-controlled constants, never user
        // input — safe to interpolate (parameters are not permitted in DDL).
        alter.CommandText = $"ALTER TABLE AuditLog ADD COLUMN {columnName} {columnDefinition}";
        alter.ExecuteNonQuery();
    }

    /// <summary>
    /// Enqueue an event for durable persistence. The returned <see cref="Task"/>
    /// completes once the batch containing the event has been COMMITted (or, in
    /// the duplicate-EventId case, the event was recognised as already present).
    /// It faults if the batch's transaction fails or the writer has been
    /// disposed. The enqueue side never blocks on disk I/O — it only awaits the
    /// bounded channel's back-pressure when the writer is briefly behind.
    /// </summary>
    public Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(evt);

        // Site rows always carry a non-null ForwardState; central rows leave it
        // null. Force Pending on enqueue so callers can pass a bare AuditEvent
        // without thinking about site-vs-central provenance.
        var siteEvt = evt.ForwardState is null
            ? evt with { ForwardState = AuditForwardState.Pending }
            : evt;

        var pending = new PendingAuditEvent(siteEvt);

        // Fast path: TryWrite succeeds whenever the channel has room.
        if (!_writeQueue.Writer.TryWrite(pending))
        {
            // The channel is either completed (writer disposed) or at capacity.
            // Fall back to the async path, which honours the FullMode=Wait
            // back-pressure policy and maps a closed channel to
            // ObjectDisposedException.
            return WriteSlowPathAsync(pending, ct);
        }

        return pending.Completion.Task;
    }

    private async Task WriteSlowPathAsync(PendingAuditEvent pending, CancellationToken ct)
    {
        try
        {
            await _writeQueue.Writer.WriteAsync(pending, ct).ConfigureAwait(false);
        }
        catch (ChannelClosedException)
        {
            pending.Completion.TrySetException(
                new ObjectDisposedException(nameof(SqliteAuditWriter),
                    "Event could not be recorded: the audit writer has been disposed."));
        }

        await pending.Completion.Task.ConfigureAwait(false);
    }

    /// <summary>
    /// Background loop: drains the channel in batches of up to
    /// <c>BatchSize</c> events until the channel is completed by dispose.
    /// Never lets an exception escape while events remain queued, so callers
    /// can't be left awaiting a completion that would never be set.
    /// </summary>
    private async Task ProcessWriteQueueAsync()
    {
        // A non-positive BatchSize degrades to one-event batches instead of
        // crashing the loop (new List<T>(negative) throws).
        var batchSize = Math.Max(1, _options.BatchSize);
        var batch = new List<PendingAuditEvent>(batchSize);

        // ReadAllAsync completes when the channel is marked complete (Dispose).
        await foreach (var first in _writeQueue.Reader.ReadAllAsync().ConfigureAwait(false))
        {
            batch.Clear();
            batch.Add(first);

            // Pull additional ready events up to BatchSize. TryRead is non-
            // blocking and lets us amortise the transaction overhead across a
            // burst of concurrent enqueues.
            while (batch.Count < batchSize &&
                   _writeQueue.Reader.TryRead(out var next))
            {
                batch.Add(next);
            }

            try
            {
                FlushBatch(batch);
            }
            catch (Exception ex)
            {
                // FlushBatch routes expected failures to the pending TCSes
                // itself; anything escaping here is unexpected. Fault the batch
                // and keep draining rather than killing the loop.
                _logger.LogError(ex,
                    "SqliteAuditWriter writer loop failed to flush a batch of {Count} events; faulting them and continuing",
                    batch.Count);
                foreach (var pending in batch)
                {
                    pending.Completion.TrySetException(ex);
                }
            }
        }
    }

    /// <summary>
    /// Inserts <paramref name="batch"/> inside a single transaction and
    /// completes every pending event: success only after COMMIT returns, or
    /// the batch's exception if any part of the transaction fails.
    /// </summary>
    private void FlushBatch(IReadOnlyList<PendingAuditEvent> batch)
    {
        lock (_writeLock)
        {
            if (_disposed)
            {
                foreach (var pending in batch)
                {
                    pending.Completion.TrySetException(
                        new ObjectDisposedException(nameof(SqliteAuditWriter),
                            "Event could not be recorded: the audit writer was disposed before the write completed."));
                }
                return;
            }

            SqliteTransaction? transaction = null;
            try
            {
                // BeginTransaction lives inside the try so a failure to start
                // (e.g. a locked/unavailable database) faults the batch instead
                // of escaping and leaving the callers hanging.
                transaction = _connection.BeginTransaction();
                InsertBatch(transaction, batch);
                transaction.Commit();
            }
            catch (Exception ex)
            {
                TryRollback(transaction);
                _logger.LogError(ex, "SqliteAuditWriter batch insert failed; faulting {Count} pending events", batch.Count);
                foreach (var pending in batch)
                {
                    pending.Completion.TrySetException(ex);
                }
                return;
            }
            finally
            {
                transaction?.Dispose();
            }

            // Success is reported only once COMMIT has returned. Signalling per
            // row before the commit would tell callers an event was durable even
            // though a later failure rolled the whole batch back.
            foreach (var pending in batch)
            {
                pending.Completion.TrySetResult();
            }
        }
    }

    /// <summary>
    /// Executes one parameterised INSERT per event inside
    /// <paramref name="transaction"/>. A duplicate EventId is swallowed
    /// (first-write-wins, alog.md §11); any other failure propagates to
    /// <see cref="FlushBatch"/>, which rolls the whole batch back.
    /// </summary>
    private void InsertBatch(SqliteTransaction transaction, IReadOnlyList<PendingAuditEvent> batch)
    {
        using var cmd = _connection.CreateCommand();
        cmd.Transaction = transaction;
        cmd.CommandText = """
            INSERT INTO AuditLog (
                EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
                SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
                Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
                RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
                ExecutionId, ParentExecutionId
            ) VALUES (
                $EventId, $OccurredAtUtc, $Channel, $Kind, $CorrelationId,
                $SourceSiteId, $SourceNode, $SourceInstanceId, $SourceScript, $Actor, $Target,
                $Status, $HttpStatus, $DurationMs, $ErrorMessage, $ErrorDetail,
                $RequestSummary, $ResponseSummary, $PayloadTruncated, $Extra, $ForwardState,
                $ExecutionId, $ParentExecutionId
            );
            """;

        // Parameters are created once and re-bound per row.
        var pEventId = cmd.Parameters.Add("$EventId", SqliteType.Text);
        var pOccurredAt = cmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text);
        var pChannel = cmd.Parameters.Add("$Channel", SqliteType.Text);
        var pKind = cmd.Parameters.Add("$Kind", SqliteType.Text);
        var pCorrelationId = cmd.Parameters.Add("$CorrelationId", SqliteType.Text);
        var pSourceSiteId = cmd.Parameters.Add("$SourceSiteId", SqliteType.Text);
        var pSourceNode = cmd.Parameters.Add("$SourceNode", SqliteType.Text);
        var pSourceInstanceId = cmd.Parameters.Add("$SourceInstanceId", SqliteType.Text);
        var pSourceScript = cmd.Parameters.Add("$SourceScript", SqliteType.Text);
        var pActor = cmd.Parameters.Add("$Actor", SqliteType.Text);
        var pTarget = cmd.Parameters.Add("$Target", SqliteType.Text);
        var pStatus = cmd.Parameters.Add("$Status", SqliteType.Text);
        var pHttpStatus = cmd.Parameters.Add("$HttpStatus", SqliteType.Integer);
        var pDurationMs = cmd.Parameters.Add("$DurationMs", SqliteType.Integer);
        var pErrorMessage = cmd.Parameters.Add("$ErrorMessage", SqliteType.Text);
        var pErrorDetail = cmd.Parameters.Add("$ErrorDetail", SqliteType.Text);
        var pRequestSummary = cmd.Parameters.Add("$RequestSummary", SqliteType.Text);
        var pResponseSummary = cmd.Parameters.Add("$ResponseSummary", SqliteType.Text);
        var pPayloadTruncated = cmd.Parameters.Add("$PayloadTruncated", SqliteType.Integer);
        var pExtra = cmd.Parameters.Add("$Extra", SqliteType.Text);
        var pForwardState = cmd.Parameters.Add("$ForwardState", SqliteType.Text);
        var pExecutionId = cmd.Parameters.Add("$ExecutionId", SqliteType.Text);
        var pParentExecutionId = cmd.Parameters.Add("$ParentExecutionId", SqliteType.Text);

        foreach (var pending in batch)
        {
            var e = pending.Event;
            pEventId.Value = e.EventId.ToString();
            // "o" (round-trip) encoding; ReadPendingSinceAsync compares against
            // the same encoding as a string. NOTE(review): the DateTimeKind is
            // stored as-is (no UTC normalisation) — callers are assumed to pass
            // UTC values.
            pOccurredAt.Value = e.OccurredAtUtc.ToString("o", System.Globalization.CultureInfo.InvariantCulture);
            pChannel.Value = e.Channel.ToString();
            pKind.Value = e.Kind.ToString();
            pCorrelationId.Value = DbValue(e.CorrelationId?.ToString());
            pSourceSiteId.Value = DbValue(e.SourceSiteId);
            pSourceNode.Value = DbValue(e.SourceNode);
            pSourceInstanceId.Value = DbValue(e.SourceInstanceId);
            pSourceScript.Value = DbValue(e.SourceScript);
            pActor.Value = DbValue(e.Actor);
            pTarget.Value = DbValue(e.Target);
            pStatus.Value = e.Status.ToString();
            pHttpStatus.Value = DbValue(e.HttpStatus);
            pDurationMs.Value = DbValue(e.DurationMs);
            pErrorMessage.Value = DbValue(e.ErrorMessage);
            pErrorDetail.Value = DbValue(e.ErrorDetail);
            pRequestSummary.Value = DbValue(e.RequestSummary);
            pResponseSummary.Value = DbValue(e.ResponseSummary);
            pPayloadTruncated.Value = e.PayloadTruncated ? 1 : 0;
            pExtra.Value = DbValue(e.Extra);
            pForwardState.Value = (e.ForwardState ?? AuditForwardState.Pending).ToString();
            pExecutionId.Value = DbValue(e.ExecutionId?.ToString());
            pParentExecutionId.Value = DbValue(e.ParentExecutionId?.ToString());

            try
            {
                cmd.ExecuteNonQuery();
            }
            catch (SqliteException ex) when (ex.SqliteErrorCode == SqliteErrorConstraint)
            {
                // Duplicate EventId — first-write-wins (alog.md §11).
                // Treat as success: the lifecycle event is durably
                // recorded under the first writer's payload. Under SQLite's
                // default ABORT conflict policy only this statement is undone;
                // the surrounding transaction stays usable.
                _logger.LogDebug(ex,
                    "Duplicate EventId {EventId} swallowed by SqliteAuditWriter",
                    e.EventId);
            }
        }
    }

    /// <summary>
    /// Best-effort rollback of a failed batch transaction. A rollback failure
    /// is logged rather than rethrown so it cannot mask the original error or
    /// escape <see cref="FlushBatch"/>.
    /// </summary>
    private void TryRollback(SqliteTransaction? transaction)
    {
        if (transaction is null)
        {
            return;
        }

        try
        {
            transaction.Rollback();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "SqliteAuditWriter could not roll back a failed batch transaction.");
        }
    }

    /// <summary>Maps a CLR null to <see cref="DBNull.Value"/> for parameter binding.</summary>
    private static object DbValue(object? value) => value ?? DBNull.Value;

    /// <summary>
    /// Returns up to <paramref name="limit"/> rows in <see cref="AuditForwardState.Pending"/>,
    /// oldest <see cref="AuditEvent.OccurredAtUtc"/> first, with <see cref="AuditEvent.EventId"/>
    /// as the deterministic tiebreaker. Called by Bundle D's site telemetry
    /// actor to build a batch for the gRPC push.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="limit"/> is not positive.</exception>
    /// <exception cref="ObjectDisposedException">The writer has been disposed.</exception>
    public Task<IReadOnlyList<AuditEvent>> ReadPendingAsync(int limit, CancellationToken ct = default)
    {
        if (limit <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
        }

        if (ct.IsCancellationRequested)
        {
            return Task.FromCanceled<IReadOnlyList<AuditEvent>>(ct);
        }

        // SqliteConnection is not thread-safe so we go through the same write
        // lock the batch INSERTer uses. The actor caller is single-threaded,
        // so contention is bounded.
        lock (_writeLock)
        {
            ObjectDisposedException.ThrowIf(_disposed, this);

            using var cmd = _connection.CreateCommand();
            cmd.CommandText = SelectEventsSql
                + " WHERE ForwardState = $pending ORDER BY OccurredAtUtc ASC, EventId ASC LIMIT $limit;";
            cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
            cmd.Parameters.AddWithValue("$limit", limit);

            return Task.FromResult<IReadOnlyList<AuditEvent>>(ReadEvents(cmd, limit));
        }
    }

    /// <summary>
    /// Returns up to <paramref name="limit"/> rows in
    /// <see cref="AuditForwardState.Forwarded"/>, oldest
    /// <see cref="AuditEvent.OccurredAtUtc"/> first, with
    /// <see cref="AuditEvent.EventId"/> as the deterministic tiebreaker. The
    /// <see cref="AuditForwardState.Forwarded"/>-specific counterpart of
    /// <see cref="ReadPendingAsync"/>; used by tests to assert a row reached the
    /// <see cref="AuditForwardState.Forwarded"/> state specifically (unlike
    /// <see cref="ReadPendingSinceAsync"/>, which also returns
    /// <see cref="AuditForwardState.Pending"/> rows).
    /// </summary>
    public Task<IReadOnlyList<AuditEvent>> ReadForwardedAsync(int limit, CancellationToken ct = default)
    {
        if (limit <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
        }

        if (ct.IsCancellationRequested)
        {
            return Task.FromCanceled<IReadOnlyList<AuditEvent>>(ct);
        }

        // Mirror ReadPendingAsync: the write lock guards the single connection.
        lock (_writeLock)
        {
            ObjectDisposedException.ThrowIf(_disposed, this);

            using var cmd = _connection.CreateCommand();
            cmd.CommandText = SelectEventsSql
                + " WHERE ForwardState = $forwarded ORDER BY OccurredAtUtc ASC, EventId ASC LIMIT $limit;";
            cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
            cmd.Parameters.AddWithValue("$limit", limit);

            return Task.FromResult<IReadOnlyList<AuditEvent>>(ReadEvents(cmd, limit));
        }
    }

    /// <summary>
    /// Flips the supplied EventIds from <see cref="AuditForwardState.Pending"/> to
    /// <see cref="AuditForwardState.Forwarded"/>. Only rows currently Pending are
    /// touched: non-existent, already-forwarded and already-reconciled ids are
    /// no-ops, so a late forward ack can never regress a Reconciled row.
    /// </summary>
    public Task MarkForwardedAsync(IReadOnlyList<Guid> eventIds, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(eventIds);
        if (eventIds.Count == 0)
        {
            return Task.CompletedTask;
        }

        if (ct.IsCancellationRequested)
        {
            return Task.FromCanceled(ct);
        }

        lock (_writeLock)
        {
            ObjectDisposedException.ThrowIf(_disposed, this);
            UpdateForwardStateLocked(eventIds, AuditForwardState.Forwarded, AuditForwardState.Pending);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// M6 reconciliation-pull read: returns up to <paramref name="batchSize"/> rows
    /// whose <c>OccurredAtUtc >= sinceUtc</c> and whose <see cref="AuditForwardState"/>
    /// is still <see cref="AuditForwardState.Pending"/> or
    /// <see cref="AuditForwardState.Forwarded"/>. Forwarded rows are included so the
    /// brief race window between a site-Forwarded ack and central ingest cannot
    /// silently drop rows; central dedups on <see cref="AuditEvent.EventId"/>.
    /// Ordered oldest <see cref="AuditEvent.OccurredAtUtc"/> first, EventId tiebreaker.
    /// </summary>
    public Task<IReadOnlyList<AuditEvent>> ReadPendingSinceAsync(
        DateTime sinceUtc, int batchSize, CancellationToken ct = default)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize), "batchSize must be > 0.");
        }

        if (ct.IsCancellationRequested)
        {
            return Task.FromCanceled<IReadOnlyList<AuditEvent>>(ct);
        }

        // Mirror ReadPendingAsync: the write lock guards the single connection.
        lock (_writeLock)
        {
            ObjectDisposedException.ThrowIf(_disposed, this);

            using var cmd = _connection.CreateCommand();
            cmd.CommandText = SelectEventsSql
                + " WHERE ForwardState IN ($pending, $forwarded) AND OccurredAtUtc >= $since"
                + " ORDER BY OccurredAtUtc ASC, EventId ASC LIMIT $limit;";
            cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
            cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
            // Normalise to UTC ISO-8601 round-trip format to match how OccurredAtUtc
            // is stored on insert ("o" format) — string comparison is monotonic for
            // that encoding so we can index-scan against it.
            cmd.Parameters.AddWithValue("$since", EnsureUtc(sinceUtc).ToString(
                "o", System.Globalization.CultureInfo.InvariantCulture));
            cmd.Parameters.AddWithValue("$limit", batchSize);

            return Task.FromResult<IReadOnlyList<AuditEvent>>(ReadEvents(cmd, batchSize));
        }
    }

    /// <summary>
    /// M6 reconciliation-pull commit: flips the supplied EventIds to
    /// <see cref="AuditForwardState.Reconciled"/>, but ONLY for rows currently in
    /// <see cref="AuditForwardState.Pending"/> or <see cref="AuditForwardState.Forwarded"/>.
    /// Rows already in <see cref="AuditForwardState.Reconciled"/> are left untouched
    /// (idempotent re-call). Non-existent ids are silent no-ops.
    /// </summary>
    public Task MarkReconciledAsync(IReadOnlyList<Guid> eventIds, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(eventIds);
        if (eventIds.Count == 0)
        {
            return Task.CompletedTask;
        }

        if (ct.IsCancellationRequested)
        {
            return Task.FromCanceled(ct);
        }

        lock (_writeLock)
        {
            ObjectDisposedException.ThrowIf(_disposed, this);
            UpdateForwardStateLocked(
                eventIds,
                AuditForwardState.Reconciled,
                AuditForwardState.Pending,
                AuditForwardState.Forwarded);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Sets <c>ForwardState</c> to <paramref name="target"/> for every id in
    /// <paramref name="eventIds"/> whose current state is one of
    /// <paramref name="fromStates"/>. Caller must hold <see cref="_writeLock"/>.
    /// Ids are bound as parameters (never concatenated into the SQL) in chunks of
    /// at most <see cref="MaxIdsPerStatement"/>. All chunks run in one
    /// transaction, so the update stays all-or-nothing.
    /// </summary>
    private void UpdateForwardStateLocked(
        IReadOnlyList<Guid> eventIds,
        AuditForwardState target,
        params AuditForwardState[] fromStates)
    {
        // Disposing an uncommitted SqliteTransaction rolls it back, so an
        // exception from any chunk leaves the table unchanged.
        using var transaction = _connection.BeginTransaction();

        for (var offset = 0; offset < eventIds.Count; offset += MaxIdsPerStatement)
        {
            var chunkCount = Math.Min(MaxIdsPerStatement, eventIds.Count - offset);

            using var cmd = _connection.CreateCommand();
            cmd.Transaction = transaction;

            var sql = new System.Text.StringBuilder("UPDATE AuditLog SET ForwardState = $target WHERE ForwardState IN (");
            for (var s = 0; s < fromStates.Length; s++)
            {
                if (s > 0)
                {
                    sql.Append(',');
                }

                var stateParam = $"$from{s}";
                sql.Append(stateParam);
                cmd.Parameters.AddWithValue(stateParam, fromStates[s].ToString());
            }

            sql.Append(") AND EventId IN (");
            for (var i = 0; i < chunkCount; i++)
            {
                if (i > 0)
                {
                    sql.Append(',');
                }

                var idParam = $"$id{i}";
                sql.Append(idParam);
                cmd.Parameters.AddWithValue(idParam, eventIds[offset + i].ToString());
            }

            sql.Append(");");
            cmd.CommandText = sql.ToString();
            cmd.Parameters.AddWithValue("$target", target.ToString());
            cmd.ExecuteNonQuery();
        }

        transaction.Commit();
    }

    /// <summary>
    /// Executes <paramref name="cmd"/> (a query built on
    /// <see cref="SelectEventsSql"/>) and maps every row. Caller must hold
    /// <see cref="_writeLock"/>.
    /// </summary>
    private static List<AuditEvent> ReadEvents(SqliteCommand cmd, int limit)
    {
        // Cap the pre-allocation so a huge limit doesn't reserve memory up front.
        var rows = new List<AuditEvent>(Math.Min(limit, 256));
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            rows.Add(MapRow(reader));
        }

        return rows;
    }

    /// <summary>
    /// M6 Bundle E (T6) health-metric surface: returns a point-in-time snapshot
    /// of the site queue's pending count, the oldest pending row's
    /// <see cref="AuditEvent.OccurredAtUtc"/>, and the on-disk file size. Called
    /// by the site-side <c>SiteAuditBacklogReporter</c> hosted service on its
    /// 30 s tick to refresh the <c>SiteHealthReport.SiteAuditBacklog</c> field.
    /// </summary>
    /// <remarks>
    /// The pending-count + oldest-row queries run inside the same write lock as
    /// the hot-path INSERT batch so the snapshot is consistent against the
    /// connection's view (no torn read of an in-flight transaction). The on-disk
    /// size lookup happens OUTSIDE the lock — it's a stat() call on the file
    /// path and doesn't touch the connection. In-memory and missing files
    /// return 0 bytes (the snapshot is for ops dashboards, not a correctness
    /// invariant).
    /// </remarks>
    public Task<SiteAuditBacklogSnapshot> GetBacklogStatsAsync(CancellationToken ct = default)
    {
        if (ct.IsCancellationRequested)
        {
            return Task.FromCanceled<SiteAuditBacklogSnapshot>(ct);
        }

        int pendingCount;
        DateTime? oldestPending;

        lock (_writeLock)
        {
            ObjectDisposedException.ThrowIf(_disposed, this);

            // Single round-trip — COUNT(*) + MIN(OccurredAtUtc) over the same
            // index range avoids a second scan. The IX_SiteAuditLog_ForwardState_Occurred
            // index makes both aggregates cheap (count is a covering scan, min
            // is the first key).
            using var cmd = _connection.CreateCommand();
            cmd.CommandText = """
                SELECT COUNT(*), MIN(OccurredAtUtc)
                FROM AuditLog
                WHERE ForwardState = $pending;
                """;
            cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());

            using var reader = cmd.ExecuteReader();
            // An aggregate without GROUP BY always yields exactly one row.
            reader.Read();
            // COUNT(*) is a 64-bit integer in SQLite; clamp rather than overflow.
            pendingCount = (int)Math.Min(reader.GetInt64(0), int.MaxValue);
            oldestPending = reader.IsDBNull(1)
                ? null
                : DateTime.Parse(reader.GetString(1),
                    System.Globalization.CultureInfo.InvariantCulture,
                    System.Globalization.DateTimeStyles.RoundtripKind);
        }

        // File-size lookup outside the lock — the DatabasePath option is the
        // canonical source. The connection-string-override branch (used by
        // some tests) keeps the same DatabasePath value, so this works
        // uniformly. In-memory / mode=memory paths return 0 because the file
        // doesn't exist on disk.
        long onDiskBytes = 0;
        try
        {
            if (!string.IsNullOrEmpty(_options.DatabasePath) &&
                !_options.DatabasePath.StartsWith(":memory:", StringComparison.Ordinal) &&
                !_options.DatabasePath.Contains("mode=memory", StringComparison.OrdinalIgnoreCase) &&
                File.Exists(_options.DatabasePath))
            {
                onDiskBytes = new FileInfo(_options.DatabasePath).Length;
            }
        }
        catch (Exception ex)
        {
            // File system probe is a best-effort health-metric — never abort
            // a backlog snapshot because stat() failed. Log and report 0.
            _logger.LogDebug(ex,
                "SqliteAuditWriter could not stat DB path {Path} for backlog snapshot.",
                _options.DatabasePath);
        }

        return Task.FromResult(new SiteAuditBacklogSnapshot(
            PendingCount: pendingCount,
            OldestPendingUtc: oldestPending,
            OnDiskBytes: onDiskBytes));
    }

    /// <summary>
    /// Returns <paramref name="value"/> as a UTC <see cref="DateTime"/>. Values
    /// already tagged UTC pass through unchanged. Anything else goes through
    /// <see cref="DateTime.ToUniversalTime"/>, which treats
    /// <see cref="DateTimeKind.Unspecified"/> as local time.
    /// </summary>
    private static DateTime EnsureUtc(DateTime value) =>
        value.Kind == DateTimeKind.Utc
            ? value
            : DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);

    /// <summary>
    /// Maps the current reader row (column order of <see cref="SelectEventsSql"/>)
    /// to an <see cref="AuditEvent"/>.
    /// </summary>
    private static AuditEvent MapRow(SqliteDataReader reader)
    {
        return new AuditEvent
        {
            EventId = Guid.Parse(reader.GetString(0)),
            OccurredAtUtc = DateTime.Parse(reader.GetString(1),
                System.Globalization.CultureInfo.InvariantCulture,
                System.Globalization.DateTimeStyles.RoundtripKind),
            Channel = Enum.Parse<AuditChannel>(reader.GetString(2)),
            Kind = Enum.Parse<AuditKind>(reader.GetString(3)),
            CorrelationId = reader.IsDBNull(4) ? null : Guid.Parse(reader.GetString(4)),
            SourceSiteId = reader.IsDBNull(5) ? null : reader.GetString(5),
            SourceNode = reader.IsDBNull(6) ? null : reader.GetString(6),
            SourceInstanceId = reader.IsDBNull(7) ? null : reader.GetString(7),
            SourceScript = reader.IsDBNull(8) ? null : reader.GetString(8),
            Actor = reader.IsDBNull(9) ? null : reader.GetString(9),
            Target = reader.IsDBNull(10) ? null : reader.GetString(10),
            Status = Enum.Parse<AuditStatus>(reader.GetString(11)),
            HttpStatus = reader.IsDBNull(12) ? null : reader.GetInt32(12),
            DurationMs = reader.IsDBNull(13) ? null : reader.GetInt32(13),
            ErrorMessage = reader.IsDBNull(14) ? null : reader.GetString(14),
            ErrorDetail = reader.IsDBNull(15) ? null : reader.GetString(15),
            RequestSummary = reader.IsDBNull(16) ? null : reader.GetString(16),
            ResponseSummary = reader.IsDBNull(17) ? null : reader.GetString(17),
            PayloadTruncated = reader.GetInt32(18) != 0,
            Extra = reader.IsDBNull(19) ? null : reader.GetString(19),
            ForwardState = Enum.Parse<AuditForwardState>(reader.GetString(20)),
            ExecutionId = reader.IsDBNull(21) ? null : Guid.Parse(reader.GetString(21)),
            ParentExecutionId = reader.IsDBNull(22) ? null : Guid.Parse(reader.GetString(22)),
        };
    }

    /// <summary>Synchronous dispose; blocks on <see cref="DisposeAsync"/>.</summary>
    public void Dispose()
    {
        DisposeAsync().AsTask().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Stops accepting events, waits (bounded) for the background writer to
    /// flush everything already queued, then closes the connection.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        Task writerLoop;
        lock (_writeLock)
        {
            if (_disposed) return;
            // Stop accepting new events. _disposed is deliberately NOT set yet:
            // the writer loop must still be able to flush what is already
            // queued. ReadAllAsync drains the completed channel and then ends.
            _writeQueue.Writer.TryComplete();
            writerLoop = _writerLoop;
        }

        // Wait outside the lock — the loop reacquires it for each batch.
        try
        {
            await writerLoop.WaitAsync(DisposeDrainTimeout).ConfigureAwait(false);
        }
        catch (TimeoutException)
        {
            _logger.LogWarning("SqliteAuditWriter writer loop did not drain within 5s of dispose.");
        }
        catch (Exception ex)
        {
            // The loop routes batch failures to pending TCSes and does not let
            // them escape; a top-level fault here is unexpected.
            _logger.LogError(ex, "SqliteAuditWriter writer loop faulted during dispose.");
        }

        lock (_writeLock)
        {
            if (_disposed) return;
            // From here on, a FlushBatch still running after a drain timeout,
            // and every read/mark call, sees _disposed and faults instead of
            // touching the closed connection.
            _disposed = true;
            _connection.Dispose();
        }
    }

    /// <summary>An audit event awaiting persistence by the background writer.</summary>
    private sealed class PendingAuditEvent
    {
        public PendingAuditEvent(AuditEvent evt)
        {
            Event = evt;
            // Asynchronous continuations keep caller code from running inline
            // on the writer thread while it holds _writeLock.
            Completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        }

        public AuditEvent Event { get; }

        public TaskCompletionSource Completion { get; }
    }
}
|