540 lines
25 KiB
C#
540 lines
25 KiB
C#
using Akka.Actor;
|
|
using Akka.TestKit.Xunit2;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using NSubstitute;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Notifications;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
|
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Delivery;
|
|
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Messages;
|
|
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Tests.TestSupport;
|
|
|
|
namespace ZB.MOM.WW.ScadaBridge.NotificationOutbox.Tests;
|
|
|
|
/// <summary>
|
|
/// Task 14: Tests for the <see cref="NotificationOutboxActor"/> dispatcher loop — the
|
|
/// periodic sweep that claims due notifications via
|
|
/// <see cref="INotificationOutboxRepository.GetDueAsync"/>, delivers each through the
|
|
/// matching <see cref="INotificationDeliveryAdapter"/>, and applies the resulting status
|
|
/// transition with <see cref="INotificationOutboxRepository.UpdateAsync"/>.
|
|
/// </summary>
|
|
public class NotificationOutboxActorDispatchTests : TestKit
|
|
{
|
|
private readonly INotificationOutboxRepository _outboxRepository =
|
|
Substitute.For<INotificationOutboxRepository>();
|
|
|
|
private readonly INotificationRepository _notificationRepository =
|
|
Substitute.For<INotificationRepository>();
|
|
|
|
private IServiceProvider BuildServiceProvider(
|
|
IEnumerable<INotificationDeliveryAdapter> adapters)
|
|
{
|
|
var services = new ServiceCollection();
|
|
services.AddScoped(_ => _outboxRepository);
|
|
services.AddScoped(_ => _notificationRepository);
|
|
// The actor resolves the channel adapters from its per-sweep DI scope; register
|
|
// each stub adapter under the INotificationDeliveryAdapter service.
|
|
foreach (var adapter in adapters)
|
|
{
|
|
services.AddScoped<INotificationDeliveryAdapter>(_ => adapter);
|
|
}
|
|
|
|
return services.BuildServiceProvider();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Stub adapter whose <see cref="DeliverAsync"/> returns a configurable outcome and
|
|
/// optionally blocks for a delay — used to exercise the overlapping-tick guard.
|
|
/// </summary>
|
|
private sealed class StubAdapter : INotificationDeliveryAdapter
|
|
{
|
|
private readonly Func<DeliveryOutcome> _outcome;
|
|
private readonly TimeSpan _delay;
|
|
|
|
public StubAdapter(Func<DeliveryOutcome> outcome, TimeSpan? delay = null)
|
|
{
|
|
_outcome = outcome;
|
|
_delay = delay ?? TimeSpan.Zero;
|
|
}
|
|
|
|
public int CallCount;
|
|
|
|
public NotificationType Type => NotificationType.Email;
|
|
|
|
public async Task<DeliveryOutcome> DeliverAsync(
|
|
Notification notification, CancellationToken cancellationToken = default)
|
|
{
|
|
Interlocked.Increment(ref CallCount);
|
|
if (_delay > TimeSpan.Zero)
|
|
{
|
|
await Task.Delay(_delay, cancellationToken);
|
|
}
|
|
|
|
return _outcome();
|
|
}
|
|
}
|
|
|
|
private IActorRef CreateActor(
|
|
IEnumerable<INotificationDeliveryAdapter> adapters,
|
|
NotificationOutboxOptions? options = null)
|
|
{
|
|
return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
|
|
BuildServiceProvider(adapters),
|
|
options ?? new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
|
|
new NoOpCentralAuditWriter(),
|
|
NullLogger<NotificationOutboxActor>.Instance)));
|
|
}
|
|
|
|
private static Notification MakeNotification(
|
|
NotificationType type = NotificationType.Email, int retryCount = 0)
|
|
{
|
|
return new Notification(
|
|
Guid.NewGuid().ToString(), type, "ops-team", "Subject", "Body", "site-1")
|
|
{
|
|
RetryCount = retryCount,
|
|
CreatedAt = DateTimeOffset.UtcNow,
|
|
};
|
|
}
|
|
|
|
private void SetupSmtpRetryPolicy(int maxRetries, TimeSpan retryDelay)
|
|
{
|
|
var config = new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com")
|
|
{
|
|
MaxRetries = maxRetries,
|
|
RetryDelay = retryDelay,
|
|
};
|
|
_notificationRepository.GetAllSmtpConfigurationsAsync(Arg.Any<CancellationToken>())
|
|
.Returns(new[] { config });
|
|
}
|
|
|
|
/// <summary>
|
|
/// Drives a sweep to completion by re-telling <see cref="InternalMessages.DispatchTick"/>
|
|
/// on every poll iteration until <paramref name="assertion"/> holds. This is the durable
|
|
/// barrier for chained-sweep tests: the in-flight guard (<c>_dispatching</c>) is only
|
|
/// lowered when a sweep's <see cref="InternalMessages.DispatchComplete"/> round-trips back
|
|
/// to the actor — AFTER the side-effect the assertion observes — so a single one-shot tick
|
|
/// can race the still-raised guard and be silently dropped (the dispatcher's intended
|
|
/// overlap protection), permanently stalling the count. A dropped tick is never retried, so
|
|
/// merely widening the wait cannot recover it. Re-telling each poll is idempotent (a tick
|
|
/// landing while the guard is up is harmlessly dropped) and the assertion's exact-count
|
|
/// strength is preserved unchanged.
|
|
/// </summary>
|
|
private void DrivePollingTick(IActorRef actor, Action assertion)
|
|
{
|
|
AwaitAssert(
|
|
() =>
|
|
{
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
assertion();
|
|
},
|
|
duration: TimeSpan.FromSeconds(10));
|
|
}
|
|
|
|
[Fact]
|
|
public void DispatchTick_ClaimsDueNotifications_AndInvokesAdapter()
|
|
{
|
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
|
var notification = MakeNotification();
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(new[] { notification });
|
|
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
|
|
var actor = CreateActor([adapter]);
|
|
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
|
|
AwaitAssert(() =>
|
|
{
|
|
_outboxRepository.Received(1).GetDueAsync(
|
|
Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>());
|
|
Assert.Equal(1, adapter.CallCount);
|
|
});
|
|
}
|
|
|
|
[Fact]
|
|
public void Success_MarksNotificationDelivered_WithResolvedTargets()
|
|
{
|
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
|
var notification = MakeNotification();
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(new[] { notification });
|
|
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
|
|
var actor = CreateActor([adapter]);
|
|
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
|
|
AwaitAssert(() =>
|
|
{
|
|
_outboxRepository.Received(1).UpdateAsync(
|
|
Arg.Is<Notification>(n =>
|
|
n.Status == NotificationStatus.Delivered &&
|
|
n.DeliveredAt != null &&
|
|
n.LastAttemptAt != null &&
|
|
n.ResolvedTargets == "ops@example.com" &&
|
|
n.LastError == null),
|
|
Arg.Any<CancellationToken>());
|
|
});
|
|
}
|
|
|
|
[Fact]
|
|
public void TransientFailure_BelowMaxRetries_MarksRetrying_AndSchedulesNextAttempt()
|
|
{
|
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(3));
|
|
var notification = MakeNotification(retryCount: 1);
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(new[] { notification });
|
|
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
|
|
var actor = CreateActor([adapter]);
|
|
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
|
|
AwaitAssert(() =>
|
|
{
|
|
_outboxRepository.Received(1).UpdateAsync(
|
|
Arg.Is<Notification>(n =>
|
|
n.Status == NotificationStatus.Retrying &&
|
|
n.RetryCount == 2 &&
|
|
n.NextAttemptAt != null &&
|
|
n.LastError == "smtp timeout" &&
|
|
n.LastAttemptAt != null),
|
|
Arg.Any<CancellationToken>());
|
|
});
|
|
}
|
|
|
|
[Fact]
|
|
public void TransientFailure_ReachingMaxRetries_MarksParked()
|
|
{
|
|
SetupSmtpRetryPolicy(maxRetries: 3, retryDelay: TimeSpan.FromMinutes(1));
|
|
// RetryCount starts at max-1; the failed attempt increments it to max.
|
|
var notification = MakeNotification(retryCount: 2);
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(new[] { notification });
|
|
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
|
|
var actor = CreateActor([adapter]);
|
|
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
|
|
AwaitAssert(() =>
|
|
{
|
|
_outboxRepository.Received(1).UpdateAsync(
|
|
Arg.Is<Notification>(n =>
|
|
n.Status == NotificationStatus.Parked &&
|
|
n.RetryCount == 3),
|
|
Arg.Any<CancellationToken>());
|
|
});
|
|
}
|
|
|
|
[Fact]
|
|
public void PermanentFailure_MarksParked_WithLastError()
|
|
{
|
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
|
var notification = MakeNotification();
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(new[] { notification });
|
|
var adapter = new StubAdapter(() => DeliveryOutcome.Permanent("invalid recipient address"));
|
|
var actor = CreateActor([adapter]);
|
|
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
|
|
AwaitAssert(() =>
|
|
{
|
|
_outboxRepository.Received(1).UpdateAsync(
|
|
Arg.Is<Notification>(n =>
|
|
n.Status == NotificationStatus.Parked &&
|
|
n.LastError == "invalid recipient address" &&
|
|
n.LastAttemptAt != null),
|
|
Arg.Any<CancellationToken>());
|
|
});
|
|
}
|
|
|
|
[Fact]
|
|
public void NoAdapterForType_MarksParked_WithExplanatoryError()
|
|
{
|
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
|
var notification = MakeNotification();
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(new[] { notification });
|
|
// No adapters registered: none resolves for the notification's type.
|
|
var actor = CreateActor([]);
|
|
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
|
|
AwaitAssert(() =>
|
|
{
|
|
_outboxRepository.Received(1).UpdateAsync(
|
|
Arg.Is<Notification>(n =>
|
|
n.Status == NotificationStatus.Parked &&
|
|
n.LastError != null &&
|
|
n.LastError.Contains("no delivery adapter") &&
|
|
n.LastAttemptAt != null),
|
|
Arg.Any<CancellationToken>());
|
|
});
|
|
}
|
|
|
|
[Fact]
|
|
public void FaultedDispatchPass_ClearsInFlightGuard_SoNextTickStillRuns()
|
|
{
|
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
|
// GetDueAsync throws on every call: the dispatch pass's task could fault if the
|
|
// failure were not handled, which would leave _dispatching stuck true forever.
|
|
// Count every claim via a thread-safe counter so the assertions can reason about
|
|
// the number of sweeps that actually ran without depending on NSubstitute's
|
|
// exact-count matchers (see the de-race note below).
|
|
var claimAttempts = 0;
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns<IReadOnlyList<Notification>>(_ =>
|
|
{
|
|
Interlocked.Increment(ref claimAttempts);
|
|
throw new InvalidOperationException("db down");
|
|
});
|
|
var actor = CreateActor([]);
|
|
|
|
// First tick: the pass faults internally but must still clear the in-flight guard.
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
AwaitAssert(
|
|
() => Assert.True(Volatile.Read(ref claimAttempts) >= 1),
|
|
duration: TimeSpan.FromSeconds(10));
|
|
|
|
// The guard must clear after the faulted sweep so a fresh tick runs another sweep.
|
|
//
|
|
// De-race (S11): the in-flight guard (_dispatching) is only lowered when the
|
|
// faulted sweep's DispatchComplete round-trips back to the actor — which happens
|
|
// AFTER the GetDueAsync side-effect the barrier above observes. So a single second
|
|
// tick sent right after that barrier can race the guard while it is still up and
|
|
// be silently dropped (the dispatcher's intended overlap protection), leaving the
|
|
// claim count stuck at one forever — a dropped tick is never retried, so widening
|
|
// the timeout cannot recover it. Re-tell the tick on every poll iteration instead:
|
|
// a tick that lands while the guard is still up is harmlessly dropped, and the
|
|
// first tick that lands after the guard lowers runs a fresh sweep. The assertion
|
|
// is "at least two sweeps ran" rather than "exactly two": that is strictly the
|
|
// wedge invariant under test (a wedged guard would pin the count at one no matter
|
|
// how many ticks arrive), and an at-least assertion is immune to the surplus ticks
|
|
// the re-tell driver may land under CPU starvation.
|
|
AwaitAssert(
|
|
() =>
|
|
{
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
Assert.True(
|
|
Volatile.Read(ref claimAttempts) >= 2,
|
|
"the in-flight guard wedged: a second dispatch tick never ran a fresh sweep");
|
|
},
|
|
duration: TimeSpan.FromSeconds(10));
|
|
}
|
|
|
|
[Fact]
|
|
public void TransientFailure_WithZeroMaxRetries_RetriesUsingFallback_DoesNotParkImmediately()
|
|
{
|
|
// NO-002: SmtpConfiguration.MaxRetries=0 used to satisfy 1 >= 0 on the very first
|
|
// transient failure and park the row without a single retry. ResolveRetryPolicyAsync
|
|
// now clamps non-positive MaxRetries to the FallbackMaxRetries (10) so transient
|
|
// failures actually retry before parking.
|
|
SetupSmtpRetryPolicy(maxRetries: 0, retryDelay: TimeSpan.FromMinutes(1));
|
|
var notification = MakeNotification(retryCount: 0);
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(new[] { notification });
|
|
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
|
|
var actor = CreateActor([adapter]);
|
|
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
|
|
AwaitAssert(() =>
|
|
{
|
|
_outboxRepository.Received(1).UpdateAsync(
|
|
Arg.Is<Notification>(n =>
|
|
n.Status == NotificationStatus.Retrying &&
|
|
n.RetryCount == 1 &&
|
|
n.NextAttemptAt != null &&
|
|
n.LastError == "smtp timeout"),
|
|
Arg.Any<CancellationToken>());
|
|
});
|
|
}
|
|
|
|
[Fact]
|
|
public void TransientFailure_WithNegativeMaxRetries_RetriesUsingFallback_DoesNotParkImmediately()
|
|
{
|
|
// NO-002: a negative MaxRetries reaches ResolveRetryPolicyAsync just as -1 — same
|
|
// park-immediately bug. Clamp to FallbackMaxRetries.
|
|
SetupSmtpRetryPolicy(maxRetries: -1, retryDelay: TimeSpan.FromMinutes(1));
|
|
var notification = MakeNotification(retryCount: 0);
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(new[] { notification });
|
|
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
|
|
var actor = CreateActor([adapter]);
|
|
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
|
|
AwaitAssert(() =>
|
|
{
|
|
_outboxRepository.Received(1).UpdateAsync(
|
|
Arg.Is<Notification>(n =>
|
|
n.Status == NotificationStatus.Retrying &&
|
|
n.RetryCount == 1 &&
|
|
n.NextAttemptAt != null &&
|
|
n.LastError == "smtp timeout"),
|
|
Arg.Any<CancellationToken>());
|
|
});
|
|
}
|
|
|
|
[Fact]
|
|
public void TransientFailure_WithNonPositiveRetryDelay_UsesFallbackDelay_NotZero()
|
|
{
|
|
// NO-002: a non-positive RetryDelay would burn-loop the dispatcher because
|
|
// NextAttemptAt would equal now. Clamp to FallbackRetryDelay (1 min) so the
|
|
// schedule actually advances.
|
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.Zero);
|
|
var before = DateTimeOffset.UtcNow;
|
|
var notification = MakeNotification(retryCount: 0);
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(new[] { notification });
|
|
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
|
|
var actor = CreateActor([adapter]);
|
|
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
|
|
AwaitAssert(() =>
|
|
{
|
|
_outboxRepository.Received(1).UpdateAsync(
|
|
Arg.Is<Notification>(n =>
|
|
n.Status == NotificationStatus.Retrying &&
|
|
n.NextAttemptAt != null &&
|
|
n.NextAttemptAt > before + TimeSpan.FromSeconds(30)),
|
|
Arg.Any<CancellationToken>());
|
|
});
|
|
}
|
|
|
|
[Fact]
|
|
public void PostStop_CancelsInFlightDelivery_LeavesRowNonTerminal()
|
|
{
|
|
// NO-003: the dispatcher used to drop the CancellationToken on its way into
|
|
// the channel adapter, so a coordinated shutdown had to wait the full SMTP
|
|
// connect/auth/send timeout per in-flight notification before the sweep
|
|
// finished. The actor now passes a lifecycle-scoped token; cancelling it on
|
|
// PostStop must abort the in-flight Task.Delay (standing in for an SMTP
|
|
// send) and the row must NOT be updated to a terminal state — the next
|
|
// active node picks it back up.
|
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
|
var notification = MakeNotification();
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(new[] { notification });
|
|
// Long delay simulates a slow SMTP send; the test triggers PostStop before
|
|
// the delay would naturally elapse, so the only way the delay completes is
|
|
// if the token wired through.
|
|
var adapter = new StubAdapter(
|
|
() => DeliveryOutcome.Success("ops@example.com"),
|
|
delay: TimeSpan.FromSeconds(30));
|
|
var actor = CreateActor([adapter]);
|
|
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
// Wait until the adapter is actually in flight before stopping.
|
|
AwaitAssert(() => Assert.Equal(1, adapter.CallCount));
|
|
|
|
var start = DateTimeOffset.UtcNow;
|
|
Sys.Stop(actor);
|
|
|
|
// The sweep should observe cancellation promptly (well under the 30s delay).
|
|
AwaitAssert(
|
|
() =>
|
|
{
|
|
// No UpdateAsync was issued — the row is untouched and will be re-claimed
|
|
// by the next active node.
|
|
_outboxRepository.DidNotReceive().UpdateAsync(
|
|
Arg.Any<Notification>(), Arg.Any<CancellationToken>());
|
|
},
|
|
duration: TimeSpan.FromSeconds(5));
|
|
|
|
Assert.True(DateTimeOffset.UtcNow - start < TimeSpan.FromSeconds(5),
|
|
"PostStop did not cancel the in-flight delivery promptly.");
|
|
}
|
|
|
|
// ── NotificationOutbox-006: adapter dictionary cached for the actor's lifetime ──
|
|
|
|
[Fact]
|
|
public void Dispatch_ResolvesAdaptersOnce_AcrossMultipleSweeps()
|
|
{
|
|
// NotificationOutbox-006: adapter registration is static per process lifetime,
|
|
// so the NotificationType -> adapter lookup must be built ONCE for the actor's
|
|
// lifetime, not per dispatch sweep. The cache is paired with an actor-lifetime
|
|
// DI scope (see _adaptersScope) so scoped adapter instances are reused safely.
|
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
|
// Isolated substitutes for this test — we replace the dispatcher's per-sweep
|
|
// INotificationOutboxRepository registration with a private counting factory,
|
|
// so we don't mutate the shared _outboxRepository field that other tests in
|
|
// this class configure differently.
|
|
var outboxRepository = Substitute.For<INotificationOutboxRepository>();
|
|
// De-race (S11): hand out a fresh due notification for the FIRST THREE claims, then
|
|
// an empty batch forever. This caps the deliverable work — and therefore the
|
|
// UpdateAsync count — at exactly three, no matter how many dispatch ticks the
|
|
// driver below fires. Without this ceiling the exact-count barrier is doubly
|
|
// brittle: a single one-shot tick that races the in-flight guard is silently
|
|
// dropped (stalling the count below three), while a tick-per-poll driver lets
|
|
// queued ticks pile up under CPU starvation and overshoot three. Capping the
|
|
// claimable batch lets the driver re-tell ticks idempotently — surplus sweeps
|
|
// claim nothing and never call UpdateAsync — so Received(3) is a stable target
|
|
// that, once reached, cannot be exceeded.
|
|
var dueClaims = 0;
|
|
outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(_ => Interlocked.Increment(ref dueClaims) <= 3
|
|
? new[] { MakeNotification() }
|
|
: Array.Empty<Notification>());
|
|
|
|
// Counting factory: increments each time the DI container resolves an
|
|
// INotificationDeliveryAdapter. Pre-fix this would have ticked once per
|
|
// sweep; post-fix it ticks exactly once for the actor's lifetime.
|
|
var resolutionCount = 0;
|
|
var services = new ServiceCollection();
|
|
services.AddScoped(_ => outboxRepository);
|
|
services.AddScoped(_ => _notificationRepository);
|
|
services.AddScoped<INotificationDeliveryAdapter>(_ =>
|
|
{
|
|
Interlocked.Increment(ref resolutionCount);
|
|
return new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
|
|
});
|
|
var sp = services.BuildServiceProvider();
|
|
|
|
var actor = Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
|
|
sp,
|
|
new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
|
|
new NoOpCentralAuditWriter(),
|
|
NullLogger<NotificationOutboxActor>.Instance)));
|
|
|
|
// Drive dispatch sweeps until all three claimable notifications have been
|
|
// delivered (Received(3).UpdateAsync). DrivePollingTick re-tells the tick on
|
|
// every poll iteration; the GetDueAsync cap above makes that idempotent (a
|
|
// surplus sweep claims an empty batch and calls UpdateAsync zero times), so
|
|
// the count climbs to exactly three and then holds there — Received(3) is a
|
|
// stable ceiling rather than a fragile one-shot target. This proves multiple
|
|
// dispatch sweeps ran across the actor's lifetime, which is the precondition
|
|
// for the resolve-adapters-once assertion that follows.
|
|
DrivePollingTick(actor, () => outboxRepository.Received(3).UpdateAsync(
|
|
Arg.Any<Notification>(), Arg.Any<CancellationToken>()));
|
|
|
|
// The adapter resolution must have happened EXACTLY ONCE despite the multiple
|
|
// dispatch sweeps just driven. Pre-fix this would have been once per sweep.
|
|
// This is the real invariant under test (NotificationOutbox-006).
|
|
Assert.Equal(1, resolutionCount);
|
|
}
|
|
|
|
[Fact]
|
|
public void OverlappingTicks_WhileDispatchInFlight_DoNotClaimConcurrently()
|
|
{
|
|
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
|
|
var notification = MakeNotification();
|
|
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
|
|
.Returns(new[] { notification });
|
|
// Slow adapter keeps the first sweep in flight while the second tick arrives.
|
|
var adapter = new StubAdapter(
|
|
() => DeliveryOutcome.Success("ops@example.com"),
|
|
delay: TimeSpan.FromMilliseconds(800));
|
|
var actor = CreateActor([adapter]);
|
|
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
actor.Tell(InternalMessages.DispatchTick.Instance);
|
|
|
|
// Second tick is dropped by the in-flight guard: only one sweep runs.
|
|
AwaitAssert(
|
|
() => _outboxRepository.Received(1).GetDueAsync(
|
|
Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>()),
|
|
duration: TimeSpan.FromSeconds(2));
|
|
}
|
|
}
|