Files
Joseph Doherty 7b0b9c7365 refactor: rename ScadaLink → ZB.MOM.WW.ScadaBridge (code + projects + namespaces)
Solution + 23 src projects + 26 test projects renamed; folders, csproj,
namespaces, and ScadaLinkDbContext/ScadaBridgeDbContext class updated.
ActorSystem "scadalink" → "scadabridge", Akka seed-node URLs migrated.
SQL roles/logins, LDAP domains, CLI command name, and CLI config dir
(~/.scadalink → ~/.scadabridge) also renamed.

Build green; 5 Host.Tests fail awaiting SQL login rename in next commit.
Pre-existing StaleTagMonitor timing flakes unchanged.

Rename script committed at tools/rename-to-scadabridge.sh.
2026-05-28 09:37:45 -04:00

478 lines
21 KiB
C#

using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Notifications;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Delivery;
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Messages;
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Tests.TestSupport;
namespace ZB.MOM.WW.ScadaBridge.NotificationOutbox.Tests;
/// <summary>
/// Task 14: Tests for the <see cref="NotificationOutboxActor"/> dispatcher loop — the
/// periodic sweep that claims due notifications via
/// <see cref="INotificationOutboxRepository.GetDueAsync"/>, delivers each through the
/// matching <see cref="INotificationDeliveryAdapter"/>, and applies the resulting status
/// transition with <see cref="INotificationOutboxRepository.UpdateAsync"/>.
/// </summary>
public class NotificationOutboxActorDispatchTests : TestKit
{
private readonly INotificationOutboxRepository _outboxRepository =
Substitute.For<INotificationOutboxRepository>();
private readonly INotificationRepository _notificationRepository =
Substitute.For<INotificationRepository>();
private IServiceProvider BuildServiceProvider(
IEnumerable<INotificationDeliveryAdapter> adapters)
{
var services = new ServiceCollection();
services.AddScoped(_ => _outboxRepository);
services.AddScoped(_ => _notificationRepository);
// The actor resolves the channel adapters from its per-sweep DI scope; register
// each stub adapter under the INotificationDeliveryAdapter service.
foreach (var adapter in adapters)
{
services.AddScoped<INotificationDeliveryAdapter>(_ => adapter);
}
return services.BuildServiceProvider();
}
/// <summary>
/// Stub adapter whose <see cref="DeliverAsync"/> returns a configurable outcome and
/// optionally blocks for a delay — used to exercise the overlapping-tick guard.
/// </summary>
private sealed class StubAdapter : INotificationDeliveryAdapter
{
private readonly Func<DeliveryOutcome> _outcome;
private readonly TimeSpan _delay;
public StubAdapter(Func<DeliveryOutcome> outcome, TimeSpan? delay = null)
{
_outcome = outcome;
_delay = delay ?? TimeSpan.Zero;
}
public int CallCount;
public NotificationType Type => NotificationType.Email;
public async Task<DeliveryOutcome> DeliverAsync(
Notification notification, CancellationToken cancellationToken = default)
{
Interlocked.Increment(ref CallCount);
if (_delay > TimeSpan.Zero)
{
await Task.Delay(_delay, cancellationToken);
}
return _outcome();
}
}
private IActorRef CreateActor(
IEnumerable<INotificationDeliveryAdapter> adapters,
NotificationOutboxOptions? options = null)
{
return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
BuildServiceProvider(adapters),
options ?? new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
new NoOpCentralAuditWriter(),
NullLogger<NotificationOutboxActor>.Instance)));
}
private static Notification MakeNotification(
NotificationType type = NotificationType.Email, int retryCount = 0)
{
return new Notification(
Guid.NewGuid().ToString(), type, "ops-team", "Subject", "Body", "site-1")
{
RetryCount = retryCount,
CreatedAt = DateTimeOffset.UtcNow,
};
}
private void SetupSmtpRetryPolicy(int maxRetries, TimeSpan retryDelay)
{
var config = new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com")
{
MaxRetries = maxRetries,
RetryDelay = retryDelay,
};
_notificationRepository.GetAllSmtpConfigurationsAsync(Arg.Any<CancellationToken>())
.Returns(new[] { config });
}
[Fact]
public void DispatchTick_ClaimsDueNotifications_AndInvokesAdapter()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification();
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
_outboxRepository.Received(1).GetDueAsync(
Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>());
Assert.Equal(1, adapter.CallCount);
});
}
[Fact]
public void Success_MarksNotificationDelivered_WithResolvedTargets()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification();
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
_outboxRepository.Received(1).UpdateAsync(
Arg.Is<Notification>(n =>
n.Status == NotificationStatus.Delivered &&
n.DeliveredAt != null &&
n.LastAttemptAt != null &&
n.ResolvedTargets == "ops@example.com" &&
n.LastError == null),
Arg.Any<CancellationToken>());
});
}
[Fact]
public void TransientFailure_BelowMaxRetries_MarksRetrying_AndSchedulesNextAttempt()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(3));
var notification = MakeNotification(retryCount: 1);
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
_outboxRepository.Received(1).UpdateAsync(
Arg.Is<Notification>(n =>
n.Status == NotificationStatus.Retrying &&
n.RetryCount == 2 &&
n.NextAttemptAt != null &&
n.LastError == "smtp timeout" &&
n.LastAttemptAt != null),
Arg.Any<CancellationToken>());
});
}
[Fact]
public void TransientFailure_ReachingMaxRetries_MarksParked()
{
SetupSmtpRetryPolicy(maxRetries: 3, retryDelay: TimeSpan.FromMinutes(1));
// RetryCount starts at max-1; the failed attempt increments it to max.
var notification = MakeNotification(retryCount: 2);
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
_outboxRepository.Received(1).UpdateAsync(
Arg.Is<Notification>(n =>
n.Status == NotificationStatus.Parked &&
n.RetryCount == 3),
Arg.Any<CancellationToken>());
});
}
[Fact]
public void PermanentFailure_MarksParked_WithLastError()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification();
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Permanent("invalid recipient address"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
_outboxRepository.Received(1).UpdateAsync(
Arg.Is<Notification>(n =>
n.Status == NotificationStatus.Parked &&
n.LastError == "invalid recipient address" &&
n.LastAttemptAt != null),
Arg.Any<CancellationToken>());
});
}
[Fact]
public void NoAdapterForType_MarksParked_WithExplanatoryError()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification();
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(new[] { notification });
// No adapters registered: none resolves for the notification's type.
var actor = CreateActor([]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
_outboxRepository.Received(1).UpdateAsync(
Arg.Is<Notification>(n =>
n.Status == NotificationStatus.Parked &&
n.LastError != null &&
n.LastError.Contains("no delivery adapter") &&
n.LastAttemptAt != null),
Arg.Any<CancellationToken>());
});
}
[Fact]
public void FaultedDispatchPass_ClearsInFlightGuard_SoNextTickStillRuns()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
// GetDueAsync throws on every call: the dispatch pass's task could fault if the
// failure were not handled, which would leave _dispatching stuck true forever.
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns<IReadOnlyList<Notification>>(_ => throw new InvalidOperationException("db down"));
var actor = CreateActor([]);
// First tick: the pass faults internally but must still clear the in-flight guard.
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() => _outboxRepository.Received(1).GetDueAsync(
Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>()));
// Second tick after the first completes: if the guard had wedged, this would be
// dropped and GetDueAsync would still show only one call.
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() => _outboxRepository.Received(2).GetDueAsync(
Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>()));
}
[Fact]
public void TransientFailure_WithZeroMaxRetries_RetriesUsingFallback_DoesNotParkImmediately()
{
// NO-002: SmtpConfiguration.MaxRetries=0 used to satisfy 1 >= 0 on the very first
// transient failure and park the row without a single retry. ResolveRetryPolicyAsync
// now clamps non-positive MaxRetries to the FallbackMaxRetries (10) so transient
// failures actually retry before parking.
SetupSmtpRetryPolicy(maxRetries: 0, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification(retryCount: 0);
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
_outboxRepository.Received(1).UpdateAsync(
Arg.Is<Notification>(n =>
n.Status == NotificationStatus.Retrying &&
n.RetryCount == 1 &&
n.NextAttemptAt != null &&
n.LastError == "smtp timeout"),
Arg.Any<CancellationToken>());
});
}
[Fact]
public void TransientFailure_WithNegativeMaxRetries_RetriesUsingFallback_DoesNotParkImmediately()
{
// NO-002: a negative MaxRetries reaches ResolveRetryPolicyAsync just as -1 — same
// park-immediately bug. Clamp to FallbackMaxRetries.
SetupSmtpRetryPolicy(maxRetries: -1, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification(retryCount: 0);
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
_outboxRepository.Received(1).UpdateAsync(
Arg.Is<Notification>(n =>
n.Status == NotificationStatus.Retrying &&
n.RetryCount == 1 &&
n.NextAttemptAt != null &&
n.LastError == "smtp timeout"),
Arg.Any<CancellationToken>());
});
}
[Fact]
public void TransientFailure_WithNonPositiveRetryDelay_UsesFallbackDelay_NotZero()
{
// NO-002: a non-positive RetryDelay would burn-loop the dispatcher because
// NextAttemptAt would equal now. Clamp to FallbackRetryDelay (1 min) so the
// schedule actually advances.
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.Zero);
var before = DateTimeOffset.UtcNow;
var notification = MakeNotification(retryCount: 0);
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(new[] { notification });
var adapter = new StubAdapter(() => DeliveryOutcome.Transient("smtp timeout"));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() =>
{
_outboxRepository.Received(1).UpdateAsync(
Arg.Is<Notification>(n =>
n.Status == NotificationStatus.Retrying &&
n.NextAttemptAt != null &&
n.NextAttemptAt > before + TimeSpan.FromSeconds(30)),
Arg.Any<CancellationToken>());
});
}
[Fact]
public void PostStop_CancelsInFlightDelivery_LeavesRowNonTerminal()
{
// NO-003: the dispatcher used to drop the CancellationToken on its way into
// the channel adapter, so a coordinated shutdown had to wait the full SMTP
// connect/auth/send timeout per in-flight notification before the sweep
// finished. The actor now passes a lifecycle-scoped token; cancelling it on
// PostStop must abort the in-flight Task.Delay (standing in for an SMTP
// send) and the row must NOT be updated to a terminal state — the next
// active node picks it back up.
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification();
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(new[] { notification });
// Long delay simulates a slow SMTP send; the test triggers PostStop before
// the delay would naturally elapse, so the only way the delay completes is
// if the token wired through.
var adapter = new StubAdapter(
() => DeliveryOutcome.Success("ops@example.com"),
delay: TimeSpan.FromSeconds(30));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
// Wait until the adapter is actually in flight before stopping.
AwaitAssert(() => Assert.Equal(1, adapter.CallCount));
var start = DateTimeOffset.UtcNow;
Sys.Stop(actor);
// The sweep should observe cancellation promptly (well under the 30s delay).
AwaitAssert(
() =>
{
// No UpdateAsync was issued — the row is untouched and will be re-claimed
// by the next active node.
_outboxRepository.DidNotReceive().UpdateAsync(
Arg.Any<Notification>(), Arg.Any<CancellationToken>());
},
duration: TimeSpan.FromSeconds(5));
Assert.True(DateTimeOffset.UtcNow - start < TimeSpan.FromSeconds(5),
"PostStop did not cancel the in-flight delivery promptly.");
}
// ── NotificationOutbox-006: adapter dictionary cached for the actor's lifetime ──
[Fact]
public void Dispatch_ResolvesAdaptersOnce_AcrossMultipleSweeps()
{
// NotificationOutbox-006: adapter registration is static per process lifetime,
// so the NotificationType -> adapter lookup must be built ONCE for the actor's
// lifetime, not per dispatch sweep. The cache is paired with an actor-lifetime
// DI scope (see _adaptersScope) so scoped adapter instances are reused safely.
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
// Isolated substitutes for this test — we replace the dispatcher's per-sweep
// INotificationOutboxRepository registration with a private counting factory,
// so we don't mutate the shared _outboxRepository field that other tests in
// this class configure differently.
var outboxRepository = Substitute.For<INotificationOutboxRepository>();
outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(_ => new[] { MakeNotification() });
// Counting factory: increments each time the DI container resolves an
// INotificationDeliveryAdapter. Pre-fix this would have ticked once per
// sweep; post-fix it ticks exactly once for the actor's lifetime.
var resolutionCount = 0;
var services = new ServiceCollection();
services.AddScoped(_ => outboxRepository);
services.AddScoped(_ => _notificationRepository);
services.AddScoped<INotificationDeliveryAdapter>(_ =>
{
Interlocked.Increment(ref resolutionCount);
return new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
});
var sp = services.BuildServiceProvider();
var actor = Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
sp,
new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
new NoOpCentralAuditWriter(),
NullLogger<NotificationOutboxActor>.Instance)));
// Fire three sweeps end-to-end. Each waits on the previous via the
// in-flight guard, so the UpdateAsync count climbs monotonically.
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() => outboxRepository.Received(1).UpdateAsync(
Arg.Any<Notification>(), Arg.Any<CancellationToken>()));
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() => outboxRepository.Received(2).UpdateAsync(
Arg.Any<Notification>(), Arg.Any<CancellationToken>()));
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() => outboxRepository.Received(3).UpdateAsync(
Arg.Any<Notification>(), Arg.Any<CancellationToken>()));
// The adapter resolution must have happened EXACTLY ONCE despite three
// dispatch sweeps. Pre-fix this would have been 3 (or more).
Assert.Equal(1, resolutionCount);
}
[Fact]
public void OverlappingTicks_WhileDispatchInFlight_DoNotClaimConcurrently()
{
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
var notification = MakeNotification();
_outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(new[] { notification });
// Slow adapter keeps the first sweep in flight while the second tick arrives.
var adapter = new StubAdapter(
() => DeliveryOutcome.Success("ops@example.com"),
delay: TimeSpan.FromMilliseconds(800));
var actor = CreateActor([adapter]);
actor.Tell(InternalMessages.DispatchTick.Instance);
actor.Tell(InternalMessages.DispatchTick.Instance);
// Second tick is dropped by the in-flight guard: only one sweep runs.
AwaitAssert(
() => _outboxRepository.Received(1).GetDueAsync(
Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>()),
duration: TimeSpan.FromSeconds(2));
}
}