using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Commons.Types.Notifications;
using ScadaLink.NotificationOutbox.Delivery;
using ScadaLink.NotificationOutbox.Tests.TestSupport;
namespace ScadaLink.NotificationOutbox.Tests;
///
/// Task 15: Tests for the query surface — the
/// paginated outbox query, single-notification status query, manual retry and discard
/// of parked notifications, and the KPI snapshot.
///
public class NotificationOutboxActorQueryTests : TestKit
{
private readonly INotificationOutboxRepository _repository =
Substitute.For();
private IServiceProvider BuildServiceProvider()
{
var services = new ServiceCollection();
services.AddScoped(_ => _repository);
return services.BuildServiceProvider();
}
private IActorRef CreateActor(NotificationOutboxOptions? options = null)
{
return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
BuildServiceProvider(),
// A long dispatch interval keeps the dispatch loop from interfering with these tests.
options ?? new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
new NoOpCentralAuditWriter(),
NullLogger.Instance)));
}
private static Notification MakeNotification(
NotificationStatus status = NotificationStatus.Pending,
DateTimeOffset? createdAt = null,
int retryCount = 0,
string? lastError = null,
DateTimeOffset? deliveredAt = null)
{
return new Notification(
Guid.NewGuid().ToString(), NotificationType.Email, "ops-team", "Subject", "Body", "site-1")
{
Status = status,
CreatedAt = createdAt ?? DateTimeOffset.UtcNow,
RetryCount = retryCount,
LastError = lastError,
DeliveredAt = deliveredAt,
SourceInstanceId = "instance-42",
};
}
[Fact]
public void Query_PassesFilterFromRequest_AndMapsRowsToSummaries()
{
var now = DateTimeOffset.UtcNow;
var staleRow = MakeNotification(
status: NotificationStatus.Pending, createdAt: now - TimeSpan.FromHours(1));
var freshRow = MakeNotification(
status: NotificationStatus.Pending, createdAt: now);
_repository.QueryAsync(
Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any())
.Returns(((IReadOnlyList)new[] { staleRow, freshRow }, 2));
var actor = CreateActor();
actor.Tell(
new NotificationOutboxQueryRequest(
CorrelationId: "corr-1",
StatusFilter: null,
TypeFilter: null,
SourceSiteFilter: "site-1",
ListNameFilter: "ops-team",
StuckOnly: false,
SubjectKeyword: "tank",
From: null,
To: null,
PageNumber: 2,
PageSize: 25),
TestActor);
var response = ExpectMsg();
Assert.Equal("corr-1", response.CorrelationId);
Assert.True(response.Success);
Assert.Null(response.ErrorMessage);
Assert.Equal(2, response.TotalCount);
Assert.Equal(2, response.Notifications.Count);
_repository.Received(1).QueryAsync(
Arg.Is(f =>
f.SourceSiteId == "site-1" &&
f.ListName == "ops-team" &&
f.SubjectKeyword == "tank" &&
f.Status == null &&
f.Type == null &&
f.StuckOnly == false),
2, 25, Arg.Any());
// IsStuck: the hour-old Pending row is stuck, the just-created one is not.
var staleSummary = response.Notifications.Single(s => s.NotificationId == staleRow.NotificationId);
var freshSummary = response.Notifications.Single(s => s.NotificationId == freshRow.NotificationId);
Assert.True(staleSummary.IsStuck);
Assert.False(freshSummary.IsStuck);
Assert.Equal("Pending", staleSummary.Status);
Assert.Equal("Email", staleSummary.Type);
Assert.Equal("site-1", staleSummary.SourceSiteId);
}
[Fact]
public void Query_WithStatusFilterString_ParsesToEnumOnFilter()
{
_repository.QueryAsync(
Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any())
.Returns(((IReadOnlyList)Array.Empty(), 0));
var actor = CreateActor();
actor.Tell(
new NotificationOutboxQueryRequest(
CorrelationId: "corr-2",
StatusFilter: "Parked",
TypeFilter: "Email",
SourceSiteFilter: null,
ListNameFilter: null,
StuckOnly: true,
SubjectKeyword: null,
From: null,
To: null,
PageNumber: 1,
PageSize: 50),
TestActor);
ExpectMsg();
_repository.Received(1).QueryAsync(
Arg.Is(f =>
f.Status == NotificationStatus.Parked &&
f.Type == NotificationType.Email &&
f.StuckOnly == true &&
f.StuckCutoff != null),
1, 50, Arg.Any());
}
[Fact]
public void Query_RepositoryThrows_RepliesFailureWithEmptyList()
{
_repository.QueryAsync(
Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any())
.ThrowsAsync(new InvalidOperationException("db down"));
var actor = CreateActor();
actor.Tell(
new NotificationOutboxQueryRequest(
"corr-err", null, null, null, null, false, null, null, null, 1, 50),
TestActor);
var response = ExpectMsg();
Assert.Equal("corr-err", response.CorrelationId);
Assert.False(response.Success);
Assert.NotNull(response.ErrorMessage);
Assert.Contains("db down", response.ErrorMessage);
Assert.Empty(response.Notifications);
}
[Fact]
public void StatusQuery_Found_RepliesWithRowDetail()
{
var row = MakeNotification(
status: NotificationStatus.Retrying, retryCount: 3, lastError: "smtp timeout");
_repository.GetByIdAsync(row.NotificationId, Arg.Any()).Returns(row);
var actor = CreateActor();
actor.Tell(new NotificationStatusQuery("corr-3", row.NotificationId), TestActor);
var response = ExpectMsg();
Assert.Equal("corr-3", response.CorrelationId);
Assert.True(response.Found);
Assert.Equal("Retrying", response.Status);
Assert.Equal(3, response.RetryCount);
Assert.Equal("smtp timeout", response.LastError);
}
[Fact]
public void StatusQuery_NotFound_RepliesFoundFalse()
{
_repository.GetByIdAsync(Arg.Any(), Arg.Any())
.Returns((Notification?)null);
var actor = CreateActor();
actor.Tell(new NotificationStatusQuery("corr-4", "missing-id"), TestActor);
var response = ExpectMsg();
Assert.Equal("corr-4", response.CorrelationId);
Assert.False(response.Found);
Assert.Equal(string.Empty, response.Status);
Assert.Equal(0, response.RetryCount);
}
[Fact]
public void Retry_ParkedNotification_ResetsToPending_AndSucceeds()
{
var row = MakeNotification(status: NotificationStatus.Parked, retryCount: 10, lastError: "gave up");
row.NextAttemptAt = DateTimeOffset.UtcNow;
_repository.GetByIdAsync(row.NotificationId, Arg.Any()).Returns(row);
var actor = CreateActor();
actor.Tell(new RetryNotificationRequest("corr-5", row.NotificationId), TestActor);
var response = ExpectMsg();
Assert.Equal("corr-5", response.CorrelationId);
Assert.True(response.Success);
Assert.Null(response.ErrorMessage);
_repository.Received(1).UpdateAsync(
Arg.Is(n =>
n.NotificationId == row.NotificationId &&
n.Status == NotificationStatus.Pending &&
n.RetryCount == 0 &&
n.NextAttemptAt == null &&
n.LastError == null),
Arg.Any());
}
[Fact]
public void Retry_NonParkedNotification_Fails()
{
var row = MakeNotification(status: NotificationStatus.Delivered);
_repository.GetByIdAsync(row.NotificationId, Arg.Any()).Returns(row);
var actor = CreateActor();
actor.Tell(new RetryNotificationRequest("corr-6", row.NotificationId), TestActor);
var response = ExpectMsg();
Assert.False(response.Success);
Assert.NotNull(response.ErrorMessage);
_repository.DidNotReceive().UpdateAsync(Arg.Any(), Arg.Any());
}
[Fact]
public void Retry_MissingNotification_Fails()
{
_repository.GetByIdAsync(Arg.Any(), Arg.Any())
.Returns((Notification?)null);
var actor = CreateActor();
actor.Tell(new RetryNotificationRequest("corr-7", "missing-id"), TestActor);
var response = ExpectMsg();
Assert.False(response.Success);
Assert.NotNull(response.ErrorMessage);
Assert.Contains("not found", response.ErrorMessage);
}
[Fact]
public void Discard_ParkedNotification_MarksDiscarded_AndSucceeds()
{
var row = MakeNotification(status: NotificationStatus.Parked);
_repository.GetByIdAsync(row.NotificationId, Arg.Any()).Returns(row);
var actor = CreateActor();
actor.Tell(new DiscardNotificationRequest("corr-8", row.NotificationId), TestActor);
var response = ExpectMsg();
Assert.Equal("corr-8", response.CorrelationId);
Assert.True(response.Success);
_repository.Received(1).UpdateAsync(
Arg.Is(n => n.Status == NotificationStatus.Discarded),
Arg.Any());
}
[Fact]
public void Discard_NonParkedNotification_Fails()
{
var row = MakeNotification(status: NotificationStatus.Pending);
_repository.GetByIdAsync(row.NotificationId, Arg.Any()).Returns(row);
var actor = CreateActor();
actor.Tell(new DiscardNotificationRequest("corr-9", row.NotificationId), TestActor);
var response = ExpectMsg();
Assert.False(response.Success);
Assert.NotNull(response.ErrorMessage);
_repository.DidNotReceive().UpdateAsync(Arg.Any(), Arg.Any());
}
[Fact]
public void Discard_MissingNotification_Fails()
{
_repository.GetByIdAsync(Arg.Any(), Arg.Any())
.Returns((Notification?)null);
var actor = CreateActor();
actor.Tell(new DiscardNotificationRequest("corr-10", "missing-id"), TestActor);
var response = ExpectMsg();
Assert.False(response.Success);
Assert.Contains("not found", response.ErrorMessage);
}
[Fact]
public void KpiRequest_ComputesKpis_AndMapsSnapshot()
{
var snapshot = new NotificationKpiSnapshot(
QueueDepth: 7,
StuckCount: 2,
ParkedCount: 3,
DeliveredLastInterval: 12,
OldestPendingAge: TimeSpan.FromMinutes(4));
_repository.ComputeKpisAsync(
Arg.Any(), Arg.Any(), Arg.Any())
.Returns(snapshot);
var actor = CreateActor();
actor.Tell(new NotificationKpiRequest("corr-11"), TestActor);
var response = ExpectMsg();
Assert.Equal("corr-11", response.CorrelationId);
Assert.True(response.Success);
Assert.Null(response.ErrorMessage);
Assert.Equal(7, response.QueueDepth);
Assert.Equal(2, response.StuckCount);
Assert.Equal(3, response.ParkedCount);
Assert.Equal(12, response.DeliveredLastInterval);
Assert.Equal(TimeSpan.FromMinutes(4), response.OldestPendingAge);
_repository.Received(1).ComputeKpisAsync(
Arg.Any(), Arg.Any(), Arg.Any());
}
[Fact]
public void KpiRequest_RepositoryThrows_RepliesFailureResponse()
{
_repository.ComputeKpisAsync(
Arg.Any(), Arg.Any(), Arg.Any())
.ThrowsAsync(new InvalidOperationException("kpi db down"));
var actor = CreateActor();
actor.Tell(new NotificationKpiRequest("corr-12"), TestActor);
// A repository fault yields a failure NotificationKpiResponse, not a Status.Failure.
var response = ExpectMsg();
Assert.Equal("corr-12", response.CorrelationId);
Assert.False(response.Success);
Assert.NotNull(response.ErrorMessage);
Assert.Contains("kpi db down", response.ErrorMessage);
Assert.Equal(0, response.QueueDepth);
Assert.Equal(0, response.StuckCount);
Assert.Equal(0, response.ParkedCount);
Assert.Equal(0, response.DeliveredLastInterval);
Assert.Null(response.OldestPendingAge);
}
[Fact]
public void PerSiteKpiRequest_RepliesWithPerSiteSnapshots()
{
_repository.ComputePerSiteKpisAsync(
Arg.Any(), Arg.Any(), Arg.Any())
.Returns(new List
{
new("plant-a", 4, 1, 0, 9, TimeSpan.FromMinutes(7)),
});
var actor = CreateActor();
actor.Tell(new PerSiteNotificationKpiRequest("corr-ps"), TestActor);
var response = ExpectMsg();
Assert.True(response.Success);
Assert.Null(response.ErrorMessage);
Assert.Equal("corr-ps", response.CorrelationId);
Assert.Single(response.Sites);
Assert.Equal("plant-a", response.Sites[0].SourceSiteId);
_repository.Received(1).ComputePerSiteKpisAsync(
Arg.Any(), Arg.Any(), Arg.Any());
}
[Fact]
public void PerSiteKpiRequest_RepositoryFault_RepliesUnsuccessful()
{
_repository.ComputePerSiteKpisAsync(
Arg.Any(), Arg.Any(), Arg.Any())
.ThrowsAsync(new InvalidOperationException("db down"));
var actor = CreateActor();
actor.Tell(new PerSiteNotificationKpiRequest("corr-ps"), TestActor);
var response = ExpectMsg();
Assert.False(response.Success);
Assert.Equal("corr-ps", response.CorrelationId);
Assert.NotNull(response.ErrorMessage);
Assert.Contains("db down", response.ErrorMessage);
Assert.Empty(response.Sites);
}
}