diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs
index 1c0754ab..13af1da4 100644
--- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs
@@ -7,6 +7,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using AuditEvent = ZB.MOM.WW.Audit.AuditEvent;
+using AuditOutcome = ZB.MOM.WW.Audit.AuditOutcome;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Site;
@@ -19,15 +20,27 @@ namespace ZB.MOM.WW.ScadaBridge.AuditLog.Site;
///
///
///
-/// The schema is bootstrapped in the constructor (Bundle B-T1). The
-/// Channel-based hot-path + Bundle D
-/// / support
-/// surface are wired in Bundle B-T2.
+/// C4 (Task 2.5) — two-table schema. The site store is now two tables:
+/// the append-only canonical audit_event (the 10 canonical
+/// fields stored directly — NO 24-column decompose) and
+/// the mutable operational audit_forward_state sidecar that carries the
+/// forwarding lifecycle (), a duplicated
+/// OccurredAtUtc for the drain index range-scan, a precomputed
+/// IsCachedKind flag that drives the cached/non-cached drain split without
+/// re-parsing DetailsJson on the read hot-path, plus attempt bookkeeping.
+///
+///
+/// Ephemeral reset. The site SQLite store is ephemeral (≈7-day retention,
+/// recreated per deployment), so C4's schema change is an in-place RESET: the new
+/// tables are created and the old single 24-column AuditLog table is
+/// DROP-ped if present. No SQLite data migration is performed (and none is
+/// needed) — any rows in a pre-C4 AuditLog table are within the retention
+/// window and are discarded by the drop.
///
///
/// Site rows always carry on first
-/// insert; the central row-shape's IngestedAtUtc column does NOT live in
-/// the site SQLite schema — central stamps it on ingest.
+/// insert; the central row-shape's IngestedAtUtc is a DetailsJson field
+/// stamped by central on ingest, not a site column.
///
///
public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable, IDisposable
@@ -36,8 +49,10 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// on a PRIMARY KEY violation; the extended subcode 1555 (SQLITE_CONSTRAINT_PRIMARYKEY)
// is exposed via SqliteException.SqliteExtendedErrorCode but isn't reliably
// surfaced across all SQLite builds. We treat any constraint error on insert
- // as a duplicate-eventid race and swallow it (first-write-wins) — the index
- // on EventId is the only constraint on this table, so this scope is precise.
+ // as a duplicate-eventid race and swallow it (first-write-wins) — the PRIMARY
+ // KEY on audit_event.EventId is the constraint that fires first, so this scope
+ // is precise (the sidecar insert for the same EventId is in the same
+ // transaction and never reached once audit_event's insert throws).
private const int SqliteErrorConstraint = 19;
private readonly SqliteConnection _connection;
@@ -141,95 +156,63 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
pragmaCmd.ExecuteNonQuery();
}
- using var cmd = _connection.CreateCommand();
- cmd.CommandText = """
- CREATE TABLE IF NOT EXISTS AuditLog (
- EventId TEXT NOT NULL,
- OccurredAtUtc TEXT NOT NULL,
- Channel TEXT NOT NULL,
- Kind TEXT NOT NULL,
- CorrelationId TEXT NULL,
- SourceSiteId TEXT NULL,
- SourceNode TEXT NULL,
- SourceInstanceId TEXT NULL,
- SourceScript TEXT NULL,
- Actor TEXT NULL,
- Target TEXT NULL,
- Status TEXT NOT NULL,
- HttpStatus INTEGER NULL,
- DurationMs INTEGER NULL,
- ErrorMessage TEXT NULL,
- ErrorDetail TEXT NULL,
- RequestSummary TEXT NULL,
- ResponseSummary TEXT NULL,
- PayloadTruncated INTEGER NOT NULL,
- Extra TEXT NULL,
- ForwardState TEXT NOT NULL,
- ExecutionId TEXT NULL,
- ParentExecutionId TEXT NULL,
- PRIMARY KEY (EventId)
- );
- CREATE INDEX IF NOT EXISTS IX_SiteAuditLog_ForwardState_Occurred
- ON AuditLog (ForwardState, OccurredAtUtc);
- """;
- cmd.ExecuteNonQuery();
-
- // Audit Log #23 (ExecutionId): additively add the ExecutionId column.
- // CREATE TABLE IF NOT EXISTS above does NOT add columns to an AuditLog
- // table that already exists from a pre-ExecutionId build, so an
- // auditlog.db created by an older build needs the column ALTER-ed in.
- // The file is durable across restart/failover by design (7-day
- // retention), so without this step every WriteAsync on an upgraded
- // deployment would bind $ExecutionId against a missing column and the
- // best-effort write path would silently drop every site audit row.
- // SQLite has no "ADD COLUMN IF NOT EXISTS"; the column presence is
- // probed first and the ALTER skipped when already there. The column is
- // nullable with no default, so any row written before this migration
- // reads back ExecutionId = null (back-compat).
- AddColumnIfMissing("ExecutionId", "TEXT NULL");
-
- // Audit Log #23 (ParentExecutionId): same idempotent upgrade path as
- // ExecutionId above. A deployment that already ran the ExecutionId
- // branch has an auditlog.db with the 21-column schema and no
- // ParentExecutionId column; CREATE TABLE IF NOT EXISTS cannot add it,
- // so it is ALTER-ed in here. Nullable with no default — rows written
- // before this migration read back ParentExecutionId = null.
- AddColumnIfMissing("ParentExecutionId", "TEXT NULL");
-
- // SourceNode stamping: same idempotent upgrade path as ExecutionId /
- // ParentExecutionId above. A deployment that already ran the
- // ParentExecutionId branch has an auditlog.db with the 22-column
- // schema and no SourceNode column; CREATE TABLE IF NOT EXISTS cannot
- // add it, so it is ALTER-ed in here. Nullable with no default — rows
- // written before this migration read back SourceNode = null.
- AddColumnIfMissing("SourceNode", "TEXT NULL");
- }
-
- ///
- /// Audit Log #23: additively adds a column to AuditLog only when
- /// it is not already present (used for ExecutionId and
- /// ParentExecutionId). SQLite lacks ADD COLUMN IF NOT EXISTS,
- /// so the schema is probed via PRAGMA table_info first. Idempotent —
- /// safe to run on every . Mirrors
- /// StoreAndForwardStorage.AddColumnIfMissingAsync; kept synchronous
- /// here to match the rest of this writer's bootstrap DDL.
- ///
- private void AddColumnIfMissing(string columnName, string columnDefinition)
- {
- using var probe = _connection.CreateCommand();
- probe.CommandText = "SELECT COUNT(*) FROM pragma_table_info('AuditLog') WHERE name = $name";
- probe.Parameters.AddWithValue("$name", columnName);
- var exists = Convert.ToInt32(probe.ExecuteScalar()) > 0;
- if (exists)
+ // C4 (Task 2.5) — in-place reset. The site store is EPHEMERAL (≈7-day
+ // retention, recreated per deployment), so we do NOT migrate the old
+ // single 24-column AuditLog table to the new two-table shape: any rows
+ // it holds are within the retention window and discarded. DROP it if a
+ // pre-C4 deployment left it behind, then CREATE the two new tables. This
+ // is safe precisely BECAUSE the site store is ephemeral — never do this
+ // on a durable store (the central SQL Server side keeps its shim until
+ // C5 and is migrated, not reset).
+ using (var dropCmd = _connection.CreateCommand())
{
- return;
+ dropCmd.CommandText = "DROP TABLE IF EXISTS AuditLog;";
+ dropCmd.ExecuteNonQuery();
}
- using var alter = _connection.CreateCommand();
- // Column name + definition are caller-controlled constants, never user
- // input — safe to interpolate (parameters are not permitted in DDL).
- alter.CommandText = $"ALTER TABLE AuditLog ADD COLUMN {columnName} {columnDefinition}";
- alter.ExecuteNonQuery();
+ using var cmd = _connection.CreateCommand();
+ cmd.CommandText = """
+ -- Canonical, append-only / write-once: the 10 fields of the canonical
+ -- ZB.MOM.WW.Audit.AuditEvent stored directly (DetailsJson carries the
+ -- ScadaBridge domain fields). No forwarding state lives here — that is
+ -- the audit_forward_state sidecar's concern.
+ CREATE TABLE IF NOT EXISTS audit_event (
+ EventId TEXT NOT NULL,
+ OccurredAtUtc TEXT NOT NULL,
+ Actor TEXT NOT NULL,
+ Action TEXT NOT NULL,
+ Outcome TEXT NOT NULL,
+ Category TEXT NULL,
+ Target TEXT NULL,
+ SourceNode TEXT NULL,
+ CorrelationId TEXT NULL,
+ DetailsJson TEXT NULL,
+ PRIMARY KEY (EventId)
+ );
+
+ -- Operational, mutable: the forwarding lifecycle for each canonical
+ -- row. OccurredAtUtc is duplicated here so the drain range-scan stays
+ -- on this one table's index; IsCachedKind is precomputed at insert so
+ -- the cached/non-cached drain split never re-parses DetailsJson on the
+ -- read hot-path.
+ CREATE TABLE IF NOT EXISTS audit_forward_state (
+ EventId TEXT NOT NULL,
+ ForwardState TEXT NOT NULL,
+ OccurredAtUtc TEXT NOT NULL,
+ IsCachedKind INTEGER NOT NULL,
+ AttemptCount INTEGER NOT NULL DEFAULT 0,
+ LastAttemptUtc TEXT NULL,
+ PRIMARY KEY (EventId),
+ FOREIGN KEY (EventId) REFERENCES audit_event(EventId)
+ );
+
+ -- Drain index: every read filters on (ForwardState, IsCachedKind) and
+ -- range-scans/orders by OccurredAtUtc, so this composite covers the
+ -- four reads + the backlog COUNT/MIN.
+ CREATE INDEX IF NOT EXISTS IX_fwd
+ ON audit_forward_state (ForwardState, IsCachedKind, OccurredAtUtc);
+ """;
+ cmd.ExecuteNonQuery();
}
///
@@ -237,9 +220,9 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
{
ArgumentNullException.ThrowIfNull(evt);
- // C3 transitional shim: the canonical record carries no ForwardState
- // (a site-storage-only concern). Site rows always start Pending; the
- // forwarding columns + queries are unchanged from the 24-column schema.
+ // The canonical record carries no ForwardState (a site-storage-only
+ // concern). Site rows always start Pending; the sidecar row is written
+ // alongside the canonical row in the same transaction.
var pending = new PendingAuditEvent(evt, AuditForwardState.Pending);
// CreateBounded(FullMode=Wait) means WriteAsync will await room rather
@@ -313,101 +296,99 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
using var transaction = _connection.BeginTransaction();
try
{
- using var cmd = _connection.CreateCommand();
- cmd.Transaction = transaction;
- cmd.CommandText = """
- INSERT INTO AuditLog (
- EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
- SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
- Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
- RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
- ExecutionId, ParentExecutionId
+ // INSERT 1: the canonical row, stored DIRECTLY (the 10 canonical
+ // fields straight off the AuditEvent — no Decompose; audit_event
+ // holds canonical shape, not the legacy 24-column shape).
+ using var eventCmd = _connection.CreateCommand();
+ eventCmd.Transaction = transaction;
+ eventCmd.CommandText = """
+ INSERT INTO audit_event (
+ EventId, OccurredAtUtc, Actor, Action, Outcome,
+ Category, Target, SourceNode, CorrelationId, DetailsJson
) VALUES (
- $EventId, $OccurredAtUtc, $Channel, $Kind, $CorrelationId,
- $SourceSiteId, $SourceNode, $SourceInstanceId, $SourceScript, $Actor, $Target,
- $Status, $HttpStatus, $DurationMs, $ErrorMessage, $ErrorDetail,
- $RequestSummary, $ResponseSummary, $PayloadTruncated, $Extra, $ForwardState,
- $ExecutionId, $ParentExecutionId
+ $EventId, $OccurredAtUtc, $Actor, $Action, $Outcome,
+ $Category, $Target, $SourceNode, $CorrelationId, $DetailsJson
);
""";
+ var eEventId = eventCmd.Parameters.Add("$EventId", SqliteType.Text);
+ var eOccurredAt = eventCmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text);
+ var eActor = eventCmd.Parameters.Add("$Actor", SqliteType.Text);
+ var eAction = eventCmd.Parameters.Add("$Action", SqliteType.Text);
+ var eOutcome = eventCmd.Parameters.Add("$Outcome", SqliteType.Text);
+ var eCategory = eventCmd.Parameters.Add("$Category", SqliteType.Text);
+ var eTarget = eventCmd.Parameters.Add("$Target", SqliteType.Text);
+ var eSourceNode = eventCmd.Parameters.Add("$SourceNode", SqliteType.Text);
+ var eCorrelationId = eventCmd.Parameters.Add("$CorrelationId", SqliteType.Text);
+ var eDetailsJson = eventCmd.Parameters.Add("$DetailsJson", SqliteType.Text);
- var pEventId = cmd.Parameters.Add("$EventId", SqliteType.Text);
- var pOccurredAt = cmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text);
- var pChannel = cmd.Parameters.Add("$Channel", SqliteType.Text);
- var pKind = cmd.Parameters.Add("$Kind", SqliteType.Text);
- var pCorrelationId = cmd.Parameters.Add("$CorrelationId", SqliteType.Text);
- var pSourceSiteId = cmd.Parameters.Add("$SourceSiteId", SqliteType.Text);
- var pSourceNode = cmd.Parameters.Add("$SourceNode", SqliteType.Text);
- var pSourceInstanceId = cmd.Parameters.Add("$SourceInstanceId", SqliteType.Text);
- var pSourceScript = cmd.Parameters.Add("$SourceScript", SqliteType.Text);
- var pActor = cmd.Parameters.Add("$Actor", SqliteType.Text);
- var pTarget = cmd.Parameters.Add("$Target", SqliteType.Text);
- var pStatus = cmd.Parameters.Add("$Status", SqliteType.Text);
- var pHttpStatus = cmd.Parameters.Add("$HttpStatus", SqliteType.Integer);
- var pDurationMs = cmd.Parameters.Add("$DurationMs", SqliteType.Integer);
- var pErrorMessage = cmd.Parameters.Add("$ErrorMessage", SqliteType.Text);
- var pErrorDetail = cmd.Parameters.Add("$ErrorDetail", SqliteType.Text);
- var pRequestSummary = cmd.Parameters.Add("$RequestSummary", SqliteType.Text);
- var pResponseSummary = cmd.Parameters.Add("$ResponseSummary", SqliteType.Text);
- var pPayloadTruncated = cmd.Parameters.Add("$PayloadTruncated", SqliteType.Integer);
- var pExtra = cmd.Parameters.Add("$Extra", SqliteType.Text);
- var pForwardState = cmd.Parameters.Add("$ForwardState", SqliteType.Text);
- var pExecutionId = cmd.Parameters.Add("$ExecutionId", SqliteType.Text);
- var pParentExecutionId = cmd.Parameters.Add("$ParentExecutionId", SqliteType.Text);
+ // INSERT 2: the operational sidecar row. ForwardState=Pending,
+ // OccurredAtUtc duplicated for the drain index, IsCachedKind
+ // precomputed (so the read split never parses DetailsJson),
+ // AttemptCount=0, LastAttemptUtc=NULL.
+ using var fwdCmd = _connection.CreateCommand();
+ fwdCmd.Transaction = transaction;
+ fwdCmd.CommandText = """
+ INSERT INTO audit_forward_state (
+ EventId, ForwardState, OccurredAtUtc, IsCachedKind, AttemptCount, LastAttemptUtc
+ ) VALUES (
+ $EventId, $ForwardState, $OccurredAtUtc, $IsCachedKind, 0, NULL
+ );
+ """;
+ var fEventId = fwdCmd.Parameters.Add("$EventId", SqliteType.Text);
+ var fForwardState = fwdCmd.Parameters.Add("$ForwardState", SqliteType.Text);
+ var fOccurredAt = fwdCmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text);
+ var fIsCachedKind = fwdCmd.Parameters.Add("$IsCachedKind", SqliteType.Integer);
foreach (var pending in batch)
{
- // C3 transitional shim: decompose the canonical record into
- // the typed 24-column values the existing SQLite schema
- // expects (Channel/Kind/Status + the DetailsJson domain
- // fields). ForwardState rides alongside the canonical record
- // (site-storage-only) and is bound from pending.ForwardState.
- var r = AuditRowProjection.Decompose(pending.Event);
- pEventId.Value = r.EventId.ToString();
- pOccurredAt.Value = r.OccurredAtUtc.ToString("o");
- pChannel.Value = r.Channel.ToString();
- pKind.Value = r.Kind.ToString();
- pCorrelationId.Value = (object?)r.CorrelationId?.ToString() ?? DBNull.Value;
- pSourceSiteId.Value = (object?)r.SourceSiteId ?? DBNull.Value;
+ var evt = pending.Event;
+ // Canonical OccurredAtUtc is UTC by construction; store the
+ // round-trip "o" form so string comparison stays monotonic
+ // (the drain range-scan and ORDER BY rely on it).
+ var occurredText = evt.OccurredAtUtc.UtcDateTime.ToString(
+ "o", System.Globalization.CultureInfo.InvariantCulture);
+
+ eEventId.Value = evt.EventId.ToString();
+ eOccurredAt.Value = occurredText;
+ // Canonical Actor is a required non-null string.
+ eActor.Value = evt.Actor ?? string.Empty;
+ eAction.Value = evt.Action;
+ eOutcome.Value = evt.Outcome.ToString();
+ eCategory.Value = (object?)evt.Category ?? DBNull.Value;
+ eTarget.Value = (object?)evt.Target ?? DBNull.Value;
// SourceNode-stamping: caller-provided value wins (preserves
// rows reconciled in from other nodes via the same writer);
// otherwise stamp from the local INodeIdentityProvider. The
// event record itself is NOT mutated — stamping is at write
// time only. If the provider also returns null (unconfigured
- // node), the row's SourceNode stays NULL — operators see
- // "needs config" via the schema, not a magic fallback string.
- var sourceNode = r.SourceNode ?? _nodeIdentity.NodeName;
- pSourceNode.Value = (object?)sourceNode ?? DBNull.Value;
- pSourceInstanceId.Value = (object?)r.SourceInstanceId ?? DBNull.Value;
- pSourceScript.Value = (object?)r.SourceScript ?? DBNull.Value;
- pActor.Value = (object?)r.Actor ?? DBNull.Value;
- pTarget.Value = (object?)r.Target ?? DBNull.Value;
- pStatus.Value = r.Status.ToString();
- pHttpStatus.Value = (object?)r.HttpStatus ?? DBNull.Value;
- pDurationMs.Value = (object?)r.DurationMs ?? DBNull.Value;
- pErrorMessage.Value = (object?)r.ErrorMessage ?? DBNull.Value;
- pErrorDetail.Value = (object?)r.ErrorDetail ?? DBNull.Value;
- pRequestSummary.Value = (object?)r.RequestSummary ?? DBNull.Value;
- pResponseSummary.Value = (object?)r.ResponseSummary ?? DBNull.Value;
- pPayloadTruncated.Value = r.PayloadTruncated ? 1 : 0;
- pExtra.Value = (object?)r.Extra ?? DBNull.Value;
- pForwardState.Value = pending.ForwardState.ToString();
- pExecutionId.Value = (object?)r.ExecutionId?.ToString() ?? DBNull.Value;
- pParentExecutionId.Value = (object?)r.ParentExecutionId?.ToString() ?? DBNull.Value;
+ // node), the column stays NULL — operators see "needs config"
+ // via the schema, not a magic fallback string.
+ var sourceNode = evt.SourceNode ?? _nodeIdentity.NodeName;
+ eSourceNode.Value = (object?)sourceNode ?? DBNull.Value;
+ eCorrelationId.Value = (object?)evt.CorrelationId?.ToString() ?? DBNull.Value;
+ eDetailsJson.Value = (object?)evt.DetailsJson ?? DBNull.Value;
+
+ fEventId.Value = evt.EventId.ToString();
+ fForwardState.Value = pending.ForwardState.ToString();
+ fOccurredAt.Value = occurredText;
+ fIsCachedKind.Value = IsCachedKind(evt.DetailsJson) ? 1 : 0;
try
{
- cmd.ExecuteNonQuery();
+ eventCmd.ExecuteNonQuery();
+ fwdCmd.ExecuteNonQuery();
pending.Completion.TrySetResult();
}
catch (SqliteException ex) when (ex.SqliteErrorCode == SqliteErrorConstraint)
{
- // Duplicate EventId — first-write-wins (alog.md §11).
- // Treat as success: the lifecycle event is durably
- // recorded under the first writer's payload.
+ // Duplicate EventId — first-write-wins (alog.md §11). The
+ // audit_event PRIMARY KEY throws before the sidecar insert
+ // runs, so neither table gains a second row. Treat as
+ // success: the lifecycle event is durably recorded under
+ // the first writer's payload.
_logger.LogDebug(ex,
"Duplicate EventId {EventId} swallowed by SqliteAuditWriter",
- r.EventId);
+ evt.EventId);
pending.Completion.TrySetResult();
}
}
@@ -429,17 +410,36 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// AuditLog-001: cached-lifecycle audit kinds that ride the combined-telemetry
// drain (joined with the operational tracking row + pushed via
// IngestCachedTelemetryAsync into the central dual-write transaction).
- // ReadPendingAsync EXCLUDES these so the audit-only drain doesn't double-emit
- // them; ReadPendingCachedTelemetryAsync below is the dedicated read surface
- // the new SiteAuditTelemetryActor cached-drain uses.
- private static readonly string[] CachedTelemetryKindNames =
+ // C4: this is the SAME set the pre-C4 ReadPendingCachedTelemetryAsync query
+ // filtered on (Kind IN (...)); it is now precomputed into the sidecar's
+ // IsCachedKind flag at INSERT (see IsCachedKind) so the read split is a cheap
+ // integer predicate, not a JSON parse. ReadPendingAsync drains everything
+ // with IsCachedKind=0; ReadPendingCachedTelemetryAsync drains IsCachedKind=1.
+ private static readonly HashSet CachedTelemetryKinds = new()
{
- nameof(AuditKind.CachedSubmit),
- nameof(AuditKind.ApiCallCached),
- nameof(AuditKind.DbWriteCached),
- nameof(AuditKind.CachedResolve),
+ AuditKind.CachedSubmit,
+ AuditKind.ApiCallCached,
+ AuditKind.DbWriteCached,
+ AuditKind.CachedResolve,
};
+ ///
+ /// C4: precomputes the sidecar's IsCachedKind flag from a canonical
+ /// row's DetailsJson. Parses the
+ /// discriminator via and returns true
+ /// iff it is one of the cached-lifecycle kinds
+ /// (, ,
+ /// , ).
+ /// Runs once per event at INSERT time so the cached/non-cached drain split is
+ /// a cheap integer predicate on read, never a JSON parse on the hot path.
+ ///
+ private static bool IsCachedKind(string? detailsJson)
+ {
+ var details = AuditDetailsCodec.Deserialize(detailsJson);
+ var kind = AuditRowProjection.ParseEnum(details.Kind, AuditKind.InboundRequest);
+ return CachedTelemetryKinds.Contains(kind);
+ }
+
///
public Task> ReadPendingAsync(int limit, CancellationToken ct = default)
{
@@ -451,47 +451,35 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// AuditLog-005: read via the dedicated _readConnection so this scan
// (which can be expensive when the backlog grows under a central
// outage) does not block the batched writer on _writeLock. WAL mode
- // gives us a stable snapshot of the table while writes proceed on the
+ // gives us a stable snapshot of the tables while writes proceed on the
// writer connection. _readLock serialises this connection across
// multiple concurrent read callers since SqliteConnection itself is
// not thread-safe.
- // AuditLog-001: NOT IN ($cached1,$cached2,$cached3,$cached4) excludes the
- // cached-lifecycle kinds — they flow through ReadPendingCachedTelemetryAsync
- // + the combined-telemetry drain. Kind is stored as the enum's name (see
- // FlushBatch's pKind.Value), so a string-IN against the constant kind
- // names matches the on-disk shape exactly.
+ // C4: JOIN the sidecar and filter on IsCachedKind=0 — the cached-
+ // lifecycle kinds (IsCachedKind=1) flow through
+ // ReadPendingCachedTelemetryAsync + the combined-telemetry drain. The
+ // split is a precomputed integer predicate on the indexed sidecar, not
+ // a DetailsJson parse. Ordering is by the sidecar's OccurredAtUtc with
+ // EventId as the deterministic tiebreaker.
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _readConnection.CreateCommand();
cmd.CommandText = """
- SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
- SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
- Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
- RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
- ExecutionId, ParentExecutionId
- FROM AuditLog
- WHERE ForwardState = $pending
- AND Kind NOT IN ($k0, $k1, $k2, $k3)
- ORDER BY OccurredAtUtc ASC, EventId ASC
+ SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
+ ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
+ FROM audit_event ae
+ JOIN audit_forward_state fs ON fs.EventId = ae.EventId
+ WHERE fs.ForwardState = $pending
+ AND fs.IsCachedKind = 0
+ ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
- cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]);
- cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]);
- cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]);
- cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]);
cmd.Parameters.AddWithValue("$limit", limit);
- var rows = new List(Math.Min(limit, 256));
- using var reader = cmd.ExecuteReader();
- while (reader.Read())
- {
- rows.Add(MapRow(reader));
- }
-
- return Task.FromResult>(rows);
+ return Task.FromResult(ReadRows(cmd, limit));
}
}
@@ -504,42 +492,29 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
}
- // AuditLog-001: dedicated read surface for the cached-call lifecycle
- // drain — symmetric to ReadPendingAsync but filtered to the four
- // cached AuditKinds. Same _readConnection + _readLock pattern so the
- // hot-path writer is not contended.
+ // AuditLog-001 / C4: dedicated read surface for the cached-call lifecycle
+ // drain — symmetric to ReadPendingAsync but filtered to IsCachedKind=1.
+ // Same _readConnection + _readLock pattern so the hot-path writer is not
+ // contended.
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _readConnection.CreateCommand();
cmd.CommandText = """
- SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
- SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
- Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
- RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
- ExecutionId, ParentExecutionId
- FROM AuditLog
- WHERE ForwardState = $pending
- AND Kind IN ($k0, $k1, $k2, $k3)
- ORDER BY OccurredAtUtc ASC, EventId ASC
+ SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
+ ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
+ FROM audit_event ae
+ JOIN audit_forward_state fs ON fs.EventId = ae.EventId
+ WHERE fs.ForwardState = $pending
+ AND fs.IsCachedKind = 1
+ ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
- cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]);
- cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]);
- cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]);
- cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]);
cmd.Parameters.AddWithValue("$limit", limit);
- var rows = new List(Math.Min(limit, 256));
- using var reader = cmd.ExecuteReader();
- while (reader.Read())
- {
- rows.Add(MapRow(reader));
- }
-
- return Task.FromResult>(rows);
+ return Task.FromResult(ReadRows(cmd, limit));
}
}
@@ -565,34 +540,27 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// AuditLog-005: mirror ReadPendingAsync — read via _readConnection /
// _readLock so this query never contends with the batched writer on
- // _writeLock.
+ // _writeLock. C4: JOIN the sidecar and filter on ForwardState='Forwarded'
+ // (no IsCachedKind split — both cached and non-cached Forwarded rows are
+ // returned, as before).
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _readConnection.CreateCommand();
cmd.CommandText = """
- SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
- SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
- Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
- RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
- ExecutionId, ParentExecutionId
- FROM AuditLog
- WHERE ForwardState = $forwarded
- ORDER BY OccurredAtUtc ASC, EventId ASC
+ SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
+ ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
+ FROM audit_event ae
+ JOIN audit_forward_state fs ON fs.EventId = ae.EventId
+ WHERE fs.ForwardState = $forwarded
+ ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
cmd.Parameters.AddWithValue("$limit", limit);
- var rows = new List(Math.Min(limit, 256));
- using var reader = cmd.ExecuteReader();
- while (reader.Read())
- {
- rows.Add(MapRow(reader));
- }
-
- return Task.FromResult>(rows);
+ return Task.FromResult(ReadRows(cmd, limit));
}
}
@@ -610,11 +578,16 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _connection.CreateCommand();
- // Build a single IN (...) parameter list so we issue one UPDATE per
- // batch regardless of size. Each id is bound as its own parameter,
- // so no string concatenation of user data ever enters the SQL.
+ // C4: flip the sidecar — UPDATE audit_forward_state, not the canonical
+ // audit_event (which is append-only / write-once). Bump AttemptCount +
+ // stamp LastAttemptUtc so operators can see how many drain passes a row
+ // took to forward. Build a single IN (...) parameter list so we issue
+ // one UPDATE per batch regardless of size. Each id is bound as its own
+ // parameter, so no string concatenation of user data ever enters the SQL.
var sb = new System.Text.StringBuilder();
- sb.Append("UPDATE AuditLog SET ForwardState = $forwarded WHERE EventId IN (");
+ sb.Append("UPDATE audit_forward_state SET ForwardState = $forwarded, ")
+ .Append("AttemptCount = AttemptCount + 1, LastAttemptUtc = $now ")
+ .Append("WHERE EventId IN (");
for (int i = 0; i < eventIds.Count; i++)
{
if (i > 0) sb.Append(',');
@@ -625,6 +598,8 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
sb.Append(");");
cmd.CommandText = sb.ToString();
cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
+ cmd.Parameters.AddWithValue("$now", DateTime.UtcNow.ToString(
+ "o", System.Globalization.CultureInfo.InvariantCulture));
cmd.ExecuteNonQuery();
return Task.CompletedTask;
@@ -641,22 +616,24 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
}
// AuditLog-005: read via _readConnection / _readLock — same lock-
- // decoupling as ReadPendingAsync.
+ // decoupling as ReadPendingAsync. C4: JOIN the sidecar; the range scan
+ // is on the sidecar's duplicated OccurredAtUtc so it stays on IX_fwd.
+ // Both Pending and Forwarded rows are returned (the central reconciliation
+ // puller dedups on EventId; re-shipping a Forwarded-but-not-yet-ingested
+ // row is safe).
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _readConnection.CreateCommand();
cmd.CommandText = """
- SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
- SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
- Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
- RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
- ExecutionId, ParentExecutionId
- FROM AuditLog
- WHERE ForwardState IN ($pending, $forwarded)
- AND OccurredAtUtc >= $since
- ORDER BY OccurredAtUtc ASC, EventId ASC
+ SELECT ae.EventId, ae.OccurredAtUtc, ae.Actor, ae.Action, ae.Outcome,
+ ae.Category, ae.Target, ae.SourceNode, ae.CorrelationId, ae.DetailsJson
+ FROM audit_event ae
+ JOIN audit_forward_state fs ON fs.EventId = ae.EventId
+ WHERE fs.ForwardState IN ($pending, $forwarded)
+ AND fs.OccurredAtUtc >= $since
+ ORDER BY fs.OccurredAtUtc ASC, ae.EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
@@ -668,14 +645,7 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
"o", System.Globalization.CultureInfo.InvariantCulture));
cmd.Parameters.AddWithValue("$limit", batchSize);
- var rows = new List(Math.Min(batchSize, 256));
- using var reader = cmd.ExecuteReader();
- while (reader.Read())
- {
- rows.Add(MapRow(reader));
- }
-
- return Task.FromResult>(rows);
+ return Task.FromResult(ReadRows(cmd, batchSize));
}
}
@@ -693,8 +663,11 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _connection.CreateCommand();
+ // C4: flip the sidecar from Pending/Forwarded → Reconciled. Rows
+ // already Reconciled are left untouched (idempotent re-call), and the
+ // canonical audit_event row is never modified.
var sb = new System.Text.StringBuilder();
- sb.Append("UPDATE AuditLog SET ForwardState = $reconciled ")
+ sb.Append("UPDATE audit_forward_state SET ForwardState = $reconciled ")
.Append("WHERE ForwardState IN ($pending, $forwarded) AND EventId IN (");
for (int i = 0; i < eventIds.Count; i++)
{
@@ -726,18 +699,17 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// central outage the Pending backlog can grow to hundreds of thousands
// of rows and the COUNT(*) scan correspondingly stretches; that no
// longer adds tail latency to user-facing audit writes.
+ // C4: count over the sidecar (audit_forward_state) — the canonical
+ // audit_event table carries no ForwardState. The IX_fwd index makes both
+ // aggregates cheap (count is a covering scan, min is the first key).
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
- // Single round-trip — COUNT(*) + MIN(OccurredAtUtc) over the same
- // index range avoids a second scan. The IX_SiteAuditLog_ForwardState_Occurred
- // index makes both aggregates cheap (count is a covering scan, min
- // is the first key).
using var cmd = _readConnection.CreateCommand();
cmd.CommandText = """
SELECT COUNT(*), MIN(OccurredAtUtc)
- FROM AuditLog
+ FROM audit_forward_state
WHERE ForwardState = $pending;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
@@ -788,38 +760,49 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
? value
: DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);
+ ///
+ /// Executes (one of the four reads, each already
+ /// projecting the 10 audit_event columns in canonical order) and
+ /// materialises the rows via .
+ ///
+ private static IReadOnlyList ReadRows(SqliteCommand cmd, int capacityHint)
+ {
+ var rows = new List(Math.Min(capacityHint, 256));
+ using var reader = cmd.ExecuteReader();
+ while (reader.Read())
+ {
+ rows.Add(MapRow(reader));
+ }
+ return rows;
+ }
+
+ ///
+ /// C4: builds the canonical DIRECTLY from the 10
+ /// stored audit_event columns — no 24-column Recompose, because
+ /// audit_event already holds the canonical fields + DetailsJson.
+ /// Outcome is stored as the enum's name; the safe
+ /// degrades an unknown/renamed
+ /// value gracefully rather than throwing.
+ ///
private static AuditEvent MapRow(SqliteDataReader reader)
{
- // C3 transitional shim: recompose the canonical record from the 24
- // columns. The ForwardState column (ordinal 20) is read for the
- // schema's sake but NOT placed on the canonical record — it stays a
- // site-storage-only concern (the forwarding queries below own it).
- return AuditRowProjection.Recompose(new AuditRowProjection.AuditRowValues(
- EventId: Guid.Parse(reader.GetString(0)),
- OccurredAtUtc: DateTime.Parse(reader.GetString(1),
- System.Globalization.CultureInfo.InvariantCulture,
- System.Globalization.DateTimeStyles.RoundtripKind),
- IngestedAtUtc: null,
- Channel: AuditRowProjection.ParseEnum(reader.GetString(2), AuditChannel.ApiInbound),
- Kind: AuditRowProjection.ParseEnum(reader.GetString(3), AuditKind.InboundRequest),
- Status: AuditRowProjection.ParseEnum(reader.GetString(11), AuditStatus.Submitted),
- CorrelationId: reader.IsDBNull(4) ? null : Guid.Parse(reader.GetString(4)),
- ExecutionId: reader.IsDBNull(21) ? null : Guid.Parse(reader.GetString(21)),
- ParentExecutionId: reader.IsDBNull(22) ? null : Guid.Parse(reader.GetString(22)),
- SourceSiteId: reader.IsDBNull(5) ? null : reader.GetString(5),
- SourceNode: reader.IsDBNull(6) ? null : reader.GetString(6),
- SourceInstanceId: reader.IsDBNull(7) ? null : reader.GetString(7),
- SourceScript: reader.IsDBNull(8) ? null : reader.GetString(8),
- Actor: reader.IsDBNull(9) ? null : reader.GetString(9),
- Target: reader.IsDBNull(10) ? null : reader.GetString(10),
- HttpStatus: reader.IsDBNull(12) ? null : reader.GetInt32(12),
- DurationMs: reader.IsDBNull(13) ? null : reader.GetInt32(13),
- ErrorMessage: reader.IsDBNull(14) ? null : reader.GetString(14),
- ErrorDetail: reader.IsDBNull(15) ? null : reader.GetString(15),
- RequestSummary: reader.IsDBNull(16) ? null : reader.GetString(16),
- ResponseSummary: reader.IsDBNull(17) ? null : reader.GetString(17),
- PayloadTruncated: reader.GetInt32(18) != 0,
- Extra: reader.IsDBNull(19) ? null : reader.GetString(19)));
+ return new AuditEvent
+ {
+ EventId = Guid.Parse(reader.GetString(0)),
+ OccurredAtUtc = new DateTimeOffset(DateTime.SpecifyKind(
+ DateTime.Parse(reader.GetString(1),
+ System.Globalization.CultureInfo.InvariantCulture,
+ System.Globalization.DateTimeStyles.RoundtripKind),
+ DateTimeKind.Utc)),
+ Actor = reader.GetString(2),
+ Action = reader.GetString(3),
+ Outcome = AuditRowProjection.ParseEnum(reader.GetString(4), AuditOutcome.Success),
+ Category = reader.IsDBNull(5) ? null : reader.GetString(5),
+ Target = reader.IsDBNull(6) ? null : reader.GetString(6),
+ SourceNode = reader.IsDBNull(7) ? null : reader.GetString(7),
+ CorrelationId = reader.IsDBNull(8) ? null : Guid.Parse(reader.GetString(8)),
+ DetailsJson = reader.IsDBNull(9) ? null : reader.GetString(9),
+ };
}
///
@@ -903,7 +886,7 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
{
/// Initializes a new instance of the PendingAuditEvent class.
/// The canonical audit event to persist.
- /// Site-local forwarding state stored alongside the canonical row (C3 shim — not a canonical field).
+ /// Initial site-local forwarding state written to the sidecar row (always Pending for fresh events).
public PendingAuditEvent(AuditEvent evt, AuditForwardState forwardState)
{
Event = evt;
@@ -913,7 +896,7 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
/// The canonical audit event to persist.
public AuditEvent Event { get; }
- /// Site-local forwarding state for this row (C3 shim — bound to the ForwardState column).
+ /// Initial forwarding state for this row's sidecar (bound to audit_forward_state.ForwardState).
public AuditForwardState ForwardState { get; }
/// Task completion source for write completion signaling.
public TaskCompletionSource Completion { get; }
diff --git a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterSchemaTests.cs b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterSchemaTests.cs
index 7b91320f..5284cd87 100644
--- a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterSchemaTests.cs
+++ b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterSchemaTests.cs
@@ -10,9 +10,12 @@ using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Site;
///
-/// Bundle B (M2-T1) schema-bootstrap tests for .
-/// Uses an in-memory shared-cache SQLite database so the same connection name
-/// reaches the same file-less db across both the writer and the verifier.
+/// C4 (Task 2.5) schema-bootstrap tests for 's
+/// two-table site schema — the append-only canonical audit_event table +
+/// the mutable operational audit_forward_state sidecar + the IX_fwd
+/// drain index. Uses an in-memory shared-cache SQLite database so the same
+/// connection name reaches the same file-less db across both the writer and the
+/// verifier.
///
public class SqliteAuditWriterSchemaTests
{
@@ -38,6 +41,16 @@ public class SqliteAuditWriterSchemaTests
return (writer, dataSource);
}
+ private static SqliteAuditWriter CreateWriterOver(string dataSource)
+ {
+ var options = new SqliteAuditWriterOptions { DatabasePath = dataSource };
+ return new SqliteAuditWriter(
+ Options.Create(options),
+ NullLogger.Instance,
+ new FakeNodeIdentityProvider(),
+ connectionStringOverride: $"Data Source={dataSource};Cache=Shared");
+ }
+
private static SqliteConnection OpenVerifierConnection(string dataSource)
{
var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
@@ -45,15 +58,37 @@ public class SqliteAuditWriterSchemaTests
return connection;
}
- [Fact]
- public void Opens_Creates_AuditLog_Table_With_23Columns_And_PK_On_EventId()
+ private static List ColumnNames(SqliteConnection connection, string table)
{
- var (writer, dataSource) = CreateWriter(nameof(Opens_Creates_AuditLog_Table_With_23Columns_And_PK_On_EventId));
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText = $"PRAGMA table_info({table});";
+ using var reader = cmd.ExecuteReader();
+ var names = new List();
+ while (reader.Read())
+ {
+ names.Add(reader.GetString(1));
+ }
+ return names;
+ }
+
+ private static bool TableExists(SqliteConnection connection, string table)
+ {
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText =
+ "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = $name;";
+ cmd.Parameters.AddWithValue("$name", table);
+ return Convert.ToInt32(cmd.ExecuteScalar()) > 0;
+ }
+
+ [Fact]
+ public void Opens_Creates_audit_event_Canonical_Table_With_10Columns_And_PK_On_EventId()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(Opens_Creates_audit_event_Canonical_Table_With_10Columns_And_PK_On_EventId));
using (writer)
{
using var connection = OpenVerifierConnection(dataSource);
using var cmd = connection.CreateCommand();
- cmd.CommandText = "PRAGMA table_info(AuditLog);";
+ cmd.CommandText = "PRAGMA table_info(audit_event);";
using var reader = cmd.ExecuteReader();
var columns = new List<(string Name, int Pk)>();
@@ -62,16 +97,13 @@ public class SqliteAuditWriterSchemaTests
columns.Add((reader.GetString(1), reader.GetInt32(5)));
}
- Assert.Equal(23, columns.Count);
-
+ // The 10 canonical ZB.MOM.WW.Audit.AuditEvent fields, stored directly.
var expected = new[]
{
- "EventId", "OccurredAtUtc", "Channel", "Kind", "CorrelationId",
- "SourceSiteId", "SourceNode", "SourceInstanceId", "SourceScript", "Actor", "Target",
- "Status", "HttpStatus", "DurationMs", "ErrorMessage", "ErrorDetail",
- "RequestSummary", "ResponseSummary", "PayloadTruncated", "Extra",
- "ForwardState", "ExecutionId", "ParentExecutionId",
+ "EventId", "OccurredAtUtc", "Actor", "Action", "Outcome",
+ "Category", "Target", "SourceNode", "CorrelationId", "DetailsJson",
};
+ Assert.Equal(10, columns.Count);
Assert.Equal(expected.OrderBy(n => n), columns.Select(c => c.Name).OrderBy(n => n));
// PK is EventId only.
@@ -82,27 +114,46 @@ public class SqliteAuditWriterSchemaTests
}
[Fact]
- public void Initialize_creates_AuditLog_with_SourceNode_column()
+ public void Opens_Creates_audit_forward_state_Sidecar_Table_With_Expected_Columns()
{
- var (writer, dataSource) = CreateWriter(nameof(Initialize_creates_AuditLog_with_SourceNode_column));
- using (writer)
- {
- using var connection = OpenVerifierConnection(dataSource);
- Assert.True(
- ColumnExists(connection, "SourceNode"),
- "Fresh AuditLog schema must include the SourceNode column.");
- }
- }
-
- [Fact]
- public void Opens_Creates_IX_ForwardState_Occurred_Index()
- {
- var (writer, dataSource) = CreateWriter(nameof(Opens_Creates_IX_ForwardState_Occurred_Index));
+ var (writer, dataSource) = CreateWriter(nameof(Opens_Creates_audit_forward_state_Sidecar_Table_With_Expected_Columns));
using (writer)
{
using var connection = OpenVerifierConnection(dataSource);
using var cmd = connection.CreateCommand();
- cmd.CommandText = "PRAGMA index_list(AuditLog);";
+ cmd.CommandText = "PRAGMA table_info(audit_forward_state);";
+ using var reader = cmd.ExecuteReader();
+
+ var columns = new List<(string Name, int Pk)>();
+ while (reader.Read())
+ {
+ columns.Add((reader.GetString(1), reader.GetInt32(5)));
+ }
+
+ var expected = new[]
+ {
+ "EventId", "ForwardState", "OccurredAtUtc",
+ "IsCachedKind", "AttemptCount", "LastAttemptUtc",
+ };
+ Assert.Equal(6, columns.Count);
+ Assert.Equal(expected.OrderBy(n => n), columns.Select(c => c.Name).OrderBy(n => n));
+
+ // PK is EventId only.
+ var pkColumns = columns.Where(c => c.Pk > 0).Select(c => c.Name).ToList();
+ Assert.Single(pkColumns);
+ Assert.Equal("EventId", pkColumns[0]);
+ }
+ }
+
+ [Fact]
+ public void Opens_Creates_IX_fwd_Index_On_ForwardState_IsCachedKind_Occurred()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(Opens_Creates_IX_fwd_Index_On_ForwardState_IsCachedKind_Occurred));
+ using (writer)
+ {
+ using var connection = OpenVerifierConnection(dataSource);
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText = "PRAGMA index_list(audit_forward_state);";
using var reader = cmd.ExecuteReader();
var indexNames = new List();
@@ -111,11 +162,12 @@ public class SqliteAuditWriterSchemaTests
indexNames.Add(reader.GetString(1));
}
- Assert.Contains("IX_SiteAuditLog_ForwardState_Occurred", indexNames);
+ Assert.Contains("IX_fwd", indexNames);
- // Verify the index columns are ForwardState, OccurredAtUtc in that order.
+ // Verify the index columns are ForwardState, IsCachedKind, OccurredAtUtc
+ // in that order.
using var infoCmd = connection.CreateCommand();
- infoCmd.CommandText = "PRAGMA index_info(IX_SiteAuditLog_ForwardState_Occurred);";
+ infoCmd.CommandText = "PRAGMA index_info(IX_fwd);";
using var infoReader = infoCmd.ExecuteReader();
var indexColumns = new List();
@@ -124,7 +176,7 @@ public class SqliteAuditWriterSchemaTests
indexColumns.Add(infoReader.GetString(2));
}
- Assert.Equal(new[] { "ForwardState", "OccurredAtUtc" }, indexColumns);
+ Assert.Equal(new[] { "ForwardState", "IsCachedKind", "OccurredAtUtc" }, indexColumns);
}
}
@@ -144,258 +196,17 @@ public class SqliteAuditWriterSchemaTests
}
}
- // ----- ExecutionId schema-upgrade regression (persistent auditlog.db) ----- //
+ // ----- C4 ephemeral in-place reset: old single-table schema is dropped ----- //
///
- /// The OLD pre-ExecutionId-branch AuditLog schema — the 20-column
- /// CREATE TABLE WITHOUT the ExecutionId column. A real deployment's
- /// on-disk auditlog.db already contains exactly this shape, and
- /// CREATE TABLE IF NOT EXISTS is a no-op against it.
+ /// The OLD pre-C4 single 24-column AuditLog table — exactly the shape a
+ /// pre-C4 deployment's on-disk auditlog.db contains. The site store is
+ /// ephemeral (≈7-day retention, recreated per deployment), so C4 RESETS in
+ /// place: the new two-table schema is created and this old table is DROP-ped.
+ /// No SQLite data migration is performed (or needed) — any rows it holds are
+ /// within the retention window and discarded.
///
- private const string OldPreExecutionIdSchema = """
- CREATE TABLE IF NOT EXISTS AuditLog (
- EventId TEXT NOT NULL,
- OccurredAtUtc TEXT NOT NULL,
- Channel TEXT NOT NULL,
- Kind TEXT NOT NULL,
- CorrelationId TEXT NULL,
- SourceSiteId TEXT NULL,
- SourceInstanceId TEXT NULL,
- SourceScript TEXT NULL,
- Actor TEXT NULL,
- Target TEXT NULL,
- Status TEXT NOT NULL,
- HttpStatus INTEGER NULL,
- DurationMs INTEGER NULL,
- ErrorMessage TEXT NULL,
- ErrorDetail TEXT NULL,
- RequestSummary TEXT NULL,
- ResponseSummary TEXT NULL,
- PayloadTruncated INTEGER NOT NULL,
- Extra TEXT NULL,
- ForwardState TEXT NOT NULL,
- PRIMARY KEY (EventId)
- );
- CREATE INDEX IF NOT EXISTS IX_SiteAuditLog_ForwardState_Occurred
- ON AuditLog (ForwardState, OccurredAtUtc);
- """;
-
- ///
- /// Seeds a shared-cache in-memory database with the OLD 20-column schema and
- /// returns the open connection. The connection MUST stay open for the
- /// lifetime of the test: a shared-cache in-memory database is dropped once
- /// its last connection closes, so closing this would discard the seeded
- /// schema before the writer opens its own connection.
- ///
- private static SqliteConnection SeedOldSchemaDatabase(string dataSource)
- {
- var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
- connection.Open();
- using var cmd = connection.CreateCommand();
- cmd.CommandText = OldPreExecutionIdSchema;
- cmd.ExecuteNonQuery();
- return connection;
- }
-
- private static SqliteAuditWriter CreateWriterOver(string dataSource)
- {
- var options = new SqliteAuditWriterOptions { DatabasePath = dataSource };
- return new SqliteAuditWriter(
- Options.Create(options),
- NullLogger.Instance,
- new FakeNodeIdentityProvider(),
- connectionStringOverride: $"Data Source={dataSource};Cache=Shared");
- }
-
- private static bool ColumnExists(SqliteConnection connection, string columnName)
- {
- using var cmd = connection.CreateCommand();
- cmd.CommandText = "SELECT COUNT(*) FROM pragma_table_info('AuditLog') WHERE name = $name";
- cmd.Parameters.AddWithValue("$name", columnName);
- return Convert.ToInt32(cmd.ExecuteScalar()) > 0;
- }
-
- [Fact]
- public async Task Opening_Over_PreExisting_OldSchema_Db_Adds_ExecutionId_Column_And_WriteAsync_RoundTrips()
- {
- var dataSource = $"file:{nameof(Opening_Over_PreExisting_OldSchema_Db_Adds_ExecutionId_Column_And_WriteAsync_RoundTrips)}-{Guid.NewGuid():N}?mode=memory&cache=shared";
-
- // A pre-branch deployment: auditlog.db already exists with the 20-column
- // schema and NO ExecutionId column.
- using var seedConnection = SeedOldSchemaDatabase(dataSource);
- Assert.False(ColumnExists(seedConnection, "ExecutionId"));
-
- // Upgrade: a post-branch SqliteAuditWriter opens the same database. Its
- // InitializeSchema must ALTER the missing ExecutionId column in — the
- // CREATE TABLE IF NOT EXISTS alone is a no-op against the existing table.
- var executionId = Guid.NewGuid();
- await using (var writer = CreateWriterOver(dataSource))
- {
- Assert.True(
- ColumnExists(seedConnection, "ExecutionId"),
- "SqliteAuditWriter must ALTER the ExecutionId column into a pre-existing AuditLog table.");
-
- // A WriteAsync binding $ExecutionId must now succeed and round-trip;
- // without the ALTER it would fail with "no such column: ExecutionId"
- // and — because audit writes are best-effort — silently drop the row.
- var evt = ScadaBridgeAuditEventFactory.Create(
- eventId: Guid.NewGuid(),
- occurredAtUtc: DateTime.UtcNow,
- channel: AuditChannel.ApiOutbound,
- kind: AuditKind.ApiCall,
- status: AuditStatus.Delivered,
- executionId: executionId);
- await writer.WriteAsync(evt);
-
- var rows = await writer.ReadPendingAsync(limit: 10);
- var row = Assert.Single(rows);
- Assert.Equal(executionId, row.AsRow().ExecutionId);
- }
-
- // Idempotency: a second writer over the now-upgraded DB must not error
- // (the probe sees ExecutionId already present and skips the ALTER).
- await using (var writerAgain = CreateWriterOver(dataSource))
- {
- Assert.True(ColumnExists(seedConnection, "ExecutionId"));
- }
- }
-
- // ----- ParentExecutionId schema-upgrade regression (persistent auditlog.db) ----- //
-
- ///
- /// The pre-ParentExecutionId-branch AuditLog schema — the 21-column
- /// CREATE TABLE that HAS ExecutionId but is WITHOUT
- /// ParentExecutionId. A deployment that ran the ExecutionId branch
- /// already has an on-disk auditlog.db in exactly this shape, and
- /// CREATE TABLE IF NOT EXISTS is a no-op against it.
- ///
- private const string OldPreParentExecutionIdSchema = """
- CREATE TABLE IF NOT EXISTS AuditLog (
- EventId TEXT NOT NULL,
- OccurredAtUtc TEXT NOT NULL,
- Channel TEXT NOT NULL,
- Kind TEXT NOT NULL,
- CorrelationId TEXT NULL,
- SourceSiteId TEXT NULL,
- SourceInstanceId TEXT NULL,
- SourceScript TEXT NULL,
- Actor TEXT NULL,
- Target TEXT NULL,
- Status TEXT NOT NULL,
- HttpStatus INTEGER NULL,
- DurationMs INTEGER NULL,
- ErrorMessage TEXT NULL,
- ErrorDetail TEXT NULL,
- RequestSummary TEXT NULL,
- ResponseSummary TEXT NULL,
- PayloadTruncated INTEGER NOT NULL,
- Extra TEXT NULL,
- ForwardState TEXT NOT NULL,
- ExecutionId TEXT NULL,
- PRIMARY KEY (EventId)
- );
- CREATE INDEX IF NOT EXISTS IX_SiteAuditLog_ForwardState_Occurred
- ON AuditLog (ForwardState, OccurredAtUtc);
- """;
-
- ///
- /// Seeds a shared-cache in-memory database with the pre-ParentExecutionId
- /// 21-column schema and returns the open connection. The connection MUST
- /// stay open for the lifetime of the test — a shared-cache in-memory
- /// database is dropped once its last connection closes.
- ///
- private static SqliteConnection SeedPreParentExecutionIdSchemaDatabase(string dataSource)
- {
- var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
- connection.Open();
- using var cmd = connection.CreateCommand();
- cmd.CommandText = OldPreParentExecutionIdSchema;
- cmd.ExecuteNonQuery();
- return connection;
- }
-
- [Fact]
- public async Task Opening_Over_PreExisting_PreParentExecutionId_Db_Adds_ParentExecutionId_Column_And_WriteAsync_RoundTrips()
- {
- var dataSource = $"file:{nameof(Opening_Over_PreExisting_PreParentExecutionId_Db_Adds_ParentExecutionId_Column_And_WriteAsync_RoundTrips)}-{Guid.NewGuid():N}?mode=memory&cache=shared";
-
- // A deployment that ran the ExecutionId branch: auditlog.db already
- // exists with the 21-column schema and NO ParentExecutionId column.
- using var seedConnection = SeedPreParentExecutionIdSchemaDatabase(dataSource);
- Assert.True(ColumnExists(seedConnection, "ExecutionId"));
- Assert.False(ColumnExists(seedConnection, "ParentExecutionId"));
-
- // Upgrade: a post-branch SqliteAuditWriter opens the same database. Its
- // InitializeSchema must ALTER the missing ParentExecutionId column in —
- // the CREATE TABLE IF NOT EXISTS alone is a no-op against the existing
- // table.
- var executionId = Guid.NewGuid();
- var parentExecutionId = Guid.NewGuid();
- await using (var writer = CreateWriterOver(dataSource))
- {
- Assert.True(
- ColumnExists(seedConnection, "ParentExecutionId"),
- "SqliteAuditWriter must ALTER the ParentExecutionId column into a pre-existing AuditLog table.");
-
- // A WriteAsync binding $ParentExecutionId must now succeed and
- // round-trip; without the ALTER it would fail with "no such column:
- // ParentExecutionId" and — because audit writes are best-effort —
- // silently drop the row.
- var evt = ScadaBridgeAuditEventFactory.Create(
- eventId: Guid.NewGuid(),
- occurredAtUtc: DateTime.UtcNow,
- channel: AuditChannel.ApiOutbound,
- kind: AuditKind.ApiCall,
- status: AuditStatus.Delivered,
- executionId: executionId,
- parentExecutionId: parentExecutionId);
- await writer.WriteAsync(evt);
-
- var rows = await writer.ReadPendingAsync(limit: 10);
- var row = Assert.Single(rows);
- Assert.Equal(executionId, row.AsRow().ExecutionId);
- Assert.Equal(parentExecutionId, row.AsRow().ParentExecutionId);
- }
-
- // Idempotency: a second writer over the now-upgraded DB must not error
- // (the probe sees ParentExecutionId already present and skips the ALTER).
- await using (var writerAgain = CreateWriterOver(dataSource))
- {
- Assert.True(ColumnExists(seedConnection, "ParentExecutionId"));
- }
- }
-
- [Fact]
- public async Task WriteAsync_NullParentExecutionId_RoundTripsAsNull()
- {
- var (writer, _) = CreateWriter(nameof(WriteAsync_NullParentExecutionId_RoundTripsAsNull));
- await using (writer)
- {
- var evt = ScadaBridgeAuditEventFactory.Create(
- eventId: Guid.NewGuid(),
- occurredAtUtc: DateTime.UtcNow,
- channel: AuditChannel.Notification,
- kind: AuditKind.NotifySend,
- status: AuditStatus.Submitted);
- // ParentExecutionId left null (not a factory arg → defaults null)
- await writer.WriteAsync(evt);
-
- var rows = await writer.ReadPendingAsync(limit: 10);
- var row = Assert.Single(rows);
- Assert.Null(row.AsRow().ParentExecutionId);
- }
- }
-
- // ----- SourceNode schema-upgrade regression (persistent auditlog.db) ----- //
-
- ///
- /// The pre-SourceNode AuditLog schema — the 22-column CREATE TABLE
- /// that HAS ExecutionId + ParentExecutionId but is WITHOUT
- /// SourceNode. A deployment that ran the ParentExecutionId branch
- /// already has an on-disk auditlog.db in exactly this shape, and
- /// CREATE TABLE IF NOT EXISTS is a no-op against it.
- ///
- private const string OldPreSourceNodeSchema = """
+ private const string OldSingleTableSchema = """
CREATE TABLE IF NOT EXISTS AuditLog (
EventId TEXT NOT NULL,
OccurredAtUtc TEXT NOT NULL,
@@ -403,6 +214,7 @@ public class SqliteAuditWriterSchemaTests
Kind TEXT NOT NULL,
CorrelationId TEXT NULL,
SourceSiteId TEXT NULL,
+ SourceNode TEXT NULL,
SourceInstanceId TEXT NULL,
SourceScript TEXT NULL,
Actor TEXT NULL,
@@ -426,67 +238,84 @@ public class SqliteAuditWriterSchemaTests
""";
///
- /// Seeds a shared-cache in-memory database with the pre-SourceNode 22-column
- /// schema and returns the open connection. The connection MUST stay open for
- /// the lifetime of the test — a shared-cache in-memory database is dropped
- /// once its last connection closes.
+ /// Seeds a shared-cache in-memory database with the OLD single-table schema
+ /// and returns the open connection. The connection MUST stay open for the
+ /// lifetime of the test: a shared-cache in-memory database is dropped once its
+ /// last connection closes, so closing this would discard the seeded schema
+ /// before the writer opens its own connection.
///
- private static SqliteConnection SeedPreSourceNodeSchemaDatabase(string dataSource)
+ private static SqliteConnection SeedOldSingleTableDatabase(string dataSource)
{
var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
connection.Open();
using var cmd = connection.CreateCommand();
- cmd.CommandText = OldPreSourceNodeSchema;
+ cmd.CommandText = OldSingleTableSchema;
cmd.ExecuteNonQuery();
+ // Seed one row so we can prove the reset discards it (ephemeral store).
+ using var insert = connection.CreateCommand();
+ insert.CommandText = """
+ INSERT INTO AuditLog (
+ EventId, OccurredAtUtc, Channel, Kind, Status, PayloadTruncated, ForwardState
+ ) VALUES (
+ $id, '2026-05-20T12:00:00.0000000Z', 'ApiOutbound', 'ApiCall', 'Delivered', 0, 'Pending'
+ );
+ """;
+ insert.Parameters.AddWithValue("$id", Guid.NewGuid().ToString());
+ insert.ExecuteNonQuery();
return connection;
}
[Fact]
- public async Task Initialize_adds_SourceNode_to_pre_existing_schema()
+ public async Task Opening_Over_PreExisting_OldSingleTable_Db_Drops_It_And_Creates_Two_Table_Schema()
{
- var dataSource = $"file:{nameof(Initialize_adds_SourceNode_to_pre_existing_schema)}-{Guid.NewGuid():N}?mode=memory&cache=shared";
+ var dataSource = $"file:{nameof(Opening_Over_PreExisting_OldSingleTable_Db_Drops_It_And_Creates_Two_Table_Schema)}-{Guid.NewGuid():N}?mode=memory&cache=shared";
- // A deployment that ran the ParentExecutionId branch: auditlog.db
- // already exists with the 22-column schema and NO SourceNode column.
- using var seedConnection = SeedPreSourceNodeSchemaDatabase(dataSource);
- Assert.True(ColumnExists(seedConnection, "ExecutionId"));
- Assert.True(ColumnExists(seedConnection, "ParentExecutionId"));
- Assert.False(ColumnExists(seedConnection, "SourceNode"));
+ // A pre-C4 deployment: auditlog.db already exists with the old single
+ // 24-column AuditLog table (and a seeded row inside it).
+ using var seedConnection = SeedOldSingleTableDatabase(dataSource);
+ Assert.True(TableExists(seedConnection, "AuditLog"));
- // Upgrade: a post-branch SqliteAuditWriter opens the same database. Its
- // InitializeSchema must ALTER the missing SourceNode column in — the
- // CREATE TABLE IF NOT EXISTS alone is a no-op against the existing table.
+ // Upgrade: a C4 SqliteAuditWriter opens the same database. Its
+ // InitializeSchema RESETS in place — the old AuditLog table is dropped and
+ // the two new tables (+ IX_fwd) are created. No data is migrated.
await using (var writer = CreateWriterOver(dataSource))
{
- Assert.True(
- ColumnExists(seedConnection, "SourceNode"),
- "SqliteAuditWriter must ALTER the SourceNode column into a pre-existing AuditLog table.");
+ Assert.False(
+ TableExists(seedConnection, "AuditLog"),
+ "C4 must DROP the old single-table AuditLog on init (ephemeral in-place reset).");
+ Assert.True(TableExists(seedConnection, "audit_event"));
+ Assert.True(TableExists(seedConnection, "audit_forward_state"));
- // A WriteAsync binding $SourceNode must now succeed and round-trip;
- // without the ALTER it would fail with "no such column: SourceNode"
- // and — because audit writes are best-effort — silently drop the row.
+ // The two new tables start EMPTY — the old row was discarded, not
+ // migrated (the site store is ephemeral).
+ Assert.Empty(await writer.ReadPendingAsync(limit: 100));
+
+ // And a fresh WriteAsync round-trips through the new schema.
var evt = ScadaBridgeAuditEventFactory.Create(
eventId: Guid.NewGuid(),
occurredAtUtc: DateTime.UtcNow,
channel: AuditChannel.ApiOutbound,
kind: AuditKind.ApiCall,
- status: AuditStatus.Delivered,
- sourceNode: "node-a");
+ status: AuditStatus.Delivered);
await writer.WriteAsync(evt);
var rows = await writer.ReadPendingAsync(limit: 10);
var row = Assert.Single(rows);
- Assert.Equal("node-a", row.SourceNode);
+ Assert.Equal(evt.EventId, row.EventId);
}
- // Idempotency: a second writer over the now-upgraded DB must not error
- // (the probe sees SourceNode already present and skips the ALTER).
+ // Idempotency: a second writer over the now-two-table DB must not error
+ // (DROP TABLE IF EXISTS is a no-op when AuditLog is already gone, and the
+ // CREATE TABLE IF NOT EXISTS statements are no-ops too).
await using (var writerAgain = CreateWriterOver(dataSource))
{
- Assert.True(ColumnExists(seedConnection, "SourceNode"));
+ Assert.True(TableExists(seedConnection, "audit_event"));
+ Assert.True(TableExists(seedConnection, "audit_forward_state"));
}
}
+ // ----- Canonical / sidecar field persistence ----- //
+
[Fact]
public async Task WriteAsync_persists_SourceNode_field()
{
@@ -528,4 +357,31 @@ public class SqliteAuditWriterSchemaTests
Assert.Null(row.SourceNode);
}
}
+
+ [Fact]
+ public async Task WriteAsync_ExecutionId_RoundTrips_Through_DetailsJson()
+ {
+ var (writer, _) = CreateWriter(nameof(WriteAsync_ExecutionId_RoundTrips_Through_DetailsJson));
+ await using (writer)
+ {
+ var executionId = Guid.NewGuid();
+ var parentExecutionId = Guid.NewGuid();
+ var evt = ScadaBridgeAuditEventFactory.Create(
+ eventId: Guid.NewGuid(),
+ occurredAtUtc: DateTime.UtcNow,
+ channel: AuditChannel.ApiOutbound,
+ kind: AuditKind.ApiCall,
+ status: AuditStatus.Delivered,
+ executionId: executionId,
+ parentExecutionId: parentExecutionId);
+ await writer.WriteAsync(evt);
+
+ var rows = await writer.ReadPendingAsync(limit: 10);
+ var row = Assert.Single(rows);
+ // ExecutionId / ParentExecutionId ride inside DetailsJson; AsRow()
+ // decomposes them back out.
+ Assert.Equal(executionId, row.AsRow().ExecutionId);
+ Assert.Equal(parentExecutionId, row.AsRow().ParentExecutionId);
+ }
+ }
}
diff --git a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs
index 07978a1f..868ab135 100644
--- a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs
+++ b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs
@@ -11,12 +11,13 @@ using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Site;
///
-/// Bundle B (M2-T2) hot-path tests for . Exercise
-/// the Channel-based enqueue, the background writer's batch INSERTs, duplicate-
-/// EventId swallowing, ForwardState defaulting, and the
-/// /
-/// support surface that
-/// Bundle D's telemetry actor will call.
+/// C4 (Task 2.5) hot-path + drain tests for 's
+/// two-table site schema. Exercise the Channel-based enqueue, the background
+/// writer's per-event canonical(audit_event) + sidecar
+/// (audit_forward_state) INSERTs, duplicate-EventId swallowing, the
+/// IsCachedKind drain split, the four reads, and the
+/// /
+/// sidecar flips.
///
public class SqliteAuditWriterWriteTests
{
@@ -52,10 +53,40 @@ public class SqliteAuditWriterWriteTests
return connection;
}
- // C3 (Task 2.5): build the canonical ZB.MOM.WW.Audit.AuditEvent via the shared
- // factory. The SQLite writer's transitional shim decomposes it into the 24 columns
- // (defaulting ForwardState=Pending) on INSERT and recomposes the canonical record
- // on read. ExecutionId/SourceNode ride through DetailsJson / the top-level field.
+ ///
+ /// Reads the sidecar ForwardState for one EventId (the column moved off
+ /// the single legacy table onto audit_forward_state in C4).
+ ///
+ private static string? ReadForwardState(string dataSource, Guid eventId)
+ {
+ using var connection = OpenVerifierConnection(dataSource);
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText = "SELECT ForwardState FROM audit_forward_state WHERE EventId = $id;";
+ cmd.Parameters.AddWithValue("$id", eventId.ToString());
+ return cmd.ExecuteScalar() as string;
+ }
+
+ /// Sidecar ForwardState → row-count, grouped (replaces the legacy single-table GROUP BY).
+ private static Dictionary ForwardStateCounts(string dataSource)
+ {
+ using var connection = OpenVerifierConnection(dataSource);
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText =
+ "SELECT ForwardState, COUNT(*) FROM audit_forward_state GROUP BY ForwardState;";
+ using var reader = cmd.ExecuteReader();
+ var byState = new Dictionary();
+ while (reader.Read())
+ {
+ byState[reader.GetString(0)] = reader.GetInt64(1);
+ }
+ return byState;
+ }
+
+ // C4 (Task 2.5): build the canonical ZB.MOM.WW.Audit.AuditEvent via the shared
+ // factory. The SQLite writer stores the 10 canonical fields directly in
+ // audit_event and writes a Pending sidecar row into audit_forward_state, with
+ // IsCachedKind precomputed from the event's Kind. Reads recompose the canonical
+ // record directly from audit_event's columns.
private static AuditEvent NewEvent(
Guid? id = null,
DateTime? occurredAtUtc = null,
@@ -70,22 +101,62 @@ public class SqliteAuditWriterWriteTests
executionId: executionId,
sourceNode: sourceNode);
+ /// A cached-lifecycle event (IsCachedKind=1) — drains via the cached read surface.
+ private static AuditEvent NewCachedEvent(
+ Guid? id = null,
+ DateTime? occurredAtUtc = null,
+ AuditKind kind = AuditKind.ApiCallCached)
+ // Status is independent of IsCachedKind (which is derived from Kind);
+ // Submitted is the natural first-row status for a cached lifecycle.
+ => ScadaBridgeAuditEventFactory.Create(
+ channel: AuditChannel.ApiOutbound,
+ kind: kind,
+ status: AuditStatus.Submitted,
+ eventId: id ?? Guid.NewGuid(),
+ occurredAtUtc: occurredAtUtc ?? DateTime.UtcNow);
+
[Fact]
- public async Task WriteAsync_FreshEvent_PersistsWithForwardStatePending()
+ public async Task WriteAsync_FreshEvent_PersistsCanonical_And_SidecarPending()
{
- var (writer, dataSource) = CreateWriter(nameof(WriteAsync_FreshEvent_PersistsWithForwardStatePending));
+ var (writer, dataSource) = CreateWriter(nameof(WriteAsync_FreshEvent_PersistsCanonical_And_SidecarPending));
await using var _ = writer;
var evt = NewEvent();
await writer.WriteAsync(evt);
+ // Canonical row landed in audit_event.
using var connection = OpenVerifierConnection(dataSource);
- using var cmd = connection.CreateCommand();
- cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;";
- cmd.Parameters.AddWithValue("$id", evt.EventId.ToString());
- var actual = cmd.ExecuteScalar() as string;
+ using var eventCmd = connection.CreateCommand();
+ eventCmd.CommandText = "SELECT Action FROM audit_event WHERE EventId = $id;";
+ eventCmd.Parameters.AddWithValue("$id", evt.EventId.ToString());
+ Assert.Equal(evt.Action, eventCmd.ExecuteScalar() as string);
- Assert.Equal(AuditForwardState.Pending.ToString(), actual);
+ // Sidecar row landed Pending.
+ Assert.Equal(AuditForwardState.Pending.ToString(), ReadForwardState(dataSource, evt.EventId));
+ }
+
+ [Fact]
+ public async Task WriteAsync_Roundtrips_Canonical_Fields_Through_Read()
+ {
+ var (writer, _) = CreateWriter(nameof(WriteAsync_Roundtrips_Canonical_Fields_Through_Read));
+ await using var _w = writer;
+
+ var evt = NewEvent() with { Target = "target-1", Actor = "user-1" };
+ await writer.WriteAsync(evt);
+
+ var rows = await writer.ReadPendingAsync(limit: 10);
+ var row = Assert.Single(rows);
+
+ Assert.Equal(evt.EventId, row.EventId);
+ Assert.Equal(evt.OccurredAtUtc, row.OccurredAtUtc);
+ Assert.Equal("user-1", row.Actor);
+ Assert.Equal(evt.Action, row.Action);
+ Assert.Equal(evt.Outcome, row.Outcome);
+ Assert.Equal(evt.Category, row.Category);
+ Assert.Equal("target-1", row.Target);
+ Assert.Equal(evt.CorrelationId, row.CorrelationId);
+ // DetailsJson is stored verbatim and round-trips byte-for-byte.
+ Assert.Equal(evt.DetailsJson, row.DetailsJson);
}
[Fact]
@@ -100,11 +171,14 @@ public class SqliteAuditWriterWriteTests
async (evt, ct) => await writer.WriteAsync(evt, ct));
using var connection = OpenVerifierConnection(dataSource);
- using var cmd = connection.CreateCommand();
- cmd.CommandText = "SELECT COUNT(*) FROM AuditLog;";
- var count = Convert.ToInt64(cmd.ExecuteScalar());
+ using var eventCmd = connection.CreateCommand();
+ eventCmd.CommandText = "SELECT COUNT(*) FROM audit_event;";
+ Assert.Equal(1000, Convert.ToInt64(eventCmd.ExecuteScalar()));
- Assert.Equal(1000, count);
+ // Every canonical row has its matching sidecar row.
+ using var sidecarCmd = connection.CreateCommand();
+ sidecarCmd.CommandText = "SELECT COUNT(*) FROM audit_forward_state;";
+ Assert.Equal(1000, Convert.ToInt64(sidecarCmd.ExecuteScalar()));
}
[Fact]
@@ -122,36 +196,98 @@ public class SqliteAuditWriterWriteTests
using var connection = OpenVerifierConnection(dataSource);
using var countCmd = connection.CreateCommand();
- countCmd.CommandText = "SELECT COUNT(*) FROM AuditLog WHERE EventId = $id;";
+ countCmd.CommandText = "SELECT COUNT(*) FROM audit_event WHERE EventId = $id;";
countCmd.Parameters.AddWithValue("$id", sharedId.ToString());
- var count = Convert.ToInt64(countCmd.ExecuteScalar());
+ Assert.Equal(1, Convert.ToInt64(countCmd.ExecuteScalar()));
- Assert.Equal(1, count);
+ // The sidecar likewise gained exactly one row (the canonical PK throws
+ // before the sidecar insert runs, so neither table double-inserts).
+ using var sidecarCmd = connection.CreateCommand();
+ sidecarCmd.CommandText = "SELECT COUNT(*) FROM audit_forward_state WHERE EventId = $id;";
+ sidecarCmd.Parameters.AddWithValue("$id", sharedId.ToString());
+ Assert.Equal(1, Convert.ToInt64(sidecarCmd.ExecuteScalar()));
using var targetCmd = connection.CreateCommand();
- targetCmd.CommandText = "SELECT Target FROM AuditLog WHERE EventId = $id;";
+ targetCmd.CommandText = "SELECT Target FROM audit_event WHERE EventId = $id;";
targetCmd.Parameters.AddWithValue("$id", sharedId.ToString());
Assert.Equal("first", targetCmd.ExecuteScalar() as string);
}
[Fact]
- public async Task WriteAsync_ForcesForwardStatePending_IfNull()
+ public async Task WriteAsync_ForcesSidecarForwardStatePending()
{
- var (writer, dataSource) = CreateWriter(nameof(WriteAsync_ForcesForwardStatePending_IfNull));
+ var (writer, dataSource) = CreateWriter(nameof(WriteAsync_ForcesSidecarForwardStatePending));
await using var _ = writer;
- // C3 (Task 2.5): ForwardState is no longer a field on the canonical record;
- // a fresh canonical event carries none, and the SQLite shim defaults it to
- // Pending on INSERT — exactly the behaviour this test pins.
+ // C4 (Task 2.5): ForwardState is not a field on the canonical record; a
+ // fresh event's sidecar row defaults to Pending on INSERT.
var evt = NewEvent();
await writer.WriteAsync(evt);
+ Assert.Equal(AuditForwardState.Pending.ToString(), ReadForwardState(dataSource, evt.EventId));
+ }
+
+ // ----- IsCachedKind drain split (precomputed at insert) ----- //
+
+ [Fact]
+ public async Task WriteAsync_CachedKind_SetsIsCachedKind_1_NonCached_0()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(WriteAsync_CachedKind_SetsIsCachedKind_1_NonCached_0));
+ await using var _ = writer;
+
+ var cached = NewCachedEvent(); // ApiCallCached → cached
+ var nonCached = NewEvent(); // ApiCall → not cached
+ await writer.WriteAsync(cached);
+ await writer.WriteAsync(nonCached);
+
using var connection = OpenVerifierConnection(dataSource);
using var cmd = connection.CreateCommand();
- cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;";
- cmd.Parameters.AddWithValue("$id", evt.EventId.ToString());
+ cmd.CommandText = "SELECT IsCachedKind FROM audit_forward_state WHERE EventId = $id;";
+ var p = cmd.Parameters.Add("$id", SqliteType.Text);
- Assert.Equal(AuditForwardState.Pending.ToString(), cmd.ExecuteScalar() as string);
+ p.Value = cached.EventId.ToString();
+ Assert.Equal(1L, Convert.ToInt64(cmd.ExecuteScalar()));
+
+ p.Value = nonCached.EventId.ToString();
+ Assert.Equal(0L, Convert.ToInt64(cmd.ExecuteScalar()));
+ }
+
+ [Theory]
+ [InlineData(AuditKind.CachedSubmit)]
+ [InlineData(AuditKind.ApiCallCached)]
+ [InlineData(AuditKind.DbWriteCached)]
+ [InlineData(AuditKind.CachedResolve)]
+ public async Task CachedKinds_DrainVia_ReadPendingCachedTelemetry_Not_ReadPending(AuditKind kind)
+ {
+ var (writer, _) = CreateWriter($"{nameof(CachedKinds_DrainVia_ReadPendingCachedTelemetry_Not_ReadPending)}-{kind}");
+ await using var _w = writer;
+
+ var cached = NewCachedEvent(kind: kind);
+ await writer.WriteAsync(cached);
+
+ // The cached kind appears in the cached read surface...
+ var cachedRows = await writer.ReadPendingCachedTelemetryAsync(limit: 10);
+ Assert.Single(cachedRows, r => r.EventId == cached.EventId);
+
+ // ...and NOT in the audit-only read surface.
+ var pendingRows = await writer.ReadPendingAsync(limit: 10);
+ Assert.DoesNotContain(pendingRows, r => r.EventId == cached.EventId);
+ }
+
+ [Fact]
+ public async Task NonCachedKind_DrainsVia_ReadPending_Not_ReadPendingCachedTelemetry()
+ {
+ var (writer, _) = CreateWriter(nameof(NonCachedKind_DrainsVia_ReadPending_Not_ReadPendingCachedTelemetry));
+ await using var _w = writer;
+
+ var nonCached = NewEvent(); // ApiCall — not a cached kind
+ await writer.WriteAsync(nonCached);
+
+ var pendingRows = await writer.ReadPendingAsync(limit: 10);
+ Assert.Single(pendingRows, r => r.EventId == nonCached.EventId);
+
+ var cachedRows = await writer.ReadPendingCachedTelemetryAsync(limit: 10);
+ Assert.DoesNotContain(cachedRows, r => r.EventId == nonCached.EventId);
}
[Fact]
@@ -184,9 +320,9 @@ public class SqliteAuditWriterWriteTests
}
[Fact]
- public async Task MarkForwardedAsync_FlipsRowsToForwarded()
+ public async Task MarkForwardedAsync_FlipsSidecarRowsToForwarded()
{
- var (writer, dataSource) = CreateWriter(nameof(MarkForwardedAsync_FlipsRowsToForwarded));
+ var (writer, dataSource) = CreateWriter(nameof(MarkForwardedAsync_FlipsSidecarRowsToForwarded));
await using var _ = writer;
var ids = new[] { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() };
@@ -197,20 +333,33 @@ public class SqliteAuditWriterWriteTests
await writer.MarkForwardedAsync(ids);
- using var connection = OpenVerifierConnection(dataSource);
- using var cmd = connection.CreateCommand();
- cmd.CommandText = "SELECT ForwardState, COUNT(*) FROM AuditLog GROUP BY ForwardState;";
- using var reader = cmd.ExecuteReader();
- var byState = new Dictionary();
- while (reader.Read())
- {
- byState[reader.GetString(0)] = reader.GetInt64(1);
- }
-
+ var byState = ForwardStateCounts(dataSource);
Assert.Equal(3, byState[AuditForwardState.Forwarded.ToString()]);
Assert.False(byState.ContainsKey(AuditForwardState.Pending.ToString()));
}
+ [Fact]
+ public async Task MarkForwardedAsync_BumpsAttemptCount_And_StampsLastAttemptUtc()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(MarkForwardedAsync_BumpsAttemptCount_And_StampsLastAttemptUtc));
+ await using var _ = writer;
+
+ var evt = NewEvent();
+ await writer.WriteAsync(evt);
+
+ await writer.MarkForwardedAsync(new[] { evt.EventId });
+
+ using var connection = OpenVerifierConnection(dataSource);
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText =
+ "SELECT AttemptCount, LastAttemptUtc FROM audit_forward_state WHERE EventId = $id;";
+ cmd.Parameters.AddWithValue("$id", evt.EventId.ToString());
+ using var reader = cmd.ExecuteReader();
+ Assert.True(reader.Read());
+ Assert.Equal(1, reader.GetInt32(0)); // AttemptCount bumped 0 → 1
+ Assert.False(reader.IsDBNull(1)); // LastAttemptUtc stamped
+ }
+
[Fact]
public async Task MarkForwardedAsync_NonExistentId_NoThrow()
{
@@ -223,6 +372,23 @@ public class SqliteAuditWriterWriteTests
// No assertion needed: the call must complete without throwing.
}
+ [Fact]
+ public async Task ReadForwardedAsync_Returns_Only_Forwarded_Rows()
+ {
+ var (writer, _) = CreateWriter(nameof(ReadForwardedAsync_Returns_Only_Forwarded_Rows));
+ await using var _w = writer;
+
+ var forwarded = NewEvent();
+ var pending = NewEvent();
+ await writer.WriteAsync(forwarded);
+ await writer.WriteAsync(pending);
+ await writer.MarkForwardedAsync(new[] { forwarded.EventId });
+
+ var rows = await writer.ReadForwardedAsync(limit: 10);
+ var row = Assert.Single(rows);
+ Assert.Equal(forwarded.EventId, row.EventId);
+ }
+
// ----- M6 reconciliation pull surface ----- //
[Fact]
@@ -327,16 +493,7 @@ public class SqliteAuditWriterWriteTests
await writer.MarkReconciledAsync(new[] { a.EventId, b.EventId, c.EventId });
- using var connection = OpenVerifierConnection(dataSource);
- using var cmd = connection.CreateCommand();
- cmd.CommandText = "SELECT ForwardState, COUNT(*) FROM AuditLog GROUP BY ForwardState;";
- using var reader = cmd.ExecuteReader();
- var byState = new Dictionary();
- while (reader.Read())
- {
- byState[reader.GetString(0)] = reader.GetInt64(1);
- }
-
+ var byState = ForwardStateCounts(dataSource);
Assert.Equal(3, byState[AuditForwardState.Reconciled.ToString()]);
Assert.False(byState.ContainsKey(AuditForwardState.Pending.ToString()));
Assert.False(byState.ContainsKey(AuditForwardState.Forwarded.ToString()));
@@ -354,12 +511,7 @@ public class SqliteAuditWriterWriteTests
// Re-call must not throw and must leave the single row Reconciled.
await writer.MarkReconciledAsync(new[] { a.EventId });
- using var connection = OpenVerifierConnection(dataSource);
- using var cmd = connection.CreateCommand();
- cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;";
- cmd.Parameters.AddWithValue("$id", a.EventId.ToString());
-
- Assert.Equal(AuditForwardState.Reconciled.ToString(), cmd.ExecuteScalar() as string);
+ Assert.Equal(AuditForwardState.Reconciled.ToString(), ReadForwardState(dataSource, a.EventId));
}
[Fact]
@@ -372,12 +524,12 @@ public class SqliteAuditWriterWriteTests
// Completes without throwing.
}
- // ----- ExecutionId column (universal per-run correlation value) ----- //
+ // ----- ExecutionId (rides DetailsJson, recomposed via AsRow) ----- //
[Fact]
- public async Task WriteAsync_NonNullExecutionId_RoundTripsThroughMapRow()
+ public async Task WriteAsync_NonNullExecutionId_RoundTrips()
{
- var (writer, _) = CreateWriter(nameof(WriteAsync_NonNullExecutionId_RoundTripsThroughMapRow));
+ var (writer, _) = CreateWriter(nameof(WriteAsync_NonNullExecutionId_RoundTrips));
await using var _w = writer;
var executionId = Guid.NewGuid();
@@ -458,46 +610,40 @@ public class SqliteAuditWriterWriteTests
Assert.Null(row.SourceNode);
}
- // ----- C3 hardening: safe enum-parse in MapRow ----- //
+ // ----- C4 hardening: safe enum-parse in MapRow ----- //
///
- /// C3 hardening (Task 2.5): a row whose Channel/Kind/Status columns hold
- /// an unknown/renamed enum string must NOT fault the read path; it degrades
- /// gracefully to the same fallbacks used by AuditRowProjection.Decompose
- /// (ApiInbound / InboundRequest / Submitted). The read is exercised via the
- /// public surface which calls
- /// the private MapRow.
+ /// C4 hardening (Task 2.5): a row whose stored Outcome column holds an
+ /// unknown/renamed enum string must NOT fault the read path; it degrades
+ /// gracefully to (the safe
+ /// fallback). The read is
+ /// exercised via the public
+ /// surface which calls the private MapRow.
///
[Fact]
- public async Task ReadPendingAsync_UnknownEnumStrings_DoNotThrow_YieldFallbackValues()
+ public async Task ReadPendingAsync_UnknownOutcomeString_DoesNotThrow_YieldsFallback()
{
var (writer, dataSource) = CreateWriter(
- nameof(ReadPendingAsync_UnknownEnumStrings_DoNotThrow_YieldFallbackValues));
+ nameof(ReadPendingAsync_UnknownOutcomeString_DoesNotThrow_YieldsFallback));
await using var _ = writer;
var evt = NewEvent();
await writer.WriteAsync(evt);
- // Tamper: overwrite the three enum columns with unknown strings that are
- // not declared AuditChannel/AuditKind/AuditStatus member names.
+ // Tamper: overwrite the canonical Outcome column with a string that is not
+ // a declared AuditOutcome member name.
using (var conn = OpenVerifierConnection(dataSource))
using (var cmd = conn.CreateCommand())
{
- cmd.CommandText =
- "UPDATE AuditLog SET Channel = 'ObsoleteChannelV0', " +
- "Kind = 'LegacyKindName', Status = 'RenamedStatus99' " +
- "WHERE EventId = $id;";
+ cmd.CommandText = "UPDATE audit_event SET Outcome = 'RenamedOutcome99' WHERE EventId = $id;";
cmd.Parameters.AddWithValue("$id", evt.EventId.ToString());
cmd.ExecuteNonQuery();
}
- // Must not throw (previously would throw ArgumentException from Enum.Parse).
+ // Must not throw (a raw Enum.Parse would throw ArgumentException).
var rows = await writer.ReadPendingAsync(limit: 10);
var row = Assert.Single(rows);
- var typedRow = row.AsRow();
- Assert.Equal(AuditChannel.ApiInbound, typedRow.Channel);
- Assert.Equal(AuditKind.InboundRequest, typedRow.Kind);
- Assert.Equal(AuditStatus.Submitted, typedRow.Status);
+ Assert.Equal(AuditOutcome.Success, row.Outcome);
}
}