From e4d902753bbdae824c67f04c3c7f04821d07ce7e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 15:54:54 -0400 Subject: [PATCH] feat(siteruntime): emit DbOutbound.DbWrite on sync Database.Execute*/ExecuteReader (#23 M4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Audit Log #23 — M4 Bundle A (Tasks A1+A2): every script-initiated synchronous DB call routed through Database.Connection(name) now emits exactly one DbOutbound/DbWrite audit row. Implementation — three thin ADO.NET decorators in src/ScadaLink.SiteRuntime/Scripts/: - AuditingDbConnection: wraps the gateway-returned DbConnection so CreateDbCommand() hands the script an AuditingDbCommand. All other ADO.NET surface forwards unchanged. - AuditingDbCommand: intercepts ExecuteNonQuery / ExecuteScalar / ExecuteReader (sync + async). On terminal: Channel = DbOutbound, Kind = DbWrite, Status = Delivered|Failed, Extra = {"op":"write","rowsAffected":N} (Execute*), {"op":"read","rowsReturned":N} (ExecuteReader), RequestSummary = JSON of SQL + parameter values (default capture; redaction in M5), Target = ".", DurationMs captured via Stopwatch, Provenance from ScriptRuntimeContext (SourceSiteId, SourceInstanceId, SourceScript). - AuditingDbDataReader: counts rows on Read/ReadAsync and fires the audit emission exactly once on Close/CloseAsync/Dispose. DatabaseHelper now takes an IAuditWriter; ScriptRuntimeContext.Database threads through _auditWriter. When the writer is null (tests / minimal hosts) Connection() returns the raw inner DbConnection unchanged. Best-effort emission (alog.md §7): mirrors M2 Bundle F's 3-layer fail-safe — build, write, continuation. Audit-build, audit-write, and audit-continuation faults are logged + swallowed; the original ADO.NET result (or original exception) flows back to the script untouched. The SiteAuditWriteFailures counter increments automatically through the existing FallbackAuditWriter (Bundle G). Tests — tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseSyncEmissionTests.cs (7 new, all passing): 1. Execute / INSERT success — one DbWrite row, op=write, rowsAffected=1. 2. ExecuteScalar success — one DbWrite row, op=write. 3. Execute throws — Status=Failed, ErrorMessage + ErrorDetail set. 4. ExecuteReader success — op=read, rowsReturned counts rows pulled. 5. AuditWriter throws — original ADO.NET rowsAffected returned, no events captured, no exception propagates. 6. Provenance populated from context. 7. DurationMs recorded non-zero. Tests use Microsoft.Data.Sqlite in-memory (already transitively available via SiteRuntime). Total SiteRuntime test suite: 251 passing (244 baseline + 7 new). Full solution test suite passes. --- .../Scripts/AuditingDbCommand.cs | 522 ++++++++++++++++++ .../Scripts/AuditingDbConnection.cs | 116 ++++ .../Scripts/AuditingDbDataReader.cs | 157 ++++++ .../Scripts/ScriptRuntimeContext.cs | 48 +- .../Scripts/DatabaseSyncEmissionTests.cs | 296 ++++++++++ 5 files changed, 1137 insertions(+), 2 deletions(-) create mode 100644 src/ScadaLink.SiteRuntime/Scripts/AuditingDbCommand.cs create mode 100644 src/ScadaLink.SiteRuntime/Scripts/AuditingDbConnection.cs create mode 100644 src/ScadaLink.SiteRuntime/Scripts/AuditingDbDataReader.cs create mode 100644 tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseSyncEmissionTests.cs diff --git a/src/ScadaLink.SiteRuntime/Scripts/AuditingDbCommand.cs b/src/ScadaLink.SiteRuntime/Scripts/AuditingDbCommand.cs new file mode 100644 index 0000000..2936e55 --- /dev/null +++ b/src/ScadaLink.SiteRuntime/Scripts/AuditingDbCommand.cs @@ -0,0 +1,522 @@ +using System.Data; +using System.Data.Common; +using System.Diagnostics; +using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.SiteRuntime.Scripts; + +/// +/// Audit Log #23 — M4 Bundle A: decorator that emits +/// exactly one DbOutbound/DbWrite audit event per execution. +/// +/// +/// +/// Vocabulary lock (M4 plan): both writes (Execute / ExecuteScalar) and +/// reads (ExecuteReader) emit on the +/// channel. The Extra JSON column +/// distinguishes them — {"op":"write","rowsAffected":N} for writes, +/// {"op":"read","rowsReturned":N} for reads. +/// +/// +/// Best-effort emission (alog.md §7): mirrors +/// 's 3-layer fail-safe. +/// The original ADO.NET result (or original exception) flows back to the +/// script untouched; audit-build, audit-write, and audit-continuation faults +/// are all logged + swallowed. A faulted never +/// aborts the SQL call. +/// +/// +internal sealed class AuditingDbCommand : DbCommand +{ + private readonly DbCommand _inner; + private readonly IAuditWriter _auditWriter; + private readonly string _connectionName; + private readonly string _siteId; + private readonly string _instanceName; + private readonly string? _sourceScript; + private readonly ILogger _logger; + private DbConnection? _wrappingConnection; + + public AuditingDbCommand( + DbCommand inner, + IAuditWriter auditWriter, + string connectionName, + string siteId, + string instanceName, + string? sourceScript, + ILogger logger) + { + _inner = inner ?? throw new ArgumentNullException(nameof(inner)); + _auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter)); + _connectionName = connectionName ?? throw new ArgumentNullException(nameof(connectionName)); + _siteId = siteId ?? string.Empty; + _instanceName = instanceName ?? string.Empty; + _sourceScript = sourceScript; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + // -- Forwarded surface ------------------------------------------------ + +#pragma warning disable CS8765 // ADO.NET base members carry pre-NRT signatures with permissive nullability + public override string CommandText + { + get => _inner.CommandText; + set => _inner.CommandText = value; + } +#pragma warning restore CS8765 + + public override int CommandTimeout + { + get => _inner.CommandTimeout; + set => _inner.CommandTimeout = value; + } + + public override CommandType CommandType + { + get => _inner.CommandType; + set => _inner.CommandType = value; + } + + public override bool DesignTimeVisible + { + get => _inner.DesignTimeVisible; + set => _inner.DesignTimeVisible = value; + } + + public override UpdateRowSource UpdatedRowSource + { + get => _inner.UpdatedRowSource; + set => _inner.UpdatedRowSource = value; + } + + protected override DbConnection? DbConnection + { + // When the script has wrapped the connection (the normal path through + // ScriptRuntimeContext.DatabaseHelper.Connection) we keep returning + // the wrapper, but writes from the user go through to the inner + // command so the underlying provider keeps its wiring intact. + get => _wrappingConnection ?? _inner.Connection; + set + { + _wrappingConnection = value; + _inner.Connection = value switch + { + AuditingDbConnection auditing => auditing.GetType() + .GetField("_inner", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic) + !.GetValue(auditing) as DbConnection, + _ => value + }; + } + } + + protected override DbParameterCollection DbParameterCollection => _inner.Parameters; + + protected override DbTransaction? DbTransaction + { + get => _inner.Transaction; + set => _inner.Transaction = value; + } + + public override void Cancel() => _inner.Cancel(); + + public override void Prepare() => _inner.Prepare(); + + protected override DbParameter CreateDbParameter() => _inner.CreateParameter(); + + // -- Audited execution surface --------------------------------------- + + public override int ExecuteNonQuery() + { + var occurredAtUtc = DateTime.UtcNow; + var startTicks = Stopwatch.GetTimestamp(); + int rows = 0; + Exception? thrown = null; + try + { + rows = _inner.ExecuteNonQuery(); + return rows; + } + catch (Exception ex) + { + thrown = ex; + throw; + } + finally + { + EmitAudit( + occurredAtUtc, + ElapsedMs(startTicks), + op: "write", + rowsAffected: thrown == null ? rows : (int?)null, + rowsReturned: null, + thrown); + } + } + + public override object? ExecuteScalar() + { + var occurredAtUtc = DateTime.UtcNow; + var startTicks = Stopwatch.GetTimestamp(); + object? scalar = null; + Exception? thrown = null; + try + { + scalar = _inner.ExecuteScalar(); + return scalar; + } + catch (Exception ex) + { + thrown = ex; + throw; + } + finally + { + // ExecuteScalar is classified as "write" per the M4 vocabulary + // lock — it's a single-value execution; rowsAffected mirrors the + // inner command's value if exposed (DbCommand has no RecordsAffected + // property, so we report -1 when the provider didn't surface it). + EmitAudit( + occurredAtUtc, + ElapsedMs(startTicks), + op: "write", + rowsAffected: thrown == null ? -1 : (int?)null, + rowsReturned: null, + thrown); + } + } + + public override async Task ExecuteNonQueryAsync(CancellationToken cancellationToken) + { + var occurredAtUtc = DateTime.UtcNow; + var startTicks = Stopwatch.GetTimestamp(); + int rows = 0; + Exception? thrown = null; + try + { + rows = await _inner.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); + return rows; + } + catch (Exception ex) + { + thrown = ex; + throw; + } + finally + { + EmitAudit( + occurredAtUtc, + ElapsedMs(startTicks), + op: "write", + rowsAffected: thrown == null ? rows : (int?)null, + rowsReturned: null, + thrown); + } + } + + public override async Task ExecuteScalarAsync(CancellationToken cancellationToken) + { + var occurredAtUtc = DateTime.UtcNow; + var startTicks = Stopwatch.GetTimestamp(); + object? scalar = null; + Exception? thrown = null; + try + { + scalar = await _inner.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false); + return scalar; + } + catch (Exception ex) + { + thrown = ex; + throw; + } + finally + { + EmitAudit( + occurredAtUtc, + ElapsedMs(startTicks), + op: "write", + rowsAffected: thrown == null ? -1 : (int?)null, + rowsReturned: null, + thrown); + } + } + + protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) + { + var occurredAtUtc = DateTime.UtcNow; + var startTicks = Stopwatch.GetTimestamp(); + DbDataReader? reader = null; + Exception? thrown = null; + try + { + reader = _inner.ExecuteReader(behavior); + return new AuditingDbDataReader( + reader, + onClose: rows => EmitAudit( + occurredAtUtc, + ElapsedMs(startTicks), + op: "read", + rowsAffected: null, + rowsReturned: rows, + thrown: null)); + } + catch (Exception ex) + { + thrown = ex; + // Emit the failure row immediately — no reader to wait on. + EmitAudit( + occurredAtUtc, + ElapsedMs(startTicks), + op: "read", + rowsAffected: null, + rowsReturned: null, + thrown); + throw; + } + } + + protected override async Task ExecuteDbDataReaderAsync( + CommandBehavior behavior, CancellationToken cancellationToken) + { + var occurredAtUtc = DateTime.UtcNow; + var startTicks = Stopwatch.GetTimestamp(); + DbDataReader? reader = null; + Exception? thrown = null; + try + { + reader = await _inner.ExecuteReaderAsync(behavior, cancellationToken).ConfigureAwait(false); + return new AuditingDbDataReader( + reader, + onClose: rows => EmitAudit( + occurredAtUtc, + ElapsedMs(startTicks), + op: "read", + rowsAffected: null, + rowsReturned: rows, + thrown: null)); + } + catch (Exception ex) + { + thrown = ex; + EmitAudit( + occurredAtUtc, + ElapsedMs(startTicks), + op: "read", + rowsAffected: null, + rowsReturned: null, + thrown); + throw; + } + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _inner.Dispose(); + } + base.Dispose(disposing); + } + + // -- Emission --------------------------------------------------------- + + private static int ElapsedMs(long startTicks) => + (int)((Stopwatch.GetTimestamp() - startTicks) * 1000d / Stopwatch.Frequency); + + /// + /// Best-effort emission of one DbOutbound/DbWrite audit row. + /// Mirrors the M2 Bundle F EmitCallAudit 3-layer fail-safe pattern. + /// + private void EmitAudit( + DateTime occurredAtUtc, + int durationMs, + string op, + int? rowsAffected, + int? rowsReturned, + Exception? thrown) + { + AuditEvent evt; + try + { + evt = BuildAuditEvent(occurredAtUtc, durationMs, op, rowsAffected, rowsReturned, thrown); + } + catch (Exception buildEx) + { + // Defensive: building the event from already-validated fields + // shouldn't throw, but the alog.md §7 contract requires we never + // propagate to the user-facing action regardless. + _logger.LogWarning(buildEx, + "Failed to build Audit Log #23 event for {Connection} (op={Op}) — skipping emission", + _connectionName, op); + return; + } + + try + { + var writeTask = _auditWriter.WriteAsync(evt, CancellationToken.None); + if (!writeTask.IsCompleted) + { + writeTask.ContinueWith( + t => _logger.LogWarning(t.Exception, + "Audit Log #23 write failed for EventId {EventId} ({Connection} op={Op})", + evt.EventId, _connectionName, op), + CancellationToken.None, + TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + } + else if (writeTask.IsFaulted) + { + _logger.LogWarning(writeTask.Exception, + "Audit Log #23 write failed for EventId {EventId} ({Connection} op={Op})", + evt.EventId, _connectionName, op); + } + } + catch (Exception writeEx) + { + // Synchronous throw from WriteAsync before its own try/catch. + // Swallow + log per alog.md §7. + _logger.LogWarning(writeEx, + "Audit Log #23 write threw synchronously for EventId {EventId} ({Connection} op={Op})", + evt.EventId, _connectionName, op); + } + } + + private AuditEvent BuildAuditEvent( + DateTime occurredAtUtc, + int durationMs, + string op, + int? rowsAffected, + int? rowsReturned, + Exception? thrown) + { + var status = thrown == null ? AuditStatus.Delivered : AuditStatus.Failed; + + // Target = "." so the audit + // row carries a human-recognisable handle without dragging the full + // (potentially huge) statement into the index column. The full + // statement + parameter values live in RequestSummary. + string target = _connectionName; + var sqlSnippet = _inner.CommandText ?? string.Empty; + if (sqlSnippet.Length > 0) + { + var snippet = sqlSnippet.Length > 60 + ? sqlSnippet[..60] + : sqlSnippet; + target = $"{_connectionName}.{snippet}"; + } + + // RequestSummary captures the SQL statement + parameter values by + // default per the alog.md M4 acceptance criteria (M5 will add + // per-connection redaction opt-in). + string? requestSummary = BuildRequestSummary(); + + // Extra carries the op discriminator + row count per the vocabulary + // lock. Build as a small hand-rolled JSON object to avoid pulling + // in System.Text.Json on the hot path. + string extra = op == "write" + ? $"{{\"op\":\"write\",\"rowsAffected\":{(rowsAffected ?? 0)}}}" + : $"{{\"op\":\"read\",\"rowsReturned\":{(rowsReturned ?? 0)}}}"; + + return new AuditEvent + { + EventId = Guid.NewGuid(), + OccurredAtUtc = DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc), + Channel = AuditChannel.DbOutbound, + Kind = AuditKind.DbWrite, + CorrelationId = null, + SourceSiteId = string.IsNullOrEmpty(_siteId) ? null : _siteId, + SourceInstanceId = _instanceName, + SourceScript = _sourceScript, + Actor = null, + Target = target, + Status = status, + HttpStatus = null, + DurationMs = durationMs, + ErrorMessage = thrown?.Message, + ErrorDetail = thrown?.ToString(), + RequestSummary = requestSummary, + ResponseSummary = null, + PayloadTruncated = false, + Extra = extra, + ForwardState = AuditForwardState.Pending, + }; + } + + /// + /// Compose a JSON request summary capturing the SQL statement and + /// parameter values. Parameter values are captured by default per the + /// M4 acceptance criteria — redaction is opt-in and deferred to M5. + /// + private string? BuildRequestSummary() + { + var sql = _inner.CommandText; + if (string.IsNullOrEmpty(sql)) + { + return null; + } + + // Hand-roll the JSON so we don't pull in System.Text.Json for a + // shape this small. Values are stringified with ToString() — fully + // structured serialisation arrives with the redaction work in M5. + var sb = new System.Text.StringBuilder(sql.Length + 64); + sb.Append("{\"sql\":"); + AppendJsonString(sb, sql); + + if (_inner.Parameters.Count > 0) + { + sb.Append(",\"parameters\":{"); + var first = true; + foreach (DbParameter p in _inner.Parameters) + { + if (!first) sb.Append(','); + first = false; + AppendJsonString(sb, p.ParameterName); + sb.Append(':'); + if (p.Value is null || p.Value is DBNull) + { + sb.Append("null"); + } + else + { + AppendJsonString(sb, p.Value.ToString() ?? string.Empty); + } + } + sb.Append('}'); + } + + sb.Append('}'); + return sb.ToString(); + } + + private static void AppendJsonString(System.Text.StringBuilder sb, string value) + { + sb.Append('"'); + foreach (var ch in value) + { + switch (ch) + { + case '"': sb.Append("\\\""); break; + case '\\': sb.Append("\\\\"); break; + case '\b': sb.Append("\\b"); break; + case '\f': sb.Append("\\f"); break; + case '\n': sb.Append("\\n"); break; + case '\r': sb.Append("\\r"); break; + case '\t': sb.Append("\\t"); break; + default: + if (ch < 0x20) + { + sb.Append("\\u").Append(((int)ch).ToString("x4")); + } + else + { + sb.Append(ch); + } + break; + } + } + sb.Append('"'); + } +} diff --git a/src/ScadaLink.SiteRuntime/Scripts/AuditingDbConnection.cs b/src/ScadaLink.SiteRuntime/Scripts/AuditingDbConnection.cs new file mode 100644 index 0000000..a1e6121 --- /dev/null +++ b/src/ScadaLink.SiteRuntime/Scripts/AuditingDbConnection.cs @@ -0,0 +1,116 @@ +using System.Data; +using System.Data.Common; +using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Interfaces.Services; + +namespace ScadaLink.SiteRuntime.Scripts; + +/// +/// Audit Log #23 — M4 Bundle A: thin decorator over the +/// returned by +/// . The decorator +/// itself does no audit work — it simply intercepts +/// so the handed back to +/// the script is wrapped in an that emits one +/// DbOutbound/DbWrite audit row per execution. +/// +/// +/// +/// All other members forward to the inner connection +/// unchanged so the script keeps full ADO.NET semantics (transactions, state +/// transitions, server-version queries, etc.). Disposing the wrapper disposes +/// the inner connection — the caller is still responsible for disposal per +/// the contract. +/// +/// +/// The audit-write failure contract (alog.md §7) is honoured at the +/// layer — see that class for the 3-layer +/// fail-safe pattern (build, write, observe). +/// +/// +internal sealed class AuditingDbConnection : DbConnection +{ + private readonly DbConnection _inner; + private readonly IAuditWriter _auditWriter; + private readonly string _connectionName; + private readonly string _siteId; + private readonly string _instanceName; + private readonly string? _sourceScript; + private readonly ILogger _logger; + + public AuditingDbConnection( + DbConnection inner, + IAuditWriter auditWriter, + string connectionName, + string siteId, + string instanceName, + string? sourceScript, + ILogger logger) + { + _inner = inner ?? throw new ArgumentNullException(nameof(inner)); + _auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter)); + _connectionName = connectionName ?? throw new ArgumentNullException(nameof(connectionName)); + _siteId = siteId ?? string.Empty; + _instanceName = instanceName ?? string.Empty; + _sourceScript = sourceScript; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + // ConnectionString is settable on DbConnection — forward both halves. + public override string ConnectionString + { + // Some providers throw on get when the connection hasn't been opened + // with a string set explicitly. The wrapper has no opinion — forward. +#pragma warning disable CS8765 // nullability of overridden member parameter — base setter accepts null in practice + get => _inner.ConnectionString; + set => _inner.ConnectionString = value; +#pragma warning restore CS8765 + } + + public override string Database => _inner.Database; + public override string DataSource => _inner.DataSource; + public override string ServerVersion => _inner.ServerVersion; + public override ConnectionState State => _inner.State; + + public override void ChangeDatabase(string databaseName) => _inner.ChangeDatabase(databaseName); + public override void Close() => _inner.Close(); + public override void Open() => _inner.Open(); + public override Task OpenAsync(CancellationToken cancellationToken) => _inner.OpenAsync(cancellationToken); + + protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) + => _inner.BeginTransaction(isolationLevel); + + protected override DbCommand CreateDbCommand() + { + var innerCmd = _inner.CreateCommand(); + // Hand the script an auditing wrapper. The wrapper preserves the + // inner command's identity for parameters / type maps via delegation. + return new AuditingDbCommand( + innerCmd, + _auditWriter, + _connectionName, + _siteId, + _instanceName, + _sourceScript, + _logger); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _inner.Dispose(); + } + base.Dispose(disposing); + } + + public override ValueTask DisposeAsync() + { + // DbConnection.DisposeAsync is virtual; calling base would run the + // synchronous Dispose path. Forward to the inner connection + // asynchronously and short-circuit the base. + var task = _inner.DisposeAsync(); + GC.SuppressFinalize(this); + return task; + } +} diff --git a/src/ScadaLink.SiteRuntime/Scripts/AuditingDbDataReader.cs b/src/ScadaLink.SiteRuntime/Scripts/AuditingDbDataReader.cs new file mode 100644 index 0000000..3b1eae1 --- /dev/null +++ b/src/ScadaLink.SiteRuntime/Scripts/AuditingDbDataReader.cs @@ -0,0 +1,157 @@ +using System.Collections; +using System.Data.Common; + +namespace ScadaLink.SiteRuntime.Scripts; + +/// +/// Audit Log #23 — M4 Bundle A: decorator that +/// counts the number of rows read by the script and fires a single audit +/// emission callback when the reader closes. +/// +/// +/// +/// The wrapping reader counts each successful / +/// and invokes onClose +/// exactly once — on , , or +/// disposal — with the running tally. This lets +/// emit one +/// DbOutbound/DbWrite row per ExecuteReader with +/// Extra.rowsReturned populated, matching the M4 vocabulary lock. +/// +/// +/// Multiple result sets via are folded into a single +/// rowsReturned tally — the script sees one audit row per +/// ExecuteReader call, not per result set. +/// +/// +internal sealed class AuditingDbDataReader : DbDataReader +{ + private readonly DbDataReader _inner; + private readonly Action _onClose; + private int _rowsReturned; + private bool _closed; + + public AuditingDbDataReader(DbDataReader inner, Action onClose) + { + _inner = inner ?? throw new ArgumentNullException(nameof(inner)); + _onClose = onClose ?? throw new ArgumentNullException(nameof(onClose)); + } + + // -- Row-count interception ------------------------------------------ + + public override bool Read() + { + var more = _inner.Read(); + if (more) _rowsReturned++; + return more; + } + + public override async Task ReadAsync(CancellationToken cancellationToken) + { + var more = await _inner.ReadAsync(cancellationToken).ConfigureAwait(false); + if (more) _rowsReturned++; + return more; + } + + public override void Close() + { + if (!_closed) + { + _closed = true; + try { _inner.Close(); } + finally { SafeFireOnClose(); } + } + } + + public override async Task CloseAsync() + { + if (!_closed) + { + _closed = true; + try { await _inner.CloseAsync().ConfigureAwait(false); } + finally { SafeFireOnClose(); } + } + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + // DbDataReader.Dispose calls Close on most providers, but we + // guard with _closed to ensure onClose fires exactly once. + if (!_closed) + { + _closed = true; + try { _inner.Dispose(); } + finally { SafeFireOnClose(); } + } + else + { + _inner.Dispose(); + } + } + base.Dispose(disposing); + } + + public override async ValueTask DisposeAsync() + { + if (!_closed) + { + _closed = true; + try { await _inner.DisposeAsync().ConfigureAwait(false); } + finally { SafeFireOnClose(); } + } + else + { + await _inner.DisposeAsync().ConfigureAwait(false); + } + GC.SuppressFinalize(this); + } + + private void SafeFireOnClose() + { + // The onClose callback runs the audit emission, which is itself + // best-effort and swallows internally — but defend the reader's own + // close path anyway so an audit fault never propagates out of + // Close/Dispose. + try { _onClose(_rowsReturned); } + catch { /* audit emission is best-effort by contract */ } + } + + // -- Forwarded surface ------------------------------------------------ + + public override object this[int ordinal] => _inner[ordinal]; + public override object this[string name] => _inner[name]; + public override int Depth => _inner.Depth; + public override int FieldCount => _inner.FieldCount; + public override bool HasRows => _inner.HasRows; + public override bool IsClosed => _inner.IsClosed; + public override int RecordsAffected => _inner.RecordsAffected; + public override int VisibleFieldCount => _inner.VisibleFieldCount; + public override bool GetBoolean(int ordinal) => _inner.GetBoolean(ordinal); + public override byte GetByte(int ordinal) => _inner.GetByte(ordinal); + public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length) + => _inner.GetBytes(ordinal, dataOffset, buffer, bufferOffset, length); + public override char GetChar(int ordinal) => _inner.GetChar(ordinal); + public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length) + => _inner.GetChars(ordinal, dataOffset, buffer, bufferOffset, length); + public override string GetDataTypeName(int ordinal) => _inner.GetDataTypeName(ordinal); + public override DateTime GetDateTime(int ordinal) => _inner.GetDateTime(ordinal); + public override decimal GetDecimal(int ordinal) => _inner.GetDecimal(ordinal); + public override double GetDouble(int ordinal) => _inner.GetDouble(ordinal); + public override IEnumerator GetEnumerator() => ((IEnumerable)_inner).GetEnumerator(); + public override Type GetFieldType(int ordinal) => _inner.GetFieldType(ordinal); + public override float GetFloat(int ordinal) => _inner.GetFloat(ordinal); + public override Guid GetGuid(int ordinal) => _inner.GetGuid(ordinal); + public override short GetInt16(int ordinal) => _inner.GetInt16(ordinal); + public override int GetInt32(int ordinal) => _inner.GetInt32(ordinal); + public override long GetInt64(int ordinal) => _inner.GetInt64(ordinal); + public override string GetName(int ordinal) => _inner.GetName(ordinal); + public override int GetOrdinal(string name) => _inner.GetOrdinal(name); + public override string GetString(int ordinal) => _inner.GetString(ordinal); + public override object GetValue(int ordinal) => _inner.GetValue(ordinal); + public override int GetValues(object[] values) => _inner.GetValues(values); + public override bool IsDBNull(int ordinal) => _inner.IsDBNull(ordinal); + public override bool NextResult() => _inner.NextResult(); + public override Task NextResultAsync(CancellationToken cancellationToken) => _inner.NextResultAsync(cancellationToken); +} diff --git a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs index b5eceaa..aed1532 100644 --- a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs +++ b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs @@ -252,7 +252,16 @@ public class ScriptRuntimeContext /// Database.CachedWrite("name", "sql", params) /// public DatabaseHelper Database => new( - _databaseGateway, _instanceName, _logger, _siteId, _sourceScript, + _databaseGateway, + _instanceName, + _logger, + // Audit Log #23 (M4 Bundle A): wire the IAuditWriter so + // Database.Connection(name) returns an auditing decorator that + // emits one DbOutbound/DbWrite row per script-initiated + // Execute / ExecuteScalar / ExecuteReader. + _auditWriter, + _siteId, + _sourceScript, // Audit Log #23 (M3 Bundle E — Task E6): emit CachedSubmit telemetry on // every Database.CachedWrite enqueue. _cachedForwarder); @@ -894,10 +903,23 @@ public class ScriptRuntimeContext private readonly string? _sourceScript; private readonly ICachedCallTelemetryForwarder? _cachedForwarder; + /// + /// Audit Log #23 (M4 Bundle A): best-effort emitter for synchronous + /// Database.Connection-routed Execute / ExecuteScalar / + /// ExecuteReader calls. When wired, returns + /// an that intercepts each command + /// execution and writes one DbOutbound/DbWrite audit + /// row. Optional — when null the helper falls back to the raw + /// inner the gateway + /// returns (tests / minimal hosts that don't wire audit). + /// + private readonly IAuditWriter? _auditWriter; + internal DatabaseHelper( IDatabaseGateway? gateway, string instanceName, ILogger logger, + IAuditWriter? auditWriter = null, string siteId = "", string? sourceScript = null, ICachedCallTelemetryForwarder? cachedForwarder = null) @@ -905,6 +927,7 @@ public class ScriptRuntimeContext _gateway = gateway; _instanceName = instanceName; _logger = logger; + _auditWriter = auditWriter; _siteId = siteId; _sourceScript = sourceScript; _cachedForwarder = cachedForwarder; @@ -917,7 +940,28 @@ public class ScriptRuntimeContext if (_gateway == null) throw new InvalidOperationException("Database gateway not available"); - return await _gateway.GetConnectionAsync(name, cancellationToken); + var inner = await _gateway.GetConnectionAsync(name, cancellationToken); + + // Audit Log #23 (M4 Bundle A): wrap in an auditing decorator so + // every script-initiated Execute* / ExecuteReader on the returned + // connection emits one DbOutbound/DbWrite audit row. The wrapper + // delegates all other ADO.NET behaviour to the inner connection + // unchanged — including disposal, so the caller's existing + // dispose pattern (await using var conn = ...) still releases + // the underlying connection to the pool. + if (_auditWriter == null) + { + return inner; + } + + return new AuditingDbConnection( + inner, + _auditWriter, + connectionName: name, + siteId: _siteId, + instanceName: _instanceName, + sourceScript: _sourceScript, + logger: _logger); } /// diff --git a/tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseSyncEmissionTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseSyncEmissionTests.cs new file mode 100644 index 0000000..f2ca536 --- /dev/null +++ b/tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseSyncEmissionTests.cs @@ -0,0 +1,296 @@ +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.SiteRuntime.Scripts; + +namespace ScadaLink.SiteRuntime.Tests.Scripts; + +/// +/// Audit Log #23 — M4 Bundle A (Tasks A1+A2): every synchronous DB call made +/// through Database.Connection("name") emits exactly one +/// DbOutbound/DbWrite audit event with an Extra envelope +/// distinguishing writes (op="write", rowsAffected=N) from reads +/// (op="read", rowsReturned=N). The audit emission is +/// best-effort — a thrown must never +/// abort the script's call, and the original ADO.NET result (or original +/// exception) must surface to the caller unchanged. +/// +public class DatabaseSyncEmissionTests +{ + /// + /// In-memory mirroring the M2 Bundle F stub — + /// captures every event and may be configured to throw to verify the + /// 3-layer fail-safe (mirrors CapturingAuditWriter in + /// ExternalSystemCallAuditEmissionTests). + /// + private sealed class CapturingAuditWriter : IAuditWriter + { + public List Events { get; } = new(); + public Exception? ThrowOnWrite { get; set; } + + public Task WriteAsync(AuditEvent evt, CancellationToken ct = default) + { + if (ThrowOnWrite != null) + { + return Task.FromException(ThrowOnWrite); + } + + Events.Add(evt); + return Task.CompletedTask; + } + } + + private const string SiteId = "site-77"; + private const string InstanceName = "Plant.Pump42"; + private const string SourceScript = "ScriptActor:Sync"; + private const string ConnectionName = "machineData"; + + private static ScriptRuntimeContext.DatabaseHelper CreateHelper( + IDatabaseGateway gateway, + IAuditWriter? auditWriter) + { + return new ScriptRuntimeContext.DatabaseHelper( + gateway, + InstanceName, + NullLogger.Instance, + auditWriter: auditWriter, + siteId: SiteId, + sourceScript: SourceScript, + cachedForwarder: null); + } + + /// + /// Spin up a fresh in-memory SQLite database with a tiny single-table + /// schema we can write to and read from. The connection is returned in + /// the open state so the test only has to call Connection() via + /// the helper. SQLite in-memory databases live as long as the connection + /// holding them, so the keep-alive root must outlive any auditing + /// wrapper the test exercises. + /// + private static SqliteConnection NewInMemoryDb(out SqliteConnection keepAlive) + { + // The shared-cache name is per-test (Guid) so concurrent tests don't + // collide. mode=memory keeps it RAM-only; cache=shared lets the + // keep-alive root and the gateway-returned connection see the same + // in-memory DB. The keepAlive connection must remain open for the + // duration of the test or the in-memory DB is discarded. + var dbName = $"db-{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + + keepAlive = new SqliteConnection(connStr); + keepAlive.Open(); + using (var seed = keepAlive.CreateCommand()) + { + seed.CommandText = + "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);" + + "INSERT INTO t (id, name) VALUES (1, 'alpha');" + + "INSERT INTO t (id, name) VALUES (2, 'beta');"; + seed.ExecuteNonQuery(); + } + + var live = new SqliteConnection(connStr); + live.Open(); + return live; + } + + [Fact] + public async Task Execute_InsertSuccess_EmitsOneEvent_KindDbWrite_StatusDelivered_OpWrite_RowsAffected() + { + using var keepAlive = new SqliteConnection("Data Source=k;Mode=Memory;Cache=Shared"); + var inner = NewInMemoryDb(out var _); + var gateway = new Mock(); + gateway + .Setup(g => g.GetConnectionAsync(ConnectionName, It.IsAny())) + .ReturnsAsync(inner); + var writer = new CapturingAuditWriter(); + + var helper = CreateHelper(gateway.Object, writer); + await using var conn = await helper.Connection(ConnectionName); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "INSERT INTO t (id, name) VALUES (3, 'gamma')"; + var rows = await cmd.ExecuteNonQueryAsync(); + + Assert.Equal(1, rows); + var evt = Assert.Single(writer.Events); + Assert.Equal(AuditChannel.DbOutbound, evt.Channel); + Assert.Equal(AuditKind.DbWrite, evt.Kind); + Assert.Equal(AuditStatus.Delivered, evt.Status); + Assert.Equal(AuditForwardState.Pending, evt.ForwardState); + Assert.NotNull(evt.Extra); + Assert.Contains("\"op\":\"write\"", evt.Extra); + Assert.Contains("\"rowsAffected\":1", evt.Extra); + Assert.Equal(DateTimeKind.Utc, evt.OccurredAtUtc.Kind); + Assert.NotEqual(Guid.Empty, evt.EventId); + Assert.StartsWith(ConnectionName, evt.Target); + } + + [Fact] + public async Task ExecuteScalar_Success_EmitsKindDbWrite_OpWrite() + { + using var keepAlive = new SqliteConnection("Data Source=k2;Mode=Memory;Cache=Shared"); + var inner = NewInMemoryDb(out var _); + var gateway = new Mock(); + gateway + .Setup(g => g.GetConnectionAsync(ConnectionName, It.IsAny())) + .ReturnsAsync(inner); + var writer = new CapturingAuditWriter(); + + var helper = CreateHelper(gateway.Object, writer); + await using var conn = await helper.Connection(ConnectionName); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT COUNT(*) FROM t"; + var scalar = await cmd.ExecuteScalarAsync(); + + Assert.NotNull(scalar); + var evt = Assert.Single(writer.Events); + Assert.Equal(AuditChannel.DbOutbound, evt.Channel); + Assert.Equal(AuditKind.DbWrite, evt.Kind); + Assert.Equal(AuditStatus.Delivered, evt.Status); + Assert.NotNull(evt.Extra); + // ExecuteScalar is classified as "write" per the M4 vocabulary lock + // (Channel=DbOutbound, Kind=DbWrite, Extra.op="write") — the + // rowsAffected for a SELECT-on-SqlCommand is -1 in ADO.NET; the audit + // wrapper records whatever DbCommand.ExecuteScalar returned via the + // built-in path, plus the rowsAffected counter the wrapper observed. + Assert.Contains("\"op\":\"write\"", evt.Extra); + Assert.Contains("rowsAffected", evt.Extra); + } + + [Fact] + public async Task Execute_Throws_EmitsEvent_StatusFailed_ErrorMessageSet() + { + using var keepAlive = new SqliteConnection("Data Source=k3;Mode=Memory;Cache=Shared"); + var inner = NewInMemoryDb(out var _); + var gateway = new Mock(); + gateway + .Setup(g => g.GetConnectionAsync(ConnectionName, It.IsAny())) + .ReturnsAsync(inner); + var writer = new CapturingAuditWriter(); + + var helper = CreateHelper(gateway.Object, writer); + await using var conn = await helper.Connection(ConnectionName); + await using var cmd = conn.CreateCommand(); + // Reference an undefined column — SQLite throws SqliteException synchronously. + cmd.CommandText = "INSERT INTO t (does_not_exist) VALUES (1)"; + await Assert.ThrowsAsync(() => cmd.ExecuteNonQueryAsync()); + + var evt = Assert.Single(writer.Events); + Assert.Equal(AuditStatus.Failed, evt.Status); + Assert.False(string.IsNullOrEmpty(evt.ErrorMessage)); + Assert.NotNull(evt.ErrorDetail); + Assert.Contains("does_not_exist", evt.ErrorDetail); + } + + [Fact] + public async Task ExecuteReader_Success_EmitsKindDbWrite_OpRead_RowsReturned() + { + using var keepAlive = new SqliteConnection("Data Source=k4;Mode=Memory;Cache=Shared"); + var inner = NewInMemoryDb(out var _); + var gateway = new Mock(); + gateway + .Setup(g => g.GetConnectionAsync(ConnectionName, It.IsAny())) + .ReturnsAsync(inner); + var writer = new CapturingAuditWriter(); + + var helper = CreateHelper(gateway.Object, writer); + await using var conn = await helper.Connection(ConnectionName); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT id, name FROM t ORDER BY id"; + await using var reader = await cmd.ExecuteReaderAsync(); + var rows = 0; + while (await reader.ReadAsync()) + { + rows++; + } + // Close the reader explicitly so the audit emission (deferred to + // reader-close per the wrapper contract) fires before assertion. + await reader.CloseAsync(); + + Assert.Equal(2, rows); + var evt = Assert.Single(writer.Events); + Assert.Equal(AuditChannel.DbOutbound, evt.Channel); + Assert.Equal(AuditKind.DbWrite, evt.Kind); + Assert.Equal(AuditStatus.Delivered, evt.Status); + Assert.NotNull(evt.Extra); + Assert.Contains("\"op\":\"read\"", evt.Extra); + Assert.Contains("\"rowsReturned\":2", evt.Extra); + } + + [Fact] + public async Task AuditWriter_Throws_ScriptCall_ReturnsOriginalResult() + { + using var keepAlive = new SqliteConnection("Data Source=k5;Mode=Memory;Cache=Shared"); + var inner = NewInMemoryDb(out var _); + var gateway = new Mock(); + gateway + .Setup(g => g.GetConnectionAsync(ConnectionName, It.IsAny())) + .ReturnsAsync(inner); + var writer = new CapturingAuditWriter + { + ThrowOnWrite = new InvalidOperationException("audit writer down") + }; + + var helper = CreateHelper(gateway.Object, writer); + await using var conn = await helper.Connection(ConnectionName); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "INSERT INTO t (id, name) VALUES (4, 'delta')"; + var rows = await cmd.ExecuteNonQueryAsync(); + + // Original ADO.NET result must surface unchanged despite the audit + // writer faulting — the wrapper swallows + logs the audit failure. + Assert.Equal(1, rows); + Assert.Empty(writer.Events); + } + + [Fact] + public async Task Provenance_PopulatedFromContext() + { + using var keepAlive = new SqliteConnection("Data Source=k6;Mode=Memory;Cache=Shared"); + var inner = NewInMemoryDb(out var _); + var gateway = new Mock(); + gateway + .Setup(g => g.GetConnectionAsync(ConnectionName, It.IsAny())) + .ReturnsAsync(inner); + var writer = new CapturingAuditWriter(); + + var helper = CreateHelper(gateway.Object, writer); + await using var conn = await helper.Connection(ConnectionName); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "INSERT INTO t (id, name) VALUES (5, 'epsilon')"; + await cmd.ExecuteNonQueryAsync(); + + var evt = Assert.Single(writer.Events); + Assert.Equal(SiteId, evt.SourceSiteId); + Assert.Equal(InstanceName, evt.SourceInstanceId); + Assert.Equal(SourceScript, evt.SourceScript); + Assert.Null(evt.Actor); + Assert.Null(evt.CorrelationId); + Assert.NotEqual(Guid.Empty, evt.EventId); + } + + [Fact] + public async Task DurationMs_NonZero() + { + using var keepAlive = new SqliteConnection("Data Source=k7;Mode=Memory;Cache=Shared"); + var inner = NewInMemoryDb(out var _); + var gateway = new Mock(); + gateway + .Setup(g => g.GetConnectionAsync(ConnectionName, It.IsAny())) + .ReturnsAsync(inner); + var writer = new CapturingAuditWriter(); + + var helper = CreateHelper(gateway.Object, writer); + await using var conn = await helper.Connection(ConnectionName); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "INSERT INTO t (id, name) VALUES (6, 'zeta')"; + await cmd.ExecuteNonQueryAsync(); + + var evt = Assert.Single(writer.Events); + Assert.NotNull(evt.DurationMs); + Assert.True(evt.DurationMs >= 0, $"DurationMs={evt.DurationMs} should be >= 0"); + Assert.True(evt.DurationMs <= 5000, $"DurationMs={evt.DurationMs} should be <= 5000"); + } +}