Files
ScadaBridge/tests/ScadaLink.StoreAndForward.Tests/NotificationForwarderTests.cs
T
Joseph Doherty ac96b83b08 fix(high-severity): close 9 of 10 open High findings across 8 modules
Comm-016: delete dead HandleConnectionStateChanged + _debugSubscriptions /
_inProgressDeployments tracking + ConnectionStateChanged message record.
Disconnect detection is owned by the transport layers (gRPC keepalive PING
~25s; Ask-timeout at CommunicationService). Updates the
Component-Communication.md design doc to make that explicit.

SnF-018: NotificationForwarder.DeliverAsync now discards a corrupt buffered
payload (Warning log + return true) instead of returning false and parking
the row — honoring the design's "notifications do not park" invariant.

DM-018: reconciliation no longer force-sets Enabled, preserving an
intentional Disabled state after central failover.

ESG-018: DeliverBufferedAsync (both ExternalSystemClient + DatabaseGateway)
catches JsonException and returns false, turning a corrupt buffered row
into a parked operation instead of a retry-forever poison message.

InboundAPI-022: register ActiveNodeGate as IActiveNodeGate in the Central
DI branch so standby-node gating is actually wired up in production.

NS-019: remove orphaned NotificationDeliveryService /
INotificationDeliveryService / NotificationResult; central notification
delivery now lives entirely in NotificationOutbox.

SEL-016: normalise From/To filters to UTC before ISO-string compare so
non-UTC DateTimeOffset clients no longer get spuriously excluded events.

TE-017: include Description on attributes/alarms and a HashableConnections
projection (protocol, endpoint JSON, failover count) in the revision hash
and DiffService; staleness detection now catches description-only and
connection-endpoint edits.

Transport-001 and Transport-002 (also High) remain Open — they're being
handled in a follow-up batch because both touch BundleImporter.cs and
must serialise.
2026-05-28 05:40:15 -04:00

255 lines
11 KiB
C#

using System.Text.Json;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.StoreAndForward.Tests;
/// <summary>
/// Notification Outbox: tests for the site Store-and-Forward notification delivery
/// handler. "Delivering" a buffered notification means forwarding it to the central
/// cluster (via the site communication actor) and treating central's
/// <see cref="NotificationSubmitAck"/> as the delivery outcome.
/// </summary>
public class NotificationForwarderTests : TestKit
{
private static readonly TimeSpan ForwardTimeout = TimeSpan.FromSeconds(2);
/// <summary>
/// Builds a buffered notification S&amp;F message whose payload matches the shape
/// produced by the site <c>Notify.Send</c> enqueue path (Task 19): a serialized
/// <see cref="NotificationSubmit"/> carrying a script-generated
/// <see cref="NotificationSubmit.NotificationId"/>. The S&amp;F message
/// <see cref="StoreAndForwardMessage.Id"/> equals that same id.
/// </summary>
private static StoreAndForwardMessage BufferedNotification(
string id = "msg-1", string listName = "Operators",
string subject = "Pump alarm", string message = "Pump 3 tripped",
string? originInstance = "Plant.Pump3", string? sourceScript = "alarmScript",
Guid? originExecutionId = null)
{
var payload = JsonSerializer.Serialize(new NotificationSubmit(
NotificationId: id,
ListName: listName,
Subject: subject,
Body: message,
// SourceSiteId is re-stamped by the forwarder; the enqueue side leaves it blank.
SourceSiteId: string.Empty,
SourceInstanceId: originInstance,
SourceScript: sourceScript,
SiteEnqueuedAt: DateTimeOffset.UtcNow,
OriginExecutionId: originExecutionId));
return new StoreAndForwardMessage
{
Id = id,
Category = StoreAndForwardCategory.Notification,
Target = listName,
PayloadJson = payload,
OriginInstanceName = originInstance,
};
}
[Fact]
public async Task Deliver_ForwardsNotificationSubmitToCentralTarget_AndReturnsTrueOnAccept()
{
var centralProbe = CreateTestProbe();
var forwarder = new NotificationForwarder(
centralProbe.Ref, "site-7", ForwardTimeout);
var msg = BufferedNotification(
id: "msg-1", listName: "Operators", subject: "Pump alarm",
message: "Pump 3 tripped", originInstance: "Plant.Pump3");
var deliverTask = forwarder.DeliverAsync(msg);
// The central target receives a NotificationSubmit whose fields map from the
// buffered payload; reply Accepted so the handler completes as delivered.
var submit = centralProbe.ExpectMsg<NotificationSubmit>();
Assert.Equal("msg-1", submit.NotificationId);
Assert.Equal("Operators", submit.ListName);
Assert.Equal("Pump alarm", submit.Subject);
Assert.Equal("Pump 3 tripped", submit.Body);
// SourceSiteId is re-stamped by the forwarder from its own site id.
Assert.Equal("site-7", submit.SourceSiteId);
Assert.Equal("Plant.Pump3", submit.SourceInstanceId);
// The originating script travels through from the buffered payload.
Assert.Equal("alarmScript", submit.SourceScript);
centralProbe.Reply(new NotificationSubmitAck(submit.NotificationId, Accepted: true, Error: null));
Assert.True(await deliverTask);
}
[Fact]
public async Task Deliver_PreservesOriginExecutionId_FromBufferedPayload()
{
// Audit Log #23: the buffered payload's OriginExecutionId is the per-run
// id stamped at Notify.Send time. The forwarder re-stamps only the four
// fields it authoritatively owns (NotificationId, ListName, SourceSiteId,
// SourceInstanceId) via the `with` expression — OriginExecutionId is
// preserved precisely BY being absent from that `with` block. This test
// pins that: if OriginExecutionId is ever added to the `with` expression
// (e.g. reset to null), the forwarded NotificationSubmit would lose the
// per-run id and central could not echo it onto NotifyDeliver rows.
var centralProbe = CreateTestProbe();
var forwarder = new NotificationForwarder(
centralProbe.Ref, "site-7", ForwardTimeout);
var executionId = Guid.NewGuid();
var msg = BufferedNotification(id: "msg-exec", originExecutionId: executionId);
var deliverTask = forwarder.DeliverAsync(msg);
var submit = centralProbe.ExpectMsg<NotificationSubmit>();
Assert.Equal(executionId, submit.OriginExecutionId);
centralProbe.Reply(new NotificationSubmitAck(submit.NotificationId, Accepted: true, Error: null));
Assert.True(await deliverTask);
}
[Fact]
public async Task Deliver_FallsBackToTarget_WhenPayloadListNameIsEmpty()
{
var centralProbe = CreateTestProbe();
var forwarder = new NotificationForwarder(
centralProbe.Ref, "site-7", ForwardTimeout);
// A buffered payload carrying an empty-string ListName: the empty value must not
// be forwarded — the forwarder falls back to the S&F message Target instead.
var payload = JsonSerializer.Serialize(new NotificationSubmit(
NotificationId: "msg-empty-list",
ListName: "",
Subject: "Pump alarm",
Body: "Pump 3 tripped",
SourceSiteId: string.Empty,
SourceInstanceId: "Plant.Pump3",
SourceScript: null,
SiteEnqueuedAt: DateTimeOffset.UtcNow));
var msg = new StoreAndForwardMessage
{
Id = "msg-empty-list",
Category = StoreAndForwardCategory.Notification,
Target = "Operators",
PayloadJson = payload,
OriginInstanceName = "Plant.Pump3",
};
var deliverTask = forwarder.DeliverAsync(msg);
var submit = centralProbe.ExpectMsg<NotificationSubmit>();
Assert.Equal("Operators", submit.ListName);
centralProbe.Reply(new NotificationSubmitAck(submit.NotificationId, Accepted: true, Error: null));
Assert.True(await deliverTask);
}
[Fact]
public async Task Deliver_ThrowsTransient_WhenAckIsNotAccepted()
{
var centralProbe = CreateTestProbe();
var forwarder = new NotificationForwarder(
centralProbe.Ref, "site-7", ForwardTimeout);
var deliverTask = forwarder.DeliverAsync(BufferedNotification());
var submit = centralProbe.ExpectMsg<NotificationSubmit>();
centralProbe.Reply(new NotificationSubmitAck(
submit.NotificationId, Accepted: false, Error: "central rejected"));
// A non-accepted ack is a transient failure — the handler throws so the S&F
// engine keeps the message buffered and retries the forward.
await Assert.ThrowsAnyAsync<Exception>(() => deliverTask);
}
[Fact]
public async Task Deliver_ThrowsTransient_WhenNoReplyWithinTimeout()
{
// A probe that never replies stands in for central being unreachable.
var centralProbe = CreateTestProbe();
var forwarder = new NotificationForwarder(
centralProbe.Ref, "site-7", TimeSpan.FromMilliseconds(300));
// No reply within the timeout → transient failure → throw.
await Assert.ThrowsAnyAsync<Exception>(() => forwarder.DeliverAsync(BufferedNotification()));
}
[Fact]
public async Task Deliver_UsesStableNotificationId_AcrossRetriesOfSameMessage()
{
var centralProbe = CreateTestProbe();
var forwarder = new NotificationForwarder(
centralProbe.Ref, "site-7", ForwardTimeout);
var buffered = BufferedNotification(id: "stable-msg-id");
var first = forwarder.DeliverAsync(buffered);
var submit1 = centralProbe.ExpectMsg<NotificationSubmit>();
centralProbe.Reply(new NotificationSubmitAck(submit1.NotificationId, true, null));
await first;
var second = forwarder.DeliverAsync(buffered);
var submit2 = centralProbe.ExpectMsg<NotificationSubmit>();
centralProbe.Reply(new NotificationSubmitAck(submit2.NotificationId, true, null));
await second;
// The NotificationId is the central idempotency key — it must be identical for
// every forward attempt of the same buffered S&F message.
Assert.Equal(submit1.NotificationId, submit2.NotificationId);
Assert.Equal("stable-msg-id", submit1.NotificationId);
}
[Fact]
public async Task Deliver_CorruptJsonPayload_ReturnsTrue_AndDoesNotForwardAnything()
{
// Regression test for StoreAndForward-018. The design doc forbids parking
// notifications ("notifications do not park — they are retried at the fixed
// forward interval until central acks"; Component-StoreAndForward.md). The
// previous implementation returned false on a corrupt payload, which the S&F
// engine interprets as a permanent failure and parks the row — contradicting
// the invariant. The fix: discard a corrupt buffered notification by
// returning true (engine clears the buffer via its normal success path),
// with a Warning log line carrying the row id and a payload preview.
var centralProbe = CreateTestProbe();
var forwarder = new NotificationForwarder(
centralProbe.Ref, "site-7", ForwardTimeout);
var corrupt = new StoreAndForwardMessage
{
Id = "msg-corrupt",
Category = StoreAndForwardCategory.Notification,
Target = "Operators",
PayloadJson = "{not-valid-json",
OriginInstanceName = "Plant.Pump3",
};
Assert.True(await forwarder.DeliverAsync(corrupt));
// The corrupt-payload path must NOT round-trip to central — no
// NotificationSubmit / no Ask. ExpectNoMsg confirms nothing was forwarded.
centralProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
}
[Fact]
public async Task Deliver_NullDeserializedPayload_ReturnsTrue_AndDoesNotForwardAnything()
{
// The companion case to corrupt JSON: the payload is valid JSON but
// deserialises to null (e.g. "null"). Same treatment per StoreAndForward-018
// — discard rather than park.
var centralProbe = CreateTestProbe();
var forwarder = new NotificationForwarder(
centralProbe.Ref, "site-7", ForwardTimeout);
var nullPayload = new StoreAndForwardMessage
{
Id = "msg-null",
Category = StoreAndForwardCategory.Notification,
Target = "Operators",
PayloadJson = "null",
OriginInstanceName = "Plant.Pump3",
};
Assert.True(await forwarder.DeliverAsync(nullPayload));
centralProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
}
}