using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.NotificationOutbox.Delivery;
using ScadaLink.NotificationOutbox.Messages;
namespace ScadaLink.NotificationOutbox;
///
/// Central-side actor that owns the notification outbox. It accepts
/// messages forwarded from sites and persists each as a
/// row (the ingest path), and runs a periodic dispatch loop
/// that claims due notifications, delivers them through the matching channel adapter, and
/// applies the resulting status transition. Query and purge are added by later tasks.
///
public class NotificationOutboxActor : ReceiveActor, IWithTimers
{
private const string DispatchTimerKey = "dispatch";
/// Retry policy fallback used when no SMTP configuration row is present.
private const int FallbackMaxRetries = 10;
private static readonly TimeSpan FallbackRetryDelay = TimeSpan.FromMinutes(1);
private readonly IServiceProvider _serviceProvider;
private readonly NotificationOutboxOptions _options;
private readonly ILogger _logger;
private readonly IReadOnlyDictionary _adapters;
///
/// In-flight guard for the dispatch loop. Set true at the start of a sweep and cleared
/// when the sweep's arrives. While true,
/// further s are dropped so sweeps never overlap.
///
private bool _dispatching;
/// Akka timer scheduler, assigned by the actor system via .
public ITimerScheduler Timers { get; set; } = null!;
public NotificationOutboxActor(
IServiceProvider serviceProvider,
NotificationOutboxOptions options,
ILogger logger,
IReadOnlyDictionary adapters)
{
_serviceProvider = serviceProvider;
_options = options;
_logger = logger;
_adapters = adapters;
Receive(HandleSubmit);
Receive(HandleIngestPersisted);
Receive(_ => HandleDispatchTick());
Receive(_ => _dispatching = false);
}
///
/// Starts the periodic dispatch timer once the actor is running. The tick cadence is
/// .
///
protected override void PreStart()
{
base.PreStart();
Timers.StartPeriodicTimer(
DispatchTimerKey, InternalMessages.DispatchTick.Instance, _options.DispatchInterval);
}
///
/// Maps an inbound onto a ,
/// persists it idempotently, and pipes the outcome back to so the
/// ack is sent from the actor thread with the original sender preserved.
///
private void HandleSubmit(NotificationSubmit msg)
{
var sender = Sender;
var notification = BuildNotification(msg);
// The success projection fires for both a fresh insert and an existing row;
// only a thrown repository error reaches the failure projection.
PersistAsync(notification).PipeTo(
Self,
success: () => new InternalMessages.IngestPersisted(
msg.NotificationId, sender, Succeeded: true, Error: null),
failure: ex => new InternalMessages.IngestPersisted(
msg.NotificationId, sender, Succeeded: false, Error: ex.GetBaseException().Message));
}
///
/// Resolves a scoped and inserts the
/// notification if a row with the same id does not already exist. The boolean result
/// of InsertIfNotExistsAsync is intentionally ignored: an existing row is an
/// idempotent re-submission and is acked just like a fresh insert so the site can
/// clear its forward buffer. Only a thrown error must surface to the caller.
///
private async Task PersistAsync(Notification notification)
{
using var scope = _serviceProvider.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService();
await repository.InsertIfNotExistsAsync(notification);
}
///
/// Acks the original submitter once persistence completes.
/// is Accepted for both a fresh insert and an existing row; only a thrown
/// repository error produces Accepted: false so the site retries the forward.
///
private void HandleIngestPersisted(InternalMessages.IngestPersisted msg)
{
if (msg.Succeeded)
{
_logger.LogDebug("Notification {NotificationId} ingested into outbox.", msg.NotificationId);
msg.Sender.Tell(new NotificationSubmitAck(msg.NotificationId, Accepted: true, Error: null));
}
else
{
_logger.LogWarning(
"Failed to ingest notification {NotificationId}: {Error}",
msg.NotificationId, msg.Error);
msg.Sender.Tell(new NotificationSubmitAck(msg.NotificationId, Accepted: false, Error: msg.Error));
}
}
///
/// Handles a dispatch tick. If a sweep is already in flight the tick is dropped so
/// sweeps never overlap; otherwise the guard is raised and an asynchronous sweep is
/// launched, with a piped back to
/// to lower the guard on the actor thread.
///
private void HandleDispatchTick()
{
if (_dispatching)
{
return;
}
_dispatching = true;
var now = DateTimeOffset.UtcNow;
// RunDispatchPass swallows its own errors; the completion message is sent for both
// success and failure so the guard is always cleared.
RunDispatchPass(now).PipeTo(Self, success: () => InternalMessages.DispatchComplete.Instance);
}
///
/// Runs a single dispatch sweep: claims the due batch, resolves the retry policy, and
/// delivers each notification sequentially. Per-notification failures are caught and
/// logged so one bad row never aborts the rest of the batch.
///
private async Task RunDispatchPass(DateTimeOffset now)
{
using var scope = _serviceProvider.CreateScope();
var outboxRepository = scope.ServiceProvider.GetRequiredService();
var notificationRepository = scope.ServiceProvider.GetRequiredService();
IReadOnlyList due;
try
{
due = await outboxRepository.GetDueAsync(now, _options.DispatchBatchSize);
}
catch (Exception ex)
{
_logger.LogError(ex, "Dispatch sweep failed to claim due notifications.");
return;
}
if (due.Count == 0)
{
return;
}
var (maxRetries, retryDelay) = await ResolveRetryPolicyAsync(notificationRepository);
foreach (var notification in due)
{
try
{
await DeliverOneAsync(notification, now, maxRetries, retryDelay, outboxRepository);
}
catch (Exception ex)
{
// Isolate per-notification failures so the remainder of the batch still runs.
_logger.LogError(
ex, "Dispatch failed for notification {NotificationId}.", notification.NotificationId);
}
}
}
///
/// Resolves the retry policy from the first SMTP configuration row. When no SMTP
/// configuration exists, falls back to a conservative default — delivery itself will
/// permanently fail in that case, so the policy only acts as a guard.
///
private async Task<(int MaxRetries, TimeSpan RetryDelay)> ResolveRetryPolicyAsync(
INotificationRepository notificationRepository)
{
var configurations = await notificationRepository.GetAllSmtpConfigurationsAsync();
var configuration = configurations.Count > 0 ? configurations[0] : null;
return configuration is null
? (FallbackMaxRetries, FallbackRetryDelay)
: (configuration.MaxRetries, configuration.RetryDelay);
}
///
/// Delivers a single notification through its channel adapter and applies the resulting
/// status transition. A missing adapter parks the notification; otherwise the
/// drives the transition. The updated row is always persisted.
///
private async Task DeliverOneAsync(
Notification notification,
DateTimeOffset now,
int maxRetries,
TimeSpan retryDelay,
INotificationOutboxRepository outboxRepository)
{
if (!_adapters.TryGetValue(notification.Type, out var adapter))
{
notification.Status = NotificationStatus.Parked;
notification.LastError = $"no delivery adapter for type {notification.Type}";
notification.LastAttemptAt = now;
await outboxRepository.UpdateAsync(notification);
return;
}
var outcome = await adapter.DeliverAsync(notification);
switch (outcome.Result)
{
case DeliveryResult.Success:
notification.Status = NotificationStatus.Delivered;
notification.DeliveredAt = now;
notification.LastAttemptAt = now;
notification.ResolvedTargets = outcome.ResolvedTargets;
notification.LastError = null;
break;
case DeliveryResult.TransientFailure:
notification.LastAttemptAt = now;
notification.RetryCount++;
notification.LastError = outcome.Error;
if (notification.RetryCount >= maxRetries)
{
notification.Status = NotificationStatus.Parked;
}
else
{
notification.Status = NotificationStatus.Retrying;
notification.NextAttemptAt = now + retryDelay;
}
break;
case DeliveryResult.PermanentFailure:
notification.Status = NotificationStatus.Parked;
notification.LastAttemptAt = now;
notification.LastError = outcome.Error;
break;
}
await outboxRepository.UpdateAsync(notification);
}
private static Notification BuildNotification(NotificationSubmit msg)
{
// All current notifications are email; NotificationType has only the Email member.
return new Notification(
msg.NotificationId,
NotificationType.Email,
msg.ListName,
msg.Subject,
msg.Body,
msg.SourceSiteId)
{
SourceInstanceId = msg.SourceInstanceId,
SourceScript = msg.SourceScript,
SiteEnqueuedAt = msg.SiteEnqueuedAt,
CreatedAt = DateTimeOffset.UtcNow,
// Status stays at its Pending default for the dispatch sweep to claim.
};
}
}