From 4dc9f9e1596028f37f75e8be83b6f0493876c575 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 19 May 2026 01:36:13 -0400 Subject: [PATCH] feat(notification-outbox): add NotificationOutboxActor ingest --- .../Messages/InternalMessages.cs | 29 +++++ .../NotificationOutboxActor.cs | 110 +++++++++++++++++ .../NotificationOutboxActorIngestTests.cs | 115 ++++++++++++++++++ 3 files changed, 254 insertions(+) create mode 100644 src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs create mode 100644 src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs create mode 100644 tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs diff --git a/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs b/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs new file mode 100644 index 0000000..0096f34 --- /dev/null +++ b/src/ScadaLink.NotificationOutbox/Messages/InternalMessages.cs @@ -0,0 +1,29 @@ +using Akka.Actor; + +namespace ScadaLink.NotificationOutbox.Messages; + +/// +/// Actor-internal message types for the . These are +/// never sent across the network — they bridge the actor's async repository/delivery work +/// back onto the actor's own mailbox so handlers run single-threaded on the actor. +/// +internal static class InternalMessages +{ + /// + /// Result of an asynchronous ingest persistence attempt, piped back to the actor. + /// Carries the original so the actor can ack the site that + /// submitted the notification once the insert completes. + /// + /// Id of the notification that was submitted. + /// Original submitter to receive the ack. + /// + /// True if persistence completed without error — covers both a fresh insert and an + /// already-existing row (idempotent re-submission). False only when the repository threw. + /// + /// Failure detail when is false; otherwise null. + internal sealed record IngestPersisted( + string NotificationId, + IActorRef Sender, + bool Succeeded, + string? Error); +} diff --git a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs new file mode 100644 index 0000000..855a357 --- /dev/null +++ b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs @@ -0,0 +1,110 @@ +using Akka.Actor; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Notification; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.NotificationOutbox.Messages; + +namespace ScadaLink.NotificationOutbox; + +/// +/// Central-side actor that owns the notification outbox. This task implements the ingest +/// path only: it accepts messages forwarded from sites, +/// persists each as a row, and acks the submitting site. +/// Dispatch, query, and purge are added by later tasks. +/// +public class NotificationOutboxActor : ReceiveActor +{ + private readonly IServiceProvider _serviceProvider; + private readonly NotificationOutboxOptions _options; + private readonly ILogger _logger; + + public NotificationOutboxActor( + IServiceProvider serviceProvider, + NotificationOutboxOptions options, + ILogger logger) + { + _serviceProvider = serviceProvider; + _options = options; + _logger = logger; + + Receive(HandleSubmit); + Receive(HandleIngestPersisted); + } + + /// + /// Maps an inbound onto a , + /// persists it idempotently, and pipes the outcome back to so the + /// ack is sent from the actor thread with the original sender preserved. + /// + private void HandleSubmit(NotificationSubmit msg) + { + var sender = Sender; + var notification = BuildNotification(msg); + + // The success projection fires for both a fresh insert and an existing row; + // only a thrown repository error reaches the failure projection. + PersistAsync(notification).PipeTo( + Self, + success: () => new InternalMessages.IngestPersisted( + msg.NotificationId, sender, Succeeded: true, Error: null), + failure: ex => new InternalMessages.IngestPersisted( + msg.NotificationId, sender, Succeeded: false, Error: ex.GetBaseException().Message)); + } + + /// + /// Resolves a scoped and inserts the + /// notification if a row with the same id does not already exist. The boolean result + /// of InsertIfNotExistsAsync is intentionally ignored: an existing row is an + /// idempotent re-submission and is acked just like a fresh insert so the site can + /// clear its forward buffer. Only a thrown error must surface to the caller. + /// + private async Task PersistAsync(Notification notification) + { + using var scope = _serviceProvider.CreateScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + await repository.InsertIfNotExistsAsync(notification); + } + + /// + /// Acks the original submitter once persistence completes. + /// is Accepted for both a fresh insert and an existing row; only a thrown + /// repository error produces Accepted: false so the site retries the forward. + /// + private void HandleIngestPersisted(InternalMessages.IngestPersisted msg) + { + if (msg.Succeeded) + { + _logger.LogDebug("Notification {NotificationId} ingested into outbox.", msg.NotificationId); + msg.Sender.Tell(new NotificationSubmitAck(msg.NotificationId, Accepted: true, Error: null)); + } + else + { + _logger.LogWarning( + "Failed to ingest notification {NotificationId}: {Error}", + msg.NotificationId, msg.Error); + msg.Sender.Tell(new NotificationSubmitAck(msg.NotificationId, Accepted: false, Error: msg.Error)); + } + } + + private static Notification BuildNotification(NotificationSubmit msg) + { + // All current notifications are email; NotificationType has only the Email member. + return new Notification( + msg.NotificationId, + NotificationType.Email, + msg.ListName, + msg.Subject, + msg.Body, + msg.SourceSiteId) + { + SourceInstanceId = msg.SourceInstanceId, + SourceScript = msg.SourceScript, + SiteEnqueuedAt = msg.SiteEnqueuedAt, + CreatedAt = DateTimeOffset.UtcNow, + // Status stays at its Pending default for the dispatch sweep to claim. + }; + } +} diff --git a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs new file mode 100644 index 0000000..15edb37 --- /dev/null +++ b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs @@ -0,0 +1,115 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using NSubstitute.ExceptionExtensions; +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Notification; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.NotificationOutbox.Tests; + +/// +/// Task 13: Tests for the ingest path — building a +/// from a , persisting it via +/// , and acking the sender. +/// +public class NotificationOutboxActorIngestTests : TestKit +{ + private readonly INotificationOutboxRepository _repository = + Substitute.For(); + + private IServiceProvider BuildServiceProvider() + { + var services = new ServiceCollection(); + services.AddScoped(_ => _repository); + return services.BuildServiceProvider(); + } + + private IActorRef CreateActor() + { + return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( + BuildServiceProvider(), + new NotificationOutboxOptions(), + NullLogger.Instance))); + } + + private static NotificationSubmit MakeSubmit(string? notificationId = null) + { + return new NotificationSubmit( + NotificationId: notificationId ?? Guid.NewGuid().ToString(), + ListName: "ops-team", + Subject: "Tank overflow", + Body: "Tank 3 level critical", + SourceSiteId: "site-1", + SourceInstanceId: "instance-42", + SourceScript: "AlarmScript", + SiteEnqueuedAt: new DateTimeOffset(2026, 5, 19, 8, 30, 0, TimeSpan.Zero)); + } + + [Fact] + public void NotificationSubmit_PersistsMappedNotification_AndAcksAccepted() + { + _repository.InsertIfNotExistsAsync(Arg.Any(), Arg.Any()) + .Returns(true); + var submit = MakeSubmit(); + var actor = CreateActor(); + + actor.Tell(submit, TestActor); + + var ack = ExpectMsg(); + Assert.Equal(submit.NotificationId, ack.NotificationId); + Assert.True(ack.Accepted); + Assert.Null(ack.Error); + + _repository.Received(1).InsertIfNotExistsAsync( + Arg.Is(n => + n.NotificationId == submit.NotificationId && + n.Type == NotificationType.Email && + n.ListName == submit.ListName && + n.Subject == submit.Subject && + n.Body == submit.Body && + n.SourceSiteId == submit.SourceSiteId && + n.SourceInstanceId == submit.SourceInstanceId && + n.SourceScript == submit.SourceScript && + n.SiteEnqueuedAt == submit.SiteEnqueuedAt && + n.Status == NotificationStatus.Pending && + n.CreatedAt != default), + Arg.Any()); + } + + [Fact] + public void DuplicateSubmit_RepositoryReturnsFalse_StillAcksAccepted() + { + _repository.InsertIfNotExistsAsync(Arg.Any(), Arg.Any()) + .Returns(false); + var submit = MakeSubmit(); + var actor = CreateActor(); + + actor.Tell(submit, TestActor); + + var ack = ExpectMsg(); + Assert.Equal(submit.NotificationId, ack.NotificationId); + Assert.True(ack.Accepted); + Assert.Null(ack.Error); + } + + [Fact] + public void RepositoryThrows_AcksNotAcceptedWithError() + { + _repository.InsertIfNotExistsAsync(Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("database unavailable")); + var submit = MakeSubmit(); + var actor = CreateActor(); + + actor.Tell(submit, TestActor); + + var ack = ExpectMsg(); + Assert.Equal(submit.NotificationId, ack.NotificationId); + Assert.False(ack.Accepted); + Assert.NotNull(ack.Error); + Assert.Contains("database unavailable", ack.Error); + } +}