feat(notification-outbox): route NotificationSubmit to the outbox actor

This commit is contained in:
Joseph Doherty
2026-05-19 02:38:04 -04:00
parent b88c75c116
commit 2ff62a2ceb
2 changed files with 143 additions and 0 deletions

View File

@@ -7,6 +7,7 @@ using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Communication; using ScadaLink.Commons.Messages.Communication;
using ScadaLink.Commons.Messages.Health; using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.HealthMonitoring; using ScadaLink.HealthMonitoring;
namespace ScadaLink.Communication.Actors; namespace ScadaLink.Communication.Actors;
@@ -66,6 +67,15 @@ public class CentralCommunicationActor : ReceiveActor
private ICancelable? _refreshSchedule; private ICancelable? _refreshSchedule;
/// <summary>
/// Proxy <see cref="IActorRef"/> for the central NotificationOutboxActor cluster singleton.
/// Set via <see cref="RegisterNotificationOutbox"/> — the Host creates the singleton proxy
/// after this actor and registers it (mirrors how the site-side actor receives its
/// runtime <see cref="IActorRef"/>s). Null until registration completes; a notification
/// arriving before then is rejected with a non-accepted ack so the site retries.
/// </summary>
private IActorRef? _notificationOutboxProxy;
/// <summary> /// <summary>
/// DistributedPubSub topic used to fan health reports out to the peer /// DistributedPubSub topic used to fan health reports out to the peer
/// central node so both per-node aggregators stay in sync. See /// central node so both per-node aggregators stay in sync. See
@@ -105,6 +115,61 @@ public class CentralCommunicationActor : ReceiveActor
// Route enveloped messages to sites // Route enveloped messages to sites
Receive<SiteEnvelope>(HandleSiteEnvelope); Receive<SiteEnvelope>(HandleSiteEnvelope);
// Notification Outbox: the Host registers the outbox singleton proxy after this
// actor is created (the proxy cannot exist before this actor's construction).
Receive<RegisterNotificationOutbox>(msg =>
{
_notificationOutboxProxy = msg.OutboxProxy;
_log.Info("Registered notification outbox proxy");
});
// Notification Outbox ingest: a site forwards a buffered NotificationSubmit to the
// central cluster via ClusterClient. Forward to the outbox proxy so the original
// Sender (the site's ClusterClient path) is preserved and the NotificationSubmitAck
// routes straight back to the site.
Receive<NotificationSubmit>(HandleNotificationSubmit);
// Notification Outbox status query: forward to the outbox proxy, preserving Sender
// so the NotificationStatusResponse routes back to the querying site.
Receive<NotificationStatusQuery>(HandleNotificationStatusQuery);
}
private void HandleNotificationSubmit(NotificationSubmit msg)
{
if (_notificationOutboxProxy == null)
{
// No outbox proxy registered yet. A non-accepted ack makes the site's
// Store-and-Forward forwarder treat this as transient and retry later.
_log.Warning(
"Cannot route NotificationSubmit {0} — notification outbox not available",
msg.NotificationId);
Sender.Tell(new NotificationSubmitAck(
msg.NotificationId, Accepted: false, Error: "notification outbox not available"));
return;
}
_log.Debug("Routing NotificationSubmit {0} to the notification outbox", msg.NotificationId);
_notificationOutboxProxy.Forward(msg);
}
private void HandleNotificationStatusQuery(NotificationStatusQuery msg)
{
if (_notificationOutboxProxy == null)
{
// No outbox proxy registered yet. Reply Found: false so the querying site
// falls back to its local Store-and-Forward buffer to resolve the status.
_log.Warning(
"Cannot route NotificationStatusQuery {0} — notification outbox not available",
msg.NotificationId);
Sender.Tell(new NotificationStatusResponse(
msg.CorrelationId, Found: false, Status: "Unknown",
RetryCount: 0, LastError: null, DeliveredAt: null));
return;
}
_log.Debug("Routing NotificationStatusQuery {0} to the notification outbox", msg.NotificationId);
_notificationOutboxProxy.Forward(msg);
} }
private void HandleHeartbeat(HeartbeatMessage heartbeat) private void HandleHeartbeat(HeartbeatMessage heartbeat)
@@ -391,3 +456,11 @@ internal record SiteAddressCacheLoaded(Dictionary<string, List<string>> SiteCont
/// due to site disconnection (WP-5). /// due to site disconnection (WP-5).
/// </summary> /// </summary>
public record DebugStreamTerminated(string SiteId, string CorrelationId); public record DebugStreamTerminated(string SiteId, string CorrelationId);
/// <summary>
/// Registers the central NotificationOutboxActor singleton proxy with the
/// <see cref="CentralCommunicationActor"/> so site-forwarded <see cref="NotificationSubmit"/>
/// and <see cref="NotificationStatusQuery"/> messages can be routed to it. Sent by the Host
/// after the outbox singleton proxy is created.
/// </summary>
public record RegisterNotificationOutbox(IActorRef OutboxProxy);

View File

@@ -10,6 +10,7 @@ using ScadaLink.Commons.Messages.Communication;
using ScadaLink.Commons.Messages.Deployment; using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.DebugView; using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Health; using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Communication.Actors; using ScadaLink.Communication.Actors;
using ScadaLink.HealthMonitoring; using ScadaLink.HealthMonitoring;
using Akka.TestKit; using Akka.TestKit;
@@ -251,6 +252,75 @@ public class CentralCommunicationActorTests : TestKit
Assert.Equal("dep1", ((DeployInstanceCommand)msg.Message).DeploymentId); Assert.Equal("dep1", ((DeployInstanceCommand)msg.Message).DeploymentId);
} }
private NotificationSubmit CreateSubmit(string id = "notif1") =>
new(id, "ops-list", "Subject", "Body", "site1", "inst1", "script.cs", DateTimeOffset.UtcNow);
[Fact]
public void NotificationSubmit_ForwardedToOutboxProxy_AckRoutesBackToSite()
{
var (actor, _, _) = CreateActorWithMockRepo();
var outboxProbe = CreateTestProbe();
actor.Tell(new RegisterNotificationOutbox(outboxProbe.Ref));
// A second probe stands in for the site's ClusterClient (the original Sender).
var siteProbe = CreateTestProbe();
var submit = CreateSubmit();
actor.Tell(submit, siteProbe.Ref);
// The outbox proxy receives the NotificationSubmit with the site as the sender,
// so an ack it sends routes straight back to the site, not the central actor.
outboxProbe.ExpectMsg<NotificationSubmit>(m => m.NotificationId == "notif1");
outboxProbe.Reply(new NotificationSubmitAck("notif1", Accepted: true, Error: null));
siteProbe.ExpectMsg<NotificationSubmitAck>(a => a.NotificationId == "notif1" && a.Accepted);
}
[Fact]
public void NotificationStatusQuery_ForwardedToOutboxProxy_ResponseRoutesBackToSite()
{
var (actor, _, _) = CreateActorWithMockRepo();
var outboxProbe = CreateTestProbe();
actor.Tell(new RegisterNotificationOutbox(outboxProbe.Ref));
var siteProbe = CreateTestProbe();
var query = new NotificationStatusQuery("corr1", "notif1");
actor.Tell(query, siteProbe.Ref);
outboxProbe.ExpectMsg<NotificationStatusQuery>(m => m.CorrelationId == "corr1");
outboxProbe.Reply(new NotificationStatusResponse(
"corr1", Found: true, Status: "Delivered", RetryCount: 0,
LastError: null, DeliveredAt: DateTimeOffset.UtcNow));
siteProbe.ExpectMsg<NotificationStatusResponse>(r => r.CorrelationId == "corr1" && r.Found);
}
[Fact]
public void NotificationSubmit_NoOutboxConfigured_RepliesNonAccepted()
{
var (actor, _, _) = CreateActorWithMockRepo();
// No RegisterNotificationOutbox sent — the proxy is null.
var submit = CreateSubmit();
actor.Tell(submit);
var ack = ExpectMsg<NotificationSubmitAck>();
Assert.Equal("notif1", ack.NotificationId);
Assert.False(ack.Accepted);
Assert.NotNull(ack.Error);
}
[Fact]
public void NotificationStatusQuery_NoOutboxConfigured_RepliesNotFound()
{
var (actor, _, _) = CreateActorWithMockRepo();
// No RegisterNotificationOutbox sent — the proxy is null.
var query = new NotificationStatusQuery("corr1", "notif1");
actor.Tell(query);
var response = ExpectMsg<NotificationStatusResponse>();
Assert.Equal("corr1", response.CorrelationId);
Assert.False(response.Found);
}
[Fact] [Fact]
public void BothContactPoints_UsedInSingleClient() public void BothContactPoints_UsedInSingleClient()
{ {