test(auditlog): DB sync emission end-to-end (#23 M4)
This commit is contained in:
@@ -0,0 +1,297 @@
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using NSubstitute;
|
||||
using ScadaLink.AuditLog.Central;
|
||||
using ScadaLink.AuditLog.Site;
|
||||
using ScadaLink.AuditLog.Site.Telemetry;
|
||||
using ScadaLink.AuditLog.Tests.Integration.Infrastructure;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Interfaces.Repositories;
|
||||
using ScadaLink.Commons.Interfaces.Services;
|
||||
using ScadaLink.Commons.Types.Audit;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.ConfigurationDatabase;
|
||||
using ScadaLink.ConfigurationDatabase.Repositories;
|
||||
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
|
||||
using ScadaLink.SiteRuntime.Scripts;
|
||||
|
||||
namespace ScadaLink.AuditLog.Tests.Integration;
|
||||
|
||||
/// <summary>
|
||||
/// Audit Log #23 — M4 Bundle E (Task E1) end-to-end suite verifying every
|
||||
/// synchronous <c>Database.Connection(name).Execute*</c> /
|
||||
/// <c>ExecuteReader</c> call made via the Bundle A
|
||||
/// <see cref="ScriptRuntimeContext.DatabaseHelper"/> emits exactly one
|
||||
/// <see cref="AuditChannel.DbOutbound"/>/<see cref="AuditKind.DbWrite"/> row
|
||||
/// that materialises in the central MSSQL <c>AuditLog</c> via the production
|
||||
/// site-SQLite + telemetry-actor + central ingest-actor pipeline.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Composes the same pipeline as the M2 <see cref="SyncCallEmissionEndToEndTests"/>:
|
||||
/// in-memory <see cref="SqliteAuditWriter"/> + <see cref="RingBufferFallback"/> +
|
||||
/// <see cref="FallbackAuditWriter"/> on the site, drained by a real
|
||||
/// <see cref="SiteAuditTelemetryActor"/> through a
|
||||
/// <see cref="DirectActorSiteStreamAuditClient"/> stub that short-circuits the
|
||||
/// gRPC wire and Asks the central <see cref="AuditLogIngestActor"/> backed by
|
||||
/// the real <see cref="AuditLogRepository"/> on the per-class
|
||||
/// <see cref="MsSqlMigrationFixture"/> MSSQL database.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Drives the AuditingDbConnection wrapper directly via
|
||||
/// <see cref="ScriptRuntimeContext.DatabaseHelper"/>'s internal ctor (the
|
||||
/// AuditLog tests project has <c>InternalsVisibleTo</c> on SiteRuntime). No
|
||||
/// script runtime, no Akka Instance Actor — the test wires the helper, opens
|
||||
/// an in-memory SQLite connection through a stub <see cref="IDatabaseGateway"/>,
|
||||
/// runs one SQL statement, and waits for the central row to land. Each test
|
||||
/// uses a unique <c>SourceSiteId</c> (Guid suffix) so concurrent tests
|
||||
/// sharing the MSSQL fixture don't interfere with each other.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public class DatabaseSyncEmissionEndToEndTests : TestKit, IClassFixture<MsSqlMigrationFixture>
|
||||
{
|
||||
private readonly MsSqlMigrationFixture _fixture;
|
||||
|
||||
public DatabaseSyncEmissionEndToEndTests(MsSqlMigrationFixture fixture)
|
||||
{
|
||||
_fixture = fixture;
|
||||
}
|
||||
|
||||
private const string ConnectionName = "machineData";
|
||||
private const string InstanceName = "Plant.Pump42";
|
||||
private const string SourceScript = "ScriptActor:doDbWork";
|
||||
|
||||
private static string NewSiteId() =>
|
||||
"test-e1-db-" + Guid.NewGuid().ToString("N").Substring(0, 8);
|
||||
|
||||
private ScadaLinkDbContext CreateContext()
|
||||
{
|
||||
var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
|
||||
.UseSqlServer(_fixture.ConnectionString)
|
||||
.Options;
|
||||
return new ScadaLinkDbContext(options);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Per-test in-memory SQLite database with a tiny 2-row schema we can both
|
||||
/// write to and select from. Mirrors the pattern from
|
||||
/// <c>DatabaseSyncEmissionTests</c> — the keep-alive root keeps the
|
||||
/// in-memory database file pinned for the duration of the test, while the
|
||||
/// returned <c>live</c> connection is what the stub gateway hands back to
|
||||
/// the auditing wrapper.
|
||||
/// </summary>
|
||||
private static SqliteConnection NewInMemoryDb(out SqliteConnection keepAlive)
|
||||
{
|
||||
var dbName = $"db-{Guid.NewGuid():N}";
|
||||
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
|
||||
|
||||
keepAlive = new SqliteConnection(connStr);
|
||||
keepAlive.Open();
|
||||
using (var seed = keepAlive.CreateCommand())
|
||||
{
|
||||
seed.CommandText =
|
||||
"CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);" +
|
||||
"INSERT INTO t (id, name) VALUES (1, 'alpha');" +
|
||||
"INSERT INTO t (id, name) VALUES (2, 'beta');";
|
||||
seed.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
var live = new SqliteConnection(connStr);
|
||||
live.Open();
|
||||
return live;
|
||||
}
|
||||
|
||||
private static SqliteAuditWriter CreateInMemorySqliteWriter() =>
|
||||
new(
|
||||
Options.Create(new SqliteAuditWriterOptions
|
||||
{
|
||||
DatabasePath = "ignored",
|
||||
BatchSize = 64,
|
||||
ChannelCapacity = 1024,
|
||||
}),
|
||||
NullLogger<SqliteAuditWriter>.Instance,
|
||||
connectionStringOverride:
|
||||
$"Data Source=file:auditlog-e1-{Guid.NewGuid():N}?mode=memory&cache=shared");
|
||||
|
||||
private static IOptions<SiteAuditTelemetryOptions> FastTelemetryOptions() =>
|
||||
Options.Create(new SiteAuditTelemetryOptions
|
||||
{
|
||||
BatchSize = 256,
|
||||
// 1s on both intervals so the initial scheduled tick fires quickly
|
||||
// — drains the SQLite Pending row and pushes it through the stub
|
||||
// gRPC client into the central ingest actor.
|
||||
BusyIntervalSeconds = 1,
|
||||
IdleIntervalSeconds = 1,
|
||||
});
|
||||
|
||||
private IActorRef CreateIngestActor(IAuditLogRepository repo) =>
|
||||
Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
|
||||
repo,
|
||||
NullLogger<AuditLogIngestActor>.Instance)));
|
||||
|
||||
private IActorRef CreateTelemetryActor(
|
||||
ISiteAuditQueue queue,
|
||||
ISiteStreamAuditClient client) =>
|
||||
Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor(
|
||||
queue,
|
||||
client,
|
||||
FastTelemetryOptions(),
|
||||
NullLogger<SiteAuditTelemetryActor>.Instance)));
|
||||
|
||||
/// <summary>
|
||||
/// Wires the production
|
||||
/// <see cref="ScriptRuntimeContext.DatabaseHelper"/> (internal ctor) onto
|
||||
/// the supplied <see cref="IDatabaseGateway"/> + <see cref="IAuditWriter"/>
|
||||
/// with the test's site id and source script. The returned helper's
|
||||
/// <c>Connection(...)</c> hands back a real <c>AuditingDbConnection</c>.
|
||||
/// </summary>
|
||||
private static ScriptRuntimeContext.DatabaseHelper CreateHelper(
|
||||
IDatabaseGateway gateway,
|
||||
IAuditWriter writer,
|
||||
string siteId) =>
|
||||
new(
|
||||
gateway,
|
||||
InstanceName,
|
||||
NullLogger.Instance,
|
||||
auditWriter: writer,
|
||||
siteId: siteId,
|
||||
sourceScript: SourceScript,
|
||||
cachedForwarder: null);
|
||||
|
||||
[SkippableFact]
|
||||
public async Task DbWrite_Insert_Emits_OneCentralRow_WithExtraOpWrite_AndRowsAffected()
|
||||
{
|
||||
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||
|
||||
var siteId = NewSiteId();
|
||||
|
||||
// Central — repository + ingest actor backed by the MSSQL fixture.
|
||||
await using var ingestContext = CreateContext();
|
||||
var ingestRepo = new AuditLogRepository(ingestContext);
|
||||
var ingestActor = CreateIngestActor(ingestRepo);
|
||||
|
||||
// Site — SQLite audit writer + ring + fallback + telemetry actor that
|
||||
// drains into the stub gRPC client which forwards to the ingest actor.
|
||||
await using var sqliteWriter = CreateInMemorySqliteWriter();
|
||||
var ring = new RingBufferFallback();
|
||||
var fallback = new FallbackAuditWriter(
|
||||
sqliteWriter,
|
||||
ring,
|
||||
new NoOpAuditWriteFailureCounter(),
|
||||
NullLogger<FallbackAuditWriter>.Instance);
|
||||
var stubClient = new DirectActorSiteStreamAuditClient(ingestActor);
|
||||
CreateTelemetryActor(sqliteWriter, stubClient);
|
||||
|
||||
// SQLite-backed inner connection — the stub gateway hands it to the
|
||||
// auditing wrapper as the DbConnection the script would have got.
|
||||
using var keepAlive = new SqliteConnection("Data Source=k1;Mode=Memory;Cache=Shared");
|
||||
var inner = NewInMemoryDb(out _);
|
||||
var gateway = Substitute.For<IDatabaseGateway>();
|
||||
gateway.GetConnectionAsync(ConnectionName, Arg.Any<CancellationToken>())
|
||||
.Returns(inner);
|
||||
|
||||
// Act — one INSERT through the auditing wrapper. The wrapper emits a
|
||||
// single DbOutbound/DbWrite event to the fallback writer; the
|
||||
// telemetry actor's next tick drains it to central.
|
||||
var helper = CreateHelper(gateway, fallback, siteId);
|
||||
await using (var conn = await helper.Connection(ConnectionName))
|
||||
await using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = "INSERT INTO t (id, name) VALUES (3, 'gamma')";
|
||||
var rows = await cmd.ExecuteNonQueryAsync();
|
||||
Assert.Equal(1, rows);
|
||||
}
|
||||
|
||||
// Assert — one central row, Kind=DbWrite, Status=Delivered,
|
||||
// Extra.op="write", Extra.rowsAffected=1. 15s upper bound covers the
|
||||
// initial 1s tick + SQLite drain + actor round-trip + EF/MSSQL latency.
|
||||
await AwaitAssertAsync(async () =>
|
||||
{
|
||||
await using var readContext = CreateContext();
|
||||
var readRepo = new AuditLogRepository(readContext);
|
||||
var rows = await readRepo.QueryAsync(
|
||||
new AuditLogQueryFilter(SourceSiteId: siteId),
|
||||
new AuditLogPaging(PageSize: 10));
|
||||
var evt = Assert.Single(rows);
|
||||
Assert.Equal(AuditChannel.DbOutbound, evt.Channel);
|
||||
Assert.Equal(AuditKind.DbWrite, evt.Kind);
|
||||
Assert.Equal(AuditStatus.Delivered, evt.Status);
|
||||
Assert.Equal(siteId, evt.SourceSiteId);
|
||||
Assert.Equal(InstanceName, evt.SourceInstanceId);
|
||||
Assert.Equal(SourceScript, evt.SourceScript);
|
||||
Assert.NotNull(evt.Extra);
|
||||
Assert.Contains("\"op\":\"write\"", evt.Extra);
|
||||
Assert.Contains("\"rowsAffected\":1", evt.Extra);
|
||||
// Central stamps IngestedAtUtc; the site never sets it.
|
||||
Assert.NotNull(evt.IngestedAtUtc);
|
||||
Assert.StartsWith(ConnectionName, evt.Target);
|
||||
}, TimeSpan.FromSeconds(15));
|
||||
}
|
||||
|
||||
[SkippableFact]
|
||||
public async Task DbWrite_Select_Emits_OneCentralRow_WithExtraOpRead_AndRowsReturned()
|
||||
{
|
||||
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||
|
||||
var siteId = NewSiteId();
|
||||
|
||||
await using var ingestContext = CreateContext();
|
||||
var ingestRepo = new AuditLogRepository(ingestContext);
|
||||
var ingestActor = CreateIngestActor(ingestRepo);
|
||||
|
||||
await using var sqliteWriter = CreateInMemorySqliteWriter();
|
||||
var ring = new RingBufferFallback();
|
||||
var fallback = new FallbackAuditWriter(
|
||||
sqliteWriter,
|
||||
ring,
|
||||
new NoOpAuditWriteFailureCounter(),
|
||||
NullLogger<FallbackAuditWriter>.Instance);
|
||||
var stubClient = new DirectActorSiteStreamAuditClient(ingestActor);
|
||||
CreateTelemetryActor(sqliteWriter, stubClient);
|
||||
|
||||
using var keepAlive = new SqliteConnection("Data Source=k2;Mode=Memory;Cache=Shared");
|
||||
var inner = NewInMemoryDb(out _);
|
||||
var gateway = Substitute.For<IDatabaseGateway>();
|
||||
gateway.GetConnectionAsync(ConnectionName, Arg.Any<CancellationToken>())
|
||||
.Returns(inner);
|
||||
|
||||
var helper = CreateHelper(gateway, fallback, siteId);
|
||||
await using (var conn = await helper.Connection(ConnectionName))
|
||||
await using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = "SELECT id, name FROM t ORDER BY id";
|
||||
await using var reader = await cmd.ExecuteReaderAsync();
|
||||
var seen = 0;
|
||||
while (await reader.ReadAsync())
|
||||
{
|
||||
seen++;
|
||||
}
|
||||
// Explicit close so the AuditingDbDataReader callback fires before
|
||||
// the helper is disposed (Bundle A defers the audit emission to
|
||||
// reader-close so rowsReturned is observable).
|
||||
await reader.CloseAsync();
|
||||
Assert.Equal(2, seen);
|
||||
}
|
||||
|
||||
await AwaitAssertAsync(async () =>
|
||||
{
|
||||
await using var readContext = CreateContext();
|
||||
var readRepo = new AuditLogRepository(readContext);
|
||||
var rows = await readRepo.QueryAsync(
|
||||
new AuditLogQueryFilter(SourceSiteId: siteId),
|
||||
new AuditLogPaging(PageSize: 10));
|
||||
var evt = Assert.Single(rows);
|
||||
Assert.Equal(AuditChannel.DbOutbound, evt.Channel);
|
||||
Assert.Equal(AuditKind.DbWrite, evt.Kind);
|
||||
Assert.Equal(AuditStatus.Delivered, evt.Status);
|
||||
Assert.NotNull(evt.Extra);
|
||||
Assert.Contains("\"op\":\"read\"", evt.Extra);
|
||||
Assert.Contains("\"rowsReturned\":2", evt.Extra);
|
||||
Assert.NotNull(evt.IngestedAtUtc);
|
||||
}, TimeSpan.FromSeconds(15));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user