From c763bd9a047bf5712afed088e3c010cb2b06f411 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 18:10:42 -0400 Subject: [PATCH] feat(auditlog): SiteAuditReconciliationActor central singleton (#23 M6) --- .../Central/IPullAuditEventsClient.cs | 45 ++ .../Central/ISiteEnumerator.cs | 34 ++ .../Central/SiteAuditReconciliationActor.cs | 324 +++++++++++++ .../Central/SiteAuditReconciliationOptions.cs | 60 +++ .../SiteAuditReconciliationActorTests.cs | 438 ++++++++++++++++++ 5 files changed, 901 insertions(+) create mode 100644 src/ScadaLink.AuditLog/Central/IPullAuditEventsClient.cs create mode 100644 src/ScadaLink.AuditLog/Central/ISiteEnumerator.cs create mode 100644 src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs create mode 100644 src/ScadaLink.AuditLog/Central/SiteAuditReconciliationOptions.cs create mode 100644 tests/ScadaLink.AuditLog.Tests/Central/SiteAuditReconciliationActorTests.cs diff --git a/src/ScadaLink.AuditLog/Central/IPullAuditEventsClient.cs b/src/ScadaLink.AuditLog/Central/IPullAuditEventsClient.cs new file mode 100644 index 0000000..e094e48 --- /dev/null +++ b/src/ScadaLink.AuditLog/Central/IPullAuditEventsClient.cs @@ -0,0 +1,45 @@ +using ScadaLink.Commons.Messages.Integration; + +namespace ScadaLink.AuditLog.Central; + +/// +/// Mockable abstraction over the central-side PullAuditEvents gRPC +/// client surface that uses to +/// fetch the next reconciliation batch from a specific site. Extracted so the +/// actor can be unit-tested against an in-memory stub without standing up a +/// real GrpcChannel per site. +/// +/// +/// +/// The production implementation (host wiring task) wraps the auto-generated +/// SiteStreamService.SiteStreamServiceClient, multiplexing one +/// GrpcChannel per site keyed on +/// . 
Until that wiring lands the DI +/// composition root binds a NoOp default that returns an empty response — the +/// reconciliation tick is still scheduled and the cursor logic still runs, so +/// regressions in the actor itself are caught even before the real client +/// arrives. +/// +/// +/// Implementations MUST NOT throw on transport faults that the actor can +/// tolerate (connection refused, deadline exceeded). The actor's contract is +/// "one site's failure doesn't sink the rest of the tick"; an exception still +/// won't crash the actor (the per-site try/catch catches it), but returning +/// an empty response on a known-recoverable error keeps the logs cleaner. +/// +/// +public interface IPullAuditEventsClient +{ + /// + /// Issues a PullAuditEvents RPC against the site whose endpoint + /// is registered against . Returns the next + /// batch of + /// rows ordered oldest-first AND a MoreAvailable flag the actor + /// uses to decide whether to fire another pull immediately. + /// + Task PullAsync( + string siteId, + DateTime sinceUtc, + int batchSize, + CancellationToken ct); +} diff --git a/src/ScadaLink.AuditLog/Central/ISiteEnumerator.cs b/src/ScadaLink.AuditLog/Central/ISiteEnumerator.cs new file mode 100644 index 0000000..9e9607c --- /dev/null +++ b/src/ScadaLink.AuditLog/Central/ISiteEnumerator.cs @@ -0,0 +1,34 @@ +namespace ScadaLink.AuditLog.Central; + +/// +/// Enumeration surface consumed by to +/// discover which sites to poll on each reconciliation tick. Extracted so the +/// actor can be unit-tested against a static list without depending on the +/// production ISiteRepository + EF Core DbContext. +/// +/// +/// The production implementation wraps ISiteRepository.GetAllSitesAsync +/// and projects each Site to a using the +/// site's configured GrpcNodeAAddress (falling back to +/// GrpcNodeBAddress when NodeA is unset). 
Sites with NO gRPC address +/// configured are silently skipped — the reconciliation pull cannot reach +/// them, but absence of an address is a configuration decision, not a runtime +/// error. +/// +public interface ISiteEnumerator +{ + /// + /// Returns the current set of sites the reconciliation puller should visit + /// on the next tick. Implementations should reflect adds/removes promptly + /// — the actor calls this once per tick. + /// + Task> EnumerateAsync(CancellationToken ct = default); +} + +/// +/// One reconciliation target: the site identifier the actor uses as the +/// cursor key and the gRPC endpoint dials +/// to issue the pull. Endpoint is the bare authority (e.g. http://siteA:8083); +/// transport selection (TLS, keepalive, etc.) is the client's concern. +/// +public sealed record SiteEntry(string SiteId, string GrpcEndpoint); diff --git a/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs b/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs new file mode 100644 index 0000000..6460c4d --- /dev/null +++ b/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs @@ -0,0 +1,324 @@ +using Akka.Actor; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; + +namespace ScadaLink.AuditLog.Central; + +/// +/// Central singleton (M6 Bundle B) that drives the audit-log reconciliation +/// pull loop. On a configurable timer (default 5 minutes) the actor walks every +/// known site, asks the site for any rows with +/// >= the site's last reconciled +/// cursor, ingests them idempotently into the central +/// , and advances the cursor. +/// +/// +/// +/// Self-healing telemetry, not a dispatcher. The push path +/// ( + +/// IngestAuditEvents) is the primary mechanism. 
This actor exists so a +/// missed push (gRPC blip, central restart, site offline) is eventually +/// repaired by central re-pulling whatever the site still has in +/// Pending/Forwarded state. Idempotency on +/// (M2 Bundle A's race-fix) makes duplicate +/// arrivals from both paths a silent no-op. +/// +/// +/// Cursor lifetime. The per-site LastReconciledAt watermark is +/// kept in-memory for the actor's lifetime. The cluster singleton normally +/// survives the host process; on a deliberate failover OR a singleton restart +/// the cursors reset to . That is conservative +/// but correct — the next tick simply asks for everything the site still has, +/// and idempotent ingest swallows the dupes. Persisting cursors to MS SQL was +/// considered and rejected for M6: the cost of a write per tick outweighs the +/// rare benefit of avoiding one over-broad pull after a restart. +/// +/// +/// Stalled detection. The brief calls a site "stalled" when two +/// consecutive pull cycles BOTH return non-empty AND MoreAvailable=true +/// — i.e. the backlog isn't draining. The actor publishes +/// on the actor system's +/// EventStream so a future ICentralHealthCollector bridge (M6 Bundle E) +/// can flip the health metric without coupling this actor to the health +/// collection surface today. +/// +/// +/// Failure isolation. A single site that throws (DNS, transport, +/// repository write) must NOT prevent other sites from being polled on the +/// same tick. The per-site work runs inside its own try/catch; the actor's +/// supervisor strategy keeps it alive across any leaked exception with +/// 's Restart +/// semantics — restart resets the in-memory cursors, but as noted above that's +/// a safe (over-pull, idempotent) recovery. +/// +/// +/// DI scopes. is a scoped EF Core +/// service registered by AddConfigurationDatabase. 
The singleton actor +/// opens one DI scope per tick and reuses the same repository across all +/// sites in that tick — one DbContext per tick mirrors the +/// AuditLogIngestActor + NotificationOutboxActor pattern. +/// +/// +public class SiteAuditReconciliationActor : ReceiveActor +{ + private readonly ISiteEnumerator _sites; + private readonly IPullAuditEventsClient _client; + private readonly IServiceProvider _services; + private readonly SiteAuditReconciliationOptions _options; + private readonly ILogger _logger; + + /// + /// Per-site reconciliation watermark — the highest + /// seen for that site on a previous + /// tick. Asking for OccurredAtUtc >= cursor rather than > + /// is the site contract (); + /// duplicate-with-same-timestamp rows are filtered out by the idempotent + /// repository write. + /// + private readonly Dictionary _cursors = new(); + + /// + /// Per-site count of consecutive non-draining cycles. Resets to zero on the + /// first draining (or empty) cycle. + /// + private readonly Dictionary _nonDrainingCycles = new(); + + /// + /// Per-site latched stalled state — used so the actor only publishes a + /// transition when the + /// stalled flag actually changes, not on every tick while stalled. + /// + private readonly Dictionary _stalled = new(); + + private ICancelable? 
_timer; + + public SiteAuditReconciliationActor( + ISiteEnumerator sites, + IPullAuditEventsClient client, + IServiceProvider services, + IOptions options, + ILogger logger) + { + ArgumentNullException.ThrowIfNull(sites); + ArgumentNullException.ThrowIfNull(client); + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(options); + ArgumentNullException.ThrowIfNull(logger); + + _sites = sites; + _client = client; + _services = services; + _options = options.Value; + _logger = logger; + + ReceiveAsync(_ => OnTickAsync()); + } + + protected override void PreStart() + { + base.PreStart(); + var interval = _options.ReconciliationInterval; + _timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable( + initialDelay: interval, + interval: interval, + receiver: Self, + message: ReconciliationTick.Instance, + sender: Self); + } + + protected override void PostStop() + { + _timer?.Cancel(); + base.PostStop(); + } + + private async Task OnTickAsync() + { + IReadOnlyList sites; + try + { + sites = await _sites.EnumerateAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, "Site enumeration failed; skipping reconciliation tick."); + return; + } + + if (sites.Count == 0) + { + return; + } + + IServiceScope? scope = null; + IAuditLogRepository repository; + try + { + scope = _services.CreateScope(); + repository = scope.ServiceProvider.GetRequiredService(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to resolve IAuditLogRepository for reconciliation tick."); + scope?.Dispose(); + return; + } + + try + { + foreach (var site in sites) + { + try + { + await PullSiteAsync(site, repository).ConfigureAwait(false); + } + catch (Exception ex) + { + // Catch-all per the failure-isolation invariant: one site's + // fault must not sink the rest of the tick. The cursor for + // the failing site is left at its previous value so the + // next tick retries the same window. 
+ _logger.LogWarning( + ex, + "Reconciliation pull failed for site {SiteId}; other sites continue.", + site.SiteId); + } + } + } + finally + { + scope.Dispose(); + } + } + + /// + /// Issues one PullAuditEvents RPC against the site, ingests the + /// returned rows idempotently into the central repository, and advances + /// the cursor based on the maximum + /// observed. The brief's "saturate until backlog clears" intent is met by + /// the natural cadence — each tick issues one pull, and a backed-up site + /// drains across consecutive ticks. The stalled signal (two non-draining + /// ticks in a row) surfaces when that drain isn't keeping up. + /// + private async Task PullSiteAsync(SiteEntry site, IAuditLogRepository repository) + { + var since = _cursors.TryGetValue(site.SiteId, out var c) ? c : DateTime.MinValue; + var response = await _client.PullAsync( + site.SiteId, since, _options.BatchSize, CancellationToken.None) + .ConfigureAwait(false); + + var maxOccurred = since; + var nowUtc = DateTime.UtcNow; + foreach (var evt in response.Events) + { + try + { + // Idempotent repository write: duplicate EventIds (from a + // concurrent push, or a retry of this very pull) collapse to + // a no-op courtesy of M2 Bundle A's race-fix on + // InsertIfNotExistsAsync. + var ingested = evt with { IngestedAtUtc = nowUtc }; + await repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false); + } + catch (Exception ex) + { + // Per-row catch so one bad event does not abandon the rest of + // the batch. The cursor still advances based on OccurredAtUtc + // — the row was returned by the site, so the next tick won't + // re-fetch it; if it permanently fails to persist, that's an + // operational concern surfaced by the log, not a hot-loop + // trigger. 
+ _logger.LogError( + ex, + "Reconciliation ingest failed for AuditEvent {EventId} from site {SiteId}.", + evt.EventId, + site.SiteId); + } + + if (evt.OccurredAtUtc > maxOccurred) + { + maxOccurred = evt.OccurredAtUtc; + } + } + + _cursors[site.SiteId] = maxOccurred; + + var nonDraining = response.MoreAvailable && response.Events.Count > 0; + UpdateStalledState(site.SiteId, draining: !nonDraining); + } + + /// + /// Flips the per-site stalled flag based on whether this tick drained the + /// queue. A "draining" cycle is one where the server reported no more rows + /// available OR returned zero events. A "non-draining" cycle is the + /// inverse (events returned AND MoreAvailable=true). + /// + /// + /// The state machine: counter increments on each consecutive non-draining + /// tick. On reaching + /// the actor latches Stalled=true and publishes the transition; on + /// any subsequent draining tick the counter resets to zero AND, if the + /// latch is currently true, the actor publishes Stalled=false. Only + /// transitions are published — repeated ticks in the same state are + /// silent so a downstream subscriber doesn't see a flood of redundant + /// notifications. 
+ /// + private void UpdateStalledState(string siteId, bool draining) + { + var wasStalled = _stalled.TryGetValue(siteId, out var prior) && prior; + + if (draining) + { + _nonDrainingCycles[siteId] = 0; + if (wasStalled) + { + _stalled[siteId] = false; + Context.System.EventStream.Publish( + new SiteAuditTelemetryStalledChanged(siteId, Stalled: false)); + } + return; + } + + var consecutive = _nonDrainingCycles.GetValueOrDefault(siteId) + 1; + _nonDrainingCycles[siteId] = consecutive; + + if (consecutive >= _options.StalledAfterNonDrainingCycles && !wasStalled) + { + _stalled[siteId] = true; + Context.System.EventStream.Publish( + new SiteAuditTelemetryStalledChanged(siteId, Stalled: true)); + } + } + + /// + /// Supervision policy for this actor's children: one-for-one, DefaultDecider, zero retries. + /// NOTE(review): DefaultDecider does not Resume, and an actor's own failures are handled by + /// its parent's strategy, not this override — confirm which directive was intended. + /// + protected override SupervisorStrategy SupervisorStrategy() + { + return new OneForOneStrategy( + maxNrOfRetries: 0, + withinTimeRange: TimeSpan.Zero, + decider: Akka.Actor.SupervisorStrategy.DefaultDecider); + } + + /// Self-tick triggering a reconciliation pass across all sites. + internal sealed class ReconciliationTick + { + public static readonly ReconciliationTick Instance = new(); + private ReconciliationTick() { } + } +} + +/// +/// Published on the actor system EventStream when a site's reconciliation +/// puller transitions into or out of the "stalled" state (backlog not +/// draining across multiple cycles). The M6 Bundle E central health collector +/// will subscribe to this and surface +/// SiteAuditTelemetryStalled on the health-report payload. 
+/// +public sealed record SiteAuditTelemetryStalledChanged(string SiteId, bool Stalled); diff --git a/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationOptions.cs b/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationOptions.cs new file mode 100644 index 0000000..d32c5e6 --- /dev/null +++ b/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationOptions.cs @@ -0,0 +1,60 @@ +namespace ScadaLink.AuditLog.Central; + +/// +/// Tuning knobs for the central singleton. +/// Defaults mirror the M6 Bundle B brief: pull every 5 minutes per site, 256 rows per +/// batch, declare a site "stalled" after two consecutive pull cycles return non-empty +/// AND MoreAvailable=true (the backlog is not draining). +/// +/// +/// +/// Per the M6 plan the reconciliation actor is the fallback when push telemetry is +/// lost; it is intentionally low-frequency. Lowering +/// in production trades MS SQL load for +/// fresher self-healing — keep the default unless a deployment can prove the extra +/// load is acceptable. +/// +/// +/// = 2 because a single non-draining +/// cycle can happen on a surge (e.g. a backed-up site replays its hot queue); the +/// stalled signal should only fire when the backlog persists across cycles, which is +/// the symptom the central health surface is asking us to detect. +/// +/// +public sealed class SiteAuditReconciliationOptions +{ + /// + /// Period of the reconciliation tick. Each tick visits every known site once. + /// + public int ReconciliationIntervalSeconds { get; set; } = 300; + + /// + /// Test-only override for finer control over the tick cadence than + /// whole-second resolution allows. When non-null, takes precedence over + /// . Not bound from config — + /// production config exposes + /// only. + /// + public TimeSpan? ReconciliationIntervalOverride { get; set; } + + /// + /// Resolves the effective tick interval, honouring the test override when + /// set. Falls back to . 
+ /// + public TimeSpan ReconciliationInterval => + ReconciliationIntervalOverride ?? TimeSpan.FromSeconds(ReconciliationIntervalSeconds); + + /// + /// Maximum number of + /// rows requested in a single PullAuditEvents RPC call. + /// + public int BatchSize { get; set; } = 256; + + /// + /// Number of consecutive non-draining cycles (events returned AND + /// MoreAvailable=true) that must accumulate for a site before the actor + /// publishes SiteAuditTelemetryStalledChanged(Stalled: true) on the + /// EventStream. + /// + public int StalledAfterNonDrainingCycles { get; set; } = 2; +} diff --git a/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditReconciliationActorTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditReconciliationActorTests.cs new file mode 100644 index 0000000..2d77dcd --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditReconciliationActorTests.cs @@ -0,0 +1,438 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using ScadaLink.AuditLog.Central; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Integration; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; + +namespace ScadaLink.AuditLog.Tests.Central; + +/// +/// Bundle B (M6-T3) tests for . Most +/// tests substitute the with an in-memory +/// recording stub so the actor's tick / cursor / stalled state machinery can +/// be exercised in milliseconds without an MSSQL container. 
The duplicate / +/// idempotency assertion uses the real against +/// the so we verify InsertIfNotExistsAsync +/// actually swallows duplicate-key collisions (the M2 Bundle A race-fix the +/// reconciliation puller depends on). +/// +public class SiteAuditReconciliationActorTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public SiteAuditReconciliationActorTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + private static AuditEvent NewEvent( + string siteId, + DateTime? occurredAt = null, + Guid? id = null) => new() + { + EventId = id ?? Guid.NewGuid(), + OccurredAtUtc = occurredAt ?? new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc), + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + SourceSiteId = siteId, + }; + + private static SiteAuditReconciliationOptions FastTickOptions( + int batchSize = 256, + int stalledAfter = 2) => + new() + { + // 100 ms tick keeps each test under a second. AwaitAssert covers + // schedule jitter so a 100 ms tick has up to ~3 s to fire. + ReconciliationIntervalSeconds = 300, + ReconciliationIntervalOverride = TimeSpan.FromMilliseconds(100), + BatchSize = batchSize, + StalledAfterNonDrainingCycles = stalledAfter, + }; + + /// + /// In-memory recording stub used for non-MSSQL tests. Captures every + /// call AND deduplicates on + /// so duplicate-handling assertions don't + /// need a real database for the simple cases. 
+ /// + private sealed class RecordingRepo : IAuditLogRepository + { + public List Inserted { get; } = new(); + private readonly HashSet _seen = new(); + public int InsertCallCount { get; private set; } + + public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default) + { + InsertCallCount++; + if (_seen.Add(evt.EventId)) + { + Inserted.Add(evt); + } + return Task.CompletedTask; + } + + public Task> QueryAsync( + AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) => + Task.FromResult>(Inserted); + + public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) => + Task.CompletedTask; + } + + /// + /// In-memory enumerator returning a static list of sites. + /// + private sealed class StaticEnumerator : ISiteEnumerator + { + private readonly IReadOnlyList _sites; + public StaticEnumerator(params SiteEntry[] sites) => _sites = sites; + public Task> EnumerateAsync(CancellationToken ct = default) => + Task.FromResult(_sites); + } + + /// + /// Scripted pull client — returns the next queued response for the site + /// on each call, or an empty MoreAvailable=false response once the queue is exhausted. Also + /// records every invocation so tests can assert call counts + arguments. 
+ /// + private sealed class ScriptedPullClient : IPullAuditEventsClient + { + public List<(string SiteId, DateTime SinceUtc, int BatchSize)> Calls { get; } = new(); + private readonly Dictionary> _scripted = new(); + private readonly Dictionary _throwOnSite = new(); + + public ScriptedPullClient Script(string siteId, params PullAuditEventsResponse[] responses) + { + _scripted[siteId] = new Queue(responses); + return this; + } + + public ScriptedPullClient ThrowFor(string siteId, Exception ex) + { + _throwOnSite[siteId] = ex; + return this; + } + + public Task PullAsync( + string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct) + { + Calls.Add((siteId, sinceUtc, batchSize)); + if (_throwOnSite.TryGetValue(siteId, out var ex)) + { + throw ex; + } + if (_scripted.TryGetValue(siteId, out var queue) && queue.Count > 0) + { + return Task.FromResult(queue.Dequeue()); + } + return Task.FromResult( + new PullAuditEventsResponse(Array.Empty(), MoreAvailable: false)); + } + } + + private IServiceProvider BuildScopedProvider(IAuditLogRepository repo) + { + var services = new ServiceCollection(); + // The actor opens a scope per tick and resolves IAuditLogRepository + // from that scope; registering as scoped mirrors how + // AddConfigurationDatabase wires the real repository. + services.AddScoped(_ => repo); + return services.BuildServiceProvider(); + } + + private IActorRef CreateActor( + ISiteEnumerator sites, + IPullAuditEventsClient client, + IAuditLogRepository repo, + SiteAuditReconciliationOptions options) + { + var sp = BuildScopedProvider(repo); + return Sys.ActorOf(Props.Create(() => new SiteAuditReconciliationActor( + sites, + client, + sp, + Options.Create(options), + NullLogger.Instance))); + } + + /// + /// Subscribes to the EventStream and collects every + /// publication into a list + /// the test can assert on. Uses a probe actor so the stream's + /// fire-and-forget delivery is observable from the test thread. 
+ /// + private (Akka.TestKit.TestProbe Probe, List Captured) SubscribeStalled() + { + var probe = CreateTestProbe(); + Sys.EventStream.Subscribe(probe.Ref, typeof(SiteAuditTelemetryStalledChanged)); + var captured = new List(); + return (probe, captured); + } + + // --------------------------------------------------------------------- + // 1. Timer_Fires_OnConfiguredInterval + // --------------------------------------------------------------------- + + [Fact] + public void Timer_Fires_OnConfiguredInterval() + { + var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083")); + var client = new ScriptedPullClient(); + var repo = new RecordingRepo(); + var opts = FastTickOptions(); + + CreateActor(sites, client, repo, opts); + + // The first scheduled tick fires after the effective `ReconciliationInterval`, + // which FastTickOptions overrides to 100 ms — the tick arrives via the + // ScheduleTellRepeatedlyCancelable contract that issues a Tell on the + // scheduler thread, so we await visible side effects (a PullAsync call) + // rather than racing on internal state. + AwaitAssert( + () => Assert.True(client.Calls.Count >= 1, $"expected >= 1 pull call, got {client.Calls.Count}"), + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + } + + // --------------------------------------------------------------------- + // 2. 
Tick_PullsFromEachKnownSite + // --------------------------------------------------------------------- + + [Fact] + public void Tick_PullsFromEachKnownSite() + { + var sites = new StaticEnumerator( + new SiteEntry("siteA", "http://siteA:8083"), + new SiteEntry("siteB", "http://siteB:8083")); + var client = new ScriptedPullClient(); + var repo = new RecordingRepo(); + + CreateActor(sites, client, repo, FastTickOptions()); + + AwaitAssert(() => + { + Assert.Contains(client.Calls, c => c.SiteId == "siteA"); + Assert.Contains(client.Calls, c => c.SiteId == "siteB"); + }, + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + } + + // --------------------------------------------------------------------- + // 3. Tick_IngestEvents_ViaInsertIfNotExistsAsync + // --------------------------------------------------------------------- + + [Fact] + public void Tick_IngestEvents_ViaInsertIfNotExistsAsync() + { + var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083")); + var e1 = NewEvent("siteA"); + var e2 = NewEvent("siteA"); + var client = new ScriptedPullClient().Script("siteA", + new PullAuditEventsResponse(new[] { e1, e2 }, MoreAvailable: false)); + var repo = new RecordingRepo(); + + CreateActor(sites, client, repo, FastTickOptions()); + + AwaitAssert(() => Assert.Equal(2, repo.InsertCallCount), + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + Assert.Contains(repo.Inserted, e => e.EventId == e1.EventId); + Assert.Contains(repo.Inserted, e => e.EventId == e2.EventId); + } + + // --------------------------------------------------------------------- + // 4. 
Tick_Duplicates_NotDoubleInserted (real MSSQL idempotency) + // --------------------------------------------------------------------- + + private ScadaLinkDbContext CreateContext() => + new(new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString).Options); + + [SkippableFact] + public async Task Tick_Duplicates_NotDoubleInserted() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = "bundle-b-" + Guid.NewGuid().ToString("N").Substring(0, 8); + var pre = NewEvent(siteId); + + // Seed the row directly so the actor sees it already present when the + // pull returns it. + await using (var seedContext = CreateContext()) + { + await new AuditLogRepository(seedContext).InsertIfNotExistsAsync(pre); + } + + // Stack one new and the pre-existing row in the pull response. The + // second-pull script returns empty so the actor settles. + var fresh = NewEvent(siteId); + var sites = new StaticEnumerator(new SiteEntry(siteId, "http://x:8083")); + var client = new ScriptedPullClient().Script(siteId, + new PullAuditEventsResponse(new[] { pre, fresh }, MoreAvailable: false)); + + await using var context = CreateContext(); + var repo = new AuditLogRepository(context); + + CreateActor(sites, client, repo, FastTickOptions()); + + // Wait for the actor to ingest both rows. + await Task.Delay(TimeSpan.FromSeconds(1)); + AwaitAssert(() => Assert.True(client.Calls.Count >= 1), + duration: TimeSpan.FromSeconds(3)); + + // Even though the pull returned 2 events, only 1 fresh row should + // exist in MSSQL alongside the pre-existing one — InsertIfNotExistsAsync + // is first-write-wins on EventId. + await using var read = CreateContext(); + var rows = await read.Set() + .Where(e => e.SourceSiteId == siteId) + .ToListAsync(); + Assert.Equal(2, rows.Count); + Assert.Contains(rows, r => r.EventId == pre.EventId); + Assert.Contains(rows, r => r.EventId == fresh.EventId); + } + + // --------------------------------------------------------------------- + // 5. 
Cursor_Advances_ToMaxOccurredAtUtc + // --------------------------------------------------------------------- + + [Fact] + public void Cursor_Advances_ToMaxOccurredAtUtc() + { + var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083")); + + var t1 = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc); + var t2 = new DateTime(2026, 5, 20, 10, 1, 0, DateTimeKind.Utc); + var t3 = new DateTime(2026, 5, 20, 10, 2, 0, DateTimeKind.Utc); + var e1 = NewEvent("siteA", t1); + var e2 = NewEvent("siteA", t2); + var e3 = NewEvent("siteA", t3); + + // First pull returns three events with t1, t2, t3. Subsequent pulls + // return empty — but the test asserts the SECOND pull's since argument + // is t3 (the max OccurredAtUtc from the first pull). + var client = new ScriptedPullClient().Script("siteA", + new PullAuditEventsResponse(new[] { e1, e2, e3 }, MoreAvailable: false)); + var repo = new RecordingRepo(); + + CreateActor(sites, client, repo, FastTickOptions()); + + // Wait until we have at least two pulls — the second one must use t3 + // as its `since` argument because that was the max OccurredAtUtc in + // the first response. + AwaitAssert(() => Assert.True(client.Calls.Count >= 2, + $"need at least 2 pulls to assert cursor advancement, got {client.Calls.Count}"), + duration: TimeSpan.FromSeconds(5), + interval: TimeSpan.FromMilliseconds(50)); + + Assert.Equal(DateTime.MinValue, client.Calls[0].SinceUtc); + Assert.Equal(t3, client.Calls[1].SinceUtc); + } + + // --------------------------------------------------------------------- + // 6. 
Tick_OneSiteThrows_OtherSitesStillProcessed + // --------------------------------------------------------------------- + + [Fact] + public void Tick_OneSiteThrows_OtherSitesStillProcessed() + { + var sites = new StaticEnumerator( + new SiteEntry("siteA", "http://siteA:8083"), + new SiteEntry("siteB", "http://siteB:8083")); + + var bEvent = NewEvent("siteB"); + var client = new ScriptedPullClient() + .ThrowFor("siteA", new InvalidOperationException("simulated transport failure")) + .Script("siteB", + new PullAuditEventsResponse(new[] { bEvent }, MoreAvailable: false)); + var repo = new RecordingRepo(); + + CreateActor(sites, client, repo, FastTickOptions()); + + AwaitAssert(() => + { + Assert.Contains(client.Calls, c => c.SiteId == "siteA"); + Assert.Contains(repo.Inserted, e => e.EventId == bEvent.EventId); + }, + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + } + + // --------------------------------------------------------------------- + // 7. StalledDetection_TwoConsecutiveNonDrainingCycles_PublishesStalledTrue + // --------------------------------------------------------------------- + + [Fact] + public void StalledDetection_TwoConsecutiveNonDrainingCycles_PublishesStalledTrue() + { + var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083")); + + // Two scripted responses that each return events AND MoreAvailable=true + // — the second pull triggers the stalled transition. 
+ var batch1 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray(); + var batch2 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray(); + var client = new ScriptedPullClient().Script("siteA", + new PullAuditEventsResponse(batch1, MoreAvailable: true), + new PullAuditEventsResponse(batch2, MoreAvailable: true)); + + var repo = new RecordingRepo(); + var (probe, _) = SubscribeStalled(); + + CreateActor(sites, client, repo, FastTickOptions(stalledAfter: 2)); + + // Expect Stalled=true after the second non-draining tick. The probe + // waits with its own timeout (a few seconds gives the 100 ms repeat + // interval ample slack). + var msg = probe.ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal("siteA", msg.SiteId); + Assert.True(msg.Stalled); + } + + // --------------------------------------------------------------------- + // 8. StalledDetection_DrainingCycle_PublishesStalledFalse + // --------------------------------------------------------------------- + + [Fact] + public void StalledDetection_DrainingCycle_PublishesStalledFalse() + { + var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083")); + + // Two non-draining responses get the actor into Stalled=true, then a + // draining response (events but MoreAvailable=false) flips it back. 
+ var batch1 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray(); + var batch2 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray(); + var batch3 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray(); + var client = new ScriptedPullClient().Script("siteA", + new PullAuditEventsResponse(batch1, MoreAvailable: true), + new PullAuditEventsResponse(batch2, MoreAvailable: true), + new PullAuditEventsResponse(batch3, MoreAvailable: false)); + + var repo = new RecordingRepo(); + var (probe, _) = SubscribeStalled(); + + CreateActor(sites, client, repo, FastTickOptions(stalledAfter: 2)); + + // First publication is the stalled=true transition; second is the + // back-to-draining flip. The actor publishes ONLY on transitions so we + // expect exactly these two messages in order. + var first = probe.ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.True(first.Stalled); + + var second = probe.ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.False(second.Stalled); + Assert.Equal("siteA", second.SiteId); + } +}