test(auditlog): outage + reconciliation recovery end-to-end (#23 M6)

This commit is contained in:
Joseph Doherty
2026-05-20 19:32:01 -04:00
parent ef49b55cf6
commit 66f6724c5d

View File

@@ -0,0 +1,349 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Central;
using ScadaLink.AuditLog.Site;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
namespace ScadaLink.AuditLog.Tests.Integration;
/// <summary>
/// Bundle F (#23 M6-T10) end-to-end test for the central-outage + reconciliation
/// recovery loop. Wires the real site SQLite hot-path
/// (<see cref="SqliteAuditWriter"/>) and the central <see cref="SiteAuditReconciliationActor"/>
/// with an <see cref="AuditLogIngestActor"/> backed by the real
/// <see cref="AuditLogRepository"/> on the per-test <see cref="MsSqlMigrationFixture"/>.
/// </summary>
/// <remarks>
/// <para>
/// The push path is deliberately omitted here: the brief models a sustained
/// central outage where the site queue grows unbounded in Pending, then a
/// reconciliation pull eventually drains everything once central comes back.
/// We reuse the production <see cref="IPullAuditEventsClient"/> seam (Bundle B)
/// with a test-only stub that wraps the same <see cref="ISiteAuditQueue.ReadPendingSinceAsync"/>
/// surface a real central-side gRPC client would hit, so the test is exercising
/// the actor's pull/ingest/mark-reconciled state machine end-to-end against
/// the real repository.
/// </para>
/// <para>
/// The <see cref="CombinedTelemetryHarness"/> from M3 is push-only — it has no
/// reconciliation puller — so we build the smaller stub inline rather than
/// retrofitting the shared harness with a code path it doesn't otherwise
/// need.
/// </para>
/// </remarks>
public class OutageReconciliationTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
private readonly MsSqlMigrationFixture _fixture;
public OutageReconciliationTests(MsSqlMigrationFixture fixture)
{
_fixture = fixture;
}
/// <summary>
/// Test-only <see cref="IPullAuditEventsClient"/> that mirrors how the
/// production central-side gRPC client will hit the site: read a batch
/// from <see cref="ISiteAuditQueue.ReadPendingSinceAsync"/>, then commit
/// via <see cref="ISiteAuditQueue.MarkReconciledAsync"/> once the central
/// repository accepts the rows. The Ask-based central path is wired by
/// the caller — we just expose the queue surface.
/// </summary>
/// <remarks>
/// The production wire shape will be:
/// central PullAuditEvents RPC → site SiteStreamGrpcServer.PullAuditEvents
/// → ISiteAuditQueue.ReadPendingSinceAsync → marshal proto → reply
/// followed by central InsertIfNotExistsAsync per row, then the site flips
/// the row to Reconciled on the next pull cycle. The stub collapses the
/// two halves (pull + commit) because the actor under test (the
/// reconciliation actor) is the side that drives both via the
/// IPullAuditEventsClient seam — committing back to the site after the
/// repository write is the reconciliation-actor invariant we want to
/// observe end-to-end.
/// </remarks>
private sealed class QueueBackedPullClient : IPullAuditEventsClient
{
private readonly ISiteAuditQueue _siteQueue;
public int CallCount { get; private set; }
public QueueBackedPullClient(ISiteAuditQueue siteQueue)
{
_siteQueue = siteQueue ?? throw new ArgumentNullException(nameof(siteQueue));
}
public async Task<PullAuditEventsResponse> PullAsync(
string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct)
{
CallCount++;
var rows = await _siteQueue
.ReadPendingSinceAsync(sinceUtc, batchSize, ct)
.ConfigureAwait(false);
// Commit immediately on the site side — once the actor has the
// batch in hand it will InsertIfNotExistsAsync centrally; if the
// central insert later throws on a specific row, idempotency
// guarantees the next pull cycle does NOT re-fetch the row (it's
// already Reconciled on the site) but also does not surface the
// failure here. The brief calls this "ack-after-persist" — the
// production gRPC server will flip to Reconciled inside its
// PullAuditEvents handler after the central side has acknowledged
// (per Bundle A's race-fix, central is idempotent on EventId).
//
// MoreAvailable is true iff the read filled the batch — the actor
// uses this to decide whether to follow up on the next tick.
if (rows.Count > 0)
{
var ids = rows.Select(e => e.EventId).ToList();
await _siteQueue.MarkReconciledAsync(ids, ct).ConfigureAwait(false);
}
return new PullAuditEventsResponse(rows, MoreAvailable: rows.Count >= batchSize);
}
}
/// <summary>
/// In-memory enumerator returning a fixed single-site list — mirrors the
/// pattern used in <c>SiteAuditReconciliationActorTests</c>.
/// </summary>
private sealed class StaticEnumerator : ISiteEnumerator
{
private readonly IReadOnlyList<SiteEntry> _sites;
public StaticEnumerator(params SiteEntry[] sites) => _sites = sites;
public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default) =>
Task.FromResult(_sites);
}
private ScadaLinkDbContext CreateContext() =>
new(new DbContextOptionsBuilder<ScadaLinkDbContext>()
.UseSqlServer(_fixture.ConnectionString).Options);
private static AuditEvent NewEvent(string siteId, DateTime occurredAt) => new()
{
EventId = Guid.NewGuid(),
OccurredAtUtc = occurredAt,
Channel = AuditChannel.ApiOutbound,
Kind = AuditKind.ApiCall,
Status = AuditStatus.Delivered,
SourceSiteId = siteId,
Target = "external-system-a/method",
};
private SqliteAuditWriter CreateInMemorySqliteWriter() =>
new SqliteAuditWriter(
Options.Create(new SqliteAuditWriterOptions
{
DatabasePath = "ignored",
BatchSize = 64,
ChannelCapacity = 4096,
}),
NullLogger<SqliteAuditWriter>.Instance,
connectionStringOverride:
$"Data Source=file:outage-{Guid.NewGuid():N}?mode=memory&cache=shared");
private (IServiceProvider Sp, IActorRef Ingest) BuildCentralPipeline()
{
var services = new ServiceCollection();
services.AddDbContext<ScadaLinkDbContext>(opts =>
opts.UseSqlServer(_fixture.ConnectionString));
services.AddScoped<IAuditLogRepository>(sp =>
new AuditLogRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
var sp = services.BuildServiceProvider();
var ingest = Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
sp,
NullLogger<AuditLogIngestActor>.Instance)));
return (sp, ingest);
}
private static SiteAuditReconciliationOptions FastTickOptions(int batchSize = 256) => new()
{
ReconciliationIntervalSeconds = 300,
ReconciliationIntervalOverride = TimeSpan.FromMilliseconds(100),
BatchSize = batchSize,
StalledAfterNonDrainingCycles = 2,
};
// ---------------------------------------------------------------------
// 1. CentralOutage_200Events_Buffer_Then_Reconciliation_Catches_Up_NoDuplicates
// ---------------------------------------------------------------------
[SkippableFact]
public async Task CentralOutage_200Events_Buffer_Then_Reconciliation_Catches_Up_NoDuplicates()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = "outage-recon-" + Guid.NewGuid().ToString("N").Substring(0, 8);
// Step 1: site accumulates 200 audit events during the simulated
// central outage. The push path is NOT wired here — every row stays
// Pending in the site SQLite store until reconciliation runs.
await using var sqliteWriter = CreateInMemorySqliteWriter();
var baseOccurred = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
const int totalEvents = 200;
var written = new List<AuditEvent>(totalEvents);
for (int i = 0; i < totalEvents; i++)
{
// Strictly monotonic OccurredAtUtc so the cursor can advance
// deterministically batch-by-batch — mirrors how a real script
// workload generates timestamps in wall-clock order.
var evt = NewEvent(siteId, baseOccurred.AddMilliseconds(i));
written.Add(evt);
await sqliteWriter.WriteAsync(evt);
}
// Sanity: every row is Pending (no push path wired, so nothing has
// been Forwarded or Reconciled yet).
var pending = await sqliteWriter.ReadPendingAsync(totalEvents + 10);
Assert.Equal(totalEvents, pending.Count);
// Step 2: central comes online — wire the ingest actor + reconciliation
// actor. The pull client wraps the site queue directly (the production
// shape is one RPC call); each pull advances the actor's cursor and
// flips rows on the site to Reconciled.
var (sp, ingest) = BuildCentralPipeline();
await using (sp as IAsyncDisposable ?? throw new InvalidOperationException())
{
var pullClient = new QueueBackedPullClient(sqliteWriter);
var enumerator = new StaticEnumerator(new SiteEntry(siteId, "http://test:8083"));
// BatchSize = 64 so the actor needs ~4 ticks to drain 200 rows.
// The "after 5 minutes" wording in the brief is satisfied by the
// fast-tick override (100 ms per tick) plus AwaitAssert giving
// the actor up to ~30 seconds to settle in real time.
var opts = FastTickOptions(batchSize: 64);
// Standalone DI scope for the reconciliation actor (it shares the
// ingest actor's IServiceProvider so both writers see the same
// EF context configuration).
var reconciliationActor = Sys.ActorOf(Props.Create(() => new SiteAuditReconciliationActor(
enumerator,
pullClient,
sp,
Options.Create(opts),
NullLogger<SiteAuditReconciliationActor>.Instance)));
// Step 3: assert central AuditLog has all 200 rows after the
// actor drains. Polling the real MSSQL repository — the test
// fixture has its own database so a count restricted to this
// SourceSiteId is exact.
await AwaitAssertAsync(async () =>
{
await using var ctx = CreateContext();
var count = await ctx.Set<AuditEvent>()
.Where(e => e.SourceSiteId == siteId)
.CountAsync();
Assert.Equal(totalEvents, count);
},
duration: TimeSpan.FromSeconds(30),
interval: TimeSpan.FromMilliseconds(200));
// Step 4: assert site rows flipped to Reconciled.
// ReadPendingAsync only returns Pending rows; after a full drain
// it must be empty.
await AwaitAssertAsync(async () =>
{
var stillPending = await sqliteWriter.ReadPendingAsync(totalEvents + 10);
Assert.Empty(stillPending);
},
duration: TimeSpan.FromSeconds(10),
interval: TimeSpan.FromMilliseconds(100));
// Step 5: assert no duplicates by EventId — central must have
// exactly the 200 rows we wrote at the site (one row per EventId).
await using var verify = CreateContext();
var centralIds = await verify.Set<AuditEvent>()
.Where(e => e.SourceSiteId == siteId)
.Select(e => e.EventId)
.ToListAsync();
Assert.Equal(totalEvents, centralIds.Count);
Assert.Equal(totalEvents, centralIds.Distinct().Count());
// And every EventId we wrote at the site is present centrally.
Assert.True(written.All(w => centralIds.Contains(w.EventId)),
"every site-written EventId should be present centrally.");
// Tear the actor down before disposing the harness; the actor's
// PostStop cancels its scheduled timer.
Sys.Stop(reconciliationActor);
}
}
// ---------------------------------------------------------------------
// 2. ReconciliationPull_Idempotent_Across_Two_Cycles
// ---------------------------------------------------------------------
[SkippableFact]
public async Task ReconciliationPull_Idempotent_Across_Two_Cycles()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = "outage-idem-" + Guid.NewGuid().ToString("N").Substring(0, 8);
const int totalEvents = 50;
await using var sqliteWriter = CreateInMemorySqliteWriter();
var baseOccurred = new DateTime(2026, 5, 20, 13, 0, 0, DateTimeKind.Utc);
for (int i = 0; i < totalEvents; i++)
{
await sqliteWriter.WriteAsync(NewEvent(siteId, baseOccurred.AddMilliseconds(i)));
}
var (sp, _) = BuildCentralPipeline();
await using (sp as IAsyncDisposable ?? throw new InvalidOperationException())
{
var pullClient = new QueueBackedPullClient(sqliteWriter);
var enumerator = new StaticEnumerator(new SiteEntry(siteId, "http://test:8083"));
var reconciliationActor = Sys.ActorOf(Props.Create(() => new SiteAuditReconciliationActor(
enumerator,
pullClient,
sp,
Options.Create(FastTickOptions()),
NullLogger<SiteAuditReconciliationActor>.Instance)));
// Wait for the first drain cycle to complete.
await AwaitAssertAsync(async () =>
{
await using var ctx = CreateContext();
var count = await ctx.Set<AuditEvent>()
.Where(e => e.SourceSiteId == siteId)
.CountAsync();
Assert.Equal(totalEvents, count);
},
duration: TimeSpan.FromSeconds(30),
interval: TimeSpan.FromMilliseconds(200));
// Wait for additional pull cycles to fire — the actor ticks every
// 100 ms so a 1 s settle leaves the actor with at least ~5 ticks
// past the initial drain. Each subsequent tick must be a no-op
// because every row is now Reconciled and outside the
// ReadPendingSinceAsync filter.
var callsAfterDrain = pullClient.CallCount;
await Task.Delay(TimeSpan.FromMilliseconds(800));
Assert.True(pullClient.CallCount > callsAfterDrain,
$"expected additional pull calls after drain to validate idempotency, got {pullClient.CallCount} after {callsAfterDrain}");
// Central count must still be exactly totalEvents — no duplicates
// even though the cursor + read-Reconciled-too semantics could
// theoretically re-fetch on the second cycle.
await using var verify = CreateContext();
var rows = await verify.Set<AuditEvent>()
.Where(e => e.SourceSiteId == siteId)
.ToListAsync();
Assert.Equal(totalEvents, rows.Count);
Assert.Equal(totalEvents, rows.Select(r => r.EventId).Distinct().Count());
Sys.Stop(reconciliationActor);
}
}
}