diff --git a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs index 9c0d02d..0681d81 100644 --- a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs +++ b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs @@ -270,6 +270,24 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers /// status transition. A missing adapter parks the notification; otherwise the /// drives the transition. The updated row is always persisted. /// + /// + /// + /// M4 Bundle B2: a single + /// / + /// row is emitted with per attempt + /// (success, transient, permanent). The emission is wrapped in a + /// try/catch so a thrown audit writer NEVER aborts the user-facing + /// dispatch — the itself swallows + /// internal failures, but the dispatcher wraps defensively per + /// alog.md §13. The missing-adapter park path also emits an Attempted + /// row because it IS an attempt from the dispatcher's point of view. + /// + /// + /// Attempt duration is measured around the adapter call and recorded on + /// the Attempted row so downstream KPIs can compute per-attempt latency + /// without joining to the row update timestamps. + /// + /// private async Task DeliverOneAsync( Notification notification, DateTimeOffset now, @@ -280,14 +298,28 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers { if (!adapters.TryGetValue(notification.Type, out var adapter)) { + // Missing-adapter park: from the dispatcher's perspective this is an + // attempt that resolved to a park, so we emit the Attempted row + // alongside the row update. + var missingAdapterError = $"no delivery adapter for type {notification.Type}"; notification.Status = NotificationStatus.Parked; - notification.LastError = $"no delivery adapter for type {notification.Type}"; + notification.LastError = missingAdapterError; notification.LastAttemptAt = now; await outboxRepository.UpdateAsync(notification); + EmitAttemptAudit( + notification, + now, + durationMs: 0, + errorMessage: missingAdapterError); return; } + // Measure the attempt duration around the adapter call so the + // Attempted row carries it for KPI use. + var attemptStart = DateTimeOffset.UtcNow; var outcome = await adapter.DeliverAsync(notification); + var durationMs = (int)Math.Min( + int.MaxValue, Math.Max(0, (DateTimeOffset.UtcNow - attemptStart).TotalMilliseconds)); switch (outcome.Result) { @@ -322,6 +354,86 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers } await outboxRepository.UpdateAsync(notification); + + // Emit the per-attempt Attempted row exactly once regardless of the + // outcome (B2). The error message comes from the outcome, not from + // notification.LastError, so a success row is null and a transient + // row carries the SMTP failure reason verbatim. + EmitAttemptAudit( + notification, + now, + durationMs: durationMs, + errorMessage: outcome.Result == DeliveryResult.Success ? null : outcome.Error); + } + + /// + /// Emits a single + /// / + /// audit row with . Wrapped in + /// try/catch so an audit-write failure never propagates back into the + /// dispatcher loop — the already + /// swallows, this is defensive (alog.md §13). + /// + private void EmitAttemptAudit( + Notification notification, + DateTimeOffset now, + int durationMs, + string? errorMessage) + { + try + { + var evt = BuildNotifyDeliverEvent(notification, now, AuditStatus.Attempted, errorMessage) + with { DurationMs = durationMs }; + // Fire-and-forget — we do NOT await: the dispatcher loop must not + // be blocked by audit IO, and the writer swallows its own faults. + // PipeTo is not used because the writer never throws. + _ = _auditWriter.WriteAsync(evt); + } + catch (Exception ex) + { + _logger.LogWarning( + ex, + "Failed to emit Attempted audit row for notification {NotificationId}.", + notification.NotificationId); + } + } + + /// + /// Builds a / + /// row with the per-notification provenance fields (correlation id, list + /// name, source site/instance/script) populated from + /// . + /// parses the notification's id as a Guid; sites generate the id with + /// Guid.NewGuid().ToString("N") so the parse always succeeds, but + /// a non-Guid id is recorded as null rather than crashing the dispatcher. + /// + private static AuditEvent BuildNotifyDeliverEvent( + Notification notification, + DateTimeOffset now, + AuditStatus status, + string? errorMessage) + { + Guid? correlationId = Guid.TryParse(notification.NotificationId, out var parsed) + ? parsed + : null; + + return new AuditEvent + { + EventId = Guid.NewGuid(), + OccurredAtUtc = now.UtcDateTime, + Channel = AuditChannel.Notification, + Kind = AuditKind.NotifyDeliver, + CorrelationId = correlationId, + // Central dispatch — no authenticated actor (the originating + // script's identity is captured on the upstream NotifySend row). + Actor = null, + SourceSiteId = notification.SourceSiteId, + SourceInstanceId = notification.SourceInstanceId, + SourceScript = notification.SourceScript, + Target = notification.ListName, + Status = status, + ErrorMessage = errorMessage, + }; } /// diff --git a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorAttemptEmissionTests.cs b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorAttemptEmissionTests.cs new file mode 100644 index 0000000..728ddc1 --- /dev/null +++ b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorAttemptEmissionTests.cs @@ -0,0 +1,252 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.NotificationOutbox.Delivery; +using ScadaLink.NotificationOutbox.Messages; + +namespace ScadaLink.NotificationOutbox.Tests; + +/// +/// M4 Bundle B (B2) — verifies the +/// dispatcher loop emits exactly ONE +/// / +/// audit row with per attempt regardless of +/// the delivery outcome (success, transient, permanent). Terminal-state +/// emission is covered separately in +/// . +/// +public class NotificationOutboxActorAttemptEmissionTests : TestKit +{ + private readonly INotificationOutboxRepository _outboxRepository = + Substitute.For(); + + private readonly INotificationRepository _notificationRepository = + Substitute.For(); + + private readonly RecordingCentralAuditWriter _auditWriter = new(); + + /// + /// Recording writer so each test can assert on the events captured during + /// one dispatch tick without depending on a concrete implementation. + /// + private sealed class RecordingCentralAuditWriter : ICentralAuditWriter + { + public List Events { get; } = new(); + public Func? OnWrite { get; set; } + + public Task WriteAsync(AuditEvent evt, CancellationToken ct = default) + { + lock (Events) + { + Events.Add(evt); + } + + return OnWrite?.Invoke(evt) ?? Task.CompletedTask; + } + } + + private IServiceProvider BuildServiceProvider(IEnumerable adapters) + { + var services = new ServiceCollection(); + services.AddScoped(_ => _outboxRepository); + services.AddScoped(_ => _notificationRepository); + foreach (var adapter in adapters) + { + services.AddScoped(_ => adapter); + } + + return services.BuildServiceProvider(); + } + + private sealed class StubAdapter : INotificationDeliveryAdapter + { + private readonly Func _outcome; + public int CallCount; + + public StubAdapter(Func outcome) { _outcome = outcome; } + + public NotificationType Type => NotificationType.Email; + + public Task DeliverAsync( + Notification notification, CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref CallCount); + return Task.FromResult(_outcome()); + } + } + + private IActorRef CreateActor(IEnumerable adapters) + { + return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( + BuildServiceProvider(adapters), + new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) }, + (ICentralAuditWriter)_auditWriter, + NullLogger.Instance))); + } + + private static Notification MakeNotification( + Guid? notificationId = null, + string sourceSite = "site-1", + int retryCount = 0) + { + return new Notification( + (notificationId ?? Guid.NewGuid()).ToString("D"), + NotificationType.Email, + "ops-team", + "Tank overflow", + "Tank 3 level critical", + sourceSite) + { + RetryCount = retryCount, + CreatedAt = DateTimeOffset.UtcNow, + SourceInstanceId = "instance-42", + SourceScript = "AlarmScript", + }; + } + + private void SetupSmtpRetryPolicy(int maxRetries, TimeSpan retryDelay) + { + var config = new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com") + { + MaxRetries = maxRetries, + RetryDelay = retryDelay, + }; + _notificationRepository.GetAllSmtpConfigurationsAsync(Arg.Any()) + .Returns(new[] { config }); + } + + private List EventsByStatus(AuditStatus status) + { + lock (_auditWriter.Events) + { + return _auditWriter.Events.Where(e => e.Status == status).ToList(); + } + } + + [Fact] + public void Attempt_Success_EmitsOneEvent_KindNotifyDeliver_StatusAttempted() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var id = Guid.NewGuid(); + var notification = MakeNotification(notificationId: id, sourceSite: "site-alpha"); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com")); + var actor = CreateActor([adapter]); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + var attempted = EventsByStatus(AuditStatus.Attempted); + Assert.Single(attempted); + var evt = attempted[0]; + Assert.Equal(AuditChannel.Notification, evt.Channel); + Assert.Equal(AuditKind.NotifyDeliver, evt.Kind); + Assert.Equal(id, evt.CorrelationId); + Assert.Equal("ops-team", evt.Target); + Assert.Equal("site-alpha", evt.SourceSiteId); + Assert.Equal("instance-42", evt.SourceInstanceId); + Assert.Equal("AlarmScript", evt.SourceScript); + // Central dispatch: actor is null (no authenticated end-user). + Assert.Null(evt.Actor); + // Successful attempt: no error message. + Assert.Null(evt.ErrorMessage); + }); + } + + [Fact] + public void Attempt_TransientFailure_EmitsEvent_StatusAttempted_ErrorMessageSet() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(retryCount: 1); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout")); + var actor = CreateActor([adapter]); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + var attempted = EventsByStatus(AuditStatus.Attempted); + Assert.Single(attempted); + Assert.Equal(AuditKind.NotifyDeliver, attempted[0].Kind); + Assert.Equal("smtp timeout", attempted[0].ErrorMessage); + }); + } + + [Fact] + public void Attempt_PermanentFailure_EmitsEvent_StatusAttempted_ErrorMessageSet() + { + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Permanent("invalid recipient address")); + var actor = CreateActor([adapter]); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + var attempted = EventsByStatus(AuditStatus.Attempted); + Assert.Single(attempted); + Assert.Equal(AuditKind.NotifyDeliver, attempted[0].Kind); + Assert.Equal("invalid recipient address", attempted[0].ErrorMessage); + }); + } + + [Fact] + public void AuditWriter_Throws_DeliveryStateUpdate_StillSucceeds() + { + // Audit failure must NEVER abort the user-facing action: the delivery + // outcome must still be persisted via UpdateAsync. + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com")); + _auditWriter.OnWrite = _ => throw new InvalidOperationException("audit dead"); + var actor = CreateActor([adapter]); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + // Update of the notification row must still happen. + AwaitAssert(() => + { + _outboxRepository.Received(1).UpdateAsync( + Arg.Is(n => n.Status == NotificationStatus.Delivered), + Arg.Any()); + }); + } + + [Fact] + public void Attempt_RecordsOccurredAtUtc_AsUtc() + { + // The OccurredAtUtc on the emitted event must be UTC (all timestamps + // are UTC throughout the system). + SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); + var notification = MakeNotification(); + _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new[] { notification }); + var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com")); + var actor = CreateActor([adapter]); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + AwaitAssert(() => + { + var attempted = EventsByStatus(AuditStatus.Attempted); + Assert.Single(attempted); + Assert.Equal(DateTimeKind.Utc, attempted[0].OccurredAtUtc.Kind); + }); + } +}