using Akka.TestKit.Xunit2;
using Microsoft.EntityFrameworkCore;
using ScadaLink.AuditLog.Tests.Integration.Infrastructure;
using ScadaLink.AuditLog.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
using ScadaLink.Communication.Grpc;
using Timestamp = Google.Protobuf.WellKnownTypes.Timestamp;

namespace ScadaLink.AuditLog.Tests.Integration;

/// <summary>
/// Bundle G G4 idempotency suite. Telemetry packets MUST round-trip safely
/// under retried delivery (at-least-once site→central) AND under out-of-order
/// arrival (a stale Submit packet arriving after the central row has already
/// advanced to Attempted must not regress the SiteCalls status, but must
/// still insert its own audit row because audit rows are append-only and the
/// lifecycle history is the source of truth for forensics).
/// </summary>
/// <remarks>
/// Pushes packets directly through <c>CombinedTelemetryHarness.StubClient</c>
/// (bypassing the local SQLite writer + tracking store) — the scenario being
/// modeled is a wire-level retry, not a fresh site call, so the local stores'
/// insert/no-op behaviour is already covered by the G2/G3 happy-path tests.
/// This suite focuses on the idempotency contract of the central ingest
/// actor's dual-write transaction.
/// </remarks>
public class CombinedTelemetryIdempotencyTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
    private readonly MsSqlMigrationFixture _fixture;

    /// <summary>Captures the shared SQL Server migration fixture supplied by xUnit.</summary>
    /// <param name="fixture">Class-level fixture; its <c>Available</c> flag gates every test.</param>
    public CombinedTelemetryIdempotencyTests(MsSqlMigrationFixture fixture)
    {
        _fixture = fixture;
    }

    /// <summary>
    /// Produces a per-test site id (fixed prefix + 8 hex chars of a fresh GUID)
    /// so rows written by one test can be told apart from any other test's rows.
    /// </summary>
    private static string NewSiteId() =>
        "test-g4-idem-" + Guid.NewGuid().ToString("N")[..8];

    /// <summary>
    /// Builds one combined telemetry packet: the audit event DTO plus the
    /// operational (SiteCalls) DTO for the same tracked operation.
    /// </summary>
    /// <param name="eventId">Audit event id.</param>
    /// <param name="trackedId">Tracked operation id; also used as the audit correlation id.</param>
    /// <param name="siteId">Source site id stamped on both halves of the packet.</param>
    /// <param name="kind">Audit kind of the event.</param>
    /// <param name="auditStatus">Status recorded on the audit event.</param>
    /// <param name="operationalStatus">Status string recorded on the operational DTO.</param>
    /// <param name="retryCount">Retry count recorded on the operational DTO.</param>
    /// <param name="nowUtc">Used for the audit OccurredAtUtc and for both Created/Updated timestamps.</param>
    /// <param name="terminalUtc">Optional terminal timestamp; only set on the DTO when supplied.</param>
    /// <param name="lastError">Optional error text; the operational DTO receives an empty string when null.</param>
    /// <param name="httpStatus">Optional HTTP status; only set on the operational DTO when supplied.</param>
    /// <returns>The populated packet.</returns>
    private static CachedTelemetryPacket BuildPacket(
        Guid eventId,
        TrackedOperationId trackedId,
        string siteId,
        AuditKind kind,
        AuditStatus auditStatus,
        string operationalStatus,
        int retryCount,
        DateTime nowUtc,
        DateTime? terminalUtc = null,
        string? lastError = null,
        int? httpStatus = null)
    {
        // The kind is forced to Utc before conversion, and a fresh Timestamp is
        // created per field so the two DTO fields never share one instance.
        static Timestamp ToUtcTimestamp(DateTime value) =>
            Timestamp.FromDateTime(DateTime.SpecifyKind(value, DateTimeKind.Utc));

        var auditEvent = new AuditEvent
        {
            EventId = eventId,
            OccurredAtUtc = nowUtc,
            Channel = AuditChannel.ApiOutbound,
            Kind = kind,
            CorrelationId = trackedId.Value,
            SourceSiteId = siteId,
            Target = "ERP.GetOrder",
            Status = auditStatus,
            HttpStatus = httpStatus,
            ErrorMessage = lastError,
            ForwardState = AuditForwardState.Pending,
        };

        var operational = new SiteCallOperationalDto
        {
            TrackedOperationId = trackedId.Value.ToString("D"),
            Channel = "ApiOutbound",
            Target = "ERP.GetOrder",
            SourceSite = siteId,
            Status = operationalStatus,
            RetryCount = retryCount,
            LastError = lastError ?? string.Empty,
            CreatedAtUtc = ToUtcTimestamp(nowUtc),
            UpdatedAtUtc = ToUtcTimestamp(nowUtc),
        };

        // Optional fields are assigned only when the caller supplied a value.
        if (httpStatus is { } status)
        {
            operational.HttpStatus = status;
        }

        if (terminalUtc is { } terminal)
        {
            operational.TerminalAtUtc = ToUtcTimestamp(terminal);
        }

        return new CachedTelemetryPacket
        {
            AuditEvent = AuditEventMapper.ToDto(auditEvent),
            Operational = operational,
        };
    }

    /// <summary>Wraps the given packets in a single batch, preserving their order.</summary>
    private static CachedTelemetryBatch BatchOf(params CachedTelemetryPacket[] packets)
    {
        var batch = new CachedTelemetryBatch();
        batch.Packets.AddRange(packets);
        return batch;
    }

    /// <summary>
    /// Delivers the exact same packet twice and verifies that both deliveries
    /// are acked while storage ends up with one audit row and one SiteCalls row.
    /// </summary>
    [SkippableFact]
    public async Task DuplicatePacket_AuditLogStaysAtOneRow_SiteCallsUpserted_Monotonically()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var siteId = NewSiteId();
        var trackedId = TrackedOperationId.New();
        var eventId = Guid.NewGuid();
        var t0 = new DateTime(2026, 5, 20, 15, 0, 0, DateTimeKind.Utc);

        await using var harness = new CombinedTelemetryHarness(_fixture, this);

        var packet = BuildPacket(
            eventId,
            trackedId,
            siteId,
            AuditKind.CachedSubmit,
            AuditStatus.Submitted,
            "Submitted",
            retryCount: 0,
            nowUtc: t0);

        // First delivery.
        var ack1 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(packet), CancellationToken.None);
        Assert.Single(ack1.AcceptedEventIds);
        Assert.Equal(eventId.ToString(), ack1.AcceptedEventIds[0]);

        // Second delivery — the exact same packet (simulates a retried gRPC).
        var ack2 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(packet), CancellationToken.None);

        // Central acks both deliveries because storage state is consistent —
        // the site is free to treat its local row as Forwarded either way.
        Assert.Single(ack2.AcceptedEventIds);
        Assert.Equal(eventId.ToString(), ack2.AcceptedEventIds[0]);

        await using var read = harness.CreateReadContext();

        // AuditLog: exactly ONE row for the EventId (insert-if-not-exists).
        // NOTE(review): the generic argument was missing in the reviewed text;
        // AuditEvent is inferred from the EventId predicate — confirm.
        var auditCount = await read.Set<AuditEvent>()
            .CountAsync(e => e.EventId == eventId);
        Assert.Equal(1, auditCount);

        // SiteCalls: exactly ONE row for the TrackedOperationId.
        // NOTE(review): the generic argument was missing in the reviewed text;
        // SiteCall is assumed from the "SiteCalls" naming — confirm the entity type.
        var siteCalls = await read.Set<SiteCall>()
            .Where(s => s.TrackedOperationId == trackedId)
            .ToListAsync();
        Assert.Single(siteCalls);
        Assert.Equal("Submitted", siteCalls[0].Status);
        Assert.Equal(0, siteCalls[0].RetryCount);
    }

    /// <summary>
    /// Delivers an Attempted packet first and the older Submit packet second,
    /// then verifies both audit rows exist while the SiteCalls row keeps the
    /// Attempted state.
    /// </summary>
    [SkippableFact]
    public async Task OutOfOrderPackets_OlderStatus_ArrivesAfterNewer_IsNoOp()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var siteId = NewSiteId();
        var trackedId = TrackedOperationId.New();
        var t0 = new DateTime(2026, 5, 20, 16, 0, 0, DateTimeKind.Utc);

        await using var harness = new CombinedTelemetryHarness(_fixture, this);

        // First: the Attempted (RetryCount=2) row arrives at central — perhaps
        // the Submit packet got delayed in flight. SiteCalls advances straight
        // to Attempted with retry count 2.
        var attemptedEventId = Guid.NewGuid();
        var attemptedPacket = BuildPacket(
            attemptedEventId,
            trackedId,
            siteId,
            AuditKind.ApiCallCached,
            AuditStatus.Attempted,
            "Attempted",
            retryCount: 2,
            nowUtc: t0.AddSeconds(10),
            lastError: "HTTP 500",
            httpStatus: 500);

        var ack1 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(attemptedPacket), CancellationToken.None);
        Assert.Single(ack1.AcceptedEventIds);

        // Now the stale Submit packet shows up. The audit row should still be
        // inserted (audit is append-only — preserve the lifecycle history),
        // but SiteCalls must NOT regress to Submitted/RetryCount=0.
        var submitEventId = Guid.NewGuid();
        var submitPacket = BuildPacket(
            submitEventId,
            trackedId,
            siteId,
            AuditKind.CachedSubmit,
            AuditStatus.Submitted,
            "Submitted",
            retryCount: 0,
            nowUtc: t0);

        var ack2 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(submitPacket), CancellationToken.None);
        Assert.Single(ack2.AcceptedEventIds);

        await using var read = harness.CreateReadContext();

        // AuditLog: TWO rows now exist for this lifecycle — the Submit and
        // the Attempted. Ordering is not asserted; only count, correlation,
        // and that both specific events were persisted.
        // NOTE(review): generic argument restored as AuditEvent — confirm.
        var auditRows = await read.Set<AuditEvent>()
            .Where(e => e.SourceSiteId == siteId)
            .ToListAsync();
        Assert.Equal(2, auditRows.Count);
        Assert.All(auditRows, r => Assert.Equal(trackedId.Value, r.CorrelationId));
        Assert.Contains(auditRows, r => r.EventId == attemptedEventId);
        Assert.Contains(auditRows, r => r.EventId == submitEventId);

        // SiteCalls: stuck at Attempted (monotonic — Submitted is rank 0,
        // Attempted is rank 2, the upsert for the older row is a no-op).
        // NOTE(review): generic argument restored as SiteCall — confirm the entity type.
        var siteCall = await read.Set<SiteCall>()
            .SingleAsync(s => s.TrackedOperationId == trackedId);
        Assert.Equal("Attempted", siteCall.Status);
        Assert.Equal(2, siteCall.RetryCount);
        Assert.Equal("HTTP 500", siteCall.LastError);
        Assert.Equal(500, siteCall.HttpStatus);
    }
}