feat(notification-outbox): add daily terminal-row purge
This commit is contained in:
@@ -52,4 +52,30 @@ internal static class InternalMessages
|
|||||||
|
|
||||||
private DispatchComplete() { }
|
private DispatchComplete() { }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Periodic tick that triggers a purge sweep of terminal notification rows. Started as a
|
||||||
|
/// periodic timer in <c>PreStart</c> at the configured <c>PurgeInterval</c>. A singleton
|
||||||
|
/// instance is reused so the timer carries no per-tick state.
|
||||||
|
/// </summary>
|
||||||
|
internal sealed class PurgeTick
|
||||||
|
{
|
||||||
|
/// <summary>The shared singleton tick instance scheduled by the purge timer.</summary>
|
||||||
|
internal static readonly PurgeTick Instance = new();
|
||||||
|
|
||||||
|
private PurgeTick() { }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Completion signal for an asynchronous purge sweep, piped back to the actor so the
|
||||||
|
/// sweep's outcome (logged in the pipe projection) is observed on the actor thread.
|
||||||
|
/// Sent on both success and failure of the sweep.
|
||||||
|
/// </summary>
|
||||||
|
internal sealed class PurgeComplete
|
||||||
|
{
|
||||||
|
/// <summary>The shared singleton completion instance.</summary>
|
||||||
|
internal static readonly PurgeComplete Instance = new();
|
||||||
|
|
||||||
|
private PurgeComplete() { }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,11 +16,13 @@ namespace ScadaLink.NotificationOutbox;
|
|||||||
/// <see cref="NotificationSubmit"/> messages forwarded from sites and persists each as a
|
/// <see cref="NotificationSubmit"/> messages forwarded from sites and persists each as a
|
||||||
/// <see cref="Notification"/> row (the ingest path), and runs a periodic dispatch loop
|
/// <see cref="Notification"/> row (the ingest path), and runs a periodic dispatch loop
|
||||||
/// that claims due notifications, delivers them through the matching channel adapter, and
|
/// that claims due notifications, delivers them through the matching channel adapter, and
|
||||||
/// applies the resulting status transition. Query and purge are added by later tasks.
|
/// applies the resulting status transition. It also runs a periodic purge that bulk-deletes
|
||||||
|
/// terminal notification rows once they age past the configured retention window.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
||||||
{
|
{
|
||||||
private const string DispatchTimerKey = "dispatch";
|
private const string DispatchTimerKey = "dispatch";
|
||||||
|
private const string PurgeTimerKey = "purge";
|
||||||
|
|
||||||
/// <summary>Retry policy fallback used when no SMTP configuration row is present.</summary>
|
/// <summary>Retry policy fallback used when no SMTP configuration row is present.</summary>
|
||||||
private const int FallbackMaxRetries = 10;
|
private const int FallbackMaxRetries = 10;
|
||||||
@@ -56,6 +58,8 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
|||||||
Receive<InternalMessages.IngestPersisted>(HandleIngestPersisted);
|
Receive<InternalMessages.IngestPersisted>(HandleIngestPersisted);
|
||||||
Receive<InternalMessages.DispatchTick>(_ => HandleDispatchTick());
|
Receive<InternalMessages.DispatchTick>(_ => HandleDispatchTick());
|
||||||
Receive<InternalMessages.DispatchComplete>(_ => _dispatching = false);
|
Receive<InternalMessages.DispatchComplete>(_ => _dispatching = false);
|
||||||
|
Receive<InternalMessages.PurgeTick>(_ => HandlePurgeTick());
|
||||||
|
Receive<InternalMessages.PurgeComplete>(_ => { });
|
||||||
Receive<NotificationOutboxQueryRequest>(HandleQuery);
|
Receive<NotificationOutboxQueryRequest>(HandleQuery);
|
||||||
Receive<NotificationStatusQuery>(HandleStatusQuery);
|
Receive<NotificationStatusQuery>(HandleStatusQuery);
|
||||||
Receive<RetryNotificationRequest>(HandleRetry);
|
Receive<RetryNotificationRequest>(HandleRetry);
|
||||||
@@ -64,14 +68,17 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Starts the periodic dispatch timer once the actor is running. The tick cadence is
|
/// Starts the periodic timers once the actor is running: the dispatch loop at
|
||||||
/// <see cref="NotificationOutboxOptions.DispatchInterval"/>.
|
/// <see cref="NotificationOutboxOptions.DispatchInterval"/> and the terminal-row purge
|
||||||
|
/// at <see cref="NotificationOutboxOptions.PurgeInterval"/>.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
protected override void PreStart()
|
protected override void PreStart()
|
||||||
{
|
{
|
||||||
base.PreStart();
|
base.PreStart();
|
||||||
Timers.StartPeriodicTimer(
|
Timers.StartPeriodicTimer(
|
||||||
DispatchTimerKey, InternalMessages.DispatchTick.Instance, _options.DispatchInterval);
|
DispatchTimerKey, InternalMessages.DispatchTick.Instance, _options.DispatchInterval);
|
||||||
|
Timers.StartPeriodicTimer(
|
||||||
|
PurgeTimerKey, InternalMessages.PurgeTick.Instance, _options.PurgeInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -287,6 +294,45 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
|||||||
await outboxRepository.UpdateAsync(notification);
|
await outboxRepository.UpdateAsync(notification);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Handles a purge tick by launching an asynchronous sweep that bulk-deletes terminal
|
||||||
|
/// notification rows older than <see cref="NotificationOutboxOptions.TerminalRetention"/>.
|
||||||
|
/// Purges are daily and idempotent, so no in-flight guard is needed; the outcome is piped
|
||||||
|
/// back to <see cref="Self"/> only so a faulted purge is logged on the actor thread and
|
||||||
|
/// never wedges anything.
|
||||||
|
/// </summary>
|
||||||
|
private void HandlePurgeTick()
|
||||||
|
{
|
||||||
|
var cutoff = DateTimeOffset.UtcNow - _options.TerminalRetention;
|
||||||
|
|
||||||
|
RunPurgePass(cutoff).PipeTo(
|
||||||
|
Self,
|
||||||
|
success: deleted =>
|
||||||
|
{
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Purge removed {DeletedCount} terminal notification(s) older than {Cutoff:o}.",
|
||||||
|
deleted, cutoff);
|
||||||
|
return InternalMessages.PurgeComplete.Instance;
|
||||||
|
},
|
||||||
|
failure: ex =>
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Purge sweep faulted unexpectedly.");
|
||||||
|
return InternalMessages.PurgeComplete.Instance;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Runs a single purge sweep: resolves a scoped <see cref="INotificationOutboxRepository"/>
|
||||||
|
/// and bulk-deletes terminal rows created before <paramref name="cutoff"/>, returning the
|
||||||
|
/// deleted count.
|
||||||
|
/// </summary>
|
||||||
|
private async Task<int> RunPurgePass(DateTimeOffset cutoff)
|
||||||
|
{
|
||||||
|
using var scope = _serviceProvider.CreateScope();
|
||||||
|
var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
|
||||||
|
return await repository.DeleteTerminalOlderThanAsync(cutoff);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Handles a paginated, filtered query over the outbox. Builds a
|
/// Handles a paginated, filtered query over the outbox. Builds a
|
||||||
/// <see cref="NotificationOutboxFilter"/> from the request (parsing the string status/type
|
/// <see cref="NotificationOutboxFilter"/> from the request (parsing the string status/type
|
||||||
|
|||||||
@@ -0,0 +1,107 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.TestKit.Xunit2;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using NSubstitute;
|
||||||
|
using ScadaLink.Commons.Entities.Notifications;
|
||||||
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
using ScadaLink.NotificationOutbox.Delivery;
|
||||||
|
using ScadaLink.NotificationOutbox.Messages;
|
||||||
|
|
||||||
|
namespace ScadaLink.NotificationOutbox.Tests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Task 16: Tests for the <see cref="NotificationOutboxActor"/> daily purge job — the
|
||||||
|
/// periodic sweep that bulk-deletes terminal notification rows older than
|
||||||
|
/// <see cref="NotificationOutboxOptions.TerminalRetention"/> via
|
||||||
|
/// <see cref="INotificationOutboxRepository.DeleteTerminalOlderThanAsync"/>.
|
||||||
|
/// </summary>
|
||||||
|
public class NotificationOutboxActorPurgeTests : TestKit
|
||||||
|
{
|
||||||
|
private readonly INotificationOutboxRepository _outboxRepository =
|
||||||
|
Substitute.For<INotificationOutboxRepository>();
|
||||||
|
|
||||||
|
private readonly INotificationRepository _notificationRepository =
|
||||||
|
Substitute.For<INotificationRepository>();
|
||||||
|
|
||||||
|
private IServiceProvider BuildServiceProvider()
|
||||||
|
{
|
||||||
|
var services = new ServiceCollection();
|
||||||
|
services.AddScoped(_ => _outboxRepository);
|
||||||
|
services.AddScoped(_ => _notificationRepository);
|
||||||
|
return services.BuildServiceProvider();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates the actor with both the dispatch and purge timers set to a long interval so
|
||||||
|
/// neither periodic timer fires during a test — purge ticks are sent manually instead.
|
||||||
|
/// </summary>
|
||||||
|
private IActorRef CreateActor(NotificationOutboxOptions? options = null)
|
||||||
|
{
|
||||||
|
return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
|
||||||
|
BuildServiceProvider(),
|
||||||
|
options ?? new NotificationOutboxOptions
|
||||||
|
{
|
||||||
|
DispatchInterval = TimeSpan.FromHours(1),
|
||||||
|
PurgeInterval = TimeSpan.FromHours(1),
|
||||||
|
},
|
||||||
|
NullLogger<NotificationOutboxActor>.Instance,
|
||||||
|
new Dictionary<NotificationType, INotificationDeliveryAdapter>())));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void PurgeTick_DeletesTerminalRows_WithCutoffAtUtcNowMinusTerminalRetention()
|
||||||
|
{
|
||||||
|
var retention = TimeSpan.FromDays(30);
|
||||||
|
var options = new NotificationOutboxOptions
|
||||||
|
{
|
||||||
|
DispatchInterval = TimeSpan.FromHours(1),
|
||||||
|
PurgeInterval = TimeSpan.FromHours(1),
|
||||||
|
TerminalRetention = retention,
|
||||||
|
};
|
||||||
|
_outboxRepository
|
||||||
|
.DeleteTerminalOlderThanAsync(Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(3);
|
||||||
|
var actor = CreateActor(options);
|
||||||
|
|
||||||
|
var expectedCutoff = DateTimeOffset.UtcNow - retention;
|
||||||
|
actor.Tell(InternalMessages.PurgeTick.Instance);
|
||||||
|
|
||||||
|
AwaitAssert(() =>
|
||||||
|
_outboxRepository.Received(1).DeleteTerminalOlderThanAsync(
|
||||||
|
Arg.Is<DateTimeOffset>(cutoff =>
|
||||||
|
(cutoff - expectedCutoff).Duration() < TimeSpan.FromMinutes(1)),
|
||||||
|
Arg.Any<CancellationToken>()));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void FaultedPurge_DoesNotCrashActor_AndSubsequentPurgeStillRuns()
|
||||||
|
{
|
||||||
|
var calls = 0;
|
||||||
|
_outboxRepository
|
||||||
|
.DeleteTerminalOlderThanAsync(Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(_ =>
|
||||||
|
{
|
||||||
|
calls++;
|
||||||
|
// First purge faults; the second returns normally to prove the actor lives on.
|
||||||
|
if (calls == 1)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException("db down");
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.FromResult(0);
|
||||||
|
});
|
||||||
|
var actor = CreateActor();
|
||||||
|
|
||||||
|
// First tick: the purge faults internally but must be handled and not kill the actor.
|
||||||
|
actor.Tell(InternalMessages.PurgeTick.Instance);
|
||||||
|
AwaitAssert(() => _outboxRepository.Received(1).DeleteTerminalOlderThanAsync(
|
||||||
|
Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>()));
|
||||||
|
|
||||||
|
// Second tick: if the faulted purge had crashed the actor, this would never run.
|
||||||
|
actor.Tell(InternalMessages.PurgeTick.Instance);
|
||||||
|
AwaitAssert(() => _outboxRepository.Received(2).DeleteTerminalOlderThanAsync(
|
||||||
|
Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>()));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user