From 946d3e2aef7136c05cb181ed26637abad505997f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 2 Jun 2026 13:11:20 -0400 Subject: [PATCH] =?UTF-8?q?feat(audit):=20ScadaBridge=20C4=20=E2=80=94=20s?= =?UTF-8?q?ite=20SQLite=20two-table=20(audit=5Fevent=20canonical=20+=20aud?= =?UTF-8?q?it=5Fforward=5Fstate=20sidecar),=20forwarding=20on=20sidecar,?= =?UTF-8?q?=20IsCachedKind=20drain=20split=20(Task=202.5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Site/SqliteAuditWriter.cs | 599 +++++++++--------- .../Site/SqliteAuditWriterSchemaTests.cs | 480 +++++--------- .../Site/SqliteAuditWriterWriteTests.cs | 314 ++++++--- 3 files changed, 689 insertions(+), 704 deletions(-) diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs index 1c0754ab..13af1da4 100644 --- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs +++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs @@ -7,6 +7,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Types; using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; using AuditEvent = ZB.MOM.WW.Audit.AuditEvent; +using AuditOutcome = ZB.MOM.WW.Audit.AuditOutcome; namespace ZB.MOM.WW.ScadaBridge.AuditLog.Site; @@ -19,15 +20,27 @@ namespace ZB.MOM.WW.ScadaBridge.AuditLog.Site; /// /// /// -/// The schema is bootstrapped in the constructor (Bundle B-T1). The -/// Channel-based hot-path + Bundle D -/// / support -/// surface are wired in Bundle B-T2. +/// C4 (Task 2.5) — two-table schema. The site store is now two tables: +/// the append-only canonical audit_event (the 10 canonical +/// fields stored directly — NO 24-column decompose) and +/// the mutable operational audit_forward_state sidecar that carries the +/// forwarding lifecycle (), a duplicated +/// OccurredAtUtc for the drain index range-scan, a precomputed +/// IsCachedKind flag that drives the cached/non-cached drain split without +/// re-parsing DetailsJson on the read hot-path, plus attempt bookkeeping. +/// +/// +/// Ephemeral reset. The site SQLite store is ephemeral (≈7-day retention, +/// recreated per deployment), so C4's schema change is an in-place RESET: the new +/// tables are created and the old single 24-column AuditLog table is +/// DROP-ped if present. No SQLite data migration is performed (and none is +/// needed) — any rows in a pre-C4 AuditLog table are within the retention +/// window and are discarded by the drop. /// /// /// Site rows always carry on first -/// insert; the central row-shape's IngestedAtUtc column does NOT live in -/// the site SQLite schema — central stamps it on ingest. +/// insert; the central row-shape's IngestedAtUtc is a DetailsJson field +/// stamped by central on ingest, not a site column. /// /// public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable, IDisposable @@ -36,8 +49,10 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable // on a PRIMARY KEY violation; the extended subcode 1555 (SQLITE_CONSTRAINT_PRIMARYKEY) // is exposed via SqliteException.SqliteExtendedErrorCode but isn't reliably // surfaced across all SQLite builds. We treat any constraint error on insert - // as a duplicate-eventid race and swallow it (first-write-wins) — the index - // on EventId is the only constraint on this table, so this scope is precise. + // as a duplicate-eventid race and swallow it (first-write-wins) — the PRIMARY + // KEY on audit_event.EventId is the constraint that fires first, so this scope + // is precise (the sidecar insert for the same EventId is in the same + // transaction and never reached once audit_event's insert throws). private const int SqliteErrorConstraint = 19; private readonly SqliteConnection _connection; @@ -141,95 +156,63 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable pragmaCmd.ExecuteNonQuery(); } - using var cmd = _connection.CreateCommand(); - cmd.CommandText = """ - CREATE TABLE IF NOT EXISTS AuditLog ( - EventId TEXT NOT NULL, - OccurredAtUtc TEXT NOT NULL, - Channel TEXT NOT NULL, - Kind TEXT NOT NULL, - CorrelationId TEXT NULL, - SourceSiteId TEXT NULL, - SourceNode TEXT NULL, - SourceInstanceId TEXT NULL, - SourceScript TEXT NULL, - Actor TEXT NULL, - Target TEXT NULL, - Status TEXT NOT NULL, - HttpStatus INTEGER NULL, - DurationMs INTEGER NULL, - ErrorMessage TEXT NULL, - ErrorDetail TEXT NULL, - RequestSummary TEXT NULL, - ResponseSummary TEXT NULL, - PayloadTruncated INTEGER NOT NULL, - Extra TEXT NULL, - ForwardState TEXT NOT NULL, - ExecutionId TEXT NULL, - ParentExecutionId TEXT NULL, - PRIMARY KEY (EventId) - ); - CREATE INDEX IF NOT EXISTS IX_SiteAuditLog_ForwardState_Occurred - ON AuditLog (ForwardState, OccurredAtUtc); - """; - cmd.ExecuteNonQuery(); - - // Audit Log #23 (ExecutionId): additively add the ExecutionId column. - // CREATE TABLE IF NOT EXISTS above does NOT add columns to an AuditLog - // table that already exists from a pre-ExecutionId build, so an - // auditlog.db created by an older build needs the column ALTER-ed in. - // The file is durable across restart/failover by design (7-day - // retention), so without this step every WriteAsync on an upgraded - // deployment would bind $ExecutionId against a missing column and the - // best-effort write path would silently drop every site audit row. - // SQLite has no "ADD COLUMN IF NOT EXISTS"; the column presence is - // probed first and the ALTER skipped when already there. The column is - // nullable with no default, so any row written before this migration - // reads back ExecutionId = null (back-compat). - AddColumnIfMissing("ExecutionId", "TEXT NULL"); - - // Audit Log #23 (ParentExecutionId): same idempotent upgrade path as - // ExecutionId above. A deployment that already ran the ExecutionId - // branch has an auditlog.db with the 21-column schema and no - // ParentExecutionId column; CREATE TABLE IF NOT EXISTS cannot add it, - // so it is ALTER-ed in here. Nullable with no default — rows written - // before this migration read back ParentExecutionId = null. - AddColumnIfMissing("ParentExecutionId", "TEXT NULL"); - - // SourceNode stamping: same idempotent upgrade path as ExecutionId / - // ParentExecutionId above. A deployment that already ran the - // ParentExecutionId branch has an auditlog.db with the 22-column - // schema and no SourceNode column; CREATE TABLE IF NOT EXISTS cannot - // add it, so it is ALTER-ed in here. Nullable with no default — rows - // written before this migration read back SourceNode = null. - AddColumnIfMissing("SourceNode", "TEXT NULL"); - } - - /// - /// Audit Log #23: additively adds a column to AuditLog only when - /// it is not already present (used for ExecutionId and - /// ParentExecutionId). SQLite lacks ADD COLUMN IF NOT EXISTS, - /// so the schema is probed via PRAGMA table_info first. Idempotent — - /// safe to run on every . Mirrors - /// StoreAndForwardStorage.AddColumnIfMissingAsync; kept synchronous - /// here to match the rest of this writer's bootstrap DDL. - /// - private void AddColumnIfMissing(string columnName, string columnDefinition) - { - using var probe = _connection.CreateCommand(); - probe.CommandText = "SELECT COUNT(*) FROM pragma_table_info('AuditLog') WHERE name = $name"; - probe.Parameters.AddWithValue("$name", columnName); - var exists = Convert.ToInt32(probe.ExecuteScalar()) > 0; - if (exists) + // C4 (Task 2.5) — in-place reset. The site store is EPHEMERAL (≈7-day + // retention, recreated per deployment), so we do NOT migrate the old + // single 24-column AuditLog table to the new two-table shape: any rows + // it holds are within the retention window and discarded. DROP it if a + // pre-C4 deployment left it behind, then CREATE the two new tables. This + // is safe precisely BECAUSE the site store is ephemeral — never do this + // on a durable store (the central SQL Server side keeps its shim until + // C5 and is migrated, not reset). + using (var dropCmd = _connection.CreateCommand()) { - return; + dropCmd.CommandText = "DROP TABLE IF EXISTS AuditLog;"; + dropCmd.ExecuteNonQuery(); } - using var alter = _connection.CreateCommand(); - // Column name + definition are caller-controlled constants, never user - // input — safe to interpolate (parameters are not permitted in DDL). - alter.CommandText = $"ALTER TABLE AuditLog ADD COLUMN {columnName} {columnDefinition}"; - alter.ExecuteNonQuery(); + using var cmd = _connection.CreateCommand(); + cmd.CommandText = """ + -- Canonical, append-only / write-once: the 10 fields of the canonical + -- ZB.MOM.WW.Audit.AuditEvent stored directly (DetailsJson carries the + -- ScadaBridge domain fields). No forwarding state lives here — that is + -- the audit_forward_state sidecar's concern. + CREATE TABLE IF NOT EXISTS audit_event ( + EventId TEXT NOT NULL, + OccurredAtUtc TEXT NOT NULL, + Actor TEXT NOT NULL, + Action TEXT NOT NULL, + Outcome TEXT NOT NULL, + Category TEXT NULL, + Target TEXT NULL, + SourceNode TEXT NULL, + CorrelationId TEXT NULL, + DetailsJson TEXT NULL, + PRIMARY KEY (EventId) + ); + + -- Operational, mutable: the forwarding lifecycle for each canonical + -- row. OccurredAtUtc is duplicated here so the drain range-scan stays + -- on this one table's index; IsCachedKind is precomputed at insert so + -- the cached/non-cached drain split never re-parses DetailsJson on the + -- read hot-path. + CREATE TABLE IF NOT EXISTS audit_forward_state ( + EventId TEXT NOT NULL, + ForwardState TEXT NOT NULL, + OccurredAtUtc TEXT NOT NULL, + IsCachedKind INTEGER NOT NULL, + AttemptCount INTEGER NOT NULL DEFAULT 0, + LastAttemptUtc TEXT NULL, + PRIMARY KEY (EventId), + FOREIGN KEY (EventId) REFERENCES audit_event(EventId) + ); + + -- Drain index: every read filters on (ForwardState, IsCachedKind) and + -- range-scans/orders by OccurredAtUtc, so this composite covers the + -- four reads + the backlog COUNT/MIN. + CREATE INDEX IF NOT EXISTS IX_fwd + ON audit_forward_state (ForwardState, IsCachedKind, OccurredAtUtc); + """; + cmd.ExecuteNonQuery(); } /// @@ -237,9 +220,9 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable { ArgumentNullException.ThrowIfNull(evt); - // C3 transitional shim: the canonical record carries no ForwardState - // (a site-storage-only concern). Site rows always start Pending; the - // forwarding columns + queries are unchanged from the 24-column schema. + // The canonical record carries no ForwardState (a site-storage-only + // concern). Site rows always start Pending; the sidecar row is written + // alongside the canonical row in the same transaction. var pending = new PendingAuditEvent(evt, AuditForwardState.Pending); // CreateBounded(FullMode=Wait) means WriteAsync will await room rather @@ -313,101 +296,99 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable using var transaction = _connection.BeginTransaction(); try { - using var cmd = _connection.CreateCommand(); - cmd.Transaction = transaction; - cmd.CommandText = """ - INSERT INTO AuditLog ( - EventId, OccurredAtUtc, Channel, Kind, CorrelationId, - SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target, - Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail, - RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState, - ExecutionId, ParentExecutionId + // INSERT 1: the canonical row, stored DIRECTLY (the 10 canonical + // fields straight off the AuditEvent — no Decompose; audit_event + // holds canonical shape, not the legacy 24-column shape). + using var eventCmd = _connection.CreateCommand(); + eventCmd.Transaction = transaction; + eventCmd.CommandText = """ + INSERT INTO audit_event ( + EventId, OccurredAtUtc, Actor, Action, Outcome, + Category, Target, SourceNode, CorrelationId, DetailsJson ) VALUES ( - $EventId, $OccurredAtUtc, $Channel, $Kind, $CorrelationId, - $SourceSiteId, $SourceNode, $SourceInstanceId, $SourceScript, $Actor, $Target, - $Status, $HttpStatus, $DurationMs, $ErrorMessage, $ErrorDetail, - $RequestSummary, $ResponseSummary, $PayloadTruncated, $Extra, $ForwardState, - $ExecutionId, $ParentExecutionId + $EventId, $OccurredAtUtc, $Actor, $Action, $Outcome, + $Category, $Target, $SourceNode, $CorrelationId, $DetailsJson ); """; + var eEventId = eventCmd.Parameters.Add("$EventId", SqliteType.Text); + var eOccurredAt = eventCmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text); + var eActor = eventCmd.Parameters.Add("$Actor", SqliteType.Text); + var eAction = eventCmd.Parameters.Add("$Action", SqliteType.Text); + var eOutcome = eventCmd.Parameters.Add("$Outcome", SqliteType.Text); + var eCategory = eventCmd.Parameters.Add("$Category", SqliteType.Text); + var eTarget = eventCmd.Parameters.Add("$Target", SqliteType.Text); + var eSourceNode = eventCmd.Parameters.Add("$SourceNode", SqliteType.Text); + var eCorrelationId = eventCmd.Parameters.Add("$CorrelationId", SqliteType.Text); + var eDetailsJson = eventCmd.Parameters.Add("$DetailsJson", SqliteType.Text); - var pEventId = cmd.Parameters.Add("$EventId", SqliteType.Text); - var pOccurredAt = cmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text); - var pChannel = cmd.Parameters.Add("$Channel", SqliteType.Text); - var pKind = cmd.Parameters.Add("$Kind", SqliteType.Text); - var pCorrelationId = cmd.Parameters.Add("$CorrelationId", SqliteType.Text); - var pSourceSiteId = cmd.Parameters.Add("$SourceSiteId", SqliteType.Text); - var pSourceNode = cmd.Parameters.Add("$SourceNode", SqliteType.Text); - var pSourceInstanceId = cmd.Parameters.Add("$SourceInstanceId", SqliteType.Text); - var pSourceScript = cmd.Parameters.Add("$SourceScript", SqliteType.Text); - var pActor = cmd.Parameters.Add("$Actor", SqliteType.Text); - var pTarget = cmd.Parameters.Add("$Target", SqliteType.Text); - var pStatus = cmd.Parameters.Add("$Status", SqliteType.Text); - var pHttpStatus = cmd.Parameters.Add("$HttpStatus", SqliteType.Integer); - var pDurationMs = cmd.Parameters.Add("$DurationMs", SqliteType.Integer); - var pErrorMessage = cmd.Parameters.Add("$ErrorMessage", SqliteType.Text); - var pErrorDetail = cmd.Parameters.Add("$ErrorDetail", SqliteType.Text); - var pRequestSummary = cmd.Parameters.Add("$RequestSummary", SqliteType.Text); - var pResponseSummary = cmd.Parameters.Add("$ResponseSummary", SqliteType.Text); - var pPayloadTruncated = cmd.Parameters.Add("$PayloadTruncated", SqliteType.Integer); - var pExtra = cmd.Parameters.Add("$Extra", SqliteType.Text); - var pForwardState = cmd.Parameters.Add("$ForwardState", SqliteType.Text); - var pExecutionId = cmd.Parameters.Add("$ExecutionId", SqliteType.Text); - var pParentExecutionId = cmd.Parameters.Add("$ParentExecutionId", SqliteType.Text); + // INSERT 2: the operational sidecar row. ForwardState=Pending, + // OccurredAtUtc duplicated for the drain index, IsCachedKind + // precomputed (so the read split never parses DetailsJson), + // AttemptCount=0, LastAttemptUtc=NULL. + using var fwdCmd = _connection.CreateCommand(); + fwdCmd.Transaction = transaction; + fwdCmd.CommandText = """ + INSERT INTO audit_forward_state ( + EventId, ForwardState, OccurredAtUtc, IsCachedKind, AttemptCount, LastAttemptUtc + ) VALUES ( + $EventId, $ForwardState, $OccurredAtUtc, $IsCachedKind, 0, NULL + ); + """; + var fEventId = fwdCmd.Parameters.Add("$EventId", SqliteType.Text); + var fForwardState = fwdCmd.Parameters.Add("$ForwardState", SqliteType.Text); + var fOccurredAt = fwdCmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text); + var fIsCachedKind = fwdCmd.Parameters.Add("$IsCachedKind", SqliteType.Integer); foreach (var pending in batch) { - // C3 transitional shim: decompose the canonical record into - // the typed 24-column values the existing SQLite schema - // expects (Channel/Kind/Status + the DetailsJson domain - // fields). ForwardState rides alongside the canonical record - // (site-storage-only) and is bound from pending.ForwardState. - var r = AuditRowProjection.Decompose(pending.Event); - pEventId.Value = r.EventId.ToString(); - pOccurredAt.Value = r.OccurredAtUtc.ToString("o"); - pChannel.Value = r.Channel.ToString(); - pKind.Value = r.Kind.ToString(); - pCorrelationId.Value = (object?)r.CorrelationId?.ToString() ?? DBNull.Value; - pSourceSiteId.Value = (object?)r.SourceSiteId ?? DBNull.Value; + var evt = pending.Event; + // Canonical OccurredAtUtc is UTC by construction; store the + // round-trip "o" form so string comparison stays monotonic + // (the drain range-scan and ORDER BY rely on it). + var occurredText = evt.OccurredAtUtc.UtcDateTime.ToString( + "o", System.Globalization.CultureInfo.InvariantCulture); + + eEventId.Value = evt.EventId.ToString(); + eOccurredAt.Value = occurredText; + // Canonical Actor is a required non-null string. + eActor.Value = evt.Actor ?? string.Empty; + eAction.Value = evt.Action; + eOutcome.Value = evt.Outcome.ToString(); + eCategory.Value = (object?)evt.Category ?? DBNull.Value; + eTarget.Value = (object?)evt.Target ?? DBNull.Value; // SourceNode-stamping: caller-provided value wins (preserves // rows reconciled in from other nodes via the same writer); // otherwise stamp from the local INodeIdentityProvider. The // event record itself is NOT mutated — stamping is at write // time only. If the provider also returns null (unconfigured - // node), the row's SourceNode stays NULL — operators see - // "needs config" via the schema, not a magic fallback string. - var sourceNode = r.SourceNode ?? _nodeIdentity.NodeName; - pSourceNode.Value = (object?)sourceNode ?? DBNull.Value; - pSourceInstanceId.Value = (object?)r.SourceInstanceId ?? DBNull.Value; - pSourceScript.Value = (object?)r.SourceScript ?? DBNull.Value; - pActor.Value = (object?)r.Actor ?? DBNull.Value; - pTarget.Value = (object?)r.Target ?? DBNull.Value; - pStatus.Value = r.Status.ToString(); - pHttpStatus.Value = (object?)r.HttpStatus ?? DBNull.Value; - pDurationMs.Value = (object?)r.DurationMs ?? DBNull.Value; - pErrorMessage.Value = (object?)r.ErrorMessage ?? DBNull.Value; - pErrorDetail.Value = (object?)r.ErrorDetail ?? DBNull.Value; - pRequestSummary.Value = (object?)r.RequestSummary ?? DBNull.Value; - pResponseSummary.Value = (object?)r.ResponseSummary ?? DBNull.Value; - pPayloadTruncated.Value = r.PayloadTruncated ? 1 : 0; - pExtra.Value = (object?)r.Extra ?? DBNull.Value; - pForwardState.Value = pending.ForwardState.ToString(); - pExecutionId.Value = (object?)r.ExecutionId?.ToString() ?? DBNull.Value; - pParentExecutionId.Value = (object?)r.ParentExecutionId?.ToString() ?? DBNull.Value; + // node), the column stays NULL — operators see "needs config" + // via the schema, not a magic fallback string. + var sourceNode = evt.SourceNode ?? _nodeIdentity.NodeName; + eSourceNode.Value = (object?)sourceNode ?? DBNull.Value; + eCorrelationId.Value = (object?)evt.CorrelationId?.ToString() ?? DBNull.Value; + eDetailsJson.Value = (object?)evt.DetailsJson ?? DBNull.Value; + + fEventId.Value = evt.EventId.ToString(); + fForwardState.Value = pending.ForwardState.ToString(); + fOccurredAt.Value = occurredText; + fIsCachedKind.Value = IsCachedKind(evt.DetailsJson) ? 1 : 0; try { - cmd.ExecuteNonQuery(); + eventCmd.ExecuteNonQuery(); + fwdCmd.ExecuteNonQuery(); pending.Completion.TrySetResult(); } catch (SqliteException ex) when (ex.SqliteErrorCode == SqliteErrorConstraint) { - // Duplicate EventId — first-write-wins (alog.md §11). - // Treat as success: the lifecycle event is durably - // recorded under the first writer's payload. + // Duplicate EventId — first-write-wins (alog.md §11). The + // audit_event PRIMARY KEY throws before the sidecar insert + // runs, so neither table gains a second row. Treat as + // success: the lifecycle event is durably recorded under + // the first writer's payload. _logger.LogDebug(ex, "Duplicate EventId {EventId} swallowed by SqliteAuditWriter", - r.EventId); + evt.EventId); pending.Completion.TrySetResult(); } } @@ -429,17 +410,36 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable // AuditLog-001: cached-lifecycle audit kinds that ride the combined-telemetry // drain (joined with the operational tracking row + pushed via // IngestCachedTelemetryAsync into the central dual-write transaction). - // ReadPendingAsync EXCLUDES these so the audit-only drain doesn't double-emit - // them; ReadPendingCachedTelemetryAsync below is the dedicated read surface - // the new SiteAuditTelemetryActor cached-drain uses. - private static readonly string[] CachedTelemetryKindNames = + // C4: this is the SAME set the pre-C4 ReadPendingCachedTelemetryAsync query + // filtered on (Kind IN (...)); it is now precomputed into the sidecar's + // IsCachedKind flag at INSERT (see IsCachedKind) so the read split is a cheap + // integer predicate, not a JSON parse. ReadPendingAsync drains everything + // with IsCachedKind=0; ReadPendingCachedTelemetryAsync drains IsCachedKind=1. + private static readonly HashSet CachedTelemetryKinds = new() { - nameof(AuditKind.CachedSubmit), - nameof(AuditKind.ApiCallCached), - nameof(AuditKind.DbWriteCached), - nameof(AuditKind.CachedResolve), + AuditKind.CachedSubmit, + AuditKind.ApiCallCached, + AuditKind.DbWriteCached, + AuditKind.CachedResolve, }; + /// + /// C4: precomputes the sidecar's IsCachedKind flag from a canonical + /// row's DetailsJson. Parses the + /// discriminator via and returns true + /// iff it is one of the cached-lifecycle kinds + /// (, , + /// , ). + /// Runs once per event at INSERT time so the cached/non-cached drain split is + /// a cheap integer predicate on read, never a JSON parse on the hot path. + /// + private static bool IsCachedKind(string? detailsJson) + { + var details = AuditDetailsCodec.Deserialize(detailsJson); + var kind = AuditRowProjection.ParseEnum(details.Kind, AuditKind.InboundRequest); + return CachedTelemetryKinds.Contains(kind); + } + /// public Task> ReadPendingAsync(int limit, CancellationToken ct = default) { @@ -451,47 +451,35 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable // AuditLog-005: read via the dedicated _readConnection so this scan // (which can be expensive when the backlog grows under a central // outage) does not block the batched writer on _writeLock. WAL mode - // gives us a stable snapshot of the table while writes proceed on the + // gives us a stable snapshot of the tables while writes proceed on the // writer connection. _readLock serialises this connection across // multiple concurrent read callers since SqliteConnection itself is // not thread-safe. - // AuditLog-001: NOT IN ($cached1,$cached2,$cached3,$cached4) excludes the - // cached-lifecycle kinds — they flow through ReadPendingCachedTelemetryAsync - // + the combined-telemetry drain. Kind is stored as the enum's name (see - // FlushBatch's pKind.Value), so a string-IN against the constant kind - // names matches the on-disk shape exactly. + // C4: JOIN the sidecar and filter on IsCachedKind=0 — the cached- + // lifecycle kinds (IsCachedKind=1) flow through + // ReadPendingCachedTelemetryAsync + the combined-telemetry drain. The + // split is a precomputed integer predicate on the indexed sidecar, not + // a DetailsJson parse. Ordering is by the sidecar's OccurredAtUtc with + // EventId as the deterministic tiebreaker. lock (_readLock) { ObjectDisposedException.ThrowIf(_disposed, this); using var cmd = _readConnection.CreateCommand(); cmd.CommandText = """ - SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId, - SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target, - Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail, - RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState, - ExecutionId, ParentExecutionId - FROM AuditLog - WHERE ForwardState = $pending - AND Kind NOT IN ($k0, $k1, $k2, $k3) - ORDER BY OccurredAtUtc ASC, EventId ASC + SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome, + ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson + FROM audit_event ae + JOIN audit_forward_state fs ON fs.EventId = ae.EventId + WHERE fs.ForwardState = $pending + AND fs.IsCachedKind = 0 + ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC LIMIT $limit; """; cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString()); - cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]); - cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]); - cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]); - cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]); cmd.Parameters.AddWithValue("$limit", limit); - var rows = new List(Math.Min(limit, 256)); - using var reader = cmd.ExecuteReader(); - while (reader.Read()) - { - rows.Add(MapRow(reader)); - } - - return Task.FromResult>(rows); + return Task.FromResult(ReadRows(cmd, limit)); } } @@ -504,42 +492,29 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0."); } - // AuditLog-001: dedicated read surface for the cached-call lifecycle - // drain — symmetric to ReadPendingAsync but filtered to the four - // cached AuditKinds. Same _readConnection + _readLock pattern so the - // hot-path writer is not contended. + // AuditLog-001 / C4: dedicated read surface for the cached-call lifecycle + // drain — symmetric to ReadPendingAsync but filtered to IsCachedKind=1. + // Same _readConnection + _readLock pattern so the hot-path writer is not + // contended. lock (_readLock) { ObjectDisposedException.ThrowIf(_disposed, this); using var cmd = _readConnection.CreateCommand(); cmd.CommandText = """ - SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId, - SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target, - Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail, - RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState, - ExecutionId, ParentExecutionId - FROM AuditLog - WHERE ForwardState = $pending - AND Kind IN ($k0, $k1, $k2, $k3) - ORDER BY OccurredAtUtc ASC, EventId ASC + SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome, + ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson + FROM audit_event ae + JOIN audit_forward_state fs ON fs.EventId = ae.EventId + WHERE fs.ForwardState = $pending + AND fs.IsCachedKind = 1 + ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC LIMIT $limit; """; cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString()); - cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]); - cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]); - cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]); - cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]); cmd.Parameters.AddWithValue("$limit", limit); - var rows = new List(Math.Min(limit, 256)); - using var reader = cmd.ExecuteReader(); - while (reader.Read()) - { - rows.Add(MapRow(reader)); - } - - return Task.FromResult>(rows); + return Task.FromResult(ReadRows(cmd, limit)); } } @@ -565,34 +540,27 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable // AuditLog-005: mirror ReadPendingAsync — read via _readConnection / // _readLock so this query never contends with the batched writer on - // _writeLock. + // _writeLock. C4: JOIN the sidecar and filter on ForwardState='Forwarded' + // (no IsCachedKind split — both cached and non-cached Forwarded rows are + // returned, as before). lock (_readLock) { ObjectDisposedException.ThrowIf(_disposed, this); using var cmd = _readConnection.CreateCommand(); cmd.CommandText = """ - SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId, - SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target, - Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail, - RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState, - ExecutionId, ParentExecutionId - FROM AuditLog - WHERE ForwardState = $forwarded - ORDER BY OccurredAtUtc ASC, EventId ASC + SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome, + ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson + FROM audit_event ae + JOIN audit_forward_state fs ON fs.EventId = ae.EventId + WHERE fs.ForwardState = $forwarded + ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC LIMIT $limit; """; cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString()); cmd.Parameters.AddWithValue("$limit", limit); - var rows = new List(Math.Min(limit, 256)); - using var reader = cmd.ExecuteReader(); - while (reader.Read()) - { - rows.Add(MapRow(reader)); - } - - return Task.FromResult>(rows); + return Task.FromResult(ReadRows(cmd, limit)); } } @@ -610,11 +578,16 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable ObjectDisposedException.ThrowIf(_disposed, this); using var cmd = _connection.CreateCommand(); - // Build a single IN (...) parameter list so we issue one UPDATE per - // batch regardless of size. Each id is bound as its own parameter, - // so no string concatenation of user data ever enters the SQL. + // C4: flip the sidecar — UPDATE audit_forward_state, not the canonical + // audit_event (which is append-only / write-once). Bump AttemptCount + + // stamp LastAttemptUtc so operators can see how many drain passes a row + // took to forward. Build a single IN (...) parameter list so we issue + // one UPDATE per batch regardless of size. Each id is bound as its own + // parameter, so no string concatenation of user data ever enters the SQL. var sb = new System.Text.StringBuilder(); - sb.Append("UPDATE AuditLog SET ForwardState = $forwarded WHERE EventId IN ("); + sb.Append("UPDATE audit_forward_state SET ForwardState = $forwarded, ") + .Append("AttemptCount = AttemptCount + 1, LastAttemptUtc = $now ") + .Append("WHERE EventId IN ("); for (int i = 0; i < eventIds.Count; i++) { if (i > 0) sb.Append(','); @@ -625,6 +598,8 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable sb.Append(");"); cmd.CommandText = sb.ToString(); cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString()); + cmd.Parameters.AddWithValue("$now", DateTime.UtcNow.ToString( + "o", System.Globalization.CultureInfo.InvariantCulture)); cmd.ExecuteNonQuery(); return Task.CompletedTask; @@ -641,22 +616,24 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable } // AuditLog-005: read via _readConnection / _readLock — same lock- - // decoupling as ReadPendingAsync. + // decoupling as ReadPendingAsync. C4: JOIN the sidecar; the range scan + // is on the sidecar's duplicated OccurredAtUtc so it stays on IX_fwd. + // Both Pending and Forwarded rows are returned (the central reconciliation + // puller dedups on EventId; re-shipping a Forwarded-but-not-yet-ingested + // row is safe). lock (_readLock) { ObjectDisposedException.ThrowIf(_disposed, this); using var cmd = _readConnection.CreateCommand(); cmd.CommandText = """ - SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId, - SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target, - Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail, - RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState, - ExecutionId, ParentExecutionId - FROM AuditLog - WHERE ForwardState IN ($pending, $forwarded) - AND OccurredAtUtc >= $since - ORDER BY OccurredAtUtc ASC, EventId ASC + SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome, + ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson + FROM audit_event ae + JOIN audit_forward_state fs ON fs.EventId = ae.EventId + WHERE fs.ForwardState IN ($pending, $forwarded) + AND fs.OccurredAtUtc >= $since + ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC LIMIT $limit; """; cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString()); @@ -668,14 +645,7 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable "o", System.Globalization.CultureInfo.InvariantCulture)); cmd.Parameters.AddWithValue("$limit", batchSize); - var rows = new List(Math.Min(batchSize, 256)); - using var reader = cmd.ExecuteReader(); - while (reader.Read()) - { - rows.Add(MapRow(reader)); - } - - return Task.FromResult>(rows); + return Task.FromResult(ReadRows(cmd, batchSize)); } } @@ -693,8 +663,11 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable ObjectDisposedException.ThrowIf(_disposed, this); using var cmd = _connection.CreateCommand(); + // C4: flip the sidecar from Pending/Forwarded → Reconciled. Rows + // already Reconciled are left untouched (idempotent re-call), and the + // canonical audit_event row is never modified. var sb = new System.Text.StringBuilder(); - sb.Append("UPDATE AuditLog SET ForwardState = $reconciled ") + sb.Append("UPDATE audit_forward_state SET ForwardState = $reconciled ") .Append("WHERE ForwardState IN ($pending, $forwarded) AND EventId IN ("); for (int i = 0; i < eventIds.Count; i++) { @@ -726,18 +699,17 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable // central outage the Pending backlog can grow to hundreds of thousands // of rows and the COUNT(*) scan correspondingly stretches; that no // longer adds tail latency to user-facing audit writes. + // C4: count over the sidecar (audit_forward_state) — the canonical + // audit_event table carries no ForwardState. The IX_fwd index makes both + // aggregates cheap (count is a covering scan, min is the first key). lock (_readLock) { ObjectDisposedException.ThrowIf(_disposed, this); - // Single round-trip — COUNT(*) + MIN(OccurredAtUtc) over the same - // index range avoids a second scan. The IX_SiteAuditLog_ForwardState_Occurred - // index makes both aggregates cheap (count is a covering scan, min - // is the first key). using var cmd = _readConnection.CreateCommand(); cmd.CommandText = """ SELECT COUNT(*), MIN(OccurredAtUtc) - FROM AuditLog + FROM audit_forward_state WHERE ForwardState = $pending; """; cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString()); @@ -788,38 +760,49 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable ? value : DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc); + /// + /// Executes (one of the four reads, each already + /// projecting the 10 audit_event columns in canonical order) and + /// materialises the rows via . + /// + private static IReadOnlyList ReadRows(SqliteCommand cmd, int capacityHint) + { + var rows = new List(Math.Min(capacityHint, 256)); + using var reader = cmd.ExecuteReader(); + while (reader.Read()) + { + rows.Add(MapRow(reader)); + } + return rows; + } + + /// + /// C4: builds the canonical DIRECTLY from the 10 + /// stored audit_event columns — no 24-column Recompose, because + /// audit_event already holds the canonical fields + DetailsJson. + /// Outcome is stored as the enum's name; the safe + /// degrades an unknown/renamed + /// value gracefully rather than throwing. + /// private static AuditEvent MapRow(SqliteDataReader reader) { - // C3 transitional shim: recompose the canonical record from the 24 - // columns. The ForwardState column (ordinal 20) is read for the - // schema's sake but NOT placed on the canonical record — it stays a - // site-storage-only concern (the forwarding queries below own it). - return AuditRowProjection.Recompose(new AuditRowProjection.AuditRowValues( - EventId: Guid.Parse(reader.GetString(0)), - OccurredAtUtc: DateTime.Parse(reader.GetString(1), - System.Globalization.CultureInfo.InvariantCulture, - System.Globalization.DateTimeStyles.RoundtripKind), - IngestedAtUtc: null, - Channel: AuditRowProjection.ParseEnum(reader.GetString(2), AuditChannel.ApiInbound), - Kind: AuditRowProjection.ParseEnum(reader.GetString(3), AuditKind.InboundRequest), - Status: AuditRowProjection.ParseEnum(reader.GetString(11), AuditStatus.Submitted), - CorrelationId: reader.IsDBNull(4) ? null : Guid.Parse(reader.GetString(4)), - ExecutionId: reader.IsDBNull(21) ? null : Guid.Parse(reader.GetString(21)), - ParentExecutionId: reader.IsDBNull(22) ? null : Guid.Parse(reader.GetString(22)), - SourceSiteId: reader.IsDBNull(5) ? null : reader.GetString(5), - SourceNode: reader.IsDBNull(6) ? null : reader.GetString(6), - SourceInstanceId: reader.IsDBNull(7) ? null : reader.GetString(7), - SourceScript: reader.IsDBNull(8) ? null : reader.GetString(8), - Actor: reader.IsDBNull(9) ? null : reader.GetString(9), - Target: reader.IsDBNull(10) ? null : reader.GetString(10), - HttpStatus: reader.IsDBNull(12) ? null : reader.GetInt32(12), - DurationMs: reader.IsDBNull(13) ? null : reader.GetInt32(13), - ErrorMessage: reader.IsDBNull(14) ? null : reader.GetString(14), - ErrorDetail: reader.IsDBNull(15) ? null : reader.GetString(15), - RequestSummary: reader.IsDBNull(16) ? null : reader.GetString(16), - ResponseSummary: reader.IsDBNull(17) ? null : reader.GetString(17), - PayloadTruncated: reader.GetInt32(18) != 0, - Extra: reader.IsDBNull(19) ? null : reader.GetString(19))); + return new AuditEvent + { + EventId = Guid.Parse(reader.GetString(0)), + OccurredAtUtc = new DateTimeOffset(DateTime.SpecifyKind( + DateTime.Parse(reader.GetString(1), + System.Globalization.CultureInfo.InvariantCulture, + System.Globalization.DateTimeStyles.RoundtripKind), + DateTimeKind.Utc)), + Actor = reader.GetString(2), + Action = reader.GetString(3), + Outcome = AuditRowProjection.ParseEnum(reader.GetString(4), AuditOutcome.Success), + Category = reader.IsDBNull(5) ? null : reader.GetString(5), + Target = reader.IsDBNull(6) ? null : reader.GetString(6), + SourceNode = reader.IsDBNull(7) ? null : reader.GetString(7), + CorrelationId = reader.IsDBNull(8) ? null : Guid.Parse(reader.GetString(8)), + DetailsJson = reader.IsDBNull(9) ? null : reader.GetString(9), + }; } /// @@ -903,7 +886,7 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable { /// Initializes a new instance of the PendingAuditEvent class. /// The canonical audit event to persist. - /// Site-local forwarding state stored alongside the canonical row (C3 shim — not a canonical field). + /// Initial site-local forwarding state written to the sidecar row (always Pending for fresh events). public PendingAuditEvent(AuditEvent evt, AuditForwardState forwardState) { Event = evt; @@ -913,7 +896,7 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable /// The canonical audit event to persist. public AuditEvent Event { get; } - /// Site-local forwarding state for this row (C3 shim — bound to the ForwardState column). + /// Initial forwarding state for this row's sidecar (bound to audit_forward_state.ForwardState). public AuditForwardState ForwardState { get; } /// Task completion source for write completion signaling. public TaskCompletionSource Completion { get; } diff --git a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterSchemaTests.cs b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterSchemaTests.cs index 7b91320f..5284cd87 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterSchemaTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterSchemaTests.cs @@ -10,9 +10,12 @@ using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Site; /// -/// Bundle B (M2-T1) schema-bootstrap tests for . -/// Uses an in-memory shared-cache SQLite database so the same connection name -/// reaches the same file-less db across both the writer and the verifier. +/// C4 (Task 2.5) schema-bootstrap tests for 's +/// two-table site schema — the append-only canonical audit_event table + +/// the mutable operational audit_forward_state sidecar + the IX_fwd +/// drain index. Uses an in-memory shared-cache SQLite database so the same +/// connection name reaches the same file-less db across both the writer and the +/// verifier. /// public class SqliteAuditWriterSchemaTests { @@ -38,6 +41,16 @@ public class SqliteAuditWriterSchemaTests return (writer, dataSource); } + private static SqliteAuditWriter CreateWriterOver(string dataSource) + { + var options = new SqliteAuditWriterOptions { DatabasePath = dataSource }; + return new SqliteAuditWriter( + Options.Create(options), + NullLogger.Instance, + new FakeNodeIdentityProvider(), + connectionStringOverride: $"Data Source={dataSource};Cache=Shared"); + } + private static SqliteConnection OpenVerifierConnection(string dataSource) { var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared"); @@ -45,15 +58,37 @@ public class SqliteAuditWriterSchemaTests return connection; } - [Fact] - public void Opens_Creates_AuditLog_Table_With_23Columns_And_PK_On_EventId() + private static List ColumnNames(SqliteConnection connection, string table) { - var (writer, dataSource) = CreateWriter(nameof(Opens_Creates_AuditLog_Table_With_23Columns_And_PK_On_EventId)); + using var cmd = connection.CreateCommand(); + cmd.CommandText = $"PRAGMA table_info({table});"; + using var reader = cmd.ExecuteReader(); + var names = new List(); + while (reader.Read()) + { + names.Add(reader.GetString(1)); + } + return names; + } + + private static bool TableExists(SqliteConnection connection, string table) + { + using var cmd = connection.CreateCommand(); + cmd.CommandText = + "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = $name;"; + cmd.Parameters.AddWithValue("$name", table); + return Convert.ToInt32(cmd.ExecuteScalar()) > 0; + } + + [Fact] + public void Opens_Creates_audit_event_Canonical_Table_With_10Columns_And_PK_On_EventId() + { + var (writer, dataSource) = CreateWriter(nameof(Opens_Creates_audit_event_Canonical_Table_With_10Columns_And_PK_On_EventId)); using (writer) { using var connection = OpenVerifierConnection(dataSource); using var cmd = connection.CreateCommand(); - cmd.CommandText = "PRAGMA table_info(AuditLog);"; + cmd.CommandText = "PRAGMA table_info(audit_event);"; using var reader = cmd.ExecuteReader(); var columns = new List<(string Name, int Pk)>(); @@ -62,16 +97,13 @@ public class SqliteAuditWriterSchemaTests columns.Add((reader.GetString(1), reader.GetInt32(5))); } - Assert.Equal(23, columns.Count); - + // The 10 canonical ZB.MOM.WW.Audit.AuditEvent fields, stored directly. var expected = new[] { - "EventId", "OccurredAtUtc", "Channel", "Kind", "CorrelationId", - "SourceSiteId", "SourceNode", "SourceInstanceId", "SourceScript", "Actor", "Target", - "Status", "HttpStatus", "DurationMs", "ErrorMessage", "ErrorDetail", - "RequestSummary", "ResponseSummary", "PayloadTruncated", "Extra", - "ForwardState", "ExecutionId", "ParentExecutionId", + "EventId", "OccurredAtUtc", "Actor", "Action", "Outcome", + "Category", "Target", "SourceNode", "CorrelationId", "DetailsJson", }; + Assert.Equal(10, columns.Count); Assert.Equal(expected.OrderBy(n => n), columns.Select(c => c.Name).OrderBy(n => n)); // PK is EventId only. @@ -82,27 +114,46 @@ public class SqliteAuditWriterSchemaTests } [Fact] - public void Initialize_creates_AuditLog_with_SourceNode_column() + public void Opens_Creates_audit_forward_state_Sidecar_Table_With_Expected_Columns() { - var (writer, dataSource) = CreateWriter(nameof(Initialize_creates_AuditLog_with_SourceNode_column)); - using (writer) - { - using var connection = OpenVerifierConnection(dataSource); - Assert.True( - ColumnExists(connection, "SourceNode"), - "Fresh AuditLog schema must include the SourceNode column."); - } - } - - [Fact] - public void Opens_Creates_IX_ForwardState_Occurred_Index() - { - var (writer, dataSource) = CreateWriter(nameof(Opens_Creates_IX_ForwardState_Occurred_Index)); + var (writer, dataSource) = CreateWriter(nameof(Opens_Creates_audit_forward_state_Sidecar_Table_With_Expected_Columns)); using (writer) { using var connection = OpenVerifierConnection(dataSource); using var cmd = connection.CreateCommand(); - cmd.CommandText = "PRAGMA index_list(AuditLog);"; + cmd.CommandText = "PRAGMA table_info(audit_forward_state);"; + using var reader = cmd.ExecuteReader(); + + var columns = new List<(string Name, int Pk)>(); + while (reader.Read()) + { + columns.Add((reader.GetString(1), reader.GetInt32(5))); + } + + var expected = new[] + { + "EventId", "ForwardState", "OccurredAtUtc", + "IsCachedKind", "AttemptCount", "LastAttemptUtc", + }; + Assert.Equal(6, columns.Count); + Assert.Equal(expected.OrderBy(n => n), columns.Select(c => c.Name).OrderBy(n => n)); + + // PK is EventId only. + var pkColumns = columns.Where(c => c.Pk > 0).Select(c => c.Name).ToList(); + Assert.Single(pkColumns); + Assert.Equal("EventId", pkColumns[0]); + } + } + + [Fact] + public void Opens_Creates_IX_fwd_Index_On_ForwardState_IsCachedKind_Occurred() + { + var (writer, dataSource) = CreateWriter(nameof(Opens_Creates_IX_fwd_Index_On_ForwardState_IsCachedKind_Occurred)); + using (writer) + { + using var connection = OpenVerifierConnection(dataSource); + using var cmd = connection.CreateCommand(); + cmd.CommandText = "PRAGMA index_list(audit_forward_state);"; using var reader = cmd.ExecuteReader(); var indexNames = new List(); @@ -111,11 +162,12 @@ public class SqliteAuditWriterSchemaTests indexNames.Add(reader.GetString(1)); } - Assert.Contains("IX_SiteAuditLog_ForwardState_Occurred", indexNames); + Assert.Contains("IX_fwd", indexNames); - // Verify the index columns are ForwardState, OccurredAtUtc in that order. + // Verify the index columns are ForwardState, IsCachedKind, OccurredAtUtc + // in that order. using var infoCmd = connection.CreateCommand(); - infoCmd.CommandText = "PRAGMA index_info(IX_SiteAuditLog_ForwardState_Occurred);"; + infoCmd.CommandText = "PRAGMA index_info(IX_fwd);"; using var infoReader = infoCmd.ExecuteReader(); var indexColumns = new List(); @@ -124,7 +176,7 @@ public class SqliteAuditWriterSchemaTests indexColumns.Add(infoReader.GetString(2)); } - Assert.Equal(new[] { "ForwardState", "OccurredAtUtc" }, indexColumns); + Assert.Equal(new[] { "ForwardState", "IsCachedKind", "OccurredAtUtc" }, indexColumns); } } @@ -144,258 +196,17 @@ public class SqliteAuditWriterSchemaTests } } - // ----- ExecutionId schema-upgrade regression (persistent auditlog.db) ----- // + // ----- C4 ephemeral in-place reset: old single-table schema is dropped ----- // /// - /// The OLD pre-ExecutionId-branch AuditLog schema — the 20-column - /// CREATE TABLE WITHOUT the ExecutionId column. A real deployment's - /// on-disk auditlog.db already contains exactly this shape, and - /// CREATE TABLE IF NOT EXISTS is a no-op against it. + /// The OLD pre-C4 single 24-column AuditLog table — exactly the shape a + /// pre-C4 deployment's on-disk auditlog.db contains. The site store is + /// ephemeral (≈7-day retention, recreated per deployment), so C4 RESETS in + /// place: the new two-table schema is created and this old table is DROP-ped. + /// No SQLite data migration is performed (or needed) — any rows it holds are + /// within the retention window and discarded. /// - private const string OldPreExecutionIdSchema = """ - CREATE TABLE IF NOT EXISTS AuditLog ( - EventId TEXT NOT NULL, - OccurredAtUtc TEXT NOT NULL, - Channel TEXT NOT NULL, - Kind TEXT NOT NULL, - CorrelationId TEXT NULL, - SourceSiteId TEXT NULL, - SourceInstanceId TEXT NULL, - SourceScript TEXT NULL, - Actor TEXT NULL, - Target TEXT NULL, - Status TEXT NOT NULL, - HttpStatus INTEGER NULL, - DurationMs INTEGER NULL, - ErrorMessage TEXT NULL, - ErrorDetail TEXT NULL, - RequestSummary TEXT NULL, - ResponseSummary TEXT NULL, - PayloadTruncated INTEGER NOT NULL, - Extra TEXT NULL, - ForwardState TEXT NOT NULL, - PRIMARY KEY (EventId) - ); - CREATE INDEX IF NOT EXISTS IX_SiteAuditLog_ForwardState_Occurred - ON AuditLog (ForwardState, OccurredAtUtc); - """; - - /// - /// Seeds a shared-cache in-memory database with the OLD 20-column schema and - /// returns the open connection. The connection MUST stay open for the - /// lifetime of the test: a shared-cache in-memory database is dropped once - /// its last connection closes, so closing this would discard the seeded - /// schema before the writer opens its own connection. - /// - private static SqliteConnection SeedOldSchemaDatabase(string dataSource) - { - var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared"); - connection.Open(); - using var cmd = connection.CreateCommand(); - cmd.CommandText = OldPreExecutionIdSchema; - cmd.ExecuteNonQuery(); - return connection; - } - - private static SqliteAuditWriter CreateWriterOver(string dataSource) - { - var options = new SqliteAuditWriterOptions { DatabasePath = dataSource }; - return new SqliteAuditWriter( - Options.Create(options), - NullLogger.Instance, - new FakeNodeIdentityProvider(), - connectionStringOverride: $"Data Source={dataSource};Cache=Shared"); - } - - private static bool ColumnExists(SqliteConnection connection, string columnName) - { - using var cmd = connection.CreateCommand(); - cmd.CommandText = "SELECT COUNT(*) FROM pragma_table_info('AuditLog') WHERE name = $name"; - cmd.Parameters.AddWithValue("$name", columnName); - return Convert.ToInt32(cmd.ExecuteScalar()) > 0; - } - - [Fact] - public async Task Opening_Over_PreExisting_OldSchema_Db_Adds_ExecutionId_Column_And_WriteAsync_RoundTrips() - { - var dataSource = $"file:{nameof(Opening_Over_PreExisting_OldSchema_Db_Adds_ExecutionId_Column_And_WriteAsync_RoundTrips)}-{Guid.NewGuid():N}?mode=memory&cache=shared"; - - // A pre-branch deployment: auditlog.db already exists with the 20-column - // schema and NO ExecutionId column. - using var seedConnection = SeedOldSchemaDatabase(dataSource); - Assert.False(ColumnExists(seedConnection, "ExecutionId")); - - // Upgrade: a post-branch SqliteAuditWriter opens the same database. Its - // InitializeSchema must ALTER the missing ExecutionId column in — the - // CREATE TABLE IF NOT EXISTS alone is a no-op against the existing table. - var executionId = Guid.NewGuid(); - await using (var writer = CreateWriterOver(dataSource)) - { - Assert.True( - ColumnExists(seedConnection, "ExecutionId"), - "SqliteAuditWriter must ALTER the ExecutionId column into a pre-existing AuditLog table."); - - // A WriteAsync binding $ExecutionId must now succeed and round-trip; - // without the ALTER it would fail with "no such column: ExecutionId" - // and — because audit writes are best-effort — silently drop the row. - var evt = ScadaBridgeAuditEventFactory.Create( - eventId: Guid.NewGuid(), - occurredAtUtc: DateTime.UtcNow, - channel: AuditChannel.ApiOutbound, - kind: AuditKind.ApiCall, - status: AuditStatus.Delivered, - executionId: executionId); - await writer.WriteAsync(evt); - - var rows = await writer.ReadPendingAsync(limit: 10); - var row = Assert.Single(rows); - Assert.Equal(executionId, row.AsRow().ExecutionId); - } - - // Idempotency: a second writer over the now-upgraded DB must not error - // (the probe sees ExecutionId already present and skips the ALTER). - await using (var writerAgain = CreateWriterOver(dataSource)) - { - Assert.True(ColumnExists(seedConnection, "ExecutionId")); - } - } - - // ----- ParentExecutionId schema-upgrade regression (persistent auditlog.db) ----- // - - /// - /// The pre-ParentExecutionId-branch AuditLog schema — the 21-column - /// CREATE TABLE that HAS ExecutionId but is WITHOUT - /// ParentExecutionId. A deployment that ran the ExecutionId branch - /// already has an on-disk auditlog.db in exactly this shape, and - /// CREATE TABLE IF NOT EXISTS is a no-op against it. - /// - private const string OldPreParentExecutionIdSchema = """ - CREATE TABLE IF NOT EXISTS AuditLog ( - EventId TEXT NOT NULL, - OccurredAtUtc TEXT NOT NULL, - Channel TEXT NOT NULL, - Kind TEXT NOT NULL, - CorrelationId TEXT NULL, - SourceSiteId TEXT NULL, - SourceInstanceId TEXT NULL, - SourceScript TEXT NULL, - Actor TEXT NULL, - Target TEXT NULL, - Status TEXT NOT NULL, - HttpStatus INTEGER NULL, - DurationMs INTEGER NULL, - ErrorMessage TEXT NULL, - ErrorDetail TEXT NULL, - RequestSummary TEXT NULL, - ResponseSummary TEXT NULL, - PayloadTruncated INTEGER NOT NULL, - Extra TEXT NULL, - ForwardState TEXT NOT NULL, - ExecutionId TEXT NULL, - PRIMARY KEY (EventId) - ); - CREATE INDEX IF NOT EXISTS IX_SiteAuditLog_ForwardState_Occurred - ON AuditLog (ForwardState, OccurredAtUtc); - """; - - /// - /// Seeds a shared-cache in-memory database with the pre-ParentExecutionId - /// 21-column schema and returns the open connection. The connection MUST - /// stay open for the lifetime of the test — a shared-cache in-memory - /// database is dropped once its last connection closes. - /// - private static SqliteConnection SeedPreParentExecutionIdSchemaDatabase(string dataSource) - { - var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared"); - connection.Open(); - using var cmd = connection.CreateCommand(); - cmd.CommandText = OldPreParentExecutionIdSchema; - cmd.ExecuteNonQuery(); - return connection; - } - - [Fact] - public async Task Opening_Over_PreExisting_PreParentExecutionId_Db_Adds_ParentExecutionId_Column_And_WriteAsync_RoundTrips() - { - var dataSource = $"file:{nameof(Opening_Over_PreExisting_PreParentExecutionId_Db_Adds_ParentExecutionId_Column_And_WriteAsync_RoundTrips)}-{Guid.NewGuid():N}?mode=memory&cache=shared"; - - // A deployment that ran the ExecutionId branch: auditlog.db already - // exists with the 21-column schema and NO ParentExecutionId column. - using var seedConnection = SeedPreParentExecutionIdSchemaDatabase(dataSource); - Assert.True(ColumnExists(seedConnection, "ExecutionId")); - Assert.False(ColumnExists(seedConnection, "ParentExecutionId")); - - // Upgrade: a post-branch SqliteAuditWriter opens the same database. Its - // InitializeSchema must ALTER the missing ParentExecutionId column in — - // the CREATE TABLE IF NOT EXISTS alone is a no-op against the existing - // table. - var executionId = Guid.NewGuid(); - var parentExecutionId = Guid.NewGuid(); - await using (var writer = CreateWriterOver(dataSource)) - { - Assert.True( - ColumnExists(seedConnection, "ParentExecutionId"), - "SqliteAuditWriter must ALTER the ParentExecutionId column into a pre-existing AuditLog table."); - - // A WriteAsync binding $ParentExecutionId must now succeed and - // round-trip; without the ALTER it would fail with "no such column: - // ParentExecutionId" and — because audit writes are best-effort — - // silently drop the row. - var evt = ScadaBridgeAuditEventFactory.Create( - eventId: Guid.NewGuid(), - occurredAtUtc: DateTime.UtcNow, - channel: AuditChannel.ApiOutbound, - kind: AuditKind.ApiCall, - status: AuditStatus.Delivered, - executionId: executionId, - parentExecutionId: parentExecutionId); - await writer.WriteAsync(evt); - - var rows = await writer.ReadPendingAsync(limit: 10); - var row = Assert.Single(rows); - Assert.Equal(executionId, row.AsRow().ExecutionId); - Assert.Equal(parentExecutionId, row.AsRow().ParentExecutionId); - } - - // Idempotency: a second writer over the now-upgraded DB must not error - // (the probe sees ParentExecutionId already present and skips the ALTER). - await using (var writerAgain = CreateWriterOver(dataSource)) - { - Assert.True(ColumnExists(seedConnection, "ParentExecutionId")); - } - } - - [Fact] - public async Task WriteAsync_NullParentExecutionId_RoundTripsAsNull() - { - var (writer, _) = CreateWriter(nameof(WriteAsync_NullParentExecutionId_RoundTripsAsNull)); - await using (writer) - { - var evt = ScadaBridgeAuditEventFactory.Create( - eventId: Guid.NewGuid(), - occurredAtUtc: DateTime.UtcNow, - channel: AuditChannel.Notification, - kind: AuditKind.NotifySend, - status: AuditStatus.Submitted); - // ParentExecutionId left null (not a factory arg → defaults null) - await writer.WriteAsync(evt); - - var rows = await writer.ReadPendingAsync(limit: 10); - var row = Assert.Single(rows); - Assert.Null(row.AsRow().ParentExecutionId); - } - } - - // ----- SourceNode schema-upgrade regression (persistent auditlog.db) ----- // - - /// - /// The pre-SourceNode AuditLog schema — the 22-column CREATE TABLE - /// that HAS ExecutionId + ParentExecutionId but is WITHOUT - /// SourceNode. A deployment that ran the ParentExecutionId branch - /// already has an on-disk auditlog.db in exactly this shape, and - /// CREATE TABLE IF NOT EXISTS is a no-op against it. - /// - private const string OldPreSourceNodeSchema = """ + private const string OldSingleTableSchema = """ CREATE TABLE IF NOT EXISTS AuditLog ( EventId TEXT NOT NULL, OccurredAtUtc TEXT NOT NULL, @@ -403,6 +214,7 @@ public class SqliteAuditWriterSchemaTests Kind TEXT NOT NULL, CorrelationId TEXT NULL, SourceSiteId TEXT NULL, + SourceNode TEXT NULL, SourceInstanceId TEXT NULL, SourceScript TEXT NULL, Actor TEXT NULL, @@ -426,67 +238,84 @@ public class SqliteAuditWriterSchemaTests """; /// - /// Seeds a shared-cache in-memory database with the pre-SourceNode 22-column - /// schema and returns the open connection. The connection MUST stay open for - /// the lifetime of the test — a shared-cache in-memory database is dropped - /// once its last connection closes. + /// Seeds a shared-cache in-memory database with the OLD single-table schema + /// and returns the open connection. The connection MUST stay open for the + /// lifetime of the test: a shared-cache in-memory database is dropped once its + /// last connection closes, so closing this would discard the seeded schema + /// before the writer opens its own connection. /// - private static SqliteConnection SeedPreSourceNodeSchemaDatabase(string dataSource) + private static SqliteConnection SeedOldSingleTableDatabase(string dataSource) { var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared"); connection.Open(); using var cmd = connection.CreateCommand(); - cmd.CommandText = OldPreSourceNodeSchema; + cmd.CommandText = OldSingleTableSchema; cmd.ExecuteNonQuery(); + // Seed one row so we can prove the reset discards it (ephemeral store). + using var insert = connection.CreateCommand(); + insert.CommandText = """ + INSERT INTO AuditLog ( + EventId, OccurredAtUtc, Channel, Kind, Status, PayloadTruncated, ForwardState + ) VALUES ( + $id, '2026-05-20T12:00:00.0000000Z', 'ApiOutbound', 'ApiCall', 'Delivered', 0, 'Pending' + ); + """; + insert.Parameters.AddWithValue("$id", Guid.NewGuid().ToString()); + insert.ExecuteNonQuery(); return connection; } [Fact] - public async Task Initialize_adds_SourceNode_to_pre_existing_schema() + public async Task Opening_Over_PreExisting_OldSingleTable_Db_Drops_It_And_Creates_Two_Table_Schema() { - var dataSource = $"file:{nameof(Initialize_adds_SourceNode_to_pre_existing_schema)}-{Guid.NewGuid():N}?mode=memory&cache=shared"; + var dataSource = $"file:{nameof(Opening_Over_PreExisting_OldSingleTable_Db_Drops_It_And_Creates_Two_Table_Schema)}-{Guid.NewGuid():N}?mode=memory&cache=shared"; - // A deployment that ran the ParentExecutionId branch: auditlog.db - // already exists with the 22-column schema and NO SourceNode column. - using var seedConnection = SeedPreSourceNodeSchemaDatabase(dataSource); - Assert.True(ColumnExists(seedConnection, "ExecutionId")); - Assert.True(ColumnExists(seedConnection, "ParentExecutionId")); - Assert.False(ColumnExists(seedConnection, "SourceNode")); + // A pre-C4 deployment: auditlog.db already exists with the old single + // 24-column AuditLog table (and a seeded row inside it). + using var seedConnection = SeedOldSingleTableDatabase(dataSource); + Assert.True(TableExists(seedConnection, "AuditLog")); - // Upgrade: a post-branch SqliteAuditWriter opens the same database. Its - // InitializeSchema must ALTER the missing SourceNode column in — the - // CREATE TABLE IF NOT EXISTS alone is a no-op against the existing table. + // Upgrade: a C4 SqliteAuditWriter opens the same database. Its + // InitializeSchema RESETS in place — the old AuditLog table is dropped and + // the two new tables (+ IX_fwd) are created. No data is migrated. await using (var writer = CreateWriterOver(dataSource)) { - Assert.True( - ColumnExists(seedConnection, "SourceNode"), - "SqliteAuditWriter must ALTER the SourceNode column into a pre-existing AuditLog table."); + Assert.False( + TableExists(seedConnection, "AuditLog"), + "C4 must DROP the old single-table AuditLog on init (ephemeral in-place reset)."); + Assert.True(TableExists(seedConnection, "audit_event")); + Assert.True(TableExists(seedConnection, "audit_forward_state")); - // A WriteAsync binding $SourceNode must now succeed and round-trip; - // without the ALTER it would fail with "no such column: SourceNode" - // and — because audit writes are best-effort — silently drop the row. + // The two new tables start EMPTY — the old row was discarded, not + // migrated (the site store is ephemeral). + Assert.Empty(await writer.ReadPendingAsync(limit: 100)); + + // And a fresh WriteAsync round-trips through the new schema. var evt = ScadaBridgeAuditEventFactory.Create( eventId: Guid.NewGuid(), occurredAtUtc: DateTime.UtcNow, channel: AuditChannel.ApiOutbound, kind: AuditKind.ApiCall, - status: AuditStatus.Delivered, - sourceNode: "node-a"); + status: AuditStatus.Delivered); await writer.WriteAsync(evt); var rows = await writer.ReadPendingAsync(limit: 10); var row = Assert.Single(rows); - Assert.Equal("node-a", row.SourceNode); + Assert.Equal(evt.EventId, row.EventId); } - // Idempotency: a second writer over the now-upgraded DB must not error - // (the probe sees SourceNode already present and skips the ALTER). + // Idempotency: a second writer over the now-two-table DB must not error + // (DROP TABLE IF EXISTS is a no-op when AuditLog is already gone, and the + // CREATE TABLE IF NOT EXISTS statements are no-ops too). await using (var writerAgain = CreateWriterOver(dataSource)) { - Assert.True(ColumnExists(seedConnection, "SourceNode")); + Assert.True(TableExists(seedConnection, "audit_event")); + Assert.True(TableExists(seedConnection, "audit_forward_state")); } } + // ----- Canonical / sidecar field persistence ----- // + [Fact] public async Task WriteAsync_persists_SourceNode_field() { @@ -528,4 +357,31 @@ public class SqliteAuditWriterSchemaTests Assert.Null(row.SourceNode); } } + + [Fact] + public async Task WriteAsync_ExecutionId_RoundTrips_Through_DetailsJson() + { + var (writer, _) = CreateWriter(nameof(WriteAsync_ExecutionId_RoundTrips_Through_DetailsJson)); + await using (writer) + { + var executionId = Guid.NewGuid(); + var parentExecutionId = Guid.NewGuid(); + var evt = ScadaBridgeAuditEventFactory.Create( + eventId: Guid.NewGuid(), + occurredAtUtc: DateTime.UtcNow, + channel: AuditChannel.ApiOutbound, + kind: AuditKind.ApiCall, + status: AuditStatus.Delivered, + executionId: executionId, + parentExecutionId: parentExecutionId); + await writer.WriteAsync(evt); + + var rows = await writer.ReadPendingAsync(limit: 10); + var row = Assert.Single(rows); + // ExecutionId / ParentExecutionId ride inside DetailsJson; AsRow() + // decomposes them back out. + Assert.Equal(executionId, row.AsRow().ExecutionId); + Assert.Equal(parentExecutionId, row.AsRow().ParentExecutionId); + } + } } diff --git a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs index 07978a1f..868ab135 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs @@ -11,12 +11,13 @@ using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Site; /// -/// Bundle B (M2-T2) hot-path tests for . Exercise -/// the Channel-based enqueue, the background writer's batch INSERTs, duplicate- -/// EventId swallowing, ForwardState defaulting, and the -/// / -/// support surface that -/// Bundle D's telemetry actor will call. +/// C4 (Task 2.5) hot-path + drain tests for 's +/// two-table site schema. Exercise the Channel-based enqueue, the background +/// writer's per-event canonical(audit_event) + sidecar +/// (audit_forward_state) INSERTs, duplicate-EventId swallowing, the +/// IsCachedKind drain split, the four reads, and the +/// / +/// sidecar flips. /// public class SqliteAuditWriterWriteTests { @@ -52,10 +53,40 @@ public class SqliteAuditWriterWriteTests return connection; } - // C3 (Task 2.5): build the canonical ZB.MOM.WW.Audit.AuditEvent via the shared - // factory. The SQLite writer's transitional shim decomposes it into the 24 columns - // (defaulting ForwardState=Pending) on INSERT and recomposes the canonical record - // on read. ExecutionId/SourceNode ride through DetailsJson / the top-level field. + /// + /// Reads the sidecar ForwardState for one EventId (the column moved off + /// the single legacy table onto audit_forward_state in C4). + /// + private static string? ReadForwardState(string dataSource, Guid eventId) + { + using var connection = OpenVerifierConnection(dataSource); + using var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT ForwardState FROM audit_forward_state WHERE EventId = $id;"; + cmd.Parameters.AddWithValue("$id", eventId.ToString()); + return cmd.ExecuteScalar() as string; + } + + /// Sidecar ForwardState → row-count, grouped (replaces the legacy single-table GROUP BY). + private static Dictionary ForwardStateCounts(string dataSource) + { + using var connection = OpenVerifierConnection(dataSource); + using var cmd = connection.CreateCommand(); + cmd.CommandText = + "SELECT ForwardState, COUNT(*) FROM audit_forward_state GROUP BY ForwardState;"; + using var reader = cmd.ExecuteReader(); + var byState = new Dictionary(); + while (reader.Read()) + { + byState[reader.GetString(0)] = reader.GetInt64(1); + } + return byState; + } + + // C4 (Task 2.5): build the canonical ZB.MOM.WW.Audit.AuditEvent via the shared + // factory. The SQLite writer stores the 10 canonical fields directly in + // audit_event and writes a Pending sidecar row into audit_forward_state, with + // IsCachedKind precomputed from the event's Kind. Reads recompose the canonical + // record directly from audit_event's columns. private static AuditEvent NewEvent( Guid? id = null, DateTime? occurredAtUtc = null, @@ -70,22 +101,62 @@ public class SqliteAuditWriterWriteTests executionId: executionId, sourceNode: sourceNode); + /// A cached-lifecycle event (IsCachedKind=1) — drains via the cached read surface. + private static AuditEvent NewCachedEvent( + Guid? id = null, + DateTime? occurredAtUtc = null, + AuditKind kind = AuditKind.ApiCallCached) + // Status is independent of IsCachedKind (which is derived from Kind); + // Submitted is the natural first-row status for a cached lifecycle. + => ScadaBridgeAuditEventFactory.Create( + channel: AuditChannel.ApiOutbound, + kind: kind, + status: AuditStatus.Submitted, + eventId: id ?? Guid.NewGuid(), + occurredAtUtc: occurredAtUtc ?? DateTime.UtcNow); + [Fact] - public async Task WriteAsync_FreshEvent_PersistsWithForwardStatePending() + public async Task WriteAsync_FreshEvent_PersistsCanonical_And_SidecarPending() { - var (writer, dataSource) = CreateWriter(nameof(WriteAsync_FreshEvent_PersistsWithForwardStatePending)); + var (writer, dataSource) = CreateWriter(nameof(WriteAsync_FreshEvent_PersistsCanonical_And_SidecarPending)); await using var _ = writer; var evt = NewEvent(); await writer.WriteAsync(evt); + // Canonical row landed in audit_event. using var connection = OpenVerifierConnection(dataSource); - using var cmd = connection.CreateCommand(); - cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;"; - cmd.Parameters.AddWithValue("$id", evt.EventId.ToString()); - var actual = cmd.ExecuteScalar() as string; + using var eventCmd = connection.CreateCommand(); + eventCmd.CommandText = "SELECT Action FROM audit_event WHERE EventId = $id;"; + eventCmd.Parameters.AddWithValue("$id", evt.EventId.ToString()); + Assert.Equal(evt.Action, eventCmd.ExecuteScalar() as string); - Assert.Equal(AuditForwardState.Pending.ToString(), actual); + // Sidecar row landed Pending. + Assert.Equal(AuditForwardState.Pending.ToString(), ReadForwardState(dataSource, evt.EventId)); + } + + [Fact] + public async Task WriteAsync_Roundtrips_Canonical_Fields_Through_Read() + { + var (writer, _) = CreateWriter(nameof(WriteAsync_Roundtrips_Canonical_Fields_Through_Read)); + await using var _w = writer; + + var evt = NewEvent() with { Target = "target-1", Actor = "user-1" }; + await writer.WriteAsync(evt); + + var rows = await writer.ReadPendingAsync(limit: 10); + var row = Assert.Single(rows); + + Assert.Equal(evt.EventId, row.EventId); + Assert.Equal(evt.OccurredAtUtc, row.OccurredAtUtc); + Assert.Equal("user-1", row.Actor); + Assert.Equal(evt.Action, row.Action); + Assert.Equal(evt.Outcome, row.Outcome); + Assert.Equal(evt.Category, row.Category); + Assert.Equal("target-1", row.Target); + Assert.Equal(evt.CorrelationId, row.CorrelationId); + // DetailsJson is stored verbatim and round-trips byte-for-byte. + Assert.Equal(evt.DetailsJson, row.DetailsJson); } [Fact] @@ -100,11 +171,14 @@ public class SqliteAuditWriterWriteTests async (evt, ct) => await writer.WriteAsync(evt, ct)); using var connection = OpenVerifierConnection(dataSource); - using var cmd = connection.CreateCommand(); - cmd.CommandText = "SELECT COUNT(*) FROM AuditLog;"; - var count = Convert.ToInt64(cmd.ExecuteScalar()); + using var eventCmd = connection.CreateCommand(); + eventCmd.CommandText = "SELECT COUNT(*) FROM audit_event;"; + Assert.Equal(1000, Convert.ToInt64(eventCmd.ExecuteScalar())); - Assert.Equal(1000, count); + // Every canonical row has its matching sidecar row. + using var sidecarCmd = connection.CreateCommand(); + sidecarCmd.CommandText = "SELECT COUNT(*) FROM audit_forward_state;"; + Assert.Equal(1000, Convert.ToInt64(sidecarCmd.ExecuteScalar())); } [Fact] @@ -122,36 +196,98 @@ public class SqliteAuditWriterWriteTests using var connection = OpenVerifierConnection(dataSource); using var countCmd = connection.CreateCommand(); - countCmd.CommandText = "SELECT COUNT(*) FROM AuditLog WHERE EventId = $id;"; + countCmd.CommandText = "SELECT COUNT(*) FROM audit_event WHERE EventId = $id;"; countCmd.Parameters.AddWithValue("$id", sharedId.ToString()); - var count = Convert.ToInt64(countCmd.ExecuteScalar()); + Assert.Equal(1, Convert.ToInt64(countCmd.ExecuteScalar())); - Assert.Equal(1, count); + // The sidecar likewise gained exactly one row (the canonical PK throws + // before the sidecar insert runs, so neither table double-inserts). + using var sidecarCmd = connection.CreateCommand(); + sidecarCmd.CommandText = "SELECT COUNT(*) FROM audit_forward_state WHERE EventId = $id;"; + sidecarCmd.Parameters.AddWithValue("$id", sharedId.ToString()); + Assert.Equal(1, Convert.ToInt64(sidecarCmd.ExecuteScalar())); using var targetCmd = connection.CreateCommand(); - targetCmd.CommandText = "SELECT Target FROM AuditLog WHERE EventId = $id;"; + targetCmd.CommandText = "SELECT Target FROM audit_event WHERE EventId = $id;"; targetCmd.Parameters.AddWithValue("$id", sharedId.ToString()); Assert.Equal("first", targetCmd.ExecuteScalar() as string); } [Fact] - public async Task WriteAsync_ForcesForwardStatePending_IfNull() + public async Task WriteAsync_ForcesSidecarForwardStatePending() { - var (writer, dataSource) = CreateWriter(nameof(WriteAsync_ForcesForwardStatePending_IfNull)); + var (writer, dataSource) = CreateWriter(nameof(WriteAsync_ForcesSidecarForwardStatePending)); await using var _ = writer; - // C3 (Task 2.5): ForwardState is no longer a field on the canonical record; - // a fresh canonical event carries none, and the SQLite shim defaults it to - // Pending on INSERT — exactly the behaviour this test pins. + // C4 (Task 2.5): ForwardState is not a field on the canonical record; a + // fresh event's sidecar row defaults to Pending on INSERT. var evt = NewEvent(); await writer.WriteAsync(evt); + Assert.Equal(AuditForwardState.Pending.ToString(), ReadForwardState(dataSource, evt.EventId)); + } + + // ----- IsCachedKind drain split (precomputed at insert) ----- // + + [Fact] + public async Task WriteAsync_CachedKind_SetsIsCachedKind_1_NonCached_0() + { + var (writer, dataSource) = CreateWriter(nameof(WriteAsync_CachedKind_SetsIsCachedKind_1_NonCached_0)); + await using var _ = writer; + + var cached = NewCachedEvent(); // ApiCallCached → cached + var nonCached = NewEvent(); // ApiCall → not cached + await writer.WriteAsync(cached); + await writer.WriteAsync(nonCached); + using var connection = OpenVerifierConnection(dataSource); using var cmd = connection.CreateCommand(); - cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;"; - cmd.Parameters.AddWithValue("$id", evt.EventId.ToString()); + cmd.CommandText = "SELECT IsCachedKind FROM audit_forward_state WHERE EventId = $id;"; + var p = cmd.Parameters.Add("$id", SqliteType.Text); - Assert.Equal(AuditForwardState.Pending.ToString(), cmd.ExecuteScalar() as string); + p.Value = cached.EventId.ToString(); + Assert.Equal(1L, Convert.ToInt64(cmd.ExecuteScalar())); + + p.Value = nonCached.EventId.ToString(); + Assert.Equal(0L, Convert.ToInt64(cmd.ExecuteScalar())); + } + + [Theory] + [InlineData(AuditKind.CachedSubmit)] + [InlineData(AuditKind.ApiCallCached)] + [InlineData(AuditKind.DbWriteCached)] + [InlineData(AuditKind.CachedResolve)] + public async Task CachedKinds_DrainVia_ReadPendingCachedTelemetry_Not_ReadPending(AuditKind kind) + { + var (writer, _) = CreateWriter($"{nameof(CachedKinds_DrainVia_ReadPendingCachedTelemetry_Not_ReadPending)}-{kind}"); + await using var _w = writer; + + var cached = NewCachedEvent(kind: kind); + await writer.WriteAsync(cached); + + // The cached kind appears in the cached read surface... + var cachedRows = await writer.ReadPendingCachedTelemetryAsync(limit: 10); + Assert.Single(cachedRows, r => r.EventId == cached.EventId); + + // ...and NOT in the audit-only read surface. + var pendingRows = await writer.ReadPendingAsync(limit: 10); + Assert.DoesNotContain(pendingRows, r => r.EventId == cached.EventId); + } + + [Fact] + public async Task NonCachedKind_DrainsVia_ReadPending_Not_ReadPendingCachedTelemetry() + { + var (writer, _) = CreateWriter(nameof(NonCachedKind_DrainsVia_ReadPending_Not_ReadPendingCachedTelemetry)); + await using var _w = writer; + + var nonCached = NewEvent(); // ApiCall — not a cached kind + await writer.WriteAsync(nonCached); + + var pendingRows = await writer.ReadPendingAsync(limit: 10); + Assert.Single(pendingRows, r => r.EventId == nonCached.EventId); + + var cachedRows = await writer.ReadPendingCachedTelemetryAsync(limit: 10); + Assert.DoesNotContain(cachedRows, r => r.EventId == nonCached.EventId); } [Fact] @@ -184,9 +320,9 @@ public class SqliteAuditWriterWriteTests } [Fact] - public async Task MarkForwardedAsync_FlipsRowsToForwarded() + public async Task MarkForwardedAsync_FlipsSidecarRowsToForwarded() { - var (writer, dataSource) = CreateWriter(nameof(MarkForwardedAsync_FlipsRowsToForwarded)); + var (writer, dataSource) = CreateWriter(nameof(MarkForwardedAsync_FlipsSidecarRowsToForwarded)); await using var _ = writer; var ids = new[] { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() }; @@ -197,20 +333,33 @@ public class SqliteAuditWriterWriteTests await writer.MarkForwardedAsync(ids); - using var connection = OpenVerifierConnection(dataSource); - using var cmd = connection.CreateCommand(); - cmd.CommandText = "SELECT ForwardState, COUNT(*) FROM AuditLog GROUP BY ForwardState;"; - using var reader = cmd.ExecuteReader(); - var byState = new Dictionary(); - while (reader.Read()) - { - byState[reader.GetString(0)] = reader.GetInt64(1); - } - + var byState = ForwardStateCounts(dataSource); Assert.Equal(3, byState[AuditForwardState.Forwarded.ToString()]); Assert.False(byState.ContainsKey(AuditForwardState.Pending.ToString())); } + [Fact] + public async Task MarkForwardedAsync_BumpsAttemptCount_And_StampsLastAttemptUtc() + { + var (writer, dataSource) = CreateWriter(nameof(MarkForwardedAsync_BumpsAttemptCount_And_StampsLastAttemptUtc)); + await using var _ = writer; + + var evt = NewEvent(); + await writer.WriteAsync(evt); + + await writer.MarkForwardedAsync(new[] { evt.EventId }); + + using var connection = OpenVerifierConnection(dataSource); + using var cmd = connection.CreateCommand(); + cmd.CommandText = + "SELECT AttemptCount, LastAttemptUtc FROM audit_forward_state WHERE EventId = $id;"; + cmd.Parameters.AddWithValue("$id", evt.EventId.ToString()); + using var reader = cmd.ExecuteReader(); + Assert.True(reader.Read()); + Assert.Equal(1, reader.GetInt32(0)); // AttemptCount bumped 0 → 1 + Assert.False(reader.IsDBNull(1)); // LastAttemptUtc stamped + } + [Fact] public async Task MarkForwardedAsync_NonExistentId_NoThrow() { @@ -223,6 +372,23 @@ public class SqliteAuditWriterWriteTests // No assertion needed: the call must complete without throwing. } + [Fact] + public async Task ReadForwardedAsync_Returns_Only_Forwarded_Rows() + { + var (writer, _) = CreateWriter(nameof(ReadForwardedAsync_Returns_Only_Forwarded_Rows)); + await using var _w = writer; + + var forwarded = NewEvent(); + var pending = NewEvent(); + await writer.WriteAsync(forwarded); + await writer.WriteAsync(pending); + await writer.MarkForwardedAsync(new[] { forwarded.EventId }); + + var rows = await writer.ReadForwardedAsync(limit: 10); + var row = Assert.Single(rows); + Assert.Equal(forwarded.EventId, row.EventId); + } + // ----- M6 reconciliation pull surface ----- // [Fact] @@ -327,16 +493,7 @@ public class SqliteAuditWriterWriteTests await writer.MarkReconciledAsync(new[] { a.EventId, b.EventId, c.EventId }); - using var connection = OpenVerifierConnection(dataSource); - using var cmd = connection.CreateCommand(); - cmd.CommandText = "SELECT ForwardState, COUNT(*) FROM AuditLog GROUP BY ForwardState;"; - using var reader = cmd.ExecuteReader(); - var byState = new Dictionary(); - while (reader.Read()) - { - byState[reader.GetString(0)] = reader.GetInt64(1); - } - + var byState = ForwardStateCounts(dataSource); Assert.Equal(3, byState[AuditForwardState.Reconciled.ToString()]); Assert.False(byState.ContainsKey(AuditForwardState.Pending.ToString())); Assert.False(byState.ContainsKey(AuditForwardState.Forwarded.ToString())); @@ -354,12 +511,7 @@ public class SqliteAuditWriterWriteTests // Re-call must not throw and must leave the single row Reconciled. await writer.MarkReconciledAsync(new[] { a.EventId }); - using var connection = OpenVerifierConnection(dataSource); - using var cmd = connection.CreateCommand(); - cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;"; - cmd.Parameters.AddWithValue("$id", a.EventId.ToString()); - - Assert.Equal(AuditForwardState.Reconciled.ToString(), cmd.ExecuteScalar() as string); + Assert.Equal(AuditForwardState.Reconciled.ToString(), ReadForwardState(dataSource, a.EventId)); } [Fact] @@ -372,12 +524,12 @@ public class SqliteAuditWriterWriteTests // Completes without throwing. } - // ----- ExecutionId column (universal per-run correlation value) ----- // + // ----- ExecutionId (rides DetailsJson, recomposed via AsRow) ----- // [Fact] - public async Task WriteAsync_NonNullExecutionId_RoundTripsThroughMapRow() + public async Task WriteAsync_NonNullExecutionId_RoundTrips() { - var (writer, _) = CreateWriter(nameof(WriteAsync_NonNullExecutionId_RoundTripsThroughMapRow)); + var (writer, _) = CreateWriter(nameof(WriteAsync_NonNullExecutionId_RoundTrips)); await using var _w = writer; var executionId = Guid.NewGuid(); @@ -458,46 +610,40 @@ public class SqliteAuditWriterWriteTests Assert.Null(row.SourceNode); } - // ----- C3 hardening: safe enum-parse in MapRow ----- // + // ----- C4 hardening: safe enum-parse in MapRow ----- // /// - /// C3 hardening (Task 2.5): a row whose Channel/Kind/Status columns hold - /// an unknown/renamed enum string must NOT fault the read path; it degrades - /// gracefully to the same fallbacks used by AuditRowProjection.Decompose - /// (ApiInbound / InboundRequest / Submitted). The read is exercised via the - /// public surface which calls - /// the private MapRow. + /// C4 hardening (Task 2.5): a row whose stored Outcome column holds an + /// unknown/renamed enum string must NOT fault the read path; it degrades + /// gracefully to (the safe + /// fallback). The read is + /// exercised via the public + /// surface which calls the private MapRow. /// [Fact] - public async Task ReadPendingAsync_UnknownEnumStrings_DoNotThrow_YieldFallbackValues() + public async Task ReadPendingAsync_UnknownOutcomeString_DoesNotThrow_YieldsFallback() { var (writer, dataSource) = CreateWriter( - nameof(ReadPendingAsync_UnknownEnumStrings_DoNotThrow_YieldFallbackValues)); + nameof(ReadPendingAsync_UnknownOutcomeString_DoesNotThrow_YieldsFallback)); await using var _ = writer; var evt = NewEvent(); await writer.WriteAsync(evt); - // Tamper: overwrite the three enum columns with unknown strings that are - // not declared AuditChannel/AuditKind/AuditStatus member names. + // Tamper: overwrite the canonical Outcome column with a string that is not + // a declared AuditOutcome member name. using (var conn = OpenVerifierConnection(dataSource)) using (var cmd = conn.CreateCommand()) { - cmd.CommandText = - "UPDATE AuditLog SET Channel = 'ObsoleteChannelV0', " + - "Kind = 'LegacyKindName', Status = 'RenamedStatus99' " + - "WHERE EventId = $id;"; + cmd.CommandText = "UPDATE audit_event SET Outcome = 'RenamedOutcome99' WHERE EventId = $id;"; cmd.Parameters.AddWithValue("$id", evt.EventId.ToString()); cmd.ExecuteNonQuery(); } - // Must not throw (previously would throw ArgumentException from Enum.Parse). + // Must not throw (a raw Enum.Parse would throw ArgumentException). var rows = await writer.ReadPendingAsync(limit: 10); var row = Assert.Single(rows); - var typedRow = row.AsRow(); - Assert.Equal(AuditChannel.ApiInbound, typedRow.Channel); - Assert.Equal(AuditKind.InboundRequest, typedRow.Kind); - Assert.Equal(AuditStatus.Submitted, typedRow.Status); + Assert.Equal(AuditOutcome.Success, row.Outcome); } }