203 lines
8.5 KiB
C#
203 lines
8.5 KiB
C#
using Akka.TestKit.Xunit2;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using ScadaLink.AuditLog.Tests.Integration.Infrastructure;
|
|
using ScadaLink.AuditLog.Telemetry;
|
|
using ScadaLink.Commons.Entities.Audit;
|
|
using ScadaLink.Commons.Messages.Integration;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
|
|
using ScadaLink.Communication.Grpc;
|
|
using Timestamp = Google.Protobuf.WellKnownTypes.Timestamp;
|
|
|
|
namespace ScadaLink.AuditLog.Tests.Integration;
|
|
|
|
/// <summary>
/// Bundle G / G4 idempotency suite for the combined telemetry ingest path.
/// Telemetry packets MUST round-trip safely under two delivery hazards:
/// <list type="bullet">
/// <item><description>
/// retried delivery (site→central is at-least-once): re-sending the very same
/// packet must leave exactly one audit row and one SiteCalls row;
/// </description></item>
/// <item><description>
/// out-of-order arrival: a stale Submit packet that lands after the central
/// row has already advanced to Attempted must not regress the SiteCalls
/// status, but must still insert its own audit row, because audit rows are
/// append-only and the lifecycle history is the source of truth for forensics.
/// </description></item>
/// </list>
/// </summary>
/// <remarks>
/// The tests push <see cref="CachedTelemetryBatch"/> packets straight through
/// the stub client, bypassing the local SQLite writer and the tracking store.
/// The scenario being modeled is a wire-level retry rather than a fresh site
/// call; the local stores' insert/no-op behaviour is already covered by the
/// G2/G3 happy-path tests. What is exercised here is the idempotency contract
/// of the central ingest actor's dual-write transaction.
/// </remarks>
public class CombinedTelemetryIdempotencyTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
    /// <summary>Target name stamped on both halves of every packet built by this suite.</summary>
    private const string ErpTarget = "ERP.GetOrder";

    private readonly MsSqlMigrationFixture _fixture;

    /// <summary>Stores the shared SQL Server migration fixture supplied by xUnit.</summary>
    /// <param name="fixture">Class-level fixture; its <c>Available</c> flag gates every test.</param>
    public CombinedTelemetryIdempotencyTests(MsSqlMigrationFixture fixture) => _fixture = fixture;

    /// <summary>
    /// Produces a fresh site id of the form <c>test-g4-idem-xxxxxxxx</c>, where the
    /// suffix is the first eight hex digits of a new GUID.
    /// </summary>
    private static string NewSiteId()
    {
        var suffix = Guid.NewGuid().ToString("N")[..8];
        return $"test-g4-idem-{suffix}";
    }

    /// <summary>Converts a <see cref="DateTime"/> to a protobuf timestamp after marking it as UTC.</summary>
    private static Timestamp UtcTimestamp(DateTime value) =>
        Timestamp.FromDateTime(DateTime.SpecifyKind(value, DateTimeKind.Utc));

    /// <summary>
    /// Assembles one telemetry packet carrying both the audit event and the
    /// operational (SiteCalls) snapshot for the same tracked operation.
    /// </summary>
    /// <param name="eventId">Id of the audit event.</param>
    /// <param name="trackedId">Tracked operation; also used as the audit correlation id.</param>
    /// <param name="siteId">Source site written to both halves of the packet.</param>
    /// <param name="kind">Audit kind of the event.</param>
    /// <param name="auditStatus">Status recorded on the audit event.</param>
    /// <param name="operationalStatus">Status string recorded on the operational snapshot.</param>
    /// <param name="retryCount">Retry count recorded on the operational snapshot.</param>
    /// <param name="nowUtc">Event time; also used as both created-at and updated-at.</param>
    /// <param name="terminalUtc">Optional terminal time; set on the snapshot only when supplied.</param>
    /// <param name="lastError">Optional error text; the snapshot gets an empty string when null.</param>
    /// <param name="httpStatus">Optional HTTP status; set on the snapshot only when supplied.</param>
    /// <returns>The populated packet.</returns>
    private static CachedTelemetryPacket BuildPacket(
        Guid eventId,
        TrackedOperationId trackedId,
        string siteId,
        AuditKind kind,
        AuditStatus auditStatus,
        string operationalStatus,
        int retryCount,
        DateTime nowUtc,
        DateTime? terminalUtc = null,
        string? lastError = null,
        int? httpStatus = null)
    {
        // Audit half: mapped from the domain entity so the wire shape matches production.
        var auditDto = AuditEventMapper.ToDto(new AuditEvent
        {
            EventId = eventId,
            OccurredAtUtc = nowUtc,
            Channel = AuditChannel.ApiOutbound,
            Kind = kind,
            CorrelationId = trackedId.Value,
            SourceSiteId = siteId,
            Target = ErpTarget,
            Status = auditStatus,
            HttpStatus = httpStatus,
            ErrorMessage = lastError,
            ForwardState = AuditForwardState.Pending,
        });

        // Operational half: the SiteCalls snapshot for the same tracked operation.
        var operational = new SiteCallOperationalDto
        {
            TrackedOperationId = trackedId.Value.ToString("D"),
            Channel = "ApiOutbound",
            Target = ErpTarget,
            SourceSite = siteId,
            Status = operationalStatus,
            RetryCount = retryCount,
            LastError = lastError ?? string.Empty,
            CreatedAtUtc = UtcTimestamp(nowUtc),
            UpdatedAtUtc = UtcTimestamp(nowUtc),
        };

        // Optional fields are assigned only when the caller supplied a value.
        if (httpStatus is { } statusCode)
        {
            operational.HttpStatus = statusCode;
        }

        if (terminalUtc is { } terminalAt)
        {
            operational.TerminalAtUtc = UtcTimestamp(terminalAt);
        }

        return new CachedTelemetryPacket
        {
            AuditEvent = auditDto,
            Operational = operational,
        };
    }

    /// <summary>Wraps the given packets, in order, in a single batch.</summary>
    private static CachedTelemetryBatch BatchOf(params CachedTelemetryPacket[] packets)
    {
        var result = new CachedTelemetryBatch();
        result.Packets.AddRange(packets);
        return result;
    }

    /// <summary>
    /// Delivers the same Submit packet twice and verifies that both deliveries
    /// are acked while AuditLog and SiteCalls each hold exactly one row.
    /// </summary>
    [SkippableFact]
    public async Task DuplicatePacket_AuditLogStaysAtOneRow_SiteCallsUpserted_Monotonically()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var site = NewSiteId();
        var operationId = TrackedOperationId.New();
        var auditEventId = Guid.NewGuid();
        var submittedAt = new DateTime(2026, 5, 20, 15, 0, 0, DateTimeKind.Utc);

        await using var harness = new CombinedTelemetryHarness(_fixture, this);

        var submitPacket = BuildPacket(
            eventId: auditEventId,
            trackedId: operationId,
            siteId: site,
            kind: AuditKind.CachedSubmit,
            auditStatus: AuditStatus.Submitted,
            operationalStatus: "Submitted",
            retryCount: 0,
            nowUtc: submittedAt);

        // Initial delivery.
        var firstAck = await harness.StubClient.IngestCachedTelemetryAsync(
            BatchOf(submitPacket), CancellationToken.None);
        Assert.Single(firstAck.AcceptedEventIds);

        // Redelivery of the identical packet, simulating a retried gRPC call.
        var retryAck = await harness.StubClient.IngestCachedTelemetryAsync(
            BatchOf(submitPacket), CancellationToken.None);

        // Central acks the retry as well: storage state is consistent, so the
        // site is free to mark its local row as Forwarded either way.
        var retryAckedId = Assert.Single(retryAck.AcceptedEventIds);
        Assert.Equal(auditEventId.ToString(), retryAckedId);

        await using var db = harness.CreateReadContext();

        // AuditLog holds exactly ONE row for the EventId (insert-if-not-exists).
        var auditRowCount = await db.Set<AuditEvent>()
            .CountAsync(e => e.EventId == auditEventId);
        Assert.Equal(1, auditRowCount);

        // SiteCalls holds exactly ONE row for the TrackedOperationId.
        var siteCallRows = await db.Set<SiteCall>()
            .Where(s => s.TrackedOperationId == operationId)
            .ToListAsync();
        var siteCall = Assert.Single(siteCallRows);
        Assert.Equal("Submitted", siteCall.Status);
        Assert.Equal(0, siteCall.RetryCount);
    }

    /// <summary>
    /// Delivers an Attempted packet first and the older Submit packet second,
    /// then verifies that both audit rows exist while SiteCalls still reflects
    /// the Attempted state.
    /// </summary>
    [SkippableFact]
    public async Task OutOfOrderPackets_OlderStatus_ArrivesAfterNewer_IsNoOp()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var site = NewSiteId();
        var operationId = TrackedOperationId.New();
        var submittedAt = new DateTime(2026, 5, 20, 16, 0, 0, DateTimeKind.Utc);

        await using var harness = new CombinedTelemetryHarness(_fixture, this);

        // The Attempted (RetryCount=2) packet reaches central first — perhaps
        // the Submit packet was delayed in flight — so SiteCalls advances
        // straight to Attempted with retry count 2.
        var attemptedEventId = Guid.NewGuid();
        var attemptedPacket = BuildPacket(
            eventId: attemptedEventId,
            trackedId: operationId,
            siteId: site,
            kind: AuditKind.ApiCallCached,
            auditStatus: AuditStatus.Attempted,
            operationalStatus: "Attempted",
            retryCount: 2,
            nowUtc: submittedAt.AddSeconds(10),
            lastError: "HTTP 500",
            httpStatus: 500);
        var attemptedAck = await harness.StubClient.IngestCachedTelemetryAsync(
            BatchOf(attemptedPacket), CancellationToken.None);
        Assert.Single(attemptedAck.AcceptedEventIds);

        // The stale Submit packet arrives afterwards. Its audit row must still
        // be inserted (audit is append-only, so the lifecycle history is kept),
        // but SiteCalls must NOT regress to Submitted/RetryCount=0.
        var submitEventId = Guid.NewGuid();
        var stalePacket = BuildPacket(
            eventId: submitEventId,
            trackedId: operationId,
            siteId: site,
            kind: AuditKind.CachedSubmit,
            auditStatus: AuditStatus.Submitted,
            operationalStatus: "Submitted",
            retryCount: 0,
            nowUtc: submittedAt);
        var staleAck = await harness.StubClient.IngestCachedTelemetryAsync(
            BatchOf(stalePacket), CancellationToken.None);
        Assert.Single(staleAck.AcceptedEventIds);

        await using var db = harness.CreateReadContext();

        // AuditLog now holds TWO rows for this lifecycle: the Submit and the
        // Attempted. Only count and correlation are asserted here, not ordering.
        var lifecycleRows = await db.Set<AuditEvent>()
            .Where(e => e.SourceSiteId == site)
            .ToListAsync();
        Assert.Equal(2, lifecycleRows.Count);
        Assert.All(lifecycleRows, row => Assert.Equal(operationId.Value, row.CorrelationId));

        // SiteCalls stays at Attempted: the upsert is monotonic (Submitted is
        // rank 0, Attempted is rank 2), so the older row's upsert is a no-op.
        var current = await db.Set<SiteCall>()
            .Where(s => s.TrackedOperationId == operationId)
            .SingleAsync();
        Assert.Equal("Attempted", current.Status);
        Assert.Equal(2, current.RetryCount);
        Assert.Equal("HTTP 500", current.LastError);
        Assert.Equal(500, current.HttpStatus);
    }
}
|