diff --git a/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs b/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs
index 0096f34..95326da 100644
--- a/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs
+++ b/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs
@@ -26,4 +26,30 @@ internal static class InternalMessages
IActorRef Sender,
bool Succeeded,
string? Error);
+
+ ///
+ /// Periodic tick that triggers a dispatch sweep. Started as a periodic timer in
+ /// PreStart at the configured DispatchInterval. A singleton instance is
+ /// reused so the timer carries no per-tick state.
+ ///
+ internal sealed class DispatchTick
+ {
+ /// The shared singleton tick instance scheduled by the dispatch timer.
+ internal static readonly DispatchTick Instance = new();
+
+ private DispatchTick() { }
+ }
+
+ ///
+ /// Completion signal for an asynchronous dispatch sweep, piped back to the actor so the
+ /// in-flight guard is cleared on the actor thread. Sent on both success and failure of
+ /// the sweep — the actor only needs to know the sweep has finished.
+ ///
+ internal sealed class DispatchComplete
+ {
+ /// The shared singleton completion instance.
+ internal static readonly DispatchComplete Instance = new();
+
+ private DispatchComplete() { }
+ }
}
diff --git a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs
index 855a357..58f99f1 100644
--- a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs
+++ b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs
@@ -5,33 +5,67 @@ using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Enums;
+using ScadaLink.NotificationOutbox.Delivery;
using ScadaLink.NotificationOutbox.Messages;
namespace ScadaLink.NotificationOutbox;
///
-/// Central-side actor that owns the notification outbox. This task implements the ingest
-/// path only: it accepts messages forwarded from sites,
-/// persists each as a row, and acks the submitting site.
-/// Dispatch, query, and purge are added by later tasks.
+/// Central-side actor that owns the notification outbox. It accepts
+/// messages forwarded from sites and persists each as a
+/// row (the ingest path), and runs a periodic dispatch loop
+/// that claims due notifications, delivers them through the matching channel adapter, and
+/// applies the resulting status transition. Query and purge are added by later tasks.
///
-public class NotificationOutboxActor : ReceiveActor
+public class NotificationOutboxActor : ReceiveActor, IWithTimers
{
+ private const string DispatchTimerKey = "dispatch";
+
+ /// Retry policy fallback used when no SMTP configuration row is present.
+ private const int FallbackMaxRetries = 10;
+ private static readonly TimeSpan FallbackRetryDelay = TimeSpan.FromMinutes(1);
+
private readonly IServiceProvider _serviceProvider;
private readonly NotificationOutboxOptions _options;
private readonly ILogger _logger;
+ private readonly IReadOnlyDictionary _adapters;
+
+ ///
+ /// In-flight guard for the dispatch loop. Set true at the start of a sweep and cleared
+ /// when the sweep's arrives. While true,
+ /// further s are dropped so sweeps never overlap.
+ ///
+ private bool _dispatching;
+
+ /// Akka timer scheduler, assigned by the actor system via .
+ public ITimerScheduler Timers { get; set; } = null!;
public NotificationOutboxActor(
IServiceProvider serviceProvider,
NotificationOutboxOptions options,
- ILogger logger)
+ ILogger logger,
+ IReadOnlyDictionary adapters)
{
_serviceProvider = serviceProvider;
_options = options;
_logger = logger;
+ _adapters = adapters;
Receive(HandleSubmit);
Receive(HandleIngestPersisted);
+ Receive(_ => HandleDispatchTick());
+ Receive(_ => _dispatching = false);
+ }
+
+ ///
+ /// Starts the periodic dispatch timer once the actor is running. The tick cadence is
+ /// .
+ ///
+ protected override void PreStart()
+ {
+ base.PreStart();
+ Timers.StartPeriodicTimer(
+ DispatchTimerKey, InternalMessages.DispatchTick.Instance, _options.DispatchInterval);
}
///
@@ -89,6 +123,144 @@ public class NotificationOutboxActor : ReceiveActor
}
}
+ ///
+ /// Handles a dispatch tick. If a sweep is already in flight the tick is dropped so
+ /// sweeps never overlap; otherwise the guard is raised and an asynchronous sweep is
+ /// launched, with a piped back to
+ /// to lower the guard on the actor thread.
+ ///
+ private void HandleDispatchTick()
+ {
+ if (_dispatching)
+ {
+ return;
+ }
+
+ _dispatching = true;
+ var now = DateTimeOffset.UtcNow;
+
+ // RunDispatchPass swallows its own errors; the completion message is sent for both
+ // success and failure so the guard is always cleared.
+ RunDispatchPass(now).PipeTo(Self, success: () => InternalMessages.DispatchComplete.Instance);
+ }
+
+ ///
+ /// Runs a single dispatch sweep: claims the due batch, resolves the retry policy, and
+ /// delivers each notification sequentially. Per-notification failures are caught and
+ /// logged so one bad row never aborts the rest of the batch.
+ ///
+ private async Task RunDispatchPass(DateTimeOffset now)
+ {
+ using var scope = _serviceProvider.CreateScope();
+ var outboxRepository = scope.ServiceProvider.GetRequiredService();
+ var notificationRepository = scope.ServiceProvider.GetRequiredService();
+
+ IReadOnlyList due;
+ try
+ {
+ due = await outboxRepository.GetDueAsync(now, _options.DispatchBatchSize);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Dispatch sweep failed to claim due notifications.");
+ return;
+ }
+
+ if (due.Count == 0)
+ {
+ return;
+ }
+
+ var (maxRetries, retryDelay) = await ResolveRetryPolicyAsync(notificationRepository);
+
+ foreach (var notification in due)
+ {
+ try
+ {
+ await DeliverOneAsync(notification, now, maxRetries, retryDelay, outboxRepository);
+ }
+ catch (Exception ex)
+ {
+ // Isolate per-notification failures so the remainder of the batch still runs.
+ _logger.LogError(
+ ex, "Dispatch failed for notification {NotificationId}.", notification.NotificationId);
+ }
+ }
+ }
+
+ ///
+ /// Resolves the retry policy from the first SMTP configuration row. When no SMTP
+ /// configuration exists, falls back to a conservative default — delivery itself will
+ /// permanently fail in that case, so the policy only acts as a guard.
+ ///
+ private async Task<(int MaxRetries, TimeSpan RetryDelay)> ResolveRetryPolicyAsync(
+ INotificationRepository notificationRepository)
+ {
+ var configurations = await notificationRepository.GetAllSmtpConfigurationsAsync();
+ var configuration = configurations.Count > 0 ? configurations[0] : null;
+ return configuration is null
+ ? (FallbackMaxRetries, FallbackRetryDelay)
+ : (configuration.MaxRetries, configuration.RetryDelay);
+ }
+
+ ///
+ /// Delivers a single notification through its channel adapter and applies the resulting
+ /// status transition. A missing adapter parks the notification; otherwise the
+ /// drives the transition. The updated row is always persisted.
+ ///
+ private async Task DeliverOneAsync(
+ Notification notification,
+ DateTimeOffset now,
+ int maxRetries,
+ TimeSpan retryDelay,
+ INotificationOutboxRepository outboxRepository)
+ {
+ if (!_adapters.TryGetValue(notification.Type, out var adapter))
+ {
+ notification.Status = NotificationStatus.Parked;
+ notification.LastError = $"no delivery adapter for type {notification.Type}";
+ notification.LastAttemptAt = now;
+ await outboxRepository.UpdateAsync(notification);
+ return;
+ }
+
+ var outcome = await adapter.DeliverAsync(notification);
+
+ switch (outcome.Result)
+ {
+ case DeliveryResult.Success:
+ notification.Status = NotificationStatus.Delivered;
+ notification.DeliveredAt = now;
+ notification.LastAttemptAt = now;
+ notification.ResolvedTargets = outcome.ResolvedTargets;
+ notification.LastError = null;
+ break;
+
+ case DeliveryResult.TransientFailure:
+ notification.LastAttemptAt = now;
+ notification.RetryCount++;
+ notification.LastError = outcome.Error;
+ if (notification.RetryCount >= maxRetries)
+ {
+ notification.Status = NotificationStatus.Parked;
+ }
+ else
+ {
+ notification.Status = NotificationStatus.Retrying;
+ notification.NextAttemptAt = now + retryDelay;
+ }
+ break;
+
+ case DeliveryResult.PermanentFailure:
+ notification.Status = NotificationStatus.Parked;
+ notification.LastAttemptAt = now;
+ notification.LastError = outcome.Error;
+ break;
+ }
+
+ await outboxRepository.UpdateAsync(notification);
+ }
+
private static Notification BuildNotification(NotificationSubmit msg)
{
// All current notifications are email; NotificationType has only the Email member.
diff --git a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorDispatchTests.cs b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorDispatchTests.cs
new file mode 100644
index 0000000..dedfc79
--- /dev/null
+++ b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorDispatchTests.cs
@@ -0,0 +1,282 @@
+using Akka.Actor;
+using Akka.TestKit.Xunit2;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging.Abstractions;
+using NSubstitute;
+using ScadaLink.Commons.Entities.Notifications;
+using ScadaLink.Commons.Interfaces.Repositories;
+using ScadaLink.Commons.Types.Enums;
+using ScadaLink.NotificationOutbox.Delivery;
+using ScadaLink.NotificationOutbox.Messages;
+
+namespace ScadaLink.NotificationOutbox.Tests;
+
+///
+/// Task 14: Tests for the dispatcher loop — the
+/// periodic sweep that claims due notifications via
+/// , delivers each through the
+/// matching , and applies the resulting status
+/// transition with .
+///
+public class NotificationOutboxActorDispatchTests : TestKit
+{
+ private readonly INotificationOutboxRepository _outboxRepository =
+ Substitute.For();
+
+ private readonly INotificationRepository _notificationRepository =
+ Substitute.For();
+
+ private IServiceProvider BuildServiceProvider()
+ {
+ var services = new ServiceCollection();
+ services.AddScoped(_ => _outboxRepository);
+ services.AddScoped(_ => _notificationRepository);
+ return services.BuildServiceProvider();
+ }
+
+ ///
+ /// Stub adapter whose returns a configurable outcome and
+ /// optionally blocks for a delay — used to exercise the overlapping-tick guard.
+ ///
+ private sealed class StubAdapter : INotificationDeliveryAdapter
+ {
+ private readonly Func _outcome;
+ private readonly TimeSpan _delay;
+
+ public StubAdapter(Func outcome, TimeSpan? delay = null)
+ {
+ _outcome = outcome;
+ _delay = delay ?? TimeSpan.Zero;
+ }
+
+ public int CallCount;
+
+ public NotificationType Type => NotificationType.Email;
+
+ public async Task DeliverAsync(
+ Notification notification, CancellationToken cancellationToken = default)
+ {
+ Interlocked.Increment(ref CallCount);
+ if (_delay > TimeSpan.Zero)
+ {
+ await Task.Delay(_delay, cancellationToken);
+ }
+
+ return _outcome();
+ }
+ }
+
+ private IActorRef CreateActor(
+ IReadOnlyDictionary adapters,
+ NotificationOutboxOptions? options = null)
+ {
+ return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
+ BuildServiceProvider(),
+ options ?? new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
+ NullLogger.Instance,
+ adapters)));
+ }
+
+ private static Notification MakeNotification(
+ NotificationType type = NotificationType.Email, int retryCount = 0)
+ {
+ return new Notification(
+ Guid.NewGuid().ToString(), type, "ops-team", "Subject", "Body", "site-1")
+ {
+ RetryCount = retryCount,
+ CreatedAt = DateTimeOffset.UtcNow,
+ };
+ }
+
+ private void SetupSmtpRetryPolicy(int maxRetries, TimeSpan retryDelay)
+ {
+ var config = new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com")
+ {
+ MaxRetries = maxRetries,
+ RetryDelay = retryDelay,
+ };
+ _notificationRepository.GetAllSmtpConfigurationsAsync(Arg.Any())
+ .Returns(new[] { config });
+ }
+
+ [Fact]
+ public void DispatchTick_ClaimsDueNotifications_AndInvokesAdapter()
+ {
+ SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
+ var notification = MakeNotification();
+ _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(new[] { notification });
+ var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
+ var actor = CreateActor(new Dictionary
+ {
+ [NotificationType.Email] = adapter,
+ });
+
+ actor.Tell(InternalMessages.DispatchTick.Instance);
+
+ AwaitAssert(() =>
+ {
+ _outboxRepository.Received(1).GetDueAsync(
+ Arg.Any(), Arg.Any(), Arg.Any());
+ Assert.Equal(1, adapter.CallCount);
+ });
+ }
+
+ [Fact]
+ public void Success_MarksNotificationDelivered_WithResolvedTargets()
+ {
+ SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
+ var notification = MakeNotification();
+ _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(new[] { notification });
+ var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
+ var actor = CreateActor(new Dictionary
+ {
+ [NotificationType.Email] = adapter,
+ });
+
+ actor.Tell(InternalMessages.DispatchTick.Instance);
+
+ AwaitAssert(() =>
+ {
+ _outboxRepository.Received(1).UpdateAsync(
+ Arg.Is(n =>
+ n.Status == NotificationStatus.Delivered &&
+ n.DeliveredAt != null &&
+ n.LastAttemptAt != null &&
+ n.ResolvedTargets == "ops@example.com" &&
+ n.LastError == null),
+ Arg.Any());
+ });
+ }
+
+ [Fact]
+ public void TransientFailure_BelowMaxRetries_MarksRetrying_AndSchedulesNextAttempt()
+ {
+ SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(3));
+ var notification = MakeNotification(retryCount: 1);
+ _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(new[] { notification });
+ var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
+ var actor = CreateActor(new Dictionary
+ {
+ [NotificationType.Email] = adapter,
+ });
+
+ actor.Tell(InternalMessages.DispatchTick.Instance);
+
+ AwaitAssert(() =>
+ {
+ _outboxRepository.Received(1).UpdateAsync(
+ Arg.Is(n =>
+ n.Status == NotificationStatus.Retrying &&
+ n.RetryCount == 2 &&
+ n.NextAttemptAt != null &&
+ n.LastError == "smtp timeout" &&
+ n.LastAttemptAt != null),
+ Arg.Any());
+ });
+ }
+
+ [Fact]
+ public void TransientFailure_ReachingMaxRetries_MarksParked()
+ {
+ SetupSmtpRetryPolicy(maxRetries: 3, retryDelay: TimeSpan.FromMinutes(1));
+ // RetryCount starts at max-1; the failed attempt increments it to max.
+ var notification = MakeNotification(retryCount: 2);
+ _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(new[] { notification });
+ var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
+ var actor = CreateActor(new Dictionary
+ {
+ [NotificationType.Email] = adapter,
+ });
+
+ actor.Tell(InternalMessages.DispatchTick.Instance);
+
+ AwaitAssert(() =>
+ {
+ _outboxRepository.Received(1).UpdateAsync(
+ Arg.Is(n =>
+ n.Status == NotificationStatus.Parked &&
+ n.RetryCount == 3),
+ Arg.Any());
+ });
+ }
+
+ [Fact]
+ public void PermanentFailure_MarksParked_WithLastError()
+ {
+ SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
+ var notification = MakeNotification();
+ _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(new[] { notification });
+ var adapter = new StubAdapter(() => DeliveryOutcome.Permanent("invalid recipient address"));
+ var actor = CreateActor(new Dictionary
+ {
+ [NotificationType.Email] = adapter,
+ });
+
+ actor.Tell(InternalMessages.DispatchTick.Instance);
+
+ AwaitAssert(() =>
+ {
+ _outboxRepository.Received(1).UpdateAsync(
+ Arg.Is(n =>
+ n.Status == NotificationStatus.Parked &&
+ n.LastError == "invalid recipient address" &&
+ n.LastAttemptAt != null),
+ Arg.Any());
+ });
+ }
+
+ [Fact]
+ public void NoAdapterForType_MarksParked_WithExplanatoryError()
+ {
+ SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
+ var notification = MakeNotification();
+ _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(new[] { notification });
+ // Empty adapter dictionary: no adapter resolves for the notification's type.
+ var actor = CreateActor(new Dictionary());
+
+ actor.Tell(InternalMessages.DispatchTick.Instance);
+
+ AwaitAssert(() =>
+ {
+ _outboxRepository.Received(1).UpdateAsync(
+ Arg.Is(n =>
+ n.Status == NotificationStatus.Parked &&
+ n.LastError != null &&
+ n.LastError.Contains("no delivery adapter") &&
+ n.LastAttemptAt != null),
+ Arg.Any());
+ });
+ }
+
+ [Fact]
+ public void OverlappingTicks_WhileDispatchInFlight_DoNotClaimConcurrently()
+ {
+ SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
+ var notification = MakeNotification();
+ _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(new[] { notification });
+ // Slow adapter keeps the first sweep in flight while the second tick arrives.
+ var adapter = new StubAdapter(
+ () => DeliveryOutcome.Success("ops@example.com"),
+ delay: TimeSpan.FromMilliseconds(800));
+ var actor = CreateActor(new Dictionary
+ {
+ [NotificationType.Email] = adapter,
+ });
+
+ actor.Tell(InternalMessages.DispatchTick.Instance);
+ actor.Tell(InternalMessages.DispatchTick.Instance);
+
+ // Second tick is dropped by the in-flight guard: only one sweep runs.
+ AwaitAssert(
+ () => _outboxRepository.Received(1).GetDueAsync(
+ Arg.Any(), Arg.Any(), Arg.Any()),
+ duration: TimeSpan.FromSeconds(2));
+ }
+}
diff --git a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs
index 15edb37..b712e88 100644
--- a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs
+++ b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs
@@ -8,6 +8,7 @@ using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Enums;
+using ScadaLink.NotificationOutbox.Delivery;
namespace ScadaLink.NotificationOutbox.Tests;
@@ -33,7 +34,8 @@ public class NotificationOutboxActorIngestTests : TestKit
return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
BuildServiceProvider(),
new NotificationOutboxOptions(),
- NullLogger.Instance)));
+ NullLogger.Instance,
+ new Dictionary())));
}
private static NotificationSubmit MakeSubmit(string? notificationId = null)