Files
ScadaBridge/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs
T

944 lines
45 KiB
C#

using System.Threading.Channels;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using AuditEvent = ZB.MOM.WW.Audit.AuditEvent;
using AuditOutcome = ZB.MOM.WW.Audit.AuditOutcome;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Site;
/// <summary>
/// Site-side SQLite hot-path writer for Audit Log (#23) events. Mirrors the
/// <see cref="ZB.MOM.WW.ScadaBridge.SiteEventLogging.SiteEventLogger"/> design — a single
/// owned <see cref="SqliteConnection"/> serialised behind a write lock, fed by a
/// bounded <see cref="Channel{T}"/> drained on a dedicated background writer
/// task — so script-thread callers never block on disk I/O.
/// </summary>
/// <remarks>
/// <para>
/// <b>C4 (Task 2.5) — two-table schema.</b> The site store is now two tables:
/// the append-only canonical <c>audit_event</c> (the 10 canonical
/// <see cref="AuditEvent"/> fields stored directly — NO 24-column decompose) and
/// the mutable operational <c>audit_forward_state</c> sidecar that carries the
/// forwarding lifecycle (<see cref="AuditForwardState"/>), a duplicated
/// <c>OccurredAtUtc</c> for the drain index range-scan, a precomputed
/// <c>IsCachedKind</c> flag that drives the cached/non-cached drain split without
/// re-parsing <c>DetailsJson</c> on the read hot-path, plus attempt bookkeeping.
/// </para>
/// <para>
/// <b>Ephemeral reset.</b> The site SQLite store is ephemeral (≈7-day retention,
/// recreated per deployment), so C4's schema change is an in-place RESET: the new
/// tables are created and the old single 24-column <c>AuditLog</c> table is
/// DROP-ped if present. No SQLite data migration is performed (and none is
/// needed) — any rows in a pre-C4 <c>AuditLog</c> table are within the retention
/// window and are discarded by the drop.
/// </para>
/// <para>
/// Site rows always carry <see cref="AuditForwardState.Pending"/> on first
/// insert; the central row-shape's <c>IngestedAtUtc</c> is a DetailsJson field
/// stamped by central on ingest, not a site column.
/// </para>
/// </remarks>
public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable, IDisposable
{
// Microsoft.Data.Sqlite reports a generic SQLITE_CONSTRAINT (error code 19)
// on a PRIMARY KEY violation; the extended subcode 1555 (SQLITE_CONSTRAINT_PRIMARYKEY)
// is exposed via SqliteException.SqliteExtendedErrorCode but isn't reliably
// surfaced across all SQLite builds. We treat any constraint error on insert
// as a duplicate-eventid race and swallow it (first-write-wins) — the PRIMARY
// KEY on audit_event.EventId is the constraint that fires first, so this scope
// is precise (the sidecar insert for the same EventId is in the same
// transaction and never reached once audit_event's insert throws).
private const int SqliteErrorConstraint = 19;
// Writer connection: the schema DDL, the batched INSERTs and the Mark* UPDATEs
// all run on it. Batch flushes and Mark* calls take _writeLock first.
private readonly SqliteConnection _connection;
// AuditLog-005: dedicated read-only connection used by GetBacklogStatsAsync,
// ReadPendingAsync, ReadPendingSinceAsync, and ReadForwardedAsync so a slow
// backlog scan (COUNT(*) over hundreds of thousands of Pending rows under a
// central outage) never parks the hot-path writer behind _writeLock.
// SQLite-with-WAL allows a second connection on the same file to read
// concurrently with the writer; the writer's WAL pragma is set in
// InitializeSchema before this connection is opened. The reader connection
// has its own _readLock because SqliteConnection itself is not thread-safe
// even in read-only mode — multiple read callers can otherwise interleave
// commands on the shared connection.
private readonly SqliteConnection _readConnection;
// Serialises every command issued on _readConnection.
private readonly object _readLock = new();
// options.Value captured once in the constructor (DatabasePath, ChannelCapacity,
// BatchSize are the members read in this class).
private readonly SqliteAuditWriterOptions _options;
private readonly ILogger<SqliteAuditWriter> _logger;
// Supplies the local NodeName stamped into the SourceNode column when an
// event arrives without one.
private readonly INodeIdentityProvider _nodeIdentity;
// Serialises every command issued on _connection after construction.
private readonly object _writeLock = new();
// Bounded hand-off queue between WriteAsync callers and the background
// writer task (capacity = _options.ChannelCapacity, FullMode = Wait).
private readonly Channel<PendingAuditEvent> _writeQueue;
// The background task running ProcessWriteQueueAsync; started by the constructor.
private readonly Task _writerLoop;
// Checked under _writeLock / _readLock before a connection is touched.
// NOTE(review): presumably set by Dispose / DisposeAsync — confirm.
private bool _disposed;
/// <summary>
/// Opens the writer and reader SQLite connections, creates the schema, builds the
/// bounded write queue, and starts the background writer task.
/// </summary>
/// <param name="options">Configuration options for the audit writer.</param>
/// <param name="logger">Logger instance.</param>
/// <param name="nodeIdentity">Node identity provider.</param>
/// <param name="connectionStringOverride">
/// Optional connection string used instead of the one derived from
/// <c>DatabasePath</c>.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="options"/>, <paramref name="logger"/> or
/// <paramref name="nodeIdentity"/> is <c>null</c>.
/// </exception>
public SqliteAuditWriter(
    IOptions<SqliteAuditWriterOptions> options,
    ILogger<SqliteAuditWriter> logger,
    INodeIdentityProvider nodeIdentity,
    string? connectionStringOverride = null)
{
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(logger);
    ArgumentNullException.ThrowIfNull(nodeIdentity);
    _options = options.Value;
    _logger = logger;
    _nodeIdentity = nodeIdentity;

    // NOTE(review): the Microsoft.Data.Sqlite documentation discourages mixing
    // Cache=Shared with write-ahead logging. The default below is left unchanged
    // here; confirm whether Cache=Shared is still wanted now that WAL is enabled.
    var connectionString = connectionStringOverride
        ?? $"Data Source={_options.DatabasePath};Cache=Shared";

    _connection = new SqliteConnection(connectionString);
    SqliteConnection? readConnection = null;
    try
    {
        _connection.Open();
        InitializeSchema();

        // AuditLog-005: open a second connection for read-only callers
        // (GetBacklogStatsAsync, ReadPendingAsync, ReadPendingSinceAsync,
        // ReadForwardedAsync). InitializeSchema set journal_mode=WAL on the
        // writer connection, so this connection can read without taking
        // _writeLock. The same connection string is reused so both connections
        // see identical Data Source / Cache settings.
        readConnection = new SqliteConnection(connectionString);
        readConnection.Open();

        // PRAGMA foreign_keys is a per-connection setting. Pure SELECTs are
        // unaffected; this is defensive in case a write is ever added to the
        // read path.
        using (var pragmaCmd = readConnection.CreateCommand())
        {
            pragmaCmd.CommandText = "PRAGMA foreign_keys = ON";
            pragmaCmd.ExecuteNonQuery();
        }

        _writeQueue = Channel.CreateBounded<PendingAuditEvent>(
            new BoundedChannelOptions(_options.ChannelCapacity)
            {
                // The hot-path enqueue must back-pressure if the background
                // writer falls behind; a higher-level fallback (Bundle B-T4)
                // handles truly catastrophic primary failure with a drop-oldest
                // ring buffer.
                FullMode = BoundedChannelFullMode.Wait,
                SingleReader = true,
                SingleWriter = false,
            });

        _readConnection = readConnection;
    }
    catch
    {
        // A constructor that throws leaves no instance for the caller to
        // dispose, so any connection opened above must be released here or
        // its native handle stays open until finalisation.
        readConnection?.Dispose();
        _connection.Dispose();
        throw;
    }

    // Started last so the loop never observes a half-constructed instance.
    _writerLoop = Task.Run(ProcessWriteQueueAsync);
}
/// <summary>
/// Applies the connection pragmas on the writer connection, drops the legacy
/// <c>AuditLog</c> table if present, and creates the <c>audit_event</c> /
/// <c>audit_forward_state</c> tables plus the <c>IX_fwd</c> index.
/// </summary>
private void InitializeSchema()
{
    // Each statement runs on its own short-lived command, in the order listed.
    void Execute(string sql)
    {
        using var command = _connection.CreateCommand();
        command.CommandText = sql;
        command.ExecuteNonQuery();
    }

    // auto_vacuum has to be set before the first table exists to take effect on
    // a fresh database; INCREMENTAL allows a later `PRAGMA incremental_vacuum`
    // to shrink the file after the retention purge (alog.md §10).
    Execute("PRAGMA auto_vacuum = INCREMENTAL");

    // AuditLog-005: request WAL so the read connection can run concurrently
    // with the batched writer. The mode actually adopted is not inspected —
    // the read connection is still correct without WAL, it only loses the
    // concurrency benefit.
    Execute("PRAGMA journal_mode = WAL");

    // foreign_keys is per-connection and not persisted in the file, so it must
    // be switched on here for the audit_forward_state.EventId → audit_event.EventId
    // FK to be enforced on the inserts this connection performs.
    Execute("PRAGMA foreign_keys = ON");

    // C4 (Task 2.5) — in-place reset. The site store is ephemeral, so the old
    // single-table AuditLog is dropped rather than migrated. Never do this on
    // a durable store.
    Execute("DROP TABLE IF EXISTS AuditLog;");

    Execute("""
        -- Canonical, append-only / write-once: the 10 fields of the canonical
        -- ZB.MOM.WW.Audit.AuditEvent stored directly (DetailsJson carries the
        -- ScadaBridge domain fields). No forwarding state lives here that is
        -- the audit_forward_state sidecar's concern.
        CREATE TABLE IF NOT EXISTS audit_event (
        EventId TEXT NOT NULL,
        OccurredAtUtc TEXT NOT NULL,
        Actor TEXT NOT NULL,
        Action TEXT NOT NULL,
        Outcome TEXT NOT NULL,
        Category TEXT NULL,
        Target TEXT NULL,
        SourceNode TEXT NULL,
        CorrelationId TEXT NULL,
        DetailsJson TEXT NULL,
        PRIMARY KEY (EventId)
        );
        -- Operational, mutable: the forwarding lifecycle for each canonical
        -- row. OccurredAtUtc is duplicated here so the drain range-scan stays
        -- on this one table's index; IsCachedKind is precomputed at insert so
        -- the cached/non-cached drain split never re-parses DetailsJson on the
        -- read hot-path.
        CREATE TABLE IF NOT EXISTS audit_forward_state (
        EventId TEXT NOT NULL,
        ForwardState TEXT NOT NULL,
        OccurredAtUtc TEXT NOT NULL,
        IsCachedKind INTEGER NOT NULL,
        AttemptCount INTEGER NOT NULL DEFAULT 0,
        LastAttemptUtc TEXT NULL,
        PRIMARY KEY (EventId),
        FOREIGN KEY (EventId) REFERENCES audit_event(EventId)
        );
        -- Drain index: every read filters on (ForwardState, IsCachedKind) and
        -- range-scans/orders by OccurredAtUtc, so this composite covers the
        -- four reads + the backlog COUNT/MIN.
        CREATE INDEX IF NOT EXISTS IX_fwd
        ON audit_forward_state (ForwardState, IsCachedKind, OccurredAtUtc);
        """);
}
/// <inheritdoc />
public Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(evt);

    // ForwardState is a site-storage concern that the canonical record does not
    // carry; every freshly written site row starts out Pending.
    var queued = new PendingAuditEvent(evt, AuditForwardState.Pending);

    // Fast path: a synchronous TryWrite. It fails when the channel is full or
    // has been completed; in both cases the async slow path takes over, which
    // waits for room (FullMode = Wait) or surfaces the closed channel.
    return _writeQueue.Writer.TryWrite(queued)
        ? queued.Completion.Task
        : WriteSlowPathAsync(queued, ct);
}
/// <summary>
/// Enqueues <paramref name="pending"/> through the channel's asynchronous write
/// and then awaits the event's completion.
/// </summary>
/// <param name="pending">The queued event and its completion source.</param>
/// <param name="ct">Cancels the wait for channel capacity.</param>
private async Task WriteSlowPathAsync(PendingAuditEvent pending, CancellationToken ct)
{
    var channelWriter = _writeQueue.Writer;
    try
    {
        await channelWriter.WriteAsync(pending, ct).ConfigureAwait(false);
    }
    catch (ChannelClosedException)
    {
        // The channel was completed: fault the completion so the await below
        // surfaces an ObjectDisposedException to the caller.
        var disposedError = new ObjectDisposedException(
            nameof(SqliteAuditWriter),
            "Event could not be recorded: the audit writer has been disposed.");
        pending.Completion.TrySetException(disposedError);
    }

    await pending.Completion.Task.ConfigureAwait(false);
}
/// <summary>
/// Background loop: drains the write queue in batches of at most
/// <c>BatchSize</c> events and hands each batch to <see cref="FlushBatch"/>.
/// Ends once the channel has been completed and emptied.
/// </summary>
private async Task ProcessWriteQueueAsync()
{
    var queueReader = _writeQueue.Reader;
    var buffer = new List<PendingAuditEvent>(_options.BatchSize);

    // WaitToReadAsync yields false once the channel is completed and drained.
    while (await queueReader.WaitToReadAsync().ConfigureAwait(false))
    {
        while (queueReader.TryRead(out var head))
        {
            buffer.Clear();
            buffer.Add(head);

            // Greedily take whatever else is already queued, without blocking,
            // so one transaction is amortised across a burst of enqueues.
            while (buffer.Count < _options.BatchSize &&
                   queueReader.TryRead(out var extra))
            {
                buffer.Add(extra);
            }

            FlushBatch(buffer);
        }
    }
}
/// <summary>
/// Writes one batch of queued events to <c>audit_event</c> and
/// <c>audit_forward_state</c> inside a single transaction, then completes every
/// event's <c>Completion</c> with the outcome of that transaction.
/// </summary>
/// <remarks>
/// <para>
/// Success is signalled only AFTER the transaction has committed. If any event
/// in the batch fails with a non-constraint error, or the commit itself fails,
/// the whole transaction is rolled back and every event in the batch is
/// faulted — no caller is told a row is durable when it was rolled back.
/// </para>
/// <para>
/// No exception escapes this method: <c>BeginTransaction</c> is inside the
/// <c>try</c>, and a failing rollback is logged rather than thrown, so a broken
/// connection cannot terminate the writer loop and strand pending completions.
/// </para>
/// </remarks>
/// <param name="batch">The events to persist; each is completed exactly once.</param>
private void FlushBatch(IReadOnlyList<PendingAuditEvent> batch)
{
    lock (_writeLock)
    {
        if (_disposed)
        {
            foreach (var pending in batch)
            {
                pending.Completion.TrySetException(
                    new ObjectDisposedException(nameof(SqliteAuditWriter),
                        "Event could not be recorded: the audit writer was disposed before the write completed."));
            }
            return;
        }

        SqliteTransaction? transaction = null;
        try
        {
            transaction = _connection.BeginTransaction();

            // INSERT 1: the canonical row, stored DIRECTLY (the 10 canonical
            // fields straight off the AuditEvent — no Decompose; audit_event
            // holds canonical shape, not the legacy 24-column shape).
            using var eventCmd = _connection.CreateCommand();
            eventCmd.Transaction = transaction;
            eventCmd.CommandText = """
                INSERT INTO audit_event (
                EventId, OccurredAtUtc, Actor, Action, Outcome,
                Category, Target, SourceNode, CorrelationId, DetailsJson
                ) VALUES (
                $EventId, $OccurredAtUtc, $Actor, $Action, $Outcome,
                $Category, $Target, $SourceNode, $CorrelationId, $DetailsJson
                );
                """;
            var eEventId = eventCmd.Parameters.Add("$EventId", SqliteType.Text);
            var eOccurredAt = eventCmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text);
            var eActor = eventCmd.Parameters.Add("$Actor", SqliteType.Text);
            var eAction = eventCmd.Parameters.Add("$Action", SqliteType.Text);
            var eOutcome = eventCmd.Parameters.Add("$Outcome", SqliteType.Text);
            var eCategory = eventCmd.Parameters.Add("$Category", SqliteType.Text);
            var eTarget = eventCmd.Parameters.Add("$Target", SqliteType.Text);
            var eSourceNode = eventCmd.Parameters.Add("$SourceNode", SqliteType.Text);
            var eCorrelationId = eventCmd.Parameters.Add("$CorrelationId", SqliteType.Text);
            var eDetailsJson = eventCmd.Parameters.Add("$DetailsJson", SqliteType.Text);

            // INSERT 2: the operational sidecar row. ForwardState=Pending,
            // OccurredAtUtc duplicated for the drain index, IsCachedKind
            // precomputed (so the read split never parses DetailsJson),
            // AttemptCount=0, LastAttemptUtc=NULL.
            using var fwdCmd = _connection.CreateCommand();
            fwdCmd.Transaction = transaction;
            fwdCmd.CommandText = """
                INSERT INTO audit_forward_state (
                EventId, ForwardState, OccurredAtUtc, IsCachedKind, AttemptCount, LastAttemptUtc
                ) VALUES (
                $EventId, $ForwardState, $OccurredAtUtc, $IsCachedKind, 0, NULL
                );
                """;
            var fEventId = fwdCmd.Parameters.Add("$EventId", SqliteType.Text);
            var fForwardState = fwdCmd.Parameters.Add("$ForwardState", SqliteType.Text);
            var fOccurredAt = fwdCmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text);
            var fIsCachedKind = fwdCmd.Parameters.Add("$IsCachedKind", SqliteType.Integer);

            foreach (var pending in batch)
            {
                var evt = pending.Event;

                // Store the round-trip "o" form of the UTC instant so plain
                // string comparison stays monotonic (the drain range-scan and
                // ORDER BY rely on it).
                var occurredText = evt.OccurredAtUtc.UtcDateTime.ToString(
                    "o", System.Globalization.CultureInfo.InvariantCulture);

                eEventId.Value = evt.EventId.ToString();
                eOccurredAt.Value = occurredText;
                // Actor column is NOT NULL; a missing actor is stored as "".
                eActor.Value = evt.Actor ?? string.Empty;
                eAction.Value = evt.Action;
                eOutcome.Value = evt.Outcome.ToString();
                eCategory.Value = (object?)evt.Category ?? DBNull.Value;
                eTarget.Value = (object?)evt.Target ?? DBNull.Value;

                // SourceNode-stamping: a caller-provided value wins; otherwise
                // stamp from the local INodeIdentityProvider. The event record
                // itself is NOT mutated. If both are null the column stays NULL.
                var sourceNode = evt.SourceNode ?? _nodeIdentity.NodeName;
                eSourceNode.Value = (object?)sourceNode ?? DBNull.Value;
                eCorrelationId.Value = (object?)evt.CorrelationId?.ToString() ?? DBNull.Value;
                eDetailsJson.Value = (object?)evt.DetailsJson ?? DBNull.Value;

                fEventId.Value = evt.EventId.ToString();
                fForwardState.Value = pending.ForwardState.ToString();
                fOccurredAt.Value = occurredText;
                fIsCachedKind.Value = IsCachedKind(evt.DetailsJson) ? 1 : 0;

                try
                {
                    eventCmd.ExecuteNonQuery();
                    fwdCmd.ExecuteNonQuery();
                }
                catch (SqliteException ex) when (ex.SqliteErrorCode == SqliteErrorConstraint)
                {
                    // Duplicate EventId — first-write-wins (alog.md §11). The
                    // audit_event PRIMARY KEY throws before the sidecar insert
                    // runs, so neither table gains a second row. The event is
                    // still reported as success after the commit below: it is
                    // durably recorded under the first writer's payload.
                    _logger.LogDebug(ex,
                        "Duplicate EventId {EventId} swallowed by SqliteAuditWriter",
                        evt.EventId);
                }
            }

            transaction.Commit();
        }
        catch (Exception ex)
        {
            try
            {
                transaction?.Rollback();
            }
            catch (Exception rollbackEx)
            {
                // Best effort: the original failure is what callers must see.
                _logger.LogWarning(rollbackEx,
                    "SqliteAuditWriter rollback failed after a batch insert error");
            }

            _logger.LogError(ex, "SqliteAuditWriter batch insert failed; faulting {Count} pending events", batch.Count);
            foreach (var pending in batch)
            {
                pending.Completion.TrySetException(ex);
            }
            return;
        }
        finally
        {
            transaction?.Dispose();
        }

        // Reached only after a successful commit: the rows are durable, so it
        // is now safe to report success to every caller in the batch.
        foreach (var pending in batch)
        {
            pending.Completion.TrySetResult();
        }
    }
}
// AuditLog-001: the audit kinds that belong to the cached-call lifecycle and
// therefore ride the combined-telemetry drain instead of the plain one.
// C4: membership in this set is evaluated once per event at INSERT (see
// IsCachedKind) and stored in the sidecar's IsCachedKind column, so the reads
// split on an integer predicate: ReadPendingAsync takes IsCachedKind=0,
// ReadPendingCachedTelemetryAsync takes IsCachedKind=1.
private static readonly HashSet<AuditKind> CachedTelemetryKinds = new HashSet<AuditKind>
{
    AuditKind.CachedSubmit,
    AuditKind.ApiCallCached,
    AuditKind.DbWriteCached,
    AuditKind.CachedResolve,
};
/// <summary>
/// C4: derives the sidecar's <c>IsCachedKind</c> flag from a canonical row's
/// <c>DetailsJson</c>. The JSON is decoded with <see cref="AuditDetailsCodec"/>,
/// its <see cref="AuditDetails.Kind"/> discriminator is parsed (falling back to
/// <see cref="AuditKind.InboundRequest"/>), and the result is looked up in
/// <see cref="CachedTelemetryKinds"/>.
/// </summary>
/// <param name="detailsJson">The event's serialized details; may be <c>null</c>.</param>
/// <returns><c>true</c> when the parsed kind is one of the cached-lifecycle kinds.</returns>
private static bool IsCachedKind(string? detailsJson)
{
    var kindText = AuditDetailsCodec.Deserialize(detailsJson).Kind;
    return CachedTelemetryKinds.Contains(
        AuditRowProjection.ParseEnum(kindText, AuditKind.InboundRequest));
}
/// <inheritdoc />
public Task<IReadOnlyList<AuditEvent>> ReadPendingAsync(int limit, CancellationToken ct = default)
{
    if (limit < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
    }

    // C4: Pending rows with IsCachedKind=0 only; the cached-lifecycle kinds are
    // drained by ReadPendingCachedTelemetryAsync. Oldest first by the sidecar's
    // OccurredAtUtc, EventId breaking ties deterministically.
    const string Sql = """
        SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
        ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
        FROM audit_event ae
        JOIN audit_forward_state fs ON fs.EventId = ae.EventId
        WHERE fs.ForwardState = $pending
        AND fs.IsCachedKind = 0
        ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
        LIMIT $limit;
        """;

    // AuditLog-005: the query runs on the dedicated read connection under
    // _readLock, so it never waits on (or holds up) the writer's _writeLock.
    IReadOnlyList<AuditEvent> rows;
    lock (_readLock)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        using var query = _readConnection.CreateCommand();
        query.CommandText = Sql;
        query.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
        query.Parameters.AddWithValue("$limit", limit);
        rows = ReadRows(query, limit);
    }

    return Task.FromResult(rows);
}
/// <inheritdoc />
public Task<IReadOnlyList<AuditEvent>> ReadPendingCachedTelemetryAsync(
    int limit, CancellationToken ct = default)
{
    if (limit < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
    }

    // AuditLog-001 / C4: the cached-call lifecycle counterpart of
    // ReadPendingAsync — identical shape, but selecting IsCachedKind=1.
    const string Sql = """
        SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
        ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
        FROM audit_event ae
        JOIN audit_forward_state fs ON fs.EventId = ae.EventId
        WHERE fs.ForwardState = $pending
        AND fs.IsCachedKind = 1
        ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
        LIMIT $limit;
        """;

    // Read connection + _readLock, so the hot-path writer is not contended.
    IReadOnlyList<AuditEvent> rows;
    lock (_readLock)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        using var query = _readConnection.CreateCommand();
        query.CommandText = Sql;
        query.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
        query.Parameters.AddWithValue("$limit", limit);
        rows = ReadRows(query, limit);
    }

    return Task.FromResult(rows);
}
/// <summary>
/// Reads at most <paramref name="limit"/> rows whose sidecar state is
/// <see cref="AuditForwardState.Forwarded"/>, ordered by
/// <see cref="AuditEvent.OccurredAtUtc"/> ascending with
/// <see cref="AuditEvent.EventId"/> as the tiebreaker. Unlike
/// <see cref="ReadPendingSinceAsync"/> it never returns
/// <see cref="AuditForwardState.Pending"/> rows, and unlike
/// <see cref="ReadPendingAsync"/> it applies no <c>IsCachedKind</c> filter.
/// </summary>
/// <param name="limit">Maximum number of rows to return; must be positive.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>A completed task holding the matching events.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="limit"/> is not positive.</exception>
public Task<IReadOnlyList<AuditEvent>> ReadForwardedAsync(int limit, CancellationToken ct = default)
{
    if (limit < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
    }

    const string Sql = """
        SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
        ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
        FROM audit_event ae
        JOIN audit_forward_state fs ON fs.EventId = ae.EventId
        WHERE fs.ForwardState = $forwarded
        ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
        LIMIT $limit;
        """;

    // AuditLog-005: read connection + _readLock, never the writer's _writeLock.
    IReadOnlyList<AuditEvent> rows;
    lock (_readLock)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        using var query = _readConnection.CreateCommand();
        query.CommandText = Sql;
        query.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
        query.Parameters.AddWithValue("$limit", limit);
        rows = ReadRows(query, limit);
    }

    return Task.FromResult(rows);
}
/// <inheritdoc />
public Task MarkForwardedAsync(IReadOnlyList<Guid> eventIds, CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(eventIds);
    if (eventIds.Count == 0)
    {
        return Task.CompletedTask;
    }

    lock (_writeLock)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        using var update = _connection.CreateCommand();

        // One bound parameter per id ($id0, $id1, …): the whole batch is a
        // single UPDATE and no id text is ever concatenated into the SQL.
        var placeholders = new string[eventIds.Count];
        for (var index = 0; index < placeholders.Length; index++)
        {
            placeholders[index] = $"$id{index}";
            update.Parameters.AddWithValue(placeholders[index], eventIds[index].ToString());
        }

        // C4: only the sidecar is touched; audit_event is write-once. Each call
        // bumps AttemptCount and stamps LastAttemptUtc. The ForwardState guard
        // limits the transition to rows that are Pending or Forwarded, so a
        // Reconciled row included by mistake is not demoted back to Forwarded
        // (same guard as MarkReconciledAsync).
        update.CommandText =
            "UPDATE audit_forward_state SET ForwardState = $forwarded, "
            + "AttemptCount = AttemptCount + 1, LastAttemptUtc = $now "
            + "WHERE ForwardState IN ($pending, $forwarded) AND EventId IN ("
            + string.Join(',', placeholders)
            + ");";

        update.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
        update.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
        var attemptStamp = DateTime.UtcNow.ToString(
            "o", System.Globalization.CultureInfo.InvariantCulture);
        update.Parameters.AddWithValue("$now", attemptStamp);

        update.ExecuteNonQuery();
    }

    return Task.CompletedTask;
}
/// <inheritdoc />
public Task<IReadOnlyList<AuditEvent>> ReadPendingSinceAsync(
    DateTime sinceUtc, int batchSize, CancellationToken ct = default)
{
    if (batchSize < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), "batchSize must be > 0.");
    }

    // C4: the lower bound is applied to the sidecar's duplicated OccurredAtUtc
    // so the range scan stays on IX_fwd. Pending AND Forwarded rows are
    // returned; the central reconciliation puller dedups on EventId, so
    // re-shipping a Forwarded row is safe.
    const string Sql = """
        SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
        ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
        FROM audit_event ae
        JOIN audit_forward_state fs ON fs.EventId = ae.EventId
        WHERE fs.ForwardState IN ($pending, $forwarded)
        AND fs.OccurredAtUtc >= $since
        ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
        LIMIT $limit;
        """;

    // The bound is rendered in the same round-trip "o" text form the insert
    // path stores, so comparing the strings orders the same way as the instants.
    var sinceText = EnsureUtc(sinceUtc).ToString(
        "o", System.Globalization.CultureInfo.InvariantCulture);

    // AuditLog-005: read connection + _readLock, never the writer's _writeLock.
    IReadOnlyList<AuditEvent> rows;
    lock (_readLock)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        using var query = _readConnection.CreateCommand();
        query.CommandText = Sql;
        query.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
        query.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
        query.Parameters.AddWithValue("$since", sinceText);
        query.Parameters.AddWithValue("$limit", batchSize);
        rows = ReadRows(query, batchSize);
    }

    return Task.FromResult(rows);
}
/// <inheritdoc />
public Task MarkReconciledAsync(IReadOnlyList<Guid> eventIds, CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(eventIds);
    if (eventIds.Count == 0)
    {
        return Task.CompletedTask;
    }

    lock (_writeLock)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        using var update = _connection.CreateCommand();

        // One bound parameter per id ($id0, $id1, …); no id text enters the SQL.
        var placeholders = new string[eventIds.Count];
        for (var index = 0; index < placeholders.Length; index++)
        {
            placeholders[index] = $"$id{index}";
            update.Parameters.AddWithValue(placeholders[index], eventIds[index].ToString());
        }

        // C4: Pending/Forwarded → Reconciled on the sidecar only. Rows that are
        // already Reconciled fall outside the ForwardState guard and are left
        // untouched, so a repeated call is a no-op for them. The canonical
        // audit_event row is never modified.
        update.CommandText =
            "UPDATE audit_forward_state SET ForwardState = $reconciled "
            + "WHERE ForwardState IN ($pending, $forwarded) AND EventId IN ("
            + string.Join(',', placeholders)
            + ");";

        update.Parameters.AddWithValue("$reconciled", AuditForwardState.Reconciled.ToString());
        update.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
        update.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());

        update.ExecuteNonQuery();
    }

    return Task.CompletedTask;
}
/// <inheritdoc />
public Task<SiteAuditBacklogSnapshot> GetBacklogStatsAsync(CancellationToken ct = default)
{
    int backlogSize;
    DateTime? oldestOccurredAt = null;

    // AuditLog-005: the probe runs on the read connection under _readLock so a
    // long COUNT(*) over a large Pending backlog never holds the writer's
    // _writeLock. C4: both aggregates are taken over the sidecar table, which
    // is where ForwardState lives.
    lock (_readLock)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        using var probe = _readConnection.CreateCommand();
        probe.CommandText = """
            SELECT COUNT(*), MIN(OccurredAtUtc)
            FROM audit_forward_state
            WHERE ForwardState = $pending;
            """;
        probe.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());

        using var result = probe.ExecuteReader();
        result.Read();
        backlogSize = result.GetInt32(0);

        // MIN() is NULL when no Pending row exists.
        if (!result.IsDBNull(1))
        {
            oldestOccurredAt = DateTime.Parse(
                result.GetString(1),
                System.Globalization.CultureInfo.InvariantCulture,
                System.Globalization.DateTimeStyles.RoundtripKind);
        }
    }

    // The file size is looked up outside the lock from the DatabasePath option.
    // Paths that denote an in-memory database, or a file that does not exist,
    // report 0.
    long sizeOnDisk = 0;
    try
    {
        var path = _options.DatabasePath;
        if (!string.IsNullOrEmpty(path)
            && !path.StartsWith(":memory:", StringComparison.Ordinal)
            && !path.Contains("mode=memory", StringComparison.OrdinalIgnoreCase)
            && File.Exists(path))
        {
            sizeOnDisk = new FileInfo(path).Length;
        }
    }
    catch (Exception ex)
    {
        // Best-effort health metric: a failed file-system probe is logged and
        // reported as 0 rather than failing the whole snapshot.
        _logger.LogDebug(ex,
            "SqliteAuditWriter could not stat DB path {Path} for backlog snapshot.",
            _options.DatabasePath);
    }

    var snapshot = new SiteAuditBacklogSnapshot(
        PendingCount: backlogSize,
        OldestPendingUtc: oldestOccurredAt,
        OnDiskBytes: sizeOnDisk);
    return Task.FromResult(snapshot);
}
/// <summary>
/// Normalises <paramref name="value"/> to a <see cref="DateTimeKind.Utc"/>
/// <see cref="DateTime"/>.
/// </summary>
/// <remarks>
/// <see cref="DateTimeKind.Local"/> values are converted to UTC. A
/// <see cref="DateTimeKind.Unspecified"/> value is taken to be UTC already and
/// is only re-tagged: <see cref="DateTime.ToUniversalTime"/> would treat it as
/// local time and shift it by the machine's UTC offset, which is wrong for a
/// value supplied through a parameter that is contractually UTC.
/// </remarks>
/// <param name="value">The timestamp to normalise.</param>
/// <returns>The same instant with <see cref="DateTimeKind.Utc"/>.</returns>
private static DateTime EnsureUtc(DateTime value) => value.Kind switch
{
    DateTimeKind.Utc => value,
    DateTimeKind.Local => value.ToUniversalTime(),
    _ => DateTime.SpecifyKind(value, DateTimeKind.Utc),
};
/// <summary>
/// Runs <paramref name="cmd"/> and converts every result row to an
/// <see cref="AuditEvent"/> through <see cref="MapRow"/>. The command is
/// expected to project the 10 <c>audit_event</c> columns in canonical order.
/// </summary>
/// <param name="cmd">A fully prepared SELECT command.</param>
/// <param name="capacityHint">
/// Expected maximum row count; the result list is pre-sized to this value,
/// capped at 256.
/// </param>
/// <returns>The mapped events in the order the query returned them.</returns>
private static IReadOnlyList<AuditEvent> ReadRows(SqliteCommand cmd, int capacityHint)
{
    var initialCapacity = Math.Min(capacityHint, 256);
    var events = new List<AuditEvent>(initialCapacity);

    using (var cursor = cmd.ExecuteReader())
    {
        while (cursor.Read())
        {
            var mapped = MapRow(cursor);
            events.Add(mapped);
        }
    }

    return events;
}
/// <summary>
/// C4: materialises a canonical <see cref="AuditEvent"/> straight from the
/// reader's current row, reading the 10 stored <c>audit_event</c> columns by
/// ordinal (0–9). There is no 24-column <c>Recompose</c> step because
/// <c>audit_event</c> already holds the canonical fields plus
/// <c>DetailsJson</c>.
/// </summary>
/// <remarks>
/// <c>Outcome</c> is stored as the enum's name and is read through the safe
/// <see cref="AuditRowProjection.ParseEnum{TEnum}"/>, which degrades an
/// unknown/renamed value gracefully rather than throwing;
/// <see cref="AuditOutcome.Success"/> is the fallback argument passed here.
/// </remarks>
/// <param name="reader">Reader positioned on the row to map.</param>
/// <returns>The event built from the current row.</returns>
private static AuditEvent MapRow(SqliteDataReader reader)
{
    // Columns 5–9 are nullable: surface NULL as null instead of calling GetString.
    string? TextOrNull(int ordinal) =>
        reader.IsDBNull(ordinal) ? null : reader.GetString(ordinal);

    var eventId = Guid.Parse(reader.GetString(0));

    // Parse the stored timestamp text, then stamp it as UTC before wrapping
    // it in a DateTimeOffset.
    var occurredAtRaw = DateTime.Parse(
        reader.GetString(1),
        System.Globalization.CultureInfo.InvariantCulture,
        System.Globalization.DateTimeStyles.RoundtripKind);
    var occurredAtUtc = new DateTimeOffset(
        DateTime.SpecifyKind(occurredAtRaw, DateTimeKind.Utc));

    return new AuditEvent
    {
        EventId = eventId,
        OccurredAtUtc = occurredAtUtc,
        Actor = reader.GetString(2),
        Action = reader.GetString(3),
        Outcome = AuditRowProjection.ParseEnum(reader.GetString(4), AuditOutcome.Success),
        Category = TextOrNull(5),
        Target = TextOrNull(6),
        SourceNode = TextOrNull(7),
        CorrelationId = TextOrNull(8) is { } correlationText
            ? Guid.Parse(correlationText)
            : null,
        DetailsJson = TextOrNull(9),
    };
}
/// <summary>
/// Synchronously disposes the audit writer by running
/// <see cref="DisposeAsync"/> to completion.
/// </summary>
/// <remarks>
/// AuditLog-006: <see cref="DisposeAsync"/> is the preferred path (DI honours
/// <see cref="IAsyncDisposable"/> on singletons). This synchronous overload
/// exists for callers that only know <see cref="IDisposable"/> — legacy
/// composition roots, or <c>using</c> statements without <c>await</c>. The
/// async disposal is started via <see cref="Task.Run(Func{Task})"/> so that
/// its continuations resume on a pool thread with no captured
/// <see cref="SynchronizationContext"/> (ASP.NET request thread, Akka
/// dispatcher under some configurations); blocking on the result therefore
/// cannot wait on the very thread the continuation needs, which is the
/// classic sync-over-async deadlock.
/// </remarks>
public void Dispose()
{
    // Hop to the thread pool first, THEN block on the resulting task.
    Func<Task> disposeOnPool = async () => await DisposeAsync().ConfigureAwait(false);
    var disposal = Task.Run(disposeOnPool);
    disposal.GetAwaiter().GetResult();
}
/// <summary>
/// Asynchronously disposes the audit writer: completes the write queue, gives
/// the background writer loop up to 5 seconds to drain, then closes the write
/// connection followed by the dedicated read connection.
/// </summary>
/// <returns>A <see cref="ValueTask"/> that completes once both connections are disposed.</returns>
public async ValueTask DisposeAsync()
{
    Task? drainTask;
    lock (_writeLock)
    {
        if (_disposed)
        {
            return;
        }

        // Completing the channel writer is the shutdown signal: no new events
        // are accepted (WriteAsync calls observe the completion and fault),
        // while the writer loop still drains whatever is already buffered
        // before it exits. _disposed deliberately stays false at this point —
        // it flips only in the second lock block below, after the drain, so
        // FlushBatch's existing _disposed check guards the post-drain window
        // when the connection is about to close.
        _writeQueue.Writer.TryComplete();
        drainTask = _writerLoop;
    }

    // The wait happens outside the lock because the loop reacquires it for
    // each batch.
    if (drainTask is not null)
    {
        try
        {
            await drainTask.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
        }
        catch (TimeoutException)
        {
            _logger.LogWarning("SqliteAuditWriter writer loop did not drain within 5s of dispose.");
        }
        catch (Exception ex)
        {
            // Individual batch failures were already routed to their pending
            // TCSes by the loop's per-batch try/catch, so a fault surfacing
            // here is unexpected.
            _logger.LogError(ex, "SqliteAuditWriter writer loop faulted during dispose.");
        }
    }

    lock (_writeLock)
    {
        // A concurrent disposer may have finished while we were waiting.
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _connection.Dispose();
    }

    // AuditLog-005: the dedicated read connection goes last, once the writer
    // is fully drained and closed. Taking _readLock fences out any in-flight
    // read caller that grabbed the lock before _disposed flipped — they
    // observe ObjectDisposedException on the next attempt.
    lock (_readLock)
    {
        _readConnection.Dispose();
    }
}
/// <summary>
/// Queue item for the background writer: an audit event awaiting persistence,
/// the forwarding state to record for it, and the completion source used to
/// signal that the write has completed.
/// </summary>
private sealed class PendingAuditEvent
{
    /// <summary>Initializes a new instance of the PendingAuditEvent class.</summary>
    /// <param name="evt">The canonical audit event to persist.</param>
    /// <param name="forwardState">Initial site-local forwarding state written to the sidecar row (always Pending for fresh events).</param>
    public PendingAuditEvent(AuditEvent evt, AuditForwardState forwardState) =>
        (Event, ForwardState) = (evt, forwardState);

    /// <summary>The canonical audit event to persist.</summary>
    public AuditEvent Event { get; }

    /// <summary>Initial forwarding state for this row's sidecar (bound to audit_forward_state.ForwardState).</summary>
    public AuditForwardState ForwardState { get; }

    /// <summary>
    /// Task completion source for write completion signaling. Created with
    /// <see cref="TaskCreationOptions.RunContinuationsAsynchronously"/> so
    /// awaiting callers' continuations are not run inline on the thread that
    /// completes it.
    /// </summary>
    public TaskCompletionSource Completion { get; } =
        new(TaskCreationOptions.RunContinuationsAsynchronously);
}
}