From fd76c19007eb13bdd937d4d49f42e35587e7fcb1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 21 May 2026 16:06:40 -0400 Subject: [PATCH] test(auditlog): end-to-end ExecutionId correlation + docs --- CLAUDE.md | 1 + docs/requirements/Component-AuditLog.md | 23 ++ .../ExecutionIdCorrelationTests.cs | 274 ++++++++++++++++++ 3 files changed, 298 insertions(+) create mode 100644 tests/ScadaLink.AuditLog.Tests/Integration/ExecutionIdCorrelationTests.cs diff --git a/CLAUDE.md b/CLAUDE.md index 6e22243..f60e607 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -132,6 +132,7 @@ This project contains design documentation for a distributed SCADA system built - Layered design — append-only `AuditLog` (#23) sits alongside operational `Notifications` (#21) and `SiteCalls` (#22), not replacing them. - Scope = script trust boundary: outbound API (sync + cached), outbound DB (sync + cached), notifications, inbound API. Framework/internal traffic is explicitly excluded. - One row per lifecycle event; cached calls produce 4+ rows per operation (`Submitted`, `Forwarded`, `Attempted`, `Delivered`/`Parked`/`Discarded`). +- `ExecutionId` (`uniqueidentifier NULL`) is the universal per-run correlation value — every audit row emitted by one script execution / inbound request shares it; `CorrelationId` remains the per-operation lifecycle id (NULL for sync one-shots). - Site SQLite hot-path first, then gRPC telemetry to central; ingest is idempotent on `EventId`; periodic reconciliation pull as fallback when telemetry is lost. - Cached operations: site emits a single additively-extended `CachedCallTelemetry` packet carrying both audit events and operational state; central writes `AuditLog` + `SiteCalls` in one transaction. - Payload cap 8 KB by default / 64 KB on error rows; auth headers redacted by default; SQL parameter values captured by default; per-target redaction opt-in. diff --git a/docs/requirements/Component-AuditLog.md b/docs/requirements/Component-AuditLog.md index e3f6ed5..b8627d8 100644 --- a/docs/requirements/Component-AuditLog.md +++ b/docs/requirements/Component-AuditLog.md @@ -83,6 +83,7 @@ row per lifecycle event across all channels. | `Channel` | `varchar(32)` | `ApiOutbound` \| `DbOutbound` \| `Notification` \| `ApiInbound`. | | `Kind` | `varchar(32)` | Event kind discriminator (see kinds list below). | | `CorrelationId` | `uniqueidentifier` NULL | Ties multi-event operations together. `TrackedOperationId` for cached calls, `NotificationId` for notifications, request-id for inbound API. NULL for sync one-shot calls. | +| `ExecutionId` | `uniqueidentifier` NULL | The originating script execution / inbound request — the universal per-run correlation value; distinct from `CorrelationId`, which is the per-operation lifecycle id. Stamped on *every* audit row emitted by one execution. | | `SourceSiteId` | `varchar(64)` NULL | NULL for central-originated events. | | `SourceInstanceId` | `varchar(128)` NULL | Instance whose script initiated the action (when applicable). | | `SourceScript` | `varchar(128)` NULL | Script name within the instance. | @@ -103,6 +104,7 @@ row per lifecycle event across all channels. - `IX_AuditLog_OccurredAtUtc` — primary time-range index for global scans. - `IX_AuditLog_Site_Occurred (SourceSiteId, OccurredAtUtc)` — per-site filters. - `IX_AuditLog_Correlation (CorrelationId)` — drilldown from a single operation. +- `IX_AuditLog_Execution (ExecutionId)` — drilldown to every action of one script execution / inbound request. - `IX_AuditLog_Channel_Status_Occurred (Channel, Status, OccurredAtUtc)` — KPI / dashboard tiles. - `IX_AuditLog_Target_Occurred (Target, OccurredAtUtc)` — "what did we send to system X". - Monthly partitioning on `OccurredAtUtc` from day one; purge is a partition switch (see Retention & Purge). @@ -126,6 +128,27 @@ Inbound API is intentionally collapsed to a single `InboundRequest` (or `InboundAuthFailure` for auth rejections) row per request rather than a multi-event lifecycle. +### `ExecutionId` vs `CorrelationId` + +The table carries two correlation columns at different granularities: + +- **`ExecutionId`** is the *universal per-run* value: one id per script + execution (tag-change / timer-triggered or otherwise) or per inbound API + request. It is stamped on **every** audit row that run produces — the sync + `ApiCall` and `DbWrite` rows, the full cached-call lifecycle, the + `NotifySend` / `NotifyDeliver` rows, and the inbound row alike. A run that + performs no trust-boundary action emits no rows, but any run that emits + multiple rows ties them all together under one `ExecutionId`. This lets an + audit reader pull the complete trust-boundary footprint of a single script + run with one `ExecutionId` filter. +- **`CorrelationId`** is the *per-operation lifecycle* id — it groups the + multiple events of one long-running operation (`TrackedOperationId` for a + cached call, `NotificationId` for a notification, request-id for inbound + API) and is NULL for sync one-shot calls that have no operation lifecycle. + +The two are orthogonal: one execution may touch several operations (each with +its own `CorrelationId`) yet every resulting row shares the one `ExecutionId`. + ## The Site-Local `AuditLog` (SQLite) A SQLite database file on each site node, alongside the Store-and-Forward diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/ExecutionIdCorrelationTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/ExecutionIdCorrelationTests.cs new file mode 100644 index 0000000..6ca8b77 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Integration/ExecutionIdCorrelationTests.cs @@ -0,0 +1,274 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Data.Sqlite; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using ScadaLink.AuditLog.Central; +using ScadaLink.AuditLog.Site; +using ScadaLink.AuditLog.Site.Telemetry; +using ScadaLink.AuditLog.Tests.Integration.Infrastructure; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; +using ScadaLink.SiteRuntime.Scripts; + +namespace ScadaLink.AuditLog.Tests.Integration; + +/// +/// Audit Log #23 — ExecutionId end-to-end correlation suite verifying the +/// universal per-run correlation promise: every audit row produced by one +/// script execution carries the same non-null . +/// +/// +/// +/// This is the integration-level counterpart to the unit-level +/// ExecutionCorrelationContextTests in ScadaLink.SiteRuntime.Tests: +/// where that test asserts the shared id on the in-memory captured rows, this +/// suite drives the rows all the way through the production pipeline — the real +/// site hot-path, the real +/// drain loop, the real +/// , and the real +/// over the per-class MSSQL database — then +/// reads the rows back from the central store and asserts the shared id. +/// +/// +/// Composes the same pipeline as the M2 +/// and the M4 : an in-memory +/// + + +/// on the site, drained by a real +/// through the shared +/// stub that short-circuits the +/// gRPC wire and Asks the central ingest actor. The production +/// is driven directly: one context performs +/// two distinct trust-boundary actions — a sync ExternalSystem.Call and a +/// sync Database write — so the two emitted audit rows originate from one +/// execution. Each test uses a unique ExecutionId + SourceSiteId +/// (Guid suffixes) so concurrent tests sharing the MSSQL fixture don't interfere. +/// +/// +public class ExecutionIdCorrelationTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public ExecutionIdCorrelationTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + private const string ConnectionName = "machineData"; + private const string InstanceName = "Plant.Pump42"; + private const string SourceScript = "ScriptActor:OnTick"; + + private static string NewSiteId() => + "test-execid-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + private ScadaLinkDbContext CreateContext() + { + var options = new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString) + .Options; + return new ScadaLinkDbContext(options); + } + + /// + /// Per-test in-memory SQLite database with a tiny single-table schema the + /// sync DB write targets. The keep-alive root pins the in-memory file for + /// the duration of the test; the returned live connection is what the + /// stub gateway hands back to the auditing wrapper. Mirrors + /// DatabaseSyncEmissionEndToEndTests.NewInMemoryDb. + /// + private static SqliteConnection NewInMemoryDb(out SqliteConnection keepAlive) + { + var dbName = $"db-{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + + keepAlive = new SqliteConnection(connStr); + keepAlive.Open(); + using (var seed = keepAlive.CreateCommand()) + { + seed.CommandText = + "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);"; + seed.ExecuteNonQuery(); + } + + var live = new SqliteConnection(connStr); + live.Open(); + return live; + } + + private static SqliteAuditWriter CreateInMemorySqliteWriter() => + new( + Options.Create(new SqliteAuditWriterOptions + { + DatabasePath = "ignored", + BatchSize = 64, + ChannelCapacity = 1024, + }), + NullLogger.Instance, + connectionStringOverride: + $"Data Source=file:auditlog-execid-{Guid.NewGuid():N}?mode=memory&cache=shared"); + + private static IOptions FastTelemetryOptions() => + Options.Create(new SiteAuditTelemetryOptions + { + BatchSize = 256, + // 1s on both intervals so the initial scheduled tick fires quickly + // — drains the SQLite Pending rows and pushes them through the stub + // gRPC client into the central ingest actor. + BusyIntervalSeconds = 1, + IdleIntervalSeconds = 1, + }); + + private IActorRef CreateIngestActor(IAuditLogRepository repo) => + Sys.ActorOf(Props.Create(() => new AuditLogIngestActor( + repo, + NullLogger.Instance))); + + private IActorRef CreateTelemetryActor( + ISiteAuditQueue queue, + ISiteStreamAuditClient client) => + Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor( + queue, + client, + FastTelemetryOptions(), + NullLogger.Instance))); + + [SkippableFact] + public async Task OneExecution_ApiCallAndDbWrite_AllCentralRows_ShareOneNonNullExecutionId() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + // An explicit per-run execution id — the value the test asserts on every + // audit row produced by the single script execution below. + var executionId = Guid.NewGuid(); + + // ── Central — repository + ingest actor backed by the MSSQL fixture ── + await using var ingestContext = CreateContext(); + var ingestRepo = new AuditLogRepository(ingestContext); + var ingestActor = CreateIngestActor(ingestRepo); + + // ── Site — SQLite audit writer + ring + fallback + telemetry actor ─── + await using var sqliteWriter = CreateInMemorySqliteWriter(); + var ring = new RingBufferFallback(); + var fallback = new FallbackAuditWriter( + sqliteWriter, + ring, + new NoOpAuditWriteFailureCounter(), + NullLogger.Instance); + var stubClient = new DirectActorSiteStreamAuditClient(ingestActor); + CreateTelemetryActor(sqliteWriter, stubClient); + + // Outbound API client — one successful CallAsync, one audit row. + var externalClient = Substitute.For(); + externalClient + .CallAsync("ERP", "GetOrder", Arg.Any?>(), Arg.Any()) + .Returns(new ExternalCallResult(true, "{}", null)); + + // SQLite-backed inner DB connection — the stub gateway hands it to the + // auditing wrapper as the connection the script would have got. + using var keepAlive = new SqliteConnection("Data Source=execid-k1;Mode=Memory;Cache=Shared"); + var innerDb = NewInMemoryDb(out _); + var gateway = Substitute.For(); + gateway.GetConnectionAsync(ConnectionName, Arg.Any()) + .Returns(innerDb); + + // ── Act — ONE script execution: a sync ExternalSystem.Call AND a sync + // Database write, both performed through a SINGLE ScriptRuntimeContext + // stamped with the explicit executionId. Each helper emits exactly one + // trust-boundary audit row to the fallback writer; the telemetry actor's + // next tick drains both to central. + var context = CreateScriptContext(externalClient, gateway, fallback, siteId, executionId); + + await context.ExternalSystem.Call("ERP", "GetOrder"); + + await using (var conn = await context.Database.Connection(ConnectionName)) + await using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = "INSERT INTO t (id, name) VALUES (1, 'alpha')"; + var affected = await cmd.ExecuteNonQueryAsync(); + Assert.Equal(1, affected); + } + + // ── Assert — read the rows back from the CENTRAL store filtered by the + // execution id; both the ApiCall and the DbWrite row must be present and + // every one must carry the SAME non-null ExecutionId we minted above. + await AwaitAssertAsync(async () => + { + await using var readContext = CreateContext(); + var readRepo = new AuditLogRepository(readContext); + + // The ExecutionId filter dimension is the universal-correlation + // query an audit reader uses to pull every action of one run. + var rows = await readRepo.QueryAsync( + new AuditLogQueryFilter(ExecutionId: executionId), + new AuditLogPaging(PageSize: 10)); + + // Both trust-boundary actions of the one execution have landed. + Assert.Equal(2, rows.Count); + + // Every central row carries the SAME non-null ExecutionId — the + // core promise of the per-run correlation value. + Assert.All(rows, r => + { + Assert.NotNull(r.ExecutionId); + Assert.Equal(executionId, r.ExecutionId); + Assert.Equal(siteId, r.SourceSiteId); + // Central stamps IngestedAtUtc; the site never sets it. + Assert.NotNull(r.IngestedAtUtc); + }); + + // The two rows are the two distinct trust-boundary actions — one + // outbound API call and one outbound DB write — proving the shared + // id spans different channels, not two rows of the same action. + Assert.Single(rows, r => r.Channel == AuditChannel.ApiOutbound && r.Kind == AuditKind.ApiCall); + Assert.Single(rows, r => r.Channel == AuditChannel.DbOutbound && r.Kind == AuditKind.DbWrite); + }, TimeSpan.FromSeconds(15)); + } + + /// + /// Builds a production wired with the + /// outbound external-system client, the database gateway and the audit + /// writer, stamped with an explicit . The + /// actor refs are — the ExternalSystem / + /// Database helpers exercised here never touch them. + /// + private static ScriptRuntimeContext CreateScriptContext( + IExternalSystemClient externalSystemClient, + IDatabaseGateway databaseGateway, + IAuditWriter auditWriter, + string siteId, + Guid executionId) + { + var compilationService = new ScriptCompilationService( + NullLogger.Instance); + var sharedScriptLibrary = new SharedScriptLibrary( + compilationService, NullLogger.Instance); + + return new ScriptRuntimeContext( + ActorRefs.Nobody, + ActorRefs.Nobody, + sharedScriptLibrary, + currentCallDepth: 0, + maxCallDepth: 10, + askTimeout: TimeSpan.FromSeconds(5), + instanceName: InstanceName, + logger: NullLogger.Instance, + externalSystemClient: externalSystemClient, + databaseGateway: databaseGateway, + storeAndForward: null, + siteCommunicationActor: null, + siteId: siteId, + sourceScript: SourceScript, + auditWriter: auditWriter, + operationTrackingStore: null, + cachedForwarder: null, + executionId: executionId); + } +}