feat(audit): ScadaBridge C4 — site SQLite two-table (audit_event canonical + audit_forward_state sidecar), forwarding on sidecar, IsCachedKind drain split (Task 2.5)

This commit is contained in:
Joseph Doherty
2026-06-02 13:11:20 -04:00
parent c27b2c3d5f
commit 946d3e2aef
3 changed files with 689 additions and 704 deletions
@@ -7,6 +7,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using AuditEvent = ZB.MOM.WW.Audit.AuditEvent;
using AuditOutcome = ZB.MOM.WW.Audit.AuditOutcome;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Site;
@@ -19,15 +20,27 @@ namespace ZB.MOM.WW.ScadaBridge.AuditLog.Site;
/// </summary>
/// <remarks>
/// <para>
/// The schema is bootstrapped in the constructor (Bundle B-T1). The
/// Channel-based <see cref="WriteAsync"/> hot-path + Bundle D
/// <see cref="ReadPendingAsync"/> / <see cref="MarkForwardedAsync"/> support
/// surface are wired in Bundle B-T2.
/// <b>C4 (Task 2.5) — two-table schema.</b> The site store is now two tables:
/// the append-only canonical <c>audit_event</c> (the 10 canonical
/// <see cref="AuditEvent"/> fields stored directly — NO 24-column decompose) and
/// the mutable operational <c>audit_forward_state</c> sidecar that carries the
/// forwarding lifecycle (<see cref="AuditForwardState"/>), a duplicated
/// <c>OccurredAtUtc</c> for the drain index range-scan, a precomputed
/// <c>IsCachedKind</c> flag that drives the cached/non-cached drain split without
/// re-parsing <c>DetailsJson</c> on the read hot-path, plus attempt bookkeeping.
/// </para>
/// <para>
/// <b>Ephemeral reset.</b> The site SQLite store is ephemeral (≈7-day retention,
/// recreated per deployment), so C4's schema change is an in-place RESET: the new
/// tables are created and the old single 24-column <c>AuditLog</c> table is
/// DROP-ped if present. No SQLite data migration is performed (and none is
/// needed) — any rows in a pre-C4 <c>AuditLog</c> table are within the retention
/// window and are discarded by the drop.
/// </para>
/// <para>
/// Site rows always carry <see cref="AuditForwardState.Pending"/> on first
/// insert; the central row-shape's <c>IngestedAtUtc</c> column does NOT live in
/// the site SQLite schema — central stamps it on ingest.
/// insert; the central row-shape's <c>IngestedAtUtc</c> is a DetailsJson field
/// stamped by central on ingest, not a site column.
/// </para>
/// </remarks>
public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable, IDisposable
@@ -36,8 +49,10 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// on a PRIMARY KEY violation; the extended subcode 1555 (SQLITE_CONSTRAINT_PRIMARYKEY)
// is exposed via SqliteException.SqliteExtendedErrorCode but isn't reliably
// surfaced across all SQLite builds. We treat any constraint error on insert
// as a duplicate-eventid race and swallow it (first-write-wins) — the index
// on EventId is the only constraint on this table, so this scope is precise.
// as a duplicate-eventid race and swallow it (first-write-wins) — the PRIMARY
// KEY on audit_event.EventId is the constraint that fires first, so this scope
// is precise (the sidecar insert for the same EventId is in the same
// transaction and never reached once audit_event's insert throws).
private const int SqliteErrorConstraint = 19;
private readonly SqliteConnection _connection;
@@ -141,95 +156,63 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
pragmaCmd.ExecuteNonQuery();
}
using var cmd = _connection.CreateCommand();
cmd.CommandText = """
CREATE TABLE IF NOT EXISTS AuditLog (
EventId TEXT NOT NULL,
OccurredAtUtc TEXT NOT NULL,
Channel TEXT NOT NULL,
Kind TEXT NOT NULL,
CorrelationId TEXT NULL,
SourceSiteId TEXT NULL,
SourceNode TEXT NULL,
SourceInstanceId TEXT NULL,
SourceScript TEXT NULL,
Actor TEXT NULL,
Target TEXT NULL,
Status TEXT NOT NULL,
HttpStatus INTEGER NULL,
DurationMs INTEGER NULL,
ErrorMessage TEXT NULL,
ErrorDetail TEXT NULL,
RequestSummary TEXT NULL,
ResponseSummary TEXT NULL,
PayloadTruncated INTEGER NOT NULL,
Extra TEXT NULL,
ForwardState TEXT NOT NULL,
ExecutionId TEXT NULL,
ParentExecutionId TEXT NULL,
PRIMARY KEY (EventId)
);
CREATE INDEX IF NOT EXISTS IX_SiteAuditLog_ForwardState_Occurred
ON AuditLog (ForwardState, OccurredAtUtc);
""";
cmd.ExecuteNonQuery();
// Audit Log #23 (ExecutionId): additively add the ExecutionId column.
// CREATE TABLE IF NOT EXISTS above does NOT add columns to an AuditLog
// table that already exists from a pre-ExecutionId build, so an
// auditlog.db created by an older build needs the column ALTER-ed in.
// The file is durable across restart/failover by design (7-day
// retention), so without this step every WriteAsync on an upgraded
// deployment would bind $ExecutionId against a missing column and the
// best-effort write path would silently drop every site audit row.
// SQLite has no "ADD COLUMN IF NOT EXISTS"; the column presence is
// probed first and the ALTER skipped when already there. The column is
// nullable with no default, so any row written before this migration
// reads back ExecutionId = null (back-compat).
AddColumnIfMissing("ExecutionId", "TEXT NULL");
// Audit Log #23 (ParentExecutionId): same idempotent upgrade path as
// ExecutionId above. A deployment that already ran the ExecutionId
// branch has an auditlog.db with the 21-column schema and no
// ParentExecutionId column; CREATE TABLE IF NOT EXISTS cannot add it,
// so it is ALTER-ed in here. Nullable with no default — rows written
// before this migration read back ParentExecutionId = null.
AddColumnIfMissing("ParentExecutionId", "TEXT NULL");
// SourceNode stamping: same idempotent upgrade path as ExecutionId /
// ParentExecutionId above. A deployment that already ran the
// ParentExecutionId branch has an auditlog.db with the 22-column
// schema and no SourceNode column; CREATE TABLE IF NOT EXISTS cannot
// add it, so it is ALTER-ed in here. Nullable with no default — rows
// written before this migration read back SourceNode = null.
AddColumnIfMissing("SourceNode", "TEXT NULL");
}
/// <summary>
/// Audit Log #23: additively adds a column to <c>AuditLog</c> only when
/// it is not already present (used for <c>ExecutionId</c> and
/// <c>ParentExecutionId</c>). SQLite lacks <c>ADD COLUMN IF NOT EXISTS</c>,
/// so the schema is probed via <c>PRAGMA table_info</c> first. Idempotent —
/// safe to run on every <see cref="InitializeSchema"/>. Mirrors
/// <c>StoreAndForwardStorage.AddColumnIfMissingAsync</c>; kept synchronous
/// here to match the rest of this writer's bootstrap DDL.
/// </summary>
private void AddColumnIfMissing(string columnName, string columnDefinition)
{
using var probe = _connection.CreateCommand();
probe.CommandText = "SELECT COUNT(*) FROM pragma_table_info('AuditLog') WHERE name = $name";
probe.Parameters.AddWithValue("$name", columnName);
var exists = Convert.ToInt32(probe.ExecuteScalar()) > 0;
if (exists)
// C4 (Task 2.5) — in-place reset. The site store is EPHEMERAL (≈7-day
// retention, recreated per deployment), so we do NOT migrate the old
// single 24-column AuditLog table to the new two-table shape: any rows
// it holds are within the retention window and discarded. DROP it if a
// pre-C4 deployment left it behind, then CREATE the two new tables. This
// is safe precisely BECAUSE the site store is ephemeral — never do this
// on a durable store (the central SQL Server side keeps its shim until
// C5 and is migrated, not reset).
using (var dropCmd = _connection.CreateCommand())
{
return;
dropCmd.CommandText = "DROP TABLE IF EXISTS AuditLog;";
dropCmd.ExecuteNonQuery();
}
using var alter = _connection.CreateCommand();
// Column name + definition are caller-controlled constants, never user
// input — safe to interpolate (parameters are not permitted in DDL).
alter.CommandText = $"ALTER TABLE AuditLog ADD COLUMN {columnName} {columnDefinition}";
alter.ExecuteNonQuery();
using var cmd = _connection.CreateCommand();
cmd.CommandText = """
-- Canonical, append-only / write-once: the 10 fields of the canonical
-- ZB.MOM.WW.Audit.AuditEvent stored directly (DetailsJson carries the
-- ScadaBridge domain fields). No forwarding state lives here that is
-- the audit_forward_state sidecar's concern.
CREATE TABLE IF NOT EXISTS audit_event (
EventId TEXT NOT NULL,
OccurredAtUtc TEXT NOT NULL,
Actor TEXT NOT NULL,
Action TEXT NOT NULL,
Outcome TEXT NOT NULL,
Category TEXT NULL,
Target TEXT NULL,
SourceNode TEXT NULL,
CorrelationId TEXT NULL,
DetailsJson TEXT NULL,
PRIMARY KEY (EventId)
);
-- Operational, mutable: the forwarding lifecycle for each canonical
-- row. OccurredAtUtc is duplicated here so the drain range-scan stays
-- on this one table's index; IsCachedKind is precomputed at insert so
-- the cached/non-cached drain split never re-parses DetailsJson on the
-- read hot-path.
CREATE TABLE IF NOT EXISTS audit_forward_state (
EventId TEXT NOT NULL,
ForwardState TEXT NOT NULL,
OccurredAtUtc TEXT NOT NULL,
IsCachedKind INTEGER NOT NULL,
AttemptCount INTEGER NOT NULL DEFAULT 0,
LastAttemptUtc TEXT NULL,
PRIMARY KEY (EventId),
FOREIGN KEY (EventId) REFERENCES audit_event(EventId)
);
-- Drain index: every read filters on (ForwardState, IsCachedKind) and
-- range-scans/orders by OccurredAtUtc, so this composite covers the
-- four reads + the backlog COUNT/MIN.
CREATE INDEX IF NOT EXISTS IX_fwd
ON audit_forward_state (ForwardState, IsCachedKind, OccurredAtUtc);
""";
cmd.ExecuteNonQuery();
}
/// <inheritdoc />
@@ -237,9 +220,9 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
{
ArgumentNullException.ThrowIfNull(evt);
// C3 transitional shim: the canonical record carries no ForwardState
// (a site-storage-only concern). Site rows always start Pending; the
// forwarding columns + queries are unchanged from the 24-column schema.
// The canonical record carries no ForwardState (a site-storage-only
// concern). Site rows always start Pending; the sidecar row is written
// alongside the canonical row in the same transaction.
var pending = new PendingAuditEvent(evt, AuditForwardState.Pending);
// CreateBounded(FullMode=Wait) means WriteAsync will await room rather
@@ -313,101 +296,99 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
using var transaction = _connection.BeginTransaction();
try
{
using var cmd = _connection.CreateCommand();
cmd.Transaction = transaction;
cmd.CommandText = """
INSERT INTO AuditLog (
EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
ExecutionId, ParentExecutionId
// INSERT 1: the canonical row, stored DIRECTLY (the 10 canonical
// fields straight off the AuditEvent — no Decompose; audit_event
// holds canonical shape, not the legacy 24-column shape).
using var eventCmd = _connection.CreateCommand();
eventCmd.Transaction = transaction;
eventCmd.CommandText = """
INSERT INTO audit_event (
EventId, OccurredAtUtc, Actor, Action, Outcome,
Category, Target, SourceNode, CorrelationId, DetailsJson
) VALUES (
$EventId, $OccurredAtUtc, $Channel, $Kind, $CorrelationId,
$SourceSiteId, $SourceNode, $SourceInstanceId, $SourceScript, $Actor, $Target,
$Status, $HttpStatus, $DurationMs, $ErrorMessage, $ErrorDetail,
$RequestSummary, $ResponseSummary, $PayloadTruncated, $Extra, $ForwardState,
$ExecutionId, $ParentExecutionId
$EventId, $OccurredAtUtc, $Actor, $Action, $Outcome,
$Category, $Target, $SourceNode, $CorrelationId, $DetailsJson
);
""";
var eEventId = eventCmd.Parameters.Add("$EventId", SqliteType.Text);
var eOccurredAt = eventCmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text);
var eActor = eventCmd.Parameters.Add("$Actor", SqliteType.Text);
var eAction = eventCmd.Parameters.Add("$Action", SqliteType.Text);
var eOutcome = eventCmd.Parameters.Add("$Outcome", SqliteType.Text);
var eCategory = eventCmd.Parameters.Add("$Category", SqliteType.Text);
var eTarget = eventCmd.Parameters.Add("$Target", SqliteType.Text);
var eSourceNode = eventCmd.Parameters.Add("$SourceNode", SqliteType.Text);
var eCorrelationId = eventCmd.Parameters.Add("$CorrelationId", SqliteType.Text);
var eDetailsJson = eventCmd.Parameters.Add("$DetailsJson", SqliteType.Text);
var pEventId = cmd.Parameters.Add("$EventId", SqliteType.Text);
var pOccurredAt = cmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text);
var pChannel = cmd.Parameters.Add("$Channel", SqliteType.Text);
var pKind = cmd.Parameters.Add("$Kind", SqliteType.Text);
var pCorrelationId = cmd.Parameters.Add("$CorrelationId", SqliteType.Text);
var pSourceSiteId = cmd.Parameters.Add("$SourceSiteId", SqliteType.Text);
var pSourceNode = cmd.Parameters.Add("$SourceNode", SqliteType.Text);
var pSourceInstanceId = cmd.Parameters.Add("$SourceInstanceId", SqliteType.Text);
var pSourceScript = cmd.Parameters.Add("$SourceScript", SqliteType.Text);
var pActor = cmd.Parameters.Add("$Actor", SqliteType.Text);
var pTarget = cmd.Parameters.Add("$Target", SqliteType.Text);
var pStatus = cmd.Parameters.Add("$Status", SqliteType.Text);
var pHttpStatus = cmd.Parameters.Add("$HttpStatus", SqliteType.Integer);
var pDurationMs = cmd.Parameters.Add("$DurationMs", SqliteType.Integer);
var pErrorMessage = cmd.Parameters.Add("$ErrorMessage", SqliteType.Text);
var pErrorDetail = cmd.Parameters.Add("$ErrorDetail", SqliteType.Text);
var pRequestSummary = cmd.Parameters.Add("$RequestSummary", SqliteType.Text);
var pResponseSummary = cmd.Parameters.Add("$ResponseSummary", SqliteType.Text);
var pPayloadTruncated = cmd.Parameters.Add("$PayloadTruncated", SqliteType.Integer);
var pExtra = cmd.Parameters.Add("$Extra", SqliteType.Text);
var pForwardState = cmd.Parameters.Add("$ForwardState", SqliteType.Text);
var pExecutionId = cmd.Parameters.Add("$ExecutionId", SqliteType.Text);
var pParentExecutionId = cmd.Parameters.Add("$ParentExecutionId", SqliteType.Text);
// INSERT 2: the operational sidecar row. ForwardState=Pending,
// OccurredAtUtc duplicated for the drain index, IsCachedKind
// precomputed (so the read split never parses DetailsJson),
// AttemptCount=0, LastAttemptUtc=NULL.
using var fwdCmd = _connection.CreateCommand();
fwdCmd.Transaction = transaction;
fwdCmd.CommandText = """
INSERT INTO audit_forward_state (
EventId, ForwardState, OccurredAtUtc, IsCachedKind, AttemptCount, LastAttemptUtc
) VALUES (
$EventId, $ForwardState, $OccurredAtUtc, $IsCachedKind, 0, NULL
);
""";
var fEventId = fwdCmd.Parameters.Add("$EventId", SqliteType.Text);
var fForwardState = fwdCmd.Parameters.Add("$ForwardState", SqliteType.Text);
var fOccurredAt = fwdCmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text);
var fIsCachedKind = fwdCmd.Parameters.Add("$IsCachedKind", SqliteType.Integer);
foreach (var pending in batch)
{
// C3 transitional shim: decompose the canonical record into
// the typed 24-column values the existing SQLite schema
// expects (Channel/Kind/Status + the DetailsJson domain
// fields). ForwardState rides alongside the canonical record
// (site-storage-only) and is bound from pending.ForwardState.
var r = AuditRowProjection.Decompose(pending.Event);
pEventId.Value = r.EventId.ToString();
pOccurredAt.Value = r.OccurredAtUtc.ToString("o");
pChannel.Value = r.Channel.ToString();
pKind.Value = r.Kind.ToString();
pCorrelationId.Value = (object?)r.CorrelationId?.ToString() ?? DBNull.Value;
pSourceSiteId.Value = (object?)r.SourceSiteId ?? DBNull.Value;
var evt = pending.Event;
// Canonical OccurredAtUtc is UTC by construction; store the
// round-trip "o" form so string comparison stays monotonic
// (the drain range-scan and ORDER BY rely on it).
var occurredText = evt.OccurredAtUtc.UtcDateTime.ToString(
"o", System.Globalization.CultureInfo.InvariantCulture);
eEventId.Value = evt.EventId.ToString();
eOccurredAt.Value = occurredText;
// Canonical Actor is a required non-null string.
eActor.Value = evt.Actor ?? string.Empty;
eAction.Value = evt.Action;
eOutcome.Value = evt.Outcome.ToString();
eCategory.Value = (object?)evt.Category ?? DBNull.Value;
eTarget.Value = (object?)evt.Target ?? DBNull.Value;
// SourceNode-stamping: caller-provided value wins (preserves
// rows reconciled in from other nodes via the same writer);
// otherwise stamp from the local INodeIdentityProvider. The
// event record itself is NOT mutated — stamping is at write
// time only. If the provider also returns null (unconfigured
// node), the row's SourceNode stays NULL — operators see
// "needs config" via the schema, not a magic fallback string.
var sourceNode = r.SourceNode ?? _nodeIdentity.NodeName;
pSourceNode.Value = (object?)sourceNode ?? DBNull.Value;
pSourceInstanceId.Value = (object?)r.SourceInstanceId ?? DBNull.Value;
pSourceScript.Value = (object?)r.SourceScript ?? DBNull.Value;
pActor.Value = (object?)r.Actor ?? DBNull.Value;
pTarget.Value = (object?)r.Target ?? DBNull.Value;
pStatus.Value = r.Status.ToString();
pHttpStatus.Value = (object?)r.HttpStatus ?? DBNull.Value;
pDurationMs.Value = (object?)r.DurationMs ?? DBNull.Value;
pErrorMessage.Value = (object?)r.ErrorMessage ?? DBNull.Value;
pErrorDetail.Value = (object?)r.ErrorDetail ?? DBNull.Value;
pRequestSummary.Value = (object?)r.RequestSummary ?? DBNull.Value;
pResponseSummary.Value = (object?)r.ResponseSummary ?? DBNull.Value;
pPayloadTruncated.Value = r.PayloadTruncated ? 1 : 0;
pExtra.Value = (object?)r.Extra ?? DBNull.Value;
pForwardState.Value = pending.ForwardState.ToString();
pExecutionId.Value = (object?)r.ExecutionId?.ToString() ?? DBNull.Value;
pParentExecutionId.Value = (object?)r.ParentExecutionId?.ToString() ?? DBNull.Value;
// node), the column stays NULL — operators see "needs config"
// via the schema, not a magic fallback string.
var sourceNode = evt.SourceNode ?? _nodeIdentity.NodeName;
eSourceNode.Value = (object?)sourceNode ?? DBNull.Value;
eCorrelationId.Value = (object?)evt.CorrelationId?.ToString() ?? DBNull.Value;
eDetailsJson.Value = (object?)evt.DetailsJson ?? DBNull.Value;
fEventId.Value = evt.EventId.ToString();
fForwardState.Value = pending.ForwardState.ToString();
fOccurredAt.Value = occurredText;
fIsCachedKind.Value = IsCachedKind(evt.DetailsJson) ? 1 : 0;
try
{
cmd.ExecuteNonQuery();
eventCmd.ExecuteNonQuery();
fwdCmd.ExecuteNonQuery();
pending.Completion.TrySetResult();
}
catch (SqliteException ex) when (ex.SqliteErrorCode == SqliteErrorConstraint)
{
// Duplicate EventId — first-write-wins (alog.md §11).
// Treat as success: the lifecycle event is durably
// recorded under the first writer's payload.
// Duplicate EventId — first-write-wins (alog.md §11). The
// audit_event PRIMARY KEY throws before the sidecar insert
// runs, so neither table gains a second row. Treat as
// success: the lifecycle event is durably recorded under
// the first writer's payload.
_logger.LogDebug(ex,
"Duplicate EventId {EventId} swallowed by SqliteAuditWriter",
r.EventId);
evt.EventId);
pending.Completion.TrySetResult();
}
}
@@ -429,17 +410,36 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// AuditLog-001: cached-lifecycle audit kinds that ride the combined-telemetry
// drain (joined with the operational tracking row + pushed via
// IngestCachedTelemetryAsync into the central dual-write transaction).
// ReadPendingAsync EXCLUDES these so the audit-only drain doesn't double-emit
// them; ReadPendingCachedTelemetryAsync below is the dedicated read surface
// the new SiteAuditTelemetryActor cached-drain uses.
private static readonly string[] CachedTelemetryKindNames =
// C4: this is the SAME set the pre-C4 ReadPendingCachedTelemetryAsync query
// filtered on (Kind IN (...)); it is now precomputed into the sidecar's
// IsCachedKind flag at INSERT (see IsCachedKind) so the read split is a cheap
// integer predicate, not a JSON parse. ReadPendingAsync drains everything
// with IsCachedKind=0; ReadPendingCachedTelemetryAsync drains IsCachedKind=1.
private static readonly HashSet<AuditKind> CachedTelemetryKinds = new()
{
nameof(AuditKind.CachedSubmit),
nameof(AuditKind.ApiCallCached),
nameof(AuditKind.DbWriteCached),
nameof(AuditKind.CachedResolve),
AuditKind.CachedSubmit,
AuditKind.ApiCallCached,
AuditKind.DbWriteCached,
AuditKind.CachedResolve,
};
/// <summary>
/// C4: precomputes the sidecar's <c>IsCachedKind</c> flag from a canonical
/// row's <c>DetailsJson</c>. Parses the <see cref="AuditDetails.Kind"/>
/// discriminator via <see cref="AuditDetailsCodec"/> and returns <c>true</c>
/// iff it is one of the cached-lifecycle kinds
/// (<see cref="AuditKind.CachedSubmit"/>, <see cref="AuditKind.ApiCallCached"/>,
/// <see cref="AuditKind.DbWriteCached"/>, <see cref="AuditKind.CachedResolve"/>).
/// Runs once per event at INSERT time so the cached/non-cached drain split is
/// a cheap integer predicate on read, never a JSON parse on the hot path.
/// </summary>
private static bool IsCachedKind(string? detailsJson)
{
var details = AuditDetailsCodec.Deserialize(detailsJson);
var kind = AuditRowProjection.ParseEnum(details.Kind, AuditKind.InboundRequest);
return CachedTelemetryKinds.Contains(kind);
}
/// <inheritdoc />
public Task<IReadOnlyList<AuditEvent>> ReadPendingAsync(int limit, CancellationToken ct = default)
{
@@ -451,47 +451,35 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// AuditLog-005: read via the dedicated _readConnection so this scan
// (which can be expensive when the backlog grows under a central
// outage) does not block the batched writer on _writeLock. WAL mode
// gives us a stable snapshot of the table while writes proceed on the
// gives us a stable snapshot of the tables while writes proceed on the
// writer connection. _readLock serialises this connection across
// multiple concurrent read callers since SqliteConnection itself is
// not thread-safe.
// AuditLog-001: NOT IN ($cached1,$cached2,$cached3,$cached4) excludes the
// cached-lifecycle kinds — they flow through ReadPendingCachedTelemetryAsync
// + the combined-telemetry drain. Kind is stored as the enum's name (see
// FlushBatch's pKind.Value), so a string-IN against the constant kind
// names matches the on-disk shape exactly.
// C4: JOIN the sidecar and filter on IsCachedKind=0 — the cached-
// lifecycle kinds (IsCachedKind=1) flow through
// ReadPendingCachedTelemetryAsync + the combined-telemetry drain. The
// split is a precomputed integer predicate on the indexed sidecar, not
// a DetailsJson parse. Ordering is by the sidecar's OccurredAtUtc with
// EventId as the deterministic tiebreaker.
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _readConnection.CreateCommand();
cmd.CommandText = """
SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
ExecutionId, ParentExecutionId
FROM AuditLog
WHERE ForwardState = $pending
AND Kind NOT IN ($k0, $k1, $k2, $k3)
ORDER BY OccurredAtUtc ASC, EventId ASC
SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
FROM audit_event ae
JOIN audit_forward_state fs ON fs.EventId = ae.EventId
WHERE fs.ForwardState = $pending
AND fs.IsCachedKind = 0
ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]);
cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]);
cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]);
cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]);
cmd.Parameters.AddWithValue("$limit", limit);
var rows = new List<AuditEvent>(Math.Min(limit, 256));
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
rows.Add(MapRow(reader));
}
return Task.FromResult<IReadOnlyList<AuditEvent>>(rows);
return Task.FromResult(ReadRows(cmd, limit));
}
}
@@ -504,42 +492,29 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
}
// AuditLog-001: dedicated read surface for the cached-call lifecycle
// drain — symmetric to ReadPendingAsync but filtered to the four
// cached AuditKinds. Same _readConnection + _readLock pattern so the
// hot-path writer is not contended.
// AuditLog-001 / C4: dedicated read surface for the cached-call lifecycle
// drain — symmetric to ReadPendingAsync but filtered to IsCachedKind=1.
// Same _readConnection + _readLock pattern so the hot-path writer is not
// contended.
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _readConnection.CreateCommand();
cmd.CommandText = """
SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
ExecutionId, ParentExecutionId
FROM AuditLog
WHERE ForwardState = $pending
AND Kind IN ($k0, $k1, $k2, $k3)
ORDER BY OccurredAtUtc ASC, EventId ASC
SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
FROM audit_event ae
JOIN audit_forward_state fs ON fs.EventId = ae.EventId
WHERE fs.ForwardState = $pending
AND fs.IsCachedKind = 1
ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]);
cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]);
cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]);
cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]);
cmd.Parameters.AddWithValue("$limit", limit);
var rows = new List<AuditEvent>(Math.Min(limit, 256));
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
rows.Add(MapRow(reader));
}
return Task.FromResult<IReadOnlyList<AuditEvent>>(rows);
return Task.FromResult(ReadRows(cmd, limit));
}
}
@@ -565,34 +540,27 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// AuditLog-005: mirror ReadPendingAsync — read via _readConnection /
// _readLock so this query never contends with the batched writer on
// _writeLock.
// _writeLock. C4: JOIN the sidecar and filter on ForwardState='Forwarded'
// (no IsCachedKind split — both cached and non-cached Forwarded rows are
// returned, as before).
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _readConnection.CreateCommand();
cmd.CommandText = """
SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
ExecutionId, ParentExecutionId
FROM AuditLog
WHERE ForwardState = $forwarded
ORDER BY OccurredAtUtc ASC, EventId ASC
SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
FROM audit_event ae
JOIN audit_forward_state fs ON fs.EventId = ae.EventId
WHERE fs.ForwardState = $forwarded
ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
cmd.Parameters.AddWithValue("$limit", limit);
var rows = new List<AuditEvent>(Math.Min(limit, 256));
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
rows.Add(MapRow(reader));
}
return Task.FromResult<IReadOnlyList<AuditEvent>>(rows);
return Task.FromResult(ReadRows(cmd, limit));
}
}
@@ -610,11 +578,16 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _connection.CreateCommand();
// Build a single IN (...) parameter list so we issue one UPDATE per
// batch regardless of size. Each id is bound as its own parameter,
// so no string concatenation of user data ever enters the SQL.
// C4: flip the sidecar — UPDATE audit_forward_state, not the canonical
// audit_event (which is append-only / write-once). Bump AttemptCount +
// stamp LastAttemptUtc so operators can see how many drain passes a row
// took to forward. Build a single IN (...) parameter list so we issue
// one UPDATE per batch regardless of size. Each id is bound as its own
// parameter, so no string concatenation of user data ever enters the SQL.
var sb = new System.Text.StringBuilder();
sb.Append("UPDATE AuditLog SET ForwardState = $forwarded WHERE EventId IN (");
sb.Append("UPDATE audit_forward_state SET ForwardState = $forwarded, ")
.Append("AttemptCount = AttemptCount + 1, LastAttemptUtc = $now ")
.Append("WHERE EventId IN (");
for (int i = 0; i < eventIds.Count; i++)
{
if (i > 0) sb.Append(',');
@@ -625,6 +598,8 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
sb.Append(");");
cmd.CommandText = sb.ToString();
cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
cmd.Parameters.AddWithValue("$now", DateTime.UtcNow.ToString(
"o", System.Globalization.CultureInfo.InvariantCulture));
cmd.ExecuteNonQuery();
return Task.CompletedTask;
@@ -641,22 +616,24 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
}
// AuditLog-005: read via _readConnection / _readLock — same lock-
// decoupling as ReadPendingAsync.
// decoupling as ReadPendingAsync. C4: JOIN the sidecar; the range scan
// is on the sidecar's duplicated OccurredAtUtc so it stays on IX_fwd.
// Both Pending and Forwarded rows are returned (the central reconciliation
// puller dedups on EventId; re-shipping a Forwarded-but-not-yet-ingested
// row is safe).
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _readConnection.CreateCommand();
cmd.CommandText = """
SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
ExecutionId, ParentExecutionId
FROM AuditLog
WHERE ForwardState IN ($pending, $forwarded)
AND OccurredAtUtc >= $since
ORDER BY OccurredAtUtc ASC, EventId ASC
SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
FROM audit_event ae
JOIN audit_forward_state fs ON fs.EventId = ae.EventId
WHERE fs.ForwardState IN ($pending, $forwarded)
AND fs.OccurredAtUtc >= $since
ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
@@ -668,14 +645,7 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
"o", System.Globalization.CultureInfo.InvariantCulture));
cmd.Parameters.AddWithValue("$limit", batchSize);
var rows = new List<AuditEvent>(Math.Min(batchSize, 256));
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
rows.Add(MapRow(reader));
}
return Task.FromResult<IReadOnlyList<AuditEvent>>(rows);
return Task.FromResult(ReadRows(cmd, batchSize));
}
}
@@ -693,8 +663,11 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _connection.CreateCommand();
// C4: flip the sidecar from Pending/Forwarded → Reconciled. Rows
// already Reconciled are left untouched (idempotent re-call), and the
// canonical audit_event row is never modified.
var sb = new System.Text.StringBuilder();
sb.Append("UPDATE AuditLog SET ForwardState = $reconciled ")
sb.Append("UPDATE audit_forward_state SET ForwardState = $reconciled ")
.Append("WHERE ForwardState IN ($pending, $forwarded) AND EventId IN (");
for (int i = 0; i < eventIds.Count; i++)
{
@@ -726,18 +699,17 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// central outage the Pending backlog can grow to hundreds of thousands
// of rows and the COUNT(*) scan correspondingly stretches; that no
// longer adds tail latency to user-facing audit writes.
// C4: count over the sidecar (audit_forward_state) — the canonical
// audit_event table carries no ForwardState. The IX_fwd index makes both
// aggregates cheap (count is a covering scan, min is the first key).
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
// Single round-trip — COUNT(*) + MIN(OccurredAtUtc) over the same
// index range avoids a second scan. The IX_SiteAuditLog_ForwardState_Occurred
// index makes both aggregates cheap (count is a covering scan, min
// is the first key).
using var cmd = _readConnection.CreateCommand();
cmd.CommandText = """
SELECT COUNT(*), MIN(OccurredAtUtc)
FROM AuditLog
FROM audit_forward_state
WHERE ForwardState = $pending;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
@@ -788,38 +760,49 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
? value
: DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);
/// <summary>
/// Executes <paramref name="cmd"/> (one of the four reads, each already
/// projecting the 10 <c>audit_event</c> columns in canonical order) and
/// materialises the rows via <see cref="MapRow"/>.
/// </summary>
private static IReadOnlyList<AuditEvent> ReadRows(SqliteCommand cmd, int capacityHint)
{
var rows = new List<AuditEvent>(Math.Min(capacityHint, 256));
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
rows.Add(MapRow(reader));
}
return rows;
}
/// <summary>
/// C4: builds the canonical <see cref="AuditEvent"/> DIRECTLY from the 10
/// stored <c>audit_event</c> columns — no 24-column <c>Recompose</c>, because
/// <c>audit_event</c> already holds the canonical fields + <c>DetailsJson</c>.
/// <c>Outcome</c> is stored as the enum's name; the safe
/// <see cref="AuditRowProjection.ParseEnum{TEnum}"/> degrades an unknown/renamed
/// value gracefully rather than throwing.
/// </summary>
private static AuditEvent MapRow(SqliteDataReader reader)
{
// C3 transitional shim: recompose the canonical record from the 24
// columns. The ForwardState column (ordinal 20) is read for the
// schema's sake but NOT placed on the canonical record — it stays a
// site-storage-only concern (the forwarding queries below own it).
return AuditRowProjection.Recompose(new AuditRowProjection.AuditRowValues(
EventId: Guid.Parse(reader.GetString(0)),
OccurredAtUtc: DateTime.Parse(reader.GetString(1),
System.Globalization.CultureInfo.InvariantCulture,
System.Globalization.DateTimeStyles.RoundtripKind),
IngestedAtUtc: null,
Channel: AuditRowProjection.ParseEnum<AuditChannel>(reader.GetString(2), AuditChannel.ApiInbound),
Kind: AuditRowProjection.ParseEnum<AuditKind>(reader.GetString(3), AuditKind.InboundRequest),
Status: AuditRowProjection.ParseEnum<AuditStatus>(reader.GetString(11), AuditStatus.Submitted),
CorrelationId: reader.IsDBNull(4) ? null : Guid.Parse(reader.GetString(4)),
ExecutionId: reader.IsDBNull(21) ? null : Guid.Parse(reader.GetString(21)),
ParentExecutionId: reader.IsDBNull(22) ? null : Guid.Parse(reader.GetString(22)),
SourceSiteId: reader.IsDBNull(5) ? null : reader.GetString(5),
SourceNode: reader.IsDBNull(6) ? null : reader.GetString(6),
SourceInstanceId: reader.IsDBNull(7) ? null : reader.GetString(7),
SourceScript: reader.IsDBNull(8) ? null : reader.GetString(8),
Actor: reader.IsDBNull(9) ? null : reader.GetString(9),
Target: reader.IsDBNull(10) ? null : reader.GetString(10),
HttpStatus: reader.IsDBNull(12) ? null : reader.GetInt32(12),
DurationMs: reader.IsDBNull(13) ? null : reader.GetInt32(13),
ErrorMessage: reader.IsDBNull(14) ? null : reader.GetString(14),
ErrorDetail: reader.IsDBNull(15) ? null : reader.GetString(15),
RequestSummary: reader.IsDBNull(16) ? null : reader.GetString(16),
ResponseSummary: reader.IsDBNull(17) ? null : reader.GetString(17),
PayloadTruncated: reader.GetInt32(18) != 0,
Extra: reader.IsDBNull(19) ? null : reader.GetString(19)));
return new AuditEvent
{
EventId = Guid.Parse(reader.GetString(0)),
OccurredAtUtc = new DateTimeOffset(DateTime.SpecifyKind(
DateTime.Parse(reader.GetString(1),
System.Globalization.CultureInfo.InvariantCulture,
System.Globalization.DateTimeStyles.RoundtripKind),
DateTimeKind.Utc)),
Actor = reader.GetString(2),
Action = reader.GetString(3),
Outcome = AuditRowProjection.ParseEnum(reader.GetString(4), AuditOutcome.Success),
Category = reader.IsDBNull(5) ? null : reader.GetString(5),
Target = reader.IsDBNull(6) ? null : reader.GetString(6),
SourceNode = reader.IsDBNull(7) ? null : reader.GetString(7),
CorrelationId = reader.IsDBNull(8) ? null : Guid.Parse(reader.GetString(8)),
DetailsJson = reader.IsDBNull(9) ? null : reader.GetString(9),
};
}
/// <summary>
@@ -903,7 +886,7 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
{
/// <summary>Initializes a new instance of the PendingAuditEvent class.</summary>
/// <param name="evt">The canonical audit event to persist.</param>
/// <param name="forwardState">Site-local forwarding state stored alongside the canonical row (C3 shim — not a canonical field).</param>
/// <param name="forwardState">Initial site-local forwarding state written to the sidecar row (always Pending for fresh events).</param>
public PendingAuditEvent(AuditEvent evt, AuditForwardState forwardState)
{
Event = evt;
@@ -913,7 +896,7 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
/// <summary>The canonical audit event to persist.</summary>
public AuditEvent Event { get; }
/// <summary>Site-local forwarding state for this row (C3 shim — bound to the ForwardState column).</summary>
/// <summary>Initial forwarding state for this row's sidecar (bound to audit_forward_state.ForwardState).</summary>
public AuditForwardState ForwardState { get; }
/// <summary>Task completion source for write completion signaling.</summary>
public TaskCompletionSource Completion { get; }