feat(notif): emit NotifyDeliver(terminal) on terminal transitions (#23 M4)
M4 Bundle B (B3) — NotificationOutboxActor emits a second NotifyDeliver audit row carrying the terminal AuditStatus whenever a notification transitions to a terminal state (Delivered, Parked, Discarded). - Dispatcher: after the B2 Attempted row, a Delivered or Parked row is emitted when the post-outcome status is terminal. Discarded is never produced by the dispatcher — only by the manual discard path. - Missing-adapter park: now emits both Attempted and terminal Parked, both carrying the same explanatory error. - Manual discard (DiscardAsync): after the row update, emits a terminal Discarded NotifyDeliver row with no error message (operator-driven cancellation, not a delivery error). - MapNotificationStatusToAuditStatus + IsTerminal helpers added; terminal emission shares BuildNotifyDeliverEvent with the B2 Attempted path so the two rows carry identical correlation/provenance fields. Audit failure NEVER aborts the user-facing action: every emission is wrapped in try/catch (defensive — the CentralAuditWriter itself swallows).
This commit is contained in:
@@ -272,15 +272,17 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// M4 Bundle B2: a single
|
||||
/// M4 Bundle B2 + B3: a single
|
||||
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
|
||||
/// row is emitted with <see cref="AuditStatus.Attempted"/> per attempt
|
||||
/// (success, transient, permanent). The emission is wrapped in a
|
||||
/// try/catch so a thrown audit writer NEVER aborts the user-facing
|
||||
/// dispatch — the <see cref="CentralAuditWriter"/> itself swallows
|
||||
/// internal failures, but the dispatcher wraps defensively per
|
||||
/// alog.md §13. The missing-adapter park path also emits an Attempted
|
||||
/// row because it IS an attempt from the dispatcher's point of view.
|
||||
/// (success, transient, permanent); when the post-outcome status is a
|
||||
/// terminal one (Delivered, Parked) a SECOND row is emitted carrying
|
||||
/// that terminal status. Both emissions are wrapped in a try/catch so a
|
||||
/// thrown audit writer NEVER aborts the user-facing dispatch — the
|
||||
/// <see cref="CentralAuditWriter"/> itself swallows internal failures,
|
||||
/// but the dispatcher wraps defensively per alog.md §13. The
|
||||
/// missing-adapter park path also emits both rows because it IS an
|
||||
/// attempt that resolved to a park from the dispatcher's point of view.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Attempt duration is measured around the adapter call and recorded on
|
||||
@@ -299,8 +301,8 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
||||
if (!adapters.TryGetValue(notification.Type, out var adapter))
|
||||
{
|
||||
// Missing-adapter park: from the dispatcher's perspective this is an
|
||||
// attempt that resolved to a park, so we emit the Attempted row
|
||||
// alongside the row update.
|
||||
// attempt that resolved to a terminal park. Emit Attempted then the
|
||||
// terminal Parked row, both carrying the same explanatory error.
|
||||
var missingAdapterError = $"no delivery adapter for type {notification.Type}";
|
||||
notification.Status = NotificationStatus.Parked;
|
||||
notification.LastError = missingAdapterError;
|
||||
@@ -311,6 +313,7 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
||||
now,
|
||||
durationMs: 0,
|
||||
errorMessage: missingAdapterError);
|
||||
EmitTerminalAudit(notification, now, errorMessage: missingAdapterError);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -364,6 +367,78 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
||||
now,
|
||||
durationMs: durationMs,
|
||||
errorMessage: outcome.Result == DeliveryResult.Success ? null : outcome.Error);
|
||||
|
||||
// If the post-outcome status is terminal (Delivered or Parked — the
|
||||
// dispatcher never sets Discarded; that lives on the manual discard
|
||||
// path), emit the terminal NotifyDeliver row (B3). The error message
|
||||
// on a Delivered terminal is null; on Parked it carries the outcome's
|
||||
// reason so downstream consumers can link Attempted+Parked rows.
|
||||
if (IsTerminal(notification.Status))
|
||||
{
|
||||
EmitTerminalAudit(
|
||||
notification,
|
||||
now,
|
||||
errorMessage: outcome.Result == DeliveryResult.Success ? null : outcome.Error);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// True for <see cref="NotificationStatus.Delivered"/>,
|
||||
/// <see cref="NotificationStatus.Parked"/>, or
|
||||
/// <see cref="NotificationStatus.Discarded"/> — the three terminal states
|
||||
/// on the central outbox lifecycle. Used by the dispatcher and the manual
|
||||
/// discard handler to decide when to emit the terminal NotifyDeliver row.
|
||||
/// </summary>
|
||||
private static bool IsTerminal(NotificationStatus status)
|
||||
{
|
||||
return status is NotificationStatus.Delivered
|
||||
or NotificationStatus.Parked
|
||||
or NotificationStatus.Discarded;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Emits a single
|
||||
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
|
||||
/// audit row carrying the terminal status (Delivered, Parked, or
|
||||
/// Discarded) of <paramref name="notification"/>. Wrapped in try/catch
|
||||
/// for the same defensive reason as <see cref="EmitAttemptAudit"/>.
|
||||
/// </summary>
|
||||
private void EmitTerminalAudit(
|
||||
Notification notification,
|
||||
DateTimeOffset now,
|
||||
string? errorMessage)
|
||||
{
|
||||
try
|
||||
{
|
||||
var terminalStatus = MapNotificationStatusToAuditStatus(notification.Status);
|
||||
var evt = BuildNotifyDeliverEvent(notification, now, terminalStatus, errorMessage);
|
||||
_ = _auditWriter.WriteAsync(evt);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
ex,
|
||||
"Failed to emit terminal {Status} audit row for notification {NotificationId}.",
|
||||
notification.Status, notification.NotificationId);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Maps the central-outbox <see cref="NotificationStatus"/> terminal
|
||||
/// values onto the corresponding <see cref="AuditStatus"/> values used by
|
||||
/// AuditLog (#23). Non-terminal statuses throw — the caller must gate on
|
||||
/// <see cref="IsTerminal"/>.
|
||||
/// </summary>
|
||||
private static AuditStatus MapNotificationStatusToAuditStatus(NotificationStatus status)
|
||||
{
|
||||
return status switch
|
||||
{
|
||||
NotificationStatus.Delivered => AuditStatus.Delivered,
|
||||
NotificationStatus.Parked => AuditStatus.Parked,
|
||||
NotificationStatus.Discarded => AuditStatus.Discarded,
|
||||
_ => throw new ArgumentOutOfRangeException(
|
||||
nameof(status), status, "non-terminal status has no audit terminal mapping"),
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -680,6 +755,13 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
||||
notification.Status = NotificationStatus.Discarded;
|
||||
await repository.UpdateAsync(notification);
|
||||
|
||||
// M4 Bundle B3: a manual discard is the OTHER code path that produces
|
||||
// a terminal NotificationStatus transition (alongside the dispatcher).
|
||||
// Emit a Discarded NotifyDeliver row to match the dispatcher's
|
||||
// Delivered/Parked emissions; the row carries no error message because
|
||||
// the discard is an operator-driven cancellation, not a delivery error.
|
||||
EmitTerminalAudit(notification, DateTimeOffset.UtcNow, errorMessage: null);
|
||||
|
||||
return new DiscardNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,283 @@
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NSubstitute;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Entities.Notifications;
|
||||
using ScadaLink.Commons.Interfaces.Repositories;
|
||||
using ScadaLink.Commons.Interfaces.Services;
|
||||
using ScadaLink.Commons.Messages.Notification;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.NotificationOutbox.Delivery;
|
||||
using ScadaLink.NotificationOutbox.Messages;
|
||||
|
||||
namespace ScadaLink.NotificationOutbox.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// M4 Bundle B (B3) — verifies the <see cref="NotificationOutboxActor"/>
|
||||
/// emits a second
|
||||
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
|
||||
/// audit row carrying the terminal status (Delivered, Parked, Discarded) on
|
||||
/// every terminal-state transition. The B2 Attempted row is still emitted
|
||||
/// alongside the terminal one — these tests assert ONLY the terminal row
|
||||
/// presence and status.
|
||||
/// </summary>
|
||||
public class NotificationOutboxActorTerminalEmissionTests : TestKit
|
||||
{
|
||||
private readonly INotificationOutboxRepository _outboxRepository =
|
||||
Substitute.For<INotificationOutboxRepository>();
|
||||
|
||||
private readonly INotificationRepository _notificationRepository =
|
||||
Substitute.For<INotificationRepository>();
|
||||
|
||||
private readonly RecordingCentralAuditWriter _auditWriter = new();
|
||||
|
||||
private sealed class RecordingCentralAuditWriter : ICentralAuditWriter
|
||||
{
|
||||
public List<AuditEvent> Events { get; } = new();
|
||||
public Func<AuditEvent, Task>? OnWrite { get; set; }
|
||||
|
||||
public Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
|
||||
{
|
||||
lock (Events)
|
||||
{
|
||||
Events.Add(evt);
|
||||
}
|
||||
|
||||
return OnWrite?.Invoke(evt) ?? Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private IServiceProvider BuildServiceProvider(IEnumerable<INotificationDeliveryAdapter> adapters)
|
||||
{
|
||||
var services = new ServiceCollection();
|
||||
services.AddScoped(_ => _outboxRepository);
|
||||
services.AddScoped(_ => _notificationRepository);
|
||||
foreach (var adapter in adapters)
|
||||
{
|
||||
services.AddScoped<INotificationDeliveryAdapter>(_ => adapter);
|
||||
}
|
||||
|
||||
return services.BuildServiceProvider();
|
||||
}
|
||||
|
||||
private sealed class StubAdapter : INotificationDeliveryAdapter
|
||||
{
|
||||
private readonly Func<DeliveryOutcome> _outcome;
|
||||
|
||||
public StubAdapter(Func<DeliveryOutcome> outcome) { _outcome = outcome; }
|
||||
|
||||
public NotificationType Type => NotificationType.Email;
|
||||
|
||||
public Task<DeliveryOutcome> DeliverAsync(
|
||||
Notification notification, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult(_outcome());
|
||||
}
|
||||
|
||||
private IActorRef CreateActor(IEnumerable<INotificationDeliveryAdapter> adapters)
|
||||
{
|
||||
return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
|
||||
BuildServiceProvider(adapters),
|
||||
new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
|
||||
(ICentralAuditWriter)_auditWriter,
|
||||
NullLogger<NotificationOutboxActor>.Instance)));
|
||||
}
|
||||
|
||||
private static Notification MakeNotification(
|
||||
NotificationStatus status = NotificationStatus.Pending,
|
||||
int retryCount = 0,
|
||||
Guid? notificationId = null)
|
||||
{
|
||||
return new Notification(
|
||||
(notificationId ?? Guid.NewGuid()).ToString("D"),
|
||||
NotificationType.Email,
|
||||
"ops-team",
|
||||
"Tank overflow",
|
||||
"Tank 3 level critical",
|
||||
"site-1")
|
||||
{
|
||||
Status = status,
|
||||
RetryCount = retryCount,
|
||||
CreatedAt = DateTimeOffset.UtcNow,
|
||||
};
|
||||
}
|
||||
|
||||
private void SetupSmtpRetryPolicy(int maxRetries, TimeSpan retryDelay)
|
||||
{
|
||||
var config = new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com")
|
||||
{
|
||||
MaxRetries = maxRetries,
|
||||
RetryDelay = retryDelay,
|
||||
};
|
||||
_notificationRepository.GetAllSmtpConfigurationsAsync(Arg.Any<CancellationToken>())
|
||||
.Returns(new[] { config });
|
||||
}
|
||||
|
||||
private List<AuditEvent> EventsByStatus(AuditStatus status)
|
||||
{
|
||||
lock (_auditWriter.Events)
|
||||
{
|
||||
return _auditWriter.Events.Where(e => e.Status == status).ToList();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Terminal_Delivered_EmitsEvent_StatusDelivered()
|
||||
{
|
||||
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
||||
var notification = MakeNotification();
|
||||
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||
.Returns(new[] { notification });
|
||||
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
|
||||
var actor = CreateActor([adapter]);
|
||||
|
||||
actor.Tell(InternalMessages.DispatchTick.Instance);
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
var delivered = EventsByStatus(AuditStatus.Delivered);
|
||||
Assert.Single(delivered);
|
||||
var evt = delivered[0];
|
||||
Assert.Equal(AuditChannel.Notification, evt.Channel);
|
||||
Assert.Equal(AuditKind.NotifyDeliver, evt.Kind);
|
||||
Assert.Equal("ops-team", evt.Target);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Terminal_Parked_OnPermanentFailure_EmitsEvent_StatusParked()
|
||||
{
|
||||
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
||||
var notification = MakeNotification();
|
||||
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||
.Returns(new[] { notification });
|
||||
var adapter = new StubAdapter(() => DeliveryOutcome.Permanent("invalid recipient address"));
|
||||
var actor = CreateActor([adapter]);
|
||||
|
||||
actor.Tell(InternalMessages.DispatchTick.Instance);
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
var parked = EventsByStatus(AuditStatus.Parked);
|
||||
Assert.Single(parked);
|
||||
Assert.Equal(AuditKind.NotifyDeliver, parked[0].Kind);
|
||||
Assert.Equal("invalid recipient address", parked[0].ErrorMessage);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Terminal_Parked_OnTransientReachingMaxRetries_EmitsEvent_StatusParked()
|
||||
{
|
||||
SetupSmtpRetryPolicy(maxRetries: 3, retryDelay: TimeSpan.FromMinutes(1));
|
||||
// RetryCount starts at max-1; the failed attempt increments it to max
|
||||
// which triggers the Parked terminal transition.
|
||||
var notification = MakeNotification(retryCount: 2);
|
||||
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||
.Returns(new[] { notification });
|
||||
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
|
||||
var actor = CreateActor([adapter]);
|
||||
|
||||
actor.Tell(InternalMessages.DispatchTick.Instance);
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
var parked = EventsByStatus(AuditStatus.Parked);
|
||||
Assert.Single(parked);
|
||||
Assert.Equal(AuditKind.NotifyDeliver, parked[0].Kind);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Terminal_Parked_OnMissingAdapter_EmitsEvent_StatusParked()
|
||||
{
|
||||
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
||||
var notification = MakeNotification();
|
||||
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||
.Returns(new[] { notification });
|
||||
// No adapters registered: the missing-adapter park path runs.
|
||||
var actor = CreateActor([]);
|
||||
|
||||
actor.Tell(InternalMessages.DispatchTick.Instance);
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
var parked = EventsByStatus(AuditStatus.Parked);
|
||||
Assert.Single(parked);
|
||||
Assert.Equal(AuditKind.NotifyDeliver, parked[0].Kind);
|
||||
Assert.Contains("no delivery adapter", parked[0].ErrorMessage!);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Transient_BelowMaxRetries_DoesNotEmitTerminalRow()
|
||||
{
|
||||
// A transient failure that does not reach max-retries leaves the row
|
||||
// in Retrying — non-terminal, so no terminal audit row should be
|
||||
// emitted (only the Attempted row from B2).
|
||||
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
||||
var notification = MakeNotification(retryCount: 0);
|
||||
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||
.Returns(new[] { notification });
|
||||
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
|
||||
var actor = CreateActor([adapter]);
|
||||
|
||||
actor.Tell(InternalMessages.DispatchTick.Instance);
|
||||
|
||||
// Wait for the Attempted row to land so we know dispatch has run.
|
||||
AwaitAssert(() => Assert.Single(EventsByStatus(AuditStatus.Attempted)));
|
||||
|
||||
// No terminal rows of any kind.
|
||||
Assert.Empty(EventsByStatus(AuditStatus.Delivered));
|
||||
Assert.Empty(EventsByStatus(AuditStatus.Parked));
|
||||
Assert.Empty(EventsByStatus(AuditStatus.Discarded));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Terminal_Discarded_OnManualDiscard_EmitsEvent_StatusDiscarded()
|
||||
{
|
||||
// Wire the actor with a parked row that GetByIdAsync returns; the
|
||||
// discard handler must emit a terminal Discarded audit row.
|
||||
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
||||
var notification = MakeNotification(status: NotificationStatus.Parked);
|
||||
_outboxRepository.GetByIdAsync(notification.NotificationId, Arg.Any<CancellationToken>())
|
||||
.Returns(notification);
|
||||
var actor = CreateActor([]);
|
||||
|
||||
actor.Tell(new DiscardNotificationRequest(
|
||||
CorrelationId: "test-corr", NotificationId: notification.NotificationId));
|
||||
|
||||
// First wait for the discard handler to reply (handshake), then assert
|
||||
// the audit row landed.
|
||||
ExpectMsg<DiscardNotificationResponse>(r => r.Success);
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
var discarded = EventsByStatus(AuditStatus.Discarded);
|
||||
Assert.Single(discarded);
|
||||
Assert.Equal(AuditKind.NotifyDeliver, discarded[0].Kind);
|
||||
});
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void AuditWriter_Throws_TerminalUpdate_StillSucceeds()
|
||||
{
|
||||
// Audit failure NEVER aborts the user-facing action: the dispatcher
|
||||
// must still persist the Delivered status via UpdateAsync.
|
||||
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
||||
var notification = MakeNotification();
|
||||
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||
.Returns(new[] { notification });
|
||||
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
|
||||
_auditWriter.OnWrite = _ => throw new InvalidOperationException("audit dead");
|
||||
var actor = CreateActor([adapter]);
|
||||
|
||||
actor.Tell(InternalMessages.DispatchTick.Instance);
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
_outboxRepository.Received(1).UpdateAsync(
|
||||
Arg.Is<Notification>(n => n.Status == NotificationStatus.Delivered),
|
||||
Arg.Any<CancellationToken>());
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user