using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Central;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;

namespace ScadaLink.AuditLog.Tests.Central;

/// <summary>
/// Bundle B (M6-T3) tests for <see cref="SiteAuditReconciliationActor"/>. Most
/// tests substitute the <see cref="IAuditLogRepository"/> with an in-memory
/// recording stub so the actor's tick / cursor / stalled state machinery can
/// be exercised in milliseconds without an MSSQL container. The duplicate /
/// idempotency assertion uses the real <see cref="AuditLogRepository"/> against
/// the <see cref="MsSqlMigrationFixture"/> so we verify <c>InsertIfNotExistsAsync</c>
/// actually swallows duplicate-key collisions (the M2 Bundle A race-fix the
/// reconciliation puller depends on).
/// </summary>
public class SiteAuditReconciliationActorTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
    private readonly MsSqlMigrationFixture _fixture;

    public SiteAuditReconciliationActorTests(MsSqlMigrationFixture fixture)
    {
        _fixture = fixture;
    }

    /// <summary>
    /// Builds a delivered <see cref="AuditChannel.ApiOutbound"/> audit event for
    /// <paramref name="siteId"/>.
    /// </summary>
    /// <param name="siteId">Value stored in <c>SourceSiteId</c>.</param>
    /// <param name="occurredAt">Event timestamp; defaults to 2026-05-20 10:00:00 UTC.</param>
    /// <param name="id">Event id; a fresh <see cref="Guid"/> is generated when omitted.</param>
    private static AuditEvent NewEvent(
        string siteId,
        DateTime? occurredAt = null,
        Guid? id = null) => new()
        {
            EventId = id ?? Guid.NewGuid(),
            OccurredAtUtc = occurredAt ?? new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
            Channel = AuditChannel.ApiOutbound,
            Kind = AuditKind.ApiCall,
            Status = AuditStatus.Delivered,
            SourceSiteId = siteId,
        };

    /// <summary>
    /// Options with a 100 ms tick override so each test observes several
    /// reconciliation cycles within its polling window.
    /// </summary>
    /// <param name="batchSize">Value for <c>BatchSize</c>.</param>
    /// <param name="stalledAfter">Value for <c>StalledAfterNonDrainingCycles</c>.</param>
    private static SiteAuditReconciliationOptions FastTickOptions(
        int batchSize = 256,
        int stalledAfter = 2) => new()
        {
            // 100 ms tick keeps each test under a second. AwaitAssert covers
            // schedule jitter so a 100 ms tick has up to ~3 s to fire.
            // NOTE(review): the tests rely on ReconciliationIntervalOverride
            // taking precedence over the 300 s ReconciliationIntervalSeconds —
            // confirm against the actor.
            ReconciliationIntervalSeconds = 300,
            ReconciliationIntervalOverride = TimeSpan.FromMilliseconds(100),
            BatchSize = batchSize,
            StalledAfterNonDrainingCycles = stalledAfter,
        };

    /// <summary>
    /// In-memory recording stub used for non-MSSQL tests. Captures every
    /// <c>InsertIfNotExistsAsync</c> call AND deduplicates on
    /// <c>EventId</c> so duplicate-handling assertions don't
    /// need a real database for the simple cases.
    /// </summary>
    /// <remarks>
    /// The stub is written to while the test thread polls it, so all state is
    /// guarded by one lock and readers receive snapshots. Updating the call
    /// count and the recorded list under the same lock means a reader that
    /// observes N calls also observes the events recorded by those N calls.
    /// </remarks>
    private sealed class RecordingRepo : IAuditLogRepository
    {
        private readonly object _gate = new();
        private readonly List<AuditEvent> _inserted = new();
        private readonly HashSet<Guid> _seen = new();
        private int _insertCallCount;

        /// <summary>Snapshot of the distinct events recorded so far, in insertion order.</summary>
        public IReadOnlyList<AuditEvent> Inserted
        {
            get
            {
                lock (_gate)
                {
                    return _inserted.ToArray();
                }
            }
        }

        /// <summary>Number of <c>InsertIfNotExistsAsync</c> calls, duplicates included.</summary>
        public int InsertCallCount
        {
            get
            {
                lock (_gate)
                {
                    return _insertCallCount;
                }
            }
        }

        public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default)
        {
            lock (_gate)
            {
                _insertCallCount++;

                // First write wins: only the first event per EventId is recorded.
                if (_seen.Add(evt.EventId))
                {
                    _inserted.Add(evt);
                }
            }

            return Task.CompletedTask;
        }

        public Task<IReadOnlyList<AuditEvent>> QueryAsync(
            AuditLogQueryFilter filter,
            AuditLogPaging paging,
            CancellationToken ct = default) =>
            Task.FromResult<IReadOnlyList<AuditEvent>>(Inserted);

        public Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
            Task.FromResult(0L);

        public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
            DateTime threshold,
            CancellationToken ct = default) =>
            Task.FromResult<IReadOnlyList<DateTime>>(Array.Empty<DateTime>());

        public Task<ScadaLink.Commons.Types.AuditLogKpiSnapshot> GetKpiSnapshotAsync(
            TimeSpan window,
            DateTime? nowUtc = null,
            CancellationToken ct = default) =>
            Task.FromResult(new ScadaLink.Commons.Types.AuditLogKpiSnapshot(0L, 0L, 0L, nowUtc ?? DateTime.UtcNow));
    }

    /// <summary>
    /// In-memory enumerator returning a static list of sites.
    /// </summary>
    private sealed class StaticEnumerator : ISiteEnumerator
    {
        private readonly IReadOnlyList<SiteEntry> _sites;

        public StaticEnumerator(params SiteEntry[] sites) => _sites = sites;

        public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default) =>
            Task.FromResult(_sites);
    }

    /// <summary>
    /// Scripted pull client — returns the next queued response for the site
    /// on each call, and an empty response with <c>MoreAvailable: false</c>
    /// once the queue is exhausted (or when the site was never scripted). Also
    /// records every invocation so tests can assert call counts + arguments.
    /// </summary>
    /// <remarks>
    /// Calls are recorded while the test thread polls <see cref="Calls"/>, so
    /// state is guarded by a lock and <see cref="Calls"/> returns a snapshot.
    /// </remarks>
    private sealed class ScriptedPullClient : IPullAuditEventsClient
    {
        private readonly object _gate = new();
        private readonly List<(string SiteId, DateTime SinceUtc, int BatchSize)> _calls = new();
        private readonly Dictionary<string, Queue<PullAuditEventsResponse>> _scripted = new();
        private readonly Dictionary<string, Exception> _throwOnSite = new();

        /// <summary>Snapshot of every <c>PullAsync</c> invocation so far, in call order.</summary>
        public IReadOnlyList<(string SiteId, DateTime SinceUtc, int BatchSize)> Calls
        {
            get
            {
                lock (_gate)
                {
                    return _calls.ToArray();
                }
            }
        }

        /// <summary>Queues <paramref name="responses"/> for <paramref name="siteId"/>, replacing any earlier script.</summary>
        public ScriptedPullClient Script(string siteId, params PullAuditEventsResponse[] responses)
        {
            lock (_gate)
            {
                _scripted[siteId] = new Queue<PullAuditEventsResponse>(responses);
            }

            return this;
        }

        /// <summary>Makes every pull for <paramref name="siteId"/> throw <paramref name="ex"/>.</summary>
        public ScriptedPullClient ThrowFor(string siteId, Exception ex)
        {
            lock (_gate)
            {
                _throwOnSite[siteId] = ex;
            }

            return this;
        }

        public Task<PullAuditEventsResponse> PullAsync(
            string siteId,
            DateTime sinceUtc,
            int batchSize,
            CancellationToken ct)
        {
            lock (_gate)
            {
                // Record first so a throwing site still shows up in Calls.
                _calls.Add((siteId, sinceUtc, batchSize));

                if (_throwOnSite.TryGetValue(siteId, out var ex))
                {
                    throw ex;
                }

                if (_scripted.TryGetValue(siteId, out var queue) && queue.Count > 0)
                {
                    return Task.FromResult(queue.Dequeue());
                }
            }

            return Task.FromResult(
                new PullAuditEventsResponse(Array.Empty<AuditEvent>(), MoreAvailable: false));
        }
    }

    /// <summary>
    /// Builds a service provider whose scoped <see cref="IAuditLogRepository"/>
    /// always resolves to <paramref name="repo"/>.
    /// </summary>
    private IServiceProvider BuildScopedProvider(IAuditLogRepository repo)
    {
        var services = new ServiceCollection();

        // The actor opens a scope per tick and resolves IAuditLogRepository
        // from that scope; registering as scoped mirrors how
        // AddConfigurationDatabase wires the real repository.
        services.AddScoped<IAuditLogRepository>(_ => repo);
        return services.BuildServiceProvider();
    }

    /// <summary>
    /// Starts a <see cref="SiteAuditReconciliationActor"/> in the test actor
    /// system, wired to the supplied collaborators and options.
    /// </summary>
    private IActorRef CreateActor(
        ISiteEnumerator sites,
        IPullAuditEventsClient client,
        IAuditLogRepository repo,
        SiteAuditReconciliationOptions options)
    {
        var sp = BuildScopedProvider(repo);
        return Sys.ActorOf(Props.Create(() => new SiteAuditReconciliationActor(
            sites,
            client,
            sp,
            Options.Create(options),
            NullLogger<SiteAuditReconciliationActor>.Instance)));
    }

    /// <summary>
    /// Subscribes a test probe to the EventStream for
    /// <see cref="SiteAuditTelemetryStalledChanged"/> publications. Uses a
    /// probe actor so the stream's fire-and-forget delivery is observable from
    /// the test thread via <c>ExpectMsg</c>.
    /// </summary>
    /// <returns>The subscribed probe.</returns>
    private Akka.TestKit.TestProbe SubscribeStalled()
    {
        var probe = CreateTestProbe();
        Sys.EventStream.Subscribe(probe.Ref, typeof(SiteAuditTelemetryStalledChanged));
        return probe;
    }

    /// <summary>Opens a new <see cref="ScadaLinkDbContext"/> against the fixture database.</summary>
    private ScadaLinkDbContext CreateContext() =>
        new(new DbContextOptionsBuilder<ScadaLinkDbContext>()
            .UseSqlServer(_fixture.ConnectionString).Options);

    /// <summary>
    /// Polls the audit table until at least <paramref name="expectedCount"/>
    /// rows exist for <paramref name="siteId"/> or <paramref name="timeout"/>
    /// elapses, and returns the rows seen by the last query.
    /// </summary>
    private async Task<List<AuditEvent>> WaitForSiteRowsAsync(
        string siteId,
        int expectedCount,
        TimeSpan timeout)
    {
        var elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (true)
        {
            // A fresh context per attempt so each query reads current data.
            await using var read = CreateContext();
            var rows = await read.Set<AuditEvent>()
                .Where(e => e.SourceSiteId == siteId)
                .ToListAsync();

            if (rows.Count >= expectedCount || elapsed.Elapsed >= timeout)
            {
                return rows;
            }

            await Task.Delay(TimeSpan.FromMilliseconds(50));
        }
    }

    // ---------------------------------------------------------------------
    // 1. Timer_Fires_OnConfiguredInterval
    // ---------------------------------------------------------------------
    [Fact]
    public void Timer_Fires_OnConfiguredInterval()
    {
        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
        var client = new ScriptedPullClient();
        var repo = new RecordingRepo();
        var opts = FastTickOptions();

        CreateActor(sites, client, repo, opts);

        // FastTickOptions configures a 100 ms tick override. The tick is
        // delivered asynchronously by Akka's scheduler, so we await a visible
        // side effect (a PullAsync call) rather than racing on internal state.
        AwaitAssert(
            () => Assert.True(client.Calls.Count >= 1, $"expected >= 1 pull call, got {client.Calls.Count}"),
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));
    }

    // ---------------------------------------------------------------------
    // 2. Tick_PullsFromEachKnownSite
    // ---------------------------------------------------------------------
    [Fact]
    public void Tick_PullsFromEachKnownSite()
    {
        var sites = new StaticEnumerator(
            new SiteEntry("siteA", "http://siteA:8083"),
            new SiteEntry("siteB", "http://siteB:8083"));
        var client = new ScriptedPullClient();
        var repo = new RecordingRepo();

        CreateActor(sites, client, repo, FastTickOptions());

        AwaitAssert(() =>
        {
            Assert.Contains(client.Calls, c => c.SiteId == "siteA");
            Assert.Contains(client.Calls, c => c.SiteId == "siteB");
        }, duration: TimeSpan.FromSeconds(3), interval: TimeSpan.FromMilliseconds(50));
    }

    // ---------------------------------------------------------------------
    // 3. Tick_IngestEvents_ViaInsertIfNotExistsAsync
    // ---------------------------------------------------------------------
    [Fact]
    public void Tick_IngestEvents_ViaInsertIfNotExistsAsync()
    {
        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
        var e1 = NewEvent("siteA");
        var e2 = NewEvent("siteA");
        var client = new ScriptedPullClient().Script(
            "siteA",
            new PullAuditEventsResponse(new[] { e1, e2 }, MoreAvailable: false));
        var repo = new RecordingRepo();

        CreateActor(sites, client, repo, FastTickOptions());

        AwaitAssert(
            () => Assert.Equal(2, repo.InsertCallCount),
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));

        Assert.Contains(repo.Inserted, e => e.EventId == e1.EventId);
        Assert.Contains(repo.Inserted, e => e.EventId == e2.EventId);
    }

    // ---------------------------------------------------------------------
    // 4. Tick_Duplicates_NotDoubleInserted (real MSSQL idempotency)
    // ---------------------------------------------------------------------
    [SkippableFact]
    public async Task Tick_Duplicates_NotDoubleInserted()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var siteId = "bundle-b-" + Guid.NewGuid().ToString("N").Substring(0, 8);
        var pre = NewEvent(siteId);

        // Seed the row directly so the actor sees it already present when the
        // pull returns it.
        await using (var seedContext = CreateContext())
        {
            await new AuditLogRepository(seedContext).InsertIfNotExistsAsync(pre);
        }

        // Stack one new and the pre-existing row in the pull response. The
        // second-pull script returns empty so the actor settles.
        var fresh = NewEvent(siteId);
        var sites = new StaticEnumerator(new SiteEntry(siteId, "http://x:8083"));
        var client = new ScriptedPullClient().Script(
            siteId,
            new PullAuditEventsResponse(new[] { pre, fresh }, MoreAvailable: false));

        await using var context = CreateContext();
        var repo = new AuditLogRepository(context);

        CreateActor(sites, client, repo, FastTickOptions());

        // Poll the table until the fresh row lands next to the seeded one.
        // Observing the rows directly (instead of sleeping a fixed time and
        // checking only that a pull was issued) waits for the ingest itself.
        var rows = await WaitForSiteRowsAsync(siteId, expectedCount: 2, timeout: TimeSpan.FromSeconds(5));

        Assert.NotEmpty(client.Calls);

        // Even though the pull returned 2 events, only 1 fresh row should
        // exist in MSSQL alongside the pre-existing one — InsertIfNotExistsAsync
        // is first-write-wins on EventId.
        Assert.Equal(2, rows.Count);
        Assert.Contains(rows, r => r.EventId == pre.EventId);
        Assert.Contains(rows, r => r.EventId == fresh.EventId);
    }

    // ---------------------------------------------------------------------
    // 5. Cursor_Advances_ToMaxOccurredAtUtc
    // ---------------------------------------------------------------------
    [Fact]
    public void Cursor_Advances_ToMaxOccurredAtUtc()
    {
        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
        var t1 = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
        var t2 = new DateTime(2026, 5, 20, 10, 1, 0, DateTimeKind.Utc);
        var t3 = new DateTime(2026, 5, 20, 10, 2, 0, DateTimeKind.Utc);
        var e1 = NewEvent("siteA", t1);
        var e2 = NewEvent("siteA", t2);
        var e3 = NewEvent("siteA", t3);

        // First pull returns three events with t1, t2, t3. Subsequent pulls
        // return empty — but the test asserts the SECOND pull's since argument
        // is t3 (the max OccurredAtUtc from the first pull).
        var client = new ScriptedPullClient().Script(
            "siteA",
            new PullAuditEventsResponse(new[] { e1, e2, e3 }, MoreAvailable: false));
        var repo = new RecordingRepo();

        CreateActor(sites, client, repo, FastTickOptions());

        // Wait until we have at least two pulls — the second one must use t3
        // as its `since` argument because that was the max OccurredAtUtc in
        // the first response.
        AwaitAssert(
            () => Assert.True(
                client.Calls.Count >= 2,
                $"need at least 2 pulls to assert cursor advancement, got {client.Calls.Count}"),
            duration: TimeSpan.FromSeconds(5),
            interval: TimeSpan.FromMilliseconds(50));

        // One snapshot for both assertions; the call log is append-only, so it
        // holds at least the two entries observed above.
        var calls = client.Calls;
        Assert.Equal(DateTime.MinValue, calls[0].SinceUtc);
        Assert.Equal(t3, calls[1].SinceUtc);
    }

    // ---------------------------------------------------------------------
    // 6. Tick_OneSiteThrows_OtherSitesStillProcessed
    // ---------------------------------------------------------------------
    [Fact]
    public void Tick_OneSiteThrows_OtherSitesStillProcessed()
    {
        var sites = new StaticEnumerator(
            new SiteEntry("siteA", "http://siteA:8083"),
            new SiteEntry("siteB", "http://siteB:8083"));
        var bEvent = NewEvent("siteB");
        var client = new ScriptedPullClient()
            .ThrowFor("siteA", new InvalidOperationException("simulated transport failure"))
            .Script("siteB", new PullAuditEventsResponse(new[] { bEvent }, MoreAvailable: false));
        var repo = new RecordingRepo();

        CreateActor(sites, client, repo, FastTickOptions());

        AwaitAssert(() =>
        {
            Assert.Contains(client.Calls, c => c.SiteId == "siteA");
            Assert.Contains(repo.Inserted, e => e.EventId == bEvent.EventId);
        }, duration: TimeSpan.FromSeconds(3), interval: TimeSpan.FromMilliseconds(50));
    }

    // ---------------------------------------------------------------------
    // 7. StalledDetection_TwoConsecutiveNonDrainingCycles_PublishesStalledTrue
    // ---------------------------------------------------------------------
    [Fact]
    public void StalledDetection_TwoConsecutiveNonDrainingCycles_PublishesStalledTrue()
    {
        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));

        // Two scripted responses that each return events AND MoreAvailable=true
        // — the second pull triggers the stalled transition.
        var batch1 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
        var batch2 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
        var client = new ScriptedPullClient().Script(
            "siteA",
            new PullAuditEventsResponse(batch1, MoreAvailable: true),
            new PullAuditEventsResponse(batch2, MoreAvailable: true));
        var repo = new RecordingRepo();
        var probe = SubscribeStalled();

        CreateActor(sites, client, repo, FastTickOptions(stalledAfter: 2));

        // Expect Stalled=true after the second non-draining tick. The probe
        // waits with its own timeout (a few seconds gives the 100 ms repeat
        // interval ample slack).
        var msg = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
        Assert.Equal("siteA", msg.SiteId);
        Assert.True(msg.Stalled);
    }

    // ---------------------------------------------------------------------
    // 8. StalledDetection_DrainingCycle_PublishesStalledFalse
    // ---------------------------------------------------------------------
    [Fact]
    public void StalledDetection_DrainingCycle_PublishesStalledFalse()
    {
        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));

        // Two non-draining responses get the actor into Stalled=true, then a
        // draining response (events but MoreAvailable=false) flips it back.
        var batch1 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
        var batch2 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
        var batch3 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
        var client = new ScriptedPullClient().Script(
            "siteA",
            new PullAuditEventsResponse(batch1, MoreAvailable: true),
            new PullAuditEventsResponse(batch2, MoreAvailable: true),
            new PullAuditEventsResponse(batch3, MoreAvailable: false));
        var repo = new RecordingRepo();
        var probe = SubscribeStalled();

        CreateActor(sites, client, repo, FastTickOptions(stalledAfter: 2));

        // First publication is the stalled=true transition; second is the
        // back-to-draining flip. The actor publishes ONLY on transitions so we
        // expect exactly these two messages in order.
        var first = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
        Assert.True(first.Stalled);

        var second = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
        Assert.False(second.Stalled);
        Assert.Equal("siteA", second.SiteId);
    }
}