using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Commons.Types.Notifications;
using ScadaLink.NotificationOutbox.Delivery;
using ScadaLink.NotificationOutbox.Tests.TestSupport;

namespace ScadaLink.NotificationOutbox.Tests;

// NOTE(review): the generic type arguments in this file were reconstructed from usage.
// Names that are not visible elsewhere in this file — NotificationOutboxFilter,
// SiteNotificationKpiSnapshot, and the *Response message types other than
// NotificationKpiResponse — follow the Request/Response naming convention and must be
// confirmed against ScadaLink.Commons before merging.

/// <summary>
/// Task 15: Tests for the <see cref="NotificationOutboxActor"/> query surface — the
/// paginated outbox query, single-notification status query, manual retry and discard
/// of parked notifications, the single-notification detail request, and the global and
/// per-site KPI snapshots.
/// </summary>
public class NotificationOutboxActorQueryTests : TestKit
{
    private readonly INotificationOutboxRepository _repository =
        Substitute.For<INotificationOutboxRepository>();

    /// <summary>
    /// Builds a service provider whose only registration is the substitute repository,
    /// registered as a scoped service.
    /// </summary>
    private IServiceProvider BuildServiceProvider()
    {
        var services = new ServiceCollection();
        services.AddScoped(_ => _repository);
        return services.BuildServiceProvider();
    }

    /// <summary>
    /// Creates the actor under test inside the TestKit actor system.
    /// </summary>
    /// <param name="options">
    /// Optional outbox options; when omitted a one-hour dispatch interval is used.
    /// </param>
    private IActorRef CreateActor(NotificationOutboxOptions? options = null)
    {
        return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
            BuildServiceProvider(),
            // A long dispatch interval keeps the dispatch loop from interfering with these tests.
            options ?? new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
            new NoOpCentralAuditWriter(),
            NullLogger<NotificationOutboxActor>.Instance)));
    }

    /// <summary>
    /// Creates an email notification for list "ops-team" from site "site-1" with a fresh
    /// GUID id and source instance "instance-42".
    /// </summary>
    /// <param name="status">Status to assign; defaults to Pending.</param>
    /// <param name="createdAt">Creation time; defaults to the current UTC time.</param>
    /// <param name="retryCount">Retry count to assign.</param>
    /// <param name="lastError">Last error text to assign, if any.</param>
    /// <param name="deliveredAt">Delivery time to assign, if any.</param>
    private static Notification MakeNotification(
        NotificationStatus status = NotificationStatus.Pending,
        DateTimeOffset? createdAt = null,
        int retryCount = 0,
        string? lastError = null,
        DateTimeOffset? deliveredAt = null)
    {
        return new Notification(
            Guid.NewGuid().ToString(),
            NotificationType.Email,
            "ops-team",
            "Subject",
            "Body",
            "site-1")
        {
            Status = status,
            CreatedAt = createdAt ?? DateTimeOffset.UtcNow,
            RetryCount = retryCount,
            LastError = lastError,
            DeliveredAt = deliveredAt,
            SourceInstanceId = "instance-42",
        };
    }

    [Fact]
    public void Query_PassesFilterFromRequest_AndMapsRowsToSummaries()
    {
        var now = DateTimeOffset.UtcNow;
        var staleRow = MakeNotification(
            status: NotificationStatus.Pending,
            createdAt: now - TimeSpan.FromHours(1));
        var freshRow = MakeNotification(
            status: NotificationStatus.Pending,
            createdAt: now);

        // NOTE(review): filter type name inferred — confirm.
        _repository.QueryAsync(
                Arg.Any<NotificationOutboxFilter>(),
                Arg.Any<int>(),
                Arg.Any<int>(),
                Arg.Any<CancellationToken>())
            .Returns(((IReadOnlyList<Notification>)new[] { staleRow, freshRow }, 2));

        var actor = CreateActor();

        actor.Tell(
            new NotificationOutboxQueryRequest(
                CorrelationId: "corr-1",
                StatusFilter: null,
                TypeFilter: null,
                SourceSiteFilter: "site-1",
                ListNameFilter: "ops-team",
                StuckOnly: false,
                SubjectKeyword: "tank",
                From: null,
                To: null,
                PageNumber: 2,
                PageSize: 25),
            TestActor);

        var response = ExpectMsg<NotificationOutboxQueryResponse>();
        Assert.Equal("corr-1", response.CorrelationId);
        Assert.True(response.Success);
        Assert.Null(response.ErrorMessage);
        Assert.Equal(2, response.TotalCount);
        Assert.Equal(2, response.Notifications.Count);

        _repository.Received(1).QueryAsync(
            Arg.Is<NotificationOutboxFilter>(f =>
                f.SourceSiteId == "site-1" &&
                f.ListName == "ops-team" &&
                f.SubjectKeyword == "tank" &&
                f.Status == null &&
                f.Type == null &&
                f.StuckOnly == false),
            2,
            25,
            Arg.Any<CancellationToken>());

        // IsStuck: the hour-old Pending row is stuck, the just-created one is not.
        var staleSummary = response.Notifications.Single(s => s.NotificationId == staleRow.NotificationId);
        var freshSummary = response.Notifications.Single(s => s.NotificationId == freshRow.NotificationId);
        Assert.True(staleSummary.IsStuck);
        Assert.False(freshSummary.IsStuck);
        Assert.Equal("Pending", staleSummary.Status);
        Assert.Equal("Email", staleSummary.Type);
        Assert.Equal("site-1", staleSummary.SourceSiteId);
    }

    [Fact]
    public void Query_WithStatusFilterString_ParsesToEnumOnFilter()
    {
        _repository.QueryAsync(
                Arg.Any<NotificationOutboxFilter>(),
                Arg.Any<int>(),
                Arg.Any<int>(),
                Arg.Any<CancellationToken>())
            .Returns(((IReadOnlyList<Notification>)Array.Empty<Notification>(), 0));

        var actor = CreateActor();

        actor.Tell(
            new NotificationOutboxQueryRequest(
                CorrelationId: "corr-2",
                StatusFilter: "Parked",
                TypeFilter: "Email",
                SourceSiteFilter: null,
                ListNameFilter: null,
                StuckOnly: true,
                SubjectKeyword: null,
                From: null,
                To: null,
                PageNumber: 1,
                PageSize: 50),
            TestActor);

        ExpectMsg<NotificationOutboxQueryResponse>();

        // The string filters must reach the repository as enum values, and a stuck-only
        // query must carry a cutoff.
        _repository.Received(1).QueryAsync(
            Arg.Is<NotificationOutboxFilter>(f =>
                f.Status == NotificationStatus.Parked &&
                f.Type == NotificationType.Email &&
                f.StuckOnly == true &&
                f.StuckCutoff != null),
            1,
            50,
            Arg.Any<CancellationToken>());
    }

    [Fact]
    public void Query_RepositoryThrows_RepliesFailureWithEmptyList()
    {
        _repository.QueryAsync(
                Arg.Any<NotificationOutboxFilter>(),
                Arg.Any<int>(),
                Arg.Any<int>(),
                Arg.Any<CancellationToken>())
            .ThrowsAsync(new InvalidOperationException("db down"));

        var actor = CreateActor();

        actor.Tell(
            new NotificationOutboxQueryRequest(
                "corr-err", null, null, null, null, false, null, null, null, 1, 50),
            TestActor);

        var response = ExpectMsg<NotificationOutboxQueryResponse>();
        Assert.Equal("corr-err", response.CorrelationId);
        Assert.False(response.Success);
        Assert.NotNull(response.ErrorMessage);
        Assert.Contains("db down", response.ErrorMessage);
        Assert.Empty(response.Notifications);
    }

    [Fact]
    public void StatusQuery_Found_RepliesWithRowDetail()
    {
        var row = MakeNotification(
            status: NotificationStatus.Retrying,
            retryCount: 3,
            lastError: "smtp timeout");
        _repository.GetByIdAsync(row.NotificationId, Arg.Any<CancellationToken>()).Returns(row);

        var actor = CreateActor();

        actor.Tell(new NotificationStatusQuery("corr-3", row.NotificationId), TestActor);

        var response = ExpectMsg<NotificationStatusResponse>();
        Assert.Equal("corr-3", response.CorrelationId);
        Assert.True(response.Found);
        Assert.Equal("Retrying", response.Status);
        Assert.Equal(3, response.RetryCount);
        Assert.Equal("smtp timeout", response.LastError);
    }

    [Fact]
    public void StatusQuery_NotFound_RepliesFoundFalse()
    {
        _repository.GetByIdAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
            .Returns((Notification?)null);

        var actor = CreateActor();

        actor.Tell(new NotificationStatusQuery("corr-4", "missing-id"), TestActor);

        var response = ExpectMsg<NotificationStatusResponse>();
        Assert.Equal("corr-4", response.CorrelationId);
        Assert.False(response.Found);
        Assert.Equal(string.Empty, response.Status);
        Assert.Equal(0, response.RetryCount);
    }

    [Fact]
    public void Retry_ParkedNotification_ResetsToPending_AndSucceeds()
    {
        var row = MakeNotification(status: NotificationStatus.Parked, retryCount: 10, lastError: "gave up");
        row.NextAttemptAt = DateTimeOffset.UtcNow;
        _repository.GetByIdAsync(row.NotificationId, Arg.Any<CancellationToken>()).Returns(row);

        var actor = CreateActor();

        actor.Tell(new RetryNotificationRequest("corr-5", row.NotificationId), TestActor);

        var response = ExpectMsg<RetryNotificationResponse>();
        Assert.Equal("corr-5", response.CorrelationId);
        Assert.True(response.Success);
        Assert.Null(response.ErrorMessage);

        // A manual retry must persist the row with its retry bookkeeping cleared.
        _repository.Received(1).UpdateAsync(
            Arg.Is<Notification>(n =>
                n.NotificationId == row.NotificationId &&
                n.Status == NotificationStatus.Pending &&
                n.RetryCount == 0 &&
                n.NextAttemptAt == null &&
                n.LastError == null),
            Arg.Any<CancellationToken>());
    }

    [Fact]
    public void Retry_NonParkedNotification_Fails()
    {
        var row = MakeNotification(status: NotificationStatus.Delivered);
        _repository.GetByIdAsync(row.NotificationId, Arg.Any<CancellationToken>()).Returns(row);

        var actor = CreateActor();

        actor.Tell(new RetryNotificationRequest("corr-6", row.NotificationId), TestActor);

        var response = ExpectMsg<RetryNotificationResponse>();
        Assert.False(response.Success);
        Assert.NotNull(response.ErrorMessage);
        _repository.DidNotReceive().UpdateAsync(Arg.Any<Notification>(), Arg.Any<CancellationToken>());
    }

    [Fact]
    public void Retry_MissingNotification_Fails()
    {
        _repository.GetByIdAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
            .Returns((Notification?)null);

        var actor = CreateActor();

        actor.Tell(new RetryNotificationRequest("corr-7", "missing-id"), TestActor);

        var response = ExpectMsg<RetryNotificationResponse>();
        Assert.False(response.Success);
        Assert.NotNull(response.ErrorMessage);
        Assert.Contains("not found", response.ErrorMessage);
    }

    [Fact]
    public void Discard_ParkedNotification_MarksDiscarded_AndSucceeds()
    {
        var row = MakeNotification(status: NotificationStatus.Parked);
        _repository.GetByIdAsync(row.NotificationId, Arg.Any<CancellationToken>()).Returns(row);

        var actor = CreateActor();

        actor.Tell(new DiscardNotificationRequest("corr-8", row.NotificationId), TestActor);

        var response = ExpectMsg<DiscardNotificationResponse>();
        Assert.Equal("corr-8", response.CorrelationId);
        Assert.True(response.Success);
        _repository.Received(1).UpdateAsync(
            Arg.Is<Notification>(n => n.Status == NotificationStatus.Discarded),
            Arg.Any<CancellationToken>());
    }

    [Fact]
    public void Discard_NonParkedNotification_Fails()
    {
        var row = MakeNotification(status: NotificationStatus.Pending);
        _repository.GetByIdAsync(row.NotificationId, Arg.Any<CancellationToken>()).Returns(row);

        var actor = CreateActor();

        actor.Tell(new DiscardNotificationRequest("corr-9", row.NotificationId), TestActor);

        var response = ExpectMsg<DiscardNotificationResponse>();
        Assert.False(response.Success);
        Assert.NotNull(response.ErrorMessage);
        _repository.DidNotReceive().UpdateAsync(Arg.Any<Notification>(), Arg.Any<CancellationToken>());
    }

    [Fact]
    public void Discard_MissingNotification_Fails()
    {
        _repository.GetByIdAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
            .Returns((Notification?)null);

        var actor = CreateActor();

        actor.Tell(new DiscardNotificationRequest("corr-10", "missing-id"), TestActor);

        var response = ExpectMsg<DiscardNotificationResponse>();
        Assert.False(response.Success);
        Assert.Contains("not found", response.ErrorMessage);
    }

    [Fact]
    public void DetailRequest_KnownId_ReturnsFullDetail_WithBodyAndResolvedTargets()
    {
        var row = MakeNotification(
            status: NotificationStatus.Delivered,
            retryCount: 2,
            lastError: "transient blip");
        row.Body = "Tank-7 has exceeded its high-level setpoint.";
        row.ResolvedTargets = "[\"ops@example.com\",\"oncall@example.com\"]";
        row.TypeData = "{\"priority\":\"high\"}";
        row.SourceScript = "HighLevelAlarm.csx";
        row.SiteEnqueuedAt = DateTimeOffset.UtcNow.AddMinutes(-5);
        row.DeliveredAt = DateTimeOffset.UtcNow;
        _repository.GetByIdAsync(row.NotificationId, Arg.Any<CancellationToken>()).Returns(row);

        var actor = CreateActor();

        actor.Tell(new NotificationDetailRequest("corr-d1", row.NotificationId), TestActor);

        var response = ExpectMsg<NotificationDetailResponse>();
        Assert.Equal("corr-d1", response.CorrelationId);
        Assert.True(response.Success);
        Assert.Null(response.ErrorMessage);
        Assert.NotNull(response.Detail);

        var detail = response.Detail!;
        Assert.Equal(row.NotificationId, detail.NotificationId);
        Assert.Equal("Email", detail.Type);
        Assert.Equal("Delivered", detail.Status);
        Assert.Equal("Tank-7 has exceeded its high-level setpoint.", detail.Body);
        Assert.Equal("[\"ops@example.com\",\"oncall@example.com\"]", detail.ResolvedTargets);
        Assert.Equal("{\"priority\":\"high\"}", detail.TypeData);
        Assert.Equal("HighLevelAlarm.csx", detail.SourceScript);
        Assert.Equal("instance-42", detail.SourceInstanceId);
        Assert.Equal(2, detail.RetryCount);
        Assert.Equal("transient blip", detail.LastError);
    }

    [Fact]
    public void DetailRequest_UnknownId_ReturnsNotFound()
    {
        _repository.GetByIdAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
            .Returns((Notification?)null);

        var actor = CreateActor();

        actor.Tell(new NotificationDetailRequest("corr-d2", "missing-id"), TestActor);

        var response = ExpectMsg<NotificationDetailResponse>();
        Assert.Equal("corr-d2", response.CorrelationId);
        Assert.False(response.Success);
        Assert.Null(response.Detail);
        Assert.NotNull(response.ErrorMessage);
        Assert.Contains("not found", response.ErrorMessage);
    }

    [Fact]
    public void KpiRequest_ComputesKpis_AndMapsSnapshot()
    {
        var snapshot = new NotificationKpiSnapshot(
            QueueDepth: 7,
            StuckCount: 2,
            ParkedCount: 3,
            DeliveredLastInterval: 12,
            OldestPendingAge: TimeSpan.FromMinutes(4));

        // NOTE(review): the first two argument types are assumed to be DateTimeOffset
        // time bounds — confirm against INotificationOutboxRepository.ComputeKpisAsync.
        _repository.ComputeKpisAsync(
                Arg.Any<DateTimeOffset>(),
                Arg.Any<DateTimeOffset>(),
                Arg.Any<CancellationToken>())
            .Returns(snapshot);

        var actor = CreateActor();

        actor.Tell(new NotificationKpiRequest("corr-11"), TestActor);

        var response = ExpectMsg<NotificationKpiResponse>();
        Assert.Equal("corr-11", response.CorrelationId);
        Assert.True(response.Success);
        Assert.Null(response.ErrorMessage);
        Assert.Equal(7, response.QueueDepth);
        Assert.Equal(2, response.StuckCount);
        Assert.Equal(3, response.ParkedCount);
        Assert.Equal(12, response.DeliveredLastInterval);
        Assert.Equal(TimeSpan.FromMinutes(4), response.OldestPendingAge);

        _repository.Received(1).ComputeKpisAsync(
            Arg.Any<DateTimeOffset>(),
            Arg.Any<DateTimeOffset>(),
            Arg.Any<CancellationToken>());
    }

    [Fact]
    public void KpiRequest_RepositoryThrows_RepliesFailureResponse()
    {
        // NOTE(review): argument types assumed, as in KpiRequest_ComputesKpis_AndMapsSnapshot.
        _repository.ComputeKpisAsync(
                Arg.Any<DateTimeOffset>(),
                Arg.Any<DateTimeOffset>(),
                Arg.Any<CancellationToken>())
            .ThrowsAsync(new InvalidOperationException("kpi db down"));

        var actor = CreateActor();

        actor.Tell(new NotificationKpiRequest("corr-12"), TestActor);

        // A repository fault yields a failure NotificationKpiResponse, not a Status.Failure.
        var response = ExpectMsg<NotificationKpiResponse>();
        Assert.Equal("corr-12", response.CorrelationId);
        Assert.False(response.Success);
        Assert.NotNull(response.ErrorMessage);
        Assert.Contains("kpi db down", response.ErrorMessage);
        Assert.Equal(0, response.QueueDepth);
        Assert.Equal(0, response.StuckCount);
        Assert.Equal(0, response.ParkedCount);
        Assert.Equal(0, response.DeliveredLastInterval);
        Assert.Null(response.OldestPendingAge);
    }

    [Fact]
    public void PerSiteKpiRequest_RepliesWithPerSiteSnapshots()
    {
        // NOTE(review): element type name and the time-bound argument types are inferred — confirm.
        _repository.ComputePerSiteKpisAsync(
                Arg.Any<DateTimeOffset>(),
                Arg.Any<DateTimeOffset>(),
                Arg.Any<CancellationToken>())
            .Returns(new List<SiteNotificationKpiSnapshot>
            {
                new("plant-a", 4, 1, 0, 9, TimeSpan.FromMinutes(7)),
            });

        var actor = CreateActor();

        actor.Tell(new PerSiteNotificationKpiRequest("corr-ps"), TestActor);

        var response = ExpectMsg<PerSiteNotificationKpiResponse>();
        Assert.True(response.Success);
        Assert.Null(response.ErrorMessage);
        Assert.Equal("corr-ps", response.CorrelationId);
        Assert.Single(response.Sites);
        Assert.Equal("plant-a", response.Sites[0].SourceSiteId);

        _repository.Received(1).ComputePerSiteKpisAsync(
            Arg.Any<DateTimeOffset>(),
            Arg.Any<DateTimeOffset>(),
            Arg.Any<CancellationToken>());
    }

    [Fact]
    public void PerSiteKpiRequest_RepositoryFault_RepliesUnsuccessful()
    {
        // NOTE(review): argument types assumed, as in PerSiteKpiRequest_RepliesWithPerSiteSnapshots.
        _repository.ComputePerSiteKpisAsync(
                Arg.Any<DateTimeOffset>(),
                Arg.Any<DateTimeOffset>(),
                Arg.Any<CancellationToken>())
            .ThrowsAsync(new InvalidOperationException("db down"));

        var actor = CreateActor();

        actor.Tell(new PerSiteNotificationKpiRequest("corr-ps"), TestActor);

        var response = ExpectMsg<PerSiteNotificationKpiResponse>();
        Assert.False(response.Success);
        Assert.Equal("corr-ps", response.CorrelationId);
        Assert.NotNull(response.ErrorMessage);
        Assert.Contains("db down", response.ErrorMessage);
        Assert.Empty(response.Sites);
    }
}