diff --git a/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs b/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs index 6460c4d..e38e6d2 100644 --- a/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs +++ b/src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs @@ -137,6 +137,14 @@ public class SiteAuditReconciliationActor : ReceiveActor private async Task OnTickAsync() { + // Capture EventStream BEFORE the first await. Accessing Context (and + // therefore Context.System) after an await is unsafe because Akka's + // ActorBase.Context throws "no active ActorContext" once the + // continuation runs on a thread that isn't currently dispatching this + // actor — mirrors the AuditLogPurgeActor.OnTickAsync fix and the + // AuditLogIngestActor.OnIngestAsync Sender-capture pattern. + var eventStream = Context.System.EventStream; + IReadOnlyList sites; try { @@ -173,7 +181,7 @@ public class SiteAuditReconciliationActor : ReceiveActor { try { - await PullSiteAsync(site, repository).ConfigureAwait(false); + await PullSiteAsync(site, repository, eventStream).ConfigureAwait(false); } catch (Exception ex) { @@ -203,7 +211,7 @@ public class SiteAuditReconciliationActor : ReceiveActor /// drains across consecutive ticks. The stalled signal (two non-draining /// ticks in a row) surfaces when that drain isn't keeping up. /// - private async Task PullSiteAsync(SiteEntry site, IAuditLogRepository repository) + private async Task PullSiteAsync(SiteEntry site, IAuditLogRepository repository, Akka.Event.EventStream eventStream) { var since = _cursors.TryGetValue(site.SiteId, out var c) ? c : DateTime.MinValue; var response = await _client.PullAsync( @@ -247,7 +255,7 @@ public class SiteAuditReconciliationActor : ReceiveActor _cursors[site.SiteId] = maxOccurred; var nonDraining = response.MoreAvailable && response.Events.Count > 0; - UpdateStalledState(site.SiteId, draining: !nonDraining); + UpdateStalledState(site.SiteId, draining: !nonDraining, eventStream); } /// @@ -266,7 +274,7 @@ public class SiteAuditReconciliationActor : ReceiveActor /// silent so a downstream subscriber doesn't see a flood of redundant /// notifications. /// - private void UpdateStalledState(string siteId, bool draining) + private void UpdateStalledState(string siteId, bool draining, Akka.Event.EventStream eventStream) { var wasStalled = _stalled.TryGetValue(siteId, out var prior) && prior; @@ -276,7 +284,7 @@ public class SiteAuditReconciliationActor : ReceiveActor if (wasStalled) { _stalled[siteId] = false; - Context.System.EventStream.Publish( + eventStream.Publish( new SiteAuditTelemetryStalledChanged(siteId, Stalled: false)); } return; @@ -288,7 +296,7 @@ public class SiteAuditReconciliationActor : ReceiveActor if (consecutive >= _options.StalledAfterNonDrainingCycles && !wasStalled) { _stalled[siteId] = true; - Context.System.EventStream.Publish( + eventStream.Publish( new SiteAuditTelemetryStalledChanged(siteId, Stalled: true)); } }