using System.Text.Json; using Akka.Actor; using Akka.TestKit.Xunit2; using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging.Abstractions; using ScadaLink.Commons.Messages.Notification; using ScadaLink.Commons.Types.Enums; using ScadaLink.SiteRuntime.Scripts; using ScadaLink.StoreAndForward; namespace ScadaLink.SiteRuntime.Tests.Scripts; /// /// Notification Outbox (Task 19): tests for the async Notify.Send / /// Notify.Status script API. /// /// In the outbox design Notify.To("list").Send(...) no longer delivers email /// inline — it generates a stable NotificationId, enqueues a /// message into the site /// Store-and-Forward Engine (which Task 18 retargets to forward to central), and /// returns the NotificationId immediately. Notify.Status(id) queries /// central for delivery status, reporting the site-local Forwarding state /// while the notification is still buffered at the site. /// public class NotifyHelperTests : TestKit, IAsyncLifetime, IDisposable { private readonly SqliteConnection _keepAlive; private readonly StoreAndForwardStorage _storage; private readonly StoreAndForwardService _saf; public NotifyHelperTests() { var dbName = $"NotifyTests_{Guid.NewGuid():N}"; var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; _keepAlive = new SqliteConnection(connStr); _keepAlive.Open(); _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); var options = new StoreAndForwardOptions { DefaultRetryInterval = TimeSpan.Zero, DefaultMaxRetries = 3, RetryTimerInterval = TimeSpan.FromMinutes(10) }; _saf = new StoreAndForwardService(_storage, options, NullLogger.Instance); } public async Task InitializeAsync() => await _storage.InitializeAsync(); public Task DisposeAsync() => Task.CompletedTask; protected override void Dispose(bool disposing) { if (disposing) { _keepAlive.Dispose(); } base.Dispose(disposing); } private ScriptRuntimeContext.NotifyHelper CreateHelper( IActorRef siteCommunicationActor, string? sourceScript = null) { return new ScriptRuntimeContext.NotifyHelper( _saf, siteCommunicationActor, "site-7", "Plant.Pump3", sourceScript, TimeSpan.FromSeconds(3), NullLogger.Instance); } [Fact] public async Task Send_EnqueuesNotificationIntoStoreAndForward_AndReturnsNotificationIdImmediately() { var commProbe = CreateTestProbe(); var notify = CreateHelper(commProbe.Ref); var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped"); // Send returns a non-empty NotificationId string immediately (no central round-trip). Assert.False(string.IsNullOrEmpty(notificationId)); // Exactly one Notification-category message was buffered for the S&F forwarder. var depth = await _saf.GetBufferDepthAsync(); Assert.Equal(1, depth.GetValueOrDefault(StoreAndForwardCategory.Notification)); } [Fact] public async Task Send_BufferedPayload_CarriesListSubjectBodyAndNotificationId() { var commProbe = CreateTestProbe(); var notify = CreateHelper(commProbe.Ref); var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped"); var buffered = await _saf.GetMessageByIdAsync(notificationId); Assert.NotNull(buffered); Assert.Equal(StoreAndForwardCategory.Notification, buffered!.Category); Assert.Equal("Operators", buffered.Target); Assert.Equal("Plant.Pump3", buffered.OriginInstanceName); // The S&F message Id is the NotificationId — the single idempotency key. Assert.Equal(notificationId, buffered.Id); // The payload is a NotificationSubmit carrying the same NotificationId and the // list / subject / body the script supplied — the shape the forwarder reads. var payload = JsonSerializer.Deserialize(buffered.PayloadJson); Assert.NotNull(payload); Assert.Equal(notificationId, payload!.NotificationId); Assert.Equal("Operators", payload.ListName); Assert.Equal("Pump alarm", payload.Subject); Assert.Equal("Pump 3 tripped", payload.Body); Assert.Equal("Plant.Pump3", payload.SourceInstanceId); } [Fact] public async Task Send_WhenHelperHasSourceScript_StampsItOnTheNotificationSubmit() { var commProbe = CreateTestProbe(); var notify = CreateHelper(commProbe.Ref, sourceScript: "ScriptActor:MonitorSpeed"); var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped"); var buffered = await _saf.GetMessageByIdAsync(notificationId); Assert.NotNull(buffered); var payload = JsonSerializer.Deserialize(buffered!.PayloadJson); Assert.NotNull(payload); // FU3: the executing script name is threaded down and stamped for the audit trail. Assert.Equal("ScriptActor:MonitorSpeed", payload!.SourceScript); } [Fact] public async Task Send_WhenHelperHasNoSourceScript_LeavesSourceScriptNull() { var commProbe = CreateTestProbe(); var notify = CreateHelper(commProbe.Ref, sourceScript: null); var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped"); var buffered = await _saf.GetMessageByIdAsync(notificationId); Assert.NotNull(buffered); var payload = JsonSerializer.Deserialize(buffered!.PayloadJson); Assert.Null(payload!.SourceScript); } [Fact] public async Task Status_WhenStillBufferedAtSite_ReportsForwarding() { var commProbe = CreateTestProbe(); var notify = CreateHelper(commProbe.Ref); // Enqueue but never let it forward — the message stays buffered at the site. var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped"); var statusTask = notify.Status(notificationId); // The status query goes to central; central has no row for an un-forwarded // notification, so it answers Found: false. var query = await commProbe.ExpectMsgAsync(); Assert.Equal(notificationId, query.NotificationId); commProbe.Reply(new NotificationStatusResponse( query.CorrelationId, Found: false, Status: "Unknown", RetryCount: 0, LastError: null, DeliveredAt: null)); var status = await statusTask; // Found: false AND still in the site S&F buffer → the site-local Forwarding state. Assert.Equal("Forwarding", status.Status); } [Fact] public async Task Status_WhenCentralReportsDelivered_MapsTheCentralResponse() { var commProbe = CreateTestProbe(); var notify = CreateHelper(commProbe.Ref); var deliveredAt = DateTimeOffset.UtcNow; var statusTask = notify.Status("not-buffered-id"); var query = await commProbe.ExpectMsgAsync(); commProbe.Reply(new NotificationStatusResponse( query.CorrelationId, Found: true, Status: "Delivered", RetryCount: 2, LastError: "earlier transient", DeliveredAt: deliveredAt)); var status = await statusTask; Assert.Equal("Delivered", status.Status); Assert.Equal(2, status.RetryCount); Assert.Equal("earlier transient", status.LastError); Assert.Equal(deliveredAt, status.DeliveredAt); } [Fact] public async Task Status_WhenCentralNotFoundAndNotBuffered_ReportsUnknown() { var commProbe = CreateTestProbe(); var notify = CreateHelper(commProbe.Ref); var statusTask = notify.Status("never-existed-id"); var query = await commProbe.ExpectMsgAsync(); commProbe.Reply(new NotificationStatusResponse( query.CorrelationId, Found: false, Status: "Unknown", RetryCount: 0, LastError: null, DeliveredAt: null)); var status = await statusTask; // Not at central, not in the site buffer → genuinely unknown, NOT Forwarding. Assert.Equal("Unknown", status.Status); } }