From 3326bddeb034ab7f09c5aa9f11ae7b3982ddde8a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 19 May 2026 02:30:51 -0400 Subject: [PATCH] feat(notification-outbox): async Notify.Send with status handle Notify.To(list).Send(subject,body) now generates a NotificationId GUID, enqueues a Notification-category message into the site Store-and-Forward Engine, and returns the NotificationId immediately (Task). The NotificationId is the single idempotency key end-to-end: it is the S&F message Id, it is carried inside the buffered NotificationSubmit payload, and it is the id the forwarder submits to central. NotificationForwarder now deserializes the buffered payload as a NotificationSubmit and reads NotificationId from it (re-stamping only the site-owned SourceSiteId / SourceInstanceId), instead of deriving the id from StoreAndForwardMessage.Id. Adds NotifyHelper.Status(id): queries central via the site communication actor; reports the site-local Forwarding state while the notification is still buffered at the site, maps central's response when found, and Unknown otherwise. Adds a NotificationDeliveryStatus record. SiteCommunicationActor gains a NotificationStatusQuery forwarding handler mirroring NotificationSubmit. StoreAndForwardService.EnqueueAsync gains an optional messageId parameter and exposes GetMessageByIdAsync. --- .../Actors/SiteCommunicationActor.cs | 24 ++ .../Actors/ScriptExecutionActor.cs | 19 +- .../Scripts/ScriptRuntimeContext.cs | 214 ++++++++++++++++-- .../NotificationForwarder.cs | 62 ++--- .../StoreAndForwardService.cs | 23 +- .../SiteCommunicationActorTests.cs | 45 ++++ .../IntegrationSurfaceTests.cs | 29 ++- .../Scripts/NotifyHelperTests.cs | 181 +++++++++++++++ .../NotificationForwarderTests.cs | 42 ++-- 9 files changed, 562 insertions(+), 77 deletions(-) create mode 100644 tests/ScadaLink.SiteRuntime.Tests/Scripts/NotifyHelperTests.cs diff --git a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs index 5e5a43d..21094b1 100644 --- a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs @@ -190,6 +190,30 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers new ClusterClient.Send("/user/central-communication", msg), Sender); }); + // Notification Outbox: forward a Notify.Status query to the central cluster. + // The original Sender (the Notify helper's Ask) is forwarded as the + // ClusterClient.Send sender so the NotificationStatusResponse routes straight + // back to the waiting Ask, not here. + Receive(msg => + { + if (_centralClient == null) + { + // No ClusterClient registered yet. Reply Found: false so Notify.Status + // falls back to the site S&F buffer to decide Forwarding vs Unknown. + _log.Warning( + "Cannot forward NotificationStatusQuery {0} — no central ClusterClient registered", + msg.NotificationId); + Sender.Tell(new NotificationStatusResponse( + msg.CorrelationId, Found: false, Status: "Unknown", + RetryCount: 0, LastError: null, DeliveredAt: null)); + return; + } + + _log.Debug("Forwarding NotificationStatusQuery {0} to central", msg.NotificationId); + _centralClient.Tell( + new ClusterClient.Send("/user/central-communication", msg), Sender); + }); + // Internal: send heartbeat tick Receive(_ => SendHeartbeatToCentral()); diff --git a/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs b/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs index 233f81b..ce4f6e4 100644 --- a/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs +++ b/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs @@ -8,6 +8,7 @@ using ScadaLink.Commons.Types; using ScadaLink.HealthMonitoring; using ScadaLink.SiteEventLogging; using ScadaLink.SiteRuntime.Scripts; +using ScadaLink.StoreAndForward; namespace ScadaLink.SiteRuntime.Actors; @@ -78,6 +79,11 @@ public class ScriptExecutionActor : ReceiveActor // starve the global pool and stall Akka dispatchers / HTTP handling. var scheduler = ScriptExecutionScheduler.Shared(options); + // Notification Outbox: the site communication actor that Notify.Status queries + // central through. Resolved by actor path so the Notify helper does not need an + // IActorRef threaded all the way down from the host wiring. + var siteCommunicationActor = Context.System.ActorSelection("/user/site-communication"); + // CTS must be created inside the async lambda so it outlives this method _ = Task.Factory.StartNew(async () => { @@ -91,14 +97,19 @@ public class ScriptExecutionActor : ReceiveActor // Resolve integration services from DI (scoped lifetime) IExternalSystemClient? externalSystemClient = null; IDatabaseGateway? databaseGateway = null; - INotificationDeliveryService? notificationService = null; + // Notification Outbox: the S&F engine is a singleton; the site identity + // provider supplies the site id stamped on enqueued notifications. + StoreAndForwardService? storeAndForward = null; + var siteId = string.Empty; if (serviceProvider != null) { serviceScope = serviceProvider.CreateScope(); externalSystemClient = serviceScope.ServiceProvider.GetService(); databaseGateway = serviceScope.ServiceProvider.GetService(); - notificationService = serviceScope.ServiceProvider.GetService(); + storeAndForward = serviceScope.ServiceProvider.GetService(); + siteId = serviceScope.ServiceProvider.GetService()?.SiteId + ?? string.Empty; } var context = new ScriptRuntimeContext( @@ -112,7 +123,9 @@ public class ScriptExecutionActor : ReceiveActor logger, externalSystemClient, databaseGateway, - notificationService); + storeAndForward, + siteCommunicationActor, + siteId); var globals = new ScriptGlobals { diff --git a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs index fb9d0ca..1b6b593 100644 --- a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs +++ b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs @@ -1,9 +1,13 @@ +using System.Text.Json; using Akka.Actor; using Microsoft.Extensions.Logging; using ScadaLink.Commons.Interfaces.Services; using ScadaLink.Commons.Messages.Instance; +using ScadaLink.Commons.Messages.Notification; using ScadaLink.Commons.Messages.ScriptExecution; using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.StoreAndForward; namespace ScadaLink.SiteRuntime.Scripts; @@ -46,9 +50,21 @@ public class ScriptRuntimeContext private readonly IDatabaseGateway? _databaseGateway; /// - /// WP-13: Notification delivery for Notify.To().Send(). + /// Notification Outbox: the site Store-and-Forward Engine that Notify.Send + /// enqueues notifications into. The S&F engine forwards them to central. /// - private readonly INotificationDeliveryService? _notificationService; + private readonly StoreAndForwardService? _storeAndForward; + + /// + /// Notification Outbox: the site communication actor that Notify.Status + /// queries central through (via the ClusterClient command/control transport). + /// + private readonly ICanTell? _siteCommunicationActor; + + /// + /// Notification Outbox: this site's identifier, stamped on enqueued notifications. + /// + private readonly string _siteId; public ScriptRuntimeContext( IActorRef instanceActor, @@ -61,7 +77,9 @@ public class ScriptRuntimeContext ILogger logger, IExternalSystemClient? externalSystemClient = null, IDatabaseGateway? databaseGateway = null, - INotificationDeliveryService? notificationService = null) + StoreAndForwardService? storeAndForward = null, + ICanTell? siteCommunicationActor = null, + string siteId = "") { _instanceActor = instanceActor; _self = self; @@ -73,7 +91,9 @@ public class ScriptRuntimeContext _logger = logger; _externalSystemClient = externalSystemClient; _databaseGateway = databaseGateway; - _notificationService = notificationService; + _storeAndForward = storeAndForward; + _siteCommunicationActor = siteCommunicationActor; + _siteId = siteId; } /// @@ -183,10 +203,13 @@ public class ScriptRuntimeContext public DatabaseHelper Database => new(_databaseGateway, _instanceName, _logger); /// - /// WP-13: Provides access to notification delivery. - /// Notify.To("listName").Send("subject", "message") + /// Provides access to the Notification Outbox API. + /// Notify.To("listName").Send("subject", "message") enqueues a notification + /// for central delivery and returns its NotificationId; + /// Notify.Status(id) queries the delivery status of that notification. /// - public NotifyHelper Notify => new(_notificationService, _instanceName, _logger); + public NotifyHelper Notify => new( + _storeAndForward, _siteCommunicationActor, _siteId, _instanceName, _askTimeout, _logger); /// /// Helper class for Scripts.CallShared() syntax. @@ -319,54 +342,205 @@ public class ScriptRuntimeContext } /// - /// WP-13: Helper for Notify.To("listName").Send("subject", "message") syntax. + /// Notification Outbox: helper for the Notify script API. + /// + /// In the outbox design the site no longer delivers notification email inline. + /// Notify.To("listName").Send(...) enqueues the notification into the site + /// Store-and-Forward Engine — which forwards it to central — and returns a + /// NotificationId handle immediately. Notify.Status(id) later queries + /// the delivery status of that notification. /// public class NotifyHelper { - private readonly INotificationDeliveryService? _service; + private readonly StoreAndForwardService? _storeAndForward; + private readonly ICanTell? _siteCommunicationActor; + private readonly string _siteId; private readonly string _instanceName; + private readonly TimeSpan _askTimeout; private readonly ILogger _logger; - internal NotifyHelper(INotificationDeliveryService? service, string instanceName, ILogger logger) + internal NotifyHelper( + StoreAndForwardService? storeAndForward, + ICanTell? siteCommunicationActor, + string siteId, + string instanceName, + TimeSpan askTimeout, + ILogger logger) { - _service = service; + _storeAndForward = storeAndForward; + _siteCommunicationActor = siteCommunicationActor; + _siteId = siteId; _instanceName = instanceName; + _askTimeout = askTimeout; _logger = logger; } + /// + /// Selects the notification list to send to. + /// public NotifyTarget To(string listName) { - return new NotifyTarget(listName, _service, _instanceName, _logger); + return new NotifyTarget( + listName, _storeAndForward, _siteId, _instanceName, _logger); + } + + /// + /// Queries the delivery status of a previously-sent notification. + /// + /// The query is issued to central via the site communication actor. While the + /// notification is still buffered in the site Store-and-Forward Engine — central + /// has no row for it yet (Found: false) but the buffer still holds the id — + /// the status is reported as the site-local Forwarding state. If central + /// has a row, its status is mapped through verbatim. If central does not know the + /// id and it is not buffered locally, the status is Unknown. + /// + public async Task Status(string notificationId) + { + if (_siteCommunicationActor == null) + throw new InvalidOperationException( + "Notification status query is not available — site communication actor not wired"); + + var correlationId = Guid.NewGuid().ToString(); + var query = new NotificationStatusQuery(correlationId, notificationId); + + NotificationStatusResponse response; + try + { + response = await _siteCommunicationActor + .Ask(query, _askTimeout); + } + catch (Exception ex) + { + // Central could not be reached. Fall through to the buffer check: if the + // notification is still in the local S&F buffer it is Forwarding. + _logger.LogWarning(ex, + "Notification status query for {NotificationId} did not reach central", + notificationId); + response = new NotificationStatusResponse( + correlationId, Found: false, Status: "Unknown", + RetryCount: 0, LastError: null, DeliveredAt: null); + } + + if (response.Found) + { + return new NotificationDeliveryStatus( + response.Status, response.RetryCount, response.LastError, response.DeliveredAt); + } + + // Central has no row. If the notification is still buffered at the site it + // is in transit — report the site-local Forwarding state. Otherwise it is + // genuinely unknown (never sent, or already forwarded and central lost it). + if (_storeAndForward != null) + { + var buffered = await _storeAndForward.GetMessageByIdAsync(notificationId); + if (buffered != null) + { + return new NotificationDeliveryStatus( + "Forwarding", buffered.RetryCount, buffered.LastError, DeliveredAt: null); + } + } + + return new NotificationDeliveryStatus("Unknown", 0, null, null); } } /// - /// WP-13: Target for Notify.To("listName").Send("subject", "message"). + /// Notification Outbox: target for Notify.To("listName").Send(...). /// public class NotifyTarget { private readonly string _listName; - private readonly INotificationDeliveryService? _service; + private readonly StoreAndForwardService? _storeAndForward; + private readonly string _siteId; private readonly string _instanceName; private readonly ILogger _logger; - internal NotifyTarget(string listName, INotificationDeliveryService? service, string instanceName, ILogger logger) + internal NotifyTarget( + string listName, + StoreAndForwardService? storeAndForward, + string siteId, + string instanceName, + ILogger logger) { _listName = listName; - _service = service; + _storeAndForward = storeAndForward; + _siteId = siteId; _instanceName = instanceName; _logger = logger; } - public async Task Send( + /// + /// Enqueues a notification for central delivery and returns its + /// NotificationId immediately. + /// + /// The notification is buffered into the site Store-and-Forward Engine under the + /// category; the S&F + /// engine's NotificationForwarder forwards it to central and treats + /// central's ack as the delivery outcome. The returned NotificationId is + /// the single idempotency key end-to-end: it is the S&F message id, it is + /// carried inside the buffered payload, and it is the id the forwarder submits to + /// central. Pass it to to track delivery. + /// + public async Task Send( string subject, string message, CancellationToken cancellationToken = default) { - if (_service == null) - throw new InvalidOperationException("Notification service not available"); + if (_storeAndForward == null) + throw new InvalidOperationException( + "Notification store-and-forward engine not available"); - return await _service.SendAsync(_listName, subject, message, _instanceName, cancellationToken); + // The script controls the idempotency key: generate the NotificationId here, + // use it as the S&F message id, and carry it inside the buffered payload so + // the forwarder submits the same id to central on every retry. + var notificationId = Guid.NewGuid().ToString("N"); + + var payload = new NotificationSubmit( + NotificationId: notificationId, + ListName: _listName, + Subject: subject, + Body: message, + // SourceSiteId is re-stamped by the forwarder from its own site id; this + // value is the best-effort site id known to the script runtime. + SourceSiteId: _siteId, + SourceInstanceId: _instanceName, + // SourceScript: the script runtime does not currently thread the script + // name down to the Notify helper; left null until that wiring exists. + SourceScript: null, + SiteEnqueuedAt: DateTimeOffset.UtcNow); + + var payloadJson = JsonSerializer.Serialize(payload); + + // The S&F engine assigns its own GUID to the message; pin the message id to + // the NotificationId so the buffer can be queried by it (Notify.Status) and + // the forwarder's idempotency key matches the buffered row. + await _storeAndForward.EnqueueAsync( + StoreAndForwardCategory.Notification, + target: _listName, + payloadJson: payloadJson, + originInstanceName: _instanceName, + messageId: notificationId); + + _logger.LogDebug( + "Notify enqueued notification {NotificationId} to list '{List}' for central delivery", + notificationId, _listName); + + return notificationId; } } } + +/// +/// Notification Outbox: the delivery status of a notification, as returned to a +/// script by Notify.Status(id). +/// +/// is either a central status (Pending, Retrying, +/// Delivered, Parked, Discarded), the site-local Forwarding +/// state (the notification is still buffered at the site and has not yet been +/// forwarded/acked), or Unknown (no central row and not buffered locally). +/// +public record NotificationDeliveryStatus( + string Status, + int RetryCount, + string? LastError, + DateTimeOffset? DeliveredAt); diff --git a/src/ScadaLink.StoreAndForward/NotificationForwarder.cs b/src/ScadaLink.StoreAndForward/NotificationForwarder.cs index 19bc720..41a9038 100644 --- a/src/ScadaLink.StoreAndForward/NotificationForwarder.cs +++ b/src/ScadaLink.StoreAndForward/NotificationForwarder.cs @@ -87,21 +87,28 @@ public sealed class NotificationForwarder } /// - /// Maps a buffered S&F notification message onto a , - /// returning false if the payload is unreadable. - /// The is the central idempotency - /// key and must be stable across every retry of the same buffered message, so it is - /// derived from — a stable GUID assigned - /// once at enqueue time. + /// Maps a buffered S&F notification message onto the + /// forwarded to central, returning false if the payload is unreadable. + /// + /// The buffered payload IS a serialized written by + /// the site Notify.Send enqueue path (Task 19). Its + /// is the central idempotency key — + /// it was generated by the script, equals the buffered row's + /// , and is stable across every retry. The + /// forwarder forwards the payload as-is except that it re-stamps the fields it + /// authoritatively owns: (this site's + /// id) and (the buffered row's + /// origin instance), and it falls the list name back to the S&F + /// when the payload list name is blank. /// private bool TryBuildSubmit(StoreAndForwardMessage message, out NotificationSubmit submit) { submit = null!; - BufferedNotificationPayload? payload; + NotificationSubmit? payload; try { - payload = JsonSerializer.Deserialize(message.PayloadJson); + payload = JsonSerializer.Deserialize(message.PayloadJson); } catch (JsonException) { @@ -113,30 +120,25 @@ public sealed class NotificationForwarder return false; } - submit = new NotificationSubmit( - NotificationId: message.Id, - // A null OR empty/blank ListName falls back to the S&F Target — matching the - // empty-string guard the former SMTP handler (NotificationDeliveryService) - // applied, so an empty list name is never forwarded to central. - ListName: string.IsNullOrEmpty(payload.ListName) ? message.Target : payload.ListName, - Subject: payload.Subject ?? string.Empty, - Body: payload.Message ?? string.Empty, - SourceSiteId: _sourceSiteId, - SourceInstanceId: message.OriginInstanceName, - // The buffered payload does not currently carry the originating script; - // Task 19 (the enqueue side) will add it. Null until then. - SourceScript: null, - SiteEnqueuedAt: message.CreatedAt); + submit = payload with + { + // The NotificationId is the script-generated idempotency key carried in the + // payload. Defend against a payload missing it by falling back to the + // buffered row id, which the enqueue path pins to the same value. + NotificationId = string.IsNullOrEmpty(payload.NotificationId) + ? message.Id + : payload.NotificationId, + // A null OR empty/blank ListName falls back to the S&F Target — so an empty + // list name is never forwarded to central. + ListName = string.IsNullOrEmpty(payload.ListName) ? message.Target : payload.ListName, + // SourceSiteId/SourceInstanceId are authoritatively owned by the site: the + // forwarder knows the real site id, and the buffered row records the origin + // instance even after the instance is deleted. + SourceSiteId = _sourceSiteId, + SourceInstanceId = message.OriginInstanceName, + }; return true; } - - /// - /// Mirrors the payload shape written by the site notification enqueue path - /// ({ ListName, Subject, Message }). Kept private to this forwarder — Task 19 - /// will reshape the enqueue payload, at which point this is updated alongside it. - /// - private sealed record BufferedNotificationPayload( - string? ListName, string? Subject, string? Message); } /// diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs index ab2618c..4a38216 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs @@ -149,6 +149,13 @@ public class StoreAndForwardService /// When false, the caller has already made its own delivery attempt and the /// message is buffered directly for the retry sweep (the handler is not invoked here). /// + /// + /// An explicit, caller-supplied message id. null (the default) makes the + /// service mint a fresh GUID. The Notification Outbox enqueue path supplies its own + /// id so the script-generated NotificationId is the single idempotency key — + /// it is the buffered row's , it is carried + /// inside the payload, and it is the id the forwarder submits to central. + /// public async Task EnqueueAsync( StoreAndForwardCategory category, string target, @@ -156,11 +163,12 @@ public class StoreAndForwardService string? originInstanceName = null, int? maxRetries = null, TimeSpan? retryInterval = null, - bool attemptImmediateDelivery = true) + bool attemptImmediateDelivery = true, + string? messageId = null) { var message = new StoreAndForwardMessage { - Id = Guid.NewGuid().ToString("N"), + Id = messageId ?? Guid.NewGuid().ToString("N"), Category = category, Target = target, PayloadJson = payloadJson, @@ -430,6 +438,17 @@ public class StoreAndForwardService return await _storage.GetMessageCountByOriginInstanceAsync(instanceName); } + /// + /// Notification Outbox: looks up a buffered message by its id, or null if it + /// is not (or no longer) in the buffer. Notify.Status uses this to detect a + /// notification still in transit at the site — central reports it not-found while + /// the S&F buffer still holds it, which is the site-local Forwarding state. + /// + public async Task GetMessageByIdAsync(string messageId) + { + return await _storage.GetMessageByIdAsync(messageId); + } + /// /// WP-14: Raises the S&F activity notification. StoreAndForward-009: the /// delegate is snapshotted (so a concurrent unsubscribe cannot NRE) and every diff --git a/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs b/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs index b519a47..5a39f2f 100644 --- a/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs +++ b/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs @@ -155,6 +155,51 @@ public class SiteCommunicationActorTests : TestKit ExpectMsg(ack => ack.NotificationId == "notif-2" && !ack.Accepted); } + [Fact] + public void NotificationStatusQuery_WithCentralClient_ForwardedToCentralAndResponseRoutedBack() + { + // Notify.Status(id) issues a NotificationStatusQuery; the site actor forwards it + // to central over the ClusterClient command/control transport and the central + // response must route back to the original sender (the helper's Ask). + var dmProbe = CreateTestProbe(); + var centralClientProbe = CreateTestProbe(); + var siteActor = Sys.ActorOf(Props.Create(() => + new SiteCommunicationActor("site1", _options, dmProbe.Ref))); + + siteActor.Tell(new RegisterCentralClient(centralClientProbe.Ref)); + + var query = new NotificationStatusQuery("corr-99", "notif-1"); + siteActor.Tell(query); + + var send = centralClientProbe.FishForMessage( + s => s.Message is NotificationStatusQuery); + Assert.Equal("/user/central-communication", send.Path); + var forwarded = Assert.IsType(send.Message); + Assert.Equal("notif-1", forwarded.NotificationId); + + // The response is sent to the ClusterClient.Send's Sender — replying as that + // probe must land back at the test actor (the original Tell sender). + centralClientProbe.Reply(new NotificationStatusResponse( + "corr-99", Found: true, Status: "Delivered", RetryCount: 0, + LastError: null, DeliveredAt: DateTimeOffset.UtcNow)); + ExpectMsg(r => r.CorrelationId == "corr-99" && r.Found); + } + + [Fact] + public void NotificationStatusQuery_WithoutCentralClient_RepliesWithNotFound() + { + // No ClusterClient registered yet: the query cannot reach central, so the actor + // replies Found: false. Notify.Status then falls back to the site S&F buffer. + var dmProbe = CreateTestProbe(); + var siteActor = Sys.ActorOf(Props.Create(() => + new SiteCommunicationActor("site1", _options, dmProbe.Ref))); + + siteActor.Tell(new NotificationStatusQuery("corr-100", "notif-2")); + + ExpectMsg( + r => r.CorrelationId == "corr-100" && !r.Found); + } + [Fact] public void EventLogQuery_WithoutHandler_ReturnsFailure() { diff --git a/tests/ScadaLink.IntegrationTests/IntegrationSurfaceTests.cs b/tests/ScadaLink.IntegrationTests/IntegrationSurfaceTests.cs index c5b4080..279e5b0 100644 --- a/tests/ScadaLink.IntegrationTests/IntegrationSurfaceTests.cs +++ b/tests/ScadaLink.IntegrationTests/IntegrationSurfaceTests.cs @@ -156,15 +156,26 @@ public class IntegrationSurfaceTests [Fact] public async Task ScriptContext_Notify_Send_Wired() { - var mockNotify = Substitute.For(); - mockNotify.SendAsync("ops", "Alert", "Body", Arg.Any(), Arg.Any()) - .Returns(new NotificationResult(true, null)); + // Notification Outbox: Notify.Send enqueues into the site Store-and-Forward + // Engine and returns the NotificationId handle immediately. + var dbName = $"NotifyWired_{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + using var keepAlive = new Microsoft.Data.Sqlite.SqliteConnection(connStr); + keepAlive.Open(); + var storage = new StoreAndForward.StoreAndForwardStorage( + connStr, Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance); + await storage.InitializeAsync(); + var saf = new StoreAndForward.StoreAndForwardService( + storage, new StoreAndForward.StoreAndForwardOptions(), + Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance); - var context = CreateMinimalScriptContext(notificationService: mockNotify); + var context = CreateMinimalScriptContext(storeAndForward: saf); - var result = await context.Notify.To("ops").Send("Alert", "Body"); + var notificationId = await context.Notify.To("ops").Send("Alert", "Body"); - Assert.True(result.Success); + Assert.False(string.IsNullOrEmpty(notificationId)); + var buffered = await saf.GetMessageByIdAsync(notificationId); + Assert.NotNull(buffered); } [Fact] @@ -188,6 +199,7 @@ public class IntegrationSurfaceTests [Fact] public async Task ScriptContext_Notify_NoService_Throws() { + // No Store-and-Forward Engine wired → Notify.Send cannot enqueue and throws. var context = CreateMinimalScriptContext(); await Assert.ThrowsAsync( @@ -197,7 +209,7 @@ public class IntegrationSurfaceTests private static SiteRuntime.Scripts.ScriptRuntimeContext CreateMinimalScriptContext( IExternalSystemClient? externalSystemClient = null, IDatabaseGateway? databaseGateway = null, - INotificationDeliveryService? notificationService = null) + StoreAndForward.StoreAndForwardService? storeAndForward = null) { // Create a minimal context — we use Substitute.For which is fine since // we won't exercise Akka functionality in these tests. @@ -219,6 +231,7 @@ public class IntegrationSurfaceTests logger: Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance, externalSystemClient: externalSystemClient, databaseGateway: databaseGateway, - notificationService: notificationService); + storeAndForward: storeAndForward, + siteId: "test-site"); } } diff --git a/tests/ScadaLink.SiteRuntime.Tests/Scripts/NotifyHelperTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Scripts/NotifyHelperTests.cs new file mode 100644 index 0000000..95cf53b --- /dev/null +++ b/tests/ScadaLink.SiteRuntime.Tests/Scripts/NotifyHelperTests.cs @@ -0,0 +1,181 @@ +using System.Text.Json; +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Commons.Messages.Notification; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.SiteRuntime.Scripts; +using ScadaLink.StoreAndForward; + +namespace ScadaLink.SiteRuntime.Tests.Scripts; + +/// +/// Notification Outbox (Task 19): tests for the async Notify.Send / +/// Notify.Status script API. +/// +/// In the outbox design Notify.To("list").Send(...) no longer delivers email +/// inline — it generates a stable NotificationId, enqueues a +/// message into the site +/// Store-and-Forward Engine (which Task 18 retargets to forward to central), and +/// returns the NotificationId immediately. Notify.Status(id) queries +/// central for delivery status, reporting the site-local Forwarding state +/// while the notification is still buffered at the site. +/// +public class NotifyHelperTests : TestKit, IAsyncLifetime, IDisposable +{ + private readonly SqliteConnection _keepAlive; + private readonly StoreAndForwardStorage _storage; + private readonly StoreAndForwardService _saf; + + public NotifyHelperTests() + { + var dbName = $"NotifyTests_{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + _keepAlive = new SqliteConnection(connStr); + _keepAlive.Open(); + + _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); + var options = new StoreAndForwardOptions + { + DefaultRetryInterval = TimeSpan.Zero, + DefaultMaxRetries = 3, + RetryTimerInterval = TimeSpan.FromMinutes(10) + }; + _saf = new StoreAndForwardService(_storage, options, NullLogger.Instance); + } + + public async Task InitializeAsync() => await _storage.InitializeAsync(); + + public Task DisposeAsync() => Task.CompletedTask; + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _keepAlive.Dispose(); + } + base.Dispose(disposing); + } + + private ScriptRuntimeContext.NotifyHelper CreateHelper(IActorRef siteCommunicationActor) + { + return new ScriptRuntimeContext.NotifyHelper( + _saf, + siteCommunicationActor, + "site-7", + "Plant.Pump3", + TimeSpan.FromSeconds(3), + NullLogger.Instance); + } + + [Fact] + public async Task Send_EnqueuesNotificationIntoStoreAndForward_AndReturnsNotificationIdImmediately() + { + var commProbe = CreateTestProbe(); + var notify = CreateHelper(commProbe.Ref); + + var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped"); + + // Send returns a non-empty NotificationId string immediately (no central round-trip). + Assert.False(string.IsNullOrEmpty(notificationId)); + + // Exactly one Notification-category message was buffered for the S&F forwarder. + var depth = await _saf.GetBufferDepthAsync(); + Assert.Equal(1, depth.GetValueOrDefault(StoreAndForwardCategory.Notification)); + } + + [Fact] + public async Task Send_BufferedPayload_CarriesListSubjectBodyAndNotificationId() + { + var commProbe = CreateTestProbe(); + var notify = CreateHelper(commProbe.Ref); + + var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped"); + + var buffered = await _saf.GetMessageByIdAsync(notificationId); + Assert.NotNull(buffered); + Assert.Equal(StoreAndForwardCategory.Notification, buffered!.Category); + Assert.Equal("Operators", buffered.Target); + Assert.Equal("Plant.Pump3", buffered.OriginInstanceName); + + // The S&F message Id is the NotificationId — the single idempotency key. + Assert.Equal(notificationId, buffered.Id); + + // The payload is a NotificationSubmit carrying the same NotificationId and the + // list / subject / body the script supplied — the shape the forwarder reads. + var payload = JsonSerializer.Deserialize(buffered.PayloadJson); + Assert.NotNull(payload); + Assert.Equal(notificationId, payload!.NotificationId); + Assert.Equal("Operators", payload.ListName); + Assert.Equal("Pump alarm", payload.Subject); + Assert.Equal("Pump 3 tripped", payload.Body); + Assert.Equal("Plant.Pump3", payload.SourceInstanceId); + } + + [Fact] + public async Task Status_WhenStillBufferedAtSite_ReportsForwarding() + { + var commProbe = CreateTestProbe(); + var notify = CreateHelper(commProbe.Ref); + + // Enqueue but never let it forward — the message stays buffered at the site. + var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped"); + + var statusTask = notify.Status(notificationId); + + // The status query goes to central; central has no row for an un-forwarded + // notification, so it answers Found: false. + var query = await commProbe.ExpectMsgAsync(); + Assert.Equal(notificationId, query.NotificationId); + commProbe.Reply(new NotificationStatusResponse( + query.CorrelationId, Found: false, Status: "Unknown", + RetryCount: 0, LastError: null, DeliveredAt: null)); + + var status = await statusTask; + + // Found: false AND still in the site S&F buffer → the site-local Forwarding state. + Assert.Equal("Forwarding", status.Status); + } + + [Fact] + public async Task Status_WhenCentralReportsDelivered_MapsTheCentralResponse() + { + var commProbe = CreateTestProbe(); + var notify = CreateHelper(commProbe.Ref); + + var deliveredAt = DateTimeOffset.UtcNow; + var statusTask = notify.Status("not-buffered-id"); + + var query = await commProbe.ExpectMsgAsync(); + commProbe.Reply(new NotificationStatusResponse( + query.CorrelationId, Found: true, Status: "Delivered", + RetryCount: 2, LastError: "earlier transient", DeliveredAt: deliveredAt)); + + var status = await statusTask; + + Assert.Equal("Delivered", status.Status); + Assert.Equal(2, status.RetryCount); + Assert.Equal("earlier transient", status.LastError); + Assert.Equal(deliveredAt, status.DeliveredAt); + } + + [Fact] + public async Task Status_WhenCentralNotFoundAndNotBuffered_ReportsUnknown() + { + var commProbe = CreateTestProbe(); + var notify = CreateHelper(commProbe.Ref); + + var statusTask = notify.Status("never-existed-id"); + + var query = await commProbe.ExpectMsgAsync(); + commProbe.Reply(new NotificationStatusResponse( + query.CorrelationId, Found: false, Status: "Unknown", + RetryCount: 0, LastError: null, DeliveredAt: null)); + + var status = await statusTask; + + // Not at central, not in the site buffer → genuinely unknown, NOT Forwarding. + Assert.Equal("Unknown", status.Status); + } +} diff --git a/tests/ScadaLink.StoreAndForward.Tests/NotificationForwarderTests.cs b/tests/ScadaLink.StoreAndForward.Tests/NotificationForwarderTests.cs index 1eec959..822fe33 100644 --- a/tests/ScadaLink.StoreAndForward.Tests/NotificationForwarderTests.cs +++ b/tests/ScadaLink.StoreAndForward.Tests/NotificationForwarderTests.cs @@ -18,19 +18,26 @@ public class NotificationForwarderTests : TestKit /// /// Builds a buffered notification S&F message whose payload matches the shape - /// produced by the site NotificationDeliveryService enqueue path. + /// produced by the site Notify.Send enqueue path (Task 19): a serialized + /// carrying a script-generated + /// . The S&F message + /// equals that same id. /// private static StoreAndForwardMessage BufferedNotification( string id = "msg-1", string listName = "Operators", string subject = "Pump alarm", string message = "Pump 3 tripped", - string? originInstance = "Plant.Pump3") + string? originInstance = "Plant.Pump3", string? sourceScript = "alarmScript") { - var payload = JsonSerializer.Serialize(new - { - ListName = listName, - Subject = subject, - Message = message - }); + var payload = JsonSerializer.Serialize(new NotificationSubmit( + NotificationId: id, + ListName: listName, + Subject: subject, + Body: message, + // SourceSiteId is re-stamped by the forwarder; the enqueue side leaves it blank. + SourceSiteId: string.Empty, + SourceInstanceId: originInstance, + SourceScript: sourceScript, + SiteEnqueuedAt: DateTimeOffset.UtcNow)); return new StoreAndForwardMessage { Id = id, @@ -57,11 +64,15 @@ public class NotificationForwarderTests : TestKit // The central target receives a NotificationSubmit whose fields map from the // buffered payload; reply Accepted so the handler completes as delivered. var submit = centralProbe.ExpectMsg(); + Assert.Equal("msg-1", submit.NotificationId); Assert.Equal("Operators", submit.ListName); Assert.Equal("Pump alarm", submit.Subject); Assert.Equal("Pump 3 tripped", submit.Body); + // SourceSiteId is re-stamped by the forwarder from its own site id. Assert.Equal("site-7", submit.SourceSiteId); Assert.Equal("Plant.Pump3", submit.SourceInstanceId); + // The originating script travels through from the buffered payload. + Assert.Equal("alarmScript", submit.SourceScript); centralProbe.Reply(new NotificationSubmitAck(submit.NotificationId, Accepted: true, Error: null)); Assert.True(await deliverTask); @@ -76,12 +87,15 @@ public class NotificationForwarderTests : TestKit // A buffered payload carrying an empty-string ListName: the empty value must not // be forwarded — the forwarder falls back to the S&F message Target instead. - var payload = JsonSerializer.Serialize(new - { - ListName = "", - Subject = "Pump alarm", - Message = "Pump 3 tripped" - }); + var payload = JsonSerializer.Serialize(new NotificationSubmit( + NotificationId: "msg-empty-list", + ListName: "", + Subject: "Pump alarm", + Body: "Pump 3 tripped", + SourceSiteId: string.Empty, + SourceInstanceId: "Plant.Pump3", + SourceScript: null, + SiteEnqueuedAt: DateTimeOffset.UtcNow)); var msg = new StoreAndForwardMessage { Id = "msg-empty-list",