diff --git a/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs b/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs index 95326da..69f57df 100644 --- a/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs +++ b/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs @@ -52,4 +52,30 @@ internal static class InternalMessages private DispatchComplete() { } } + + /// + /// Periodic tick that triggers a purge sweep of terminal notification rows. Started as a + /// periodic timer in PreStart at the configured PurgeInterval. A singleton + /// instance is reused so the timer carries no per-tick state. + /// + internal sealed class PurgeTick + { + /// The shared singleton tick instance scheduled by the purge timer. + internal static readonly PurgeTick Instance = new(); + + private PurgeTick() { } + } + + /// + /// Completion signal for an asynchronous purge sweep, piped back to the actor so the + /// sweep's outcome (logged in the pipe projection) is observed on the actor thread. + /// Sent on both success and failure of the sweep. + /// + internal sealed class PurgeComplete + { + /// The shared singleton completion instance. + internal static readonly PurgeComplete Instance = new(); + + private PurgeComplete() { } + } } diff --git a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs index 98cb9f1..430ed37 100644 --- a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs +++ b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs @@ -16,11 +16,13 @@ namespace ScadaLink.NotificationOutbox; /// messages forwarded from sites and persists each as a /// row (the ingest path), and runs a periodic dispatch loop /// that claims due notifications, delivers them through the matching channel adapter, and -/// applies the resulting status transition. Query and purge are added by later tasks. +/// applies the resulting status transition. It also runs a periodic purge that bulk-deletes +/// terminal notification rows once they age past the configured retention window. /// public class NotificationOutboxActor : ReceiveActor, IWithTimers { private const string DispatchTimerKey = "dispatch"; + private const string PurgeTimerKey = "purge"; /// Retry policy fallback used when no SMTP configuration row is present. private const int FallbackMaxRetries = 10; @@ -56,6 +58,8 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers Receive(HandleIngestPersisted); Receive(_ => HandleDispatchTick()); Receive(_ => _dispatching = false); + Receive(_ => HandlePurgeTick()); + Receive(_ => { }); Receive(HandleQuery); Receive(HandleStatusQuery); Receive(HandleRetry); @@ -64,14 +68,17 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers } /// - /// Starts the periodic dispatch timer once the actor is running. The tick cadence is - /// . + /// Starts the periodic timers once the actor is running: the dispatch loop at + /// and the terminal-row purge + /// at . /// protected override void PreStart() { base.PreStart(); Timers.StartPeriodicTimer( DispatchTimerKey, InternalMessages.DispatchTick.Instance, _options.DispatchInterval); + Timers.StartPeriodicTimer( + PurgeTimerKey, InternalMessages.PurgeTick.Instance, _options.PurgeInterval); } /// @@ -287,6 +294,45 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers await outboxRepository.UpdateAsync(notification); } + /// + /// Handles a purge tick by launching an asynchronous sweep that bulk-deletes terminal + /// notification rows older than . + /// Purges are daily and idempotent, so no in-flight guard is needed; the outcome is piped + /// back to only so a faulted purge is logged on the actor thread and + /// never wedges anything. + /// + private void HandlePurgeTick() + { + var cutoff = DateTimeOffset.UtcNow - _options.TerminalRetention; + + RunPurgePass(cutoff).PipeTo( + Self, + success: deleted => + { + _logger.LogInformation( + "Purge removed {DeletedCount} terminal notification(s) older than {Cutoff:o}.", + deleted, cutoff); + return InternalMessages.PurgeComplete.Instance; + }, + failure: ex => + { + _logger.LogError(ex, "Purge sweep faulted unexpectedly."); + return InternalMessages.PurgeComplete.Instance; + }); + } + + /// + /// Runs a single purge sweep: resolves a scoped + /// and bulk-deletes terminal rows created before , returning the + /// deleted count. + /// + private async Task RunPurgePass(DateTimeOffset cutoff) + { + using var scope = _serviceProvider.CreateScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + return await repository.DeleteTerminalOlderThanAsync(cutoff); + } + /// /// Handles a paginated, filtered query over the outbox. Builds a /// from the request (parsing the string status/type diff --git a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorPurgeTests.cs b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorPurgeTests.cs new file mode 100644 index 0000000..ecc1159 --- /dev/null +++ b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorPurgeTests.cs @@ -0,0 +1,107 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.NotificationOutbox.Delivery; +using ScadaLink.NotificationOutbox.Messages; + +namespace ScadaLink.NotificationOutbox.Tests; + +/// +/// Task 16: Tests for the daily purge job — the +/// periodic sweep that bulk-deletes terminal notification rows older than +/// via +/// . +/// +public class NotificationOutboxActorPurgeTests : TestKit +{ + private readonly INotificationOutboxRepository _outboxRepository = + Substitute.For(); + + private readonly INotificationRepository _notificationRepository = + Substitute.For(); + + private IServiceProvider BuildServiceProvider() + { + var services = new ServiceCollection(); + services.AddScoped(_ => _outboxRepository); + services.AddScoped(_ => _notificationRepository); + return services.BuildServiceProvider(); + } + + /// + /// Creates the actor with both the dispatch and purge timers set to a long interval so + /// neither periodic timer fires during a test — purge ticks are sent manually instead. + /// + private IActorRef CreateActor(NotificationOutboxOptions? options = null) + { + return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( + BuildServiceProvider(), + options ?? new NotificationOutboxOptions + { + DispatchInterval = TimeSpan.FromHours(1), + PurgeInterval = TimeSpan.FromHours(1), + }, + NullLogger.Instance, + new Dictionary()))); + } + + [Fact] + public void PurgeTick_DeletesTerminalRows_WithCutoffAtUtcNowMinusTerminalRetention() + { + var retention = TimeSpan.FromDays(30); + var options = new NotificationOutboxOptions + { + DispatchInterval = TimeSpan.FromHours(1), + PurgeInterval = TimeSpan.FromHours(1), + TerminalRetention = retention, + }; + _outboxRepository + .DeleteTerminalOlderThanAsync(Arg.Any(), Arg.Any()) + .Returns(3); + var actor = CreateActor(options); + + var expectedCutoff = DateTimeOffset.UtcNow - retention; + actor.Tell(InternalMessages.PurgeTick.Instance); + + AwaitAssert(() => + _outboxRepository.Received(1).DeleteTerminalOlderThanAsync( + Arg.Is(cutoff => + (cutoff - expectedCutoff).Duration() < TimeSpan.FromMinutes(1)), + Arg.Any())); + } + + [Fact] + public void FaultedPurge_DoesNotCrashActor_AndSubsequentPurgeStillRuns() + { + var calls = 0; + _outboxRepository + .DeleteTerminalOlderThanAsync(Arg.Any(), Arg.Any()) + .Returns(_ => + { + calls++; + // First purge faults; the second returns normally to prove the actor lives on. + if (calls == 1) + { + throw new InvalidOperationException("db down"); + } + + return Task.FromResult(0); + }); + var actor = CreateActor(); + + // First tick: the purge faults internally but must be handled and not kill the actor. + actor.Tell(InternalMessages.PurgeTick.Instance); + AwaitAssert(() => _outboxRepository.Received(1).DeleteTerminalOlderThanAsync( + Arg.Any(), Arg.Any())); + + // Second tick: if the faulted purge had crashed the actor, this would never run. + actor.Tell(InternalMessages.PurgeTick.Instance); + AwaitAssert(() => _outboxRepository.Received(2).DeleteTerminalOlderThanAsync( + Arg.Any(), Arg.Any())); + } +}