fix(auditlog): SiteAuditReconciliationActor captures EventStream before await (#23 M6)

This commit is contained in:
Joseph Doherty
2026-05-20 18:39:19 -04:00
parent 660fdc4e93
commit cc2d6e91f1

View File

@@ -137,6 +137,14 @@ public class SiteAuditReconciliationActor : ReceiveActor
private async Task OnTickAsync() private async Task OnTickAsync()
{ {
// Capture EventStream BEFORE the first await. Accessing Context (and
// therefore Context.System) after an await is unsafe because Akka's
// ActorBase.Context throws "no active ActorContext" once the
// continuation runs on a thread that isn't currently dispatching this
// actor — mirrors the AuditLogPurgeActor.OnTickAsync fix and the
// AuditLogIngestActor.OnIngestAsync Sender-capture pattern.
var eventStream = Context.System.EventStream;
IReadOnlyList<SiteEntry> sites; IReadOnlyList<SiteEntry> sites;
try try
{ {
@@ -173,7 +181,7 @@ public class SiteAuditReconciliationActor : ReceiveActor
{ {
try try
{ {
await PullSiteAsync(site, repository).ConfigureAwait(false); await PullSiteAsync(site, repository, eventStream).ConfigureAwait(false);
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -203,7 +211,7 @@ public class SiteAuditReconciliationActor : ReceiveActor
/// drains across consecutive ticks. The stalled signal (two non-draining /// drains across consecutive ticks. The stalled signal (two non-draining
/// ticks in a row) surfaces when that drain isn't keeping up. /// ticks in a row) surfaces when that drain isn't keeping up.
/// </summary> /// </summary>
private async Task PullSiteAsync(SiteEntry site, IAuditLogRepository repository) private async Task PullSiteAsync(SiteEntry site, IAuditLogRepository repository, Akka.Event.EventStream eventStream)
{ {
var since = _cursors.TryGetValue(site.SiteId, out var c) ? c : DateTime.MinValue; var since = _cursors.TryGetValue(site.SiteId, out var c) ? c : DateTime.MinValue;
var response = await _client.PullAsync( var response = await _client.PullAsync(
@@ -247,7 +255,7 @@ public class SiteAuditReconciliationActor : ReceiveActor
_cursors[site.SiteId] = maxOccurred; _cursors[site.SiteId] = maxOccurred;
var nonDraining = response.MoreAvailable && response.Events.Count > 0; var nonDraining = response.MoreAvailable && response.Events.Count > 0;
UpdateStalledState(site.SiteId, draining: !nonDraining); UpdateStalledState(site.SiteId, draining: !nonDraining, eventStream);
} }
/// <summary> /// <summary>
@@ -266,7 +274,7 @@ public class SiteAuditReconciliationActor : ReceiveActor
/// silent so a downstream subscriber doesn't see a flood of redundant /// silent so a downstream subscriber doesn't see a flood of redundant
/// notifications. /// notifications.
/// </remarks> /// </remarks>
private void UpdateStalledState(string siteId, bool draining) private void UpdateStalledState(string siteId, bool draining, Akka.Event.EventStream eventStream)
{ {
var wasStalled = _stalled.TryGetValue(siteId, out var prior) && prior; var wasStalled = _stalled.TryGetValue(siteId, out var prior) && prior;
@@ -276,7 +284,7 @@ public class SiteAuditReconciliationActor : ReceiveActor
if (wasStalled) if (wasStalled)
{ {
_stalled[siteId] = false; _stalled[siteId] = false;
Context.System.EventStream.Publish( eventStream.Publish(
new SiteAuditTelemetryStalledChanged(siteId, Stalled: false)); new SiteAuditTelemetryStalledChanged(siteId, Stalled: false));
} }
return; return;
@@ -288,7 +296,7 @@ public class SiteAuditReconciliationActor : ReceiveActor
if (consecutive >= _options.StalledAfterNonDrainingCycles && !wasStalled) if (consecutive >= _options.StalledAfterNonDrainingCycles && !wasStalled)
{ {
_stalled[siteId] = true; _stalled[siteId] = true;
Context.System.EventStream.Publish( eventStream.Publish(
new SiteAuditTelemetryStalledChanged(siteId, Stalled: true)); new SiteAuditTelemetryStalledChanged(siteId, Stalled: true));
} }
} }