diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs index 13af1da4..1885771f 100644 --- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs +++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Site/SqliteAuditWriter.cs @@ -113,6 +113,17 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable _readConnection = new SqliteConnection(connectionString); _readConnection.Open(); + // PRAGMA foreign_keys is a per-connection setting. Set it on the read + // connection as well so that any future read-path change (e.g. a + // DELETE that may be added later) also benefits from FK enforcement. + // Pure SELECT queries are unaffected — this is defensive belt-and- + // suspenders for the read connection. + using (var pragmaCmd = _readConnection.CreateCommand()) + { + pragmaCmd.CommandText = "PRAGMA foreign_keys = ON"; + pragmaCmd.ExecuteNonQuery(); + } + _writeQueue = Channel.CreateBounded( new BoundedChannelOptions(_options.ChannelCapacity) { @@ -156,6 +167,24 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable pragmaCmd.ExecuteNonQuery(); } + // Enable FK enforcement on the WRITE connection. PRAGMA foreign_keys is + // a per-connection, per-session setting in SQLite — it is NOT persisted + // in the database file, so every new connection that may INSERT into + // audit_forward_state must set it for the FK + // audit_forward_state.EventId → audit_event.EventId + // to be a real runtime guard rather than decorative DDL. The write + // connection owns all INSERTs (and the MarkForwardedAsync / + // MarkReconciledAsync UPDATEs), so setting it here — after WAL is + // established, before the CREATE TABLEs — ensures the FK is live for + // every insert that follows. The existing insert order (audit_event + // first, then audit_forward_state, inside the same transaction) already + // satisfies the FK, so no pre-existing rows can violate the constraint. + using (var pragmaCmd = _connection.CreateCommand()) + { + pragmaCmd.CommandText = "PRAGMA foreign_keys = ON"; + pragmaCmd.ExecuteNonQuery(); + } + // C4 (Task 2.5) — in-place reset. The site store is EPHEMERAL (≈7-day // retention, recreated per deployment), so we do NOT migrate the old // single 24-column AuditLog table to the new two-table shape: any rows @@ -584,10 +613,19 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable // took to forward. Build a single IN (...) parameter list so we issue // one UPDATE per batch regardless of size. Each id is bound as its own // parameter, so no string concatenation of user data ever enters the SQL. + // + // Defensive state guard: only transition rows that are still Pending or + // Forwarded (i.e. not yet Reconciled). Without this guard a mis-called + // batch that includes a Reconciled EventId would silently demote it back + // to Forwarded — a state regression that would cause duplicate central + // ingestion. Symmetric with MarkReconciledAsync's + // WHERE ForwardState IN ($pending, $forwarded) + // guard. Current callers only pass Pending IDs, so normal-path behaviour + // is unchanged; the guard is purely defensive. var sb = new System.Text.StringBuilder(); sb.Append("UPDATE audit_forward_state SET ForwardState = $forwarded, ") .Append("AttemptCount = AttemptCount + 1, LastAttemptUtc = $now ") - .Append("WHERE EventId IN ("); + .Append("WHERE ForwardState IN ($pending, $forwarded) AND EventId IN ("); for (int i = 0; i < eventIds.Count; i++) { if (i > 0) sb.Append(','); @@ -598,6 +636,7 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable sb.Append(");"); cmd.CommandText = sb.ToString(); cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString()); + cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString()); cmd.Parameters.AddWithValue("$now", DateTime.UtcNow.ToString( "o", System.Globalization.CultureInfo.InvariantCulture)); diff --git a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs index 868ab135..b5c6c227 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs @@ -524,6 +524,72 @@ public class SqliteAuditWriterWriteTests // Completes without throwing. } + /// + /// Fix 2 / M1 state guard: + /// must NOT demote a row back to + /// . When a batch contains both a + /// Pending ID and an already-Reconciled ID: + /// + /// the Pending row transitions to Forwarded (normal path) + /// the Reconciled row stays Reconciled (AttemptCount unchanged) + /// + /// This mirrors the idempotency guard already present on + /// . + /// + [Fact] + public async Task MarkForwardedAsync_DoesNotDemoteReconciledRow_WhilePendingStillTransitions() + { + var (writer, dataSource) = CreateWriter( + nameof(MarkForwardedAsync_DoesNotDemoteReconciledRow_WhilePendingStillTransitions)); + await using var _ = writer; + + var pending = NewEvent(); + var reconciled = NewEvent(); + + await writer.WriteAsync(pending); + await writer.WriteAsync(reconciled); + + // Advance reconciled through Forwarded → Reconciled so its AttemptCount = 1. + await writer.MarkForwardedAsync(new[] { reconciled.EventId }); + await writer.MarkReconciledAsync(new[] { reconciled.EventId }); + + // Verify the reconciled row's AttemptCount is 1 before the test call. + using var conn = OpenVerifierConnection(dataSource); + long reconciledAttemptBefore; + using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = + "SELECT AttemptCount FROM audit_forward_state WHERE EventId = $id;"; + cmd.Parameters.AddWithValue("$id", reconciled.EventId.ToString()); + reconciledAttemptBefore = Convert.ToInt64(cmd.ExecuteScalar()); + } + Assert.Equal(1L, reconciledAttemptBefore); + + // Now call MarkForwardedAsync with BOTH IDs in the same batch. + await writer.MarkForwardedAsync(new[] { pending.EventId, reconciled.EventId }); + + // The Pending row must have transitioned to Forwarded. + Assert.Equal( + AuditForwardState.Forwarded.ToString(), + ReadForwardState(dataSource, pending.EventId)); + + // The Reconciled row must remain Reconciled — the state guard must have + // excluded it from the UPDATE. + Assert.Equal( + AuditForwardState.Reconciled.ToString(), + ReadForwardState(dataSource, reconciled.EventId)); + + // AttemptCount on the Reconciled row must be unchanged (still 1, not 2). + using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = + "SELECT AttemptCount FROM audit_forward_state WHERE EventId = $id;"; + cmd.Parameters.AddWithValue("$id", reconciled.EventId.ToString()); + var attemptAfter = Convert.ToInt64(cmd.ExecuteScalar()); + Assert.Equal(reconciledAttemptBefore, attemptAfter); + } + } + // ----- ExecutionId (rides DetailsJson, recomposed via AsRow) ----- // [Fact]