diff --git a/src/ScadaLink.SiteRuntime/ScadaLink.SiteRuntime.csproj b/src/ScadaLink.SiteRuntime/ScadaLink.SiteRuntime.csproj
index bfa3f5b..da712a0 100644
--- a/src/ScadaLink.SiteRuntime/ScadaLink.SiteRuntime.csproj
+++ b/src/ScadaLink.SiteRuntime/ScadaLink.SiteRuntime.csproj
@@ -22,6 +22,14 @@
+
+
diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/DatabaseSyncEmissionEndToEndTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/DatabaseSyncEmissionEndToEndTests.cs
new file mode 100644
index 0000000..5a73256
--- /dev/null
+++ b/tests/ScadaLink.AuditLog.Tests/Integration/DatabaseSyncEmissionEndToEndTests.cs
@@ -0,0 +1,297 @@
+using Akka.Actor;
+using Akka.TestKit.Xunit2;
+using Microsoft.Data.Sqlite;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using NSubstitute;
+using ScadaLink.AuditLog.Central;
+using ScadaLink.AuditLog.Site;
+using ScadaLink.AuditLog.Site.Telemetry;
+using ScadaLink.AuditLog.Tests.Integration.Infrastructure;
+using ScadaLink.Commons.Entities.Audit;
+using ScadaLink.Commons.Interfaces.Repositories;
+using ScadaLink.Commons.Interfaces.Services;
+using ScadaLink.Commons.Types.Audit;
+using ScadaLink.Commons.Types.Enums;
+using ScadaLink.ConfigurationDatabase;
+using ScadaLink.ConfigurationDatabase.Repositories;
+using ScadaLink.ConfigurationDatabase.Tests.Migrations;
+using ScadaLink.SiteRuntime.Scripts;
+
+namespace ScadaLink.AuditLog.Tests.Integration;
+
+///
+/// Audit Log #23 — M4 Bundle E (Task E1) end-to-end suite verifying every
+/// synchronous Database.Connection(name).Execute* /
+/// ExecuteReader call made via the Bundle A
+/// emits exactly one
+/// / row
+/// that materialises in the central MSSQL AuditLog via the production
+/// site-SQLite + telemetry-actor + central ingest-actor pipeline.
+///
+///
+///
+/// Composes the same pipeline as the M2 :
+/// in-memory + +
+/// on the site, drained by a real
+/// through a
+/// stub that short-circuits the
+/// gRPC wire and Asks the central backed by
+/// the real on the per-class
+/// MSSQL database.
+///
+///
+/// Drives the AuditingDbConnection wrapper directly via
+/// 's internal ctor (the
+/// AuditLog tests project has InternalsVisibleTo on SiteRuntime). No
+/// script runtime, no Akka Instance Actor — the test wires the helper, opens
+/// an in-memory SQLite connection through a stub ,
+/// runs one SQL statement, and waits for the central row to land. Each test
+/// uses a unique SourceSiteId (Guid suffix) so concurrent tests
+/// sharing the MSSQL fixture don't interfere with each other.
+///
+///
+public class DatabaseSyncEmissionEndToEndTests : TestKit, IClassFixture
+{
+ private readonly MsSqlMigrationFixture _fixture;
+
+ public DatabaseSyncEmissionEndToEndTests(MsSqlMigrationFixture fixture)
+ {
+ _fixture = fixture;
+ }
+
+ private const string ConnectionName = "machineData";
+ private const string InstanceName = "Plant.Pump42";
+ private const string SourceScript = "ScriptActor:doDbWork";
+
+ private static string NewSiteId() =>
+ "test-e1-db-" + Guid.NewGuid().ToString("N").Substring(0, 8);
+
+ private ScadaLinkDbContext CreateContext()
+ {
+ var options = new DbContextOptionsBuilder()
+ .UseSqlServer(_fixture.ConnectionString)
+ .Options;
+ return new ScadaLinkDbContext(options);
+ }
+
+ ///
+ /// Per-test in-memory SQLite database with a tiny 2-row schema we can both
+ /// write to and select from. Mirrors the pattern from
+ /// DatabaseSyncEmissionTests — the keep-alive root keeps the
+ /// in-memory database file pinned for the duration of the test, while the
+ /// returned live connection is what the stub gateway hands back to
+ /// the auditing wrapper.
+ ///
+ private static SqliteConnection NewInMemoryDb(out SqliteConnection keepAlive)
+ {
+ var dbName = $"db-{Guid.NewGuid():N}";
+ var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
+
+ keepAlive = new SqliteConnection(connStr);
+ keepAlive.Open();
+ using (var seed = keepAlive.CreateCommand())
+ {
+ seed.CommandText =
+ "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);" +
+ "INSERT INTO t (id, name) VALUES (1, 'alpha');" +
+ "INSERT INTO t (id, name) VALUES (2, 'beta');";
+ seed.ExecuteNonQuery();
+ }
+
+ var live = new SqliteConnection(connStr);
+ live.Open();
+ return live;
+ }
+
+ private static SqliteAuditWriter CreateInMemorySqliteWriter() =>
+ new(
+ Options.Create(new SqliteAuditWriterOptions
+ {
+ DatabasePath = "ignored",
+ BatchSize = 64,
+ ChannelCapacity = 1024,
+ }),
+ NullLogger.Instance,
+ connectionStringOverride:
+ $"Data Source=file:auditlog-e1-{Guid.NewGuid():N}?mode=memory&cache=shared");
+
+ private static IOptions FastTelemetryOptions() =>
+ Options.Create(new SiteAuditTelemetryOptions
+ {
+ BatchSize = 256,
+ // 1s on both intervals so the initial scheduled tick fires quickly
+ // — drains the SQLite Pending row and pushes it through the stub
+ // gRPC client into the central ingest actor.
+ BusyIntervalSeconds = 1,
+ IdleIntervalSeconds = 1,
+ });
+
+ private IActorRef CreateIngestActor(IAuditLogRepository repo) =>
+ Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
+ repo,
+ NullLogger.Instance)));
+
+ private IActorRef CreateTelemetryActor(
+ ISiteAuditQueue queue,
+ ISiteStreamAuditClient client) =>
+ Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor(
+ queue,
+ client,
+ FastTelemetryOptions(),
+ NullLogger.Instance)));
+
+ ///
+ /// Wires the production
+ /// (internal ctor) onto
+ /// the supplied +
+ /// with the test's site id and source script. The returned helper's
+ /// Connection(...) hands back a real AuditingDbConnection.
+ ///
+ private static ScriptRuntimeContext.DatabaseHelper CreateHelper(
+ IDatabaseGateway gateway,
+ IAuditWriter writer,
+ string siteId) =>
+ new(
+ gateway,
+ InstanceName,
+ NullLogger.Instance,
+ auditWriter: writer,
+ siteId: siteId,
+ sourceScript: SourceScript,
+ cachedForwarder: null);
+
+ [SkippableFact]
+ public async Task DbWrite_Insert_Emits_OneCentralRow_WithExtraOpWrite_AndRowsAffected()
+ {
+ Skip.IfNot(_fixture.Available, _fixture.SkipReason);
+
+ var siteId = NewSiteId();
+
+ // Central — repository + ingest actor backed by the MSSQL fixture.
+ await using var ingestContext = CreateContext();
+ var ingestRepo = new AuditLogRepository(ingestContext);
+ var ingestActor = CreateIngestActor(ingestRepo);
+
+ // Site — SQLite audit writer + ring + fallback + telemetry actor that
+ // drains into the stub gRPC client which forwards to the ingest actor.
+ await using var sqliteWriter = CreateInMemorySqliteWriter();
+ var ring = new RingBufferFallback();
+ var fallback = new FallbackAuditWriter(
+ sqliteWriter,
+ ring,
+ new NoOpAuditWriteFailureCounter(),
+ NullLogger.Instance);
+ var stubClient = new DirectActorSiteStreamAuditClient(ingestActor);
+ CreateTelemetryActor(sqliteWriter, stubClient);
+
+ // SQLite-backed inner connection — the stub gateway hands it to the
+ // auditing wrapper as the DbConnection the script would have got.
+ using var keepAlive = new SqliteConnection("Data Source=k1;Mode=Memory;Cache=Shared");
+ var inner = NewInMemoryDb(out _);
+ var gateway = Substitute.For();
+ gateway.GetConnectionAsync(ConnectionName, Arg.Any())
+ .Returns(inner);
+
+ // Act — one INSERT through the auditing wrapper. The wrapper emits a
+ // single DbOutbound/DbWrite event to the fallback writer; the
+ // telemetry actor's next tick drains it to central.
+ var helper = CreateHelper(gateway, fallback, siteId);
+ await using (var conn = await helper.Connection(ConnectionName))
+ await using (var cmd = conn.CreateCommand())
+ {
+ cmd.CommandText = "INSERT INTO t (id, name) VALUES (3, 'gamma')";
+ var rows = await cmd.ExecuteNonQueryAsync();
+ Assert.Equal(1, rows);
+ }
+
+ // Assert — one central row, Kind=DbWrite, Status=Delivered,
+ // Extra.op="write", Extra.rowsAffected=1. 15s upper bound covers the
+ // initial 1s tick + SQLite drain + actor round-trip + EF/MSSQL latency.
+ await AwaitAssertAsync(async () =>
+ {
+ await using var readContext = CreateContext();
+ var readRepo = new AuditLogRepository(readContext);
+ var rows = await readRepo.QueryAsync(
+ new AuditLogQueryFilter(SourceSiteId: siteId),
+ new AuditLogPaging(PageSize: 10));
+ var evt = Assert.Single(rows);
+ Assert.Equal(AuditChannel.DbOutbound, evt.Channel);
+ Assert.Equal(AuditKind.DbWrite, evt.Kind);
+ Assert.Equal(AuditStatus.Delivered, evt.Status);
+ Assert.Equal(siteId, evt.SourceSiteId);
+ Assert.Equal(InstanceName, evt.SourceInstanceId);
+ Assert.Equal(SourceScript, evt.SourceScript);
+ Assert.NotNull(evt.Extra);
+ Assert.Contains("\"op\":\"write\"", evt.Extra);
+ Assert.Contains("\"rowsAffected\":1", evt.Extra);
+ // Central stamps IngestedAtUtc; the site never sets it.
+ Assert.NotNull(evt.IngestedAtUtc);
+ Assert.StartsWith(ConnectionName, evt.Target);
+ }, TimeSpan.FromSeconds(15));
+ }
+
+ [SkippableFact]
+ public async Task DbWrite_Select_Emits_OneCentralRow_WithExtraOpRead_AndRowsReturned()
+ {
+ Skip.IfNot(_fixture.Available, _fixture.SkipReason);
+
+ var siteId = NewSiteId();
+
+ await using var ingestContext = CreateContext();
+ var ingestRepo = new AuditLogRepository(ingestContext);
+ var ingestActor = CreateIngestActor(ingestRepo);
+
+ await using var sqliteWriter = CreateInMemorySqliteWriter();
+ var ring = new RingBufferFallback();
+ var fallback = new FallbackAuditWriter(
+ sqliteWriter,
+ ring,
+ new NoOpAuditWriteFailureCounter(),
+ NullLogger.Instance);
+ var stubClient = new DirectActorSiteStreamAuditClient(ingestActor);
+ CreateTelemetryActor(sqliteWriter, stubClient);
+
+ using var keepAlive = new SqliteConnection("Data Source=k2;Mode=Memory;Cache=Shared");
+ var inner = NewInMemoryDb(out _);
+ var gateway = Substitute.For();
+ gateway.GetConnectionAsync(ConnectionName, Arg.Any())
+ .Returns(inner);
+
+ var helper = CreateHelper(gateway, fallback, siteId);
+ await using (var conn = await helper.Connection(ConnectionName))
+ await using (var cmd = conn.CreateCommand())
+ {
+ cmd.CommandText = "SELECT id, name FROM t ORDER BY id";
+ await using var reader = await cmd.ExecuteReaderAsync();
+ var seen = 0;
+ while (await reader.ReadAsync())
+ {
+ seen++;
+ }
+ // Explicit close so the AuditingDbDataReader callback fires before
+ // the helper is disposed (Bundle A defers the audit emission to
+ // reader-close so rowsReturned is observable).
+ await reader.CloseAsync();
+ Assert.Equal(2, seen);
+ }
+
+ await AwaitAssertAsync(async () =>
+ {
+ await using var readContext = CreateContext();
+ var readRepo = new AuditLogRepository(readContext);
+ var rows = await readRepo.QueryAsync(
+ new AuditLogQueryFilter(SourceSiteId: siteId),
+ new AuditLogPaging(PageSize: 10));
+ var evt = Assert.Single(rows);
+ Assert.Equal(AuditChannel.DbOutbound, evt.Channel);
+ Assert.Equal(AuditKind.DbWrite, evt.Kind);
+ Assert.Equal(AuditStatus.Delivered, evt.Status);
+ Assert.NotNull(evt.Extra);
+ Assert.Contains("\"op\":\"read\"", evt.Extra);
+ Assert.Contains("\"rowsReturned\":2", evt.Extra);
+ Assert.NotNull(evt.IngestedAtUtc);
+ }, TimeSpan.FromSeconds(15));
+ }
+}