using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Data.Sqlite;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ScadaLink.AuditLog.Central;
using ScadaLink.AuditLog.Site;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.AuditLog.Tests.Integration.Infrastructure;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
using ScadaLink.SiteRuntime.Scripts;
namespace ScadaLink.AuditLog.Tests.Integration;
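/// <summary>
/// Audit Log #23 — M4 Bundle E (Task E1) end-to-end suite verifying every
/// synchronous <c>Database.Connection(name).Execute*</c> /
/// <c>ExecuteReader</c> call made via the Bundle A
/// <c>AuditingDbConnection</c> wrapper emits exactly one
/// <see cref="AuditChannel.DbOutbound"/> / <see cref="AuditKind.DbWrite"/> row
/// that materialises in the central MSSQL AuditLog via the production
/// site-SQLite + telemetry-actor + central ingest-actor pipeline.
/// </summary>
/// <remarks>
/// <para>
/// Composes the same pipeline as the M2 end-to-end suite:
/// in-memory <see cref="SqliteAuditWriter"/> + <see cref="RingBufferFallback"/> +
/// <see cref="FallbackAuditWriter"/> on the site, drained by a real
/// <see cref="SiteAuditTelemetryActor"/> through a
/// <see cref="DirectActorSiteStreamAuditClient"/> stub that short-circuits the
/// gRPC wire and Asks the central <see cref="AuditLogIngestActor"/> backed by
/// the real <see cref="AuditLogRepository"/> on the per-class
/// <see cref="MsSqlMigrationFixture"/> MSSQL database.
/// </para>
/// <para>
/// Drives the <c>AuditingDbConnection</c> wrapper directly via
/// <see cref="ScriptRuntimeContext.DatabaseHelper"/>'s internal ctor (the
/// AuditLog tests project has InternalsVisibleTo on SiteRuntime). No
/// script runtime, no Akka Instance Actor — the test wires the helper, opens
/// an in-memory SQLite connection through a stub <see cref="IDatabaseGateway"/>,
/// runs one SQL statement, and waits for the central row to land. Each test
/// uses a unique <c>SourceSiteId</c> (Guid suffix) so concurrent tests
/// sharing the MSSQL fixture don't interfere with each other.
/// </para>
/// </remarks>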
public class DatabaseSyncEmissionEndToEndTests : TestKit, IClassFixture
{
// Class-level MSSQL fixture: supplies the connection string used by
// CreateContext() and the Available / SkipReason pair each test checks first.
private readonly MsSqlMigrationFixture _fixture;

/// <summary>
/// Captures the MSSQL migration fixture handed to the test class.
/// </summary>
/// <param name="fixture">Fixture exposing the central database connection string and availability.</param>
public DatabaseSyncEmissionEndToEndTests(MsSqlMigrationFixture fixture) => _fixture = fixture;
// Logical connection name: stubbed on the gateway, requested via
// helper.Connection(...), and asserted as the prefix of the audit row's Target.
private const string ConnectionName = "machineData";
// Instance name handed to the DatabaseHelper; the insert test asserts it
// against the audit row's SourceInstanceId.
private const string InstanceName = "Plant.Pump42";
// Source-script label handed to the DatabaseHelper; the insert test asserts
// it against the audit row's SourceScript.
private const string SourceScript = "ScriptActor:doDbWork";
/// <summary>
/// Produces a fresh site id of the form <c>test-e1-db-xxxxxxxx</c>, where the
/// suffix is the first eight hex digits of a new Guid. Tests filter central
/// rows by this id, so each test only sees its own audit rows.
/// </summary>
/// <returns>A 19-character site id unique to the calling test.</returns>
private static string NewSiteId()
{
    var suffix = Guid.NewGuid().ToString("N")[..8];
    return $"test-e1-db-{suffix}";
}
/// <summary>
/// Creates a new <see cref="ScadaLinkDbContext"/> bound to the fixture's
/// MSSQL connection string. Callers own the returned context and dispose it.
/// </summary>
/// <returns>A context configured with the SQL Server provider.</returns>
private ScadaLinkDbContext CreateContext()
{
    var builder = new DbContextOptionsBuilder();
    builder.UseSqlServer(_fixture.ConnectionString);
    return new ScadaLinkDbContext(builder.Options);
}
/// <summary>
/// Per-test in-memory SQLite database with a tiny 2-row schema we can both
/// write to and select from. Mirrors the pattern from
/// <c>DatabaseSyncEmissionTests</c>: the keep-alive root keeps the in-memory
/// database pinned for the duration of the test, while the returned live
/// connection is what the stub gateway hands back to the auditing wrapper.
/// </summary>
/// <param name="keepAlive">
/// Receives the open connection that created and seeded the database. The
/// caller owns it and should dispose it when the test ends.
/// </param>
/// <returns>A second, already-open connection to the same named database.</returns>
private static SqliteConnection NewInMemoryDb(out SqliteConnection keepAlive)
{
    // A unique name per call gives every test its own shared-cache database.
    var connectionString = $"Data Source=db-{Guid.NewGuid():N};Mode=Memory;Cache=Shared";

    keepAlive = new SqliteConnection(connectionString);
    keepAlive.Open();
    SeedSchema(keepAlive);

    var liveConnection = new SqliteConnection(connectionString);
    liveConnection.Open();
    return liveConnection;

    // Creates table t and inserts rows (1, 'alpha') and (2, 'beta').
    static void SeedSchema(SqliteConnection connection)
    {
        const string SeedSql =
            "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);" +
            "INSERT INTO t (id, name) VALUES (1, 'alpha');" +
            "INSERT INTO t (id, name) VALUES (2, 'beta');";

        using var command = connection.CreateCommand();
        command.CommandText = SeedSql;
        command.ExecuteNonQuery();
    }
}
/// <summary>
/// Builds a <see cref="SqliteAuditWriter"/> over a uniquely named shared-cache
/// in-memory SQLite database, supplied through the connection-string override.
/// </summary>
/// <returns>A writer with batch size 64 and channel capacity 1024.</returns>
private static SqliteAuditWriter CreateInMemorySqliteWriter()
{
    // NOTE(review): DatabasePath is presumably unused once the override is
    // supplied (hence "ignored") — confirm against SqliteAuditWriter.
    var writerOptions = new SqliteAuditWriterOptions
    {
        DatabasePath = "ignored",
        BatchSize = 64,
        ChannelCapacity = 1024,
    };
    var connectionString =
        $"Data Source=file:auditlog-e1-{Guid.NewGuid():N}?mode=memory&cache=shared";

    return new SqliteAuditWriter(
        Options.Create(writerOptions),
        NullLogger.Instance,
        connectionStringOverride: connectionString);
}
/// <summary>
/// Telemetry options tuned for tests: batch size 256 with both the busy and
/// idle intervals set to one second.
/// </summary>
/// <returns>The wrapped <see cref="SiteAuditTelemetryOptions"/> instance.</returns>
private static IOptions FastTelemetryOptions()
{
    var telemetryOptions = new SiteAuditTelemetryOptions
    {
        BatchSize = 256,
        // 1s on both intervals so the initial scheduled tick fires quickly
        // — drains the SQLite Pending row and pushes it through the stub
        // gRPC client into the central ingest actor.
        BusyIntervalSeconds = 1,
        IdleIntervalSeconds = 1,
    };

    return Options.Create(telemetryOptions);
}
/// <summary>
/// Spawns an <see cref="AuditLogIngestActor"/> in the TestKit actor system,
/// backed by the supplied repository.
/// </summary>
/// <param name="repo">Repository passed to the ingest actor's constructor.</param>
/// <returns>A reference to the new ingest actor.</returns>
private IActorRef CreateIngestActor(IAuditLogRepository repo)
{
    var ingestProps = Props.Create(() => new AuditLogIngestActor(
        repo,
        NullLogger.Instance));

    return Sys.ActorOf(ingestProps);
}
/// <summary>
/// Spawns a <see cref="SiteAuditTelemetryActor"/> in the TestKit actor system
/// using the fast (1s) telemetry options.
/// </summary>
/// <param name="queue">Site audit queue passed to the telemetry actor.</param>
/// <param name="client">Stream client passed to the telemetry actor.</param>
/// <returns>A reference to the new telemetry actor.</returns>
private IActorRef CreateTelemetryActor(
    ISiteAuditQueue queue,
    ISiteStreamAuditClient client)
{
    var telemetryProps = Props.Create(() => new SiteAuditTelemetryActor(
        queue,
        client,
        FastTelemetryOptions(),
        NullLogger.Instance));

    return Sys.ActorOf(telemetryProps);
}
/// <summary>
/// Wires the production <see cref="ScriptRuntimeContext.DatabaseHelper"/>
/// (internal ctor) onto the supplied <see cref="IDatabaseGateway"/> +
/// <see cref="IAuditWriter"/> with the test's site id and source script. The
/// returned helper's <c>Connection(...)</c> hands back a real
/// <c>AuditingDbConnection</c>.
/// </summary>
/// <param name="gateway">Gateway the helper asks for the underlying connection.</param>
/// <param name="writer">Audit writer passed as the helper's <c>auditWriter</c>.</param>
/// <param name="siteId">Site id passed as the helper's <c>siteId</c>.</param>
/// <returns>A helper built with a fresh Guid and no cached forwarder.</returns>
private static ScriptRuntimeContext.DatabaseHelper CreateHelper(
    IDatabaseGateway gateway,
    IAuditWriter writer,
    string siteId)
{
    return new ScriptRuntimeContext.DatabaseHelper(
        gateway,
        InstanceName,
        NullLogger.Instance,
        Guid.NewGuid(),
        auditWriter: writer,
        siteId: siteId,
        sourceScript: SourceScript,
        cachedForwarder: null);
}
/// <summary>
/// Runs one INSERT through the auditing connection wrapper and waits for
/// exactly one central audit row for this test's site id: channel DbOutbound,
/// kind DbWrite, status Delivered, with <c>op=write</c> and
/// <c>rowsAffected=1</c> in Extra. Skipped when the MSSQL fixture is unavailable.
/// </summary>
[SkippableFact]
public async Task DbWrite_Insert_Emits_OneCentralRow_WithExtraOpWrite_AndRowsAffected()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);
    var siteId = NewSiteId();

    // Central — repository + ingest actor backed by the MSSQL fixture.
    await using var ingestContext = CreateContext();
    var ingestRepo = new AuditLogRepository(ingestContext);
    var ingestActor = CreateIngestActor(ingestRepo);

    // Site — SQLite audit writer + ring + fallback + telemetry actor that
    // drains into the stub gRPC client which forwards to the ingest actor.
    await using var sqliteWriter = CreateInMemorySqliteWriter();
    var ring = new RingBufferFallback();
    var fallback = new FallbackAuditWriter(
        sqliteWriter,
        ring,
        new NoOpAuditWriteFailureCounter(),
        NullLogger.Instance);
    var stubClient = new DirectActorSiteStreamAuditClient(ingestActor);
    CreateTelemetryActor(sqliteWriter, stubClient);

    // SQLite-backed inner connection — the stub gateway hands it to the
    // auditing wrapper as the DbConnection the script would have got.
    // Capture the keep-alive root returned by NewInMemoryDb and dispose it
    // with the test. It was previously discarded (left open, never disposed)
    // while an unrelated, never-opened connection was disposed in its place.
    var inner = NewInMemoryDb(out var keepAlive);
    using var keepAliveScope = keepAlive;
    var gateway = Substitute.For();
    gateway.GetConnectionAsync(ConnectionName, Arg.Any())
        .Returns(inner);

    // Act — one INSERT through the auditing wrapper. The wrapper emits a
    // single DbOutbound/DbWrite event to the fallback writer; the
    // telemetry actor's next tick drains it to central.
    var helper = CreateHelper(gateway, fallback, siteId);
    await using (var conn = await helper.Connection(ConnectionName))
    await using (var cmd = conn.CreateCommand())
    {
        cmd.CommandText = "INSERT INTO t (id, name) VALUES (3, 'gamma')";
        var rows = await cmd.ExecuteNonQueryAsync();
        Assert.Equal(1, rows);
    }

    // Assert — one central row, Kind=DbWrite, Status=Delivered,
    // Extra.op="write", Extra.rowsAffected=1. 15s upper bound covers the
    // initial 1s tick + SQLite drain + actor round-trip + EF/MSSQL latency.
    await AwaitAssertAsync(async () =>
    {
        await using var readContext = CreateContext();
        var readRepo = new AuditLogRepository(readContext);
        var rows = await readRepo.QueryAsync(
            new AuditLogQueryFilter(SourceSiteIds: new[] { siteId }),
            new AuditLogPaging(PageSize: 10));
        var evt = Assert.Single(rows);
        Assert.Equal(AuditChannel.DbOutbound, evt.Channel);
        Assert.Equal(AuditKind.DbWrite, evt.Kind);
        Assert.Equal(AuditStatus.Delivered, evt.Status);
        Assert.Equal(siteId, evt.SourceSiteId);
        Assert.Equal(InstanceName, evt.SourceInstanceId);
        Assert.Equal(SourceScript, evt.SourceScript);
        Assert.NotNull(evt.Extra);
        Assert.Contains("\"op\":\"write\"", evt.Extra);
        Assert.Contains("\"rowsAffected\":1", evt.Extra);
        // Central stamps IngestedAtUtc; the site never sets it.
        Assert.NotNull(evt.IngestedAtUtc);
        Assert.StartsWith(ConnectionName, evt.Target);
    }, TimeSpan.FromSeconds(15));
}
/// <summary>
/// Runs one SELECT through the auditing connection wrapper, reads both seeded
/// rows, and waits for exactly one central audit row for this test's site id:
/// channel DbOutbound, kind DbWrite, status Delivered, with <c>op=read</c>
/// and <c>rowsReturned=2</c> in Extra. Skipped when the MSSQL fixture is
/// unavailable.
/// </summary>
[SkippableFact]
public async Task DbWrite_Select_Emits_OneCentralRow_WithExtraOpRead_AndRowsReturned()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);
    var siteId = NewSiteId();

    // Central — repository + ingest actor backed by the MSSQL fixture.
    await using var ingestContext = CreateContext();
    var ingestRepo = new AuditLogRepository(ingestContext);
    var ingestActor = CreateIngestActor(ingestRepo);

    // Site — SQLite audit writer + ring + fallback + telemetry actor that
    // drains into the stub client which forwards to the ingest actor.
    await using var sqliteWriter = CreateInMemorySqliteWriter();
    var ring = new RingBufferFallback();
    var fallback = new FallbackAuditWriter(
        sqliteWriter,
        ring,
        new NoOpAuditWriteFailureCounter(),
        NullLogger.Instance);
    var stubClient = new DirectActorSiteStreamAuditClient(ingestActor);
    CreateTelemetryActor(sqliteWriter, stubClient);

    // Capture the keep-alive root returned by NewInMemoryDb and dispose it
    // with the test. It was previously discarded (left open, never disposed)
    // while an unrelated, never-opened connection was disposed in its place.
    var inner = NewInMemoryDb(out var keepAlive);
    using var keepAliveScope = keepAlive;
    var gateway = Substitute.For();
    gateway.GetConnectionAsync(ConnectionName, Arg.Any())
        .Returns(inner);

    // Act — one SELECT through the auditing wrapper, reading every row.
    var helper = CreateHelper(gateway, fallback, siteId);
    await using (var conn = await helper.Connection(ConnectionName))
    await using (var cmd = conn.CreateCommand())
    {
        cmd.CommandText = "SELECT id, name FROM t ORDER BY id";
        await using var reader = await cmd.ExecuteReaderAsync();
        var seen = 0;
        while (await reader.ReadAsync())
        {
            seen++;
        }
        // Explicit close so the AuditingDbDataReader callback fires before
        // the helper is disposed (Bundle A defers the audit emission to
        // reader-close so rowsReturned is observable).
        await reader.CloseAsync();
        Assert.Equal(2, seen);
    }

    // Assert — one central row with Extra.op="read" and Extra.rowsReturned=2,
    // polled for up to 15s while the telemetry tick drains to central.
    await AwaitAssertAsync(async () =>
    {
        await using var readContext = CreateContext();
        var readRepo = new AuditLogRepository(readContext);
        var rows = await readRepo.QueryAsync(
            new AuditLogQueryFilter(SourceSiteIds: new[] { siteId }),
            new AuditLogPaging(PageSize: 10));
        var evt = Assert.Single(rows);
        Assert.Equal(AuditChannel.DbOutbound, evt.Channel);
        Assert.Equal(AuditKind.DbWrite, evt.Kind);
        Assert.Equal(AuditStatus.Delivered, evt.Status);
        Assert.NotNull(evt.Extra);
        Assert.Contains("\"op\":\"read\"", evt.Extra);
        Assert.Contains("\"rowsReturned\":2", evt.Extra);
        Assert.NotNull(evt.IngestedAtUtc);
    }, TimeSpan.FromSeconds(15));
}
}