using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Commons.Types.Notifications;
using ScadaLink.NotificationOutbox.Delivery;
using ScadaLink.NotificationOutbox.Messages;

namespace ScadaLink.NotificationOutbox;

/// <summary>
/// Central-side actor that owns the notification outbox. It accepts
/// <see cref="NotificationSubmit"/> messages and persists each as a
/// <see cref="Notification"/> row (the ingest path), and runs a periodic dispatch loop
/// that claims due notifications, delivers them through the adapter registered for the
/// notification's type, and applies the resulting status transition. It also answers
/// paginated queries, single-notification status queries, manual retry/discard requests,
/// and KPI snapshot requests.
/// </summary>
public class NotificationOutboxActor : ReceiveActor, IWithTimers
{
    private const string DispatchTimerKey = "dispatch";

    /// <summary>Retry policy fallback used when no SMTP configuration row is present.</summary>
    private const int FallbackMaxRetries = 10;
    private static readonly TimeSpan FallbackRetryDelay = TimeSpan.FromMinutes(1);

    private readonly IServiceProvider _serviceProvider;
    private readonly NotificationOutboxOptions _options;
    private readonly ILogger _logger;

    // Delivery adapters keyed by notification type (looked up with notification.Type).
    // NOTE(review): the adapter interface name was not legible in the reviewed text;
    // INotificationDeliveryAdapter is assumed — confirm against the Delivery namespace.
    private readonly IReadOnlyDictionary<NotificationType, INotificationDeliveryAdapter> _adapters;

    /// <summary>
    /// In-flight guard for the dispatch loop. Set true at the start of a sweep and cleared
    /// when the sweep's <see cref="InternalMessages.DispatchComplete"/> arrives. While true,
    /// further <see cref="InternalMessages.DispatchTick"/>s are dropped so sweeps never overlap.
    /// </summary>
    private bool _dispatching;

    /// <summary>Akka timer scheduler, assigned by the actor system via <see cref="IWithTimers"/>.</summary>
    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>
    /// Stores the collaborators and registers one message handler per supported message type.
    /// </summary>
    /// <param name="serviceProvider">Root provider used to create a scope per repository operation.</param>
    /// <param name="options">Dispatch interval, batch size, stuck threshold and KPI window.</param>
    /// <param name="logger">Logger for ingest, dispatch and query diagnostics.</param>
    /// <param name="adapters">Delivery adapters keyed by notification type.</param>
    public NotificationOutboxActor(
        IServiceProvider serviceProvider,
        NotificationOutboxOptions options,
        ILogger logger,
        IReadOnlyDictionary<NotificationType, INotificationDeliveryAdapter> adapters)
    {
        _serviceProvider = serviceProvider;
        _options = options;
        _logger = logger;
        _adapters = adapters;

        Receive<NotificationSubmit>(HandleSubmit);
        Receive<InternalMessages.IngestPersisted>(HandleIngestPersisted);
        Receive<InternalMessages.DispatchTick>(_ => HandleDispatchTick());

        // Statement-bodied on purpose: the expression form `_ => _dispatching = false`
        // yields a bool, so overload resolution picks Receive<T>(Func<T, bool>) and the
        // returned `false` marks every DispatchComplete as unhandled even though the
        // guard was lowered.
        Receive<InternalMessages.DispatchComplete>(_ => { _dispatching = false; });

        Receive<NotificationOutboxQueryRequest>(HandleQuery);
        Receive<NotificationStatusQuery>(HandleStatusQuery);
        Receive<RetryNotificationRequest>(HandleRetry);
        Receive<DiscardNotificationRequest>(HandleDiscard);
        Receive<NotificationKpiRequest>(HandleKpiRequest);
    }

    /// <summary>
    /// Starts the periodic dispatch timer once the actor is running. The tick cadence is
    /// <see cref="NotificationOutboxOptions.DispatchInterval"/>.
    /// </summary>
    protected override void PreStart()
    {
        base.PreStart();
        Timers.StartPeriodicTimer(
            DispatchTimerKey,
            InternalMessages.DispatchTick.Instance,
            _options.DispatchInterval);
    }

    /// <summary>
    /// Maps an inbound <see cref="NotificationSubmit"/> onto a <see cref="Notification"/>,
    /// persists it, and pipes the outcome back to <see cref="ActorBase.Self"/> so the ack is
    /// sent from the actor thread with the original sender preserved.
    /// </summary>
    private void HandleSubmit(NotificationSubmit msg)
    {
        // Capture now: Sender is only valid while this message is being processed.
        var sender = Sender;
        var notification = BuildNotification(msg);

        // The success projection fires whenever PersistAsync completes without throwing;
        // only a thrown repository error reaches the failure projection.
        PersistAsync(notification).PipeTo(
            Self,
            success: () => new InternalMessages.IngestPersisted(
                msg.NotificationId, sender, Succeeded: true, Error: null),
            failure: ex => new InternalMessages.IngestPersisted(
                msg.NotificationId, sender, Succeeded: false, Error: ex.GetBaseException().Message));
    }

    /// <summary>
    /// Resolves a scoped <see cref="INotificationOutboxRepository"/> and calls
    /// <c>InsertIfNotExistsAsync</c>. Whatever that call returns is intentionally ignored:
    /// a re-submission of an already-stored notification is acked just like a fresh insert,
    /// and only a thrown error surfaces to the caller.
    /// </summary>
    private async Task PersistAsync(Notification notification)
    {
        using var scope = _serviceProvider.CreateScope();
        var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
        await repository.InsertIfNotExistsAsync(notification);
    }

    /// <summary>
    /// Acks the original submitter once persistence completes: <c>Accepted: true</c> when
    /// persistence finished without throwing, otherwise <c>Accepted: false</c> carrying the
    /// error message.
    /// </summary>
    private void HandleIngestPersisted(InternalMessages.IngestPersisted msg)
    {
        if (msg.Succeeded)
        {
            _logger.LogDebug("Notification {NotificationId} ingested into outbox.", msg.NotificationId);
            msg.Sender.Tell(new NotificationSubmitAck(msg.NotificationId, Accepted: true, Error: null));
        }
        else
        {
            _logger.LogWarning(
                "Failed to ingest notification {NotificationId}: {Error}",
                msg.NotificationId,
                msg.Error);
            msg.Sender.Tell(new NotificationSubmitAck(msg.NotificationId, Accepted: false, Error: msg.Error));
        }
    }

    /// <summary>
    /// Handles a dispatch tick. If a sweep is already in flight the tick is dropped so
    /// sweeps never overlap; otherwise the guard is raised and an asynchronous sweep is
    /// launched, with an <see cref="InternalMessages.DispatchComplete"/> piped back to
    /// <see cref="ActorBase.Self"/> to lower the guard on the actor thread.
    /// </summary>
    private void HandleDispatchTick()
    {
        if (_dispatching)
        {
            return;
        }

        _dispatching = true;
        var now = DateTimeOffset.UtcNow;

        // RunDispatchPass swallows its own errors, but the failure projection is kept as a
        // belt-and-braces guard so even a faulted task still lowers the in-flight guard —
        // otherwise the dispatcher would wedge permanently.
        RunDispatchPass(now).PipeTo(
            Self,
            success: () => InternalMessages.DispatchComplete.Instance,
            failure: ex =>
            {
                _logger.LogError(ex, "Dispatch sweep faulted unexpectedly.");
                return InternalMessages.DispatchComplete.Instance;
            });
    }

    /// <summary>
    /// Runs a single dispatch sweep: claims the due batch, resolves the retry policy, and
    /// delivers each notification sequentially. Per-notification failures are caught and
    /// logged so one bad row never aborts the rest of the batch. The whole body is wrapped
    /// in a try/catch so the returned task never faults; a faulted task would otherwise
    /// rely solely on the caller's failure projection to clear the in-flight guard.
    /// </summary>
    /// <param name="now">Sweep timestamp used for the due query and all status stamps.</param>
    private async Task RunDispatchPass(DateTimeOffset now)
    {
        try
        {
            using var scope = _serviceProvider.CreateScope();
            var outboxRepository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
            var notificationRepository = scope.ServiceProvider.GetRequiredService<INotificationRepository>();

            IReadOnlyList<Notification> due;
            try
            {
                due = await outboxRepository.GetDueAsync(now, _options.DispatchBatchSize);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Dispatch sweep failed to claim due notifications.");
                return;
            }

            if (due.Count == 0)
            {
                return;
            }

            var (maxRetries, retryDelay) = await ResolveRetryPolicyAsync(notificationRepository);

            foreach (var notification in due)
            {
                try
                {
                    await DeliverOneAsync(notification, now, maxRetries, retryDelay, outboxRepository);
                }
                catch (Exception ex)
                {
                    // Isolate per-notification failures so the remainder of the batch still runs.
                    _logger.LogError(
                        ex,
                        "Dispatch failed for notification {NotificationId}.",
                        notification.NotificationId);
                }
            }
        }
        catch (Exception ex)
        {
            // Scope/service resolution or retry-policy resolution faulted; swallow and log so
            // the returned task completes normally and the in-flight guard is always cleared.
            _logger.LogError(ex, "Dispatch sweep failed unexpectedly.");
        }
    }

    /// <summary>
    /// Resolves the retry policy from the first SMTP configuration row. When no SMTP
    /// configuration exists, falls back to <see cref="FallbackMaxRetries"/> and
    /// <see cref="FallbackRetryDelay"/>.
    /// </summary>
    /// <returns>The maximum retry count and the delay between attempts.</returns>
    private async Task<(int MaxRetries, TimeSpan RetryDelay)> ResolveRetryPolicyAsync(
        INotificationRepository notificationRepository)
    {
        var configurations = await notificationRepository.GetAllSmtpConfigurationsAsync();
        var configuration = configurations.Count > 0 ? configurations[0] : null;

        return configuration is null
            ? (FallbackMaxRetries, FallbackRetryDelay)
            : (configuration.MaxRetries, configuration.RetryDelay);
    }

    /// <summary>
    /// Delivers a single notification through the adapter registered for its type and applies
    /// the resulting status transition. A missing adapter parks the notification; otherwise the
    /// adapter's <see cref="DeliveryResult"/> drives the transition. The row is then persisted.
    /// </summary>
    private async Task DeliverOneAsync(
        Notification notification,
        DateTimeOffset now,
        int maxRetries,
        TimeSpan retryDelay,
        INotificationOutboxRepository outboxRepository)
    {
        if (!_adapters.TryGetValue(notification.Type, out var adapter))
        {
            notification.Status = NotificationStatus.Parked;
            notification.LastError = $"no delivery adapter for type {notification.Type}";
            notification.LastAttemptAt = now;
            await outboxRepository.UpdateAsync(notification);
            return;
        }

        var outcome = await adapter.DeliverAsync(notification);

        switch (outcome.Result)
        {
            case DeliveryResult.Success:
                notification.Status = NotificationStatus.Delivered;
                notification.DeliveredAt = now;
                notification.LastAttemptAt = now;
                notification.ResolvedTargets = outcome.ResolvedTargets;
                notification.LastError = null;
                break;

            case DeliveryResult.TransientFailure:
                notification.LastAttemptAt = now;
                notification.RetryCount++;
                notification.LastError = outcome.Error;
                if (notification.RetryCount >= maxRetries)
                {
                    // Retry budget exhausted: park for manual retry/discard.
                    notification.Status = NotificationStatus.Parked;
                }
                else
                {
                    notification.Status = NotificationStatus.Retrying;
                    notification.NextAttemptAt = now + retryDelay;
                }

                break;

            case DeliveryResult.PermanentFailure:
                notification.Status = NotificationStatus.Parked;
                notification.LastAttemptAt = now;
                notification.LastError = outcome.Error;
                break;
        }

        await outboxRepository.UpdateAsync(notification);
    }

    /// <summary>
    /// Handles a paginated, filtered query over the outbox and pipes the response back to
    /// the captured sender. A faulted query yields a failure response with an empty list.
    /// </summary>
    private void HandleQuery(NotificationOutboxQueryRequest request)
    {
        var sender = Sender;
        var now = DateTimeOffset.UtcNow;

        QueryOutboxAsync(request, now).PipeTo(
            sender,
            success: response => response,
            failure: ex => new NotificationOutboxQueryResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: ex.GetBaseException().Message,
                Notifications: Array.Empty<NotificationSummary>(),
                TotalCount: 0));
    }

    /// <summary>
    /// Builds a <see cref="NotificationOutboxFilter"/> from the request (parsing the string
    /// status/type filters to their enums and supplying the stuck cutoff only when
    /// <c>StuckOnly</c> is set), runs the query on a scoped repository, and maps each row to
    /// a <see cref="NotificationSummary"/> including its stuck flag.
    /// </summary>
    private async Task<NotificationOutboxQueryResponse> QueryOutboxAsync(
        NotificationOutboxQueryRequest request,
        DateTimeOffset now)
    {
        // Computed once; used both for the optional filter and for the per-row stuck flag.
        var stuckCutoff = StuckCutoff(now);

        var filter = new NotificationOutboxFilter(
            Status: ParseEnum<NotificationStatus>(request.StatusFilter),
            Type: ParseEnum<NotificationType>(request.TypeFilter),
            SourceSiteId: request.SourceSiteFilter,
            ListName: request.ListNameFilter,
            SubjectKeyword: request.SubjectKeyword,
            StuckOnly: request.StuckOnly,
            StuckCutoff: request.StuckOnly ? stuckCutoff : null,
            From: request.From,
            To: request.To);

        using var scope = _serviceProvider.CreateScope();
        var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();

        var (rows, totalCount) = await repository.QueryAsync(filter, request.PageNumber, request.PageSize);

        var summaries = rows
            .Select(row => new NotificationSummary(
                row.NotificationId,
                row.Type.ToString(),
                row.ListName,
                row.Subject,
                row.Status.ToString(),
                row.RetryCount,
                row.LastError,
                row.SourceSiteId,
                row.SourceInstanceId,
                row.CreatedAt,
                row.DeliveredAt,
                IsStuck: IsStuck(row, stuckCutoff)))
            .ToList();

        return new NotificationOutboxQueryResponse(
            request.CorrelationId,
            Success: true,
            ErrorMessage: null,
            summaries,
            totalCount);
    }

    /// <summary>
    /// Handles a single-notification status query. Replies <c>Found: false</c> with empty
    /// detail when no row matches, otherwise the row's current status, retry count, last
    /// error, and delivery time.
    /// </summary>
    private void HandleStatusQuery(NotificationStatusQuery query)
    {
        var sender = Sender;

        StatusQueryAsync(query).PipeTo(
            sender,
            success: response => response,
            failure: ex =>
            {
                // NotificationStatusResponse has no error field, so a repository fault is
                // reported as Found: false — log the fault so a transient DB error is not
                // silently indistinguishable from a genuinely-missing notification.
                _logger.LogWarning(
                    ex,
                    "Status query for notification {NotificationId} failed.",
                    query.NotificationId);
                return new NotificationStatusResponse(
                    query.CorrelationId,
                    Found: false,
                    Status: string.Empty,
                    RetryCount: 0,
                    LastError: null,
                    DeliveredAt: null);
            });
    }

    /// <summary>
    /// Loads the notification by id on a scoped repository and maps it to a status response.
    /// </summary>
    private async Task<NotificationStatusResponse> StatusQueryAsync(NotificationStatusQuery query)
    {
        using var scope = _serviceProvider.CreateScope();
        var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();

        var notification = await repository.GetByIdAsync(query.NotificationId);
        if (notification is null)
        {
            return new NotificationStatusResponse(
                query.CorrelationId,
                Found: false,
                Status: string.Empty,
                RetryCount: 0,
                LastError: null,
                DeliveredAt: null);
        }

        return new NotificationStatusResponse(
            query.CorrelationId,
            Found: true,
            Status: notification.Status.ToString(),
            RetryCount: notification.RetryCount,
            LastError: notification.LastError,
            DeliveredAt: notification.DeliveredAt);
    }

    /// <summary>
    /// Handles a manual retry request and pipes the result back to the captured sender.
    /// A faulted operation yields a failure response carrying the base exception message.
    /// </summary>
    private void HandleRetry(RetryNotificationRequest request)
    {
        var sender = Sender;

        RetryAsync(request).PipeTo(
            sender,
            success: response => response,
            failure: ex => new RetryNotificationResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: ex.GetBaseException().Message));
    }

    /// <summary>
    /// Only a <c>Parked</c> notification can be retried; it is reset to <c>Pending</c> with
    /// a cleared retry count, next-attempt time, and last error, then persisted.
    /// </summary>
    private async Task<RetryNotificationResponse> RetryAsync(RetryNotificationRequest request)
    {
        using var scope = _serviceProvider.CreateScope();
        var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();

        var notification = await repository.GetByIdAsync(request.NotificationId);
        if (notification is null)
        {
            return new RetryNotificationResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: "notification not found");
        }

        if (notification.Status != NotificationStatus.Parked)
        {
            return new RetryNotificationResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: "only parked notifications can be retried");
        }

        notification.Status = NotificationStatus.Pending;
        notification.RetryCount = 0;
        notification.NextAttemptAt = null;
        notification.LastError = null;
        await repository.UpdateAsync(notification);

        return new RetryNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null);
    }

    /// <summary>
    /// Handles a manual discard request and pipes the result back to the captured sender.
    /// A faulted operation yields a failure response carrying the base exception message.
    /// </summary>
    private void HandleDiscard(DiscardNotificationRequest request)
    {
        var sender = Sender;

        DiscardAsync(request).PipeTo(
            sender,
            success: response => response,
            failure: ex => new DiscardNotificationResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: ex.GetBaseException().Message));
    }

    /// <summary>
    /// Only a <c>Parked</c> notification can be discarded; it is moved to the
    /// <c>Discarded</c> status and persisted.
    /// </summary>
    private async Task<DiscardNotificationResponse> DiscardAsync(DiscardNotificationRequest request)
    {
        using var scope = _serviceProvider.CreateScope();
        var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();

        var notification = await repository.GetByIdAsync(request.NotificationId);
        if (notification is null)
        {
            return new DiscardNotificationResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: "notification not found");
        }

        if (notification.Status != NotificationStatus.Parked)
        {
            return new DiscardNotificationResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: "only parked notifications can be discarded");
        }

        notification.Status = NotificationStatus.Discarded;
        await repository.UpdateAsync(notification);

        return new DiscardNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null);
    }

    /// <summary>
    /// Handles a KPI snapshot request, computing the outbox metrics with the stuck cutoff
    /// derived from <see cref="NotificationOutboxOptions.StuckAgeThreshold"/> and the
    /// delivered window from <see cref="NotificationOutboxOptions.DeliveredKpiWindow"/>.
    /// A faulted computation yields a failure response with zeroed metrics.
    /// </summary>
    private void HandleKpiRequest(NotificationKpiRequest request)
    {
        var sender = Sender;
        var now = DateTimeOffset.UtcNow;
        var stuckCutoff = StuckCutoff(now);
        var deliveredSince = now - _options.DeliveredKpiWindow;

        ComputeKpisAsync(request.CorrelationId, stuckCutoff, deliveredSince).PipeTo(
            sender,
            success: response => response,
            failure: ex => new NotificationKpiResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: ex.GetBaseException().Message,
                QueueDepth: 0,
                StuckCount: 0,
                ParkedCount: 0,
                DeliveredLastInterval: 0,
                OldestPendingAge: null));
    }

    /// <summary>
    /// Asks a scoped repository for the KPI snapshot and maps it onto the response message.
    /// </summary>
    private async Task<NotificationKpiResponse> ComputeKpisAsync(
        string correlationId,
        DateTimeOffset stuckCutoff,
        DateTimeOffset deliveredSince)
    {
        using var scope = _serviceProvider.CreateScope();
        var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();

        var snapshot = await repository.ComputeKpisAsync(stuckCutoff, deliveredSince);

        return new NotificationKpiResponse(
            correlationId,
            Success: true,
            ErrorMessage: null,
            snapshot.QueueDepth,
            snapshot.StuckCount,
            snapshot.ParkedCount,
            snapshot.DeliveredLastInterval,
            snapshot.OldestPendingAge);
    }

    /// <summary>
    /// The instant before which a still-pending notification counts as stuck —
    /// <paramref name="now"/> offset back by <see cref="NotificationOutboxOptions.StuckAgeThreshold"/>.
    /// </summary>
    private DateTimeOffset StuckCutoff(DateTimeOffset now) => now - _options.StuckAgeThreshold;

    /// <summary>
    /// A notification counts as stuck when its status is <c>Pending</c> or <c>Retrying</c>
    /// and it was created before the supplied cutoff.
    /// </summary>
    private static bool IsStuck(Notification notification, DateTimeOffset stuckCutoff)
    {
        return notification.Status is NotificationStatus.Pending or NotificationStatus.Retrying
            && notification.CreatedAt < stuckCutoff;
    }

    /// <summary>
    /// Parses a string filter value to a nullable enum, ignoring case. A null, empty,
    /// whitespace, or unrecognised value yields null — meaning "no constraint on that
    /// dimension".
    /// </summary>
    /// <typeparam name="TEnum">The enum type to parse into.</typeparam>
    /// <param name="value">The raw filter text, possibly null.</param>
    /// <returns>The matching defined enum member, or null.</returns>
    private static TEnum? ParseEnum<TEnum>(string? value)
        where TEnum : struct, Enum
    {
        // Enum.TryParse also succeeds for numeric text such as "999", producing a value
        // that is not a declared member; IsDefined rejects those so they count as
        // "unrecognised" like any other unknown name.
        return Enum.TryParse<TEnum>(value, ignoreCase: true, out var parsed) && Enum.IsDefined(parsed)
            ? parsed
            : null;
    }

    /// <summary>
    /// Maps a <see cref="NotificationSubmit"/> onto a new <see cref="Notification"/> stamped
    /// with the current UTC time as its creation time.
    /// </summary>
    private static Notification BuildNotification(NotificationSubmit msg)
    {
        // Every submission is stored as NotificationType.Email; the submit message carries
        // no type of its own here.
        return new Notification(
            msg.NotificationId,
            NotificationType.Email,
            msg.ListName,
            msg.Subject,
            msg.Body,
            msg.SourceSiteId)
        {
            SourceInstanceId = msg.SourceInstanceId,
            SourceScript = msg.SourceScript,
            SiteEnqueuedAt = msg.SiteEnqueuedAt,
            CreatedAt = DateTimeOffset.UtcNow,
            // Status is not set here; it keeps whatever default Notification assigns.
        };
    }
}