diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/CombinedTelemetryIdempotencyTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/CombinedTelemetryIdempotencyTests.cs new file mode 100644 index 0000000..1d67ed7 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Integration/CombinedTelemetryIdempotencyTests.cs @@ -0,0 +1,202 @@ +using Akka.TestKit.Xunit2; +using Microsoft.EntityFrameworkCore; +using ScadaLink.AuditLog.Tests.Integration.Infrastructure; +using ScadaLink.AuditLog.Telemetry; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Messages.Integration; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; +using ScadaLink.Communication.Grpc; +using Timestamp = Google.Protobuf.WellKnownTypes.Timestamp; + +namespace ScadaLink.AuditLog.Tests.Integration; + +/// +/// Bundle G G4 idempotency suite. Telemetry packets MUST round-trip safely +/// under retried delivery (at-least-once site→central) AND under out-of-order +/// arrival (a stale Submit packet arriving after the central row has already +/// advanced to Attempted must not regress the SiteCalls status, but must +/// still insert its own audit row because audit rows are append-only and the +/// lifecycle history is the source of truth for forensics). +/// +/// +/// Pushes packets directly through the +/// stub client (bypassing the local SQLite writer + tracking store) — the +/// scenario being modeled is a wire-level retry, not a fresh site call, so +/// the local stores' insert/no-op behaviour is already covered by the G2/G3 +/// happy-path tests. This suite focuses on the central ingest actor's +/// dual-write transaction's idempotency contract. +/// +public class CombinedTelemetryIdempotencyTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public CombinedTelemetryIdempotencyTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + private static string NewSiteId() => + "test-g4-idem-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + private static CachedTelemetryPacket BuildPacket( + Guid eventId, + TrackedOperationId trackedId, + string siteId, + AuditKind kind, + AuditStatus auditStatus, + string operationalStatus, + int retryCount, + DateTime nowUtc, + DateTime? terminalUtc = null, + string? lastError = null, + int? httpStatus = null) + { + var dto = new CachedTelemetryPacket + { + AuditEvent = AuditEventMapper.ToDto(new AuditEvent + { + EventId = eventId, + OccurredAtUtc = nowUtc, + Channel = AuditChannel.ApiOutbound, + Kind = kind, + CorrelationId = trackedId.Value, + SourceSiteId = siteId, + Target = "ERP.GetOrder", + Status = auditStatus, + HttpStatus = httpStatus, + ErrorMessage = lastError, + ForwardState = AuditForwardState.Pending, + }), + Operational = new SiteCallOperationalDto + { + TrackedOperationId = trackedId.Value.ToString("D"), + Channel = "ApiOutbound", + Target = "ERP.GetOrder", + SourceSite = siteId, + Status = operationalStatus, + RetryCount = retryCount, + LastError = lastError ?? string.Empty, + CreatedAtUtc = Timestamp.FromDateTime(DateTime.SpecifyKind(nowUtc, DateTimeKind.Utc)), + UpdatedAtUtc = Timestamp.FromDateTime(DateTime.SpecifyKind(nowUtc, DateTimeKind.Utc)), + }, + }; + if (httpStatus.HasValue) + { + dto.Operational.HttpStatus = httpStatus.Value; + } + if (terminalUtc.HasValue) + { + dto.Operational.TerminalAtUtc = + Timestamp.FromDateTime(DateTime.SpecifyKind(terminalUtc.Value, DateTimeKind.Utc)); + } + return dto; + } + + private static CachedTelemetryBatch BatchOf(params CachedTelemetryPacket[] packets) + { + var batch = new CachedTelemetryBatch(); + batch.Packets.AddRange(packets); + return batch; + } + + [SkippableFact] + public async Task DuplicatePacket_AuditLogStaysAtOneRow_SiteCallsUpserted_Monotonically() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var trackedId = TrackedOperationId.New(); + var eventId = Guid.NewGuid(); + var t0 = new DateTime(2026, 5, 20, 15, 0, 0, DateTimeKind.Utc); + + await using var harness = new CombinedTelemetryHarness(_fixture, this); + + var packet = BuildPacket( + eventId, trackedId, siteId, + AuditKind.CachedSubmit, AuditStatus.Submitted, "Submitted", + retryCount: 0, nowUtc: t0); + + // First delivery + var ack1 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(packet), CancellationToken.None); + Assert.Single(ack1.AcceptedEventIds); + + // Second delivery — the exact same packet (simulates a retried gRPC). + var ack2 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(packet), CancellationToken.None); + // Central acks both deliveries because storage state is consistent — + // the site is free to treat its local row as Forwarded either way. + Assert.Single(ack2.AcceptedEventIds); + Assert.Equal(eventId.ToString(), ack2.AcceptedEventIds[0]); + + await using var read = harness.CreateReadContext(); + + // AuditLog: exactly ONE row for the EventId (insert-if-not-exists). + var auditCount = await read.Set() + .CountAsync(e => e.EventId == eventId); + Assert.Equal(1, auditCount); + + // SiteCalls: exactly ONE row for the TrackedOperationId. + var siteCalls = await read.Set() + .Where(s => s.TrackedOperationId == trackedId) + .ToListAsync(); + Assert.Single(siteCalls); + Assert.Equal("Submitted", siteCalls[0].Status); + Assert.Equal(0, siteCalls[0].RetryCount); + } + + [SkippableFact] + public async Task OutOfOrderPackets_OlderStatus_ArrivesAfterNewer_IsNoOp() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var trackedId = TrackedOperationId.New(); + var t0 = new DateTime(2026, 5, 20, 16, 0, 0, DateTimeKind.Utc); + + await using var harness = new CombinedTelemetryHarness(_fixture, this); + + // First: the Attempted (RetryCount=2) row arrives at central — perhaps + // the Submit packet got delayed in flight. SiteCalls advances straight + // to Attempted with retry count 2. + var attemptedEventId = Guid.NewGuid(); + var attemptedPacket = BuildPacket( + attemptedEventId, trackedId, siteId, + AuditKind.ApiCallCached, AuditStatus.Attempted, "Attempted", + retryCount: 2, nowUtc: t0.AddSeconds(10), + lastError: "HTTP 500", httpStatus: 500); + var ack1 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(attemptedPacket), CancellationToken.None); + Assert.Single(ack1.AcceptedEventIds); + + // Now the stale Submit packet shows up. The audit row should still be + // inserted (audit is append-only — preserve the lifecycle history), + // but SiteCalls must NOT regress to Submitted/RetryCount=0. + var submitEventId = Guid.NewGuid(); + var submitPacket = BuildPacket( + submitEventId, trackedId, siteId, + AuditKind.CachedSubmit, AuditStatus.Submitted, "Submitted", + retryCount: 0, nowUtc: t0); + var ack2 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(submitPacket), CancellationToken.None); + Assert.Single(ack2.AcceptedEventIds); + + await using var read = harness.CreateReadContext(); + + // AuditLog: TWO rows now exist for this lifecycle — the Submit and + // the Attempted. Their order is by OccurredAtUtc; the test doesn't + // assert ordering, only count + correlation. + var auditRows = await read.Set() + .Where(e => e.SourceSiteId == siteId) + .ToListAsync(); + Assert.Equal(2, auditRows.Count); + Assert.All(auditRows, r => Assert.Equal(trackedId.Value, r.CorrelationId)); + + // SiteCalls: stuck at Attempted (monotonic — Submitted is rank 0, + // Attempted is rank 2, the upsert for the older row is a no-op). + var siteCall = await read.Set() + .SingleAsync(s => s.TrackedOperationId == trackedId); + Assert.Equal("Attempted", siteCall.Status); + Assert.Equal(2, siteCall.RetryCount); + Assert.Equal("HTTP 500", siteCall.LastError); + Assert.Equal(500, siteCall.HttpStatus); + } +}