diff --git a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs index e00d9e4..505c516 100644 --- a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.DependencyInjection; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Messages.Communication; using ScadaLink.Commons.Messages.Health; +using ScadaLink.Commons.Messages.Notification; using ScadaLink.HealthMonitoring; namespace ScadaLink.Communication.Actors; @@ -66,6 +67,15 @@ public class CentralCommunicationActor : ReceiveActor private ICancelable? _refreshSchedule; + /// + /// Proxy for the central NotificationOutboxActor cluster singleton. + /// Set via — the Host creates the singleton proxy + /// after this actor and registers it (mirrors how the site-side actor receives its + /// runtime s). Null until registration completes; a notification + /// arriving before then is rejected with a non-accepted ack so the site retries. + /// + private IActorRef? _notificationOutboxProxy; + /// /// DistributedPubSub topic used to fan health reports out to the peer /// central node so both per-node aggregators stay in sync. See @@ -105,6 +115,61 @@ public class CentralCommunicationActor : ReceiveActor // Route enveloped messages to sites Receive(HandleSiteEnvelope); + // Notification Outbox: the Host registers the outbox singleton proxy after this + // actor is created (the proxy cannot exist before this actor's construction). + Receive(msg => + { + _notificationOutboxProxy = msg.OutboxProxy; + _log.Info("Registered notification outbox proxy"); + }); + + // Notification Outbox ingest: a site forwards a buffered NotificationSubmit to the + // central cluster via ClusterClient. Forward to the outbox proxy so the original + // Sender (the site's ClusterClient path) is preserved and the NotificationSubmitAck + // routes straight back to the site. + Receive(HandleNotificationSubmit); + + // Notification Outbox status query: forward to the outbox proxy, preserving Sender + // so the NotificationStatusResponse routes back to the querying site. + Receive(HandleNotificationStatusQuery); + + } + + private void HandleNotificationSubmit(NotificationSubmit msg) + { + if (_notificationOutboxProxy == null) + { + // No outbox proxy registered yet. A non-accepted ack makes the site's + // Store-and-Forward forwarder treat this as transient and retry later. + _log.Warning( + "Cannot route NotificationSubmit {0} — notification outbox not available", + msg.NotificationId); + Sender.Tell(new NotificationSubmitAck( + msg.NotificationId, Accepted: false, Error: "notification outbox not available")); + return; + } + + _log.Debug("Routing NotificationSubmit {0} to the notification outbox", msg.NotificationId); + _notificationOutboxProxy.Forward(msg); + } + + private void HandleNotificationStatusQuery(NotificationStatusQuery msg) + { + if (_notificationOutboxProxy == null) + { + // No outbox proxy registered yet. Reply Found: false so the querying site + // falls back to its local Store-and-Forward buffer to resolve the status. + _log.Warning( + "Cannot route NotificationStatusQuery {0} — notification outbox not available", + msg.NotificationId); + Sender.Tell(new NotificationStatusResponse( + msg.CorrelationId, Found: false, Status: "Unknown", + RetryCount: 0, LastError: null, DeliveredAt: null)); + return; + } + + _log.Debug("Routing NotificationStatusQuery {0} to the notification outbox", msg.NotificationId); + _notificationOutboxProxy.Forward(msg); } private void HandleHeartbeat(HeartbeatMessage heartbeat) @@ -391,3 +456,11 @@ internal record SiteAddressCacheLoaded(Dictionary> SiteCont /// due to site disconnection (WP-5). /// public record DebugStreamTerminated(string SiteId, string CorrelationId); + +/// +/// Registers the central NotificationOutboxActor singleton proxy with the +/// so site-forwarded +/// and messages can be routed to it. Sent by the Host +/// after the outbox singleton proxy is created. +/// +public record RegisterNotificationOutbox(IActorRef OutboxProxy); diff --git a/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs index 2a60ff5..60480d0 100644 --- a/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs +++ b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs @@ -10,6 +10,7 @@ using ScadaLink.Commons.Messages.Communication; using ScadaLink.Commons.Messages.Deployment; using ScadaLink.Commons.Messages.DebugView; using ScadaLink.Commons.Messages.Health; +using ScadaLink.Commons.Messages.Notification; using ScadaLink.Communication.Actors; using ScadaLink.HealthMonitoring; using Akka.TestKit; @@ -251,6 +252,75 @@ public class CentralCommunicationActorTests : TestKit Assert.Equal("dep1", ((DeployInstanceCommand)msg.Message).DeploymentId); } + private NotificationSubmit CreateSubmit(string id = "notif1") => + new(id, "ops-list", "Subject", "Body", "site1", "inst1", "script.cs", DateTimeOffset.UtcNow); + + [Fact] + public void NotificationSubmit_ForwardedToOutboxProxy_AckRoutesBackToSite() + { + var (actor, _, _) = CreateActorWithMockRepo(); + var outboxProbe = CreateTestProbe(); + actor.Tell(new RegisterNotificationOutbox(outboxProbe.Ref)); + + // A second probe stands in for the site's ClusterClient (the original Sender). + var siteProbe = CreateTestProbe(); + var submit = CreateSubmit(); + actor.Tell(submit, siteProbe.Ref); + + // The outbox proxy receives the NotificationSubmit with the site as the sender, + // so an ack it sends routes straight back to the site, not the central actor. + outboxProbe.ExpectMsg(m => m.NotificationId == "notif1"); + outboxProbe.Reply(new NotificationSubmitAck("notif1", Accepted: true, Error: null)); + siteProbe.ExpectMsg(a => a.NotificationId == "notif1" && a.Accepted); + } + + [Fact] + public void NotificationStatusQuery_ForwardedToOutboxProxy_ResponseRoutesBackToSite() + { + var (actor, _, _) = CreateActorWithMockRepo(); + var outboxProbe = CreateTestProbe(); + actor.Tell(new RegisterNotificationOutbox(outboxProbe.Ref)); + + var siteProbe = CreateTestProbe(); + var query = new NotificationStatusQuery("corr1", "notif1"); + actor.Tell(query, siteProbe.Ref); + + outboxProbe.ExpectMsg(m => m.CorrelationId == "corr1"); + outboxProbe.Reply(new NotificationStatusResponse( + "corr1", Found: true, Status: "Delivered", RetryCount: 0, + LastError: null, DeliveredAt: DateTimeOffset.UtcNow)); + siteProbe.ExpectMsg(r => r.CorrelationId == "corr1" && r.Found); + } + + [Fact] + public void NotificationSubmit_NoOutboxConfigured_RepliesNonAccepted() + { + var (actor, _, _) = CreateActorWithMockRepo(); + + // No RegisterNotificationOutbox sent — the proxy is null. + var submit = CreateSubmit(); + actor.Tell(submit); + + var ack = ExpectMsg(); + Assert.Equal("notif1", ack.NotificationId); + Assert.False(ack.Accepted); + Assert.NotNull(ack.Error); + } + + [Fact] + public void NotificationStatusQuery_NoOutboxConfigured_RepliesNotFound() + { + var (actor, _, _) = CreateActorWithMockRepo(); + + // No RegisterNotificationOutbox sent — the proxy is null. + var query = new NotificationStatusQuery("corr1", "notif1"); + actor.Tell(query); + + var response = ExpectMsg(); + Assert.Equal("corr1", response.CorrelationId); + Assert.False(response.Found); + } + [Fact] public void BothContactPoints_UsedInSingleClient() {