feat(notification-outbox): forward site S&F notifications to central

This commit is contained in:
Joseph Doherty
2026-05-19 02:16:27 -04:00
parent 703cb2d392
commit 6a77c12735
6 changed files with 368 additions and 7 deletions

View File

@@ -8,6 +8,7 @@ using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Messages.InboundApi;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Messages.Lifecycle;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Messages.RemoteQuery;
namespace ScadaLink.Communication.Actors;
@@ -165,6 +166,30 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
}
});
// Notification Outbox: forward a buffered notification submitted by the site
// Store-and-Forward Engine to the central cluster. The original Sender (the
// S&F forwarder's Ask) is forwarded as the ClusterClient.Send sender so the
// NotificationSubmitAck routes straight back to the waiting Ask, not here.
Receive<NotificationSubmit>(msg =>
{
if (_centralClient == null)
{
// No ClusterClient registered yet (e.g. central contact points not
// configured, or registration not yet completed). A non-accepted ack
// makes the S&F forwarder treat this as transient and retry later.
_log.Warning(
"Cannot forward NotificationSubmit {0} — no central ClusterClient registered",
msg.NotificationId);
Sender.Tell(new NotificationSubmitAck(
msg.NotificationId, Accepted: false, Error: "Central ClusterClient not registered"));
return;
}
_log.Debug("Forwarding NotificationSubmit {0} to central", msg.NotificationId);
_centralClient.Tell(
new ClusterClient.Send("/user/central-communication", msg), Sender);
});
// Internal: send heartbeat tick
Receive<SendHeartbeat>(_ => SendHeartbeatToCentral());

View File

@@ -27,6 +27,14 @@ public class CommunicationOptions
/// <summary>Timeout for health report acknowledgement (fire-and-forget, but bounded).</summary>
public TimeSpan HealthReportTimeout { get; set; } = TimeSpan.FromSeconds(10);
/// <summary>
/// Notification Outbox: timeout for forwarding a buffered notification to central
/// and awaiting its <c>NotificationSubmitAck</c>. A timeout is treated as a
/// transient failure — the Store-and-Forward engine keeps the message buffered
/// and retries the forward at the fixed retry interval.
/// </summary>
public TimeSpan NotificationForwardTimeout { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// Contact point addresses for the central cluster (e.g. "akka.tcp://scadalink@central-a:8081").
/// Used by site nodes to create a ClusterClient for reaching central.

View File

@@ -422,15 +422,18 @@ akka {{
.GetRequiredService<ScadaLink.ExternalSystemGateway.DatabaseGateway>()
.DeliverBufferedAsync(msg);
});
// Notification Outbox: a buffered notification is no longer delivered by
// the site over SMTP. "Delivering" it means forwarding it to the central
// cluster via the SiteCommunicationActor and treating central's
// NotificationSubmitAck as the outcome (accepted → delivered; not accepted
// or timeout → throw → transient → keep buffering). Central owns SMTP.
var notificationForwarder = new ScadaLink.StoreAndForward.NotificationForwarder(
siteCommActor,
_nodeOptions.SiteId!,
_communicationOptions.NotificationForwardTimeout);
storeAndForwardService.RegisterDeliveryHandler(
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.Notification,
async msg =>
{
using var scope = _serviceProvider.CreateScope();
return await scope.ServiceProvider
.GetRequiredService<ScadaLink.NotificationService.NotificationDeliveryService>()
.DeliverBufferedAsync(msg);
});
notificationForwarder.DeliverAsync);
_logger.LogInformation(
"Store-and-forward delivery handlers registered (ExternalSystem, CachedDbWrite, Notification)");

View File

@@ -0,0 +1,149 @@
using System.Text.Json;
using Akka.Actor;
using ScadaLink.Commons.Messages.Notification;
namespace ScadaLink.StoreAndForward;
/// <summary>
/// Notification Outbox: the site Store-and-Forward delivery handler for the
/// <see cref="ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.Notification"/>
/// category.
///
/// In the outbox design the site no longer sends notification email itself.
/// "Delivering" a buffered notification means forwarding it to the central cluster
/// and treating central's <see cref="NotificationSubmitAck"/> as the outcome:
/// <list type="bullet">
/// <item><description>ack <c>Accepted</c> → <see cref="DeliverAsync"/> returns
/// <c>true</c>; the S&amp;F engine removes the message from the buffer.</description></item>
/// <item><description>ack not <c>Accepted</c>, or the Ask times out / fails →
/// <see cref="DeliverAsync"/> throws; the S&amp;F engine treats any thrown
/// exception as transient and retries the forward at the fixed interval.</description></item>
/// </list>
///
/// The forward travels over the ClusterClient command/control transport: the handler
/// <see cref="ActorRefImplicitSenderExtensions.Ask{T}(ICanTell, object, TimeSpan?)">Asks</see>
/// the site communication actor, which wraps the message in a
/// <c>ClusterClient.Send("/user/central-communication", …)</c> and routes central's
/// reply straight back to this Ask.
/// </summary>
public sealed class NotificationForwarder
{
private readonly IActorRef _siteCommunicationActor;
private readonly string _sourceSiteId;
private readonly TimeSpan _forwardTimeout;
/// <param name="siteCommunicationActor">
/// The site communication actor. It forwards a <see cref="NotificationSubmit"/> to
/// central via the registered ClusterClient and replies with the
/// <see cref="NotificationSubmitAck"/>.
/// </param>
/// <param name="sourceSiteId">This site's identifier, stamped on every submit.</param>
/// <param name="forwardTimeout">
/// How long to wait for central's ack before treating the forward as a transient
/// failure. Sourced from host configuration.
/// </param>
public NotificationForwarder(
IActorRef siteCommunicationActor,
string sourceSiteId,
TimeSpan forwardTimeout)
{
_siteCommunicationActor = siteCommunicationActor;
_sourceSiteId = sourceSiteId;
_forwardTimeout = forwardTimeout;
}
/// <summary>
/// Store-and-Forward delivery handler entry point — matches the
/// <c>Func&lt;StoreAndForwardMessage, Task&lt;bool&gt;&gt;</c> handler contract.
/// Returns <c>true</c> when central accepts the notification; throws on a
/// non-accepted ack or an Ask timeout/failure so the engine retries.
/// </summary>
public async Task<bool> DeliverAsync(StoreAndForwardMessage message)
{
// An unreadable payload cannot be fixed by retrying — park it (return false),
// mirroring how the former SMTP handler treated a corrupt buffered payload.
if (!TryBuildSubmit(message, out var submit))
{
return false;
}
// The reply may legitimately be a non-accepted ack, so it is not requested as
// a status-failing Ask: ask for the bare NotificationSubmitAck and classify it
// here. An Ask timeout surfaces as a TimeoutException, which — like any other
// thrown exception — the S&F engine treats as transient.
var ack = await _siteCommunicationActor
.Ask<NotificationSubmitAck>(submit, _forwardTimeout)
.ConfigureAwait(false);
if (ack.Accepted)
{
return true;
}
// A non-accepted ack is a transient failure: central could not persist the
// notification right now. Throw so the engine keeps buffering and retries.
throw new NotificationForwardException(
$"Central rejected notification {submit.NotificationId}: {ack.Error ?? "no detail"}");
}
/// <summary>
/// Maps a buffered S&amp;F notification message onto a <see cref="NotificationSubmit"/>,
/// returning <c>false</c> if the payload is unreadable.
/// The <see cref="NotificationSubmit.NotificationId"/> is the central idempotency
/// key and must be stable across every retry of the same buffered message, so it is
/// derived from <see cref="StoreAndForwardMessage.Id"/> — a stable GUID assigned
/// once at enqueue time.
/// </summary>
private bool TryBuildSubmit(StoreAndForwardMessage message, out NotificationSubmit submit)
{
submit = null!;
BufferedNotificationPayload? payload;
try
{
payload = JsonSerializer.Deserialize<BufferedNotificationPayload>(message.PayloadJson);
}
catch (JsonException)
{
return false;
}
if (payload == null)
{
return false;
}
submit = new NotificationSubmit(
NotificationId: message.Id,
ListName: payload.ListName ?? message.Target,
Subject: payload.Subject ?? string.Empty,
Body: payload.Message ?? string.Empty,
SourceSiteId: _sourceSiteId,
SourceInstanceId: message.OriginInstanceName,
// The buffered payload does not currently carry the originating script;
// Task 19 (the enqueue side) will add it. Null until then.
SourceScript: null,
SiteEnqueuedAt: message.CreatedAt);
return true;
}
/// <summary>
/// Mirrors the payload shape written by the site notification enqueue path
/// (<c>{ ListName, Subject, Message }</c>). Kept private to this forwarder — Task 19
/// will reshape the enqueue payload, at which point this is updated alongside it.
/// </summary>
private sealed record BufferedNotificationPayload(
string? ListName, string? Subject, string? Message);
}
/// <summary>
/// Raised by <see cref="NotificationForwarder"/> on a transient forward failure —
/// a non-accepted central ack. The Store-and-Forward engine treats any thrown
/// exception as transient and retries the forward at the fixed interval.
/// </summary>
public sealed class NotificationForwardException : Exception
{
public NotificationForwardException(string message) : base(message)
{
}
}