using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.NotificationOutbox.Delivery;
using ScadaLink.NotificationOutbox.Messages;
namespace ScadaLink.NotificationOutbox.Tests;
/// <summary>
/// M4 Bundle B (B2) — verifies that the <see cref="NotificationOutboxActor"/>
/// dispatcher loop emits exactly ONE
/// <see cref="AuditChannel.Notification"/> / <see cref="AuditKind.NotifyDeliver"/>
/// audit row with <see cref="AuditStatus.Attempted"/> per attempt, regardless of
/// the delivery outcome (success, transient, permanent). Terminal-state
/// emission is covered separately in another test class (the reference to it
/// is missing from this comment — TODO restore).
/// </summary>
public class NotificationOutboxActorAttemptEmissionTests : TestKit
{
// NOTE(review): several expressions in this file read as if their generic type
// arguments were lost (e.g. "Substitute.For()", "List Events", "Arg.Any()").
// Restore them from the original source before building; the code below is
// left exactly as found.

// Repository substitutes; both are registered into the service provider that
// is handed to the actor (see BuildServiceProvider / CreateActor).
private readonly INotificationOutboxRepository _outboxRepository =
Substitute.For();
private readonly INotificationRepository _notificationRepository =
Substitute.For();
// Captures every audit event the actor writes so tests can assert on them.
private readonly RecordingCentralAuditWriter _auditWriter = new();
/// <summary>
/// Recording writer so each test can assert on the events captured during
/// one dispatch tick without depending on a concrete implementation.
/// </summary>
private sealed class RecordingCentralAuditWriter : ICentralAuditWriter
{
/// <summary>
/// Every event passed to <see cref="WriteAsync"/>, in call order. Writes are
/// guarded by locking on this list instance, so readers must take the same
/// lock (as EventsByStatus does).
/// </summary>
// NOTE(review): locking on a publicly exposed object works here because the
// only other user is this test class, but a private gate object would be safer.
public List Events { get; } = new();
/// <summary>
/// Optional hook invoked after the event has been recorded; whatever task it
/// returns becomes the result of <see cref="WriteAsync"/>. Tests use it to
/// simulate a failing writer.
/// </summary>
public Func? OnWrite { get; set; }
/// <summary>
/// Records <paramref name="evt"/> and then runs <see cref="OnWrite"/> if set.
/// </summary>
/// <param name="evt">The audit event to capture.</param>
/// <param name="ct">Accepted for interface compatibility; not used.</param>
/// <returns>The hook's task, or a completed task when no hook is set.</returns>
public Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
{
lock (Events)
{
Events.Add(evt);
}
// The event is recorded before the hook runs, so a throwing hook still
// leaves the event visible in Events.
return OnWrite?.Invoke(evt) ?? Task.CompletedTask;
}
}
/// <summary>
/// Builds a service provider exposing the two repository substitutes and each
/// supplied adapter as scoped services.
/// </summary>
/// <param name="adapters">Delivery adapters to register.</param>
/// <returns>The built provider.</returns>
private IServiceProvider BuildServiceProvider(IEnumerable adapters)
{
var services = new ServiceCollection();
// Factories return the shared substitute instances, so every scope sees the
// same stubs that the tests configure.
services.AddScoped(_ => _outboxRepository);
services.AddScoped(_ => _notificationRepository);
foreach (var adapter in adapters)
{
services.AddScoped(_ => adapter);
}
return services.BuildServiceProvider();
}
/// <summary>
/// Email delivery adapter whose result comes from a caller-supplied delegate;
/// it also counts how many times delivery was requested.
/// </summary>
private sealed class StubAdapter : INotificationDeliveryAdapter
{
// Evaluated on every DeliverAsync call to produce that call's outcome.
private readonly Func _outcome;
// Number of DeliverAsync calls, incremented atomically. Not read by any test
// visible in this file.
public int CallCount;
public StubAdapter(Func outcome) { _outcome = outcome; }
// This stub always reports itself as the Email adapter.
public NotificationType Type => NotificationType.Email;
/// <summary>
/// Counts the call and returns the configured outcome as an already-completed
/// task; the notification and cancellation token are ignored.
/// </summary>
public Task DeliverAsync(
Notification notification, CancellationToken cancellationToken = default)
{
Interlocked.Increment(ref CallCount);
return Task.FromResult(_outcome());
}
}
/// <summary>
/// Creates a <see cref="NotificationOutboxActor"/> in the TestKit actor system,
/// wired to the stub repositories, the given adapters, the recording audit
/// writer and a null logger.
/// </summary>
/// <param name="adapters">Delivery adapters made available to the actor.</param>
/// <returns>A reference to the new actor.</returns>
private IActorRef CreateActor(IEnumerable adapters)
{
// NOTE(review): the one-hour DispatchInterval presumably keeps the actor's own
// schedule from firing during a test, so only the DispatchTick each test sends
// drives dispatch — confirm against the actor implementation.
return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
BuildServiceProvider(adapters),
new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
(ICentralAuditWriter)_auditWriter,
NullLogger.Instance)));
}
/// <summary>
/// Builds an Email notification with fixed content and source metadata.
/// </summary>
/// <param name="notificationId">Id to use; a new GUID is generated when null.</param>
/// <param name="sourceSite">Source site passed to the constructor.</param>
/// <param name="retryCount">Initial <c>RetryCount</c>.</param>
/// <param name="originExecutionId">Value for <c>OriginExecutionId</c>.</param>
/// <param name="originParentExecutionId">Value for <c>OriginParentExecutionId</c>.</param>
/// <returns>The populated notification.</returns>
private static Notification MakeNotification(
Guid? notificationId = null,
string sourceSite = "site-1",
int retryCount = 0,
Guid? originExecutionId = null,
Guid? originParentExecutionId = null)
{
// The id is passed as its "D" (hyphenated) string form.
// NOTE(review): the three string literals are presumably list name, subject
// and body — confirm against the Notification constructor. The tests below
// expect "ops-team" to surface as the audit event's Target.
return new Notification(
(notificationId ?? Guid.NewGuid()).ToString("D"),
NotificationType.Email,
"ops-team",
"Tank overflow",
"Tank 3 level critical",
sourceSite)
{
RetryCount = retryCount,
CreatedAt = DateTimeOffset.UtcNow,
SourceInstanceId = "instance-42",
SourceScript = "AlarmScript",
OriginExecutionId = originExecutionId,
OriginParentExecutionId = originParentExecutionId,
};
}
/// <summary>
/// Stubs the notification repository so that
/// <c>GetAllSmtpConfigurationsAsync</c> returns a single SMTP configuration
/// carrying the given retry settings.
/// </summary>
/// <param name="maxRetries">Value for <c>MaxRetries</c>.</param>
/// <param name="retryDelay">Value for <c>RetryDelay</c>.</param>
private void SetupSmtpRetryPolicy(int maxRetries, TimeSpan retryDelay)
{
// NOTE(review): constructor arguments are presumably host, auth type and
// from-address — confirm against SmtpConfiguration.
var config = new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com")
{
MaxRetries = maxRetries,
RetryDelay = retryDelay,
};
_notificationRepository.GetAllSmtpConfigurationsAsync(Arg.Any())
.Returns(new[] { config });
}
/// <summary>
/// Returns a snapshot of the recorded audit events that have the given status.
/// </summary>
/// <param name="status">Status to filter on.</param>
/// <returns>A new list; later writes do not affect it.</returns>
private List EventsByStatus(AuditStatus status)
{
// Same lock object as RecordingCentralAuditWriter.WriteAsync, so the list is
// never enumerated while the actor is appending to it.
lock (_auditWriter.Events)
{
return _auditWriter.Events.Where(e => e.Status == status).ToList();
}
}
/// <summary>
/// A successful delivery produces a single Attempted NotifyDeliver event on
/// the Notification channel, carrying the notification's id, target and source
/// metadata, the "system" actor and no error message.
/// </summary>
[Fact]
public void Attempt_Success_EmitsOneEvent_KindNotifyDeliver_StatusAttempted()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var id = Guid.NewGuid();
var notification = MakeNotification(notificationId: id, sourceSite: "site-alpha");
_outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
// Dispatch happens asynchronously inside the actor; AwaitAssert re-runs the
// assertions until they pass or the TestKit timeout elapses.
// NOTE(review): because it returns on the first passing run, a duplicate
// event emitted later would not be detected by Assert.Single here.
AwaitAssert(() =>
{
var attempted = EventsByStatus(AuditStatus.Attempted);
Assert.Single(attempted);
var evt = attempted[0];
Assert.Equal(AuditChannel.Notification, evt.Channel);
Assert.Equal(AuditKind.NotifyDeliver, evt.Kind);
// CorrelationId is compared against the GUID the notification id was built from.
Assert.Equal(id, evt.CorrelationId);
Assert.Equal("ops-team", evt.Target);
Assert.Equal("site-alpha", evt.SourceSiteId);
Assert.Equal("instance-42", evt.SourceInstanceId);
Assert.Equal("AlarmScript", evt.SourceScript);
// Central dispatch: Actor is the system identity (no per-call user).
Assert.Equal("system", evt.Actor);
// Successful attempt: no error message.
Assert.Null(evt.ErrorMessage);
});
}
/// <summary>
/// The Attempted event's ExecutionId equals the notification's
/// OriginExecutionId.
/// </summary>
[Fact]
public void Attempt_CarriesOriginExecutionId_AsExecutionId()
{
// Audit Log #23: the Attempted NotifyDeliver row must echo the
// notification's OriginExecutionId so all rows for one run share an id.
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var executionId = Guid.NewGuid();
var notification = MakeNotification(originExecutionId: executionId);
_outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
var attempted = EventsByStatus(AuditStatus.Attempted);
Assert.Single(attempted);
Assert.Equal(executionId, attempted[0].ExecutionId);
});
}
/// <summary>
/// When the notification has no OriginExecutionId, the Attempted event's
/// ExecutionId is null.
/// </summary>
[Fact]
public void Attempt_NullOriginExecutionId_HasNullExecutionId()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification(originExecutionId: null);
_outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
var attempted = EventsByStatus(AuditStatus.Attempted);
Assert.Single(attempted);
Assert.Null(attempted[0].ExecutionId);
});
}
/// <summary>
/// The Attempted event's ParentExecutionId equals the notification's
/// OriginParentExecutionId.
/// </summary>
[Fact]
public void Attempt_CarriesOriginParentExecutionId_AsParentExecutionId()
{
// Audit Log ParentExecutionId: the Attempted NotifyDeliver row must echo
// the notification's OriginParentExecutionId so the central dispatcher's
// rows carry the routed run's parent id.
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var parentExecutionId = Guid.NewGuid();
var notification = MakeNotification(originParentExecutionId: parentExecutionId);
_outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
var attempted = EventsByStatus(AuditStatus.Attempted);
Assert.Single(attempted);
Assert.Equal(parentExecutionId, attempted[0].ParentExecutionId);
});
}
/// <summary>
/// When the notification has no OriginParentExecutionId, the Attempted event's
/// ParentExecutionId is null.
/// </summary>
[Fact]
public void Attempt_NullOriginParentExecutionId_HasNullParentExecutionId()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification(originParentExecutionId: null);
_outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
var attempted = EventsByStatus(AuditStatus.Attempted);
Assert.Single(attempted);
Assert.Null(attempted[0].ParentExecutionId);
});
}
/// <summary>
/// A transient delivery failure still yields a single Attempted NotifyDeliver
/// event, with the adapter's failure text as ErrorMessage.
/// </summary>
[Fact]
public void Attempt_TransientFailure_EmitsEvent_StatusAttempted_ErrorMessageSet()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
// Starts with one prior retry recorded, below the configured maximum of 5.
var notification = MakeNotification(retryCount: 1);
_outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
var attempted = EventsByStatus(AuditStatus.Attempted);
Assert.Single(attempted);
Assert.Equal(AuditKind.NotifyDeliver, attempted[0].Kind);
Assert.Equal("smtp timeout", attempted[0].ErrorMessage);
});
}
/// <summary>
/// A permanent delivery failure still yields a single Attempted NotifyDeliver
/// event, with the adapter's failure text as ErrorMessage.
/// </summary>
[Fact]
public void Attempt_PermanentFailure_EmitsEvent_StatusAttempted_ErrorMessageSet()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification();
_outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Permanent("invalid recipient address"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
var attempted = EventsByStatus(AuditStatus.Attempted);
Assert.Single(attempted);
Assert.Equal(AuditKind.NotifyDeliver, attempted[0].Kind);
Assert.Equal("invalid recipient address", attempted[0].ErrorMessage);
});
}
/// <summary>
/// When the audit writer throws, the notification is still updated once with
/// status Delivered.
/// </summary>
[Fact]
public void AuditWriter_Throws_DeliveryStateUpdate_StillSucceeds()
{
// Audit failure must NEVER abort the user-facing action: the delivery
// outcome must still be persisted via UpdateAsync.
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification();
_outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
// The hook throws synchronously from inside WriteAsync (it does not return a
// faulted task), so the exception surfaces at the actor's call site.
_auditWriter.OnWrite = _ => throw new InvalidOperationException("audit dead");
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
// Update of the notification row must still happen.
AwaitAssert(() =>
{
_outboxRepository.Received(1).UpdateAsync(
Arg.Is(n => n.Status == NotificationStatus.Delivered),
Arg.Any());
});
}
/// <summary>
/// The Attempted event's OccurredAtUtc has <see cref="DateTimeKind.Utc"/>.
/// </summary>
[Fact]
public void Attempt_RecordsOccurredAtUtc_AsUtc()
{
// The OccurredAtUtc on the emitted event must be UTC (all timestamps
// are UTC throughout the system).
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification();
_outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
var attempted = EventsByStatus(AuditStatus.Attempted);
Assert.Single(attempted);
Assert.Equal(DateTimeKind.Utc, attempted[0].OccurredAtUtc.Kind);
});
}
}