From c41f43c87ff3be4d13d081f0de5673812e47d858 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 19 May 2026 01:42:28 -0400 Subject: [PATCH] feat(notification-outbox): add dispatcher loop to NotificationOutboxActor --- .../Messages/InternalMessages.cs | 26 ++ .../NotificationOutboxActor.cs | 184 +++++++++++- .../NotificationOutboxActorDispatchTests.cs | 282 ++++++++++++++++++ .../NotificationOutboxActorIngestTests.cs | 4 +- 4 files changed, 489 insertions(+), 7 deletions(-) create mode 100644 tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorDispatchTests.cs diff --git a/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs b/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs index 0096f34..95326da 100644 --- a/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs +++ b/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs @@ -26,4 +26,30 @@ internal static class InternalMessages IActorRef Sender, bool Succeeded, string? Error); + + /// + /// Periodic tick that triggers a dispatch sweep. Started as a periodic timer in + /// PreStart at the configured DispatchInterval. A singleton instance is + /// reused so the timer carries no per-tick state. + /// + internal sealed class DispatchTick + { + /// The shared singleton tick instance scheduled by the dispatch timer. + internal static readonly DispatchTick Instance = new(); + + private DispatchTick() { } + } + + /// + /// Completion signal for an asynchronous dispatch sweep, piped back to the actor so the + /// in-flight guard is cleared on the actor thread. Sent on both success and failure of + /// the sweep — the actor only needs to know the sweep has finished. + /// + internal sealed class DispatchComplete + { + /// The shared singleton completion instance. + internal static readonly DispatchComplete Instance = new(); + + private DispatchComplete() { } + } } diff --git a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs index 855a357..58f99f1 100644 --- a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs +++ b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs @@ -5,33 +5,67 @@ using ScadaLink.Commons.Entities.Notifications; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Messages.Notification; using ScadaLink.Commons.Types.Enums; +using ScadaLink.NotificationOutbox.Delivery; using ScadaLink.NotificationOutbox.Messages; namespace ScadaLink.NotificationOutbox; /// -/// Central-side actor that owns the notification outbox. This task implements the ingest -/// path only: it accepts messages forwarded from sites, -/// persists each as a row, and acks the submitting site. -/// Dispatch, query, and purge are added by later tasks. +/// Central-side actor that owns the notification outbox. It accepts +/// messages forwarded from sites and persists each as a +/// row (the ingest path), and runs a periodic dispatch loop +/// that claims due notifications, delivers them through the matching channel adapter, and +/// applies the resulting status transition. Query and purge are added by later tasks. /// -public class NotificationOutboxActor : ReceiveActor +public class NotificationOutboxActor : ReceiveActor, IWithTimers { + private const string DispatchTimerKey = "dispatch"; + + /// Retry policy fallback used when no SMTP configuration row is present. + private const int FallbackMaxRetries = 10; + private static readonly TimeSpan FallbackRetryDelay = TimeSpan.FromMinutes(1); + private readonly IServiceProvider _serviceProvider; private readonly NotificationOutboxOptions _options; private readonly ILogger _logger; + private readonly IReadOnlyDictionary _adapters; + + /// + /// In-flight guard for the dispatch loop. Set true at the start of a sweep and cleared + /// when the sweep's arrives. While true, + /// further s are dropped so sweeps never overlap. + /// + private bool _dispatching; + + /// Akka timer scheduler, assigned by the actor system via . + public ITimerScheduler Timers { get; set; } = null!; public NotificationOutboxActor( IServiceProvider serviceProvider, NotificationOutboxOptions options, - ILogger logger) + ILogger logger, + IReadOnlyDictionary adapters) { _serviceProvider = serviceProvider; _options = options; _logger = logger; + _adapters = adapters; Receive(HandleSubmit); Receive(HandleIngestPersisted); + Receive(_ => HandleDispatchTick()); + Receive(_ => _dispatching = false); + } + + /// + /// Starts the periodic dispatch timer once the actor is running. The tick cadence is + /// . + /// + protected override void PreStart() + { + base.PreStart(); + Timers.StartPeriodicTimer( + DispatchTimerKey, InternalMessages.DispatchTick.Instance, _options.DispatchInterval); } /// @@ -89,6 +123,144 @@ public class NotificationOutboxActor : ReceiveActor } } + /// + /// Handles a dispatch tick. If a sweep is already in flight the tick is dropped so + /// sweeps never overlap; otherwise the guard is raised and an asynchronous sweep is + /// launched, with a piped back to + /// to lower the guard on the actor thread. + /// + private void HandleDispatchTick() + { + if (_dispatching) + { + return; + } + + _dispatching = true; + var now = DateTimeOffset.UtcNow; + + // RunDispatchPass swallows its own errors; the completion message is sent for both + // success and failure so the guard is always cleared. + RunDispatchPass(now).PipeTo(Self, success: () => InternalMessages.DispatchComplete.Instance); + } + + /// + /// Runs a single dispatch sweep: claims the due batch, resolves the retry policy, and + /// delivers each notification sequentially. Per-notification failures are caught and + /// logged so one bad row never aborts the rest of the batch. + /// + private async Task RunDispatchPass(DateTimeOffset now) + { + using var scope = _serviceProvider.CreateScope(); + var outboxRepository = scope.ServiceProvider.GetRequiredService(); + var notificationRepository = scope.ServiceProvider.GetRequiredService(); + + IReadOnlyList due; + try + { + due = await outboxRepository.GetDueAsync(now, _options.DispatchBatchSize); + } + catch (Exception ex) + { + _logger.LogError(ex, "Dispatch sweep failed to claim due notifications."); + return; + } + + if (due.Count == 0) + { + return; + } + + var (maxRetries, retryDelay) = await ResolveRetryPolicyAsync(notificationRepository); + + foreach (var notification in due) + { + try + { + await DeliverOneAsync(notification, now, maxRetries, retryDelay, outboxRepository); + } + catch (Exception ex) + { + // Isolate per-notification failures so the remainder of the batch still runs. + _logger.LogError( + ex, "Dispatch failed for notification {NotificationId}.", notification.NotificationId); + } + } + } + + /// + /// Resolves the retry policy from the first SMTP configuration row. When no SMTP + /// configuration exists, falls back to a conservative default — delivery itself will + /// permanently fail in that case, so the policy only acts as a guard. + /// + private async Task<(int MaxRetries, TimeSpan RetryDelay)> ResolveRetryPolicyAsync( + INotificationRepository notificationRepository) + { + var configurations = await notificationRepository.GetAllSmtpConfigurationsAsync(); + var configuration = configurations.Count > 0 ? configurations[0] : null; + return configuration is null + ? (FallbackMaxRetries, FallbackRetryDelay) + : (configuration.MaxRetries, configuration.RetryDelay); + } + + /// + /// Delivers a single notification through its channel adapter and applies the resulting + /// status transition. A missing adapter parks the notification; otherwise the + /// drives the transition. The updated row is always persisted. + /// + private async Task DeliverOneAsync( + Notification notification, + DateTimeOffset now, + int maxRetries, + TimeSpan retryDelay, + INotificationOutboxRepository outboxRepository) + { + if (!_adapters.TryGetValue(notification.Type, out var adapter)) + { + notification.Status = NotificationStatus.Parked; + notification.LastError = $"no delivery adapter for type {notification.Type}"; + notification.LastAttemptAt = now; + await outboxRepository.UpdateAsync(notification); + return; + } + + var outcome = await adapter.DeliverAsync(notification); + + switch (outcome.Result) + { + case DeliveryResult.Success: + notification.Status = NotificationStatus.Delivered; + notification.DeliveredAt = now; + notification.LastAttemptAt = now; + notification.ResolvedTargets = outcome.ResolvedTargets; + notification.LastError = null; + break; + + case DeliveryResult.TransientFailure: + notification.LastAttemptAt = now; + notification.RetryCount++; + notification.LastError = outcome.Error; + if (notification.RetryCount >= maxRetries) + { + notification.Status = NotificationStatus.Parked; + } + else + { + notification.Status = NotificationStatus.Retrying; + notification.NextAttemptAt = now + retryDelay; + } + break; + + case DeliveryResult.PermanentFailure: + notification.Status = NotificationStatus.Parked; + notification.LastAttemptAt = now; + notification.LastError = outcome.Error; + break; + } + + await outboxRepository.UpdateAsync(notification); + } + private static Notification BuildNotification(NotificationSubmit msg) { // All current notifications are email; NotificationType has only the Email member. diff --git a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorDispatchTests.cs b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorDispatchTests.cs new file mode 100644 index 0000000..dedfc79 --- /dev/null +++ b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorDispatchTests.cs @@ -0,0 +1,282 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.NotificationOutbox.Delivery; +using ScadaLink.NotificationOutbox.Messages; + +namespace ScadaLink.NotificationOutbox.Tests; + +/// +/// Task 14: Tests for the dispatcher loop — the +/// periodic sweep that claims due notifications via +/// , delivers each through the +/// matching , and applies the resulting status +/// transition with . +/// +public class NotificationOutboxActorDispatchTests : TestKit +{ + private readonly INotificationOutboxRepository _outboxRepository = + Substitute.For(); + + private readonly INotificationRepository _notificationRepository = + Substitute.For(); + + private IServiceProvider BuildServiceProvider() + { + var services = new ServiceCollection(); + services.AddScoped(_ => _outboxRepository); + services.AddScoped(_ => _notificationRepository); + return services.BuildServiceProvider(); + } + + /// + /// Stub adapter whose returns a configurable outcome and + /// optionally blocks for a delay — used to exercise the overlapping-tick guard. + /// + private sealed class StubAdapter : INotificationDeliveryAdapter + { + private readonly Func _outcome; + private readonly TimeSpan _delay; + + public StubAdapter(Func outcome, TimeSpan? delay = null) + { + _outcome = outcome; + _delay = delay ?? TimeSpan.Zero; + } + + public int CallCount; + + public NotificationType Type => NotificationType.Email; + + public async Task DeliverAsync( + Notification notification, CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref CallCount); + if (_delay > TimeSpan.Zero) + { + await Task.Delay(_delay, cancellationToken); + } + + return _outcome(); + } + } + + private IActorRef CreateActor( + IReadOnlyDictionary adapters, + NotificationOutboxOptions? options = null) + { + return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( + BuildServiceProvider(), + options ?? new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) }, + NullLogger.Instance, + adapters))); + } + + private static Notification MakeNotification( + NotificationType type = NotificationType.Email, int retryCount = 0) + { + return new Notification( + Guid.NewGuid().ToString(), type, "ops-team", "Subject", "Body", "site-1") + { + RetryCount = retryCount, + CreatedAt = DateTimeOffset.UtcNow, + }; + } + + private void SetupSmtpRetryPolicy(int maxRetries, TimeSpan retryDelay) + { + var config = new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com") + { + MaxRetries = maxRetries, + RetryDelay = retryDelay, + }; + _notificationRepository.GetAllSmtpConfigurationsAsync(Arg.Any()) + .Returns(new[] { config }); + } + + [Fact] + public void DispatchTick_ClaimsDueNotifications_AndInvokesAdapter() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com")); + var actor = CreateActor(new Dictionary + { + [NotificationType.Email] = adapter, + }); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + _outboxRepository.Received(1).GetDueAsync( + Arg.Any(), Arg.Any(), Arg.Any()); + Assert.Equal(1, adapter.CallCount); + }); + } + + [Fact] + public void Success_MarksNotificationDelivered_WithResolvedTargets() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com")); + var actor = CreateActor(new Dictionary + { + [NotificationType.Email] = adapter, + }); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + _outboxRepository.Received(1).UpdateAsync( + Arg.Is(n => + n.Status == NotificationStatus.Delivered && + n.DeliveredAt != null && + n.LastAttemptAt != null && + n.ResolvedTargets == "ops@example.com" && + n.LastError == null), + Arg.Any()); + }); + } + + [Fact] + public void TransientFailure_BelowMaxRetries_MarksRetrying_AndSchedulesNextAttempt() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(3)); + var notification = MakeNotification(retryCount: 1); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout")); + var actor = CreateActor(new Dictionary + { + [NotificationType.Email] = adapter, + }); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + _outboxRepository.Received(1).UpdateAsync( + Arg.Is(n => + n.Status == NotificationStatus.Retrying && + n.RetryCount == 2 && + n.NextAttemptAt != null && + n.LastError == "smtp timeout" && + n.LastAttemptAt != null), + Arg.Any()); + }); + } + + [Fact] + public void TransientFailure_ReachingMaxRetries_MarksParked() + { + SetupSmtpRetryPolicy(maxRetries: 3, retryDelay: TimeSpan.FromMinutes(1)); + // RetryCount starts at max-1; the failed attempt increments it to max. + var notification = MakeNotification(retryCount: 2); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout")); + var actor = CreateActor(new Dictionary + { + [NotificationType.Email] = adapter, + }); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + _outboxRepository.Received(1).UpdateAsync( + Arg.Is(n => + n.Status == NotificationStatus.Parked && + n.RetryCount == 3), + Arg.Any()); + }); + } + + [Fact] + public void PermanentFailure_MarksParked_WithLastError() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Permanent("invalid recipient address")); + var actor = CreateActor(new Dictionary + { + [NotificationType.Email] = adapter, + }); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + _outboxRepository.Received(1).UpdateAsync( + Arg.Is(n => + n.Status == NotificationStatus.Parked && + n.LastError == "invalid recipient address" && + n.LastAttemptAt != null), + Arg.Any()); + }); + } + + [Fact] + public void NoAdapterForType_MarksParked_WithExplanatoryError() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + // Empty adapter dictionary: no adapter resolves for the notification's type. + var actor = CreateActor(new Dictionary()); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + _outboxRepository.Received(1).UpdateAsync( + Arg.Is(n => + n.Status == NotificationStatus.Parked && + n.LastError != null && + n.LastError.Contains("no delivery adapter") && + n.LastAttemptAt != null), + Arg.Any()); + }); + } + + [Fact] + public void OverlappingTicks_WhileDispatchInFlight_DoNotClaimConcurrently() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + // Slow adapter keeps the first sweep in flight while the second tick arrives. + var adapter = new StubAdapter( + () => DeliveryOutcome.Success("ops@example.com"), + delay: TimeSpan.FromMilliseconds(800)); + var actor = CreateActor(new Dictionary + { + [NotificationType.Email] = adapter, + }); + + actor.Tell(InternalMessages.DispatchTick.Instance); + actor.Tell(InternalMessages.DispatchTick.Instance); + + // Second tick is dropped by the in-flight guard: only one sweep runs. + AwaitAssert( + () => _outboxRepository.Received(1).GetDueAsync( + Arg.Any(), Arg.Any(), Arg.Any()), + duration: TimeSpan.FromSeconds(2)); + } +} diff --git a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs index 15edb37..b712e88 100644 --- a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs +++ b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs @@ -8,6 +8,7 @@ using ScadaLink.Commons.Entities.Notifications; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Messages.Notification; using ScadaLink.Commons.Types.Enums; +using ScadaLink.NotificationOutbox.Delivery; namespace ScadaLink.NotificationOutbox.Tests; @@ -33,7 +34,8 @@ public class NotificationOutboxActorIngestTests : TestKit return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( BuildServiceProvider(), new NotificationOutboxOptions(), - NullLogger.Instance))); + NullLogger.Instance, + new Dictionary()))); } private static NotificationSubmit MakeSubmit(string? notificationId = null)