From f063b356331efda139e2f26ffd1d245de44ef961 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 15:26:04 -0400 Subject: [PATCH] test(auditlog): cached DB write combined telemetry end-to-end (#23 M3) --- .../CachedWriteCombinedTelemetryTests.cs | 196 ++++++++++++++++++ 1 file changed, 196 insertions(+) create mode 100644 tests/ScadaLink.AuditLog.Tests/Integration/CachedWriteCombinedTelemetryTests.cs diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/CachedWriteCombinedTelemetryTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/CachedWriteCombinedTelemetryTests.cs new file mode 100644 index 0000000..b67488e --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Integration/CachedWriteCombinedTelemetryTests.cs @@ -0,0 +1,196 @@ +using Akka.TestKit.Xunit2; +using Microsoft.EntityFrameworkCore; +using ScadaLink.AuditLog.Tests.Integration.Infrastructure; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Messages.Integration; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; + +namespace ScadaLink.AuditLog.Tests.Integration; + +/// +/// Bundle G G3 mirror of for +/// Database.CachedWrite. Same pipeline composition, same dual-write +/// transaction, but the lifecycle bridge maps channel "DbOutbound" to +/// on the per-attempt audit row (vs. +/// for API calls). The +/// on the audit row, the SiteCalls.Channel +/// column, and the per-attempt all need to come +/// through as the DB variants for this path to be considered exercised. +/// +/// +/// As with G2, the bridge is driven directly via the harness — we do not +/// stand up a real Database.CachedWrite caller. The site-side +/// unit-level emission for the DB path is exercised in +/// ScadaLink.SiteRuntime.Tests; this suite verifies the end-to-end +/// combined-telemetry path produces the right central rows. +/// +public class CachedWriteCombinedTelemetryTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public CachedWriteCombinedTelemetryTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + private static string NewSiteId() => + "test-g3-cachedwrite-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + private static CachedCallTelemetry DbSubmitPacket( + TrackedOperationId id, string siteId, DateTime nowUtc, string target = "OperationsDb.UpdateOrder") => + new( + Audit: new AuditEvent + { + EventId = Guid.NewGuid(), + OccurredAtUtc = nowUtc, + Channel = AuditChannel.DbOutbound, + Kind = AuditKind.CachedSubmit, + CorrelationId = id.Value, + SourceSiteId = siteId, + SourceInstanceId = "Plant.Pump42", + SourceScript = "ScriptActor:doStuff", + Target = target, + Status = AuditStatus.Submitted, + ForwardState = AuditForwardState.Pending, + }, + Operational: new SiteCallOperational( + TrackedOperationId: id, + Channel: "DbOutbound", + Target: target, + SourceSite: siteId, + Status: "Submitted", + RetryCount: 0, + LastError: null, + HttpStatus: null, + CreatedAtUtc: nowUtc, + UpdatedAtUtc: nowUtc, + TerminalAtUtc: null)); + + private static CachedCallAttemptContext DbAttemptContext( + TrackedOperationId id, + string siteId, + CachedCallAttemptOutcome outcome, + int retryCount, + string? lastError, + DateTime createdUtc, + DateTime occurredUtc, + string target = "OperationsDb.UpdateOrder") => + new( + TrackedOperationId: id, + Channel: "DbOutbound", + Target: target, + SourceSite: siteId, + Outcome: outcome, + RetryCount: retryCount, + LastError: lastError, + HttpStatus: null, + CreatedAtUtc: createdUtc, + OccurredAtUtc: occurredUtc, + DurationMs: 12, + SourceInstanceId: "Plant.Pump42"); + + [SkippableFact] + public async Task CachedWrite_Success_Emits_Delivered_Lifecycle() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var trackedId = TrackedOperationId.New(); + var t0 = new DateTime(2026, 5, 20, 13, 0, 0, DateTimeKind.Utc); + + await using var harness = new CombinedTelemetryHarness(_fixture, this); + + // Submit + immediate delivered attempt. + await harness.EmitSubmitAsync(DbSubmitPacket(trackedId, siteId, t0)); + await harness.EmitAttemptAsync(DbAttemptContext( + trackedId, siteId, + CachedCallAttemptOutcome.Delivered, + retryCount: 0, lastError: null, + createdUtc: t0, occurredUtc: t0.AddMilliseconds(50))); + + await using var read = harness.CreateReadContext(); + + // Central SiteCalls row — DbOutbound channel, Delivered. + var siteCall = await read.Set() + .SingleAsync(s => s.TrackedOperationId == trackedId); + Assert.Equal("DbOutbound", siteCall.Channel); + Assert.Equal("Delivered", siteCall.Status); + Assert.Equal(0, siteCall.RetryCount); + Assert.NotNull(siteCall.TerminalAtUtc); + + var auditRows = await read.Set() + .Where(e => e.SourceSiteId == siteId) + .ToListAsync(); + Assert.Equal(3, auditRows.Count); + // Submit row: CachedSubmit + DbOutbound channel. + var submit = Assert.Single(auditRows, r => r.Kind == AuditKind.CachedSubmit); + Assert.Equal(AuditChannel.DbOutbound, submit.Channel); + // Per-attempt row: DbWriteCached (NOT ApiCallCached). + var attempt = Assert.Single(auditRows, r => r.Kind == AuditKind.DbWriteCached); + Assert.Equal(AuditStatus.Attempted, attempt.Status); + Assert.Equal(AuditChannel.DbOutbound, attempt.Channel); + // Terminal: CachedResolve Delivered. + var resolve = Assert.Single(auditRows, r => r.Kind == AuditKind.CachedResolve); + Assert.Equal(AuditStatus.Delivered, resolve.Status); + Assert.Equal(AuditChannel.DbOutbound, resolve.Channel); + + // Site-local tracking row mirrors the same outcome. + var snapshot = await harness.TrackingStore.GetStatusAsync(trackedId); + Assert.NotNull(snapshot); + Assert.Equal("Delivered", snapshot!.Status); + } + + [SkippableFact] + public async Task CachedWrite_Parked_Emits_Terminal_Parked() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var trackedId = TrackedOperationId.New(); + var t0 = new DateTime(2026, 5, 20, 14, 0, 0, DateTimeKind.Utc); + + await using var harness = new CombinedTelemetryHarness(_fixture, this); + + await harness.EmitSubmitAsync(DbSubmitPacket(trackedId, siteId, t0)); + + // Two transient SQL-error attempts... + for (int i = 1; i <= 2; i++) + { + await harness.EmitAttemptAsync(DbAttemptContext( + trackedId, siteId, + CachedCallAttemptOutcome.TransientFailure, + retryCount: i, lastError: "Deadlock victim", + createdUtc: t0, occurredUtc: t0.AddSeconds(i * 5))); + } + + // ...then permanent failure → Parked terminal. + await harness.EmitAttemptAsync(DbAttemptContext( + trackedId, siteId, + CachedCallAttemptOutcome.PermanentFailure, + retryCount: 3, lastError: "ConstraintViolation: FK_Orders_Customer", + createdUtc: t0, occurredUtc: t0.AddSeconds(20))); + + await using var read = harness.CreateReadContext(); + + var siteCall = await read.Set() + .SingleAsync(s => s.TrackedOperationId == trackedId); + Assert.Equal("DbOutbound", siteCall.Channel); + Assert.Equal("Parked", siteCall.Status); + Assert.NotNull(siteCall.TerminalAtUtc); + + var resolve = await read.Set() + .Where(e => e.SourceSiteId == siteId && e.Kind == AuditKind.CachedResolve) + .SingleAsync(); + Assert.Equal(AuditStatus.Parked, resolve.Status); + Assert.Equal(AuditChannel.DbOutbound, resolve.Channel); + + // Tracking store mirrors Parked. + var snapshot = await harness.TrackingStore.GetStatusAsync(trackedId); + Assert.NotNull(snapshot); + Assert.Equal("Parked", snapshot!.Status); + Assert.NotNull(snapshot.TerminalAtUtc); + } +}