test(auditlog): combined telemetry idempotency on retried packets (#23 M3)
This commit is contained in:
@@ -0,0 +1,202 @@
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ScadaLink.AuditLog.Tests.Integration.Infrastructure;
|
||||
using ScadaLink.AuditLog.Telemetry;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Messages.Integration;
|
||||
using ScadaLink.Commons.Types;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
|
||||
using ScadaLink.Communication.Grpc;
|
||||
using Timestamp = Google.Protobuf.WellKnownTypes.Timestamp;
|
||||
|
||||
namespace ScadaLink.AuditLog.Tests.Integration;
|
||||
|
||||
/// <summary>
|
||||
/// Bundle G G4 idempotency suite. Telemetry packets MUST round-trip safely
|
||||
/// under retried delivery (at-least-once site→central) AND under out-of-order
|
||||
/// arrival (a stale Submit packet arriving after the central row has already
|
||||
/// advanced to Attempted must not regress the SiteCalls status, but must
|
||||
/// still insert its own audit row because audit rows are append-only and the
|
||||
/// lifecycle history is the source of truth for forensics).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Pushes <see cref="CachedTelemetryBatch"/> packets directly through the
|
||||
/// stub client (bypassing the local SQLite writer + tracking store) — the
|
||||
/// scenario being modeled is a wire-level retry, not a fresh site call, so
|
||||
/// the local stores' insert/no-op behaviour is already covered by the G2/G3
|
||||
/// happy-path tests. This suite focuses on the central ingest actor's
|
||||
/// dual-write transaction's idempotency contract.
|
||||
/// </remarks>
|
||||
public class CombinedTelemetryIdempotencyTests : TestKit, IClassFixture<MsSqlMigrationFixture>
|
||||
{
|
||||
private readonly MsSqlMigrationFixture _fixture;
|
||||
|
||||
public CombinedTelemetryIdempotencyTests(MsSqlMigrationFixture fixture)
|
||||
{
|
||||
_fixture = fixture;
|
||||
}
|
||||
|
||||
private static string NewSiteId() =>
|
||||
"test-g4-idem-" + Guid.NewGuid().ToString("N").Substring(0, 8);
|
||||
|
||||
private static CachedTelemetryPacket BuildPacket(
|
||||
Guid eventId,
|
||||
TrackedOperationId trackedId,
|
||||
string siteId,
|
||||
AuditKind kind,
|
||||
AuditStatus auditStatus,
|
||||
string operationalStatus,
|
||||
int retryCount,
|
||||
DateTime nowUtc,
|
||||
DateTime? terminalUtc = null,
|
||||
string? lastError = null,
|
||||
int? httpStatus = null)
|
||||
{
|
||||
var dto = new CachedTelemetryPacket
|
||||
{
|
||||
AuditEvent = AuditEventMapper.ToDto(new AuditEvent
|
||||
{
|
||||
EventId = eventId,
|
||||
OccurredAtUtc = nowUtc,
|
||||
Channel = AuditChannel.ApiOutbound,
|
||||
Kind = kind,
|
||||
CorrelationId = trackedId.Value,
|
||||
SourceSiteId = siteId,
|
||||
Target = "ERP.GetOrder",
|
||||
Status = auditStatus,
|
||||
HttpStatus = httpStatus,
|
||||
ErrorMessage = lastError,
|
||||
ForwardState = AuditForwardState.Pending,
|
||||
}),
|
||||
Operational = new SiteCallOperationalDto
|
||||
{
|
||||
TrackedOperationId = trackedId.Value.ToString("D"),
|
||||
Channel = "ApiOutbound",
|
||||
Target = "ERP.GetOrder",
|
||||
SourceSite = siteId,
|
||||
Status = operationalStatus,
|
||||
RetryCount = retryCount,
|
||||
LastError = lastError ?? string.Empty,
|
||||
CreatedAtUtc = Timestamp.FromDateTime(DateTime.SpecifyKind(nowUtc, DateTimeKind.Utc)),
|
||||
UpdatedAtUtc = Timestamp.FromDateTime(DateTime.SpecifyKind(nowUtc, DateTimeKind.Utc)),
|
||||
},
|
||||
};
|
||||
if (httpStatus.HasValue)
|
||||
{
|
||||
dto.Operational.HttpStatus = httpStatus.Value;
|
||||
}
|
||||
if (terminalUtc.HasValue)
|
||||
{
|
||||
dto.Operational.TerminalAtUtc =
|
||||
Timestamp.FromDateTime(DateTime.SpecifyKind(terminalUtc.Value, DateTimeKind.Utc));
|
||||
}
|
||||
return dto;
|
||||
}
|
||||
|
||||
private static CachedTelemetryBatch BatchOf(params CachedTelemetryPacket[] packets)
|
||||
{
|
||||
var batch = new CachedTelemetryBatch();
|
||||
batch.Packets.AddRange(packets);
|
||||
return batch;
|
||||
}
|
||||
|
||||
[SkippableFact]
|
||||
public async Task DuplicatePacket_AuditLogStaysAtOneRow_SiteCallsUpserted_Monotonically()
|
||||
{
|
||||
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||
|
||||
var siteId = NewSiteId();
|
||||
var trackedId = TrackedOperationId.New();
|
||||
var eventId = Guid.NewGuid();
|
||||
var t0 = new DateTime(2026, 5, 20, 15, 0, 0, DateTimeKind.Utc);
|
||||
|
||||
await using var harness = new CombinedTelemetryHarness(_fixture, this);
|
||||
|
||||
var packet = BuildPacket(
|
||||
eventId, trackedId, siteId,
|
||||
AuditKind.CachedSubmit, AuditStatus.Submitted, "Submitted",
|
||||
retryCount: 0, nowUtc: t0);
|
||||
|
||||
// First delivery
|
||||
var ack1 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(packet), CancellationToken.None);
|
||||
Assert.Single(ack1.AcceptedEventIds);
|
||||
|
||||
// Second delivery — the exact same packet (simulates a retried gRPC).
|
||||
var ack2 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(packet), CancellationToken.None);
|
||||
// Central acks both deliveries because storage state is consistent —
|
||||
// the site is free to treat its local row as Forwarded either way.
|
||||
Assert.Single(ack2.AcceptedEventIds);
|
||||
Assert.Equal(eventId.ToString(), ack2.AcceptedEventIds[0]);
|
||||
|
||||
await using var read = harness.CreateReadContext();
|
||||
|
||||
// AuditLog: exactly ONE row for the EventId (insert-if-not-exists).
|
||||
var auditCount = await read.Set<AuditEvent>()
|
||||
.CountAsync(e => e.EventId == eventId);
|
||||
Assert.Equal(1, auditCount);
|
||||
|
||||
// SiteCalls: exactly ONE row for the TrackedOperationId.
|
||||
var siteCalls = await read.Set<SiteCall>()
|
||||
.Where(s => s.TrackedOperationId == trackedId)
|
||||
.ToListAsync();
|
||||
Assert.Single(siteCalls);
|
||||
Assert.Equal("Submitted", siteCalls[0].Status);
|
||||
Assert.Equal(0, siteCalls[0].RetryCount);
|
||||
}
|
||||
|
||||
[SkippableFact]
|
||||
public async Task OutOfOrderPackets_OlderStatus_ArrivesAfterNewer_IsNoOp()
|
||||
{
|
||||
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||
|
||||
var siteId = NewSiteId();
|
||||
var trackedId = TrackedOperationId.New();
|
||||
var t0 = new DateTime(2026, 5, 20, 16, 0, 0, DateTimeKind.Utc);
|
||||
|
||||
await using var harness = new CombinedTelemetryHarness(_fixture, this);
|
||||
|
||||
// First: the Attempted (RetryCount=2) row arrives at central — perhaps
|
||||
// the Submit packet got delayed in flight. SiteCalls advances straight
|
||||
// to Attempted with retry count 2.
|
||||
var attemptedEventId = Guid.NewGuid();
|
||||
var attemptedPacket = BuildPacket(
|
||||
attemptedEventId, trackedId, siteId,
|
||||
AuditKind.ApiCallCached, AuditStatus.Attempted, "Attempted",
|
||||
retryCount: 2, nowUtc: t0.AddSeconds(10),
|
||||
lastError: "HTTP 500", httpStatus: 500);
|
||||
var ack1 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(attemptedPacket), CancellationToken.None);
|
||||
Assert.Single(ack1.AcceptedEventIds);
|
||||
|
||||
// Now the stale Submit packet shows up. The audit row should still be
|
||||
// inserted (audit is append-only — preserve the lifecycle history),
|
||||
// but SiteCalls must NOT regress to Submitted/RetryCount=0.
|
||||
var submitEventId = Guid.NewGuid();
|
||||
var submitPacket = BuildPacket(
|
||||
submitEventId, trackedId, siteId,
|
||||
AuditKind.CachedSubmit, AuditStatus.Submitted, "Submitted",
|
||||
retryCount: 0, nowUtc: t0);
|
||||
var ack2 = await harness.StubClient.IngestCachedTelemetryAsync(BatchOf(submitPacket), CancellationToken.None);
|
||||
Assert.Single(ack2.AcceptedEventIds);
|
||||
|
||||
await using var read = harness.CreateReadContext();
|
||||
|
||||
// AuditLog: TWO rows now exist for this lifecycle — the Submit and
|
||||
// the Attempted. Their order is by OccurredAtUtc; the test doesn't
|
||||
// assert ordering, only count + correlation.
|
||||
var auditRows = await read.Set<AuditEvent>()
|
||||
.Where(e => e.SourceSiteId == siteId)
|
||||
.ToListAsync();
|
||||
Assert.Equal(2, auditRows.Count);
|
||||
Assert.All(auditRows, r => Assert.Equal(trackedId.Value, r.CorrelationId));
|
||||
|
||||
// SiteCalls: stuck at Attempted (monotonic — Submitted is rank 0,
|
||||
// Attempted is rank 2, the upsert for the older row is a no-op).
|
||||
var siteCall = await read.Set<SiteCall>()
|
||||
.SingleAsync(s => s.TrackedOperationId == trackedId);
|
||||
Assert.Equal("Attempted", siteCall.Status);
|
||||
Assert.Equal(2, siteCall.RetryCount);
|
||||
Assert.Equal("HTTP 500", siteCall.LastError);
|
||||
Assert.Equal(500, siteCall.HttpStatus);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user