using Akka.TestKit.Xunit2;
using Microsoft.EntityFrameworkCore;
using ScadaLink.AuditLog.Tests.Integration.Infrastructure;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;

namespace ScadaLink.AuditLog.Tests.Integration;

/// <summary>
/// Bundle G G2 end-to-end suite for cached <c>ExternalSystem.CachedCall</c>
/// lifecycle telemetry (Audit Log #23 / M3). Wires the full M3 pipeline:
/// site-local SQLite audit writer + operation tracking store + the production
/// bridge + the test-side dispatcher that ALSO pushes each combined packet
/// through the stub gRPC client into the central <c>AuditLogIngestActor</c>'s
/// dual-write transaction against the fixture's MSSQL database. Asserts the
/// audit rows + the SiteCalls row + the site-local tracking row converge to
/// the expected shape for each lifecycle.
/// </summary>
/// <remarks>
/// <para>
/// The bridge is driven directly through
/// <c>CombinedTelemetryHarness.EmitAttemptAsync</c> — these tests do NOT spin
/// up the actual S&amp;F retry loop; that would require a full SiteRuntime
/// host and is out of scope for M3 (the S&amp;F observer hooks are exercised
/// in ScadaLink.StoreAndForward.Tests at unit level). The submit row is
/// emitted through <c>CombinedTelemetryHarness.EmitSubmitAsync</c> because the
/// production submit emission happens at the script-call site, not inside the
/// S&amp;F loop.
/// </para>
/// <para>
/// Each test uses a unique SourceSite id (Guid suffix) so concurrent tests
/// sharing the per-fixture MSSQL database don't interfere with each other.
/// </para>
/// </remarks>
public class CachedCallCombinedTelemetryTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
    // Shared literals used by both packet builders so the submit row and the
    // attempt rows always describe the same call.
    private const string DefaultTarget = "ERP.GetOrder";
    private const string DefaultChannel = "ApiOutbound";
    private const string SourceInstance = "Plant.Pump42";

    // NOTE(review): SiteCall is assumed to be the EF entity mapped to the
    // SiteCalls table (expected in ScadaLink.Commons.Entities.Audit) — confirm
    // the type name against the central DbContext model.

    private readonly MsSqlMigrationFixture _fixture;

    /// <summary>Receives the class-level MSSQL fixture from xUnit.</summary>
    /// <param name="fixture">Shared migration fixture; also reports whether MSSQL is available.</param>
    public CachedCallCombinedTelemetryTests(MsSqlMigrationFixture fixture)
    {
        _fixture = fixture;
    }

    /// <summary>
    /// Builds a site id of the form <c>test-g2-cached-xxxxxxxx</c>, where the
    /// suffix is the first 8 hex characters of a fresh Guid.
    /// </summary>
    private static string NewSiteId() =>
        "test-g2-cached-" + Guid.NewGuid().ToString("N")[..8];

    /// <summary>
    /// Builds the combined submit packet: a <c>CachedSubmit</c> audit event
    /// (status Submitted, forward state Pending) plus the matching operational
    /// record with <c>RetryCount</c> 0 and no terminal timestamp.
    /// </summary>
    /// <param name="id">Tracked operation id; its value is also used as the audit CorrelationId.</param>
    /// <param name="siteId">Source site id stamped on both halves of the packet.</param>
    /// <param name="nowUtc">Timestamp used for OccurredAtUtc, CreatedAtUtc and UpdatedAtUtc.</param>
    /// <param name="target">Call target recorded on both halves of the packet.</param>
    private static CachedCallTelemetry SubmitPacket(
        TrackedOperationId id,
        string siteId,
        DateTime nowUtc,
        string target = DefaultTarget) =>
        new(
            Audit: new AuditEvent
            {
                EventId = Guid.NewGuid(),
                OccurredAtUtc = nowUtc,
                Channel = AuditChannel.ApiOutbound,
                Kind = AuditKind.CachedSubmit,
                CorrelationId = id.Value,
                SourceSiteId = siteId,
                SourceInstanceId = SourceInstance,
                SourceScript = "ScriptActor:doStuff",
                Target = target,
                Status = AuditStatus.Submitted,
                ForwardState = AuditForwardState.Pending,
            },
            Operational: new SiteCallOperational(
                TrackedOperationId: id,
                Channel: DefaultChannel,
                Target: target,
                SourceSite: siteId,
                Status: "Submitted",
                RetryCount: 0,
                LastError: null,
                HttpStatus: null,
                CreatedAtUtc: nowUtc,
                UpdatedAtUtc: nowUtc,
                TerminalAtUtc: null));

    /// <summary>
    /// Builds the attempt context handed to the harness for one delivery
    /// attempt. <c>DurationMs</c> is fixed at 42 and the source instance is
    /// always <c>Plant.Pump42</c>.
    /// </summary>
    /// <param name="id">Tracked operation id of the cached call.</param>
    /// <param name="siteId">Source site id.</param>
    /// <param name="outcome">Outcome reported for this attempt.</param>
    /// <param name="retryCount">Retry count reported with this attempt.</param>
    /// <param name="lastError">Last error text, or <c>null</c> on success.</param>
    /// <param name="httpStatus">HTTP status of the attempt, if any.</param>
    /// <param name="createdUtc">Creation time of the tracked operation.</param>
    /// <param name="occurredUtc">Time at which this attempt occurred.</param>
    /// <param name="target">Call target.</param>
    /// <param name="channel">Channel name.</param>
    private static CachedCallAttemptContext AttemptContext(
        TrackedOperationId id,
        string siteId,
        CachedCallAttemptOutcome outcome,
        int retryCount,
        string? lastError,
        int? httpStatus,
        DateTime createdUtc,
        DateTime occurredUtc,
        string target = DefaultTarget,
        string channel = DefaultChannel) =>
        new(
            TrackedOperationId: id,
            Channel: channel,
            Target: target,
            SourceSite: siteId,
            Outcome: outcome,
            RetryCount: retryCount,
            LastError: lastError,
            HttpStatus: httpStatus,
            CreatedAtUtc: createdUtc,
            OccurredAtUtc: occurredUtc,
            DurationMs: 42,
            SourceInstanceId: SourceInstance);

    /// <summary>
    /// Submit, two transient failures, then a delivered attempt: expects the
    /// audit rows to share one CorrelationId with a single CachedSubmit and a
    /// single Delivered CachedResolve, and both the central SiteCalls row and
    /// the site-local tracking snapshot to end up Delivered.
    /// </summary>
    [SkippableFact]
    public async Task CachedCall_FailFailSuccess_Emits_5_AuditRows_AND_1_SiteCall_Delivered()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var siteId = NewSiteId();
        var trackedId = TrackedOperationId.New();
        var t0 = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);

        await using var harness = new CombinedTelemetryHarness(_fixture, this);

        // Submit
        await harness.EmitSubmitAsync(SubmitPacket(trackedId, siteId, t0));

        // Attempt 1: transient HTTP 500
        await harness.EmitAttemptAsync(AttemptContext(
            trackedId,
            siteId,
            CachedCallAttemptOutcome.TransientFailure,
            retryCount: 1,
            lastError: "HTTP 500",
            httpStatus: 500,
            createdUtc: t0,
            occurredUtc: t0.AddSeconds(5)));

        // Attempt 2: transient HTTP 500
        await harness.EmitAttemptAsync(AttemptContext(
            trackedId,
            siteId,
            CachedCallAttemptOutcome.TransientFailure,
            retryCount: 2,
            lastError: "HTTP 500",
            httpStatus: 500,
            createdUtc: t0,
            occurredUtc: t0.AddSeconds(15)));

        // Attempt 3: delivered (terminal — emits Attempted + CachedResolve)
        await harness.EmitAttemptAsync(AttemptContext(
            trackedId,
            siteId,
            CachedCallAttemptOutcome.Delivered,
            retryCount: 3,
            lastError: null,
            httpStatus: 200,
            createdUtc: t0,
            occurredUtc: t0.AddSeconds(25)));

        // Central side: each forward through the dispatcher round-trips
        // through the stub client + ingest actor, so by the time the awaits
        // complete the rows are visible in MSSQL.
        await using var read = harness.CreateReadContext();

        // 1 Submit + 2 transient Attempted + 1 terminal Attempted + 1
        // CachedResolve = 5 audit rows. The plan allows 4-5; this is the
        // happy path emitting exactly 5.
        var auditRows = await read.Set<AuditEvent>()
            .Where(e => e.SourceSiteId == siteId)
            .ToListAsync();
        Assert.InRange(auditRows.Count, 4, 5);

        // All audit rows must share the same CorrelationId (= TrackedOperationId).
        Assert.All(auditRows, r => Assert.Equal(trackedId.Value, r.CorrelationId));

        // Exactly one CachedSubmit row.
        Assert.Single(auditRows, r => r.Kind == AuditKind.CachedSubmit);

        // Exactly one terminal CachedResolve row, status Delivered.
        var resolve = Assert.Single(auditRows, r => r.Kind == AuditKind.CachedResolve);
        Assert.Equal(AuditStatus.Delivered, resolve.Status);

        // SiteCalls row: Delivered, RetryCount=3, TerminalAtUtc set.
        var siteCall = await read.Set<SiteCall>()
            .SingleAsync(s => s.TrackedOperationId == trackedId);
        Assert.Equal("Delivered", siteCall.Status);
        Assert.Equal(3, siteCall.RetryCount);
        Assert.NotNull(siteCall.TerminalAtUtc);

        // Site-local Tracking.Status mirrors the same outcome.
        var snapshot = await harness.TrackingStore.GetStatusAsync(trackedId);
        Assert.NotNull(snapshot);
        Assert.Equal("Delivered", snapshot!.Status);
        Assert.NotNull(snapshot.TerminalAtUtc);
    }

    /// <summary>
    /// Submit, three transient failures, then <c>ParkedMaxRetries</c>: expects
    /// the SiteCalls row, the single CachedResolve audit row and the
    /// site-local tracking snapshot to all report Parked with a terminal
    /// timestamp.
    /// </summary>
    [SkippableFact]
    public async Task CachedCall_AllAttemptsFailedAndParked_Emits_Terminal_Parked()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var siteId = NewSiteId();
        var trackedId = TrackedOperationId.New();
        var t0 = new DateTime(2026, 5, 20, 11, 0, 0, DateTimeKind.Utc);

        await using var harness = new CombinedTelemetryHarness(_fixture, this);

        await harness.EmitSubmitAsync(SubmitPacket(trackedId, siteId, t0));

        // Three transient failures...
        for (int i = 1; i <= 3; i++)
        {
            await harness.EmitAttemptAsync(AttemptContext(
                trackedId,
                siteId,
                CachedCallAttemptOutcome.TransientFailure,
                retryCount: i,
                lastError: "HTTP 500",
                httpStatus: 500,
                createdUtc: t0,
                occurredUtc: t0.AddSeconds(i * 5)));
        }

        // ...then S&F gives up — ParkedMaxRetries.
        await harness.EmitAttemptAsync(AttemptContext(
            trackedId,
            siteId,
            CachedCallAttemptOutcome.ParkedMaxRetries,
            retryCount: 4,
            lastError: "HTTP 500",
            httpStatus: 500,
            createdUtc: t0,
            occurredUtc: t0.AddSeconds(30)));

        await using var read = harness.CreateReadContext();

        var siteCall = await read.Set<SiteCall>()
            .SingleAsync(s => s.TrackedOperationId == trackedId);
        Assert.Equal("Parked", siteCall.Status);
        Assert.NotNull(siteCall.TerminalAtUtc);

        // Terminal audit row should also be Parked.
        var resolve = await read.Set<AuditEvent>()
            .Where(e => e.SourceSiteId == siteId && e.Kind == AuditKind.CachedResolve)
            .SingleAsync();
        Assert.Equal(AuditStatus.Parked, resolve.Status);

        // Site-local tracking matches.
        var snapshot = await harness.TrackingStore.GetStatusAsync(trackedId);
        Assert.NotNull(snapshot);
        Assert.Equal("Parked", snapshot!.Status);
        Assert.NotNull(snapshot.TerminalAtUtc);
    }

    /// <summary>
    /// Submit followed by an immediate delivered attempt (RetryCount 0):
    /// expects exactly three audit rows (CachedSubmit, ApiCallCached,
    /// CachedResolve with status Delivered) and a Delivered SiteCalls row and
    /// tracking snapshot.
    /// </summary>
    [SkippableFact]
    public async Task CachedCall_ImmediateSuccess_NoSF_Emits_Attempted_And_Resolve_Delivered()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var siteId = NewSiteId();
        var trackedId = TrackedOperationId.New();
        var t0 = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);

        await using var harness = new CombinedTelemetryHarness(_fixture, this);

        // Submit + immediate delivered attempt (RetryCount = 0).
        await harness.EmitSubmitAsync(SubmitPacket(trackedId, siteId, t0));
        await harness.EmitAttemptAsync(AttemptContext(
            trackedId,
            siteId,
            CachedCallAttemptOutcome.Delivered,
            retryCount: 0,
            lastError: null,
            httpStatus: 200,
            createdUtc: t0,
            occurredUtc: t0.AddMilliseconds(50)));

        await using var read = harness.CreateReadContext();

        var siteCall = await read.Set<SiteCall>()
            .SingleAsync(s => s.TrackedOperationId == trackedId);
        Assert.Equal("Delivered", siteCall.Status);
        Assert.Equal(0, siteCall.RetryCount);
        Assert.NotNull(siteCall.TerminalAtUtc);

        // 1 Submit + 1 Attempted + 1 CachedResolve = 3 audit rows.
        var auditRows = await read.Set<AuditEvent>()
            .Where(e => e.SourceSiteId == siteId)
            .ToListAsync();
        Assert.Equal(3, auditRows.Count);
        Assert.Single(auditRows, r => r.Kind == AuditKind.CachedSubmit);
        Assert.Single(auditRows, r => r.Kind == AuditKind.ApiCallCached);
        var resolve = Assert.Single(auditRows, r => r.Kind == AuditKind.CachedResolve);
        Assert.Equal(AuditStatus.Delivered, resolve.Status);

        var snapshot = await harness.TrackingStore.GetStatusAsync(trackedId);
        Assert.NotNull(snapshot);
        Assert.Equal("Delivered", snapshot!.Status);
    }
}