using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Notifications;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Delivery;
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Messages;
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Tests.TestSupport;

namespace ZB.MOM.WW.ScadaBridge.NotificationOutbox.Tests;

/// <summary>
/// Task 14: Tests for the NotificationOutboxActor dispatcher loop — the periodic sweep that
/// claims due notifications via the outbox repository's GetDueAsync, delivers each through
/// the matching INotificationDeliveryAdapter, and applies the resulting status transition
/// with the outbox repository's UpdateAsync.
/// </summary>
public class NotificationOutboxActorDispatchTests : TestKit
{
    private readonly INotificationOutboxRepository _outboxRepository = Substitute.For();
    private readonly INotificationRepository _notificationRepository = Substitute.For();

    /// <summary>
    /// Builds the service provider handed to the actor: the shared repository substitutes
    /// plus each supplied stub adapter registered as an INotificationDeliveryAdapter.
    /// </summary>
    private IServiceProvider BuildServiceProvider(
        IEnumerable adapters)
    {
        var services = new ServiceCollection();
        services.AddScoped(_ => _outboxRepository);
        services.AddScoped(_ => _notificationRepository);

        // The actor resolves the repositories per dispatch sweep, but resolves the channel
        // adapters once from an actor-lifetime scope (NotificationOutbox-006). Register each
        // stub adapter under the INotificationDeliveryAdapter service.
        foreach (var adapter in adapters)
        {
            services.AddScoped(_ => adapter);
        }

        return services.BuildServiceProvider();
    }

    /// <summary>
    /// Stub adapter whose DeliverAsync returns a configurable outcome and optionally waits
    /// for a delay first. The delay is used to exercise the overlapping-tick guard and the
    /// lifecycle-token cancellation on PostStop.
    /// </summary>
    private sealed class StubAdapter : INotificationDeliveryAdapter
    {
        private readonly Func _outcome;
        private readonly TimeSpan _delay;

        public StubAdapter(Func outcome, TimeSpan? delay = null)
        {
            _outcome = outcome;
            _delay = delay ?? TimeSpan.Zero;
        }

        /// <summary>Number of DeliverAsync calls; incremented with Interlocked.</summary>
        public int CallCount;

        /// <summary>
        /// Number of deliveries whose delay was aborted because the CancellationToken passed
        /// to DeliverAsync was cancelled; incremented with Interlocked.
        /// </summary>
        public int CancelledCount;

        public NotificationType Type => NotificationType.Email;

        public async Task DeliverAsync(
            Notification notification, CancellationToken cancellationToken = default)
        {
            Interlocked.Increment(ref CallCount);
            if (_delay > TimeSpan.Zero)
            {
                try
                {
                    await Task.Delay(_delay, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    // Record that the caller's token really reached the adapter and aborted
                    // the simulated send, then let the cancellation propagate unchanged.
                    Interlocked.Increment(ref CancelledCount);
                    throw;
                }
            }

            return _outcome();
        }
    }

    /// <summary>
    /// Spawns a NotificationOutboxActor wired to the test service provider. The default
    /// options use a one-hour DispatchInterval so that, in practice, only ticks the test
    /// sends explicitly drive sweeps.
    /// </summary>
    private IActorRef CreateActor(
        IEnumerable adapters, NotificationOutboxOptions? options = null)
    {
        return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
            BuildServiceProvider(adapters),
            options ?? new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
            new NoOpCentralAuditWriter(),
            NullLogger.Instance)));
    }

    /// <summary>
    /// Creates a notification with a fresh GUID id for "ops-team" / "site-1", with the given
    /// type and starting RetryCount.
    /// </summary>
    private static Notification MakeNotification(
        NotificationType type = NotificationType.Email, int retryCount = 0)
    {
        return new Notification(
            Guid.NewGuid().ToString(), type, "ops-team", "Subject", "Body", "site-1")
        {
            RetryCount = retryCount,
            CreatedAt = DateTimeOffset.UtcNow,
        };
    }

    /// <summary>
    /// Stubs the notification repository so GetAllSmtpConfigurationsAsync returns a single
    /// SMTP configuration carrying the given retry policy.
    /// </summary>
    private void SetupSmtpRetryPolicy(int maxRetries, TimeSpan retryDelay)
    {
        var config = new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com")
        {
            MaxRetries = maxRetries,
            RetryDelay = retryDelay,
        };
        _notificationRepository.GetAllSmtpConfigurationsAsync(Arg.Any())
            .Returns(new[] { config });
    }

    /// <summary>
    /// Drives sweeps to completion by re-telling the DispatchTick message on every poll
    /// iteration until <paramref name="assertion"/> holds.
    /// </summary>
    /// <remarks>
    /// This is the durable barrier for chained-sweep tests. The in-flight guard
    /// (_dispatching) is only lowered when a sweep's DispatchComplete message round-trips
    /// back to the actor. That happens AFTER the side effect the assertion observes.
    ///
    /// So a single one-shot tick can race the still-raised guard and be silently dropped
    /// (the dispatcher's intended overlap protection), permanently stalling the count. A
    /// dropped tick is never retried, so merely widening the wait cannot recover it.
    ///
    /// Re-telling on each poll is idempotent: a tick landing while the guard is up is
    /// harmlessly dropped. The assertion's exact-count strength is preserved unchanged.
    /// </remarks>
    private void DrivePollingTick(IActorRef actor, Action assertion)
    {
        AwaitAssert(
            () =>
            {
                actor.Tell(InternalMessages.DispatchTick.Instance);
                assertion();
            },
            duration: TimeSpan.FromSeconds(10));
    }

    [Fact]
    public void DispatchTick_ClaimsDueNotifications_AndInvokesAdapter()
    {
        SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
        var notification = MakeNotification();
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(new[] { notification });
        var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
        var actor = CreateActor([adapter]);

        actor.Tell(InternalMessages.DispatchTick.Instance);

        AwaitAssert(() =>
        {
            _outboxRepository.Received(1).GetDueAsync(
                Arg.Any(), Arg.Any(), Arg.Any());
            Assert.Equal(1, Volatile.Read(ref adapter.CallCount));
        });
    }

    [Fact]
    public void Success_MarksNotificationDelivered_WithResolvedTargets()
    {
        SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
        var notification = MakeNotification();
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(new[] { notification });
        var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
        var actor = CreateActor([adapter]);

        actor.Tell(InternalMessages.DispatchTick.Instance);

        AwaitAssert(() =>
        {
            _outboxRepository.Received(1).UpdateAsync(
                Arg.Is(n =>
                    n.Status == NotificationStatus.Delivered &&
                    n.DeliveredAt != null &&
                    n.LastAttemptAt != null &&
                    n.ResolvedTargets == "ops@example.com" &&
                    n.LastError == null),
                Arg.Any());
        });
    }

    [Fact]
    public void TransientFailure_BelowMaxRetries_MarksRetrying_AndSchedulesNextAttempt()
    {
        SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(3));
        var notification = MakeNotification(retryCount: 1);
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(new[] { notification });
        var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
        var actor = CreateActor([adapter]);

        actor.Tell(InternalMessages.DispatchTick.Instance);

        AwaitAssert(() =>
        {
            _outboxRepository.Received(1).UpdateAsync(
                Arg.Is(n =>
                    n.Status == NotificationStatus.Retrying &&
                    n.RetryCount == 2 &&
                    n.NextAttemptAt != null &&
                    n.LastError == "smtp timeout" &&
                    n.LastAttemptAt != null),
                Arg.Any());
        });
    }

    [Fact]
    public void TransientFailure_ReachingMaxRetries_MarksParked()
    {
        SetupSmtpRetryPolicy(maxRetries: 3, retryDelay: TimeSpan.FromMinutes(1));
        // RetryCount starts at max-1; the failed attempt increments it to max.
        var notification = MakeNotification(retryCount: 2);
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(new[] { notification });
        var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
        var actor = CreateActor([adapter]);

        actor.Tell(InternalMessages.DispatchTick.Instance);

        AwaitAssert(() =>
        {
            _outboxRepository.Received(1).UpdateAsync(
                Arg.Is(n =>
                    n.Status == NotificationStatus.Parked &&
                    n.RetryCount == 3),
                Arg.Any());
        });
    }

    [Fact]
    public void PermanentFailure_MarksParked_WithLastError()
    {
        SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
        var notification = MakeNotification();
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(new[] { notification });
        var adapter = new StubAdapter(() => DeliveryOutcome.Permanent("invalid recipient address"));
        var actor = CreateActor([adapter]);

        actor.Tell(InternalMessages.DispatchTick.Instance);

        AwaitAssert(() =>
        {
            _outboxRepository.Received(1).UpdateAsync(
                Arg.Is(n =>
                    n.Status == NotificationStatus.Parked &&
                    n.LastError == "invalid recipient address" &&
                    n.LastAttemptAt != null),
                Arg.Any());
        });
    }

    [Fact]
    public void NoAdapterForType_MarksParked_WithExplanatoryError()
    {
        SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
        var notification = MakeNotification();
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(new[] { notification });
        // No adapters registered: none resolves for the notification's type.
        var actor = CreateActor([]);

        actor.Tell(InternalMessages.DispatchTick.Instance);

        AwaitAssert(() =>
        {
            _outboxRepository.Received(1).UpdateAsync(
                Arg.Is(n =>
                    n.Status == NotificationStatus.Parked &&
                    n.LastError != null &&
                    n.LastError.Contains("no delivery adapter") &&
                    n.LastAttemptAt != null),
                Arg.Any());
        });
    }

    [Fact]
    public void FaultedDispatchPass_ClearsInFlightGuard_SoNextTickStillRuns()
    {
        SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));

        // GetDueAsync throws on every call. If the failure were not handled, the dispatch
        // pass's task would fault and leave _dispatching stuck true forever.
        //
        // Every claim is counted via a thread-safe counter, so the assertions can reason
        // about how many sweeps actually ran without depending on NSubstitute's exact-count
        // matchers (see the de-race note below).
        var claimAttempts = 0;
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns>(_ =>
            {
                Interlocked.Increment(ref claimAttempts);
                throw new InvalidOperationException("db down");
            });

        var actor = CreateActor([]);

        // First tick: the pass faults internally but must still clear the in-flight guard.
        actor.Tell(InternalMessages.DispatchTick.Instance);
        AwaitAssert(
            () => Assert.True(Volatile.Read(ref claimAttempts) >= 1),
            duration: TimeSpan.FromSeconds(10));

        // The guard must clear after the faulted sweep so a fresh tick runs another sweep.
        //
        // De-race (S11): the in-flight guard (_dispatching) is only lowered when the
        // faulted sweep's DispatchComplete round-trips back to the actor. That happens
        // AFTER the GetDueAsync side effect the barrier above observes.
        //
        // So a single second tick sent right after that barrier can race the still-raised
        // guard and be silently dropped (the dispatcher's intended overlap protection),
        // leaving the claim count stuck at one forever. A dropped tick is never retried, so
        // widening the timeout cannot recover it.
        //
        // Instead, re-tell the tick on every poll iteration. A tick that lands while the
        // guard is still up is harmlessly dropped; the first tick that lands after the
        // guard lowers runs a fresh sweep.
        //
        // The assertion is "at least two sweeps ran" rather than "exactly two". That is
        // exactly the wedge invariant under test — a wedged guard would pin the count at
        // one no matter how many ticks arrive. An at-least assertion is also immune to the
        // surplus ticks the re-tell driver may land under CPU starvation.
        AwaitAssert(
            () =>
            {
                actor.Tell(InternalMessages.DispatchTick.Instance);
                Assert.True(
                    Volatile.Read(ref claimAttempts) >= 2,
                    "the in-flight guard wedged: a second dispatch tick never ran a fresh sweep");
            },
            duration: TimeSpan.FromSeconds(10));
    }

    [Fact]
    public void TransientFailure_WithZeroMaxRetries_RetriesUsingFallback_DoesNotParkImmediately()
    {
        // NO-002: SmtpConfiguration.MaxRetries=0 used to satisfy 1 >= 0 on the very first
        // transient failure and park the row without a single retry. ResolveRetryPolicyAsync
        // now clamps a non-positive MaxRetries to FallbackMaxRetries (10), so transient
        // failures actually retry before parking.
        SetupSmtpRetryPolicy(maxRetries: 0, retryDelay: TimeSpan.FromMinutes(1));
        var notification = MakeNotification(retryCount: 0);
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(new[] { notification });
        var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
        var actor = CreateActor([adapter]);

        actor.Tell(InternalMessages.DispatchTick.Instance);

        AwaitAssert(() =>
        {
            _outboxRepository.Received(1).UpdateAsync(
                Arg.Is(n =>
                    n.Status == NotificationStatus.Retrying &&
                    n.RetryCount == 1 &&
                    n.NextAttemptAt != null &&
                    n.LastError == "smtp timeout"),
                Arg.Any());
        });
    }

    [Fact]
    public void TransientFailure_WithNegativeMaxRetries_RetriesUsingFallback_DoesNotParkImmediately()
    {
        // NO-002: a negative MaxRetries (here -1) reaches ResolveRetryPolicyAsync and hits
        // the same park-immediately bug. It must also be clamped to FallbackMaxRetries.
        SetupSmtpRetryPolicy(maxRetries: -1, retryDelay: TimeSpan.FromMinutes(1));
        var notification = MakeNotification(retryCount: 0);
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(new[] { notification });
        var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
        var actor = CreateActor([adapter]);

        actor.Tell(InternalMessages.DispatchTick.Instance);

        AwaitAssert(() =>
        {
            _outboxRepository.Received(1).UpdateAsync(
                Arg.Is(n =>
                    n.Status == NotificationStatus.Retrying &&
                    n.RetryCount == 1 &&
                    n.NextAttemptAt != null &&
                    n.LastError == "smtp timeout"),
                Arg.Any());
        });
    }

    [Fact]
    public void TransientFailure_WithNonPositiveRetryDelay_UsesFallbackDelay_NotZero()
    {
        // NO-002: a non-positive RetryDelay would make the dispatcher spin in a busy loop,
        // because NextAttemptAt would equal now. It is clamped to FallbackRetryDelay
        // (1 min), so the schedule actually advances.
        SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.Zero);
        var before = DateTimeOffset.UtcNow;
        var notification = MakeNotification(retryCount: 0);
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(new[] { notification });
        var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
        var actor = CreateActor([adapter]);

        actor.Tell(InternalMessages.DispatchTick.Instance);

        AwaitAssert(() =>
        {
            _outboxRepository.Received(1).UpdateAsync(
                Arg.Is(n =>
                    n.Status == NotificationStatus.Retrying &&
                    n.NextAttemptAt != null &&
                    n.NextAttemptAt > before + TimeSpan.FromSeconds(30)),
                Arg.Any());
        });
    }

    [Fact]
    public void PostStop_CancelsInFlightDelivery_LeavesRowNonTerminal()
    {
        // NO-003: the dispatcher used to drop the CancellationToken on its way into the
        // channel adapter. A coordinated shutdown therefore had to wait out the full SMTP
        // connect/auth/send timeout for each in-flight notification before the sweep
        // finished.
        //
        // The actor now passes a lifecycle-scoped token. Cancelling it on PostStop must
        // abort the in-flight Task.Delay (standing in for an SMTP send), and the row must
        // NOT be updated to a terminal state — the next active node picks it back up.
        SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
        var notification = MakeNotification();
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(new[] { notification });

        // A long delay simulates a slow SMTP send. The test stops the actor long before the
        // delay would naturally elapse, so the delay can only end early if the lifecycle
        // token was wired through to the adapter.
        var adapter = new StubAdapter(
            () => DeliveryOutcome.Success("ops@example.com"),
            delay: TimeSpan.FromSeconds(30));
        var actor = CreateActor([adapter]);

        actor.Tell(InternalMessages.DispatchTick.Instance);

        // Wait until the adapter is actually in flight before stopping.
        AwaitAssert(() => Assert.Equal(1, Volatile.Read(ref adapter.CallCount)));

        var start = DateTimeOffset.UtcNow;
        Sys.Stop(actor);

        // Positive barrier: the adapter must observe cancellation of the token it was
        // handed. Polling only DidNotReceive().UpdateAsync here would pass vacuously on the
        // first iteration, before cancellation had any chance to happen. It would also make
        // the timing check below trivially true.
        AwaitAssert(
            () => Assert.Equal(1, Volatile.Read(ref adapter.CancelledCount)),
            duration: TimeSpan.FromSeconds(5));
        Assert.True(DateTimeOffset.UtcNow - start < TimeSpan.FromSeconds(5),
            "PostStop did not cancel the in-flight delivery promptly.");

        // No UpdateAsync was issued — the row is untouched and will be re-claimed by the
        // next active node.
        _outboxRepository.DidNotReceive().UpdateAsync(
            Arg.Any(), Arg.Any());
    }

    // ── NotificationOutbox-006: adapter dictionary cached for the actor's lifetime ──

    [Fact]
    public void Dispatch_ResolvesAdaptersOnce_AcrossMultipleSweeps()
    {
        // NotificationOutbox-006: adapter registration is static for the process lifetime.
        // The NotificationType -> adapter lookup must therefore be built ONCE for the
        // actor's lifetime, not on every dispatch sweep. The cache is paired with an
        // actor-lifetime DI scope (see _adaptersScope), so scoped adapter instances are
        // reused safely.
        SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));

        // Isolated substitutes for this test. The dispatcher's per-sweep
        // INotificationOutboxRepository registration is replaced with a private counting
        // factory, so the shared _outboxRepository field (configured differently by other
        // tests in this class) is not mutated.
        var outboxRepository = Substitute.For();

        // De-race (S11): hand out a fresh due notification for the FIRST THREE claims, then
        // an empty batch forever. This caps the deliverable work — and therefore the
        // UpdateAsync count — at exactly three, no matter how many dispatch ticks the driver
        // below fires.
        //
        // Without this ceiling the exact-count barrier is doubly brittle:
        // - a single one-shot tick that races the in-flight guard is silently dropped,
        //   stalling the count below three;
        // - a tick-per-poll driver lets queued ticks pile up under CPU starvation and
        //   overshoot three.
        //
        // Capping the claimable batch lets the driver re-tell ticks idempotently: surplus
        // sweeps claim nothing and never call UpdateAsync. Received(3) is therefore a stable
        // target that, once reached, cannot be exceeded.
        var dueClaims = 0;
        outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(_ => Interlocked.Increment(ref dueClaims) <= 3
                ? new[] { MakeNotification() }
                : Array.Empty());

        // Counting factory: increments each time the DI container resolves an
        // INotificationDeliveryAdapter. Pre-fix this would have ticked once per sweep;
        // post-fix it ticks exactly once for the actor's lifetime.
        var resolutionCount = 0;
        var services = new ServiceCollection();
        services.AddScoped(_ => outboxRepository);
        services.AddScoped(_ => _notificationRepository);
        services.AddScoped(_ =>
        {
            Interlocked.Increment(ref resolutionCount);
            return new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
        });
        var sp = services.BuildServiceProvider();

        var actor = Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
            sp,
            new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
            new NoOpCentralAuditWriter(),
            NullLogger.Instance)));

        // Drive dispatch sweeps until all three claimable notifications have been delivered
        // (Received(3).UpdateAsync). DrivePollingTick re-tells the tick on every poll
        // iteration; the GetDueAsync cap above makes that idempotent, because a surplus
        // sweep claims an empty batch and calls UpdateAsync zero times.
        //
        // The count therefore climbs to exactly three and then holds, so Received(3) is a
        // stable ceiling rather than a fragile one-shot target. This proves that multiple
        // dispatch sweeps ran during the actor's lifetime — the precondition for the
        // resolve-adapters-once assertion that follows.
        DrivePollingTick(actor, () =>
            outboxRepository.Received(3).UpdateAsync(
                Arg.Any(), Arg.Any()));

        // The adapter must have been resolved EXACTLY ONCE despite the multiple dispatch
        // sweeps just driven; pre-fix it was resolved once per sweep. This is the real
        // invariant under test (NotificationOutbox-006).
        Assert.Equal(1, Volatile.Read(ref resolutionCount));
    }

    [Fact]
    public void OverlappingTicks_WhileDispatchInFlight_DoNotClaimConcurrently()
    {
        SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
        var notification = MakeNotification();
        _outboxRepository.GetDueAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns(new[] { notification });

        // Slow adapter keeps the first sweep in flight while the second tick arrives.
        var adapter = new StubAdapter(
            () => DeliveryOutcome.Success("ops@example.com"),
            delay: TimeSpan.FromMilliseconds(800));
        var actor = CreateActor([adapter]);

        actor.Tell(InternalMessages.DispatchTick.Instance);
        actor.Tell(InternalMessages.DispatchTick.Instance);

        // Barrier: wait until the first sweep has finished its 800ms delivery and written
        // its status update. Awaiting Received(1).GetDueAsync directly would succeed as
        // soon as the FIRST claim landed, so a second, unguarded sweep's claim could arrive
        // afterwards without ever failing the test.
        AwaitAssert(
            () => _outboxRepository.Received(1).UpdateAsync(
                Arg.Any(), Arg.Any()),
            duration: TimeSpan.FromSeconds(5));

        // Both ticks were queued before the first sweep could complete. An unguarded second
        // sweep would have claimed immediately, well inside the 800ms delivery window, and
        // no further ticks are expected within the one-hour DispatchInterval. So these
        // counts are final: the second tick was dropped by the in-flight guard and only one
        // sweep ran.
        _outboxRepository.Received(1).GetDueAsync(
            Arg.Any(), Arg.Any(), Arg.Any());
        Assert.Equal(1, Volatile.Read(ref adapter.CallCount));
    }
}