feat(notif): emit NotifyDeliver(Attempted) per dispatcher attempt (#23 M4)
M4 Bundle B (B2) — NotificationOutboxActor's dispatcher loop emits a single
AuditChannel.Notification / AuditKind.NotifyDeliver row with AuditStatus.Attempted
for every delivery attempt (success, transient failure, permanent failure,
and the missing-adapter park).
- BuildNotifyDeliverEvent helper populates correlation id (parsed from the
string NotificationId — sites generate Guid.NewGuid().ToString("N"),
non-Guid ids fall through as null), list-name target, source site/instance/script
provenance, and Actor=null (central dispatch has no authenticated end-user).
- Attempt duration is measured around the adapter call and recorded as
DurationMs so KPIs can compute per-attempt latency.
- Emission is fire-and-forget (the writer swallows internally) and wrapped
in try/catch — audit failure NEVER aborts the user-facing dispatch.
Terminal-state emission lands separately in B3.
This commit is contained in:
@@ -270,6 +270,24 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
|||||||
/// status transition. A missing adapter parks the notification; otherwise the
|
/// status transition. A missing adapter parks the notification; otherwise the
|
||||||
/// <see cref="DeliveryOutcome"/> drives the transition. The updated row is always persisted.
|
/// <see cref="DeliveryOutcome"/> drives the transition. The updated row is always persisted.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>
|
||||||
|
/// M4 Bundle B2: a single
|
||||||
|
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
|
||||||
|
/// row is emitted with <see cref="AuditStatus.Attempted"/> per attempt
|
||||||
|
/// (success, transient, permanent). The emission is wrapped in a
|
||||||
|
/// try/catch so a thrown audit writer NEVER aborts the user-facing
|
||||||
|
/// dispatch — the <see cref="CentralAuditWriter"/> itself swallows
|
||||||
|
/// internal failures, but the dispatcher wraps defensively per
|
||||||
|
/// alog.md §13. The missing-adapter park path also emits an Attempted
|
||||||
|
/// row because it IS an attempt from the dispatcher's point of view.
|
||||||
|
/// </para>
|
||||||
|
/// <para>
|
||||||
|
/// Attempt duration is measured around the adapter call and recorded on
|
||||||
|
/// the Attempted row so downstream KPIs can compute per-attempt latency
|
||||||
|
/// without joining to the row update timestamps.
|
||||||
|
/// </para>
|
||||||
|
/// </remarks>
|
||||||
private async Task DeliverOneAsync(
|
private async Task DeliverOneAsync(
|
||||||
Notification notification,
|
Notification notification,
|
||||||
DateTimeOffset now,
|
DateTimeOffset now,
|
||||||
@@ -280,14 +298,28 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
|||||||
{
|
{
|
||||||
if (!adapters.TryGetValue(notification.Type, out var adapter))
|
if (!adapters.TryGetValue(notification.Type, out var adapter))
|
||||||
{
|
{
|
||||||
|
// Missing-adapter park: from the dispatcher's perspective this is an
|
||||||
|
// attempt that resolved to a park, so we emit the Attempted row
|
||||||
|
// alongside the row update.
|
||||||
|
var missingAdapterError = $"no delivery adapter for type {notification.Type}";
|
||||||
notification.Status = NotificationStatus.Parked;
|
notification.Status = NotificationStatus.Parked;
|
||||||
notification.LastError = $"no delivery adapter for type {notification.Type}";
|
notification.LastError = missingAdapterError;
|
||||||
notification.LastAttemptAt = now;
|
notification.LastAttemptAt = now;
|
||||||
await outboxRepository.UpdateAsync(notification);
|
await outboxRepository.UpdateAsync(notification);
|
||||||
|
EmitAttemptAudit(
|
||||||
|
notification,
|
||||||
|
now,
|
||||||
|
durationMs: 0,
|
||||||
|
errorMessage: missingAdapterError);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Measure the attempt duration around the adapter call so the
|
||||||
|
// Attempted row carries it for KPI use.
|
||||||
|
var attemptStart = DateTimeOffset.UtcNow;
|
||||||
var outcome = await adapter.DeliverAsync(notification);
|
var outcome = await adapter.DeliverAsync(notification);
|
||||||
|
var durationMs = (int)Math.Min(
|
||||||
|
int.MaxValue, Math.Max(0, (DateTimeOffset.UtcNow - attemptStart).TotalMilliseconds));
|
||||||
|
|
||||||
switch (outcome.Result)
|
switch (outcome.Result)
|
||||||
{
|
{
|
||||||
@@ -322,6 +354,86 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
|||||||
}
|
}
|
||||||
|
|
||||||
await outboxRepository.UpdateAsync(notification);
|
await outboxRepository.UpdateAsync(notification);
|
||||||
|
|
||||||
|
// Emit the per-attempt Attempted row exactly once regardless of the
|
||||||
|
// outcome (B2). The error message comes from the outcome, not from
|
||||||
|
// notification.LastError, so a success row is null and a transient
|
||||||
|
// row carries the SMTP failure reason verbatim.
|
||||||
|
EmitAttemptAudit(
|
||||||
|
notification,
|
||||||
|
now,
|
||||||
|
durationMs: durationMs,
|
||||||
|
errorMessage: outcome.Result == DeliveryResult.Success ? null : outcome.Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Emits a single
|
||||||
|
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
|
||||||
|
/// audit row with <see cref="AuditStatus.Attempted"/>. Wrapped in
|
||||||
|
/// try/catch so an audit-write failure never propagates back into the
|
||||||
|
/// dispatcher loop — the <see cref="CentralAuditWriter"/> already
|
||||||
|
/// swallows, this is defensive (alog.md §13).
|
||||||
|
/// </summary>
|
||||||
|
private void EmitAttemptAudit(
|
||||||
|
Notification notification,
|
||||||
|
DateTimeOffset now,
|
||||||
|
int durationMs,
|
||||||
|
string? errorMessage)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var evt = BuildNotifyDeliverEvent(notification, now, AuditStatus.Attempted, errorMessage)
|
||||||
|
with { DurationMs = durationMs };
|
||||||
|
// Fire-and-forget — we do NOT await: the dispatcher loop must not
|
||||||
|
// be blocked by audit IO, and the writer swallows its own faults.
|
||||||
|
// PipeTo is not used because the writer never throws.
|
||||||
|
_ = _auditWriter.WriteAsync(evt);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(
|
||||||
|
ex,
|
||||||
|
"Failed to emit Attempted audit row for notification {NotificationId}.",
|
||||||
|
notification.NotificationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builds a <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
|
||||||
|
/// row with the per-notification provenance fields (correlation id, list
|
||||||
|
/// name, source site/instance/script) populated from
|
||||||
|
/// <paramref name="notification"/>. <see cref="AuditEvent.CorrelationId"/>
|
||||||
|
/// parses the notification's id as a Guid; sites generate the id with
|
||||||
|
/// <c>Guid.NewGuid().ToString("N")</c> so the parse always succeeds, but
|
||||||
|
/// a non-Guid id is recorded as null rather than crashing the dispatcher.
|
||||||
|
/// </summary>
|
||||||
|
private static AuditEvent BuildNotifyDeliverEvent(
|
||||||
|
Notification notification,
|
||||||
|
DateTimeOffset now,
|
||||||
|
AuditStatus status,
|
||||||
|
string? errorMessage)
|
||||||
|
{
|
||||||
|
Guid? correlationId = Guid.TryParse(notification.NotificationId, out var parsed)
|
||||||
|
? parsed
|
||||||
|
: null;
|
||||||
|
|
||||||
|
return new AuditEvent
|
||||||
|
{
|
||||||
|
EventId = Guid.NewGuid(),
|
||||||
|
OccurredAtUtc = now.UtcDateTime,
|
||||||
|
Channel = AuditChannel.Notification,
|
||||||
|
Kind = AuditKind.NotifyDeliver,
|
||||||
|
CorrelationId = correlationId,
|
||||||
|
// Central dispatch — no authenticated actor (the originating
|
||||||
|
// script's identity is captured on the upstream NotifySend row).
|
||||||
|
Actor = null,
|
||||||
|
SourceSiteId = notification.SourceSiteId,
|
||||||
|
SourceInstanceId = notification.SourceInstanceId,
|
||||||
|
SourceScript = notification.SourceScript,
|
||||||
|
Target = notification.ListName,
|
||||||
|
Status = status,
|
||||||
|
ErrorMessage = errorMessage,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -0,0 +1,252 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.TestKit.Xunit2;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using NSubstitute;
|
||||||
|
using ScadaLink.Commons.Entities.Audit;
|
||||||
|
using ScadaLink.Commons.Entities.Notifications;
|
||||||
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
|
using ScadaLink.Commons.Interfaces.Services;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
using ScadaLink.NotificationOutbox.Delivery;
|
||||||
|
using ScadaLink.NotificationOutbox.Messages;
|
||||||
|
|
||||||
|
namespace ScadaLink.NotificationOutbox.Tests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// M4 Bundle B (B2) — verifies the <see cref="NotificationOutboxActor"/>
|
||||||
|
/// dispatcher loop emits exactly ONE
|
||||||
|
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
|
||||||
|
/// audit row with <see cref="AuditStatus.Attempted"/> per attempt regardless of
|
||||||
|
/// the delivery outcome (success, transient, permanent). Terminal-state
|
||||||
|
/// emission is covered separately in
|
||||||
|
/// <see cref="NotificationOutboxActorTerminalEmissionTests"/>.
|
||||||
|
/// </summary>
|
||||||
|
public class NotificationOutboxActorAttemptEmissionTests : TestKit
|
||||||
|
{
|
||||||
|
private readonly INotificationOutboxRepository _outboxRepository =
|
||||||
|
Substitute.For<INotificationOutboxRepository>();
|
||||||
|
|
||||||
|
private readonly INotificationRepository _notificationRepository =
|
||||||
|
Substitute.For<INotificationRepository>();
|
||||||
|
|
||||||
|
private readonly RecordingCentralAuditWriter _auditWriter = new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Recording writer so each test can assert on the events captured during
|
||||||
|
/// one dispatch tick without depending on a concrete implementation.
|
||||||
|
/// </summary>
|
||||||
|
private sealed class RecordingCentralAuditWriter : ICentralAuditWriter
|
||||||
|
{
|
||||||
|
public List<AuditEvent> Events { get; } = new();
|
||||||
|
public Func<AuditEvent, Task>? OnWrite { get; set; }
|
||||||
|
|
||||||
|
public Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
|
||||||
|
{
|
||||||
|
lock (Events)
|
||||||
|
{
|
||||||
|
Events.Add(evt);
|
||||||
|
}
|
||||||
|
|
||||||
|
return OnWrite?.Invoke(evt) ?? Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private IServiceProvider BuildServiceProvider(IEnumerable<INotificationDeliveryAdapter> adapters)
|
||||||
|
{
|
||||||
|
var services = new ServiceCollection();
|
||||||
|
services.AddScoped(_ => _outboxRepository);
|
||||||
|
services.AddScoped(_ => _notificationRepository);
|
||||||
|
foreach (var adapter in adapters)
|
||||||
|
{
|
||||||
|
services.AddScoped<INotificationDeliveryAdapter>(_ => adapter);
|
||||||
|
}
|
||||||
|
|
||||||
|
return services.BuildServiceProvider();
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class StubAdapter : INotificationDeliveryAdapter
|
||||||
|
{
|
||||||
|
private readonly Func<DeliveryOutcome> _outcome;
|
||||||
|
public int CallCount;
|
||||||
|
|
||||||
|
public StubAdapter(Func<DeliveryOutcome> outcome) { _outcome = outcome; }
|
||||||
|
|
||||||
|
public NotificationType Type => NotificationType.Email;
|
||||||
|
|
||||||
|
public Task<DeliveryOutcome> DeliverAsync(
|
||||||
|
Notification notification, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
Interlocked.Increment(ref CallCount);
|
||||||
|
return Task.FromResult(_outcome());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private IActorRef CreateActor(IEnumerable<INotificationDeliveryAdapter> adapters)
|
||||||
|
{
|
||||||
|
return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
|
||||||
|
BuildServiceProvider(adapters),
|
||||||
|
new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
|
||||||
|
(ICentralAuditWriter)_auditWriter,
|
||||||
|
NullLogger<NotificationOutboxActor>.Instance)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Notification MakeNotification(
|
||||||
|
Guid? notificationId = null,
|
||||||
|
string sourceSite = "site-1",
|
||||||
|
int retryCount = 0)
|
||||||
|
{
|
||||||
|
return new Notification(
|
||||||
|
(notificationId ?? Guid.NewGuid()).ToString("D"),
|
||||||
|
NotificationType.Email,
|
||||||
|
"ops-team",
|
||||||
|
"Tank overflow",
|
||||||
|
"Tank 3 level critical",
|
||||||
|
sourceSite)
|
||||||
|
{
|
||||||
|
RetryCount = retryCount,
|
||||||
|
CreatedAt = DateTimeOffset.UtcNow,
|
||||||
|
SourceInstanceId = "instance-42",
|
||||||
|
SourceScript = "AlarmScript",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private void SetupSmtpRetryPolicy(int maxRetries, TimeSpan retryDelay)
|
||||||
|
{
|
||||||
|
var config = new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com")
|
||||||
|
{
|
||||||
|
MaxRetries = maxRetries,
|
||||||
|
RetryDelay = retryDelay,
|
||||||
|
};
|
||||||
|
_notificationRepository.GetAllSmtpConfigurationsAsync(Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new[] { config });
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<AuditEvent> EventsByStatus(AuditStatus status)
|
||||||
|
{
|
||||||
|
lock (_auditWriter.Events)
|
||||||
|
{
|
||||||
|
return _auditWriter.Events.Where(e => e.Status == status).ToList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Attempt_Success_EmitsOneEvent_KindNotifyDeliver_StatusAttempted()
|
||||||
|
{
|
||||||
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
||||||
|
var id = Guid.NewGuid();
|
||||||
|
var notification = MakeNotification(notificationId: id, sourceSite: "site-alpha");
|
||||||
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new[] { notification });
|
||||||
|
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
|
||||||
|
var actor = CreateActor([adapter]);
|
||||||
|
|
||||||
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
||||||
|
|
||||||
|
AwaitAssert(() =>
|
||||||
|
{
|
||||||
|
var attempted = EventsByStatus(AuditStatus.Attempted);
|
||||||
|
Assert.Single(attempted);
|
||||||
|
var evt = attempted[0];
|
||||||
|
Assert.Equal(AuditChannel.Notification, evt.Channel);
|
||||||
|
Assert.Equal(AuditKind.NotifyDeliver, evt.Kind);
|
||||||
|
Assert.Equal(id, evt.CorrelationId);
|
||||||
|
Assert.Equal("ops-team", evt.Target);
|
||||||
|
Assert.Equal("site-alpha", evt.SourceSiteId);
|
||||||
|
Assert.Equal("instance-42", evt.SourceInstanceId);
|
||||||
|
Assert.Equal("AlarmScript", evt.SourceScript);
|
||||||
|
// Central dispatch: actor is null (no authenticated end-user).
|
||||||
|
Assert.Null(evt.Actor);
|
||||||
|
// Successful attempt: no error message.
|
||||||
|
Assert.Null(evt.ErrorMessage);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Attempt_TransientFailure_EmitsEvent_StatusAttempted_ErrorMessageSet()
|
||||||
|
{
|
||||||
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
||||||
|
var notification = MakeNotification(retryCount: 1);
|
||||||
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new[] { notification });
|
||||||
|
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
|
||||||
|
var actor = CreateActor([adapter]);
|
||||||
|
|
||||||
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
||||||
|
|
||||||
|
AwaitAssert(() =>
|
||||||
|
{
|
||||||
|
var attempted = EventsByStatus(AuditStatus.Attempted);
|
||||||
|
Assert.Single(attempted);
|
||||||
|
Assert.Equal(AuditKind.NotifyDeliver, attempted[0].Kind);
|
||||||
|
Assert.Equal("smtp timeout", attempted[0].ErrorMessage);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Attempt_PermanentFailure_EmitsEvent_StatusAttempted_ErrorMessageSet()
|
||||||
|
{
|
||||||
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
||||||
|
var notification = MakeNotification();
|
||||||
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new[] { notification });
|
||||||
|
var adapter = new StubAdapter(() => DeliveryOutcome.Permanent("invalid recipient address"));
|
||||||
|
var actor = CreateActor([adapter]);
|
||||||
|
|
||||||
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
||||||
|
|
||||||
|
AwaitAssert(() =>
|
||||||
|
{
|
||||||
|
var attempted = EventsByStatus(AuditStatus.Attempted);
|
||||||
|
Assert.Single(attempted);
|
||||||
|
Assert.Equal(AuditKind.NotifyDeliver, attempted[0].Kind);
|
||||||
|
Assert.Equal("invalid recipient address", attempted[0].ErrorMessage);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void AuditWriter_Throws_DeliveryStateUpdate_StillSucceeds()
|
||||||
|
{
|
||||||
|
// Audit failure must NEVER abort the user-facing action: the delivery
|
||||||
|
// outcome must still be persisted via UpdateAsync.
|
||||||
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
||||||
|
var notification = MakeNotification();
|
||||||
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new[] { notification });
|
||||||
|
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
|
||||||
|
_auditWriter.OnWrite = _ => throw new InvalidOperationException("audit dead");
|
||||||
|
var actor = CreateActor([adapter]);
|
||||||
|
|
||||||
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
||||||
|
|
||||||
|
// Update of the notification row must still happen.
|
||||||
|
AwaitAssert(() =>
|
||||||
|
{
|
||||||
|
_outboxRepository.Received(1).UpdateAsync(
|
||||||
|
Arg.Is<Notification>(n => n.Status == NotificationStatus.Delivered),
|
||||||
|
Arg.Any<CancellationToken>());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Attempt_RecordsOccurredAtUtc_AsUtc()
|
||||||
|
{
|
||||||
|
// The OccurredAtUtc on the emitted event must be UTC (all timestamps
|
||||||
|
// are UTC throughout the system).
|
||||||
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
||||||
|
var notification = MakeNotification();
|
||||||
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new[] { notification });
|
||||||
|
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
|
||||||
|
var actor = CreateActor([adapter]);
|
||||||
|
|
||||||
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
||||||
|
|
||||||
|
AwaitAssert(() =>
|
||||||
|
{
|
||||||
|
var attempted = EventsByStatus(AuditStatus.Attempted);
|
||||||
|
Assert.Single(attempted);
|
||||||
|
Assert.Equal(DateTimeKind.Utc, attempted[0].OccurredAtUtc.Kind);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user