refactor(notification-outbox): make purge fault-handling symmetric with dispatch

This commit is contained in:
Joseph Doherty
2026-05-19 02:01:48 -04:00
parent 41358c1cee
commit 517437b0d9

View File

@@ -59,6 +59,8 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
Receive<InternalMessages.DispatchTick>(_ => HandleDispatchTick()); Receive<InternalMessages.DispatchTick>(_ => HandleDispatchTick());
Receive<InternalMessages.DispatchComplete>(_ => _dispatching = false); Receive<InternalMessages.DispatchComplete>(_ => _dispatching = false);
Receive<InternalMessages.PurgeTick>(_ => HandlePurgeTick()); Receive<InternalMessages.PurgeTick>(_ => HandlePurgeTick());
// No-op: purge has no in-flight guard to lower, and the outcome is already logged
// by the PipeTo projections, so PurgeComplete carries nothing to act on.
Receive<InternalMessages.PurgeComplete>(_ => { }); Receive<InternalMessages.PurgeComplete>(_ => { });
Receive<NotificationOutboxQueryRequest>(HandleQuery); Receive<NotificationOutboxQueryRequest>(HandleQuery);
Receive<NotificationStatusQuery>(HandleStatusQuery); Receive<NotificationStatusQuery>(HandleStatusQuery);
@@ -297,9 +299,11 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
/// <summary> /// <summary>
/// Handles a purge tick by launching an asynchronous sweep that bulk-deletes terminal /// Handles a purge tick by launching an asynchronous sweep that bulk-deletes terminal
/// notification rows older than <see cref="NotificationOutboxOptions.TerminalRetention"/>. /// notification rows older than <see cref="NotificationOutboxOptions.TerminalRetention"/>.
/// Purges are daily and idempotent, so no in-flight guard is needed; the outcome is piped /// Purges are daily and idempotent, so no in-flight guard is needed. <see cref="RunPurgePass"/>
/// back to <see cref="Self"/> only so a faulted purge is logged on the actor thread and /// self-isolates its faults — it logs internally and never faults its task — so the
/// never wedges anything. /// success projection is the normal completion path that logs the deleted count. The
/// failure projection is kept as a belt-and-braces backup, consistent with
/// <see cref="HandleDispatchTick"/>/<see cref="RunDispatchPass"/>.
/// </summary> /// </summary>
private void HandlePurgeTick() private void HandlePurgeTick()
{ {
@@ -324,13 +328,26 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
/// <summary> /// <summary>
/// Runs a single purge sweep: resolves a scoped <see cref="INotificationOutboxRepository"/> /// Runs a single purge sweep: resolves a scoped <see cref="INotificationOutboxRepository"/>
/// and bulk-deletes terminal rows created before <paramref name="cutoff"/>, returning the /// and bulk-deletes terminal rows created before <paramref name="cutoff"/>, returning the
/// deleted count. /// deleted count. The whole body is wrapped in a try/catch so the returned task never
/// faults — scope creation, service resolution, and the bulk delete can all throw, and
/// self-isolating the fault here keeps the fault-handling strategy symmetric with
/// <see cref="RunDispatchPass"/>. On failure the exception is logged and 0 is returned.
/// </summary> /// </summary>
private async Task<int> RunPurgePass(DateTimeOffset cutoff) private async Task<int> RunPurgePass(DateTimeOffset cutoff)
{ {
using var scope = _serviceProvider.CreateScope(); try
var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>(); {
return await repository.DeleteTerminalOlderThanAsync(cutoff); using var scope = _serviceProvider.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
return await repository.DeleteTerminalOlderThanAsync(cutoff);
}
catch (Exception ex)
{
// Scope/service resolution or the bulk delete faulted; swallow and log so the
// returned task completes normally, mirroring RunDispatchPass.
_logger.LogError(ex, "Purge sweep failed unexpectedly.");
return 0;
}
} }
/// <summary> /// <summary>