// C# source — 280 lines, 11 KiB.
using System.Text.Json;
|
|
using Akka.Actor;
|
|
using Akka.TestKit.Xunit2;
|
|
using Microsoft.Data.Sqlite;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ScadaLink.Commons.Messages.Notification;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
using ScadaLink.SiteRuntime.Scripts;
|
|
using ScadaLink.StoreAndForward;
|
|
|
|
namespace ScadaLink.SiteRuntime.Tests.Scripts;
|
|
|
|
/// <summary>
/// Notification Outbox (Task 19): tests for the async <c>Notify.Send</c> /
/// <c>Notify.Status</c> script API.
/// <para>
/// In the outbox design <c>Notify.To("list").Send(...)</c> no longer delivers email
/// inline — it generates a stable <c>NotificationId</c>, enqueues a
/// <see cref="StoreAndForwardCategory.Notification"/> message into the site
/// Store-and-Forward Engine (which Task 18 retargets to forward to central), and
/// returns the <c>NotificationId</c> immediately. <c>Notify.Status(id)</c> queries
/// central for delivery status, reporting the site-local <c>Forwarding</c> state
/// while the notification is still buffered at the site.
/// </para>
/// </summary>
public class NotifyHelperTests : TestKit, IAsyncLifetime, IDisposable
{
    // Fixture values shared by the "act" and "assert" halves of the tests, named
    // once so a test cannot send one literal and assert against another.
    private const string SiteId = "site-7";
    private const string InstanceName = "Plant.Pump3";
    private const string ListName = "Operators";
    private const string Subject = "Pump alarm";
    private const string Body = "Pump 3 tripped";

    // Held open for the lifetime of the test class; released in Dispose(bool).
    private readonly SqliteConnection _keepAlive;
    private readonly StoreAndForwardStorage _storage;
    private readonly StoreAndForwardService _saf;

    /// <summary>
    /// Builds a Store-and-Forward service backed by a uniquely named, shared-cache,
    /// in-memory SQLite database so every test class instance starts from an empty buffer.
    /// </summary>
    public NotifyHelperTests()
    {
        var dbName = $"NotifyTests_{Guid.NewGuid():N}";
        var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";

        // A shared-cache in-memory database only exists while a connection to it is
        // open, so this connection pins it until the test class is disposed.
        _keepAlive = new SqliteConnection(connStr);
        _keepAlive.Open();

        _storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);

        var options = new StoreAndForwardOptions
        {
            DefaultRetryInterval = TimeSpan.Zero,
            DefaultMaxRetries = 3,
            // NOTE(review): presumably long enough that the retry timer never fires
            // during a test, so enqueued messages stay buffered — confirm.
            RetryTimerInterval = TimeSpan.FromMinutes(10)
        };
        _saf = new StoreAndForwardService(_storage, options, NullLogger<StoreAndForwardService>.Instance);
    }

    /// <summary>Initializes the Store-and-Forward storage before each test runs.</summary>
    public async Task InitializeAsync() => await _storage.InitializeAsync();

    /// <summary>No asynchronous teardown is required; cleanup happens in <see cref="Dispose(bool)"/>.</summary>
    public Task DisposeAsync() => Task.CompletedTask;

    /// <summary>
    /// Closes the keep-alive SQLite connection and then lets the TestKit base class
    /// run its own teardown.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <c>Dispose()</c>; the connection is only released in that case.
    /// </param>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _keepAlive.Dispose();
        }

        base.Dispose(disposing);
    }

    /// <summary>
    /// Creates the <c>NotifyHelper</c> under test, wired to the in-memory
    /// Store-and-Forward service and the supplied site-communication actor.
    /// </summary>
    /// <param name="siteCommunicationActor">Actor that receives the helper's messages to central (a test probe in these tests).</param>
    /// <param name="sourceScript">Optional name of the executing script; <c>null</c> when none is supplied.</param>
    /// <param name="executionId">Execution id of the script run; a fresh <see cref="Guid"/> is generated when omitted.</param>
    /// <param name="parentExecutionId">Optional parent execution id; <c>null</c> for a non-routed run.</param>
    /// <returns>A helper bound to site <c>site-7</c> and instance <c>Plant.Pump3</c>.</returns>
    private ScriptRuntimeContext.NotifyHelper CreateHelper(
        IActorRef siteCommunicationActor,
        string? sourceScript = null,
        Guid? executionId = null,
        Guid? parentExecutionId = null)
    {
        return new ScriptRuntimeContext.NotifyHelper(
            _saf,
            siteCommunicationActor,
            SiteId,
            InstanceName,
            sourceScript,
            // NOTE(review): presumably the timeout for the central status query —
            // confirm against the NotifyHelper constructor.
            TimeSpan.FromSeconds(3),
            NullLogger.Instance,
            executionId ?? Guid.NewGuid(),
            auditWriter: null,
            parentExecutionId: parentExecutionId);
    }

    /// <summary>
    /// Loads the buffered Store-and-Forward message for <paramref name="notificationId"/>
    /// and deserializes its payload, asserting that both exist.
    /// </summary>
    /// <param name="notificationId">The id returned by <c>Send</c>, which is also the buffered message id.</param>
    /// <returns>The deserialized <see cref="NotificationSubmit"/> payload.</returns>
    private async Task<NotificationSubmit> ReadBufferedPayloadAsync(string notificationId)
    {
        var buffered = await _saf.GetMessageByIdAsync(notificationId);
        Assert.NotNull(buffered);

        var payload = JsonSerializer.Deserialize<NotificationSubmit>(buffered!.PayloadJson);
        Assert.NotNull(payload);

        return payload!;
    }

    [Fact]
    public async Task Send_EnqueuesNotificationIntoStoreAndForward_AndReturnsNotificationIdImmediately()
    {
        var commProbe = CreateTestProbe();
        var notify = CreateHelper(commProbe.Ref);

        var notificationId = await notify.To(ListName).Send(Subject, Body);

        // Send returns a non-empty NotificationId string immediately (no central round-trip).
        Assert.False(string.IsNullOrEmpty(notificationId));

        // Exactly one Notification-category message was buffered for the S&F forwarder.
        var depth = await _saf.GetBufferDepthAsync();
        Assert.Equal(1, depth.GetValueOrDefault(StoreAndForwardCategory.Notification));
    }

    [Fact]
    public async Task Send_BufferedPayload_CarriesListSubjectBodyAndNotificationId()
    {
        var commProbe = CreateTestProbe();
        var notify = CreateHelper(commProbe.Ref);

        var notificationId = await notify.To(ListName).Send(Subject, Body);

        var buffered = await _saf.GetMessageByIdAsync(notificationId);
        Assert.NotNull(buffered);
        Assert.Equal(StoreAndForwardCategory.Notification, buffered!.Category);
        Assert.Equal(ListName, buffered.Target);
        Assert.Equal(InstanceName, buffered.OriginInstanceName);

        // The S&F message Id is the NotificationId — the single idempotency key.
        Assert.Equal(notificationId, buffered.Id);

        // The payload is a NotificationSubmit carrying the same NotificationId and the
        // list / subject / body the script supplied — the shape the forwarder reads.
        var payload = JsonSerializer.Deserialize<NotificationSubmit>(buffered.PayloadJson);
        Assert.NotNull(payload);
        Assert.Equal(notificationId, payload!.NotificationId);
        Assert.Equal(ListName, payload.ListName);
        Assert.Equal(Subject, payload.Subject);
        Assert.Equal(Body, payload.Body);
        Assert.Equal(InstanceName, payload.SourceInstanceId);
    }

    [Fact]
    public async Task Send_WhenHelperHasSourceScript_StampsItOnTheNotificationSubmit()
    {
        var commProbe = CreateTestProbe();
        var notify = CreateHelper(commProbe.Ref, sourceScript: "ScriptActor:MonitorSpeed");

        var notificationId = await notify.To(ListName).Send(Subject, Body);

        var payload = await ReadBufferedPayloadAsync(notificationId);

        // FU3: the executing script name is threaded down and stamped for the audit trail.
        Assert.Equal("ScriptActor:MonitorSpeed", payload.SourceScript);
    }

    [Fact]
    public async Task Send_StampsExecutionId_OnTheNotificationSubmitPayload()
    {
        // Audit Log #23 (ExecutionId Task 5): Notify.Send must stamp the
        // script run's ExecutionId onto the NotificationSubmit so it rides
        // inside the serialized S&F payload to central, where the dispatcher
        // echoes it onto the NotifyDeliver rows. This is the SAME id stamped
        // onto the site-emitted NotifySend audit row.
        var executionId = Guid.NewGuid();
        var commProbe = CreateTestProbe();
        var notify = CreateHelper(commProbe.Ref, executionId: executionId);

        var notificationId = await notify.To(ListName).Send(Subject, Body);

        var payload = await ReadBufferedPayloadAsync(notificationId);
        Assert.Equal(executionId, payload.OriginExecutionId);
    }

    [Fact]
    public async Task Send_StampsParentExecutionId_OnTheNotificationSubmitPayload()
    {
        // Audit Log ParentExecutionId (Task 7): for an inbound-API-routed run,
        // Notify.Send must stamp the routed run's parent ExecutionId onto the
        // NotificationSubmit so it rides inside the serialized S&F payload to
        // central, where the dispatcher echoes it onto the NotifyDeliver rows.
        // This is the SAME parent id stamped onto the site-emitted NotifySend row.
        var parentExecutionId = Guid.NewGuid();
        var commProbe = CreateTestProbe();
        var notify = CreateHelper(commProbe.Ref, parentExecutionId: parentExecutionId);

        var notificationId = await notify.To(ListName).Send(Subject, Body);

        var payload = await ReadBufferedPayloadAsync(notificationId);
        Assert.Equal(parentExecutionId, payload.OriginParentExecutionId);
    }

    [Fact]
    public async Task Send_NonRoutedRun_LeavesOriginParentExecutionIdNull()
    {
        // Non-routed runs have no parent execution — OriginParentExecutionId
        // stays null on the NotificationSubmit payload.
        var commProbe = CreateTestProbe();
        var notify = CreateHelper(commProbe.Ref, parentExecutionId: null);

        var notificationId = await notify.To(ListName).Send(Subject, Body);

        var payload = await ReadBufferedPayloadAsync(notificationId);
        Assert.Null(payload.OriginParentExecutionId);
    }

    [Fact]
    public async Task Send_WhenHelperHasNoSourceScript_LeavesSourceScriptNull()
    {
        var commProbe = CreateTestProbe();
        var notify = CreateHelper(commProbe.Ref, sourceScript: null);

        var notificationId = await notify.To(ListName).Send(Subject, Body);

        // The shared reader asserts the payload deserialized, so a missing payload
        // fails as an assertion rather than a NullReferenceException.
        var payload = await ReadBufferedPayloadAsync(notificationId);
        Assert.Null(payload.SourceScript);
    }

    [Fact]
    public async Task Status_WhenStillBufferedAtSite_ReportsForwarding()
    {
        var commProbe = CreateTestProbe();
        var notify = CreateHelper(commProbe.Ref);

        // Enqueue but never let it forward — the message stays buffered at the site.
        var notificationId = await notify.To(ListName).Send(Subject, Body);

        var statusTask = notify.Status(notificationId);

        // The status query goes to central; central has no row for an un-forwarded
        // notification, so it answers Found: false.
        var query = await commProbe.ExpectMsgAsync<NotificationStatusQuery>();
        Assert.Equal(notificationId, query.NotificationId);
        commProbe.Reply(new NotificationStatusResponse(
            query.CorrelationId, Found: false, Status: "Unknown",
            RetryCount: 0, LastError: null, DeliveredAt: null));

        var status = await statusTask;

        // Found: false AND still in the site S&F buffer → the site-local Forwarding state.
        Assert.Equal("Forwarding", status.Status);
    }

    [Fact]
    public async Task Status_WhenCentralReportsDelivered_MapsTheCentralResponse()
    {
        const string notificationId = "not-buffered-id";
        var commProbe = CreateTestProbe();
        var notify = CreateHelper(commProbe.Ref);

        var deliveredAt = DateTimeOffset.UtcNow;
        var statusTask = notify.Status(notificationId);

        // The query sent to central must carry the id the script asked about.
        var query = await commProbe.ExpectMsgAsync<NotificationStatusQuery>();
        Assert.Equal(notificationId, query.NotificationId);
        commProbe.Reply(new NotificationStatusResponse(
            query.CorrelationId, Found: true, Status: "Delivered",
            RetryCount: 2, LastError: "earlier transient", DeliveredAt: deliveredAt));

        var status = await statusTask;

        Assert.Equal("Delivered", status.Status);
        Assert.Equal(2, status.RetryCount);
        Assert.Equal("earlier transient", status.LastError);
        Assert.Equal(deliveredAt, status.DeliveredAt);
    }

    [Fact]
    public async Task Status_WhenCentralNotFoundAndNotBuffered_ReportsUnknown()
    {
        const string notificationId = "never-existed-id";
        var commProbe = CreateTestProbe();
        var notify = CreateHelper(commProbe.Ref);

        var statusTask = notify.Status(notificationId);

        // The query sent to central must carry the id the script asked about.
        var query = await commProbe.ExpectMsgAsync<NotificationStatusQuery>();
        Assert.Equal(notificationId, query.NotificationId);
        commProbe.Reply(new NotificationStatusResponse(
            query.CorrelationId, Found: false, Status: "Unknown",
            RetryCount: 0, LastError: null, DeliveredAt: null));

        var status = await statusTask;

        // Not at central, not in the site buffer → genuinely unknown, NOT Forwarding.
        Assert.Equal("Unknown", status.Status);
    }
}
|