From 56b26339caf3d2d319b30421664d03a363a49e99 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 16:43:55 -0400 Subject: [PATCH] test(auditlog): DB sync emission end-to-end (#23 M4) --- .../ScadaLink.SiteRuntime.csproj | 8 + .../DatabaseSyncEmissionEndToEndTests.cs | 297 ++++++++++++++++++ 2 files changed, 305 insertions(+) create mode 100644 tests/ScadaLink.AuditLog.Tests/Integration/DatabaseSyncEmissionEndToEndTests.cs diff --git a/src/ScadaLink.SiteRuntime/ScadaLink.SiteRuntime.csproj b/src/ScadaLink.SiteRuntime/ScadaLink.SiteRuntime.csproj index bfa3f5b..da712a0 100644 --- a/src/ScadaLink.SiteRuntime/ScadaLink.SiteRuntime.csproj +++ b/src/ScadaLink.SiteRuntime/ScadaLink.SiteRuntime.csproj @@ -22,6 +22,14 @@ + + diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/DatabaseSyncEmissionEndToEndTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/DatabaseSyncEmissionEndToEndTests.cs new file mode 100644 index 0000000..5a73256 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Integration/DatabaseSyncEmissionEndToEndTests.cs @@ -0,0 +1,297 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Data.Sqlite; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using ScadaLink.AuditLog.Central; +using ScadaLink.AuditLog.Site; +using ScadaLink.AuditLog.Site.Telemetry; +using ScadaLink.AuditLog.Tests.Integration.Infrastructure; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; +using ScadaLink.SiteRuntime.Scripts; + +namespace ScadaLink.AuditLog.Tests.Integration; + +/// +/// Audit Log #23 — M4 Bundle E (Task E1) end-to-end suite verifying every +/// synchronous Database.Connection(name).Execute* / +/// ExecuteReader call made via the Bundle A +/// emits exactly one +/// / row +/// that materialises in the central MSSQL AuditLog via the production +/// site-SQLite + telemetry-actor + central ingest-actor pipeline. +/// +/// +/// +/// Composes the same pipeline as the M2 : +/// in-memory + + +/// on the site, drained by a real +/// through a +/// stub that short-circuits the +/// gRPC wire and Asks the central backed by +/// the real on the per-class +/// MSSQL database. +/// +/// +/// Drives the AuditingDbConnection wrapper directly via +/// 's internal ctor (the +/// AuditLog tests project has InternalsVisibleTo on SiteRuntime). No +/// script runtime, no Akka Instance Actor — the test wires the helper, opens +/// an in-memory SQLite connection through a stub , +/// runs one SQL statement, and waits for the central row to land. Each test +/// uses a unique SourceSiteId (Guid suffix) so concurrent tests +/// sharing the MSSQL fixture don't interfere with each other. +/// +/// +public class DatabaseSyncEmissionEndToEndTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public DatabaseSyncEmissionEndToEndTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + private const string ConnectionName = "machineData"; + private const string InstanceName = "Plant.Pump42"; + private const string SourceScript = "ScriptActor:doDbWork"; + + private static string NewSiteId() => + "test-e1-db-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + private ScadaLinkDbContext CreateContext() + { + var options = new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString) + .Options; + return new ScadaLinkDbContext(options); + } + + /// + /// Per-test in-memory SQLite database with a tiny 2-row schema we can both + /// write to and select from. Mirrors the pattern from + /// DatabaseSyncEmissionTests — the keep-alive root keeps the + /// in-memory database file pinned for the duration of the test, while the + /// returned live connection is what the stub gateway hands back to + /// the auditing wrapper. + /// + private static SqliteConnection NewInMemoryDb(out SqliteConnection keepAlive) + { + var dbName = $"db-{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + + keepAlive = new SqliteConnection(connStr); + keepAlive.Open(); + using (var seed = keepAlive.CreateCommand()) + { + seed.CommandText = + "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);" + + "INSERT INTO t (id, name) VALUES (1, 'alpha');" + + "INSERT INTO t (id, name) VALUES (2, 'beta');"; + seed.ExecuteNonQuery(); + } + + var live = new SqliteConnection(connStr); + live.Open(); + return live; + } + + private static SqliteAuditWriter CreateInMemorySqliteWriter() => + new( + Options.Create(new SqliteAuditWriterOptions + { + DatabasePath = "ignored", + BatchSize = 64, + ChannelCapacity = 1024, + }), + NullLogger.Instance, + connectionStringOverride: + $"Data Source=file:auditlog-e1-{Guid.NewGuid():N}?mode=memory&cache=shared"); + + private static IOptions FastTelemetryOptions() => + Options.Create(new SiteAuditTelemetryOptions + { + BatchSize = 256, + // 1s on both intervals so the initial scheduled tick fires quickly + // — drains the SQLite Pending row and pushes it through the stub + // gRPC client into the central ingest actor. + BusyIntervalSeconds = 1, + IdleIntervalSeconds = 1, + }); + + private IActorRef CreateIngestActor(IAuditLogRepository repo) => + Sys.ActorOf(Props.Create(() => new AuditLogIngestActor( + repo, + NullLogger.Instance))); + + private IActorRef CreateTelemetryActor( + ISiteAuditQueue queue, + ISiteStreamAuditClient client) => + Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor( + queue, + client, + FastTelemetryOptions(), + NullLogger.Instance))); + + /// + /// Wires the production + /// (internal ctor) onto + /// the supplied + + /// with the test's site id and source script. The returned helper's + /// Connection(...) hands back a real AuditingDbConnection. + /// + private static ScriptRuntimeContext.DatabaseHelper CreateHelper( + IDatabaseGateway gateway, + IAuditWriter writer, + string siteId) => + new( + gateway, + InstanceName, + NullLogger.Instance, + auditWriter: writer, + siteId: siteId, + sourceScript: SourceScript, + cachedForwarder: null); + + [SkippableFact] + public async Task DbWrite_Insert_Emits_OneCentralRow_WithExtraOpWrite_AndRowsAffected() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + + // Central — repository + ingest actor backed by the MSSQL fixture. + await using var ingestContext = CreateContext(); + var ingestRepo = new AuditLogRepository(ingestContext); + var ingestActor = CreateIngestActor(ingestRepo); + + // Site — SQLite audit writer + ring + fallback + telemetry actor that + // drains into the stub gRPC client which forwards to the ingest actor. + await using var sqliteWriter = CreateInMemorySqliteWriter(); + var ring = new RingBufferFallback(); + var fallback = new FallbackAuditWriter( + sqliteWriter, + ring, + new NoOpAuditWriteFailureCounter(), + NullLogger.Instance); + var stubClient = new DirectActorSiteStreamAuditClient(ingestActor); + CreateTelemetryActor(sqliteWriter, stubClient); + + // SQLite-backed inner connection — the stub gateway hands it to the + // auditing wrapper as the DbConnection the script would have got. + using var keepAlive = new SqliteConnection("Data Source=k1;Mode=Memory;Cache=Shared"); + var inner = NewInMemoryDb(out _); + var gateway = Substitute.For(); + gateway.GetConnectionAsync(ConnectionName, Arg.Any()) + .Returns(inner); + + // Act — one INSERT through the auditing wrapper. The wrapper emits a + // single DbOutbound/DbWrite event to the fallback writer; the + // telemetry actor's next tick drains it to central. + var helper = CreateHelper(gateway, fallback, siteId); + await using (var conn = await helper.Connection(ConnectionName)) + await using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = "INSERT INTO t (id, name) VALUES (3, 'gamma')"; + var rows = await cmd.ExecuteNonQueryAsync(); + Assert.Equal(1, rows); + } + + // Assert — one central row, Kind=DbWrite, Status=Delivered, + // Extra.op="write", Extra.rowsAffected=1. 15s upper bound covers the + // initial 1s tick + SQLite drain + actor round-trip + EF/MSSQL latency. + await AwaitAssertAsync(async () => + { + await using var readContext = CreateContext(); + var readRepo = new AuditLogRepository(readContext); + var rows = await readRepo.QueryAsync( + new AuditLogQueryFilter(SourceSiteId: siteId), + new AuditLogPaging(PageSize: 10)); + var evt = Assert.Single(rows); + Assert.Equal(AuditChannel.DbOutbound, evt.Channel); + Assert.Equal(AuditKind.DbWrite, evt.Kind); + Assert.Equal(AuditStatus.Delivered, evt.Status); + Assert.Equal(siteId, evt.SourceSiteId); + Assert.Equal(InstanceName, evt.SourceInstanceId); + Assert.Equal(SourceScript, evt.SourceScript); + Assert.NotNull(evt.Extra); + Assert.Contains("\"op\":\"write\"", evt.Extra); + Assert.Contains("\"rowsAffected\":1", evt.Extra); + // Central stamps IngestedAtUtc; the site never sets it. + Assert.NotNull(evt.IngestedAtUtc); + Assert.StartsWith(ConnectionName, evt.Target); + }, TimeSpan.FromSeconds(15)); + } + + [SkippableFact] + public async Task DbWrite_Select_Emits_OneCentralRow_WithExtraOpRead_AndRowsReturned() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + + await using var ingestContext = CreateContext(); + var ingestRepo = new AuditLogRepository(ingestContext); + var ingestActor = CreateIngestActor(ingestRepo); + + await using var sqliteWriter = CreateInMemorySqliteWriter(); + var ring = new RingBufferFallback(); + var fallback = new FallbackAuditWriter( + sqliteWriter, + ring, + new NoOpAuditWriteFailureCounter(), + NullLogger.Instance); + var stubClient = new DirectActorSiteStreamAuditClient(ingestActor); + CreateTelemetryActor(sqliteWriter, stubClient); + + using var keepAlive = new SqliteConnection("Data Source=k2;Mode=Memory;Cache=Shared"); + var inner = NewInMemoryDb(out _); + var gateway = Substitute.For(); + gateway.GetConnectionAsync(ConnectionName, Arg.Any()) + .Returns(inner); + + var helper = CreateHelper(gateway, fallback, siteId); + await using (var conn = await helper.Connection(ConnectionName)) + await using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = "SELECT id, name FROM t ORDER BY id"; + await using var reader = await cmd.ExecuteReaderAsync(); + var seen = 0; + while (await reader.ReadAsync()) + { + seen++; + } + // Explicit close so the AuditingDbDataReader callback fires before + // the helper is disposed (Bundle A defers the audit emission to + // reader-close so rowsReturned is observable). + await reader.CloseAsync(); + Assert.Equal(2, seen); + } + + await AwaitAssertAsync(async () => + { + await using var readContext = CreateContext(); + var readRepo = new AuditLogRepository(readContext); + var rows = await readRepo.QueryAsync( + new AuditLogQueryFilter(SourceSiteId: siteId), + new AuditLogPaging(PageSize: 10)); + var evt = Assert.Single(rows); + Assert.Equal(AuditChannel.DbOutbound, evt.Channel); + Assert.Equal(AuditKind.DbWrite, evt.Kind); + Assert.Equal(AuditStatus.Delivered, evt.Status); + Assert.NotNull(evt.Extra); + Assert.Contains("\"op\":\"read\"", evt.Extra); + Assert.Contains("\"rowsReturned\":2", evt.Extra); + Assert.NotNull(evt.IngestedAtUtc); + }, TimeSpan.FromSeconds(15)); + } +}