using System.Text.Json; using Akka.Actor; using Akka.TestKit.Xunit2; using ScadaLink.Commons.Messages.Notification; using ScadaLink.Commons.Types.Enums; namespace ScadaLink.StoreAndForward.Tests; /// /// Notification Outbox: tests for the site Store-and-Forward notification delivery /// handler. "Delivering" a buffered notification means forwarding it to the central /// cluster (via the site communication actor) and treating central's /// as the delivery outcome. /// public class NotificationForwarderTests : TestKit { private static readonly TimeSpan ForwardTimeout = TimeSpan.FromSeconds(2); /// /// Builds a buffered notification S&F message whose payload matches the shape /// produced by the site Notify.Send enqueue path (Task 19): a serialized /// carrying a script-generated /// . The S&F message /// equals that same id. /// private static StoreAndForwardMessage BufferedNotification( string id = "msg-1", string listName = "Operators", string subject = "Pump alarm", string message = "Pump 3 tripped", string? originInstance = "Plant.Pump3", string? sourceScript = "alarmScript") { var payload = JsonSerializer.Serialize(new NotificationSubmit( NotificationId: id, ListName: listName, Subject: subject, Body: message, // SourceSiteId is re-stamped by the forwarder; the enqueue side leaves it blank. SourceSiteId: string.Empty, SourceInstanceId: originInstance, SourceScript: sourceScript, SiteEnqueuedAt: DateTimeOffset.UtcNow)); return new StoreAndForwardMessage { Id = id, Category = StoreAndForwardCategory.Notification, Target = listName, PayloadJson = payload, OriginInstanceName = originInstance, }; } [Fact] public async Task Deliver_ForwardsNotificationSubmitToCentralTarget_AndReturnsTrueOnAccept() { var centralProbe = CreateTestProbe(); var forwarder = new NotificationForwarder( centralProbe.Ref, "site-7", ForwardTimeout); var msg = BufferedNotification( id: "msg-1", listName: "Operators", subject: "Pump alarm", message: "Pump 3 tripped", originInstance: "Plant.Pump3"); var deliverTask = forwarder.DeliverAsync(msg); // The central target receives a NotificationSubmit whose fields map from the // buffered payload; reply Accepted so the handler completes as delivered. var submit = centralProbe.ExpectMsg(); Assert.Equal("msg-1", submit.NotificationId); Assert.Equal("Operators", submit.ListName); Assert.Equal("Pump alarm", submit.Subject); Assert.Equal("Pump 3 tripped", submit.Body); // SourceSiteId is re-stamped by the forwarder from its own site id. Assert.Equal("site-7", submit.SourceSiteId); Assert.Equal("Plant.Pump3", submit.SourceInstanceId); // The originating script travels through from the buffered payload. Assert.Equal("alarmScript", submit.SourceScript); centralProbe.Reply(new NotificationSubmitAck(submit.NotificationId, Accepted: true, Error: null)); Assert.True(await deliverTask); } [Fact] public async Task Deliver_FallsBackToTarget_WhenPayloadListNameIsEmpty() { var centralProbe = CreateTestProbe(); var forwarder = new NotificationForwarder( centralProbe.Ref, "site-7", ForwardTimeout); // A buffered payload carrying an empty-string ListName: the empty value must not // be forwarded — the forwarder falls back to the S&F message Target instead. var payload = JsonSerializer.Serialize(new NotificationSubmit( NotificationId: "msg-empty-list", ListName: "", Subject: "Pump alarm", Body: "Pump 3 tripped", SourceSiteId: string.Empty, SourceInstanceId: "Plant.Pump3", SourceScript: null, SiteEnqueuedAt: DateTimeOffset.UtcNow)); var msg = new StoreAndForwardMessage { Id = "msg-empty-list", Category = StoreAndForwardCategory.Notification, Target = "Operators", PayloadJson = payload, OriginInstanceName = "Plant.Pump3", }; var deliverTask = forwarder.DeliverAsync(msg); var submit = centralProbe.ExpectMsg(); Assert.Equal("Operators", submit.ListName); centralProbe.Reply(new NotificationSubmitAck(submit.NotificationId, Accepted: true, Error: null)); Assert.True(await deliverTask); } [Fact] public async Task Deliver_ThrowsTransient_WhenAckIsNotAccepted() { var centralProbe = CreateTestProbe(); var forwarder = new NotificationForwarder( centralProbe.Ref, "site-7", ForwardTimeout); var deliverTask = forwarder.DeliverAsync(BufferedNotification()); var submit = centralProbe.ExpectMsg(); centralProbe.Reply(new NotificationSubmitAck( submit.NotificationId, Accepted: false, Error: "central rejected")); // A non-accepted ack is a transient failure — the handler throws so the S&F // engine keeps the message buffered and retries the forward. await Assert.ThrowsAnyAsync(() => deliverTask); } [Fact] public async Task Deliver_ThrowsTransient_WhenNoReplyWithinTimeout() { // A probe that never replies stands in for central being unreachable. var centralProbe = CreateTestProbe(); var forwarder = new NotificationForwarder( centralProbe.Ref, "site-7", TimeSpan.FromMilliseconds(300)); // No reply within the timeout → transient failure → throw. await Assert.ThrowsAnyAsync(() => forwarder.DeliverAsync(BufferedNotification())); } [Fact] public async Task Deliver_UsesStableNotificationId_AcrossRetriesOfSameMessage() { var centralProbe = CreateTestProbe(); var forwarder = new NotificationForwarder( centralProbe.Ref, "site-7", ForwardTimeout); var buffered = BufferedNotification(id: "stable-msg-id"); var first = forwarder.DeliverAsync(buffered); var submit1 = centralProbe.ExpectMsg(); centralProbe.Reply(new NotificationSubmitAck(submit1.NotificationId, true, null)); await first; var second = forwarder.DeliverAsync(buffered); var submit2 = centralProbe.ExpectMsg(); centralProbe.Reply(new NotificationSubmitAck(submit2.NotificationId, true, null)); await second; // The NotificationId is the central idempotency key — it must be identical for // every forward attempt of the same buffered S&F message. Assert.Equal(submit1.NotificationId, submit2.NotificationId); Assert.Equal("stable-msg-id", submit1.NotificationId); } }