test(auditlog): end-to-end ExecutionId correlation + docs

This commit is contained in:
Joseph Doherty
2026-05-21 16:06:40 -04:00
parent 24cdfe373c
commit fd76c19007
3 changed files with 298 additions and 0 deletions

View File

@@ -0,0 +1,274 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Data.Sqlite;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ScadaLink.AuditLog.Central;
using ScadaLink.AuditLog.Site;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.AuditLog.Tests.Integration.Infrastructure;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
using ScadaLink.SiteRuntime.Scripts;
namespace ScadaLink.AuditLog.Tests.Integration;
/// <summary>
/// Audit Log #23 — ExecutionId end-to-end correlation suite verifying the
/// universal per-run correlation promise: <b>every audit row produced by one
/// script execution carries the same non-null <see cref="AuditEvent.ExecutionId"/></b>.
/// </summary>
/// <remarks>
/// <para>
/// This is the integration-level counterpart to the unit-level
/// <c>ExecutionCorrelationContextTests</c> in <c>ScadaLink.SiteRuntime.Tests</c>:
/// where that test asserts the shared id on the in-memory captured rows, this
/// suite drives the rows all the way through the production pipeline — the real
/// <see cref="SqliteAuditWriter"/> site hot-path, the real
/// <see cref="SiteAuditTelemetryActor"/> drain loop, the real
/// <see cref="AuditLogIngestActor"/>, and the real <see cref="AuditLogRepository"/>
/// over the per-class <see cref="MsSqlMigrationFixture"/> MSSQL database — then
/// reads the rows back from the central store and asserts the shared id.
/// </para>
/// <para>
/// Composes the same pipeline as the M2 <see cref="SyncCallEmissionEndToEndTests"/>
/// and the M4 <see cref="DatabaseSyncEmissionEndToEndTests"/>: an in-memory
/// <see cref="SqliteAuditWriter"/> + <see cref="RingBufferFallback"/> +
/// <see cref="FallbackAuditWriter"/> on the site, drained by a real
/// <see cref="SiteAuditTelemetryActor"/> through the shared
/// <see cref="DirectActorSiteStreamAuditClient"/> stub that short-circuits the
/// gRPC wire and Asks the central ingest actor. The production
/// <see cref="ScriptRuntimeContext"/> is driven directly: one context performs
/// two distinct trust-boundary actions — a sync <c>ExternalSystem.Call</c> and a
/// sync <c>Database</c> write — so the two emitted audit rows originate from one
/// execution. Each test uses a unique <c>ExecutionId</c> + <c>SourceSiteId</c>
/// (Guid suffixes) so concurrent tests sharing the MSSQL fixture don't interfere.
/// </para>
/// </remarks>
public class ExecutionIdCorrelationTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
private readonly MsSqlMigrationFixture _fixture;
public ExecutionIdCorrelationTests(MsSqlMigrationFixture fixture)
{
_fixture = fixture;
}
private const string ConnectionName = "machineData";
private const string InstanceName = "Plant.Pump42";
private const string SourceScript = "ScriptActor:OnTick";
private static string NewSiteId() =>
"test-execid-" + Guid.NewGuid().ToString("N").Substring(0, 8);
private ScadaLinkDbContext CreateContext()
{
var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
.UseSqlServer(_fixture.ConnectionString)
.Options;
return new ScadaLinkDbContext(options);
}
/// <summary>
/// Per-test in-memory SQLite database with a tiny single-table schema the
/// sync DB write targets. The keep-alive root pins the in-memory file for
/// the duration of the test; the returned <c>live</c> connection is what the
/// stub gateway hands back to the auditing wrapper. Mirrors
/// <c>DatabaseSyncEmissionEndToEndTests.NewInMemoryDb</c>.
/// </summary>
private static SqliteConnection NewInMemoryDb(out SqliteConnection keepAlive)
{
var dbName = $"db-{Guid.NewGuid():N}";
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
keepAlive = new SqliteConnection(connStr);
keepAlive.Open();
using (var seed = keepAlive.CreateCommand())
{
seed.CommandText =
"CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);";
seed.ExecuteNonQuery();
}
var live = new SqliteConnection(connStr);
live.Open();
return live;
}
private static SqliteAuditWriter CreateInMemorySqliteWriter() =>
new(
Options.Create(new SqliteAuditWriterOptions
{
DatabasePath = "ignored",
BatchSize = 64,
ChannelCapacity = 1024,
}),
NullLogger<SqliteAuditWriter>.Instance,
connectionStringOverride:
$"Data Source=file:auditlog-execid-{Guid.NewGuid():N}?mode=memory&cache=shared");
private static IOptions<SiteAuditTelemetryOptions> FastTelemetryOptions() =>
Options.Create(new SiteAuditTelemetryOptions
{
BatchSize = 256,
// 1s on both intervals so the initial scheduled tick fires quickly
// — drains the SQLite Pending rows and pushes them through the stub
// gRPC client into the central ingest actor.
BusyIntervalSeconds = 1,
IdleIntervalSeconds = 1,
});
private IActorRef CreateIngestActor(IAuditLogRepository repo) =>
Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
repo,
NullLogger<AuditLogIngestActor>.Instance)));
private IActorRef CreateTelemetryActor(
ISiteAuditQueue queue,
ISiteStreamAuditClient client) =>
Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor(
queue,
client,
FastTelemetryOptions(),
NullLogger<SiteAuditTelemetryActor>.Instance)));
[SkippableFact]
public async Task OneExecution_ApiCallAndDbWrite_AllCentralRows_ShareOneNonNullExecutionId()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
// An explicit per-run execution id — the value the test asserts on every
// audit row produced by the single script execution below.
var executionId = Guid.NewGuid();
// ── Central — repository + ingest actor backed by the MSSQL fixture ──
await using var ingestContext = CreateContext();
var ingestRepo = new AuditLogRepository(ingestContext);
var ingestActor = CreateIngestActor(ingestRepo);
// ── Site — SQLite audit writer + ring + fallback + telemetry actor ───
await using var sqliteWriter = CreateInMemorySqliteWriter();
var ring = new RingBufferFallback();
var fallback = new FallbackAuditWriter(
sqliteWriter,
ring,
new NoOpAuditWriteFailureCounter(),
NullLogger<FallbackAuditWriter>.Instance);
var stubClient = new DirectActorSiteStreamAuditClient(ingestActor);
CreateTelemetryActor(sqliteWriter, stubClient);
// Outbound API client — one successful CallAsync, one audit row.
var externalClient = Substitute.For<IExternalSystemClient>();
externalClient
.CallAsync("ERP", "GetOrder", Arg.Any<IReadOnlyDictionary<string, object?>?>(), Arg.Any<CancellationToken>())
.Returns(new ExternalCallResult(true, "{}", null));
// SQLite-backed inner DB connection — the stub gateway hands it to the
// auditing wrapper as the connection the script would have got.
using var keepAlive = new SqliteConnection("Data Source=execid-k1;Mode=Memory;Cache=Shared");
var innerDb = NewInMemoryDb(out _);
var gateway = Substitute.For<IDatabaseGateway>();
gateway.GetConnectionAsync(ConnectionName, Arg.Any<CancellationToken>())
.Returns(innerDb);
// ── Act — ONE script execution: a sync ExternalSystem.Call AND a sync
// Database write, both performed through a SINGLE ScriptRuntimeContext
// stamped with the explicit executionId. Each helper emits exactly one
// trust-boundary audit row to the fallback writer; the telemetry actor's
// next tick drains both to central.
var context = CreateScriptContext(externalClient, gateway, fallback, siteId, executionId);
await context.ExternalSystem.Call("ERP", "GetOrder");
await using (var conn = await context.Database.Connection(ConnectionName))
await using (var cmd = conn.CreateCommand())
{
cmd.CommandText = "INSERT INTO t (id, name) VALUES (1, 'alpha')";
var affected = await cmd.ExecuteNonQueryAsync();
Assert.Equal(1, affected);
}
// ── Assert — read the rows back from the CENTRAL store filtered by the
// execution id; both the ApiCall and the DbWrite row must be present and
// every one must carry the SAME non-null ExecutionId we minted above.
await AwaitAssertAsync(async () =>
{
await using var readContext = CreateContext();
var readRepo = new AuditLogRepository(readContext);
// The ExecutionId filter dimension is the universal-correlation
// query an audit reader uses to pull every action of one run.
var rows = await readRepo.QueryAsync(
new AuditLogQueryFilter(ExecutionId: executionId),
new AuditLogPaging(PageSize: 10));
// Both trust-boundary actions of the one execution have landed.
Assert.Equal(2, rows.Count);
// Every central row carries the SAME non-null ExecutionId — the
// core promise of the per-run correlation value.
Assert.All(rows, r =>
{
Assert.NotNull(r.ExecutionId);
Assert.Equal(executionId, r.ExecutionId);
Assert.Equal(siteId, r.SourceSiteId);
// Central stamps IngestedAtUtc; the site never sets it.
Assert.NotNull(r.IngestedAtUtc);
});
// The two rows are the two distinct trust-boundary actions — one
// outbound API call and one outbound DB write — proving the shared
// id spans different channels, not two rows of the same action.
Assert.Single(rows, r => r.Channel == AuditChannel.ApiOutbound && r.Kind == AuditKind.ApiCall);
Assert.Single(rows, r => r.Channel == AuditChannel.DbOutbound && r.Kind == AuditKind.DbWrite);
}, TimeSpan.FromSeconds(15));
}
/// <summary>
/// Builds a production <see cref="ScriptRuntimeContext"/> wired with the
/// outbound external-system client, the database gateway and the audit
/// writer, stamped with an explicit <paramref name="executionId"/>. The
/// actor refs are <see cref="ActorRefs.Nobody"/> — the ExternalSystem /
/// Database helpers exercised here never touch them.
/// </summary>
private static ScriptRuntimeContext CreateScriptContext(
IExternalSystemClient externalSystemClient,
IDatabaseGateway databaseGateway,
IAuditWriter auditWriter,
string siteId,
Guid executionId)
{
var compilationService = new ScriptCompilationService(
NullLogger<ScriptCompilationService>.Instance);
var sharedScriptLibrary = new SharedScriptLibrary(
compilationService, NullLogger<SharedScriptLibrary>.Instance);
return new ScriptRuntimeContext(
ActorRefs.Nobody,
ActorRefs.Nobody,
sharedScriptLibrary,
currentCallDepth: 0,
maxCallDepth: 10,
askTimeout: TimeSpan.FromSeconds(5),
instanceName: InstanceName,
logger: NullLogger.Instance,
externalSystemClient: externalSystemClient,
databaseGateway: databaseGateway,
storeAndForward: null,
siteCommunicationActor: null,
siteId: siteId,
sourceScript: SourceScript,
auditWriter: auditWriter,
operationTrackingStore: null,
cachedForwarder: null,
executionId: executionId);
}
}