using Akka.Actor; using Akka.TestKit.Xunit2; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging.Abstractions; using NSubstitute; using ScadaLink.Commons.Entities.Notifications; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Types.Enums; using ScadaLink.NotificationOutbox.Delivery; using ScadaLink.NotificationOutbox.Messages; using ScadaLink.NotificationOutbox.Tests.TestSupport; namespace ScadaLink.NotificationOutbox.Tests; /// /// Task 14: Tests for the dispatcher loop — the /// periodic sweep that claims due notifications via /// , delivers each through the /// matching , and applies the resulting status /// transition with . /// public class NotificationOutboxActorDispatchTests : TestKit { private readonly INotificationOutboxRepository _outboxRepository = Substitute.For(); private readonly INotificationRepository _notificationRepository = Substitute.For(); private IServiceProvider BuildServiceProvider( IEnumerable adapters) { var services = new ServiceCollection(); services.AddScoped(_ => _outboxRepository); services.AddScoped(_ => _notificationRepository); // The actor resolves the channel adapters from its per-sweep DI scope; register // each stub adapter under the INotificationDeliveryAdapter service. foreach (var adapter in adapters) { services.AddScoped(_ => adapter); } return services.BuildServiceProvider(); } /// /// Stub adapter whose returns a configurable outcome and /// optionally blocks for a delay — used to exercise the overlapping-tick guard. /// private sealed class StubAdapter : INotificationDeliveryAdapter { private readonly Func _outcome; private readonly TimeSpan _delay; public StubAdapter(Func outcome, TimeSpan? delay = null) { _outcome = outcome; _delay = delay ?? TimeSpan.Zero; } public int CallCount; public NotificationType Type => NotificationType.Email; public async Task DeliverAsync( Notification notification, CancellationToken cancellationToken = default) { Interlocked.Increment(ref CallCount); if (_delay > TimeSpan.Zero) { await Task.Delay(_delay, cancellationToken); } return _outcome(); } } private IActorRef CreateActor( IEnumerable adapters, NotificationOutboxOptions? options = null) { return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( BuildServiceProvider(adapters), options ?? new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) }, new NoOpCentralAuditWriter(), NullLogger.Instance))); } private static Notification MakeNotification( NotificationType type = NotificationType.Email, int retryCount = 0) { return new Notification( Guid.NewGuid().ToString(), type, "ops-team", "Subject", "Body", "site-1") { RetryCount = retryCount, CreatedAt = DateTimeOffset.UtcNow, }; } private void SetupSmtpRetryPolicy(int maxRetries, TimeSpan retryDelay) { var config = new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com") { MaxRetries = maxRetries, RetryDelay = retryDelay, }; _notificationRepository.GetAllSmtpConfigurationsAsync(Arg.Any()) .Returns(new[] { config }); } [Fact] public void DispatchTick_ClaimsDueNotifications_AndInvokesAdapter() { SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); var notification = MakeNotification(); _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) .Returns(new[] { notification }); var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com")); var actor = CreateActor([adapter]); actor.Tell(InternalMessages.DispatchTick.Instance); AwaitAssert(() => { _outboxRepository.Received(1).GetDueAsync( Arg.Any(), Arg.Any(), Arg.Any()); Assert.Equal(1, adapter.CallCount); }); } [Fact] public void Success_MarksNotificationDelivered_WithResolvedTargets() { SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); var notification = MakeNotification(); _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) .Returns(new[] { notification }); var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com")); var actor = CreateActor([adapter]); actor.Tell(InternalMessages.DispatchTick.Instance); AwaitAssert(() => { _outboxRepository.Received(1).UpdateAsync( Arg.Is(n => n.Status == NotificationStatus.Delivered && n.DeliveredAt != null && n.LastAttemptAt != null && n.ResolvedTargets == "ops@example.com" && n.LastError == null), Arg.Any()); }); } [Fact] public void TransientFailure_BelowMaxRetries_MarksRetrying_AndSchedulesNextAttempt() { SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(3)); var notification = MakeNotification(retryCount: 1); _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) .Returns(new[] { notification }); var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout")); var actor = CreateActor([adapter]); actor.Tell(InternalMessages.DispatchTick.Instance); AwaitAssert(() => { _outboxRepository.Received(1).UpdateAsync( Arg.Is(n => n.Status == NotificationStatus.Retrying && n.RetryCount == 2 && n.NextAttemptAt != null && n.LastError == "smtp timeout" && n.LastAttemptAt != null), Arg.Any()); }); } [Fact] public void TransientFailure_ReachingMaxRetries_MarksParked() { SetupSmtpRetryPolicy(maxRetries: 3, retryDelay: TimeSpan.FromMinutes(1)); // RetryCount starts at max-1; the failed attempt increments it to max. var notification = MakeNotification(retryCount: 2); _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) .Returns(new[] { notification }); var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout")); var actor = CreateActor([adapter]); actor.Tell(InternalMessages.DispatchTick.Instance); AwaitAssert(() => { _outboxRepository.Received(1).UpdateAsync( Arg.Is(n => n.Status == NotificationStatus.Parked && n.RetryCount == 3), Arg.Any()); }); } [Fact] public void PermanentFailure_MarksParked_WithLastError() { SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); var notification = MakeNotification(); _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) .Returns(new[] { notification }); var adapter = new StubAdapter(() => DeliveryOutcome.Permanent("invalid recipient address")); var actor = CreateActor([adapter]); actor.Tell(InternalMessages.DispatchTick.Instance); AwaitAssert(() => { _outboxRepository.Received(1).UpdateAsync( Arg.Is(n => n.Status == NotificationStatus.Parked && n.LastError == "invalid recipient address" && n.LastAttemptAt != null), Arg.Any()); }); } [Fact] public void NoAdapterForType_MarksParked_WithExplanatoryError() { SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); var notification = MakeNotification(); _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) .Returns(new[] { notification }); // No adapters registered: none resolves for the notification's type. var actor = CreateActor([]); actor.Tell(InternalMessages.DispatchTick.Instance); AwaitAssert(() => { _outboxRepository.Received(1).UpdateAsync( Arg.Is(n => n.Status == NotificationStatus.Parked && n.LastError != null && n.LastError.Contains("no delivery adapter") && n.LastAttemptAt != null), Arg.Any()); }); } [Fact] public void FaultedDispatchPass_ClearsInFlightGuard_SoNextTickStillRuns() { SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); // GetDueAsync throws on every call: the dispatch pass's task could fault if the // failure were not handled, which would leave _dispatching stuck true forever. _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) .Returns>(_ => throw new InvalidOperationException("db down")); var actor = CreateActor([]); // First tick: the pass faults internally but must still clear the in-flight guard. actor.Tell(InternalMessages.DispatchTick.Instance); AwaitAssert(() => _outboxRepository.Received(1).GetDueAsync( Arg.Any(), Arg.Any(), Arg.Any())); // Second tick after the first completes: if the guard had wedged, this would be // dropped and GetDueAsync would still show only one call. actor.Tell(InternalMessages.DispatchTick.Instance); AwaitAssert(() => _outboxRepository.Received(2).GetDueAsync( Arg.Any(), Arg.Any(), Arg.Any())); } [Fact] public void OverlappingTicks_WhileDispatchInFlight_DoNotClaimConcurrently() { SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1)); var notification = MakeNotification(); _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any()) .Returns(new[] { notification }); // Slow adapter keeps the first sweep in flight while the second tick arrives. var adapter = new StubAdapter( () => DeliveryOutcome.Success("ops@example.com"), delay: TimeSpan.FromMilliseconds(800)); var actor = CreateActor([adapter]); actor.Tell(InternalMessages.DispatchTick.Instance); actor.Tell(InternalMessages.DispatchTick.Instance); // Second tick is dropped by the in-flight guard: only one sweep runs. AwaitAssert( () => _outboxRepository.Received(1).GetDueAsync( Arg.Any(), Arg.Any(), Arg.Any()), duration: TimeSpan.FromSeconds(2)); } }