From 6de377a39eb3a5fe6c3936feb7870fb8c3805582 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 16:12:44 -0400 Subject: [PATCH] feat(notif): emit NotifyDeliver(terminal) on terminal transitions (#23 M4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit M4 Bundle B (B3) — NotificationOutboxActor emits a second NotifyDeliver audit row carrying the terminal AuditStatus whenever a notification transitions to a terminal state (Delivered, Parked, Discarded). - Dispatcher: after the B2 Attempted row, a Delivered or Parked row is emitted when the post-outcome status is terminal. Discarded is never produced by the dispatcher — only by the manual discard path. - Missing-adapter park: now emits both Attempted and terminal Parked, both carrying the same explanatory error. - Manual discard (DiscardAsync): after the row update, emits a terminal Discarded NotifyDeliver row with no error message (operator-driven cancellation, not a delivery error). - MapNotificationStatusToAuditStatus + IsTerminal helpers added; terminal emission shares BuildNotifyDeliverEvent with the B2 Attempted path so the two rows carry identical correlation/provenance fields. Audit failure NEVER aborts the user-facing action: every emission is wrapped in try/catch (defensive — the CentralAuditWriter itself swallows). --- .../NotificationOutboxActor.cs | 100 ++++++- ...icationOutboxActorTerminalEmissionTests.cs | 283 ++++++++++++++++++ 2 files changed, 374 insertions(+), 9 deletions(-) create mode 100644 tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorTerminalEmissionTests.cs diff --git a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs index 0681d81..400f6d2 100644 --- a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs +++ b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs @@ -272,15 +272,17 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers /// /// /// - /// M4 Bundle B2: a single + /// M4 Bundle B2 + B3: a single /// / /// row is emitted with per attempt - /// (success, transient, permanent). The emission is wrapped in a - /// try/catch so a thrown audit writer NEVER aborts the user-facing - /// dispatch — the itself swallows - /// internal failures, but the dispatcher wraps defensively per - /// alog.md §13. The missing-adapter park path also emits an Attempted - /// row because it IS an attempt from the dispatcher's point of view. + /// (success, transient, permanent); when the post-outcome status is a + /// terminal one (Delivered, Parked) a SECOND row is emitted carrying + /// that terminal status. Both emissions are wrapped in a try/catch so a + /// thrown audit writer NEVER aborts the user-facing dispatch — the + /// itself swallows internal failures, + /// but the dispatcher wraps defensively per alog.md §13. The + /// missing-adapter park path also emits both rows because it IS an + /// attempt that resolved to a park from the dispatcher's point of view. /// /// /// Attempt duration is measured around the adapter call and recorded on @@ -299,8 +301,8 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers if (!adapters.TryGetValue(notification.Type, out var adapter)) { // Missing-adapter park: from the dispatcher's perspective this is an - // attempt that resolved to a park, so we emit the Attempted row - // alongside the row update. + // attempt that resolved to a terminal park. Emit Attempted then the + // terminal Parked row, both carrying the same explanatory error. var missingAdapterError = $"no delivery adapter for type {notification.Type}"; notification.Status = NotificationStatus.Parked; notification.LastError = missingAdapterError; @@ -311,6 +313,7 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers now, durationMs: 0, errorMessage: missingAdapterError); + EmitTerminalAudit(notification, now, errorMessage: missingAdapterError); return; } @@ -364,6 +367,78 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers now, durationMs: durationMs, errorMessage: outcome.Result == DeliveryResult.Success ? null : outcome.Error); + + // If the post-outcome status is terminal (Delivered or Parked — the + // dispatcher never sets Discarded; that lives on the manual discard + // path), emit the terminal NotifyDeliver row (B3). The error message + // on a Delivered terminal is null; on Parked it carries the outcome's + // reason so downstream consumers can link Attempted+Parked rows. + if (IsTerminal(notification.Status)) + { + EmitTerminalAudit( + notification, + now, + errorMessage: outcome.Result == DeliveryResult.Success ? null : outcome.Error); + } + } + + /// + /// True for , + /// , or + /// — the three terminal states + /// on the central outbox lifecycle. Used by the dispatcher and the manual + /// discard handler to decide when to emit the terminal NotifyDeliver row. + /// + private static bool IsTerminal(NotificationStatus status) + { + return status is NotificationStatus.Delivered + or NotificationStatus.Parked + or NotificationStatus.Discarded; + } + + /// + /// Emits a single + /// / + /// audit row carrying the terminal status (Delivered, Parked, or + /// Discarded) of . Wrapped in try/catch + /// for the same defensive reason as . + /// + private void EmitTerminalAudit( + Notification notification, + DateTimeOffset now, + string? errorMessage) + { + try + { + var terminalStatus = MapNotificationStatusToAuditStatus(notification.Status); + var evt = BuildNotifyDeliverEvent(notification, now, terminalStatus, errorMessage); + _ = _auditWriter.WriteAsync(evt); + } + catch (Exception ex) + { + _logger.LogWarning( + ex, + "Failed to emit terminal {Status} audit row for notification {NotificationId}.", + notification.Status, notification.NotificationId); + } + } + + /// + /// Maps the central-outbox terminal + /// values onto the corresponding values used by + /// AuditLog (#23). Non-terminal statuses throw — the caller must gate on + /// . + /// + private static AuditStatus MapNotificationStatusToAuditStatus(NotificationStatus status) + { + return status switch + { + NotificationStatus.Delivered => AuditStatus.Delivered, + NotificationStatus.Parked => AuditStatus.Parked, + NotificationStatus.Discarded => AuditStatus.Discarded, + _ => throw new ArgumentOutOfRangeException( + nameof(status), status, "non-terminal status has no audit terminal mapping"), + }; } /// @@ -680,6 +755,13 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers notification.Status = NotificationStatus.Discarded; await repository.UpdateAsync(notification); + // M4 Bundle B3: a manual discard is the OTHER code path that produces + // a terminal NotificationStatus transition (alongside the dispatcher). + // Emit a Discarded NotifyDeliver row to match the dispatcher's + // Delivered/Parked emissions; the row carries no error message because + // the discard is an operator-driven cancellation, not a delivery error. + EmitTerminalAudit(notification, DateTimeOffset.UtcNow, errorMessage: null); + return new DiscardNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null); } diff --git a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorTerminalEmissionTests.cs b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorTerminalEmissionTests.cs new file mode 100644 index 0000000..eab39ac --- /dev/null +++ b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorTerminalEmissionTests.cs @@ -0,0 +1,283 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Messages.Notification; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.NotificationOutbox.Delivery; +using ScadaLink.NotificationOutbox.Messages; + +namespace ScadaLink.NotificationOutbox.Tests; + +/// +/// M4 Bundle B (B3) — verifies the +/// emits a second +/// / +/// audit row carrying the terminal status (Delivered, Parked, Discarded) on +/// every terminal-state transition. The B2 Attempted row is still emitted +/// alongside the terminal one — these tests assert ONLY the terminal row +/// presence and status. +/// +public class NotificationOutboxActorTerminalEmissionTests : TestKit +{ + private readonly INotificationOutboxRepository _outboxRepository = + Substitute.For(); + + private readonly INotificationRepository _notificationRepository = + Substitute.For(); + + private readonly RecordingCentralAuditWriter _auditWriter = new(); + + private sealed class RecordingCentralAuditWriter : ICentralAuditWriter + { + public List Events { get; } = new(); + public Func? OnWrite { get; set; } + + public Task WriteAsync(AuditEvent evt, CancellationToken ct = default) + { + lock (Events) + { + Events.Add(evt); + } + + return OnWrite?.Invoke(evt) ?? Task.CompletedTask; + } + } + + private IServiceProvider BuildServiceProvider(IEnumerable adapters) + { + var services = new ServiceCollection(); + services.AddScoped(_ => _outboxRepository); + services.AddScoped(_ => _notificationRepository); + foreach (var adapter in adapters) + { + services.AddScoped(_ => adapter); + } + + return services.BuildServiceProvider(); + } + + private sealed class StubAdapter : INotificationDeliveryAdapter + { + private readonly Func _outcome; + + public StubAdapter(Func outcome) { _outcome = outcome; } + + public NotificationType Type => NotificationType.Email; + + public Task DeliverAsync( + Notification notification, CancellationToken cancellationToken = default) + => Task.FromResult(_outcome()); + } + + private IActorRef CreateActor(IEnumerable adapters) + { + return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( + BuildServiceProvider(adapters), + new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) }, + (ICentralAuditWriter)_auditWriter, + NullLogger.Instance))); + } + + private static Notification MakeNotification( + NotificationStatus status = NotificationStatus.Pending, + int retryCount = 0, + Guid? notificationId = null) + { + return new Notification( + (notificationId ?? Guid.NewGuid()).ToString("D"), + NotificationType.Email, + "ops-team", + "Tank overflow", + "Tank 3 level critical", + "site-1") + { + Status = status, + RetryCount = retryCount, + CreatedAt = DateTimeOffset.UtcNow, + }; + } + + private void SetupSmtpRetryPolicy(int maxRetries, TimeSpan retryDelay) + { + var config = new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com") + { + MaxRetries = maxRetries, + RetryDelay = retryDelay, + }; + _notificationRepository.GetAllSmtpConfigurationsAsync(Arg.Any()) + .Returns(new[] { config }); + } + + private List EventsByStatus(AuditStatus status) + { + lock (_auditWriter.Events) + { + return _auditWriter.Events.Where(e => e.Status == status).ToList(); + } + } + + [Fact] + public void Terminal_Delivered_EmitsEvent_StatusDelivered() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com")); + var actor = CreateActor([adapter]); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + var delivered = EventsByStatus(AuditStatus.Delivered); + Assert.Single(delivered); + var evt = delivered[0]; + Assert.Equal(AuditChannel.Notification, evt.Channel); + Assert.Equal(AuditKind.NotifyDeliver, evt.Kind); + Assert.Equal("ops-team", evt.Target); + }); + } + + [Fact] + public void Terminal_Parked_OnPermanentFailure_EmitsEvent_StatusParked() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Permanent("invalid recipient address")); + var actor = CreateActor([adapter]); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + var parked = EventsByStatus(AuditStatus.Parked); + Assert.Single(parked); + Assert.Equal(AuditKind.NotifyDeliver, parked[0].Kind); + Assert.Equal("invalid recipient address", parked[0].ErrorMessage); + }); + } + + [Fact] + public void Terminal_Parked_OnTransientReachingMaxRetries_EmitsEvent_StatusParked() + { + SetupSmtpRetryPolicy(maxRetries: 3, retryDelay: TimeSpan.FromMinutes(1)); + // RetryCount starts at max-1; the failed attempt increments it to max + // which triggers the Parked terminal transition. + var notification = MakeNotification(retryCount: 2); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout")); + var actor = CreateActor([adapter]); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + var parked = EventsByStatus(AuditStatus.Parked); + Assert.Single(parked); + Assert.Equal(AuditKind.NotifyDeliver, parked[0].Kind); + }); + } + + [Fact] + public void Terminal_Parked_OnMissingAdapter_EmitsEvent_StatusParked() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + // No adapters registered: the missing-adapter park path runs. + var actor = CreateActor([]); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + var parked = EventsByStatus(AuditStatus.Parked); + Assert.Single(parked); + Assert.Equal(AuditKind.NotifyDeliver, parked[0].Kind); + Assert.Contains("no delivery adapter", parked[0].ErrorMessage!); + }); + } + + [Fact] + public void Transient_BelowMaxRetries_DoesNotEmitTerminalRow() + { + // A transient failure that does not reach max-retries leaves the row + // in Retrying — non-terminal, so no terminal audit row should be + // emitted (only the Attempted row from B2). + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(retryCount: 0); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout")); + var actor = CreateActor([adapter]); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + // Wait for the Attempted row to land so we know dispatch has run. + AwaitAssert(() => Assert.Single(EventsByStatus(AuditStatus.Attempted))); + + // No terminal rows of any kind. + Assert.Empty(EventsByStatus(AuditStatus.Delivered)); + Assert.Empty(EventsByStatus(AuditStatus.Parked)); + Assert.Empty(EventsByStatus(AuditStatus.Discarded)); + } + + [Fact] + public void Terminal_Discarded_OnManualDiscard_EmitsEvent_StatusDiscarded() + { + // Wire the actor with a parked row that GetByIdAsync returns; the + // discard handler must emit a terminal Discarded audit row. + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(status: NotificationStatus.Parked); + _outboxRepository.GetByIdAsync(notification.NotificationId, Arg.Any()) + .Returns(notification); + var actor = CreateActor([]); + + actor.Tell(new DiscardNotificationRequest( + CorrelationId: "test-corr", NotificationId: notification.NotificationId)); + + // First wait for the discard handler to reply (handshake), then assert + // the audit row landed. + ExpectMsg(r => r.Success); + AwaitAssert(() => + { + var discarded = EventsByStatus(AuditStatus.Discarded); + Assert.Single(discarded); + Assert.Equal(AuditKind.NotifyDeliver, discarded[0].Kind); + }); + } + + [Fact] + public void AuditWriter_Throws_TerminalUpdate_StillSucceeds() + { + // Audit failure NEVER aborts the user-facing action: the dispatcher + // must still persist the Delivered status via UpdateAsync. + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com")); + _auditWriter.OnWrite = _ => throw new InvalidOperationException("audit dead"); + var actor = CreateActor([adapter]); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + _outboxRepository.Received(1).UpdateAsync( + Arg.Is(n => n.Status == NotificationStatus.Delivered), + Arg.Any()); + }); + } +}