using Akka.TestKit.Xunit2; using Microsoft.EntityFrameworkCore; using ScadaLink.AuditLog.Tests.Integration.Infrastructure; using ScadaLink.Commons.Entities.Audit; using ScadaLink.Commons.Interfaces.Services; using ScadaLink.Commons.Messages.Integration; using ScadaLink.Commons.Types; using ScadaLink.Commons.Types.Enums; using ScadaLink.ConfigurationDatabase.Tests.Migrations; namespace ScadaLink.AuditLog.Tests.Integration; /// /// Bundle G G3 mirror of for /// Database.CachedWrite. Same pipeline composition, same dual-write /// transaction, but the lifecycle bridge maps channel "DbOutbound" to /// on the per-attempt audit row (vs. /// for API calls). The /// on the audit row, the SiteCalls.Channel /// column, and the per-attempt all need to come /// through as the DB variants for this path to be considered exercised. /// /// /// As with G2, the bridge is driven directly via the harness — we do not /// stand up a real Database.CachedWrite caller. The site-side /// unit-level emission for the DB path is exercised in /// ScadaLink.SiteRuntime.Tests; this suite verifies the end-to-end /// combined-telemetry path produces the right central rows. /// public class CachedWriteCombinedTelemetryTests : TestKit, IClassFixture { private readonly MsSqlMigrationFixture _fixture; public CachedWriteCombinedTelemetryTests(MsSqlMigrationFixture fixture) { _fixture = fixture; } private static string NewSiteId() => "test-g3-cachedwrite-" + Guid.NewGuid().ToString("N").Substring(0, 8); private static CachedCallTelemetry DbSubmitPacket( TrackedOperationId id, string siteId, DateTime nowUtc, string target = "OperationsDb.UpdateOrder") => new( Audit: new AuditEvent { EventId = Guid.NewGuid(), OccurredAtUtc = nowUtc, Channel = AuditChannel.DbOutbound, Kind = AuditKind.CachedSubmit, CorrelationId = id.Value, SourceSiteId = siteId, SourceInstanceId = "Plant.Pump42", SourceScript = "ScriptActor:doStuff", Target = target, Status = AuditStatus.Submitted, ForwardState = AuditForwardState.Pending, }, Operational: new SiteCallOperational( TrackedOperationId: id, Channel: "DbOutbound", Target: target, SourceSite: siteId, Status: "Submitted", RetryCount: 0, LastError: null, HttpStatus: null, CreatedAtUtc: nowUtc, UpdatedAtUtc: nowUtc, TerminalAtUtc: null)); private static CachedCallAttemptContext DbAttemptContext( TrackedOperationId id, string siteId, CachedCallAttemptOutcome outcome, int retryCount, string? lastError, DateTime createdUtc, DateTime occurredUtc, string target = "OperationsDb.UpdateOrder") => new( TrackedOperationId: id, Channel: "DbOutbound", Target: target, SourceSite: siteId, Outcome: outcome, RetryCount: retryCount, LastError: lastError, HttpStatus: null, CreatedAtUtc: createdUtc, OccurredAtUtc: occurredUtc, DurationMs: 12, SourceInstanceId: "Plant.Pump42"); [SkippableFact] public async Task CachedWrite_Success_Emits_Delivered_Lifecycle() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); var siteId = NewSiteId(); var trackedId = TrackedOperationId.New(); var t0 = new DateTime(2026, 5, 20, 13, 0, 0, DateTimeKind.Utc); await using var harness = new CombinedTelemetryHarness(_fixture, this); // Submit + immediate delivered attempt. await harness.EmitSubmitAsync(DbSubmitPacket(trackedId, siteId, t0)); await harness.EmitAttemptAsync(DbAttemptContext( trackedId, siteId, CachedCallAttemptOutcome.Delivered, retryCount: 0, lastError: null, createdUtc: t0, occurredUtc: t0.AddMilliseconds(50))); await using var read = harness.CreateReadContext(); // Central SiteCalls row — DbOutbound channel, Delivered. var siteCall = await read.Set() .SingleAsync(s => s.TrackedOperationId == trackedId); Assert.Equal("DbOutbound", siteCall.Channel); Assert.Equal("Delivered", siteCall.Status); Assert.Equal(0, siteCall.RetryCount); Assert.NotNull(siteCall.TerminalAtUtc); var auditRows = await read.Set() .Where(e => e.SourceSiteId == siteId) .ToListAsync(); Assert.Equal(3, auditRows.Count); // Submit row: CachedSubmit + DbOutbound channel. var submit = Assert.Single(auditRows, r => r.Kind == AuditKind.CachedSubmit); Assert.Equal(AuditChannel.DbOutbound, submit.Channel); // Per-attempt row: DbWriteCached (NOT ApiCallCached). var attempt = Assert.Single(auditRows, r => r.Kind == AuditKind.DbWriteCached); Assert.Equal(AuditStatus.Attempted, attempt.Status); Assert.Equal(AuditChannel.DbOutbound, attempt.Channel); // Terminal: CachedResolve Delivered. var resolve = Assert.Single(auditRows, r => r.Kind == AuditKind.CachedResolve); Assert.Equal(AuditStatus.Delivered, resolve.Status); Assert.Equal(AuditChannel.DbOutbound, resolve.Channel); // Site-local tracking row mirrors the same outcome. var snapshot = await harness.TrackingStore.GetStatusAsync(trackedId); Assert.NotNull(snapshot); Assert.Equal("Delivered", snapshot!.Status); } [SkippableFact] public async Task CachedWrite_Parked_Emits_Terminal_Parked() { Skip.IfNot(_fixture.Available, _fixture.SkipReason); var siteId = NewSiteId(); var trackedId = TrackedOperationId.New(); var t0 = new DateTime(2026, 5, 20, 14, 0, 0, DateTimeKind.Utc); await using var harness = new CombinedTelemetryHarness(_fixture, this); await harness.EmitSubmitAsync(DbSubmitPacket(trackedId, siteId, t0)); // Two transient SQL-error attempts... for (int i = 1; i <= 2; i++) { await harness.EmitAttemptAsync(DbAttemptContext( trackedId, siteId, CachedCallAttemptOutcome.TransientFailure, retryCount: i, lastError: "Deadlock victim", createdUtc: t0, occurredUtc: t0.AddSeconds(i * 5))); } // ...then permanent failure → Parked terminal. await harness.EmitAttemptAsync(DbAttemptContext( trackedId, siteId, CachedCallAttemptOutcome.PermanentFailure, retryCount: 3, lastError: "ConstraintViolation: FK_Orders_Customer", createdUtc: t0, occurredUtc: t0.AddSeconds(20))); await using var read = harness.CreateReadContext(); var siteCall = await read.Set() .SingleAsync(s => s.TrackedOperationId == trackedId); Assert.Equal("DbOutbound", siteCall.Channel); Assert.Equal("Parked", siteCall.Status); Assert.NotNull(siteCall.TerminalAtUtc); var resolve = await read.Set() .Where(e => e.SourceSiteId == siteId && e.Kind == AuditKind.CachedResolve) .SingleAsync(); Assert.Equal(AuditStatus.Parked, resolve.Status); Assert.Equal(AuditChannel.DbOutbound, resolve.Channel); // Tracking store mirrors Parked. var snapshot = await harness.TrackingStore.GetStatusAsync(trackedId); Assert.NotNull(snapshot); Assert.Equal("Parked", snapshot!.Status); Assert.NotNull(snapshot.TerminalAtUtc); } }