7b0b9c7365
Solution + 23 src projects + 26 test projects renamed; folders, csproj, namespaces, and ScadaLinkDbContext/ScadaBridgeDbContext class updated. ActorSystem "scadalink" → "scadabridge", Akka seed-node URLs migrated. SQL roles/logins, LDAP domains, CLI command name, and CLI config dir (~/.scadalink → ~/.scadabridge) also renamed. Build green; 5 Host.Tests fail awaiting SQL login rename in next commit. Pre-existing StaleTagMonitor timing flakes unchanged. Rename script committed at tools/rename-to-scadabridge.sh.
1142 lines
52 KiB
C#
1142 lines
52 KiB
C#
using Akka.Actor;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Notifications;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Notification;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Notifications;
|
|
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Delivery;
|
|
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Messages;
|
|
|
|
namespace ZB.MOM.WW.ScadaBridge.NotificationOutbox;
|
|
|
|
/// <summary>
|
|
/// Central-side actor that owns the notification outbox. It accepts
|
|
/// <see cref="NotificationSubmit"/> messages forwarded from sites and persists each as a
|
|
/// <see cref="Notification"/> row (the ingest path), and runs a periodic dispatch loop
|
|
/// that claims due notifications, delivers them through the matching channel adapter, and
|
|
/// applies the resulting status transition. It also runs a periodic purge that bulk-deletes
|
|
/// terminal notification rows once they age past the configured retention window.
|
|
/// </summary>
|
|
public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
|
{
|
|
private const string DispatchTimerKey = "dispatch";
|
|
private const string PurgeTimerKey = "purge";
|
|
|
|
/// <summary>Retry policy fallback used when no SMTP configuration row is present.</summary>
|
|
private const int FallbackMaxRetries = 10;
|
|
private static readonly TimeSpan FallbackRetryDelay = TimeSpan.FromMinutes(1);
|
|
|
|
/// <summary>
|
|
/// Audit <c>Actor</c> stamped on central-dispatch (<c>NotifyDeliver</c>) rows.
|
|
/// The Actor-column spec assigns central-originated audit rows a system
|
|
/// identity — there is no per-call authenticated user at dispatch time.
|
|
/// </summary>
|
|
private const string SystemActor = "system";
|
|
|
|
private readonly IServiceProvider _serviceProvider;
|
|
private readonly NotificationOutboxOptions _options;
|
|
private readonly ICentralAuditWriter _auditWriter;
|
|
private readonly ILogger<NotificationOutboxActor> _logger;
|
|
|
|
/// <summary>
|
|
/// In-flight guard for the dispatch loop. Set true at the start of a sweep and cleared
|
|
/// when the sweep's <see cref="InternalMessages.DispatchComplete"/> arrives. While true,
|
|
/// further <see cref="InternalMessages.DispatchTick"/>s are dropped so sweeps never overlap.
|
|
/// </summary>
|
|
private bool _dispatching;
|
|
|
|
/// <summary>
|
|
/// NotificationOutbox-006: cached <see cref="NotificationType"/> → adapter lookup, built
|
|
/// lazily on the first dispatch sweep and reused for the lifetime of the actor. The
|
|
/// adapter registration is decided at startup by <c>AddNotificationOutbox</c> (the set is
|
|
/// keyed by <see cref="NotificationType"/> and is static per process lifetime), so
|
|
/// rebuilding this dictionary on every sweep was pure allocation waste.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// The cache is paired with <see cref="_adaptersScope"/>, an actor-lifetime
|
|
/// <see cref="IServiceScope"/> created on first use so the cached scoped adapter
|
|
/// instances and their dependencies live as long as the cache itself. The scope is
|
|
/// disposed in <see cref="PostStop"/>. The adapters are stateless wrappers that
|
|
/// resolve their per-call collaborators (e.g. <see cref="INotificationRepository"/>'s
|
|
/// underlying DbContext) through their own injected dependencies; holding them for
|
|
/// the actor's lifetime is consistent with the actor's own singleton lifetime on the
|
|
/// active central node.
|
|
/// </remarks>
|
|
private IReadOnlyDictionary<NotificationType, INotificationDeliveryAdapter>? _adaptersCache;
|
|
|
|
/// <summary>
|
|
/// NotificationOutbox-006: actor-lifetime DI scope that owns the cached
|
|
/// <see cref="_adaptersCache"/> adapter instances. Created lazily on the first
|
|
/// dispatch sweep that needs adapters; disposed in <see cref="PostStop"/> so the
|
|
/// scoped adapter graph (and any disposable dependencies it transitively holds) is
|
|
/// torn down with the actor.
|
|
/// </summary>
|
|
private IServiceScope? _adaptersScope;
|
|
|
|
/// <summary>
|
|
/// NO-003: lifecycle-scoped cancellation source, cancelled in <see cref="PostStop"/> so
|
|
/// any in-flight dispatch sweep — including a long-running SMTP send via the channel
|
|
/// adapter — observes shutdown promptly instead of blocking <c>CoordinatedShutdown</c>
|
|
/// for the full SMTP connect/auth/send timeout per in-progress notification.
|
|
/// </summary>
|
|
private CancellationTokenSource? _shutdownCts;
|
|
|
|
/// <summary>Akka timer scheduler, assigned by the actor system via <see cref="IWithTimers"/>.</summary>
|
|
public ITimerScheduler Timers { get; set; } = null!;
|
|
|
|
/// <summary>
|
|
/// Initializes the actor with its dependencies and registers all message handlers.
|
|
/// </summary>
|
|
/// <param name="serviceProvider">DI service provider used to open scopes for database operations.</param>
|
|
/// <param name="options">Notification outbox configuration options.</param>
|
|
/// <param name="auditWriter">Central audit writer for recording dispatch events.</param>
|
|
/// <param name="logger">Logger for this actor.</param>
|
|
public NotificationOutboxActor(
    IServiceProvider serviceProvider,
    NotificationOutboxOptions options,
    ICentralAuditWriter auditWriter,
    ILogger<NotificationOutboxActor> logger)
{
    // Every collaborator is mandatory; fail fast with the offending parameter name.
    _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    _options = options ?? throw new ArgumentNullException(nameof(options));
    _auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    // Ingest path: persist the submitted notification, then ack once the write completes.
    Receive<NotificationSubmit>(HandleSubmit);
    Receive<InternalMessages.IngestPersisted>(HandleIngestPersisted);

    // Dispatch loop: a tick starts a sweep, DispatchComplete lowers the in-flight guard.
    Receive<InternalMessages.DispatchTick>(_ => HandleDispatchTick());

    // The handler is deliberately statement-bodied. An expression-bodied
    // `_ => _dispatching = false` yields a bool, so overload resolution prefers
    // Receive<T>(Func<T, bool>) over Receive<T>(Action<T>); the handler would then
    // return false and the message would be reported as unhandled even though the
    // guard had been lowered.
    Receive<InternalMessages.DispatchComplete>(_ => { _dispatching = false; });

    // Purge loop.
    Receive<InternalMessages.PurgeTick>(_ => HandlePurgeTick());

    // No-op: purge has no in-flight guard to lower, and the outcome is already logged
    // by the PipeTo projections, so PurgeComplete carries nothing to act on.
    Receive<InternalMessages.PurgeComplete>(_ => { });

    // Query, manual-action and KPI requests.
    Receive<NotificationOutboxQueryRequest>(HandleQuery);
    Receive<NotificationStatusQuery>(HandleStatusQuery);
    Receive<NotificationDetailRequest>(HandleDetailRequest);
    Receive<RetryNotificationRequest>(HandleRetry);
    Receive<DiscardNotificationRequest>(HandleDiscard);
    Receive<NotificationKpiRequest>(HandleKpiRequest);
    Receive<PerSiteNotificationKpiRequest>(HandlePerSiteKpiRequest);
}
|
|
|
|
/// <inheritdoc />
|
|
protected override void PreStart()
{
    base.PreStart();

    // NO-003: one cancellation source per actor incarnation. PostStop cancels it so a
    // running dispatch sweep (and the adapter call beneath it) can observe shutdown.
    _shutdownCts = new CancellationTokenSource();

    // Periodic dispatch sweep.
    var dispatchTick = InternalMessages.DispatchTick.Instance;
    Timers.StartPeriodicTimer(DispatchTimerKey, dispatchTick, _options.DispatchInterval);

    // Periodic purge of aged terminal rows.
    var purgeTick = InternalMessages.PurgeTick.Instance;
    Timers.StartPeriodicTimer(PurgeTimerKey, purgeTick, _options.PurgeInterval);
}
|
|
|
|
/// <inheritdoc />
|
|
protected override void PostStop()
{
    // NO-003: cancel before disposing. Cancelling first lets an in-flight sweep's
    // adapter call observe shutdown; disposing first would race with a sweep that is
    // still registering with the token.
    if (_shutdownCts is { } lifecycleCts)
    {
        try
        {
            lifecycleCts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The source was already disposed (restarted-actor race); nothing to cancel.
        }

        lifecycleCts.Dispose();
    }

    _shutdownCts = null;

    // NotificationOutbox-006: tear down the actor-lifetime adapter scope together with
    // the actor, and drop the lookup that referenced its adapter instances.
    if (_adaptersScope is not null)
    {
        _adaptersScope.Dispose();
    }

    _adaptersScope = null;
    _adaptersCache = null;

    base.PostStop();
}
|
|
|
|
/// <summary>
|
|
/// Maps an inbound <see cref="NotificationSubmit"/> onto a <see cref="Notification"/>,
|
|
/// persists it idempotently, and pipes the outcome back to <see cref="Self"/> so the
|
|
/// ack is sent from the actor thread with the original sender preserved.
|
|
/// </summary>
|
|
private void HandleSubmit(NotificationSubmit msg)
{
    // Capture the sender now, on the actor thread, so the continuation projections
    // can carry it back inside the IngestPersisted message.
    var replyTo = Sender;
    var row = BuildNotification(msg);
    var persistence = PersistAsync(row);

    // A completed task (fresh insert or already-existing row) maps to Succeeded: true;
    // only a faulted task maps to Succeeded: false with the base exception's message.
    persistence.PipeTo(
        Self,
        success: () => new InternalMessages.IngestPersisted(
            msg.NotificationId, replyTo, Succeeded: true, Error: null),
        failure: fault => new InternalMessages.IngestPersisted(
            msg.NotificationId, replyTo, Succeeded: false, Error: fault.GetBaseException().Message));
}
|
|
|
|
/// <summary>
|
|
/// Resolves a scoped <see cref="INotificationOutboxRepository"/> and inserts the
|
|
/// notification if a row with the same id does not already exist. The boolean result
|
|
/// of <c>InsertIfNotExistsAsync</c> is intentionally ignored: an existing row is an
|
|
/// idempotent re-submission and is acked just like a fresh insert so the site can
|
|
/// clear its forward buffer. Only a thrown error must surface to the caller.
|
|
/// </summary>
|
|
private async Task PersistAsync(Notification notification)
{
    // One DI scope per ingest; it is disposed when the insert has completed or thrown.
    using (var ingestScope = _serviceProvider.CreateScope())
    {
        var outbox = ingestScope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();

        // The returned flag is deliberately not inspected: an existing row is an
        // idempotent re-submission and is treated the same as a fresh insert.
        await outbox.InsertIfNotExistsAsync(notification);
    }
}
|
|
|
|
/// <summary>
|
|
/// Acks the original submitter once persistence completes. <see cref="NotificationSubmitAck"/>
|
|
/// is <c>Accepted</c> for both a fresh insert and an existing row; only a thrown
|
|
/// repository error produces <c>Accepted: false</c> so the site retries the forward.
|
|
/// </summary>
|
|
private void HandleIngestPersisted(InternalMessages.IngestPersisted msg)
{
    // Failure first: log the reason and send a negative ack carrying the error text.
    if (!msg.Succeeded)
    {
        _logger.LogWarning(
            "Failed to ingest notification {NotificationId}: {Error}",
            msg.NotificationId, msg.Error);

        var rejected = new NotificationSubmitAck(msg.NotificationId, Accepted: false, Error: msg.Error);
        msg.Sender.Tell(rejected);
        return;
    }

    // Success: the row is in the outbox, so send a positive ack to the submitter.
    _logger.LogDebug("Notification {NotificationId} ingested into outbox.", msg.NotificationId);

    var accepted = new NotificationSubmitAck(msg.NotificationId, Accepted: true, Error: null);
    msg.Sender.Tell(accepted);
}
|
|
|
|
/// <summary>
|
|
/// Handles a dispatch tick. If a sweep is already in flight the tick is dropped so
|
|
/// sweeps never overlap; otherwise the guard is raised and an asynchronous sweep is
|
|
/// launched, with a <see cref="InternalMessages.DispatchComplete"/> piped back to
|
|
/// <see cref="Self"/> to lower the guard on the actor thread.
|
|
/// </summary>
|
|
private void HandleDispatchTick()
{
    // A sweep is still running: drop this tick so sweeps never overlap.
    if (_dispatching)
    {
        return;
    }

    _dispatching = true;
    var sweepTime = DateTimeOffset.UtcNow;

    // NO-003: the sweep gets the lifecycle token so cancellation reaches the adapter.
    // When no source exists (very early start / post-stop race) it runs uncancellable.
    var sweepToken = _shutdownCts is { } lifecycleCts
        ? lifecycleCts.Token
        : CancellationToken.None;

    var sweep = RunDispatchPass(sweepTime, sweepToken);

    // Both projections yield DispatchComplete. RunDispatchPass swallows its own errors,
    // but the failure projection stays as a backstop so that even a faulted task still
    // lowers the in-flight guard instead of wedging the dispatcher.
    sweep.PipeTo(
        Self,
        success: () => InternalMessages.DispatchComplete.Instance,
        failure: fault =>
        {
            _logger.LogError(fault, "Dispatch sweep faulted unexpectedly.");
            return InternalMessages.DispatchComplete.Instance;
        });
}
|
|
|
|
/// <summary>
|
|
/// Runs a single dispatch sweep: claims the due batch, resolves the retry policy, and
|
|
/// delivers each notification sequentially. Per-notification failures are caught and
|
|
/// logged so one bad row never aborts the rest of the batch. The whole body is wrapped
|
|
/// in a try/catch so the returned task never faults — scope creation, service resolution,
|
|
/// and retry-policy resolution can all throw, and a faulted task would otherwise leave
|
|
/// the dispatcher's in-flight guard stuck and wedge the loop permanently.
|
|
///
|
|
/// The per-sweep DI scope still owns the repository graph
|
|
/// (<see cref="INotificationOutboxRepository"/> + <see cref="INotificationRepository"/>),
|
|
/// which is correct because those services back a fresh DbContext per sweep. The
|
|
/// channel delivery adapters, however, are cached for the actor's lifetime via
|
|
/// <see cref="ResolveAdapters"/> — see <see cref="_adaptersCache"/> for the
|
|
/// NotificationOutbox-006 rationale.
|
|
/// </summary>
|
|
private async Task RunDispatchPass(DateTimeOffset now, CancellationToken cancellationToken)
{
    try
    {
        // Per-sweep scope for the repositories. Everything up to the first await below
        // runs synchronously on the caller's thread, including the adapter lookup.
        using var sweepScope = _serviceProvider.CreateScope();
        var services = sweepScope.ServiceProvider;
        var outbox = services.GetRequiredService<INotificationOutboxRepository>();
        var notifications = services.GetRequiredService<INotificationRepository>();
        var channelAdapters = ResolveAdapters();

        // Step 1: claim the due batch. Any failure here ends the sweep.
        IReadOnlyList<Notification> batch;
        try
        {
            batch = await outbox.GetDueAsync(now, _options.DispatchBatchSize, cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // NO-003: the claim was cancelled by shutdown. This is not treated as a
            // failure and is not logged.
            return;
        }
        catch (Exception claimFault)
        {
            _logger.LogError(claimFault, "Dispatch sweep failed to claim due notifications.");
            return;
        }

        if (batch.Count == 0)
        {
            return;
        }

        // Step 2: resolve the retry policy once for the whole batch.
        var (maxRetries, retryDelay) = await ResolveRetryPolicyAsync(notifications);

        // Step 3: deliver sequentially. NO-003: the loop condition re-checks the token
        // so no new delivery is started once shutdown has been requested.
        for (var index = 0; index < batch.Count && !cancellationToken.IsCancellationRequested; index++)
        {
            var current = batch[index];

            try
            {
                await DeliverOneAsync(
                    current, now, maxRetries, retryDelay, outbox, channelAdapters, cancellationToken);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // NO-003: shutdown interrupted the in-flight delivery; abandon the rest
                // of the batch.
                return;
            }
            catch (Exception deliveryFault)
            {
                // One bad notification must not stop the remainder of the batch.
                _logger.LogError(
                    deliveryFault, "Dispatch failed for notification {NotificationId}.", current.NotificationId);
            }
        }
    }
    catch (Exception sweepFault)
    {
        // Scope/service resolution or retry-policy resolution threw. Log and swallow so
        // the returned task completes normally and the in-flight guard is always cleared.
        _logger.LogError(sweepFault, "Dispatch sweep failed unexpectedly.");
    }
}
|
|
|
|
/// <summary>
|
|
/// Resolves the retry policy from the first SMTP configuration row. When no SMTP
|
|
/// configuration exists, falls back to a conservative default — delivery itself will
|
|
/// permanently fail in that case, so the policy only acts as a guard.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// NO-002: a non-positive <see cref="SmtpConfiguration.MaxRetries"/> (zero or negative)
|
|
/// would otherwise satisfy <c>RetryCount >= maxRetries</c> on the very first transient
|
|
/// failure and park the row without a single retry — silently halving the outbox's
|
|
/// delivery guarantees. The same applies to a non-positive <c>RetryDelay</c>, which
|
|
/// would burn-loop the dispatcher. Both values are clamped to the fallback constants
|
|
/// with a Warning so an operator can spot the misconfiguration in logs.
|
|
/// </remarks>
|
|
private async Task<(int MaxRetries, TimeSpan RetryDelay)> ResolveRetryPolicyAsync(
    INotificationRepository notificationRepository)
{
    var smtpRows = await notificationRepository.GetAllSmtpConfigurationsAsync();

    // Only the first SMTP configuration row is consulted; with none, use the fallbacks.
    if (smtpRows.Count <= 0)
    {
        return (FallbackMaxRetries, FallbackRetryDelay);
    }

    var smtp = smtpRows[0];
    if (smtp is null)
    {
        return (FallbackMaxRetries, FallbackRetryDelay);
    }

    var configuredRetries = smtp.MaxRetries;
    var configuredDelay = smtp.RetryDelay;

    // NO-002: a non-positive retry budget is replaced by the fallback, with a warning.
    var effectiveRetries = configuredRetries;
    if (configuredRetries <= 0)
    {
        _logger.LogWarning(
            "SmtpConfiguration.MaxRetries={ConfiguredMaxRetries} is non-positive; " +
            "clamping to FallbackMaxRetries={FallbackMaxRetries} so transient failures " +
            "actually retry before parking.",
            configuredRetries, FallbackMaxRetries);
        effectiveRetries = FallbackMaxRetries;
    }

    // NO-002: likewise for a non-positive delay between attempts.
    var effectiveDelay = configuredDelay;
    if (configuredDelay <= TimeSpan.Zero)
    {
        _logger.LogWarning(
            "SmtpConfiguration.RetryDelay={ConfiguredRetryDelay} is non-positive; " +
            "clamping to FallbackRetryDelay={FallbackRetryDelay} so the dispatcher does " +
            "not burn-loop on transient failures.",
            configuredDelay, FallbackRetryDelay);
        effectiveDelay = FallbackRetryDelay;
    }

    return (effectiveRetries, effectiveDelay);
}
|
|
|
|
/// <summary>
|
|
/// Returns the <see cref="NotificationType"/> → adapter lookup, building it lazily on
|
|
/// the first call and caching it on <see cref="_adaptersCache"/> for the actor's
|
|
/// lifetime. The last adapter registered for a given type wins, mirroring DI's
|
|
/// last-wins resolution semantics.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// NotificationOutbox-006: the lookup used to be rebuilt on every dispatch sweep
|
|
/// from the per-sweep DI scope. Adapter registration is static per process
|
|
/// lifetime, so the dict is now built ONCE — on the first sweep that needs it —
|
|
/// and reused. To respect each adapter's scoped lifetime
|
|
/// (<see cref="EmailNotificationDeliveryAdapter"/> takes a scoped
|
|
/// <see cref="INotificationRepository"/>), the cache is paired with
|
|
/// <see cref="_adaptersScope"/>, an actor-lifetime <see cref="IServiceScope"/> that
|
|
/// owns the cached adapter instances and is disposed in <see cref="PostStop"/>.
|
|
/// </remarks>
|
|
private IReadOnlyDictionary<NotificationType, INotificationDeliveryAdapter> ResolveAdapters()
{
    // Fast path: the lookup has already been built for this actor incarnation.
    if (_adaptersCache is not null)
    {
        return _adaptersCache;
    }

    // Build into locals and publish to the fields only once the whole lookup has been
    // constructed. If adapter resolution throws, the cache stays null and the next
    // sweep retries; assigning _adaptersScope up front would let that retry overwrite
    // the field and orphan the previous scope without ever disposing it.
    var scope = _serviceProvider.CreateScope();
    try
    {
        var adapters = new Dictionary<NotificationType, INotificationDeliveryAdapter>();
        foreach (var adapter in scope.ServiceProvider.GetServices<INotificationDeliveryAdapter>())
        {
            // Indexer assignment: a later registration for the same type replaces an
            // earlier one (last wins).
            adapters[adapter.Type] = adapter;
        }

        _adaptersScope = scope;
        _adaptersCache = adapters;
        return adapters;
    }
    catch
    {
        // Nothing was published; release the half-used scope and let the caller see
        // the original failure.
        scope.Dispose();
        throw;
    }
}
|
|
|
|
/// <summary>
|
|
/// Delivers a single notification through its channel adapter and applies the resulting
|
|
/// status transition. A missing adapter parks the notification; otherwise the
|
|
/// <see cref="DeliveryOutcome"/> drives the transition. The updated row is always persisted.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// <para>
|
|
/// M4 Bundle B2 + B3: a single
|
|
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
|
|
/// row is emitted with <see cref="AuditStatus.Attempted"/> per attempt
|
|
/// (success, transient, permanent); when the post-outcome status is a
|
|
/// terminal one (Delivered, Parked) a SECOND row is emitted carrying
|
|
/// that terminal status. Both emissions are wrapped in a try/catch so a
|
|
/// thrown audit writer NEVER aborts the user-facing dispatch — the
|
|
/// <see cref="CentralAuditWriter"/> itself swallows internal failures,
|
|
/// but the dispatcher wraps defensively per alog.md §13. The
|
|
/// missing-adapter park path also emits both rows because it IS an
|
|
/// attempt that resolved to a park from the dispatcher's point of view.
|
|
/// </para>
|
|
/// <para>
|
|
/// Attempt duration is measured around the adapter call and recorded on
|
|
/// the Attempted row so downstream KPIs can compute per-attempt latency
|
|
/// without joining to the row update timestamps.
|
|
/// </para>
|
|
/// </remarks>
|
|
private async Task DeliverOneAsync(
    Notification notification,
    DateTimeOffset now,
    int maxRetries,
    TimeSpan retryDelay,
    INotificationOutboxRepository outboxRepository,
    IReadOnlyDictionary<NotificationType, INotificationDeliveryAdapter> adapters,
    CancellationToken cancellationToken)
{
    if (!adapters.TryGetValue(notification.Type, out var adapter))
    {
        // Missing-adapter park: from the dispatcher's perspective this is an
        // attempt that resolved to a terminal park. Emit Attempted then the
        // terminal Parked row, both carrying the same explanatory error.
        // No delivery side effect has happened yet, so this update may still be
        // cancelled by shutdown.
        var missingAdapterError = $"no delivery adapter for type {notification.Type}";
        notification.Status = NotificationStatus.Parked;
        notification.LastError = missingAdapterError;
        notification.LastAttemptAt = now;
        await outboxRepository.UpdateAsync(notification, cancellationToken);
        await EmitAttemptAuditAsync(
            notification,
            now,
            durationMs: 0,
            errorMessage: missingAdapterError);
        await EmitTerminalAuditAsync(notification, now, errorMessage: missingAdapterError);
        return;
    }

    // Measure the attempt duration around the adapter call so the
    // Attempted row carries it for KPI use.
    // NO-003: the lifecycle token is passed to the adapter so a coordinated shutdown
    // can cancel the in-flight send.
    var attemptStart = DateTimeOffset.UtcNow;
    var outcome = await adapter.DeliverAsync(notification, cancellationToken);
    var durationMs = (int)Math.Min(
        int.MaxValue, Math.Max(0, (DateTimeOffset.UtcNow - attemptStart).TotalMilliseconds));

    // Translate the adapter's outcome into the row's next state.
    switch (outcome.Result)
    {
        case DeliveryResult.Success:
            notification.Status = NotificationStatus.Delivered;
            notification.DeliveredAt = now;
            notification.LastAttemptAt = now;
            notification.ResolvedTargets = outcome.ResolvedTargets;
            notification.LastError = null;
            break;

        case DeliveryResult.TransientFailure:
            notification.LastAttemptAt = now;
            notification.RetryCount++;
            notification.LastError = outcome.Error;
            if (notification.RetryCount >= maxRetries)
            {
                // Retry budget exhausted: park instead of scheduling another attempt.
                notification.Status = NotificationStatus.Parked;
            }
            else
            {
                notification.Status = NotificationStatus.Retrying;
                notification.NextAttemptAt = now + retryDelay;
            }
            break;

        case DeliveryResult.PermanentFailure:
            notification.Status = NotificationStatus.Parked;
            notification.LastAttemptAt = now;
            notification.LastError = outcome.Error;
            break;
    }

    // The adapter has already returned an outcome, so the attempt's side effect
    // (for a Success, the sent notification) has happened. Persist the result with
    // CancellationToken.None: if shutdown cancelled this write, the row would keep
    // its pre-attempt state and a later sweep would deliver the same notification
    // again. The audit writes below are likewise not cancellable.
    await outboxRepository.UpdateAsync(notification, CancellationToken.None);

    // Emit the per-attempt Attempted row exactly once regardless of the
    // outcome (B2). The error message comes from the outcome, not from
    // notification.LastError, so a success row is null and a failure row
    // carries the adapter's reason verbatim.
    await EmitAttemptAuditAsync(
        notification,
        now,
        durationMs: durationMs,
        errorMessage: outcome.Result == DeliveryResult.Success ? null : outcome.Error);

    // If the post-outcome status is terminal, emit the terminal NotifyDeliver row
    // (B3). This method only ever sets Delivered or Parked as a terminal status.
    // The error message on a Delivered terminal is null; on Parked it carries the
    // outcome's reason so downstream consumers can link Attempted+Parked rows.
    if (IsTerminal(notification.Status))
    {
        await EmitTerminalAuditAsync(
            notification,
            now,
            errorMessage: outcome.Result == DeliveryResult.Success ? null : outcome.Error);
    }
}
|
|
|
|
/// <summary>
|
|
/// True for <see cref="NotificationStatus.Delivered"/>,
|
|
/// <see cref="NotificationStatus.Parked"/>, or
|
|
/// <see cref="NotificationStatus.Discarded"/> — the three terminal states
|
|
/// on the central outbox lifecycle. Used by the dispatcher and the manual
|
|
/// discard handler to decide when to emit the terminal NotifyDeliver row.
|
|
/// </summary>
|
|
private static bool IsTerminal(NotificationStatus status)
{
    // Delivered, Parked and Discarded are the terminal states; anything else is not.
    switch (status)
    {
        case NotificationStatus.Delivered:
        case NotificationStatus.Parked:
        case NotificationStatus.Discarded:
            return true;

        default:
            return false;
    }
}
|
|
|
|
/// <summary>
|
|
/// Emits a single
|
|
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
|
|
/// audit row carrying the terminal status (Delivered, Parked, or
|
|
/// Discarded) of <paramref name="notification"/>. Wrapped in try/catch
|
|
/// for the same defensive reason as <see cref="EmitAttemptAuditAsync"/>.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// NO-004: <see cref="ICentralAuditWriter.WriteAsync"/> is awaited inside the
|
|
/// try/catch so the catch is actually reachable for writer faults and so the
|
|
/// audit task does not outlive the per-sweep DI scope. The audit-write-never-
|
|
/// affects-delivery invariant is preserved by the surrounding catch.
|
|
/// </remarks>
|
|
private async Task EmitTerminalAuditAsync(
    Notification notification,
    DateTimeOffset now,
    string? errorMessage)
{
    try
    {
        // The status mapping throws for a non-terminal status; that, too, is caught
        // below and logged rather than propagated.
        var terminalEvent = BuildNotifyDeliverEvent(
            notification,
            now,
            MapNotificationStatusToAuditStatus(notification.Status),
            errorMessage);

        // NO-004: awaited inside the try so writer faults reach the catch.
        await _auditWriter.WriteAsync(terminalEvent);
    }
    catch (Exception auditFault)
    {
        // An audit failure is logged as a warning and never rethrown to the caller.
        _logger.LogWarning(
            auditFault,
            "Failed to emit terminal {Status} audit row for notification {NotificationId}.",
            notification.Status, notification.NotificationId);
    }
}
|
|
|
|
/// <summary>
|
|
/// Maps the central-outbox <see cref="NotificationStatus"/> terminal
|
|
/// values onto the corresponding <see cref="AuditStatus"/> values used by
|
|
/// AuditLog (#23). Non-terminal statuses throw — the caller must gate on
|
|
/// <see cref="IsTerminal"/>.
|
|
/// </summary>
|
|
private static AuditStatus MapNotificationStatusToAuditStatus(NotificationStatus status)
{
    // One-to-one mapping for the three terminal statuses.
    switch (status)
    {
        case NotificationStatus.Delivered:
            return AuditStatus.Delivered;

        case NotificationStatus.Parked:
            return AuditStatus.Parked;

        case NotificationStatus.Discarded:
            return AuditStatus.Discarded;

        default:
            // Callers are expected to have checked IsTerminal first.
            throw new ArgumentOutOfRangeException(
                nameof(status), status, "non-terminal status has no audit terminal mapping");
    }
}
|
|
|
|
/// <summary>
|
|
/// Emits a single
|
|
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
|
|
/// audit row with <see cref="AuditStatus.Attempted"/>. Wrapped in
|
|
/// try/catch so an audit-write failure never propagates back into the
|
|
/// dispatcher loop (alog.md §13).
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// NO-004: previously the writer task was discarded (<c>_ = WriteAsync(...)</c>),
|
|
/// which made the surrounding catch unreachable for any fault originating in the
|
|
/// awaited body of <c>WriteAsync</c> and let the audit task outlive the dispatcher's
|
|
/// per-sweep DI scope. The task is now awaited inside the try/catch: the
|
|
/// audit-write-never-affects-delivery invariant is preserved by the catch, and
|
|
/// writer faults reach the operator log instead of being silently lost.
|
|
/// </remarks>
|
|
private async Task EmitAttemptAuditAsync(
    Notification notification,
    DateTimeOffset now,
    int durationMs,
    string? errorMessage)
{
    try
    {
        // Start from the common NotifyDeliver event, then stamp the attempt duration.
        var baseEvent = BuildNotifyDeliverEvent(notification, now, AuditStatus.Attempted, errorMessage);
        var attemptEvent = baseEvent with { DurationMs = durationMs };

        // NO-004: awaited inside the try so writer faults reach the catch.
        await _auditWriter.WriteAsync(attemptEvent);
    }
    catch (Exception auditFault)
    {
        // An audit failure is logged as a warning and never rethrown to the caller.
        _logger.LogWarning(
            auditFault,
            "Failed to emit Attempted audit row for notification {NotificationId}.",
            notification.NotificationId);
    }
}
|
|
|
|
/// <summary>
|
|
/// Builds a <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
|
|
/// row with the per-notification provenance fields (correlation id, list
|
|
/// name, source site/instance/script) populated from
|
|
/// <paramref name="notification"/>. <see cref="AuditEvent.CorrelationId"/>
|
|
/// parses the notification's id as a Guid; sites generate the id with
|
|
/// <c>Guid.NewGuid().ToString("N")</c> so the parse always succeeds, but
|
|
/// a non-Guid id is recorded as null rather than crashing the dispatcher.
|
|
/// <see cref="AuditEvent.ExecutionId"/> is copied straight from
|
|
/// <see cref="Notification.OriginExecutionId"/> so the dispatcher's
|
|
/// <c>NotifyDeliver</c> rows carry the same per-run id as the site's
|
|
/// <c>NotifySend</c> row (Audit Log #23); <see cref="AuditEvent.ParentExecutionId"/>
|
|
/// is likewise copied from <see cref="Notification.OriginParentExecutionId"/>.
|
|
/// </summary>
|
|
private static AuditEvent BuildNotifyDeliverEvent(
    Notification notification,
    DateTimeOffset now,
    AuditStatus status,
    string? errorMessage)
{
    // The correlation id is the notification id when it parses as a Guid; an id that
    // does not parse is recorded as null instead of throwing.
    Guid? correlationId = null;
    if (Guid.TryParse(notification.NotificationId, out var parsedId))
    {
        correlationId = parsedId;
    }

    var auditEvent = new AuditEvent
    {
        // Identity and classification of this audit row.
        EventId = Guid.NewGuid(),
        OccurredAtUtc = now.UtcDateTime,
        Channel = AuditChannel.Notification,
        Kind = AuditKind.NotifyDeliver,
        CorrelationId = correlationId,

        // Central dispatch is stamped with the fixed system identity.
        Actor = SystemActor,

        // Provenance copied from the notification row.
        SourceSiteId = notification.SourceSiteId,
        SourceInstanceId = notification.SourceInstanceId,
        SourceScript = notification.SourceScript,

        // Execution linkage (Audit Log #23): both ids are copied unchanged from the
        // notification's Origin* fields.
        ExecutionId = notification.OriginExecutionId,
        ParentExecutionId = notification.OriginParentExecutionId,

        // Outcome of this row.
        Target = notification.ListName,
        Status = status,
        ErrorMessage = errorMessage,
    };

    return auditEvent;
}
|
|
|
|
/// <summary>
/// Reacts to a purge tick. The retention cutoff is the current UTC time minus
/// <see cref="NotificationOutboxOptions.TerminalRetention"/>; an asynchronous
/// <see cref="RunPurgePass"/> is started for that cutoff and its outcome is piped back to
/// this actor as a purge-complete message. Purges are daily and idempotent, so no
/// in-flight guard is used. Because <see cref="RunPurgePass"/> logs and absorbs its own
/// faults, the success projection (which logs the deleted count) is the normal completion
/// path; the failure projection is only a belt-and-braces backup, consistent with
/// <see cref="HandleDispatchTick"/>/<see cref="RunDispatchPass"/>.
/// </summary>
private void HandlePurgeTick()
{
    var retentionCutoff = DateTimeOffset.UtcNow - _options.TerminalRetention;

    // Normal completion: report how many rows the sweep removed.
    object OnSwept(int deletedCount)
    {
        _logger.LogInformation(
            "Purge removed {DeletedCount} terminal notification(s) older than {Cutoff:o}.",
            deletedCount, retentionCutoff);
        return InternalMessages.PurgeComplete.Instance;
    }

    // Backup path: the sweep task itself faulted; still signal completion.
    object OnFaulted(Exception fault)
    {
        _logger.LogError(fault, "Purge sweep faulted unexpectedly.");
        return InternalMessages.PurgeComplete.Instance;
    }

    var sweep = RunPurgePass(retentionCutoff);
    sweep.PipeTo(Self, success: OnSwept, failure: OnFaulted);
}
|
|
|
|
/// <summary>
/// Performs one purge sweep. A scoped <see cref="INotificationOutboxRepository"/> is
/// resolved and asked to bulk-delete terminal rows older than <paramref name="cutoff"/>.
/// Scope creation, service resolution, and the delete are all inside the try block, so
/// any exception they raise is logged and converted into a result of 0 rather than a
/// faulted task — the same self-isolating strategy as <see cref="RunDispatchPass"/>.
/// </summary>
/// <param name="cutoff">Instant passed to the repository as the deletion threshold.</param>
/// <returns>The repository's deleted-row count, or 0 when the sweep failed.</returns>
private async Task<int> RunPurgePass(DateTimeOffset cutoff)
{
    try
    {
        using (var purgeScope = _serviceProvider.CreateScope())
        {
            var outbox = purgeScope.ServiceProvider
                .GetRequiredService<INotificationOutboxRepository>();
            var deletedCount = await outbox.DeleteTerminalOlderThanAsync(cutoff);
            return deletedCount;
        }
    }
    catch (Exception fault)
    {
        // Absorb the failure (after logging it) so the returned task always completes
        // normally, mirroring RunDispatchPass.
        _logger.LogError(fault, "Purge sweep failed unexpectedly.");
        return 0;
    }
}
|
|
|
|
/// <summary>
/// Serves a paginated, filtered outbox query. The sender and the current UTC time are
/// captured up front, <see cref="QueryOutboxAsync"/> does the filter construction and
/// repository call, and its response is piped to the captured sender. If that task
/// faults, the sender instead receives a failure response carrying the base exception's
/// message, an empty notification list, and a total count of zero.
/// </summary>
private void HandleQuery(NotificationOutboxQueryRequest request)
{
    // Sender must be read before the asynchronous work starts.
    var replyTo = Sender;
    var queriedAt = DateTimeOffset.UtcNow;

    NotificationOutboxQueryResponse OnFaulted(Exception fault) =>
        new NotificationOutboxQueryResponse(
            request.CorrelationId,
            Success: false,
            ErrorMessage: fault.GetBaseException().Message,
            Notifications: Array.Empty<NotificationSummary>(),
            TotalCount: 0);

    var pending = QueryOutboxAsync(request, queriedAt);
    pending.PipeTo(replyTo, success: response => response, failure: OnFaulted);
}
|
|
|
|
/// <summary>
/// Executes the outbox query behind <see cref="HandleQuery"/>: converts the request into a
/// <see cref="NotificationOutboxFilter"/> (string status/type filters are parsed with
/// <see cref="ParseEnum{TEnum}"/>; the stuck cutoff is only supplied when
/// <c>StuckOnly</c> is set), fetches the requested page from a scoped
/// <see cref="INotificationOutboxRepository"/>, and maps every returned row to a
/// <see cref="NotificationSummary"/> flagged with whether it is currently stuck.
/// Exceptions are not caught here; they fault the returned task.
/// </summary>
private async Task<NotificationOutboxQueryResponse> QueryOutboxAsync(
    NotificationOutboxQueryRequest request, DateTimeOffset now)
{
    // A single cutoff feeds both the optional StuckOnly filter and the per-row flag.
    var stuckCutoff = StuckCutoff(now);

    var filter = new NotificationOutboxFilter(
        Status: ParseEnum<NotificationStatus>(request.StatusFilter),
        Type: ParseEnum<NotificationType>(request.TypeFilter),
        SourceSiteId: request.SourceSiteFilter,
        ListName: request.ListNameFilter,
        SubjectKeyword: request.SubjectKeyword,
        StuckOnly: request.StuckOnly,
        StuckCutoff: request.StuckOnly ? stuckCutoff : null,
        From: request.From,
        To: request.To,
        SourceNode: request.SourceNodeFilter);

    using var queryScope = _serviceProvider.CreateScope();
    var outbox = queryScope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
    var (rows, totalCount) = await outbox.QueryAsync(filter, request.PageNumber, request.PageSize);

    var summaries = new List<NotificationSummary>();
    foreach (var row in rows)
    {
        summaries.Add(new NotificationSummary(
            row.NotificationId,
            row.Type.ToString(),
            row.ListName,
            row.Subject,
            row.Status.ToString(),
            row.RetryCount,
            row.LastError,
            row.SourceSiteId,
            row.SourceInstanceId,
            row.CreatedAt,
            row.DeliveredAt,
            IsStuck: IsStuck(row, stuckCutoff),
            SourceNode: row.SourceNode));
    }

    return new NotificationOutboxQueryResponse(
        request.CorrelationId, Success: true, ErrorMessage: null, summaries, totalCount);
}
|
|
|
|
/// <summary>
/// Serves a status query for one notification. The result of
/// <see cref="StatusQueryAsync"/> is piped to the captured sender: <c>Found: false</c>
/// with empty detail when no row matches, otherwise the row's status, retry count, last
/// error, and delivery time. A faulted lookup is logged and also answered with
/// <c>Found: false</c>.
/// </summary>
private void HandleStatusQuery(NotificationStatusQuery query)
{
    // Sender must be read before the asynchronous work starts.
    var replyTo = Sender;

    NotificationStatusResponse OnFaulted(Exception fault)
    {
        // The response type carries no error field, so a failed lookup can only be
        // reported as "not found". Logging it keeps a transient DB error from being
        // silently indistinguishable from a notification that really does not exist.
        _logger.LogWarning(
            fault, "Status query for notification {NotificationId} failed.", query.NotificationId);

        return new NotificationStatusResponse(
            query.CorrelationId, Found: false, Status: string.Empty,
            RetryCount: 0, LastError: null, DeliveredAt: null);
    }

    var lookup = StatusQueryAsync(query);
    lookup.PipeTo(replyTo, success: response => response, failure: OnFaulted);
}
|
|
|
|
/// <summary>
/// Looks up a notification by id on a scoped <see cref="INotificationOutboxRepository"/>
/// and builds the status reply: <c>Found: false</c> with empty detail when the lookup
/// returns null, otherwise the row's status, retry count, last error, and delivery time.
/// Exceptions are not caught here; they fault the returned task.
/// </summary>
private async Task<NotificationStatusResponse> StatusQueryAsync(NotificationStatusQuery query)
{
    using var lookupScope = _serviceProvider.CreateScope();
    var outbox = lookupScope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
    var row = await outbox.GetByIdAsync(query.NotificationId);

    return row is null
        ? new NotificationStatusResponse(
            query.CorrelationId, Found: false, Status: string.Empty,
            RetryCount: 0, LastError: null, DeliveredAt: null)
        : new NotificationStatusResponse(
            query.CorrelationId,
            Found: true,
            Status: row.Status.ToString(),
            RetryCount: row.RetryCount,
            LastError: row.LastError,
            DeliveredAt: row.DeliveredAt);
}
|
|
|
|
/// <summary>
/// Serves a full-detail query for one notification. It backs the report detail modal,
/// which needs the Body and resolved recipients that the grid summary leaves out. The
/// result of <see cref="DetailAsync"/> is piped to the captured sender; a faulted lookup
/// is answered with a failure response carrying the base exception's message and no detail.
/// </summary>
private void HandleDetailRequest(NotificationDetailRequest request)
{
    // Sender must be read before the asynchronous work starts.
    var replyTo = Sender;

    NotificationDetailResponse OnFaulted(Exception fault) =>
        new NotificationDetailResponse(
            request.CorrelationId, Success: false,
            ErrorMessage: fault.GetBaseException().Message, Detail: null);

    var lookup = DetailAsync(request);
    lookup.PipeTo(replyTo, success: response => response, failure: OnFaulted);
}
|
|
|
|
/// <summary>
/// Loads one notification by id from a scoped <see cref="INotificationOutboxRepository"/>
/// and maps it to a <see cref="NotificationDetail"/>. When the lookup returns null the
/// reply is a failure with the message "notification not found". Exceptions are not
/// caught here; they fault the returned task.
/// </summary>
private async Task<NotificationDetailResponse> DetailAsync(NotificationDetailRequest request)
{
    using var lookupScope = _serviceProvider.CreateScope();
    var outbox = lookupScope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();

    if (await outbox.GetByIdAsync(request.NotificationId) is not { } row)
    {
        return new NotificationDetailResponse(
            request.CorrelationId, Success: false,
            ErrorMessage: "notification not found", Detail: null);
    }

    // Positional arguments: the order below must match the NotificationDetail constructor.
    var detail = new NotificationDetail(
        row.NotificationId,
        row.Type.ToString(),
        row.ListName,
        row.Subject,
        row.Body,
        row.Status.ToString(),
        row.RetryCount,
        row.LastError,
        row.ResolvedTargets,
        row.TypeData,
        row.SourceSiteId,
        row.SourceInstanceId,
        row.SourceScript,
        row.SiteEnqueuedAt,
        row.CreatedAt,
        row.LastAttemptAt,
        row.NextAttemptAt,
        row.DeliveredAt,
        row.SourceNode);

    return new NotificationDetailResponse(
        request.CorrelationId, Success: true, ErrorMessage: null, detail);
}
|
|
|
|
/// <summary>
/// Serves a manual retry request. Only a <c>Parked</c> notification qualifies; it is put
/// back to <c>Pending</c> with its retry count, next-attempt time, and last error cleared
/// so the dispatch loop re-claims it on the next sweep. The result of
/// <see cref="RetryAsync"/> is piped to the captured sender; a faulted attempt is answered
/// with a failure response carrying the base exception's message.
/// </summary>
private void HandleRetry(RetryNotificationRequest request)
{
    // Sender must be read before the asynchronous work starts.
    var replyTo = Sender;

    RetryNotificationResponse OnFaulted(Exception fault) =>
        new RetryNotificationResponse(
            request.CorrelationId, Success: false, ErrorMessage: fault.GetBaseException().Message);

    var attempt = RetryAsync(request);
    attempt.PipeTo(replyTo, success: response => response, failure: OnFaulted);
}
|
|
|
|
/// <summary>
/// Applies a manual retry. The notification is loaded by id from a scoped
/// <see cref="INotificationOutboxRepository"/>; the request is rejected when no row is
/// found or when the row is not <c>Parked</c>. Otherwise the row is reset to
/// <c>Pending</c> with retry count 0 and no next-attempt time or last error, then saved.
/// Exceptions are not caught here; they fault the returned task.
/// </summary>
private async Task<RetryNotificationResponse> RetryAsync(RetryNotificationRequest request)
{
    RetryNotificationResponse Rejected(string reason) =>
        new RetryNotificationResponse(request.CorrelationId, Success: false, ErrorMessage: reason);

    using var retryScope = _serviceProvider.CreateScope();
    var outbox = retryScope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();

    if (await outbox.GetByIdAsync(request.NotificationId) is not { } row)
    {
        return Rejected("notification not found");
    }

    if (row.Status is not NotificationStatus.Parked)
    {
        return Rejected("only parked notifications can be retried");
    }

    // Reset the delivery bookkeeping so the row looks freshly pending to the dispatcher.
    row.Status = NotificationStatus.Pending;
    row.RetryCount = 0;
    row.NextAttemptAt = null;
    row.LastError = null;
    await outbox.UpdateAsync(row);

    return new RetryNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null);
}
|
|
|
|
/// <summary>
/// Serves a manual discard request. Only a <c>Parked</c> notification qualifies; it is
/// moved to the terminal <c>Discarded</c> status. The result of
/// <see cref="DiscardAsync"/> is piped to the captured sender; a faulted attempt is
/// answered with a failure response carrying the base exception's message.
/// </summary>
private void HandleDiscard(DiscardNotificationRequest request)
{
    // Sender must be read before the asynchronous work starts.
    var replyTo = Sender;

    DiscardNotificationResponse OnFaulted(Exception fault) =>
        new DiscardNotificationResponse(
            request.CorrelationId, Success: false, ErrorMessage: fault.GetBaseException().Message);

    var attempt = DiscardAsync(request);
    attempt.PipeTo(replyTo, success: response => response, failure: OnFaulted);
}
|
|
|
|
/// <summary>
/// Applies a manual discard. The notification is loaded by id from a scoped
/// <see cref="INotificationOutboxRepository"/>; the request is rejected when no row is
/// found or when the row is not <c>Parked</c>. Otherwise the row is saved as
/// <c>Discarded</c> and a terminal audit row is emitted for it. Exceptions are not caught
/// here; they fault the returned task.
/// </summary>
private async Task<DiscardNotificationResponse> DiscardAsync(DiscardNotificationRequest request)
{
    DiscardNotificationResponse Rejected(string reason) =>
        new DiscardNotificationResponse(request.CorrelationId, Success: false, ErrorMessage: reason);

    using var discardScope = _serviceProvider.CreateScope();
    var outbox = discardScope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();

    if (await outbox.GetByIdAsync(request.NotificationId) is not { } row)
    {
        return Rejected("notification not found");
    }

    if (row.Status is not NotificationStatus.Parked)
    {
        return Rejected("only parked notifications can be discarded");
    }

    row.Status = NotificationStatus.Discarded;
    await outbox.UpdateAsync(row);

    // M4 Bundle B3: besides the dispatcher, a manual discard is the other code path
    // that moves a notification to a terminal status. Emit a Discarded NotifyDeliver
    // row to match the dispatcher's Delivered/Parked emissions. No error message is
    // attached because this is an operator-driven cancellation, not a delivery error.
    await EmitTerminalAuditAsync(row, DateTimeOffset.UtcNow, errorMessage: null);

    return new DiscardNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null);
}
|
|
|
|
/// <summary>
/// Serves a KPI snapshot request. The stuck cutoff comes from
/// <see cref="NotificationOutboxOptions.StuckAgeThreshold"/> (via
/// <see cref="StuckCutoff"/>) and the delivered window starts at the current UTC time
/// minus <see cref="NotificationOutboxOptions.DeliveredKpiWindow"/>. The result of
/// <see cref="ComputeKpisAsync"/> is piped to the captured sender; a faulted computation
/// is answered with a failure response whose metrics are all zero/null.
/// </summary>
private void HandleKpiRequest(NotificationKpiRequest request)
{
    // Sender must be read before the asynchronous work starts.
    var replyTo = Sender;
    var requestedAt = DateTimeOffset.UtcNow;
    var stuckBefore = StuckCutoff(requestedAt);
    var deliveredAfter = requestedAt - _options.DeliveredKpiWindow;

    NotificationKpiResponse OnFaulted(Exception fault) =>
        new NotificationKpiResponse(
            request.CorrelationId,
            Success: false,
            ErrorMessage: fault.GetBaseException().Message,
            QueueDepth: 0,
            StuckCount: 0,
            ParkedCount: 0,
            DeliveredLastInterval: 0,
            OldestPendingAge: null);

    var computation = ComputeKpisAsync(request.CorrelationId, stuckBefore, deliveredAfter);
    computation.PipeTo(replyTo, success: response => response, failure: OnFaulted);
}
|
|
|
|
/// <summary>
/// Asks a scoped <see cref="INotificationOutboxRepository"/> for the outbox KPI snapshot
/// and wraps it in a successful <see cref="NotificationKpiResponse"/>. Exceptions are not
/// caught here; they fault the returned task.
/// </summary>
/// <param name="correlationId">Echoed back on the response.</param>
/// <param name="stuckCutoff">Stuck cutoff forwarded to the repository.</param>
/// <param name="deliveredSince">Start of the delivered window forwarded to the repository.</param>
private async Task<NotificationKpiResponse> ComputeKpisAsync(
    string correlationId, DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince)
{
    using var kpiScope = _serviceProvider.CreateScope();
    var outbox = kpiScope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
    var kpis = await outbox.ComputeKpisAsync(stuckCutoff, deliveredSince);

    var response = new NotificationKpiResponse(
        correlationId,
        Success: true,
        ErrorMessage: null,
        kpis.QueueDepth,
        kpis.StuckCount,
        kpis.ParkedCount,
        kpis.DeliveredLastInterval,
        kpis.OldestPendingAge);

    return response;
}
|
|
|
|
/// <summary>
/// Serves a per-site KPI request, using the same stuck cutoff and delivered window as
/// <see cref="HandleKpiRequest"/>. The result of <see cref="ComputePerSiteKpisAsync"/> is
/// piped to the captured sender; a faulted computation is answered with a failure
/// response carrying the base exception's message and an empty site list.
/// </summary>
private void HandlePerSiteKpiRequest(PerSiteNotificationKpiRequest request)
{
    // Sender must be read before the asynchronous work starts.
    var replyTo = Sender;
    var requestedAt = DateTimeOffset.UtcNow;
    var stuckBefore = StuckCutoff(requestedAt);
    var deliveredAfter = requestedAt - _options.DeliveredKpiWindow;

    PerSiteNotificationKpiResponse OnFaulted(Exception fault) =>
        new PerSiteNotificationKpiResponse(
            request.CorrelationId,
            Success: false,
            ErrorMessage: fault.GetBaseException().Message,
            Sites: Array.Empty<SiteNotificationKpiSnapshot>());

    var computation = ComputePerSiteKpisAsync(request.CorrelationId, stuckBefore, deliveredAfter);
    computation.PipeTo(replyTo, success: response => response, failure: OnFaulted);
}
|
|
|
|
/// <summary>
/// Asks a scoped <see cref="INotificationOutboxRepository"/> for the per-site KPI
/// snapshots and wraps them in a successful <see cref="PerSiteNotificationKpiResponse"/>.
/// Exceptions are not caught here; they fault the returned task.
/// </summary>
private async Task<PerSiteNotificationKpiResponse> ComputePerSiteKpisAsync(
    string correlationId, DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince)
{
    using var kpiScope = _serviceProvider.CreateScope();
    var outbox = kpiScope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
    var perSite = await outbox.ComputePerSiteKpisAsync(stuckCutoff, deliveredSince);

    var response = new PerSiteNotificationKpiResponse(
        correlationId, Success: true, ErrorMessage: null, perSite);
    return response;
}
|
|
|
|
/// <summary>
/// Computes the stuck cutoff: <paramref name="now"/> moved back by
/// <see cref="NotificationOutboxOptions.StuckAgeThreshold"/>. A still-pending notification
/// created before this instant counts as stuck.
/// </summary>
private DateTimeOffset StuckCutoff(DateTimeOffset now)
{
    var threshold = _options.StuckAgeThreshold;
    return now - threshold;
}
|
|
|
|
/// <summary>
/// Reports whether a notification is stuck: its status is still non-terminal
/// (<c>Pending</c> or <c>Retrying</c>) and it was created strictly before
/// <paramref name="stuckCutoff"/>.
/// </summary>
private static bool IsStuck(Notification notification, DateTimeOffset stuckCutoff)
{
    var awaitingDelivery =
        notification.Status is NotificationStatus.Pending or NotificationStatus.Retrying;

    // The age is only inspected for rows that are still awaiting delivery.
    return awaitingDelivery && notification.CreatedAt < stuckCutoff;
}
|
|
|
|
/// <summary>
/// Parses a string filter value to a nullable enum, ignoring case. A null, empty,
/// whitespace, or unrecognised value yields <c>null</c> — meaning "no constraint on that
/// dimension".
/// </summary>
/// <remarks>
/// <see cref="Enum.TryParse{TEnum}(string, bool, out TEnum)"/> also succeeds for numeric
/// text whose value has no named member (for example <c>"42"</c>), producing an undefined
/// enum value. Such a value is treated as unrecognised here, so it cannot be turned into a
/// filter on a status/type that does not exist. Numeric text that maps to a named member
/// is still accepted.
/// </remarks>
/// <typeparam name="TEnum">The enum type to parse into.</typeparam>
/// <param name="value">The raw filter text; may be null.</param>
/// <returns>The matching defined enum member, or <c>null</c> when there is none.</returns>
private static TEnum? ParseEnum<TEnum>(string? value) where TEnum : struct, Enum
{
    if (!Enum.TryParse<TEnum>(value, ignoreCase: true, out var parsed))
    {
        return null;
    }

    // TryParse accepts any in-range number; only keep values that name a real member.
    return Enum.IsDefined(parsed) ? parsed : null;
}
|
|
|
|
/// <summary>
/// Maps an incoming <see cref="NotificationSubmit"/> to a new <see cref="Notification"/>
/// row. The type is always <see cref="NotificationType.Email"/>, the source and origin
/// fields are copied from the message, and <c>CreatedAt</c> is stamped with the current
/// UTC time. Status is not set here.
/// </summary>
private static Notification BuildNotification(NotificationSubmit msg)
{
    // Every notification is currently an email: NotificationType has no other member.
    var row = new Notification(
        msg.NotificationId,
        NotificationType.Email,
        msg.ListName,
        msg.Subject,
        msg.Body,
        msg.SourceSiteId)
    {
        SourceInstanceId = msg.SourceInstanceId,
        SourceScript = msg.SourceScript,

        // SourceNode (SourceNode-stamping Task 13): cluster node the notification was
        // emitted on (node-a/node-b for site rows). The emitting site stamps it from
        // INodeIdentityProvider and it travels inside the serialized payload through
        // the S&F buffer to central; the EF tracked-entity insert then carries it into
        // the Notifications.SourceNode column. Null for submissions buffered before
        // the field existed.
        SourceNode = msg.SourceNode,

        // OriginExecutionId (Audit Log #23): id of the originating script execution,
        // carried from the site so the dispatcher can echo it onto NotifyDeliver rows.
        OriginExecutionId = msg.OriginExecutionId,

        // OriginParentExecutionId (Audit Log #23): parent ExecutionId of the
        // originating routed run, carried from the site for the same purpose.
        OriginParentExecutionId = msg.OriginParentExecutionId,

        SiteEnqueuedAt = msg.SiteEnqueuedAt,
        CreatedAt = DateTimeOffset.UtcNow,

        // Status is deliberately left at its Pending default so the dispatch sweep
        // claims the row.
    };

    return row;
}
|
|
}
|