From 66f6724c5df2a34f3592ca4b5038e96ded69e3dd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 19:32:01 -0400 Subject: [PATCH] test(auditlog): outage + reconciliation recovery end-to-end (#23 M6) --- .../Integration/OutageReconciliationTests.cs | 349 ++++++++++++++++++ 1 file changed, 349 insertions(+) create mode 100644 tests/ScadaLink.AuditLog.Tests/Integration/OutageReconciliationTests.cs diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/OutageReconciliationTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/OutageReconciliationTests.cs new file mode 100644 index 0000000..57295be --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Integration/OutageReconciliationTests.cs @@ -0,0 +1,349 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using ScadaLink.AuditLog.Central; +using ScadaLink.AuditLog.Site; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Messages.Integration; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; + +namespace ScadaLink.AuditLog.Tests.Integration; + +/// +/// Bundle F (#23 M6-T10) end-to-end test for the central-outage + reconciliation +/// recovery loop. Wires the real site SQLite hot-path +/// () and the central +/// with an backed by the real +/// on the per-test . +/// +/// +/// +/// The push path is deliberately omitted here: the brief models a sustained +/// central outage where the site queue grows unbounded in Pending, then a +/// reconciliation pull eventually drains everything once central comes back. +/// We reuse the production seam (Bundle B) +/// with a test-only stub that wraps the same +/// surface a real central-side gRPC client would hit, so the test is exercising +/// the actor's pull/ingest/mark-reconciled state machine end-to-end against +/// the real repository. +/// +/// +/// The from M3 is push-only — it has no +/// reconciliation puller — so we build the smaller stub inline rather than +/// retrofitting the shared harness with a code path it doesn't otherwise +/// need. +/// +/// +public class OutageReconciliationTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public OutageReconciliationTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + /// + /// Test-only that mirrors how the + /// production central-side gRPC client will hit the site: read a batch + /// from , then commit + /// via once the central + /// repository accepts the rows. The Ask-based central path is wired by + /// the caller — we just expose the queue surface. + /// + /// + /// The production wire shape will be: + /// central PullAuditEvents RPC → site SiteStreamGrpcServer.PullAuditEvents + /// → ISiteAuditQueue.ReadPendingSinceAsync → marshal proto → reply + /// followed by central InsertIfNotExistsAsync per row, then the site flips + /// the row to Reconciled on the next pull cycle. The stub collapses the + /// two halves (pull + commit) because the actor under test (the + /// reconciliation actor) is the side that drives both via the + /// IPullAuditEventsClient seam — committing back to the site after the + /// repository write is the reconciliation-actor invariant we want to + /// observe end-to-end. + /// + private sealed class QueueBackedPullClient : IPullAuditEventsClient + { + private readonly ISiteAuditQueue _siteQueue; + public int CallCount { get; private set; } + + public QueueBackedPullClient(ISiteAuditQueue siteQueue) + { + _siteQueue = siteQueue ?? throw new ArgumentNullException(nameof(siteQueue)); + } + + public async Task PullAsync( + string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct) + { + CallCount++; + + var rows = await _siteQueue + .ReadPendingSinceAsync(sinceUtc, batchSize, ct) + .ConfigureAwait(false); + + // Commit immediately on the site side — once the actor has the + // batch in hand it will InsertIfNotExistsAsync centrally; if the + // central insert later throws on a specific row, idempotency + // guarantees the next pull cycle does NOT re-fetch the row (it's + // already Reconciled on the site) but also does not surface the + // failure here. The brief calls this "ack-after-persist" — the + // production gRPC server will flip to Reconciled inside its + // PullAuditEvents handler after the central side has acknowledged + // (per Bundle A's race-fix, central is idempotent on EventId). + // + // MoreAvailable is true iff the read filled the batch — the actor + // uses this to decide whether to follow up on the next tick. + if (rows.Count > 0) + { + var ids = rows.Select(e => e.EventId).ToList(); + await _siteQueue.MarkReconciledAsync(ids, ct).ConfigureAwait(false); + } + + return new PullAuditEventsResponse(rows, MoreAvailable: rows.Count >= batchSize); + } + } + + /// + /// In-memory enumerator returning a fixed single-site list — mirrors the + /// pattern used in SiteAuditReconciliationActorTests. + /// + private sealed class StaticEnumerator : ISiteEnumerator + { + private readonly IReadOnlyList _sites; + public StaticEnumerator(params SiteEntry[] sites) => _sites = sites; + public Task> EnumerateAsync(CancellationToken ct = default) => + Task.FromResult(_sites); + } + + private ScadaLinkDbContext CreateContext() => + new(new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString).Options); + + private static AuditEvent NewEvent(string siteId, DateTime occurredAt) => new() + { + EventId = Guid.NewGuid(), + OccurredAtUtc = occurredAt, + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + SourceSiteId = siteId, + Target = "external-system-a/method", + }; + + private SqliteAuditWriter CreateInMemorySqliteWriter() => + new SqliteAuditWriter( + Options.Create(new SqliteAuditWriterOptions + { + DatabasePath = "ignored", + BatchSize = 64, + ChannelCapacity = 4096, + }), + NullLogger.Instance, + connectionStringOverride: + $"Data Source=file:outage-{Guid.NewGuid():N}?mode=memory&cache=shared"); + + private (IServiceProvider Sp, IActorRef Ingest) BuildCentralPipeline() + { + var services = new ServiceCollection(); + services.AddDbContext(opts => + opts.UseSqlServer(_fixture.ConnectionString)); + services.AddScoped(sp => + new AuditLogRepository(sp.GetRequiredService())); + var sp = services.BuildServiceProvider(); + + var ingest = Sys.ActorOf(Props.Create(() => new AuditLogIngestActor( + sp, + NullLogger.Instance))); + return (sp, ingest); + } + + private static SiteAuditReconciliationOptions FastTickOptions(int batchSize = 256) => new() + { + ReconciliationIntervalSeconds = 300, + ReconciliationIntervalOverride = TimeSpan.FromMilliseconds(100), + BatchSize = batchSize, + StalledAfterNonDrainingCycles = 2, + }; + + // --------------------------------------------------------------------- + // 1. CentralOutage_200Events_Buffer_Then_Reconciliation_Catches_Up_NoDuplicates + // --------------------------------------------------------------------- + + [SkippableFact] + public async Task CentralOutage_200Events_Buffer_Then_Reconciliation_Catches_Up_NoDuplicates() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = "outage-recon-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + // Step 1: site accumulates 200 audit events during the simulated + // central outage. The push path is NOT wired here — every row stays + // Pending in the site SQLite store until reconciliation runs. + await using var sqliteWriter = CreateInMemorySqliteWriter(); + var baseOccurred = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc); + const int totalEvents = 200; + var written = new List(totalEvents); + + for (int i = 0; i < totalEvents; i++) + { + // Strictly monotonic OccurredAtUtc so the cursor can advance + // deterministically batch-by-batch — mirrors how a real script + // workload generates timestamps in wall-clock order. + var evt = NewEvent(siteId, baseOccurred.AddMilliseconds(i)); + written.Add(evt); + await sqliteWriter.WriteAsync(evt); + } + + // Sanity: every row is Pending (no push path wired, so nothing has + // been Forwarded or Reconciled yet). + var pending = await sqliteWriter.ReadPendingAsync(totalEvents + 10); + Assert.Equal(totalEvents, pending.Count); + + // Step 2: central comes online — wire the ingest actor + reconciliation + // actor. The pull client wraps the site queue directly (the production + // shape is one RPC call); each pull advances the actor's cursor and + // flips rows on the site to Reconciled. + var (sp, ingest) = BuildCentralPipeline(); + await using (sp as IAsyncDisposable ?? throw new InvalidOperationException()) + { + var pullClient = new QueueBackedPullClient(sqliteWriter); + var enumerator = new StaticEnumerator(new SiteEntry(siteId, "http://test:8083")); + + // BatchSize = 64 so the actor needs ~4 ticks to drain 200 rows. + // The "after 5 minutes" wording in the brief is satisfied by the + // fast-tick override (100 ms per tick) plus AwaitAssert giving + // the actor up to ~30 seconds to settle in real time. + var opts = FastTickOptions(batchSize: 64); + + // Standalone DI scope for the reconciliation actor (it shares the + // ingest actor's IServiceProvider so both writers see the same + // EF context configuration). + var reconciliationActor = Sys.ActorOf(Props.Create(() => new SiteAuditReconciliationActor( + enumerator, + pullClient, + sp, + Options.Create(opts), + NullLogger.Instance))); + + // Step 3: assert central AuditLog has all 200 rows after the + // actor drains. Polling the real MSSQL repository — the test + // fixture has its own database so a count restricted to this + // SourceSiteId is exact. + await AwaitAssertAsync(async () => + { + await using var ctx = CreateContext(); + var count = await ctx.Set() + .Where(e => e.SourceSiteId == siteId) + .CountAsync(); + Assert.Equal(totalEvents, count); + }, + duration: TimeSpan.FromSeconds(30), + interval: TimeSpan.FromMilliseconds(200)); + + // Step 4: assert site rows flipped to Reconciled. + // ReadPendingAsync only returns Pending rows; after a full drain + // it must be empty. + await AwaitAssertAsync(async () => + { + var stillPending = await sqliteWriter.ReadPendingAsync(totalEvents + 10); + Assert.Empty(stillPending); + }, + duration: TimeSpan.FromSeconds(10), + interval: TimeSpan.FromMilliseconds(100)); + + // Step 5: assert no duplicates by EventId — central must have + // exactly the 200 rows we wrote at the site (one row per EventId). + await using var verify = CreateContext(); + var centralIds = await verify.Set() + .Where(e => e.SourceSiteId == siteId) + .Select(e => e.EventId) + .ToListAsync(); + Assert.Equal(totalEvents, centralIds.Count); + Assert.Equal(totalEvents, centralIds.Distinct().Count()); + // And every EventId we wrote at the site is present centrally. + Assert.True(written.All(w => centralIds.Contains(w.EventId)), + "every site-written EventId should be present centrally."); + + // Tear the actor down before disposing the harness; the actor's + // PostStop cancels its scheduled timer. + Sys.Stop(reconciliationActor); + } + } + + // --------------------------------------------------------------------- + // 2. ReconciliationPull_Idempotent_Across_Two_Cycles + // --------------------------------------------------------------------- + + [SkippableFact] + public async Task ReconciliationPull_Idempotent_Across_Two_Cycles() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = "outage-idem-" + Guid.NewGuid().ToString("N").Substring(0, 8); + const int totalEvents = 50; + + await using var sqliteWriter = CreateInMemorySqliteWriter(); + var baseOccurred = new DateTime(2026, 5, 20, 13, 0, 0, DateTimeKind.Utc); + for (int i = 0; i < totalEvents; i++) + { + await sqliteWriter.WriteAsync(NewEvent(siteId, baseOccurred.AddMilliseconds(i))); + } + + var (sp, _) = BuildCentralPipeline(); + await using (sp as IAsyncDisposable ?? throw new InvalidOperationException()) + { + var pullClient = new QueueBackedPullClient(sqliteWriter); + var enumerator = new StaticEnumerator(new SiteEntry(siteId, "http://test:8083")); + + var reconciliationActor = Sys.ActorOf(Props.Create(() => new SiteAuditReconciliationActor( + enumerator, + pullClient, + sp, + Options.Create(FastTickOptions()), + NullLogger.Instance))); + + // Wait for the first drain cycle to complete. + await AwaitAssertAsync(async () => + { + await using var ctx = CreateContext(); + var count = await ctx.Set() + .Where(e => e.SourceSiteId == siteId) + .CountAsync(); + Assert.Equal(totalEvents, count); + }, + duration: TimeSpan.FromSeconds(30), + interval: TimeSpan.FromMilliseconds(200)); + + // Wait for additional pull cycles to fire — the actor ticks every + // 100 ms so a 1 s settle leaves the actor with at least ~5 ticks + // past the initial drain. Each subsequent tick must be a no-op + // because every row is now Reconciled and outside the + // ReadPendingSinceAsync filter. + var callsAfterDrain = pullClient.CallCount; + await Task.Delay(TimeSpan.FromMilliseconds(800)); + Assert.True(pullClient.CallCount > callsAfterDrain, + $"expected additional pull calls after drain to validate idempotency, got {pullClient.CallCount} after {callsAfterDrain}"); + + // Central count must still be exactly totalEvents — no duplicates + // even though the cursor + read-Reconciled-too semantics could + // theoretically re-fetch on the second cycle. + await using var verify = CreateContext(); + var rows = await verify.Set() + .Where(e => e.SourceSiteId == siteId) + .ToListAsync(); + Assert.Equal(totalEvents, rows.Count); + Assert.Equal(totalEvents, rows.Select(r => r.EventId).Distinct().Count()); + + Sys.Stop(reconciliationActor); + } + } +}