fix(notification-outbox): clear dispatch in-flight flag on a faulted pass

This commit is contained in:
Joseph Doherty
2026-05-19 01:45:09 -04:00
parent c41f43c87f
commit ab3721a2e8
2 changed files with 71 additions and 29 deletions

View File

@@ -139,52 +139,72 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
_dispatching = true;
var now = DateTimeOffset.UtcNow;
// RunDispatchPass swallows its own errors; the completion message is sent for both
// success and failure so the guard is always cleared.
RunDispatchPass(now).PipeTo(Self, success: () => InternalMessages.DispatchComplete.Instance);
// RunDispatchPass swallows its own errors, but the failure projection is kept as a
// belt-and-braces guard so even a faulted task still lowers the in-flight guard —
// otherwise the dispatcher would wedge permanently.
RunDispatchPass(now).PipeTo(
Self,
success: () => InternalMessages.DispatchComplete.Instance,
failure: ex =>
{
_logger.LogError(ex, "Dispatch sweep faulted unexpectedly.");
return InternalMessages.DispatchComplete.Instance;
});
}
/// <summary>
/// Runs a single dispatch sweep: claims the due batch, resolves the retry policy, and
/// delivers each notification sequentially. Per-notification failures are caught and
/// logged so one bad row never aborts the rest of the batch.
/// logged so one bad row never aborts the rest of the batch. The whole body is wrapped
/// in a try/catch so the returned task never faults — scope creation, service resolution,
/// and retry-policy resolution can all throw, and a faulted task would otherwise leave
/// the dispatcher's in-flight guard stuck and wedge the loop permanently.
/// </summary>
private async Task RunDispatchPass(DateTimeOffset now)
{
using var scope = _serviceProvider.CreateScope();
var outboxRepository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
var notificationRepository = scope.ServiceProvider.GetRequiredService<INotificationRepository>();
IReadOnlyList<Notification> due;
try
{
due = await outboxRepository.GetDueAsync(now, _options.DispatchBatchSize);
}
catch (Exception ex)
{
_logger.LogError(ex, "Dispatch sweep failed to claim due notifications.");
return;
}
using var scope = _serviceProvider.CreateScope();
var outboxRepository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
var notificationRepository = scope.ServiceProvider.GetRequiredService<INotificationRepository>();
if (due.Count == 0)
{
return;
}
var (maxRetries, retryDelay) = await ResolveRetryPolicyAsync(notificationRepository);
foreach (var notification in due)
{
IReadOnlyList<Notification> due;
try
{
await DeliverOneAsync(notification, now, maxRetries, retryDelay, outboxRepository);
due = await outboxRepository.GetDueAsync(now, _options.DispatchBatchSize);
}
catch (Exception ex)
{
// Isolate per-notification failures so the remainder of the batch still runs.
_logger.LogError(
ex, "Dispatch failed for notification {NotificationId}.", notification.NotificationId);
_logger.LogError(ex, "Dispatch sweep failed to claim due notifications.");
return;
}
if (due.Count == 0)
{
return;
}
var (maxRetries, retryDelay) = await ResolveRetryPolicyAsync(notificationRepository);
foreach (var notification in due)
{
try
{
await DeliverOneAsync(notification, now, maxRetries, retryDelay, outboxRepository);
}
catch (Exception ex)
{
// Isolate per-notification failures so the remainder of the batch still runs.
_logger.LogError(
ex, "Dispatch failed for notification {NotificationId}.", notification.NotificationId);
}
}
}
catch (Exception ex)
{
// Scope/service resolution or retry-policy resolution faulted; swallow and log so
// the returned task completes normally and the in-flight guard is always cleared.
_logger.LogError(ex, "Dispatch sweep failed unexpectedly.");
}
}

View File

@@ -254,6 +254,28 @@ public class NotificationOutboxActorDispatchTests : TestKit
});
}
[Fact]
public void FaultedDispatchPass_ClearsInFlightGuard_SoNextTickStillRuns()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
// GetDueAsync throws on every call: the dispatch pass's task could fault if the
// failure were not handled, which would leave _dispatching stuck true forever.
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns<IReadOnlyList<Notification>>(_ => throw new InvalidOperationException("db down"));
var actor = CreateActor(new Dictionary<NotificationType, INotificationDeliveryAdapter>());
// First tick: the pass faults internally but must still clear the in-flight guard.
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() => _outboxRepository.Received(1).GetDueAsync(
Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>()));
// Second tick after the first completes: if the guard had wedged, this would be
// dropped and GetDueAsync would still show only one call.
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() => _outboxRepository.Received(2).GetDueAsync(
Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>()));
}
[Fact]
public void OverlappingTicks_WhileDispatchInFlight_DoNotClaimConcurrently()
{