diff --git a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs index 5a2b969..f4abede 100644 --- a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs +++ b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs @@ -5,6 +5,7 @@ using ScadaLink.Commons.Entities.Notifications; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Messages.Notification; using ScadaLink.Commons.Types.Enums; +using ScadaLink.Commons.Types.Notifications; using ScadaLink.NotificationOutbox.Delivery; using ScadaLink.NotificationOutbox.Messages; @@ -55,6 +56,11 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers Receive(HandleIngestPersisted); Receive(_ => HandleDispatchTick()); Receive(_ => _dispatching = false); + Receive(HandleQuery); + Receive(HandleStatusQuery); + Receive(HandleRetry); + Receive(HandleDiscard); + Receive(HandleKpiRequest); } /// @@ -281,6 +287,242 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers await outboxRepository.UpdateAsync(notification); } + /// + /// Handles a paginated, filtered query over the outbox. Builds a + /// from the request (parsing the string status/type + /// filters to their enums and deriving the stuck cutoff when StuckOnly is set), + /// runs the query on a scoped repository, and pipes the mapped response back to the + /// captured sender. A repository fault yields a failure response with an empty list. + /// + private void HandleQuery(NotificationOutboxQueryRequest request) + { + var sender = Sender; + var now = DateTimeOffset.UtcNow; + var stuckThreshold = _options.StuckAgeThreshold; + + QueryOutboxAsync(request, now, stuckThreshold).PipeTo( + sender, + success: response => response, + failure: ex => new NotificationOutboxQueryResponse( + request.CorrelationId, + Success: false, + ErrorMessage: ex.GetBaseException().Message, + Notifications: Array.Empty(), + TotalCount: 0)); + } + + private async Task QueryOutboxAsync( + NotificationOutboxQueryRequest request, DateTimeOffset now, TimeSpan stuckThreshold) + { + var filter = new NotificationOutboxFilter( + Status: ParseEnum(request.StatusFilter), + Type: ParseEnum(request.TypeFilter), + SourceSiteId: request.SourceSiteFilter, + ListName: request.ListNameFilter, + SubjectKeyword: request.SubjectKeyword, + StuckOnly: request.StuckOnly, + StuckCutoff: request.StuckOnly ? now - stuckThreshold : null, + From: request.From, + To: request.To); + + using var scope = _serviceProvider.CreateScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + var (rows, totalCount) = await repository.QueryAsync(filter, request.PageNumber, request.PageSize); + + var stuckCutoff = now - stuckThreshold; + var summaries = rows + .Select(row => new NotificationSummary( + row.NotificationId, + row.Type.ToString(), + row.ListName, + row.Subject, + row.Status.ToString(), + row.RetryCount, + row.LastError, + row.SourceSiteId, + row.SourceInstanceId, + row.CreatedAt, + row.DeliveredAt, + IsStuck: IsStuck(row, stuckCutoff))) + .ToList(); + + return new NotificationOutboxQueryResponse( + request.CorrelationId, Success: true, ErrorMessage: null, summaries, totalCount); + } + + /// + /// Handles a single-notification status query. Replies Found: false with empty + /// detail when no row matches, otherwise the row's current status, retry count, last + /// error, and delivery time. + /// + private void HandleStatusQuery(NotificationStatusQuery query) + { + var sender = Sender; + + StatusQueryAsync(query).PipeTo( + sender, + success: response => response, + failure: _ => new NotificationStatusResponse( + query.CorrelationId, Found: false, Status: string.Empty, + RetryCount: 0, LastError: null, DeliveredAt: null)); + } + + private async Task StatusQueryAsync(NotificationStatusQuery query) + { + using var scope = _serviceProvider.CreateScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + var notification = await repository.GetByIdAsync(query.NotificationId); + + if (notification is null) + { + return new NotificationStatusResponse( + query.CorrelationId, Found: false, Status: string.Empty, + RetryCount: 0, LastError: null, DeliveredAt: null); + } + + return new NotificationStatusResponse( + query.CorrelationId, + Found: true, + Status: notification.Status.ToString(), + RetryCount: notification.RetryCount, + LastError: notification.LastError, + DeliveredAt: notification.DeliveredAt); + } + + /// + /// Handles a manual retry request. Only a Parked notification can be retried; + /// it is reset to Pending with a cleared retry count, next-attempt time, and + /// last error so the dispatch loop re-claims it on the next sweep. + /// + private void HandleRetry(RetryNotificationRequest request) + { + var sender = Sender; + + RetryAsync(request).PipeTo( + sender, + success: response => response, + failure: ex => new RetryNotificationResponse( + request.CorrelationId, Success: false, ErrorMessage: ex.GetBaseException().Message)); + } + + private async Task RetryAsync(RetryNotificationRequest request) + { + using var scope = _serviceProvider.CreateScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + var notification = await repository.GetByIdAsync(request.NotificationId); + + if (notification is null) + { + return new RetryNotificationResponse( + request.CorrelationId, Success: false, ErrorMessage: "notification not found"); + } + + if (notification.Status != NotificationStatus.Parked) + { + return new RetryNotificationResponse( + request.CorrelationId, Success: false, + ErrorMessage: "only parked notifications can be retried"); + } + + notification.Status = NotificationStatus.Pending; + notification.RetryCount = 0; + notification.NextAttemptAt = null; + notification.LastError = null; + await repository.UpdateAsync(notification); + + return new RetryNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null); + } + + /// + /// Handles a manual discard request. Only a Parked notification can be discarded; + /// it is moved to the terminal Discarded status. + /// + private void HandleDiscard(DiscardNotificationRequest request) + { + var sender = Sender; + + DiscardAsync(request).PipeTo( + sender, + success: response => response, + failure: ex => new DiscardNotificationResponse( + request.CorrelationId, Success: false, ErrorMessage: ex.GetBaseException().Message)); + } + + private async Task DiscardAsync(DiscardNotificationRequest request) + { + using var scope = _serviceProvider.CreateScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + var notification = await repository.GetByIdAsync(request.NotificationId); + + if (notification is null) + { + return new DiscardNotificationResponse( + request.CorrelationId, Success: false, ErrorMessage: "notification not found"); + } + + if (notification.Status != NotificationStatus.Parked) + { + return new DiscardNotificationResponse( + request.CorrelationId, Success: false, + ErrorMessage: "only parked notifications can be discarded"); + } + + notification.Status = NotificationStatus.Discarded; + await repository.UpdateAsync(notification); + + return new DiscardNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null); + } + + /// + /// Handles a KPI snapshot request, computing the outbox metrics with the stuck cutoff + /// derived from and the + /// delivered window from . + /// + private void HandleKpiRequest(NotificationKpiRequest request) + { + var sender = Sender; + var now = DateTimeOffset.UtcNow; + var stuckCutoff = now - _options.StuckAgeThreshold; + var deliveredSince = now - _options.DeliveredKpiWindow; + + ComputeKpisAsync(request.CorrelationId, stuckCutoff, deliveredSince).PipeTo(sender); + } + + private async Task ComputeKpisAsync( + string correlationId, DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince) + { + using var scope = _serviceProvider.CreateScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + var snapshot = await repository.ComputeKpisAsync(stuckCutoff, deliveredSince); + + return new NotificationKpiResponse( + correlationId, + snapshot.QueueDepth, + snapshot.StuckCount, + snapshot.ParkedCount, + snapshot.DeliveredLastInterval, + snapshot.OldestPendingAge); + } + + /// + /// A notification counts as stuck when it is still in a non-terminal status + /// (Pending or Retrying) and was created before the supplied cutoff. + /// + private static bool IsStuck(Notification notification, DateTimeOffset stuckCutoff) + { + return notification.Status is NotificationStatus.Pending or NotificationStatus.Retrying + && notification.CreatedAt < stuckCutoff; + } + + /// + /// Parses a string filter value to a nullable enum, ignoring case. An empty, whitespace, + /// or unrecognised value yields null — meaning "no constraint on that dimension". + /// + private static TEnum? ParseEnum(string? value) where TEnum : struct, Enum + { + return Enum.TryParse(value, ignoreCase: true, out var parsed) ? parsed : null; + } + private static Notification BuildNotification(NotificationSubmit msg) { // All current notifications are email; NotificationType has only the Email member. diff --git a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorQueryTests.cs b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorQueryTests.cs new file mode 100644 index 0000000..77616fe --- /dev/null +++ b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorQueryTests.cs @@ -0,0 +1,335 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using NSubstitute.ExceptionExtensions; +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Notification; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.Commons.Types.Notifications; +using ScadaLink.NotificationOutbox.Delivery; + +namespace ScadaLink.NotificationOutbox.Tests; + +/// +/// Task 15: Tests for the query surface — the +/// paginated outbox query, single-notification status query, manual retry and discard +/// of parked notifications, and the KPI snapshot. +/// +public class NotificationOutboxActorQueryTests : TestKit +{ + private readonly INotificationOutboxRepository _repository = + Substitute.For(); + + private IServiceProvider BuildServiceProvider() + { + var services = new ServiceCollection(); + services.AddScoped(_ => _repository); + return services.BuildServiceProvider(); + } + + private IActorRef CreateActor(NotificationOutboxOptions? options = null) + { + return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( + BuildServiceProvider(), + // A long dispatch interval keeps the dispatch loop from interfering with these tests. + options ?? new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) }, + NullLogger.Instance, + new Dictionary()))); + } + + private static Notification MakeNotification( + NotificationStatus status = NotificationStatus.Pending, + DateTimeOffset? createdAt = null, + int retryCount = 0, + string? lastError = null, + DateTimeOffset? deliveredAt = null) + { + return new Notification( + Guid.NewGuid().ToString(), NotificationType.Email, "ops-team", "Subject", "Body", "site-1") + { + Status = status, + CreatedAt = createdAt ?? DateTimeOffset.UtcNow, + RetryCount = retryCount, + LastError = lastError, + DeliveredAt = deliveredAt, + SourceInstanceId = "instance-42", + }; + } + + [Fact] + public void Query_PassesFilterFromRequest_AndMapsRowsToSummaries() + { + var now = DateTimeOffset.UtcNow; + var staleRow = MakeNotification( + status: NotificationStatus.Pending, createdAt: now - TimeSpan.FromHours(1)); + var freshRow = MakeNotification( + status: NotificationStatus.Pending, createdAt: now); + _repository.QueryAsync( + Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(((IReadOnlyList)new[] { staleRow, freshRow }, 2)); + var actor = CreateActor(); + + actor.Tell( + new NotificationOutboxQueryRequest( + CorrelationId: "corr-1", + StatusFilter: null, + TypeFilter: null, + SourceSiteFilter: "site-1", + ListNameFilter: "ops-team", + StuckOnly: false, + SubjectKeyword: "tank", + From: null, + To: null, + PageNumber: 2, + PageSize: 25), + TestActor); + + var response = ExpectMsg(); + Assert.Equal("corr-1", response.CorrelationId); + Assert.True(response.Success); + Assert.Null(response.ErrorMessage); + Assert.Equal(2, response.TotalCount); + Assert.Equal(2, response.Notifications.Count); + + _repository.Received(1).QueryAsync( + Arg.Is(f => + f.SourceSiteId == "site-1" && + f.ListName == "ops-team" && + f.SubjectKeyword == "tank" && + f.Status == null && + f.Type == null && + f.StuckOnly == false), + 2, 25, Arg.Any()); + + // IsStuck: the hour-old Pending row is stuck, the just-created one is not. + var staleSummary = response.Notifications.Single(s => s.NotificationId == staleRow.NotificationId); + var freshSummary = response.Notifications.Single(s => s.NotificationId == freshRow.NotificationId); + Assert.True(staleSummary.IsStuck); + Assert.False(freshSummary.IsStuck); + Assert.Equal("Pending", staleSummary.Status); + Assert.Equal("Email", staleSummary.Type); + Assert.Equal("site-1", staleSummary.SourceSiteId); + } + + [Fact] + public void Query_WithStatusFilterString_ParsesToEnumOnFilter() + { + _repository.QueryAsync( + Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(((IReadOnlyList)Array.Empty(), 0)); + var actor = CreateActor(); + + actor.Tell( + new NotificationOutboxQueryRequest( + CorrelationId: "corr-2", + StatusFilter: "Parked", + TypeFilter: "Email", + SourceSiteFilter: null, + ListNameFilter: null, + StuckOnly: true, + SubjectKeyword: null, + From: null, + To: null, + PageNumber: 1, + PageSize: 50), + TestActor); + + ExpectMsg(); + + _repository.Received(1).QueryAsync( + Arg.Is(f => + f.Status == NotificationStatus.Parked && + f.Type == NotificationType.Email && + f.StuckOnly == true && + f.StuckCutoff != null), + 1, 50, Arg.Any()); + } + + [Fact] + public void Query_RepositoryThrows_RepliesFailureWithEmptyList() + { + _repository.QueryAsync( + Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("db down")); + var actor = CreateActor(); + + actor.Tell( + new NotificationOutboxQueryRequest( + "corr-err", null, null, null, null, false, null, null, null, 1, 50), + TestActor); + + var response = ExpectMsg(); + Assert.Equal("corr-err", response.CorrelationId); + Assert.False(response.Success); + Assert.NotNull(response.ErrorMessage); + Assert.Contains("db down", response.ErrorMessage); + Assert.Empty(response.Notifications); + } + + [Fact] + public void StatusQuery_Found_RepliesWithRowDetail() + { + var row = MakeNotification( + status: NotificationStatus.Retrying, retryCount: 3, lastError: "smtp timeout"); + _repository.GetByIdAsync(row.NotificationId, Arg.Any()).Returns(row); + var actor = CreateActor(); + + actor.Tell(new NotificationStatusQuery("corr-3", row.NotificationId), TestActor); + + var response = ExpectMsg(); + Assert.Equal("corr-3", response.CorrelationId); + Assert.True(response.Found); + Assert.Equal("Retrying", response.Status); + Assert.Equal(3, response.RetryCount); + Assert.Equal("smtp timeout", response.LastError); + } + + [Fact] + public void StatusQuery_NotFound_RepliesFoundFalse() + { + _repository.GetByIdAsync(Arg.Any(), Arg.Any()) + .Returns((Notification?)null); + var actor = CreateActor(); + + actor.Tell(new NotificationStatusQuery("corr-4", "missing-id"), TestActor); + + var response = ExpectMsg(); + Assert.Equal("corr-4", response.CorrelationId); + Assert.False(response.Found); + Assert.Equal(string.Empty, response.Status); + Assert.Equal(0, response.RetryCount); + } + + [Fact] + public void Retry_ParkedNotification_ResetsToPending_AndSucceeds() + { + var row = MakeNotification(status: NotificationStatus.Parked, retryCount: 10, lastError: "gave up"); + row.NextAttemptAt = DateTimeOffset.UtcNow; + _repository.GetByIdAsync(row.NotificationId, Arg.Any()).Returns(row); + var actor = CreateActor(); + + actor.Tell(new RetryNotificationRequest("corr-5", row.NotificationId), TestActor); + + var response = ExpectMsg(); + Assert.Equal("corr-5", response.CorrelationId); + Assert.True(response.Success); + Assert.Null(response.ErrorMessage); + + _repository.Received(1).UpdateAsync( + Arg.Is(n => + n.NotificationId == row.NotificationId && + n.Status == NotificationStatus.Pending && + n.RetryCount == 0 && + n.NextAttemptAt == null && + n.LastError == null), + Arg.Any()); + } + + [Fact] + public void Retry_NonParkedNotification_Fails() + { + var row = MakeNotification(status: NotificationStatus.Delivered); + _repository.GetByIdAsync(row.NotificationId, Arg.Any()).Returns(row); + var actor = CreateActor(); + + actor.Tell(new RetryNotificationRequest("corr-6", row.NotificationId), TestActor); + + var response = ExpectMsg(); + Assert.False(response.Success); + Assert.NotNull(response.ErrorMessage); + _repository.DidNotReceive().UpdateAsync(Arg.Any(), Arg.Any()); + } + + [Fact] + public void Retry_MissingNotification_Fails() + { + _repository.GetByIdAsync(Arg.Any(), Arg.Any()) + .Returns((Notification?)null); + var actor = CreateActor(); + + actor.Tell(new RetryNotificationRequest("corr-7", "missing-id"), TestActor); + + var response = ExpectMsg(); + Assert.False(response.Success); + Assert.NotNull(response.ErrorMessage); + Assert.Contains("not found", response.ErrorMessage); + } + + [Fact] + public void Discard_ParkedNotification_MarksDiscarded_AndSucceeds() + { + var row = MakeNotification(status: NotificationStatus.Parked); + _repository.GetByIdAsync(row.NotificationId, Arg.Any()).Returns(row); + var actor = CreateActor(); + + actor.Tell(new DiscardNotificationRequest("corr-8", row.NotificationId), TestActor); + + var response = ExpectMsg(); + Assert.Equal("corr-8", response.CorrelationId); + Assert.True(response.Success); + + _repository.Received(1).UpdateAsync( + Arg.Is(n => n.Status == NotificationStatus.Discarded), + Arg.Any()); + } + + [Fact] + public void Discard_NonParkedNotification_Fails() + { + var row = MakeNotification(status: NotificationStatus.Pending); + _repository.GetByIdAsync(row.NotificationId, Arg.Any()).Returns(row); + var actor = CreateActor(); + + actor.Tell(new DiscardNotificationRequest("corr-9", row.NotificationId), TestActor); + + var response = ExpectMsg(); + Assert.False(response.Success); + Assert.NotNull(response.ErrorMessage); + _repository.DidNotReceive().UpdateAsync(Arg.Any(), Arg.Any()); + } + + [Fact] + public void Discard_MissingNotification_Fails() + { + _repository.GetByIdAsync(Arg.Any(), Arg.Any()) + .Returns((Notification?)null); + var actor = CreateActor(); + + actor.Tell(new DiscardNotificationRequest("corr-10", "missing-id"), TestActor); + + var response = ExpectMsg(); + Assert.False(response.Success); + Assert.Contains("not found", response.ErrorMessage); + } + + [Fact] + public void KpiRequest_ComputesKpis_AndMapsSnapshot() + { + var snapshot = new NotificationKpiSnapshot( + QueueDepth: 7, + StuckCount: 2, + ParkedCount: 3, + DeliveredLastInterval: 12, + OldestPendingAge: TimeSpan.FromMinutes(4)); + _repository.ComputeKpisAsync( + Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(snapshot); + var actor = CreateActor(); + + actor.Tell(new NotificationKpiRequest("corr-11"), TestActor); + + var response = ExpectMsg(); + Assert.Equal("corr-11", response.CorrelationId); + Assert.Equal(7, response.QueueDepth); + Assert.Equal(2, response.StuckCount); + Assert.Equal(3, response.ParkedCount); + Assert.Equal(12, response.DeliveredLastInterval); + Assert.Equal(TimeSpan.FromMinutes(4), response.OldestPendingAge); + + _repository.Received(1).ComputeKpisAsync( + Arg.Any(), Arg.Any(), Arg.Any()); + } +}