// scadalink-design/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs


using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Commons.Types.Notifications;
using ScadaLink.NotificationOutbox.Delivery;
using ScadaLink.NotificationOutbox.Messages;
namespace ScadaLink.NotificationOutbox;
/// <summary>
/// Central-side actor that owns the notification outbox. It accepts
/// <see cref="NotificationSubmit"/> messages forwarded from sites and persists each as a
/// <see cref="Notification"/> row (the ingest path), and runs a periodic dispatch loop
/// that claims due notifications, delivers them through the matching channel adapter, and
/// applies the resulting status transition. It also runs a periodic purge that bulk-deletes
/// terminal notification rows once they age past the configured retention window.
/// </summary>
public class NotificationOutboxActor : ReceiveActor, IWithTimers
{
/// <summary>Timer key under which the periodic dispatch tick is registered in <see cref="PreStart"/>.</summary>
private const string DispatchTimerKey = "dispatch";
/// <summary>Timer key under which the periodic purge tick is registered in <see cref="PreStart"/>.</summary>
private const string PurgeTimerKey = "purge";
/// <summary>Retry policy fallback used when no SMTP configuration row is present.</summary>
private const int FallbackMaxRetries = 10;
/// <summary>Retry delay paired with <see cref="FallbackMaxRetries"/> when no SMTP configuration row is present.</summary>
private static readonly TimeSpan FallbackRetryDelay = TimeSpan.FromMinutes(1);
/// <summary>Root provider; every repository operation creates its own DI scope from it.</summary>
private readonly IServiceProvider _serviceProvider;
/// <summary>Outbox settings: timer intervals, dispatch batch size, retention, and KPI thresholds.</summary>
private readonly NotificationOutboxOptions _options;
/// <summary>Sink for the NotifyDeliver audit rows emitted by the dispatcher and the manual discard path.</summary>
private readonly ICentralAuditWriter _auditWriter;
/// <summary>Logger for ingest, dispatch, purge, and query diagnostics.</summary>
private readonly ILogger<NotificationOutboxActor> _logger;
/// <summary>
/// In-flight guard for the dispatch loop. Set true at the start of a sweep and cleared
/// when the sweep's <see cref="InternalMessages.DispatchComplete"/> arrives. While true,
/// further <see cref="InternalMessages.DispatchTick"/>s are dropped so sweeps never overlap.
/// </summary>
private bool _dispatching;
/// <summary>Akka timer scheduler, assigned by the actor system via <see cref="IWithTimers"/>.</summary>
public ITimerScheduler Timers { get; set; } = null!;
/// <summary>
/// Creates the actor, stores its collaborators, and registers one handler per message
/// type: ingest, the internal dispatch/purge loop messages, and the query/command
/// requests.
/// </summary>
/// <param name="serviceProvider">Root provider used to create a DI scope per repository operation.</param>
/// <param name="options">Outbox intervals, batch size, retention, and KPI thresholds.</param>
/// <param name="auditWriter">Writer that receives the NotifyDeliver audit rows.</param>
/// <param name="logger">Logger for this actor.</param>
/// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
public NotificationOutboxActor(
    IServiceProvider serviceProvider,
    NotificationOutboxOptions options,
    ICentralAuditWriter auditWriter,
    ILogger<NotificationOutboxActor> logger)
{
    _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    _options = options ?? throw new ArgumentNullException(nameof(options));
    _auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    Receive<NotificationSubmit>(HandleSubmit);
    Receive<InternalMessages.IngestPersisted>(HandleIngestPersisted);
    Receive<InternalMessages.DispatchTick>(_ => HandleDispatchTick());
    // Statement lambda on purpose. The expression form `_ => _dispatching = false`
    // yields a bool, so overload resolution picks Receive<T>(Func<T, bool>) and the
    // handler returns false — which tells Akka the message was NOT handled, so every
    // DispatchComplete would also be reported as unhandled.
    Receive<InternalMessages.DispatchComplete>(_ =>
    {
        _dispatching = false;
    });
    Receive<InternalMessages.PurgeTick>(_ => HandlePurgeTick());
    // No-op: purge has no in-flight guard to lower, and the outcome is already logged
    // by the PipeTo projections, so PurgeComplete carries nothing to act on.
    Receive<InternalMessages.PurgeComplete>(_ => { });
    Receive<NotificationOutboxQueryRequest>(HandleQuery);
    Receive<NotificationStatusQuery>(HandleStatusQuery);
    Receive<NotificationDetailRequest>(HandleDetailRequest);
    Receive<RetryNotificationRequest>(HandleRetry);
    Receive<DiscardNotificationRequest>(HandleDiscard);
    Receive<NotificationKpiRequest>(HandleKpiRequest);
    Receive<PerSiteNotificationKpiRequest>(HandlePerSiteKpiRequest);
}
/// <summary>
/// Arms the two periodic timers when the actor starts: a dispatch tick every
/// <see cref="NotificationOutboxOptions.DispatchInterval"/> and a purge tick every
/// <see cref="NotificationOutboxOptions.PurgeInterval"/>.
/// </summary>
protected override void PreStart()
{
    base.PreStart();

    var dispatchEvery = _options.DispatchInterval;
    Timers.StartPeriodicTimer(DispatchTimerKey, InternalMessages.DispatchTick.Instance, dispatchEvery);

    var purgeEvery = _options.PurgeInterval;
    Timers.StartPeriodicTimer(PurgeTimerKey, InternalMessages.PurgeTick.Instance, purgeEvery);
}
/// <summary>
/// Ingest entry point. Converts the <see cref="NotificationSubmit"/> into a
/// <see cref="Notification"/>, persists it asynchronously, and pipes an
/// <see cref="InternalMessages.IngestPersisted"/> back to <see cref="Self"/> carrying the
/// captured sender, so the ack is produced on the actor thread.
/// </summary>
private void HandleSubmit(NotificationSubmit msg)
{
    // Sender must be captured now; the projections below run after this handler returns.
    var replyTo = Sender;
    var row = BuildNotification(msg);

    // Reached for a fresh insert and for an already-existing row alike.
    object Persisted() => new InternalMessages.IngestPersisted(
        msg.NotificationId, replyTo, Succeeded: true, Error: null);

    // Reached only when the repository call throws.
    object Faulted(Exception ex) => new InternalMessages.IngestPersisted(
        msg.NotificationId, replyTo, Succeeded: false, Error: ex.GetBaseException().Message);

    PersistAsync(row).PipeTo(Self, success: Persisted, failure: Faulted);
}
/// <summary>
/// Inserts <paramref name="notification"/> through a scoped
/// <see cref="INotificationOutboxRepository"/> unless a row with the same id already
/// exists. The result of <c>InsertIfNotExistsAsync</c> is deliberately not inspected: a
/// duplicate is an idempotent re-submission and must be acked like a fresh insert so the
/// site can clear its forward buffer. Only a thrown error reaches the caller.
/// </summary>
private async Task PersistAsync(Notification notification)
{
    using (var scope = _serviceProvider.CreateScope())
    {
        var outbox = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
        await outbox.InsertIfNotExistsAsync(notification);
    }
}
/// <summary>
/// Sends the <see cref="NotificationSubmitAck"/> to the original submitter after the
/// persistence attempt finishes. The ack is <c>Accepted</c> whenever persistence did not
/// throw (fresh insert or existing row); a repository error yields <c>Accepted: false</c>
/// with the error text so the site retries the forward.
/// </summary>
private void HandleIngestPersisted(InternalMessages.IngestPersisted msg)
{
    if (!msg.Succeeded)
    {
        _logger.LogWarning(
            "Failed to ingest notification {NotificationId}: {Error}",
            msg.NotificationId, msg.Error);
        msg.Sender.Tell(new NotificationSubmitAck(msg.NotificationId, Accepted: false, Error: msg.Error));
        return;
    }

    _logger.LogDebug("Notification {NotificationId} ingested into outbox.", msg.NotificationId);
    msg.Sender.Tell(new NotificationSubmitAck(msg.NotificationId, Accepted: true, Error: null));
}
/// <summary>
/// Reacts to a dispatch tick. A tick that arrives while a sweep is still running is
/// ignored; otherwise the in-flight guard is raised, a sweep is started, and a
/// <see cref="InternalMessages.DispatchComplete"/> is piped to <see cref="Self"/> when it
/// ends so the guard is lowered on the actor thread.
/// </summary>
private void HandleDispatchTick()
{
    if (_dispatching)
    {
        return;
    }

    _dispatching = true;
    var sweepStartedAt = DateTimeOffset.UtcNow;

    object SweepFinished() => InternalMessages.DispatchComplete.Instance;

    // RunDispatchPass already swallows its own errors; this projection is a second line
    // of defence so that a faulted task can never leave the guard raised forever.
    object SweepFaulted(Exception ex)
    {
        _logger.LogError(ex, "Dispatch sweep faulted unexpectedly.");
        return InternalMessages.DispatchComplete.Instance;
    }

    RunDispatchPass(sweepStartedAt).PipeTo(Self, success: SweepFinished, failure: SweepFaulted);
}
/// <summary>
/// Executes one dispatch sweep: fetch the due batch, resolve the retry policy, then
/// deliver the notifications one after another. A failure on a single notification is
/// logged and skipped so the rest of the batch still runs. The entire body sits inside a
/// try/catch so the returned task never faults — scope creation, service resolution, and
/// retry-policy resolution may all throw, and a faulted task would otherwise risk leaving
/// the dispatcher's in-flight guard raised.
///
/// Delivery adapters are resolved from the per-sweep scope instead of being cached in a
/// field: <see cref="EmailNotificationDeliveryAdapter"/> takes a scoped
/// <see cref="INotificationRepository"/> directly, so holding an adapter on this
/// long-lived actor would make it a captive dependency over a disposed DbContext.
/// </summary>
private async Task RunDispatchPass(DateTimeOffset now)
{
    try
    {
        using var sweepScope = _serviceProvider.CreateScope();
        var services = sweepScope.ServiceProvider;
        var outbox = services.GetRequiredService<INotificationOutboxRepository>();
        var notificationConfig = services.GetRequiredService<INotificationRepository>();
        var adaptersByType = ResolveAdapters(services);

        IReadOnlyList<Notification> batch;
        try
        {
            batch = await outbox.GetDueAsync(now, _options.DispatchBatchSize);
        }
        catch (Exception claimFault)
        {
            _logger.LogError(claimFault, "Dispatch sweep failed to claim due notifications.");
            return;
        }

        // Nothing due: skip the retry-policy lookup entirely.
        if (batch.Count == 0)
        {
            return;
        }

        var policy = await ResolveRetryPolicyAsync(notificationConfig);
        foreach (var item in batch)
        {
            try
            {
                await DeliverOneAsync(
                    item, now, policy.MaxRetries, policy.RetryDelay, outbox, adaptersByType);
            }
            catch (Exception deliveryFault)
            {
                // Keep going: one bad row must not abort the remainder of the batch.
                _logger.LogError(
                    deliveryFault, "Dispatch failed for notification {NotificationId}.", item.NotificationId);
            }
        }
    }
    catch (Exception sweepFault)
    {
        // Scope/service or retry-policy resolution threw; log and complete normally so
        // the in-flight guard is always cleared.
        _logger.LogError(sweepFault, "Dispatch sweep failed unexpectedly.");
    }
}
/// <summary>
/// Reads the retry policy (max retries, retry delay) from the first SMTP configuration
/// row. With no configuration present the fallback constants are returned — delivery
/// itself will permanently fail in that situation, so the fallback is only a guard.
/// </summary>
private async Task<(int MaxRetries, TimeSpan RetryDelay)> ResolveRetryPolicyAsync(
    INotificationRepository notificationRepository)
{
    var smtpRows = await notificationRepository.GetAllSmtpConfigurationsAsync();
    if (smtpRows.Count > 0)
    {
        var first = smtpRows[0];
        if (first is not null)
        {
            return (first.MaxRetries, first.RetryDelay);
        }
    }

    return (FallbackMaxRetries, FallbackRetryDelay);
}
/// <summary>
/// Produces the <see cref="NotificationType"/> → adapter map for one dispatch sweep from
/// every <see cref="INotificationDeliveryAdapter"/> registered in the given scope. When
/// several adapters report the same type, the one registered last is kept, mirroring
/// DI's last-wins resolution semantics.
/// </summary>
private static IReadOnlyDictionary<NotificationType, INotificationDeliveryAdapter> ResolveAdapters(
    IServiceProvider scopedServices)
{
    return scopedServices
        .GetServices<INotificationDeliveryAdapter>()
        .GroupBy(candidate => candidate.Type)
        // Last() within each group keeps the most recently registered adapter.
        .ToDictionary(sameType => sameType.Key, sameType => sameType.Last());
}
/// <summary>
/// Delivers a single notification through its channel adapter and applies the resulting
/// status transition. A missing adapter parks the notification; otherwise the
/// <see cref="DeliveryOutcome"/> drives the transition. A result value this method does
/// not recognise also parks the notification, so a row can never stay claimable forever
/// without retry accounting. The updated row is always persisted.
/// </summary>
/// <param name="notification">The row to deliver; mutated in place and persisted.</param>
/// <param name="now">Sweep timestamp stamped on attempt/delivery fields and audit rows.</param>
/// <param name="maxRetries">Retry count at which a transient failure parks the row.</param>
/// <param name="retryDelay">Delay added to <paramref name="now"/> for the next attempt.</param>
/// <param name="outboxRepository">Scoped repository used to persist the transition.</param>
/// <param name="adapters">Delivery adapters keyed by notification type.</param>
/// <remarks>
/// <para>
/// M4 Bundle B2 + B3: a single
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
/// row is emitted with <see cref="AuditStatus.Attempted"/> per attempt
/// (success, transient, permanent); when the post-outcome status is a
/// terminal one (Delivered, Parked) a SECOND row is emitted carrying
/// that terminal status. Both emissions are wrapped in a try/catch so a
/// thrown audit writer NEVER aborts the user-facing dispatch — the
/// <see cref="CentralAuditWriter"/> itself swallows internal failures,
/// but the dispatcher wraps defensively per alog.md §13. The
/// missing-adapter park path also emits both rows because it IS an
/// attempt that resolved to a park from the dispatcher's point of view.
/// </para>
/// <para>
/// Attempt duration is measured around the adapter call with a monotonic
/// stopwatch and recorded on the Attempted row so downstream KPIs can
/// compute per-attempt latency without joining to the row update timestamps.
/// </para>
/// </remarks>
private async Task DeliverOneAsync(
    Notification notification,
    DateTimeOffset now,
    int maxRetries,
    TimeSpan retryDelay,
    INotificationOutboxRepository outboxRepository,
    IReadOnlyDictionary<NotificationType, INotificationDeliveryAdapter> adapters)
{
    if (!adapters.TryGetValue(notification.Type, out var adapter))
    {
        // Missing-adapter park: from the dispatcher's perspective this is an
        // attempt that resolved to a terminal park. Emit Attempted then the
        // terminal Parked row, both carrying the same explanatory error.
        var missingAdapterError = $"no delivery adapter for type {notification.Type}";
        notification.Status = NotificationStatus.Parked;
        notification.LastError = missingAdapterError;
        notification.LastAttemptAt = now;
        await outboxRepository.UpdateAsync(notification);
        EmitAttemptAudit(
            notification,
            now,
            durationMs: 0,
            errorMessage: missingAdapterError);
        EmitTerminalAudit(notification, now, errorMessage: missingAdapterError);
        return;
    }

    // Stopwatch is monotonic, so a wall-clock adjustment during the adapter call
    // cannot distort (or zero out) the recorded attempt duration.
    var attemptTimer = global::System.Diagnostics.Stopwatch.StartNew();
    var outcome = await adapter.DeliverAsync(notification);
    attemptTimer.Stop();
    var durationMs = (int)Math.Min(int.MaxValue, attemptTimer.ElapsedMilliseconds);

    // Error text for the audit rows comes from the outcome, not from
    // notification.LastError: null on success, the failure reason verbatim otherwise.
    string? auditError = outcome.Result == DeliveryResult.Success ? null : outcome.Error;

    switch (outcome.Result)
    {
        case DeliveryResult.Success:
            notification.Status = NotificationStatus.Delivered;
            notification.DeliveredAt = now;
            notification.LastAttemptAt = now;
            notification.ResolvedTargets = outcome.ResolvedTargets;
            notification.LastError = null;
            break;

        case DeliveryResult.TransientFailure:
            notification.LastAttemptAt = now;
            notification.RetryCount++;
            notification.LastError = outcome.Error;
            if (notification.RetryCount >= maxRetries)
            {
                // Retry budget exhausted: give up and wait for an operator.
                notification.Status = NotificationStatus.Parked;
            }
            else
            {
                notification.Status = NotificationStatus.Retrying;
                notification.NextAttemptAt = now + retryDelay;
            }
            break;

        case DeliveryResult.PermanentFailure:
            notification.Status = NotificationStatus.Parked;
            notification.LastAttemptAt = now;
            notification.LastError = outcome.Error;
            break;

        default:
        {
            // A result this dispatcher does not know how to interpret. Leaving the row
            // untouched would have it re-claimed on every sweep with no retry
            // accounting, so park it with an explanation, like the missing-adapter path.
            var unknownResultError =
                outcome.Error ?? $"unrecognised delivery result {outcome.Result}";
            notification.Status = NotificationStatus.Parked;
            notification.LastAttemptAt = now;
            notification.LastError = unknownResultError;
            auditError = unknownResultError;
            break;
        }
    }

    await outboxRepository.UpdateAsync(notification);

    // Emit the per-attempt Attempted row exactly once regardless of the outcome (B2).
    EmitAttemptAudit(
        notification,
        now,
        durationMs: durationMs,
        errorMessage: auditError);

    // If the post-outcome status is terminal (Delivered or Parked — the
    // dispatcher never sets Discarded; that lives on the manual discard
    // path), emit the terminal NotifyDeliver row (B3). The error message
    // on a Delivered terminal is null; on Parked it carries the outcome's
    // reason so downstream consumers can link Attempted+Parked rows.
    if (IsTerminal(notification.Status))
    {
        EmitTerminalAudit(notification, now, errorMessage: auditError);
    }
}
/// <summary>
/// Reports whether <paramref name="status"/> is one of the three terminal states of the
/// central outbox lifecycle: <see cref="NotificationStatus.Delivered"/>,
/// <see cref="NotificationStatus.Parked"/>, or <see cref="NotificationStatus.Discarded"/>.
/// Callers use it to decide whether a terminal NotifyDeliver audit row is due.
/// </summary>
private static bool IsTerminal(NotificationStatus status) => status switch
{
    NotificationStatus.Delivered => true,
    NotificationStatus.Parked => true,
    NotificationStatus.Discarded => true,
    _ => false,
};
/// <summary>
/// Writes one
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
/// audit row whose status is the terminal status (Delivered, Parked, or Discarded)
/// currently set on <paramref name="notification"/>. Any exception raised while
/// building or handing off the row is logged as a warning and not rethrown, the same
/// defensive stance as <see cref="EmitAttemptAudit"/>.
/// </summary>
private void EmitTerminalAudit(
    Notification notification,
    DateTimeOffset now,
    string? errorMessage)
{
    try
    {
        // Fire-and-forget: the write task is intentionally not awaited.
        _ = _auditWriter.WriteAsync(
            BuildNotifyDeliverEvent(
                notification,
                now,
                MapNotificationStatusToAuditStatus(notification.Status),
                errorMessage));
    }
    catch (Exception auditFault)
    {
        _logger.LogWarning(
            auditFault,
            "Failed to emit terminal {Status} audit row for notification {NotificationId}.",
            notification.Status, notification.NotificationId);
    }
}
/// <summary>
/// Translates a terminal central-outbox <see cref="NotificationStatus"/> into the
/// matching <see cref="AuditStatus"/> used by AuditLog (#23).
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="status"/> is not terminal; callers must gate on <see cref="IsTerminal"/>.
/// </exception>
private static AuditStatus MapNotificationStatusToAuditStatus(NotificationStatus status)
{
    switch (status)
    {
        case NotificationStatus.Delivered:
            return AuditStatus.Delivered;
        case NotificationStatus.Parked:
            return AuditStatus.Parked;
        case NotificationStatus.Discarded:
            return AuditStatus.Discarded;
        default:
            throw new ArgumentOutOfRangeException(
                nameof(status), status, "non-terminal status has no audit terminal mapping");
    }
}
/// <summary>
/// Writes one
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
/// audit row with <see cref="AuditStatus.Attempted"/> and the measured attempt duration.
/// The body is guarded by try/catch so an audit failure can never propagate into the
/// dispatcher loop — the <see cref="CentralAuditWriter"/> already swallows, this is
/// defensive (alog.md §13).
/// </summary>
private void EmitAttemptAudit(
    Notification notification,
    DateTimeOffset now,
    int durationMs,
    string? errorMessage)
{
    try
    {
        var attempted = BuildNotifyDeliverEvent(notification, now, AuditStatus.Attempted, errorMessage);
        var attemptedWithDuration = attempted with { DurationMs = durationMs };

        // Not awaited on purpose: audit IO must not hold up the dispatch loop, and the
        // writer swallows its own faults. PipeTo is unnecessary because the writer
        // never throws.
        _ = _auditWriter.WriteAsync(attemptedWithDuration);
    }
    catch (Exception auditFault)
    {
        _logger.LogWarning(
            auditFault,
            "Failed to emit Attempted audit row for notification {NotificationId}.",
            notification.NotificationId);
    }
}
/// <summary>
/// Creates a <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
/// event for <paramref name="notification"/> with the given status and error text,
/// copying the provenance fields (source site/instance/script, list name) from the row.
/// <see cref="AuditEvent.CorrelationId"/> is the notification id parsed as a Guid; sites
/// generate ids with <c>Guid.NewGuid().ToString("N")</c> so parsing normally succeeds,
/// and an id that is not a Guid is recorded as null instead of failing the dispatcher.
/// </summary>
private static AuditEvent BuildNotifyDeliverEvent(
    Notification notification,
    DateTimeOffset now,
    AuditStatus status,
    string? errorMessage)
{
    Guid? correlationId = null;
    if (Guid.TryParse(notification.NotificationId, out var notificationGuid))
    {
        correlationId = notificationGuid;
    }

    var auditEvent = new AuditEvent
    {
        EventId = Guid.NewGuid(),
        OccurredAtUtc = now.UtcDateTime,
        Channel = AuditChannel.Notification,
        Kind = AuditKind.NotifyDeliver,
        CorrelationId = correlationId,
        // Central dispatch has no authenticated actor; the originating script's
        // identity is captured on the upstream NotifySend row.
        Actor = null,
        SourceSiteId = notification.SourceSiteId,
        SourceInstanceId = notification.SourceInstanceId,
        SourceScript = notification.SourceScript,
        Target = notification.ListName,
        Status = status,
        ErrorMessage = errorMessage,
    };
    return auditEvent;
}
/// <summary>
/// Reacts to a purge tick by starting an asynchronous sweep that bulk-deletes terminal
/// rows older than <see cref="NotificationOutboxOptions.TerminalRetention"/>. Purges are
/// daily and idempotent, so there is no in-flight guard. <see cref="RunPurgePass"/>
/// logs and absorbs its own faults, which makes the success projection the normal
/// completion path (it logs the deleted count); the failure projection is retained as a
/// backup, consistent with <see cref="HandleDispatchTick"/>/<see cref="RunDispatchPass"/>.
/// </summary>
private void HandlePurgeTick()
{
    var cutoff = DateTimeOffset.UtcNow - _options.TerminalRetention;

    object Purged(int deleted)
    {
        _logger.LogInformation(
            "Purge removed {DeletedCount} terminal notification(s) older than {Cutoff:o}.",
            deleted, cutoff);
        return InternalMessages.PurgeComplete.Instance;
    }

    object PurgeFaulted(Exception ex)
    {
        _logger.LogError(ex, "Purge sweep faulted unexpectedly.");
        return InternalMessages.PurgeComplete.Instance;
    }

    RunPurgePass(cutoff).PipeTo(Self, success: Purged, failure: PurgeFaulted);
}
/// <summary>
/// Executes one purge sweep: through a scoped <see cref="INotificationOutboxRepository"/>
/// it bulk-deletes the terminal rows older than <paramref name="cutoff"/> and returns how
/// many were removed. The body is wrapped in a try/catch so the task never faults —
/// scope creation, service resolution, and the delete can all throw — keeping the
/// strategy symmetric with <see cref="RunDispatchPass"/>. A failure is logged and
/// reported as 0 deleted.
/// </summary>
private async Task<int> RunPurgePass(DateTimeOffset cutoff)
{
    try
    {
        using (var scope = _serviceProvider.CreateScope())
        {
            var outbox = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
            var deletedCount = await outbox.DeleteTerminalOlderThanAsync(cutoff);
            return deletedCount;
        }
    }
    catch (Exception purgeFault)
    {
        // Log and complete normally, mirroring RunDispatchPass.
        _logger.LogError(purgeFault, "Purge sweep failed unexpectedly.");
        return 0;
    }
}
/// <summary>
/// Serves a paginated, filtered outbox query. The work is done by
/// <see cref="QueryOutboxAsync"/>; its response is piped to the captured sender. If the
/// query throws, the sender receives a failure response with the base exception's
/// message, an empty list, and a total of zero.
/// </summary>
private void HandleQuery(NotificationOutboxQueryRequest request)
{
    var replyTo = Sender;
    var queriedAt = DateTimeOffset.UtcNow;

    Func<Exception, object> toFailure = ex => new NotificationOutboxQueryResponse(
        request.CorrelationId,
        Success: false,
        ErrorMessage: ex.GetBaseException().Message,
        Notifications: Array.Empty<NotificationSummary>(),
        TotalCount: 0);

    QueryOutboxAsync(request, queriedAt).PipeTo(
        replyTo,
        success: response => response,
        failure: toFailure);
}
/// <summary>
/// Builds a <see cref="NotificationOutboxFilter"/> from <paramref name="request"/>
/// (string status/type filters are parsed to enums; the stuck cutoff is supplied only
/// when <c>StuckOnly</c> is set), runs the page query on a scoped repository, and maps
/// each row to a <see cref="NotificationSummary"/> with its stuck flag.
/// </summary>
private async Task<NotificationOutboxQueryResponse> QueryOutboxAsync(
    NotificationOutboxQueryRequest request, DateTimeOffset now)
{
    // One cutoff serves both the filter and the per-row IsStuck flag.
    var stuckCutoff = StuckCutoff(now);

    var filter = new NotificationOutboxFilter(
        Status: ParseEnum<NotificationStatus>(request.StatusFilter),
        Type: ParseEnum<NotificationType>(request.TypeFilter),
        SourceSiteId: request.SourceSiteFilter,
        ListName: request.ListNameFilter,
        SubjectKeyword: request.SubjectKeyword,
        StuckOnly: request.StuckOnly,
        StuckCutoff: request.StuckOnly ? stuckCutoff : null,
        From: request.From,
        To: request.To);

    using var scope = _serviceProvider.CreateScope();
    var outbox = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
    var (page, totalCount) = await outbox.QueryAsync(filter, request.PageNumber, request.PageSize);

    var summaries = new List<NotificationSummary>();
    foreach (var row in page)
    {
        summaries.Add(new NotificationSummary(
            row.NotificationId,
            row.Type.ToString(),
            row.ListName,
            row.Subject,
            row.Status.ToString(),
            row.RetryCount,
            row.LastError,
            row.SourceSiteId,
            row.SourceInstanceId,
            row.CreatedAt,
            row.DeliveredAt,
            IsStuck: IsStuck(row, stuckCutoff)));
    }

    return new NotificationOutboxQueryResponse(
        request.CorrelationId, Success: true, ErrorMessage: null, summaries, totalCount);
}
/// <summary>
/// Serves a status lookup for one notification. The sender receives the row's current
/// status, retry count, last error, and delivery time, or <c>Found: false</c> with empty
/// detail when there is no such row.
/// </summary>
private void HandleStatusQuery(NotificationStatusQuery query)
{
    var replyTo = Sender;

    // The response type has no error field, so a repository fault can only be reported
    // as Found: false. Logging it keeps a transient DB error distinguishable from a
    // notification that genuinely does not exist.
    object LookupFaulted(Exception ex)
    {
        _logger.LogWarning(
            ex, "Status query for notification {NotificationId} failed.", query.NotificationId);
        return new NotificationStatusResponse(
            query.CorrelationId, Found: false, Status: string.Empty,
            RetryCount: 0, LastError: null, DeliveredAt: null);
    }

    StatusQueryAsync(query).PipeTo(
        replyTo,
        success: response => response,
        failure: LookupFaulted);
}
/// <summary>
/// Loads the notification named by <paramref name="query"/> through a scoped repository
/// and projects it to a <see cref="NotificationStatusResponse"/>; a missing row yields
/// <c>Found: false</c> with empty detail.
/// </summary>
private async Task<NotificationStatusResponse> StatusQueryAsync(NotificationStatusQuery query)
{
    using var scope = _serviceProvider.CreateScope();
    var outbox = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
    var row = await outbox.GetByIdAsync(query.NotificationId);

    return row is null
        ? new NotificationStatusResponse(
            query.CorrelationId, Found: false, Status: string.Empty,
            RetryCount: 0, LastError: null, DeliveredAt: null)
        : new NotificationStatusResponse(
            query.CorrelationId,
            Found: true,
            Status: row.Status.ToString(),
            RetryCount: row.RetryCount,
            LastError: row.LastError,
            DeliveredAt: row.DeliveredAt);
}
/// <summary>
/// Serves a full-detail lookup for one notification. It backs the report detail modal,
/// which needs the Body and resolved recipients that the grid summary leaves out. A
/// thrown error is returned to the sender as a failure response with no detail.
/// </summary>
private void HandleDetailRequest(NotificationDetailRequest request)
{
    var replyTo = Sender;

    object DetailFaulted(Exception ex) => new NotificationDetailResponse(
        request.CorrelationId, Success: false,
        ErrorMessage: ex.GetBaseException().Message, Detail: null);

    DetailAsync(request).PipeTo(
        replyTo,
        success: response => response,
        failure: DetailFaulted);
}
/// <summary>
/// Loads the requested notification through a scoped repository and returns every
/// field as a <see cref="NotificationDetail"/>; a missing row yields a failure response
/// with the message "notification not found".
/// </summary>
private async Task<NotificationDetailResponse> DetailAsync(NotificationDetailRequest request)
{
    using var scope = _serviceProvider.CreateScope();
    var outbox = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
    var row = await outbox.GetByIdAsync(request.NotificationId);

    if (row is not null)
    {
        return new NotificationDetailResponse(
            request.CorrelationId,
            Success: true,
            ErrorMessage: null,
            new NotificationDetail(
                row.NotificationId,
                row.Type.ToString(),
                row.ListName,
                row.Subject,
                row.Body,
                row.Status.ToString(),
                row.RetryCount,
                row.LastError,
                row.ResolvedTargets,
                row.TypeData,
                row.SourceSiteId,
                row.SourceInstanceId,
                row.SourceScript,
                row.SiteEnqueuedAt,
                row.CreatedAt,
                row.LastAttemptAt,
                row.NextAttemptAt,
                row.DeliveredAt));
    }

    return new NotificationDetailResponse(
        request.CorrelationId, Success: false,
        ErrorMessage: "notification not found", Detail: null);
}
/// <summary>
/// Serves a manual retry request. Only a <c>Parked</c> notification is eligible; it goes
/// back to <c>Pending</c> with retry count, next-attempt time, and last error cleared so
/// the dispatch loop re-claims it on the next sweep. A thrown error is returned to the
/// sender as a failure response.
/// </summary>
private void HandleRetry(RetryNotificationRequest request)
{
    var replyTo = Sender;

    Func<Exception, object> toFailure = ex => new RetryNotificationResponse(
        request.CorrelationId, Success: false, ErrorMessage: ex.GetBaseException().Message);

    RetryAsync(request).PipeTo(
        replyTo,
        success: response => response,
        failure: toFailure);
}
/// <summary>
/// Loads the notification, rejects the request when the row is missing or not
/// <c>Parked</c>, and otherwise resets it to <c>Pending</c> (retry count 0, no next
/// attempt, no last error) and persists the change.
/// </summary>
private async Task<RetryNotificationResponse> RetryAsync(RetryNotificationRequest request)
{
    RetryNotificationResponse Rejected(string reason) =>
        new RetryNotificationResponse(request.CorrelationId, Success: false, ErrorMessage: reason);

    using var scope = _serviceProvider.CreateScope();
    var outbox = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
    var row = await outbox.GetByIdAsync(request.NotificationId);

    if (row is null)
    {
        return Rejected("notification not found");
    }

    if (row.Status is not NotificationStatus.Parked)
    {
        return Rejected("only parked notifications can be retried");
    }

    // Back to a clean Pending row so the next sweep treats it as a first attempt.
    row.Status = NotificationStatus.Pending;
    row.RetryCount = 0;
    row.NextAttemptAt = null;
    row.LastError = null;
    await outbox.UpdateAsync(row);

    return new RetryNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null);
}
/// <summary>
/// Serves a manual discard request. Only a <c>Parked</c> notification is eligible; it is
/// moved to the terminal <c>Discarded</c> status. A thrown error is returned to the
/// sender as a failure response.
/// </summary>
private void HandleDiscard(DiscardNotificationRequest request)
{
    var replyTo = Sender;

    Func<Exception, object> toFailure = ex => new DiscardNotificationResponse(
        request.CorrelationId, Success: false, ErrorMessage: ex.GetBaseException().Message);

    DiscardAsync(request).PipeTo(
        replyTo,
        success: response => response,
        failure: toFailure);
}
/// <summary>
/// Loads the notification, rejects the request when the row is missing or not
/// <c>Parked</c>, and otherwise marks it <c>Discarded</c>, persists the change, and
/// emits the terminal audit row.
/// </summary>
private async Task<DiscardNotificationResponse> DiscardAsync(DiscardNotificationRequest request)
{
    DiscardNotificationResponse Rejected(string reason) =>
        new DiscardNotificationResponse(request.CorrelationId, Success: false, ErrorMessage: reason);

    using var scope = _serviceProvider.CreateScope();
    var outbox = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
    var row = await outbox.GetByIdAsync(request.NotificationId);

    if (row is null)
    {
        return Rejected("notification not found");
    }

    if (row.Status is not NotificationStatus.Parked)
    {
        return Rejected("only parked notifications can be discarded");
    }

    row.Status = NotificationStatus.Discarded;
    await outbox.UpdateAsync(row);

    // M4 Bundle B3: besides the dispatcher, a manual discard is the only other path
    // that produces a terminal status transition. Emit a Discarded NotifyDeliver row
    // to match the dispatcher's Delivered/Parked rows. It carries no error message
    // because a discard is an operator-driven cancellation, not a delivery error.
    EmitTerminalAudit(row, DateTimeOffset.UtcNow, errorMessage: null);

    return new DiscardNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null);
}
/// <summary>
/// Serves a KPI snapshot request. The stuck cutoff comes from
/// <see cref="NotificationOutboxOptions.StuckAgeThreshold"/> and the delivered window
/// from <see cref="NotificationOutboxOptions.DeliveredKpiWindow"/>, both relative to the
/// current UTC time. A thrown error is returned as a failure response with zeroed metrics.
/// </summary>
private void HandleKpiRequest(NotificationKpiRequest request)
{
    var replyTo = Sender;
    var capturedAt = DateTimeOffset.UtcNow;
    var stuckBefore = StuckCutoff(capturedAt);
    var deliveredAfter = capturedAt - _options.DeliveredKpiWindow;

    object KpiFaulted(Exception ex) => new NotificationKpiResponse(
        request.CorrelationId,
        Success: false,
        ErrorMessage: ex.GetBaseException().Message,
        QueueDepth: 0,
        StuckCount: 0,
        ParkedCount: 0,
        DeliveredLastInterval: 0,
        OldestPendingAge: null);

    ComputeKpisAsync(request.CorrelationId, stuckBefore, deliveredAfter).PipeTo(
        replyTo,
        success: response => response,
        failure: KpiFaulted);
}
/// <summary>
/// Asks a scoped repository for the outbox KPI snapshot using the supplied cutoffs and
/// wraps it in a successful <see cref="NotificationKpiResponse"/>.
/// </summary>
private async Task<NotificationKpiResponse> ComputeKpisAsync(
    string correlationId, DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince)
{
    using (var scope = _serviceProvider.CreateScope())
    {
        var outbox = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
        var kpis = await outbox.ComputeKpisAsync(stuckCutoff, deliveredSince);

        return new NotificationKpiResponse(
            correlationId,
            Success: true,
            ErrorMessage: null,
            kpis.QueueDepth,
            kpis.StuckCount,
            kpis.ParkedCount,
            kpis.DeliveredLastInterval,
            kpis.OldestPendingAge);
    }
}
/// <summary>
/// Serves a per-site KPI request, using the same stuck cutoff and delivered window as
/// <see cref="HandleKpiRequest"/>. A thrown error is returned as a failure response with
/// an empty site list.
/// </summary>
private void HandlePerSiteKpiRequest(PerSiteNotificationKpiRequest request)
{
    var replyTo = Sender;
    var capturedAt = DateTimeOffset.UtcNow;
    var stuckBefore = StuckCutoff(capturedAt);
    var deliveredAfter = capturedAt - _options.DeliveredKpiWindow;

    object KpiFaulted(Exception ex) => new PerSiteNotificationKpiResponse(
        request.CorrelationId,
        Success: false,
        ErrorMessage: ex.GetBaseException().Message,
        Sites: Array.Empty<SiteNotificationKpiSnapshot>());

    ComputePerSiteKpisAsync(request.CorrelationId, stuckBefore, deliveredAfter).PipeTo(
        replyTo,
        success: response => response,
        failure: KpiFaulted);
}
/// <summary>
/// Asks a scoped repository for the per-source-site KPI snapshots using the supplied
/// cutoffs and wraps them in a successful <see cref="PerSiteNotificationKpiResponse"/>.
/// </summary>
private async Task<PerSiteNotificationKpiResponse> ComputePerSiteKpisAsync(
    string correlationId, DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince)
{
    using (var scope = _serviceProvider.CreateScope())
    {
        var outbox = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
        var perSite = await outbox.ComputePerSiteKpisAsync(stuckCutoff, deliveredSince);
        return new PerSiteNotificationKpiResponse(
            correlationId, Success: true, ErrorMessage: null, perSite);
    }
}
/// <summary>
/// Returns the stuck threshold instant: <paramref name="now"/> moved back by
/// <see cref="NotificationOutboxOptions.StuckAgeThreshold"/>. A still-pending
/// notification created before this instant counts as stuck.
/// </summary>
private DateTimeOffset StuckCutoff(DateTimeOffset now)
{
    var threshold = _options.StuckAgeThreshold;
    return now - threshold;
}
/// <summary>
/// Reports whether a notification is stuck: it is still non-terminal
/// (<c>Pending</c> or <c>Retrying</c>) and its creation time is earlier than
/// <paramref name="stuckCutoff"/>.
/// </summary>
private static bool IsStuck(Notification notification, DateTimeOffset stuckCutoff)
{
    var awaitingDelivery = notification.Status switch
    {
        NotificationStatus.Pending => true,
        NotificationStatus.Retrying => true,
        _ => false,
    };

    // Short-circuits like the original: the age is only compared for open rows.
    return awaitingDelivery && notification.CreatedAt < stuckCutoff;
}
/// <summary>
/// Parses a string filter value to a nullable enum, ignoring case. An empty, whitespace,
/// or unrecognised value yields <c>null</c> — meaning "no constraint on that dimension".
/// </summary>
/// <typeparam name="TEnum">The enum type to parse into.</typeparam>
/// <param name="value">The raw filter text; may be <c>null</c>.</param>
/// <returns>The named member of <typeparamref name="TEnum"/>, or <c>null</c>.</returns>
private static TEnum? ParseEnum<TEnum>(string? value) where TEnum : struct, Enum
{
    // Enum.TryParse also succeeds for any numeric text ("99") and returns a value with
    // no named member. Requiring IsDefined keeps such input "unrecognised", so it means
    // no constraint rather than a filter that can never match a row.
    return Enum.TryParse<TEnum>(value, ignoreCase: true, out var parsed) && Enum.IsDefined(parsed)
        ? parsed
        : null;
}
/// <summary>
/// Maps an inbound <see cref="NotificationSubmit"/> onto a new <see cref="Notification"/>
/// row: the submit's content and source fields are copied and <c>CreatedAt</c> is set to
/// the current UTC time.
/// </summary>
private static Notification BuildNotification(NotificationSubmit msg)
{
    // All current notifications are email; NotificationType has only the Email member.
    const NotificationType channel = NotificationType.Email;

    var row = new Notification(
        msg.NotificationId,
        channel,
        msg.ListName,
        msg.Subject,
        msg.Body,
        msg.SourceSiteId)
    {
        SourceInstanceId = msg.SourceInstanceId,
        SourceScript = msg.SourceScript,
        SiteEnqueuedAt = msg.SiteEnqueuedAt,
        CreatedAt = DateTimeOffset.UtcNow,
        // Status is left at its Pending default for the dispatch sweep to claim.
    };
    return row;
}
}