From 6a77c127351104769a2187586fc4023e90337172 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 19 May 2026 02:16:27 -0400 Subject: [PATCH] feat(notification-outbox): forward site S&F notifications to central --- .../Actors/SiteCommunicationActor.cs | 25 +++ .../CommunicationOptions.cs | 8 + .../Actors/AkkaHostedService.cs | 17 +- .../NotificationForwarder.cs | 149 ++++++++++++++++++ .../SiteCommunicationActorTests.cs | 52 ++++++ .../NotificationForwarderTests.cs | 124 +++++++++++++++ 6 files changed, 368 insertions(+), 7 deletions(-) create mode 100644 src/ScadaLink.StoreAndForward/NotificationForwarder.cs create mode 100644 tests/ScadaLink.StoreAndForward.Tests/NotificationForwarderTests.cs diff --git a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs index 6ce3610..5e5a43d 100644 --- a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs @@ -8,6 +8,7 @@ using ScadaLink.Commons.Messages.Health; using ScadaLink.Commons.Messages.InboundApi; using ScadaLink.Commons.Messages.Integration; using ScadaLink.Commons.Messages.Lifecycle; +using ScadaLink.Commons.Messages.Notification; using ScadaLink.Commons.Messages.RemoteQuery; namespace ScadaLink.Communication.Actors; @@ -165,6 +166,30 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers } }); + // Notification Outbox: forward a buffered notification submitted by the site + // Store-and-Forward Engine to the central cluster. The original Sender (the + // S&F forwarder's Ask) is forwarded as the ClusterClient.Send sender so the + // NotificationSubmitAck routes straight back to the waiting Ask, not here. + Receive(msg => + { + if (_centralClient == null) + { + // No ClusterClient registered yet (e.g. central contact points not + // configured, or registration not yet completed). A non-accepted ack + // makes the S&F forwarder treat this as transient and retry later. + _log.Warning( + "Cannot forward NotificationSubmit {0} — no central ClusterClient registered", + msg.NotificationId); + Sender.Tell(new NotificationSubmitAck( + msg.NotificationId, Accepted: false, Error: "Central ClusterClient not registered")); + return; + } + + _log.Debug("Forwarding NotificationSubmit {0} to central", msg.NotificationId); + _centralClient.Tell( + new ClusterClient.Send("/user/central-communication", msg), Sender); + }); + // Internal: send heartbeat tick Receive(_ => SendHeartbeatToCentral()); diff --git a/src/ScadaLink.Communication/CommunicationOptions.cs b/src/ScadaLink.Communication/CommunicationOptions.cs index 3ef5cf1..16a599f 100644 --- a/src/ScadaLink.Communication/CommunicationOptions.cs +++ b/src/ScadaLink.Communication/CommunicationOptions.cs @@ -27,6 +27,14 @@ public class CommunicationOptions /// Timeout for health report acknowledgement (fire-and-forget, but bounded). public TimeSpan HealthReportTimeout { get; set; } = TimeSpan.FromSeconds(10); + /// + /// Notification Outbox: timeout for forwarding a buffered notification to central + /// and awaiting its NotificationSubmitAck. A timeout is treated as a + /// transient failure — the Store-and-Forward engine keeps the message buffered + /// and retries the forward at the fixed retry interval. + /// + public TimeSpan NotificationForwardTimeout { get; set; } = TimeSpan.FromSeconds(30); + /// /// Contact point addresses for the central cluster (e.g. "akka.tcp://scadalink@central-a:8081"). /// Used by site nodes to create a ClusterClient for reaching central. diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index d1e07f6..797c182 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -422,15 +422,18 @@ akka {{ .GetRequiredService() .DeliverBufferedAsync(msg); }); + // Notification Outbox: a buffered notification is no longer delivered by + // the site over SMTP. "Delivering" it means forwarding it to the central + // cluster via the SiteCommunicationActor and treating central's + // NotificationSubmitAck as the outcome (accepted → delivered; not accepted + // or timeout → throw → transient → keep buffering). Central owns SMTP. + var notificationForwarder = new ScadaLink.StoreAndForward.NotificationForwarder( + siteCommActor, + _nodeOptions.SiteId!, + _communicationOptions.NotificationForwardTimeout); storeAndForwardService.RegisterDeliveryHandler( ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.Notification, - async msg => - { - using var scope = _serviceProvider.CreateScope(); - return await scope.ServiceProvider - .GetRequiredService() - .DeliverBufferedAsync(msg); - }); + notificationForwarder.DeliverAsync); _logger.LogInformation( "Store-and-forward delivery handlers registered (ExternalSystem, CachedDbWrite, Notification)"); diff --git a/src/ScadaLink.StoreAndForward/NotificationForwarder.cs b/src/ScadaLink.StoreAndForward/NotificationForwarder.cs new file mode 100644 index 0000000..81bb4d5 --- /dev/null +++ b/src/ScadaLink.StoreAndForward/NotificationForwarder.cs @@ -0,0 +1,149 @@ +using System.Text.Json; +using Akka.Actor; +using ScadaLink.Commons.Messages.Notification; + +namespace ScadaLink.StoreAndForward; + +/// +/// Notification Outbox: the site Store-and-Forward delivery handler for the +/// +/// category. +/// +/// In the outbox design the site no longer sends notification email itself. +/// "Delivering" a buffered notification means forwarding it to the central cluster +/// and treating central's as the outcome: +/// +/// ack Accepted returns +/// true; the S&F engine removes the message from the buffer. +/// ack not Accepted, or the Ask times out / fails → +/// throws; the S&F engine treats any thrown +/// exception as transient and retries the forward at the fixed interval. +/// +/// +/// The forward travels over the ClusterClient command/control transport: the handler +/// Asks +/// the site communication actor, which wraps the message in a +/// ClusterClient.Send("/user/central-communication", …) and routes central's +/// reply straight back to this Ask. +/// +public sealed class NotificationForwarder +{ + private readonly IActorRef _siteCommunicationActor; + private readonly string _sourceSiteId; + private readonly TimeSpan _forwardTimeout; + + /// + /// The site communication actor. It forwards a to + /// central via the registered ClusterClient and replies with the + /// . + /// + /// This site's identifier, stamped on every submit. + /// + /// How long to wait for central's ack before treating the forward as a transient + /// failure. Sourced from host configuration. + /// + public NotificationForwarder( + IActorRef siteCommunicationActor, + string sourceSiteId, + TimeSpan forwardTimeout) + { + _siteCommunicationActor = siteCommunicationActor; + _sourceSiteId = sourceSiteId; + _forwardTimeout = forwardTimeout; + } + + /// + /// Store-and-Forward delivery handler entry point — matches the + /// Func<StoreAndForwardMessage, Task<bool>> handler contract. + /// Returns true when central accepts the notification; throws on a + /// non-accepted ack or an Ask timeout/failure so the engine retries. + /// + public async Task DeliverAsync(StoreAndForwardMessage message) + { + // An unreadable payload cannot be fixed by retrying — park it (return false), + // mirroring how the former SMTP handler treated a corrupt buffered payload. + if (!TryBuildSubmit(message, out var submit)) + { + return false; + } + + // The reply may legitimately be a non-accepted ack, so it is not requested as + // a status-failing Ask: ask for the bare NotificationSubmitAck and classify it + // here. An Ask timeout surfaces as a TimeoutException, which — like any other + // thrown exception — the S&F engine treats as transient. + var ack = await _siteCommunicationActor + .Ask(submit, _forwardTimeout) + .ConfigureAwait(false); + + if (ack.Accepted) + { + return true; + } + + // A non-accepted ack is a transient failure: central could not persist the + // notification right now. Throw so the engine keeps buffering and retries. + throw new NotificationForwardException( + $"Central rejected notification {submit.NotificationId}: {ack.Error ?? "no detail"}"); + } + + /// + /// Maps a buffered S&F notification message onto a , + /// returning false if the payload is unreadable. + /// The is the central idempotency + /// key and must be stable across every retry of the same buffered message, so it is + /// derived from — a stable GUID assigned + /// once at enqueue time. + /// + private bool TryBuildSubmit(StoreAndForwardMessage message, out NotificationSubmit submit) + { + submit = null!; + + BufferedNotificationPayload? payload; + try + { + payload = JsonSerializer.Deserialize(message.PayloadJson); + } + catch (JsonException) + { + return false; + } + + if (payload == null) + { + return false; + } + + submit = new NotificationSubmit( + NotificationId: message.Id, + ListName: payload.ListName ?? message.Target, + Subject: payload.Subject ?? string.Empty, + Body: payload.Message ?? string.Empty, + SourceSiteId: _sourceSiteId, + SourceInstanceId: message.OriginInstanceName, + // The buffered payload does not currently carry the originating script; + // Task 19 (the enqueue side) will add it. Null until then. + SourceScript: null, + SiteEnqueuedAt: message.CreatedAt); + return true; + } + + /// + /// Mirrors the payload shape written by the site notification enqueue path + /// ({ ListName, Subject, Message }). Kept private to this forwarder — Task 19 + /// will reshape the enqueue payload, at which point this is updated alongside it. + /// + private sealed record BufferedNotificationPayload( + string? ListName, string? Subject, string? Message); +} + +/// +/// Raised by on a transient forward failure — +/// a non-accepted central ack. The Store-and-Forward engine treats any thrown +/// exception as transient and retries the forward at the fixed interval. +/// +public sealed class NotificationForwardException : Exception +{ + public NotificationForwardException(string message) : base(message) + { + } +} diff --git a/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs b/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs index 37a6b1a..b519a47 100644 --- a/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs +++ b/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs @@ -1,8 +1,10 @@ using Akka.Actor; +using Akka.Cluster.Tools.Client; using Akka.TestKit.Xunit2; using ScadaLink.Commons.Messages.Deployment; using ScadaLink.Commons.Messages.Lifecycle; using ScadaLink.Commons.Messages.Integration; +using ScadaLink.Commons.Messages.Notification; using ScadaLink.Commons.Messages.RemoteQuery; using ScadaLink.Communication.Actors; @@ -103,6 +105,56 @@ public class SiteCommunicationActorTests : TestKit handlerProbe.ExpectMsg(msg => msg.CorrelationId == "corr1"); } + [Fact] + public void NotificationSubmit_WithCentralClient_ForwardedToCentralAndAckRoutedBack() + { + // The site forwards a buffered notification to central over the ClusterClient + // command/control transport; the central ack must route back to the original + // sender (the S&F forwarder's Ask), not to the SiteCommunicationActor. + var dmProbe = CreateTestProbe(); + var centralClientProbe = CreateTestProbe(); + var siteActor = Sys.ActorOf(Props.Create(() => + new SiteCommunicationActor("site1", _options, dmProbe.Ref))); + + siteActor.Tell(new RegisterCentralClient(centralClientProbe.Ref)); + + var submit = new NotificationSubmit( + "notif-1", "Operators", "Subj", "Body", "site1", "inst1", "alarmScript", + DateTimeOffset.UtcNow); + siteActor.Tell(submit); + + // Central client (acting as ClusterClient) receives a ClusterClient.Send wrapping + // the NotificationSubmit, addressed to the central communication actor. Fish past + // any periodic HeartbeatMessage the actor's timer may interleave. + var send = centralClientProbe.FishForMessage( + s => s.Message is NotificationSubmit); + Assert.Equal("/user/central-communication", send.Path); + var forwarded = Assert.IsType(send.Message); + Assert.Equal("notif-1", forwarded.NotificationId); + + // The ack is sent to the ClusterClient.Send's Sender — replying as that probe + // must land back at the test actor (the original Tell sender). + centralClientProbe.Reply(new NotificationSubmitAck("notif-1", Accepted: true, Error: null)); + ExpectMsg(ack => ack.NotificationId == "notif-1" && ack.Accepted); + } + + [Fact] + public void NotificationSubmit_WithoutCentralClient_RepliesWithNonAccepted() + { + // No ClusterClient registered yet: the submit cannot be forwarded, so the actor + // replies with a non-accepted ack and the S&F forwarder treats it as transient. + var dmProbe = CreateTestProbe(); + var siteActor = Sys.ActorOf(Props.Create(() => + new SiteCommunicationActor("site1", _options, dmProbe.Ref))); + + var submit = new NotificationSubmit( + "notif-2", "Operators", "Subj", "Body", "site1", null, null, + DateTimeOffset.UtcNow); + siteActor.Tell(submit); + + ExpectMsg(ack => ack.NotificationId == "notif-2" && !ack.Accepted); + } + [Fact] public void EventLogQuery_WithoutHandler_ReturnsFailure() { diff --git a/tests/ScadaLink.StoreAndForward.Tests/NotificationForwarderTests.cs b/tests/ScadaLink.StoreAndForward.Tests/NotificationForwarderTests.cs new file mode 100644 index 0000000..832f835 --- /dev/null +++ b/tests/ScadaLink.StoreAndForward.Tests/NotificationForwarderTests.cs @@ -0,0 +1,124 @@ +using System.Text.Json; +using Akka.Actor; +using Akka.TestKit.Xunit2; +using ScadaLink.Commons.Messages.Notification; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.StoreAndForward.Tests; + +/// +/// Notification Outbox: tests for the site Store-and-Forward notification delivery +/// handler. "Delivering" a buffered notification means forwarding it to the central +/// cluster (via the site communication actor) and treating central's +/// as the delivery outcome. +/// +public class NotificationForwarderTests : TestKit +{ + private static readonly TimeSpan ForwardTimeout = TimeSpan.FromSeconds(2); + + /// + /// Builds a buffered notification S&F message whose payload matches the shape + /// produced by the site NotificationDeliveryService enqueue path. + /// + private static StoreAndForwardMessage BufferedNotification( + string id = "msg-1", string listName = "Operators", + string subject = "Pump alarm", string message = "Pump 3 tripped", + string? originInstance = "Plant.Pump3") + { + var payload = JsonSerializer.Serialize(new + { + ListName = listName, + Subject = subject, + Message = message + }); + return new StoreAndForwardMessage + { + Id = id, + Category = StoreAndForwardCategory.Notification, + Target = listName, + PayloadJson = payload, + OriginInstanceName = originInstance, + }; + } + + [Fact] + public async Task Deliver_ForwardsNotificationSubmitToCentralTarget_AndReturnsTrueOnAccept() + { + var centralProbe = CreateTestProbe(); + var forwarder = new NotificationForwarder( + centralProbe.Ref, "site-7", ForwardTimeout); + + var msg = BufferedNotification( + id: "msg-1", listName: "Operators", subject: "Pump alarm", + message: "Pump 3 tripped", originInstance: "Plant.Pump3"); + + var deliverTask = forwarder.DeliverAsync(msg); + + // The central target receives a NotificationSubmit whose fields map from the + // buffered payload; reply Accepted so the handler completes as delivered. + var submit = centralProbe.ExpectMsg(); + Assert.Equal("Operators", submit.ListName); + Assert.Equal("Pump alarm", submit.Subject); + Assert.Equal("Pump 3 tripped", submit.Body); + Assert.Equal("site-7", submit.SourceSiteId); + Assert.Equal("Plant.Pump3", submit.SourceInstanceId); + centralProbe.Reply(new NotificationSubmitAck(submit.NotificationId, Accepted: true, Error: null)); + + Assert.True(await deliverTask); + } + + [Fact] + public async Task Deliver_ThrowsTransient_WhenAckIsNotAccepted() + { + var centralProbe = CreateTestProbe(); + var forwarder = new NotificationForwarder( + centralProbe.Ref, "site-7", ForwardTimeout); + + var deliverTask = forwarder.DeliverAsync(BufferedNotification()); + + var submit = centralProbe.ExpectMsg(); + centralProbe.Reply(new NotificationSubmitAck( + submit.NotificationId, Accepted: false, Error: "central rejected")); + + // A non-accepted ack is a transient failure — the handler throws so the S&F + // engine keeps the message buffered and retries the forward. + await Assert.ThrowsAnyAsync(() => deliverTask); + } + + [Fact] + public async Task Deliver_ThrowsTransient_WhenNoReplyWithinTimeout() + { + // A probe that never replies stands in for central being unreachable. + var centralProbe = CreateTestProbe(); + var forwarder = new NotificationForwarder( + centralProbe.Ref, "site-7", TimeSpan.FromMilliseconds(300)); + + // No reply within the timeout → transient failure → throw. + await Assert.ThrowsAnyAsync(() => forwarder.DeliverAsync(BufferedNotification())); + } + + [Fact] + public async Task Deliver_UsesStableNotificationId_AcrossRetriesOfSameMessage() + { + var centralProbe = CreateTestProbe(); + var forwarder = new NotificationForwarder( + centralProbe.Ref, "site-7", ForwardTimeout); + + var buffered = BufferedNotification(id: "stable-msg-id"); + + var first = forwarder.DeliverAsync(buffered); + var submit1 = centralProbe.ExpectMsg(); + centralProbe.Reply(new NotificationSubmitAck(submit1.NotificationId, true, null)); + await first; + + var second = forwarder.DeliverAsync(buffered); + var submit2 = centralProbe.ExpectMsg(); + centralProbe.Reply(new NotificationSubmitAck(submit2.NotificationId, true, null)); + await second; + + // The NotificationId is the central idempotency key — it must be identical for + // every forward attempt of the same buffered S&F message. + Assert.Equal(submit1.NotificationId, submit2.NotificationId); + Assert.Equal("stable-msg-id", submit1.NotificationId); + } +}