feat(notification-outbox): add NotificationOutboxActor ingest

This commit is contained in:
Joseph Doherty
2026-05-19 01:36:13 -04:00
parent 435c853dce
commit 4dc9f9e159
3 changed files with 254 additions and 0 deletions

View File

@@ -0,0 +1,29 @@
using Akka.Actor;
namespace ScadaLink.NotificationOutbox.Messages;
/// <summary>
/// Actor-internal message types for the <see cref="NotificationOutboxActor"/>. These are
/// never sent across the network — they bridge the actor's async repository/delivery work
/// back onto the actor's own mailbox so handlers run single-threaded on the actor.
/// </summary>
internal static class InternalMessages
{
/// <summary>
/// Result of an asynchronous ingest persistence attempt, piped back to the actor.
/// Carries the original <paramref name="Sender"/> so the actor can ack the site that
/// submitted the notification once the insert completes.
/// </summary>
/// <param name="NotificationId">Id of the notification that was submitted.</param>
/// <param name="Sender">Original submitter to receive the ack.</param>
/// <param name="Succeeded">
/// True if persistence completed without error — covers both a fresh insert and an
/// already-existing row (idempotent re-submission). False only when the repository threw.
/// </param>
/// <param name="Error">Failure detail when <paramref name="Succeeded"/> is false; otherwise null.</param>
internal sealed record IngestPersisted(
string NotificationId,
IActorRef Sender,
bool Succeeded,
string? Error);
}

View File

@@ -0,0 +1,110 @@
using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.NotificationOutbox.Messages;
namespace ScadaLink.NotificationOutbox;
/// <summary>
/// Central-side actor that owns the notification outbox. This task implements the ingest
/// path only: it accepts <see cref="NotificationSubmit"/> messages forwarded from sites,
/// persists each as a <see cref="Notification"/> row, and acks the submitting site.
/// Dispatch, query, and purge are added by later tasks.
/// </summary>
public class NotificationOutboxActor : ReceiveActor
{
private readonly IServiceProvider _serviceProvider;
private readonly NotificationOutboxOptions _options;
private readonly ILogger<NotificationOutboxActor> _logger;
public NotificationOutboxActor(
IServiceProvider serviceProvider,
NotificationOutboxOptions options,
ILogger<NotificationOutboxActor> logger)
{
_serviceProvider = serviceProvider;
_options = options;
_logger = logger;
Receive<NotificationSubmit>(HandleSubmit);
Receive<InternalMessages.IngestPersisted>(HandleIngestPersisted);
}
/// <summary>
/// Maps an inbound <see cref="NotificationSubmit"/> onto a <see cref="Notification"/>,
/// persists it idempotently, and pipes the outcome back to <see cref="Self"/> so the
/// ack is sent from the actor thread with the original sender preserved.
/// </summary>
private void HandleSubmit(NotificationSubmit msg)
{
var sender = Sender;
var notification = BuildNotification(msg);
// The success projection fires for both a fresh insert and an existing row;
// only a thrown repository error reaches the failure projection.
PersistAsync(notification).PipeTo(
Self,
success: () => new InternalMessages.IngestPersisted(
msg.NotificationId, sender, Succeeded: true, Error: null),
failure: ex => new InternalMessages.IngestPersisted(
msg.NotificationId, sender, Succeeded: false, Error: ex.GetBaseException().Message));
}
/// <summary>
/// Resolves a scoped <see cref="INotificationOutboxRepository"/> and inserts the
/// notification if a row with the same id does not already exist. The boolean result
/// of <c>InsertIfNotExistsAsync</c> is intentionally ignored: an existing row is an
/// idempotent re-submission and is acked just like a fresh insert so the site can
/// clear its forward buffer. Only a thrown error must surface to the caller.
/// </summary>
private async Task PersistAsync(Notification notification)
{
using var scope = _serviceProvider.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
await repository.InsertIfNotExistsAsync(notification);
}
/// <summary>
/// Acks the original submitter once persistence completes. <see cref="NotificationSubmitAck"/>
/// is <c>Accepted</c> for both a fresh insert and an existing row; only a thrown
/// repository error produces <c>Accepted: false</c> so the site retries the forward.
/// </summary>
private void HandleIngestPersisted(InternalMessages.IngestPersisted msg)
{
if (msg.Succeeded)
{
_logger.LogDebug("Notification {NotificationId} ingested into outbox.", msg.NotificationId);
msg.Sender.Tell(new NotificationSubmitAck(msg.NotificationId, Accepted: true, Error: null));
}
else
{
_logger.LogWarning(
"Failed to ingest notification {NotificationId}: {Error}",
msg.NotificationId, msg.Error);
msg.Sender.Tell(new NotificationSubmitAck(msg.NotificationId, Accepted: false, Error: msg.Error));
}
}
private static Notification BuildNotification(NotificationSubmit msg)
{
// All current notifications are email; NotificationType has only the Email member.
return new Notification(
msg.NotificationId,
NotificationType.Email,
msg.ListName,
msg.Subject,
msg.Body,
msg.SourceSiteId)
{
SourceInstanceId = msg.SourceInstanceId,
SourceScript = msg.SourceScript,
SiteEnqueuedAt = msg.SiteEnqueuedAt,
CreatedAt = DateTimeOffset.UtcNow,
// Status stays at its Pending default for the dispatch sweep to claim.
};
}
}

View File

@@ -0,0 +1,115 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.NotificationOutbox.Tests;
/// <summary>
/// Task 13: Tests for the <see cref="NotificationOutboxActor"/> ingest path — building a
/// <see cref="Notification"/> from a <see cref="NotificationSubmit"/>, persisting it via
/// <see cref="INotificationOutboxRepository.InsertIfNotExistsAsync"/>, and acking the sender.
/// </summary>
public class NotificationOutboxActorIngestTests : TestKit
{
private readonly INotificationOutboxRepository _repository =
Substitute.For<INotificationOutboxRepository>();
private IServiceProvider BuildServiceProvider()
{
var services = new ServiceCollection();
services.AddScoped(_ => _repository);
return services.BuildServiceProvider();
}
private IActorRef CreateActor()
{
return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
BuildServiceProvider(),
new NotificationOutboxOptions(),
NullLogger<NotificationOutboxActor>.Instance)));
}
private static NotificationSubmit MakeSubmit(string? notificationId = null)
{
return new NotificationSubmit(
NotificationId: notificationId ?? Guid.NewGuid().ToString(),
ListName: "ops-team",
Subject: "Tank overflow",
Body: "Tank 3 level critical",
SourceSiteId: "site-1",
SourceInstanceId: "instance-42",
SourceScript: "AlarmScript",
SiteEnqueuedAt: new DateTimeOffset(2026, 5, 19, 8, 30, 0, TimeSpan.Zero));
}
[Fact]
public void NotificationSubmit_PersistsMappedNotification_AndAcksAccepted()
{
_repository.InsertIfNotExistsAsync(Arg.Any<Notification>(), Arg.Any<CancellationToken>())
.Returns(true);
var submit = MakeSubmit();
var actor = CreateActor();
actor.Tell(submit, TestActor);
var ack = ExpectMsg<NotificationSubmitAck>();
Assert.Equal(submit.NotificationId, ack.NotificationId);
Assert.True(ack.Accepted);
Assert.Null(ack.Error);
_repository.Received(1).InsertIfNotExistsAsync(
Arg.Is<Notification>(n =>
n.NotificationId == submit.NotificationId &&
n.Type == NotificationType.Email &&
n.ListName == submit.ListName &&
n.Subject == submit.Subject &&
n.Body == submit.Body &&
n.SourceSiteId == submit.SourceSiteId &&
n.SourceInstanceId == submit.SourceInstanceId &&
n.SourceScript == submit.SourceScript &&
n.SiteEnqueuedAt == submit.SiteEnqueuedAt &&
n.Status == NotificationStatus.Pending &&
n.CreatedAt != default),
Arg.Any<CancellationToken>());
}
[Fact]
public void DuplicateSubmit_RepositoryReturnsFalse_StillAcksAccepted()
{
_repository.InsertIfNotExistsAsync(Arg.Any<Notification>(), Arg.Any<CancellationToken>())
.Returns(false);
var submit = MakeSubmit();
var actor = CreateActor();
actor.Tell(submit, TestActor);
var ack = ExpectMsg<NotificationSubmitAck>();
Assert.Equal(submit.NotificationId, ack.NotificationId);
Assert.True(ack.Accepted);
Assert.Null(ack.Error);
}
[Fact]
public void RepositoryThrows_AcksNotAcceptedWithError()
{
_repository.InsertIfNotExistsAsync(Arg.Any<Notification>(), Arg.Any<CancellationToken>())
.ThrowsAsync(new InvalidOperationException("database unavailable"));
var submit = MakeSubmit();
var actor = CreateActor();
actor.Tell(submit, TestActor);
var ack = ExpectMsg<NotificationSubmitAck>();
Assert.Equal(submit.NotificationId, ack.NotificationId);
Assert.False(ack.Accepted);
Assert.NotNull(ack.Error);
Assert.Contains("database unavailable", ack.Error);
}
}