diff --git a/src/ScadaLink.AuditLog/Central/IPullAuditEventsClient.cs b/src/ScadaLink.AuditLog/Central/IPullAuditEventsClient.cs
new file mode 100644
index 0000000..e094e48
--- /dev/null
+++ b/src/ScadaLink.AuditLog/Central/IPullAuditEventsClient.cs
@@ -0,0 +1,45 @@
+using ScadaLink.Commons.Messages.Integration;
+
+namespace ScadaLink.AuditLog.Central;
+
+/// <summary>
+/// Mockable abstraction over the central-side <c>PullAuditEvents</c> gRPC
+/// client surface that <see cref="SiteAuditReconciliationActor"/> uses to
+/// fetch the next reconciliation batch from a specific site. Extracted so the
+/// actor can be unit-tested against an in-memory stub without standing up a
+/// real <c>GrpcChannel</c> per site.
+/// </summary>
+/// <remarks>
+/// <para>
+/// The production implementation (host wiring task) wraps the auto-generated
+/// <c>SiteStreamService.SiteStreamServiceClient</c>, multiplexing one
+/// <c>GrpcChannel</c> per site (presumably keyed on the site's endpoint —
+/// confirm against the wiring task). Until that wiring lands the DI
+/// composition root binds a NoOp default that returns an empty response — the
+/// reconciliation tick is still scheduled and the cursor logic still runs, so
+/// regressions in the actor itself are caught even before the real client
+/// arrives.
+/// </para>
+/// <para>
+/// Implementations MUST NOT throw on transport faults that the actor can
+/// tolerate (connection refused, deadline exceeded). The actor's contract is
+/// "one site's failure doesn't sink the rest of the tick"; an exception still
+/// won't crash the actor (the per-site try/catch catches it), but returning
+/// an empty response on a known-recoverable error keeps the logs cleaner.
+/// </para>
+/// </remarks>
+public interface IPullAuditEventsClient
+{
+    /// <summary>
+    /// Issues a <c>PullAuditEvents</c> RPC against the site whose endpoint
+    /// is registered against <paramref name="siteId"/>. Returns the next
+    /// batch of audit event
+    /// rows ordered oldest-first AND a <c>MoreAvailable</c> flag the actor
+    /// uses to detect a backlog that is not draining across ticks.
+    /// </summary>
+    Task<PullAuditEventsResponse> PullAsync(
+        string siteId,
+        DateTime sinceUtc,
+        int batchSize,
+        CancellationToken ct);
+}
diff --git a/src/ScadaLink.AuditLog/Central/ISiteEnumerator.cs b/src/ScadaLink.AuditLog/Central/ISiteEnumerator.cs
new file mode 100644
index 0000000..9e9607c
--- /dev/null
+++ b/src/ScadaLink.AuditLog/Central/ISiteEnumerator.cs
@@ -0,0 +1,34 @@
+namespace ScadaLink.AuditLog.Central;
+
+/// <summary>
+/// Enumeration surface consumed by <see cref="SiteAuditReconciliationActor"/> to
+/// discover which sites to poll on each reconciliation tick. Extracted so the
+/// actor can be unit-tested against a static list without depending on the
+/// production <c>ISiteRepository</c> + EF Core DbContext.
+/// </summary>
+/// <remarks>
+/// The production implementation wraps <c>ISiteRepository.GetAllSitesAsync</c>
+/// and projects each <c>Site</c> to a <see cref="SiteEntry"/> using the
+/// site's configured <c>GrpcNodeAAddress</c> (falling back to
+/// <c>GrpcNodeBAddress</c> when NodeA is unset). Sites with NO gRPC address
+/// configured are silently skipped — the reconciliation pull cannot reach
+/// them, but absence of an address is a configuration decision, not a runtime
+/// error.
+/// </remarks>
+public interface ISiteEnumerator
+{
+    /// <summary>
+    /// Returns the current set of sites the reconciliation puller should visit
+    /// on the next tick. Implementations should reflect adds/removes promptly
+    /// — the actor calls this once per tick.
+    /// </summary>
+    Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default);
+}
+
+/// <summary>
+/// One reconciliation target: the site identifier the actor uses as the
+/// cursor key and the gRPC endpoint the pull client is expected to dial
+/// to issue the pull. Endpoint is the bare authority (e.g. http://siteA:8083);
+/// transport selection (TLS, keepalive, etc.) is the client's concern.
+/// </summary>
+public sealed record SiteEntry(string SiteId, string GrpcEndpoint);
diff --git a/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs b/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs
new file mode 100644
index 0000000..6460c4d
--- /dev/null
+++ b/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs
@@ -0,0 +1,324 @@
+using Akka.Actor;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using ScadaLink.Commons.Entities.Audit;
+using ScadaLink.Commons.Interfaces.Repositories;
+
+namespace ScadaLink.AuditLog.Central;
+
+/// <summary>
+/// Central singleton (M6 Bundle B) that drives the audit-log reconciliation
+/// pull loop. On a configurable timer (default 5 minutes) the actor walks every
+/// known site, asks the site for any <see cref="AuditEvent"/> rows with
+/// <see cref="AuditEvent.OccurredAtUtc"/> &gt;= the site's last reconciled
+/// cursor, ingests them idempotently into the central
+/// <see cref="IAuditLogRepository"/>, and advances the cursor.
+/// </summary>
+/// <remarks>
+/// <para>
+/// <b>Self-healing telemetry, not a dispatcher.</b> The push path
+/// (<c>IngestAuditEvents</c>) is the primary mechanism. This actor exists
+/// so a missed push (gRPC blip, central restart, site offline) is
+/// eventually repaired by central re-pulling whatever the site still has in
+/// <c>Pending</c>/<c>Forwarded</c> state. Idempotency on
+/// <see cref="IAuditLogRepository.InsertIfNotExistsAsync"/>
+/// (M2 Bundle A's race-fix) makes duplicate
+/// arrivals from both paths a silent no-op.
+/// </para>
+/// <para>
+/// <b>Cursor lifetime.</b> The per-site <c>LastReconciledAt</c> watermark is
+/// kept in-memory for the actor's lifetime. The cluster singleton normally
+/// survives the host process; on a deliberate failover OR a singleton restart
+/// the cursors reset to <see cref="DateTime.MinValue"/>. That is conservative
+/// but correct — the next tick simply asks for everything the site still has,
+/// and idempotent ingest swallows the dupes. Persisting cursors to MS SQL was
+/// considered and rejected for M6: the cost of a write per tick outweighs the
+/// rare benefit of avoiding one over-broad pull after a restart.
+/// </para>
+/// <para>
+/// <b>Stalled detection.</b> The brief calls a site "stalled" when two
+/// consecutive pull cycles BOTH return non-empty AND <c>MoreAvailable=true</c>
+/// — i.e. the backlog isn't draining. The actor publishes
+/// <see cref="SiteAuditTelemetryStalledChanged"/> on the actor system's
+/// EventStream so a future <c>ICentralHealthCollector</c> bridge (M6 Bundle E)
+/// can flip the health metric without coupling this actor to the health
+/// collection surface today.
+/// </para>
+/// <para>
+/// <b>Failure isolation.</b> A single site that throws (DNS, transport,
+/// repository write) must NOT prevent other sites from being polled on the
+/// same tick, so the per-site work runs inside its own try/catch. What
+/// happens if an exception still escapes the receive handler is decided by
+/// this actor's parent, not by the <c>SupervisorStrategy()</c> override
+/// below (that only governs children). A restart resets the in-memory
+/// cursors, but as noted above that's a safe (over-pull, idempotent) recovery.
+/// </para>
+/// <para>
+/// <b>DI scopes.</b> <see cref="IAuditLogRepository"/> is a scoped EF Core
+/// service registered by <c>AddConfigurationDatabase</c>. The singleton actor
+/// opens one DI scope per tick and reuses the same repository across all
+/// sites in that tick — one DbContext per tick mirrors the
+/// <c>AuditLogIngestActor</c> + <c>NotificationOutboxActor</c> pattern.
+/// </para>
+/// </remarks>
+public class SiteAuditReconciliationActor : ReceiveActor
+{
+    private readonly ISiteEnumerator _sites;
+    private readonly IPullAuditEventsClient _client;
+    private readonly IServiceProvider _services;
+    private readonly SiteAuditReconciliationOptions _options;
+    private readonly ILogger _logger;
+
+    /// <summary>
+    /// Per-site reconciliation watermark — the highest
+    /// <see cref="AuditEvent.OccurredAtUtc"/> seen for that site on a previous
+    /// tick. Asking for <c>OccurredAtUtc &gt;= cursor</c> rather than <c>&gt;</c>
+    /// is the site contract;
+    /// duplicate-with-same-timestamp rows are filtered out by the idempotent
+    /// repository write.
+    /// </summary>
+    private readonly Dictionary<string, DateTime> _cursors = new();
+
+    /// <summary>
+    /// Per-site count of consecutive non-draining cycles. Resets to zero on the
+    /// first draining (or empty) cycle.
+    /// </summary>
+    private readonly Dictionary<string, int> _nonDrainingCycles = new();
+
+    /// <summary>
+    /// Per-site latched stalled state — used so the actor only publishes a
+    /// <see cref="SiteAuditTelemetryStalledChanged"/> transition when the
+    /// stalled flag actually changes, not on every tick while stalled.
+    /// </summary>
+    private readonly Dictionary<string, bool> _stalled = new();
+
+    private ICancelable? _timer;
+
+    /// <summary>
+    /// EventStream captured at construction. The tick awaits use ConfigureAwait(false),
+    /// after which the ambient actor <c>Context</c> may no longer be available.
+    /// </summary>
+    private readonly Akka.Event.EventStream _eventStream;
+
+    public SiteAuditReconciliationActor(
+        ISiteEnumerator sites,
+        IPullAuditEventsClient client,
+        IServiceProvider services,
+        IOptions<SiteAuditReconciliationOptions> options,
+        ILogger logger)
+    {
+        ArgumentNullException.ThrowIfNull(sites);
+        ArgumentNullException.ThrowIfNull(client);
+        ArgumentNullException.ThrowIfNull(services);
+        ArgumentNullException.ThrowIfNull(options);
+        ArgumentNullException.ThrowIfNull(logger);
+
+        _sites = sites;
+        _client = client;
+        _services = services;
+        _options = options.Value;
+        _logger = logger;
+        // Context is valid here: the constructor runs inside the actor cell.
+        _eventStream = Context.System.EventStream;
+
+        ReceiveAsync<ReconciliationTick>(_ => OnTickAsync());
+    }
+
+    protected override void PreStart()
+    {
+        base.PreStart();
+        var interval = _options.ReconciliationInterval;
+        _timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
+            initialDelay: interval,
+            interval: interval,
+            receiver: Self,
+            message: ReconciliationTick.Instance,
+            sender: Self);
+    }
+
+    protected override void PostStop()
+    {
+        _timer?.Cancel();
+        base.PostStop();
+    }
+
+    private async Task OnTickAsync()
+    {
+        IReadOnlyList<SiteEntry> sites;
+        try
+        {
+            sites = await _sites.EnumerateAsync().ConfigureAwait(false);
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Site enumeration failed; skipping reconciliation tick.");
+            return;
+        }
+
+        if (sites.Count == 0)
+        {
+            return;
+        }
+
+        IServiceScope? scope = null;
+        IAuditLogRepository repository;
+        try
+        {
+            scope = _services.CreateScope();
+            repository = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Failed to resolve IAuditLogRepository for reconciliation tick.");
+            scope?.Dispose();
+            return;
+        }
+
+        try
+        {
+            foreach (var site in sites)
+            {
+                try
+                {
+                    await PullSiteAsync(site, repository).ConfigureAwait(false);
+                }
+                catch (Exception ex)
+                {
+                    // Catch-all per the failure-isolation invariant: one site's
+                    // fault must not sink the rest of the tick. The cursor for
+                    // the failing site is left at its previous value so the
+                    // next tick retries the same window.
+                    _logger.LogWarning(
+                        ex, "Reconciliation pull failed for site {SiteId}; other sites continue.", site.SiteId);
+                }
+            }
+        }
+        finally
+        {
+            scope.Dispose();
+        }
+    }
+
+    /// <summary>
+    /// Issues one <c>PullAuditEvents</c> RPC against the site, ingests the
+    /// returned rows idempotently into the central repository, and advances
+    /// the cursor based on the maximum <see cref="AuditEvent.OccurredAtUtc"/>
+    /// observed. The brief's "saturate until backlog clears" intent is met by
+    /// the natural cadence — each tick issues one pull, and a backed-up site
+    /// drains across consecutive ticks. The stalled signal (two non-draining
+    /// ticks in a row) surfaces when that drain isn't keeping up.
+    /// </summary>
+    private async Task PullSiteAsync(SiteEntry site, IAuditLogRepository repository)
+    {
+        var since = _cursors.TryGetValue(site.SiteId, out var c) ? c : DateTime.MinValue;
+        var response = await _client.PullAsync(
+                site.SiteId, since, _options.BatchSize, CancellationToken.None)
+            .ConfigureAwait(false);
+
+        var maxOccurred = since;
+        var nowUtc = DateTime.UtcNow;
+        foreach (var evt in response.Events)
+        {
+            try
+            {
+                // Idempotent repository write: duplicate EventIds (from a
+                // concurrent push, or a retry of this very pull) collapse to
+                // a no-op courtesy of M2 Bundle A's race-fix on
+                // InsertIfNotExistsAsync.
+                var ingested = evt with { IngestedAtUtc = nowUtc };
+                await repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false);
+            }
+            catch (Exception ex)
+            {
+                // Per-row catch so one bad event does not abandon the rest of
+                // the batch. The cursor still advances based on OccurredAtUtc
+                // — the row was returned by the site, so the next tick won't
+                // re-fetch it; if it permanently fails to persist, that's an
+                // operational concern surfaced by the log, not a hot-loop
+                // trigger.
+                _logger.LogError(
+                    ex, "Reconciliation ingest failed for AuditEvent {EventId} from site {SiteId}.",
+                    evt.EventId, site.SiteId);
+            }
+
+            if (evt.OccurredAtUtc > maxOccurred)
+            {
+                maxOccurred = evt.OccurredAtUtc;
+            }
+        }
+
+        _cursors[site.SiteId] = maxOccurred;
+
+        var nonDraining = response.MoreAvailable && response.Events.Count > 0;
+        UpdateStalledState(site.SiteId, draining: !nonDraining);
+    }
+
+    /// <summary>
+    /// Flips the per-site stalled flag based on whether this tick drained the
+    /// queue. A "draining" cycle is one where the server reported no more rows
+    /// available OR returned zero events. A "non-draining" cycle is the
+    /// inverse (events returned AND <c>MoreAvailable=true</c>).
+    /// </summary>
+    /// <remarks>
+    /// The state machine: counter increments on each consecutive non-draining
+    /// tick. On reaching <see cref="SiteAuditReconciliationOptions.StalledAfterNonDrainingCycles"/>
+    /// the actor latches <c>Stalled=true</c> and publishes the transition; on
+    /// any subsequent draining tick the counter resets to zero AND, if the
+    /// latch is currently true, the actor publishes <c>Stalled=false</c>. Only
+    /// transitions are published — repeated ticks in the same state are
+    /// silent so a downstream subscriber doesn't see a flood of redundant
+    /// notifications.
+    /// </remarks>
+    private void UpdateStalledState(string siteId, bool draining)
+    {
+        var wasStalled = _stalled.TryGetValue(siteId, out var prior) && prior;
+
+        if (draining)
+        {
+            _nonDrainingCycles[siteId] = 0;
+            if (wasStalled)
+            {
+                _stalled[siteId] = false;
+                _eventStream.Publish(new SiteAuditTelemetryStalledChanged(siteId, Stalled: false));
+            }
+            return;
+        }
+
+        var consecutive = _nonDrainingCycles.GetValueOrDefault(siteId) + 1;
+        _nonDrainingCycles[siteId] = consecutive;
+
+        if (consecutive >= _options.StalledAfterNonDrainingCycles && !wasStalled)
+        {
+            _stalled[siteId] = true;
+            _eventStream.Publish(new SiteAuditTelemetryStalledChanged(siteId, Stalled: true));
+        }
+    }
+
+    /// <summary>
+    /// Strategy this actor applies to its own children; the visible code creates
+    /// none, and failures of this actor itself are decided by its parent. Uses
+    /// Akka's default decider, i.e. not a Resume directive for general exceptions.
+    /// </summary>
+    protected override SupervisorStrategy SupervisorStrategy() =>
+        new OneForOneStrategy(
+            maxNrOfRetries: 0,
+            withinTimeRange: TimeSpan.Zero,
+            decider: Akka.Actor.SupervisorStrategy.DefaultDecider);
+
+    /// <summary>Self-tick triggering a reconciliation pass across all sites.</summary>
+    internal sealed class ReconciliationTick
+    {
+        public static readonly ReconciliationTick Instance = new();
+        private ReconciliationTick() { }
+    }
+}
+
+/// <summary>
+/// Published on the actor system EventStream when a site's reconciliation
+/// puller transitions into or out of the "stalled" state (backlog not
+/// draining across multiple cycles). The M6 Bundle E central health collector
+/// will subscribe to this and surface
+/// <c>SiteAuditTelemetryStalled</c> on the health-report payload.
+/// </summary>
+public sealed record SiteAuditTelemetryStalledChanged(string SiteId, bool Stalled);
diff --git a/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationOptions.cs b/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationOptions.cs
new file mode 100644
index 0000000..d32c5e6
--- /dev/null
+++ b/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationOptions.cs
@@ -0,0 +1,60 @@
+namespace ScadaLink.AuditLog.Central;
+
+/// <summary>
+/// Tuning knobs for the central <see cref="SiteAuditReconciliationActor"/> singleton.
+/// Defaults mirror the M6 Bundle B brief: pull every 5 minutes per site, 256 rows per
+/// batch, declare a site "stalled" after two consecutive pull cycles return non-empty
+/// AND <c>MoreAvailable=true</c> (the backlog is not draining).
+/// </summary>
+/// <remarks>
+/// <para>
+/// Per the M6 plan the reconciliation actor is the fallback when push telemetry is
+/// lost; it is intentionally low-frequency. Lowering
+/// <see cref="ReconciliationIntervalSeconds"/> in production trades MS SQL load for
+/// fresher self-healing — keep the default unless a deployment can prove the extra
+/// load is acceptable.
+/// </para>
+/// <para>
+/// <see cref="StalledAfterNonDrainingCycles"/> = 2 because a single non-draining
+/// cycle can happen on a surge (e.g. a backed-up site replays its hot queue); the
+/// stalled signal should only fire when the backlog persists across cycles, which is
+/// the symptom the central health surface is asking us to detect.
+/// </para>
+/// </remarks>
+public sealed class SiteAuditReconciliationOptions
+{
+ /// <summary>
+ /// Period of the reconciliation tick, in seconds. Each tick visits every known site once.
+ /// </summary>
+ public int ReconciliationIntervalSeconds { get; set; } = 300;
+
+ /// <summary>
+ /// Test-only override for finer control over the tick cadence than
+ /// whole-second resolution allows. When non-null, takes precedence over
+ /// <see cref="ReconciliationIntervalSeconds"/>. Intended not to be bound from
+ /// config (nothing in this class enforces that); production config is meant
+ /// to set <see cref="ReconciliationIntervalSeconds"/> only.
+ /// </summary>
+ public TimeSpan? ReconciliationIntervalOverride { get; set; }
+
+ /// <summary>
+ /// Resolves the effective tick interval, honouring the test override when
+ /// set. Falls back to <see cref="ReconciliationIntervalSeconds"/> converted to a TimeSpan.
+ /// </summary>
+ public TimeSpan ReconciliationInterval =>
+ ReconciliationIntervalOverride ?? TimeSpan.FromSeconds(ReconciliationIntervalSeconds);
+
+ /// <summary>
+ /// Maximum number of audit event
+ /// rows requested in a single <c>PullAuditEvents</c> RPC call.
+ /// </summary>
+ public int BatchSize { get; set; } = 256;
+
+ /// <summary>
+ /// Number of consecutive non-draining cycles (events returned AND
+ /// <c>MoreAvailable=true</c>) that must accumulate for a site before the actor
+ /// publishes <c>SiteAuditTelemetryStalledChanged(Stalled: true)</c> on the
+ /// EventStream.
+ /// </summary>
+ public int StalledAfterNonDrainingCycles { get; set; } = 2;
+}
diff --git a/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditReconciliationActorTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditReconciliationActorTests.cs
new file mode 100644
index 0000000..2d77dcd
--- /dev/null
+++ b/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditReconciliationActorTests.cs
@@ -0,0 +1,438 @@
+using Akka.Actor;
+using Akka.TestKit.Xunit2;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using ScadaLink.AuditLog.Central;
+using ScadaLink.Commons.Entities.Audit;
+using ScadaLink.Commons.Interfaces.Repositories;
+using ScadaLink.Commons.Messages.Integration;
+using ScadaLink.Commons.Types.Audit;
+using ScadaLink.Commons.Types.Enums;
+using ScadaLink.ConfigurationDatabase;
+using ScadaLink.ConfigurationDatabase.Repositories;
+using ScadaLink.ConfigurationDatabase.Tests.Migrations;
+
+namespace ScadaLink.AuditLog.Tests.Central;
+
+/// <summary>
+/// Bundle B (M6-T3) tests for <see cref="SiteAuditReconciliationActor"/>. Most
+/// tests substitute the <see cref="IAuditLogRepository"/> with an in-memory
+/// recording stub so the actor's tick / cursor / stalled state machinery can
+/// be exercised in milliseconds without an MSSQL container. The duplicate /
+/// idempotency assertion uses the real <see cref="AuditLogRepository"/> against
+/// the <see cref="MsSqlMigrationFixture"/> so we verify <c>InsertIfNotExistsAsync</c>
+/// actually swallows duplicate-key collisions (the M2 Bundle A race-fix the
+/// reconciliation puller depends on).
+/// </summary>
+public class SiteAuditReconciliationActorTests : TestKit, IClassFixture
+{
+ private readonly MsSqlMigrationFixture _fixture;
+
+    // xUnit supplies the shared class fixture; only the MSSQL-backed duplicate
+    // test reads it (connection string + availability flag).
+    public SiteAuditReconciliationActorTests(MsSqlMigrationFixture fixture) =>
+        _fixture = fixture;
+
+    // Builds a delivered outbound-API audit event for the given site. The id
+    // defaults to a fresh GUID and the timestamp to a fixed UTC instant.
+    private static AuditEvent NewEvent(string siteId, DateTime? occurredAt = null, Guid? id = null)
+    {
+        var eventId = id ?? Guid.NewGuid();
+        var occurred = occurredAt ?? new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
+        return new AuditEvent
+        {
+            EventId = eventId, OccurredAtUtc = occurred, Channel = AuditChannel.ApiOutbound,
+            Kind = AuditKind.ApiCall, Status = AuditStatus.Delivered, SourceSiteId = siteId,
+        };
+    }
+
+    private static SiteAuditReconciliationOptions FastTickOptions(int batchSize = 256, int stalledAfter = 2)
+    {
+        // The 100 ms override wins over the 300 s setting and keeps each test
+        // short; AwaitAssert absorbs scheduler jitter around the first tick.
+        return new SiteAuditReconciliationOptions
+        {
+            ReconciliationIntervalSeconds = 300,
+            ReconciliationIntervalOverride = TimeSpan.FromMilliseconds(100),
+            BatchSize = batchSize,
+            StalledAfterNonDrainingCycles = stalledAfter,
+        };
+    }
+
+ /// <summary>
+ /// In-memory recording stub used for non-MSSQL tests. Captures every
+ /// <c>InsertIfNotExistsAsync</c> call AND deduplicates on
+ /// <c>AuditEvent.EventId</c> so duplicate-handling assertions don't
+ /// need a real database for the simple cases.
+ /// </summary>
+ private sealed class RecordingRepo : IAuditLogRepository
+ {
+ public List Inserted { get; } = new();
+ private readonly HashSet _seen = new();
+ public int InsertCallCount { get; private set; }
+
+ public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default)
+ {
+ InsertCallCount++;
+ if (_seen.Add(evt.EventId))
+ {
+ Inserted.Add(evt);
+ }
+ return Task.CompletedTask;
+ }
+
+ public Task> QueryAsync(
+ AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) =>
+ Task.FromResult>(Inserted);
+
+ public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
+ Task.CompletedTask;
+ }
+
+    /// <summary>
+    /// In-memory enumerator returning a static list of sites.
+    /// </summary>
+    private sealed class StaticEnumerator : ISiteEnumerator
+    {
+        private readonly IReadOnlyList<SiteEntry> _sites;
+        public StaticEnumerator(params SiteEntry[] sites) => _sites = sites;
+        public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default) =>
+            Task.FromResult(_sites);
+    }
+
+    /// <summary>
+    /// Scripted pull client — returns the next queued response for the site
+    /// on each call and an empty, drained response once the queue is exhausted.
+    /// Also records every invocation so tests can assert call counts + arguments.
+    /// </summary>
+    private sealed class ScriptedPullClient : IPullAuditEventsClient
+    {
+        public List<(string SiteId, DateTime SinceUtc, int BatchSize)> Calls { get; } = new();
+        private readonly Dictionary<string, Queue<PullAuditEventsResponse>> _scripted = new();
+        private readonly Dictionary<string, Exception> _throwOnSite = new();
+
+        public ScriptedPullClient Script(string siteId, params PullAuditEventsResponse[] responses)
+        {
+            _scripted[siteId] = new Queue<PullAuditEventsResponse>(responses);
+            return this;
+        }
+
+        public ScriptedPullClient ThrowFor(string siteId, Exception ex)
+        {
+            _throwOnSite[siteId] = ex;
+            return this;
+        }
+
+        public Task<PullAuditEventsResponse> PullAsync(
+            string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct)
+        {
+            Calls.Add((siteId, sinceUtc, batchSize));
+            if (_throwOnSite.TryGetValue(siteId, out var ex))
+            {
+                throw ex;
+            }
+            if (_scripted.TryGetValue(siteId, out var queue) && queue.Count > 0)
+            {
+                return Task.FromResult(queue.Dequeue());
+            }
+            return Task.FromResult(
+                new PullAuditEventsResponse(Array.Empty<AuditEvent>(), MoreAvailable: false));
+        }
+    }
+
+    // One scoped registration handing back the supplied repository: the actor
+    // opens a scope per tick and resolves IAuditLogRepository from it, which
+    // mirrors how AddConfigurationDatabase wires the real repository.
+    private IServiceProvider BuildScopedProvider(IAuditLogRepository repo)
+    {
+        var registrations = new ServiceCollection();
+        registrations.AddScoped(_ => repo);
+        return registrations.BuildServiceProvider();
+    }
+
+    // Spawns the actor under test in the TestKit system, wiring the given
+    // collaborators plus a scoped provider that resolves the supplied repository.
+    private IActorRef CreateActor(
+        ISiteEnumerator sites,
+        IPullAuditEventsClient client,
+        IAuditLogRepository repo,
+        SiteAuditReconciliationOptions options)
+    {
+        var provider = BuildScopedProvider(repo);
+        var wrappedOptions = Options.Create(options);
+        var props = Props.Create(() => new SiteAuditReconciliationActor(
+            sites, client, provider, wrappedOptions, NullLogger.Instance));
+        return Sys.ActorOf(props);
+    }
+
+    /// <summary>
+    /// Subscribes a test probe to the EventStream for
+    /// <see cref="SiteAuditTelemetryStalledChanged"/> publications. Tests read the
+    /// messages from the probe; the returned list is never filled by this helper
+    /// and is kept only so existing tuple-deconstructing callers keep compiling.
+    /// </summary>
+    private (Akka.TestKit.TestProbe Probe, List<SiteAuditTelemetryStalledChanged> Captured) SubscribeStalled()
+    {
+        var probe = CreateTestProbe();
+        Sys.EventStream.Subscribe(probe.Ref, typeof(SiteAuditTelemetryStalledChanged));
+        var captured = new List<SiteAuditTelemetryStalledChanged>();
+        return (probe, captured);
+    }
+
+ // ---------------------------------------------------------------------
+ // 1. Timer_Fires_OnConfiguredInterval
+ // ---------------------------------------------------------------------
+
+    [Fact]
+    public void Timer_Fires_OnConfiguredInterval()
+    {
+        var pullClient = new ScriptedPullClient();
+        var enumerator = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
+
+        CreateActor(enumerator, pullClient, new RecordingRepo(), FastTickOptions());
+
+        // FastTickOptions overrides the interval to 100 ms. The tick arrives
+        // asynchronously, so poll for a visible side effect (a PullAsync call)
+        // rather than racing on internal state.
+        AwaitAssert(
+            () =>
+            {
+                var seen = pullClient.Calls.Count;
+                Assert.True(seen >= 1, $"expected >= 1 pull call, got {seen}");
+            },
+            duration: TimeSpan.FromSeconds(3),
+            interval: TimeSpan.FromMilliseconds(50));
+    }
+
+ // ---------------------------------------------------------------------
+ // 2. Tick_PullsFromEachKnownSite
+ // ---------------------------------------------------------------------
+
+    [Fact]
+    public void Tick_PullsFromEachKnownSite()
+    {
+        var siteA = new SiteEntry("siteA", "http://siteA:8083");
+        var siteB = new SiteEntry("siteB", "http://siteB:8083");
+        var pullClient = new ScriptedPullClient();
+
+        CreateActor(new StaticEnumerator(siteA, siteB), pullClient, new RecordingRepo(), FastTickOptions());
+
+        // Both sites must have been pulled at least once within the wait window.
+        AwaitAssert(
+            () =>
+            {
+                Assert.Contains(pullClient.Calls, c => c.SiteId == "siteA");
+                Assert.Contains(pullClient.Calls, c => c.SiteId == "siteB");
+            },
+            duration: TimeSpan.FromSeconds(3),
+            interval: TimeSpan.FromMilliseconds(50));
+    }
+
+ // ---------------------------------------------------------------------
+ // 3. Tick_IngestEvents_ViaInsertIfNotExistsAsync
+ // ---------------------------------------------------------------------
+
+    [Fact]
+    public void Tick_IngestEvents_ViaInsertIfNotExistsAsync()
+    {
+        var first = NewEvent("siteA");
+        var second = NewEvent("siteA");
+        var batch = new PullAuditEventsResponse(new[] { first, second }, MoreAvailable: false);
+        var pullClient = new ScriptedPullClient().Script("siteA", batch);
+        var recorder = new RecordingRepo();
+        var enumerator = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
+
+        CreateActor(enumerator, pullClient, recorder, FastTickOptions());
+
+        // One insert call per pulled event; later ticks pull empty batches.
+        AwaitAssert(() => Assert.Equal(2, recorder.InsertCallCount),
+            duration: TimeSpan.FromSeconds(3), interval: TimeSpan.FromMilliseconds(50));
+        Assert.Contains(recorder.Inserted, e => e.EventId == first.EventId);
+        Assert.Contains(recorder.Inserted, e => e.EventId == second.EventId);
+    }
+
+ // ---------------------------------------------------------------------
+ // 4. Tick_Duplicates_NotDoubleInserted (real MSSQL idempotency)
+ // ---------------------------------------------------------------------
+
+    // Fresh context per call against the fixture's SQL Server database; callers dispose it.
+    private ScadaLinkDbContext CreateContext() => new(
+        new DbContextOptionsBuilder().UseSqlServer(_fixture.ConnectionString).Options);
+
+    [SkippableFact]
+    public async Task Tick_Duplicates_NotDoubleInserted()
+    {
+        Skip.IfNot(_fixture.Available, _fixture.SkipReason);
+
+        var siteId = "bundle-b-" + Guid.NewGuid().ToString("N").Substring(0, 8);
+        var pre = NewEvent(siteId);
+
+        // Seed the row directly so the pull hands back an already-present row.
+        await using (var seedContext = CreateContext())
+        {
+            await new AuditLogRepository(seedContext).InsertIfNotExistsAsync(pre);
+        }
+
+        // Stack one new and the pre-existing row in the pull response. Once the
+        // script is exhausted the stub returns empty responses, so the actor settles.
+        var fresh = NewEvent(siteId);
+        var sites = new StaticEnumerator(new SiteEntry(siteId, "http://x:8083"));
+        var client = new ScriptedPullClient().Script(siteId,
+            new PullAuditEventsResponse(new[] { pre, fresh }, MoreAvailable: false));
+
+        await using var context = CreateContext();
+        var repo = new AuditLogRepository(context);
+
+        CreateActor(sites, client, repo, FastTickOptions());
+
+        // Poll MSSQL until both rows are visible instead of sleeping a fixed
+        // second: a recorded pull call does not mean the inserts have finished.
+        var deadline = DateTime.UtcNow.AddSeconds(10);
+        List<AuditEvent> rows;
+        do
+        {
+            await Task.Delay(TimeSpan.FromMilliseconds(100));
+            await using var read = CreateContext();
+            rows = await read.Set<AuditEvent>().Where(e => e.SourceSiteId == siteId).ToListAsync();
+        }
+        while (rows.Count < 2 && DateTime.UtcNow < deadline);
+
+        // First-write-wins on EventId: exactly the seeded row plus the fresh one exist.
+        Assert.Equal(2, rows.Count);
+        Assert.Contains(rows, r => r.EventId == pre.EventId);
+        Assert.Contains(rows, r => r.EventId == fresh.EventId);
+    }
+
+ // ---------------------------------------------------------------------
+ // 5. Cursor_Advances_ToMaxOccurredAtUtc
+ // ---------------------------------------------------------------------
+
+    [Fact]
+    public void Cursor_Advances_ToMaxOccurredAtUtc()
+    {
+        var enumerator = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
+
+        // Three events one minute apart; the latest timestamp is the expected cursor.
+        var earliest = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
+        var middle = new DateTime(2026, 5, 20, 10, 1, 0, DateTimeKind.Utc);
+        var latest = new DateTime(2026, 5, 20, 10, 2, 0, DateTimeKind.Utc);
+        var batch = new[]
+        {
+            NewEvent("siteA", earliest),
+            NewEvent("siteA", middle),
+            NewEvent("siteA", latest),
+        };
+
+        // Only the first pull is scripted; every later pull gets the stub's
+        // empty response, so the second call's `since` exposes the cursor.
+        var pullClient = new ScriptedPullClient().Script("siteA",
+            new PullAuditEventsResponse(batch, MoreAvailable: false));
+
+        CreateActor(enumerator, pullClient, new RecordingRepo(), FastTickOptions());
+
+        AwaitAssert(() => Assert.True(pullClient.Calls.Count >= 2,
+            $"need at least 2 pulls to assert cursor advancement, got {pullClient.Calls.Count}"),
+            duration: TimeSpan.FromSeconds(5),
+            interval: TimeSpan.FromMilliseconds(50));
+
+        // First pull starts from the beginning of time; the second from the max timestamp.
+        Assert.Equal(DateTime.MinValue, pullClient.Calls[0].SinceUtc);
+        Assert.Equal(latest, pullClient.Calls[1].SinceUtc);
+    }
+
+ // ---------------------------------------------------------------------
+ // 6. Tick_OneSiteThrows_OtherSitesStillProcessed
+ // ---------------------------------------------------------------------
+
+    [Fact]
+    public void Tick_OneSiteThrows_OtherSitesStillProcessed()
+    {
+        // siteA's pull always throws; siteB has one event scripted.
+        var survivor = NewEvent("siteB");
+        var failure = new InvalidOperationException("simulated transport failure");
+        var pullClient = new ScriptedPullClient()
+            .ThrowFor("siteA", failure)
+            .Script("siteB", new PullAuditEventsResponse(new[] { survivor }, MoreAvailable: false));
+        var recorder = new RecordingRepo();
+        var enumerator = new StaticEnumerator(
+            new SiteEntry("siteA", "http://siteA:8083"), new SiteEntry("siteB", "http://siteB:8083"));
+
+        CreateActor(enumerator, pullClient, recorder, FastTickOptions());
+
+        // siteA was attempted AND siteB's event still reached the repository.
+        AwaitAssert(
+            () =>
+            {
+                Assert.Contains(pullClient.Calls, c => c.SiteId == "siteA");
+                Assert.Contains(recorder.Inserted, e => e.EventId == survivor.EventId);
+            },
+            duration: TimeSpan.FromSeconds(3), interval: TimeSpan.FromMilliseconds(50));
+    }
+
+ // ---------------------------------------------------------------------
+ // 7. StalledDetection_TwoConsecutiveNonDrainingCycles_PublishesStalledTrue
+ // ---------------------------------------------------------------------
+
+    [Fact]
+    public void StalledDetection_TwoConsecutiveNonDrainingCycles_PublishesStalledTrue()
+    {
+        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
+
+        // Two scripted responses that each return events AND MoreAvailable=true
+        // — the second pull triggers the stalled transition.
+        var batch1 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
+        var batch2 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
+        var client = new ScriptedPullClient().Script("siteA",
+            new PullAuditEventsResponse(batch1, MoreAvailable: true),
+            new PullAuditEventsResponse(batch2, MoreAvailable: true));
+
+        var repo = new RecordingRepo();
+        var (probe, _) = SubscribeStalled();
+
+        CreateActor(sites, client, repo, FastTickOptions(stalledAfter: 2));
+
+        // Expect Stalled=true after the second non-draining tick. The probe
+        // waits with its own timeout (5 s gives the 100 ms tick interval
+        // ample slack).
+        var msg = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
+        Assert.Equal("siteA", msg.SiteId);
+        Assert.True(msg.Stalled);
+    }
+
+ // ---------------------------------------------------------------------
+ // 8. StalledDetection_DrainingCycle_PublishesStalledFalse
+ // ---------------------------------------------------------------------
+
+    [Fact]
+    public void StalledDetection_DrainingCycle_PublishesStalledFalse()
+    {
+        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
+
+        // Two non-draining responses get the actor into Stalled=true, then a
+        // draining response (events but MoreAvailable=false) flips it back.
+        var batch1 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
+        var batch2 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
+        var batch3 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
+        var client = new ScriptedPullClient().Script("siteA",
+            new PullAuditEventsResponse(batch1, MoreAvailable: true),
+            new PullAuditEventsResponse(batch2, MoreAvailable: true),
+            new PullAuditEventsResponse(batch3, MoreAvailable: false));
+
+        var repo = new RecordingRepo();
+        var (probe, _) = SubscribeStalled();
+
+        CreateActor(sites, client, repo, FastTickOptions(stalledAfter: 2));
+
+        // First publication is the stalled=true transition; second is the
+        // back-to-draining flip. The actor publishes ONLY on transitions so we
+        // expect exactly these two messages in order.
+        var first = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
+        Assert.True(first.Stalled);
+
+        var second = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
+        Assert.False(second.Stalled);
+        Assert.Equal("siteA", second.SiteId);
+    }
+}