From afdf581e329e481df1a9ff156c493a3016579b2b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 19 May 2026 02:51:11 -0400 Subject: [PATCH] feat(notification-outbox): add CommunicationService outbox methods --- .../CommunicationService.cs | 51 +++++++ .../Actors/AkkaHostedService.cs | 4 + .../CommunicationServiceTests.cs | 137 ++++++++++++++++++ 3 files changed, 192 insertions(+) diff --git a/src/ScadaLink.Communication/CommunicationService.cs b/src/ScadaLink.Communication/CommunicationService.cs index fdba5d4..366eb08 100644 --- a/src/ScadaLink.Communication/CommunicationService.cs +++ b/src/ScadaLink.Communication/CommunicationService.cs @@ -8,6 +8,7 @@ using ScadaLink.Commons.Messages.Health; using ScadaLink.Commons.Messages.InboundApi; using ScadaLink.Commons.Messages.Integration; using ScadaLink.Commons.Messages.Lifecycle; +using ScadaLink.Commons.Messages.Notification; using ScadaLink.Commons.Messages.RemoteQuery; using ScadaLink.Communication.Actors; @@ -23,6 +24,7 @@ public class CommunicationService private readonly CommunicationOptions _options; private readonly ILogger _logger; private IActorRef? _centralCommunicationActor; + private IActorRef? _notificationOutboxProxy; public CommunicationService( IOptions options, @@ -40,6 +42,16 @@ public class CommunicationService _centralCommunicationActor = centralCommunicationActor; } + /// + /// Sets the notification-outbox singleton proxy reference. Called during actor + /// system startup. The outbox actor is central-local, so outbox calls Ask this + /// proxy directly (no SiteEnvelope routing). + /// + public void SetNotificationOutbox(IActorRef notificationOutboxProxy) + { + _notificationOutboxProxy = notificationOutboxProxy; + } + /// /// Triggers an immediate refresh of the site address cache from the database. /// @@ -59,6 +71,15 @@ public class CommunicationService private IActorRef GetActor() => GetCommunicationActor(); + /// + /// Gets the notification-outbox proxy reference. Throws if not yet initialized. + /// + private IActorRef GetNotificationOutbox() + { + return _notificationOutboxProxy + ?? throw new InvalidOperationException("CommunicationService not initialized. NotificationOutbox proxy not set."); + } + // ── Pattern 1: Instance Deployment ── public async Task DeployInstanceAsync( @@ -230,6 +251,36 @@ public class CommunicationService return await GetActor().Ask( envelope, _options.IntegrationTimeout, cancellationToken); } + + // ── Notification Outbox (central-local actor — Asked directly, no SiteEnvelope) ── + + public async Task QueryNotificationOutboxAsync( + NotificationOutboxQueryRequest request, CancellationToken cancellationToken = default) + { + return await GetNotificationOutbox().Ask( + request, _options.QueryTimeout, cancellationToken); + } + + public async Task RetryNotificationAsync( + RetryNotificationRequest request, CancellationToken cancellationToken = default) + { + return await GetNotificationOutbox().Ask( + request, _options.QueryTimeout, cancellationToken); + } + + public async Task DiscardNotificationAsync( + DiscardNotificationRequest request, CancellationToken cancellationToken = default) + { + return await GetNotificationOutbox().Ask( + request, _options.QueryTimeout, cancellationToken); + } + + public async Task GetNotificationKpisAsync( + NotificationKpiRequest request, CancellationToken cancellationToken = default) + { + return await GetNotificationOutbox().Ask( + request, _options.QueryTimeout, cancellationToken); + } } /// diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index e5fcd9e..c0711d8 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -288,6 +288,10 @@ akka {{ // Hand the outbox proxy to the CentralCommunicationActor so forwarded // NotificationSubmit messages from sites are routed to the outbox singleton. centralCommActor.Tell(new RegisterNotificationOutbox(outboxProxy)); + + // Hand the same proxy to the CommunicationService so the Central UI can + // Ask the outbox actor directly (query, retry, discard, KPIs). + commService?.SetNotificationOutbox(outboxProxy); _logger.LogInformation("NotificationOutbox singleton created and registered with CentralCommunicationActor"); _logger.LogInformation("Central actors registered. CentralCommunicationActor created."); diff --git a/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs b/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs index 6585794..a74bfc4 100644 --- a/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs +++ b/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs @@ -3,6 +3,7 @@ using Akka.TestKit.Xunit2; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using ScadaLink.Commons.Messages.Deployment; +using ScadaLink.Commons.Messages.Notification; namespace ScadaLink.Communication.Tests; @@ -72,6 +73,142 @@ public class CommunicationServiceTests : TestKit Assert.Equal("sha256:applied", response.AppliedRevisionHash); } + // ── Notification Outbox: central-side outbox actor calls ── + + [Fact] + public async Task QueryNotificationOutboxAsync_BeforeOutboxSet_Throws() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + await Assert.ThrowsAsync(() => + service.QueryNotificationOutboxAsync( + new NotificationOutboxQueryRequest( + "corr-1", null, null, null, null, false, null, null, null, 1, 50))); + } + + [Fact] + public async Task RetryNotificationAsync_BeforeOutboxSet_Throws() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + await Assert.ThrowsAsync(() => + service.RetryNotificationAsync(new RetryNotificationRequest("corr-1", "n1"))); + } + + [Fact] + public async Task DiscardNotificationAsync_BeforeOutboxSet_Throws() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + await Assert.ThrowsAsync(() => + service.DiscardNotificationAsync(new DiscardNotificationRequest("corr-1", "n1"))); + } + + [Fact] + public async Task GetNotificationKpisAsync_BeforeOutboxSet_Throws() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + await Assert.ThrowsAsync(() => + service.GetNotificationKpisAsync(new NotificationKpiRequest("corr-1"))); + } + + [Fact] + public async Task QueryNotificationOutboxAsync_AsksOutboxProxyDirectly() + { + // The outbox actor is central-local: the request must be Asked directly + // to the outbox proxy (no SiteEnvelope wrapping). + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + var probe = CreateTestProbe(); + service.SetNotificationOutbox(probe.Ref); + + var request = new NotificationOutboxQueryRequest( + "corr-q", "Pending", null, null, null, true, "alarm", null, null, 2, 25); + var task = service.QueryNotificationOutboxAsync(request); + + var received = probe.ExpectMsg(); + Assert.Same(request, received); + var reply = new NotificationOutboxQueryResponse( + "corr-q", true, null, Array.Empty(), 0); + probe.Reply(reply); + + Assert.Same(reply, await task); + } + + [Fact] + public async Task RetryNotificationAsync_AsksOutboxProxyDirectly() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + var probe = CreateTestProbe(); + service.SetNotificationOutbox(probe.Ref); + + var request = new RetryNotificationRequest("corr-r", "n-7"); + var task = service.RetryNotificationAsync(request); + + var received = probe.ExpectMsg(); + Assert.Same(request, received); + var reply = new RetryNotificationResponse("corr-r", true, null); + probe.Reply(reply); + + Assert.Same(reply, await task); + } + + [Fact] + public async Task DiscardNotificationAsync_AsksOutboxProxyDirectly() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + var probe = CreateTestProbe(); + service.SetNotificationOutbox(probe.Ref); + + var request = new DiscardNotificationRequest("corr-d", "n-9"); + var task = service.DiscardNotificationAsync(request); + + var received = probe.ExpectMsg(); + Assert.Same(request, received); + var reply = new DiscardNotificationResponse("corr-d", false, "already delivered"); + probe.Reply(reply); + + var result = await task; + Assert.Same(reply, result); + Assert.False(result.Success); + } + + [Fact] + public async Task GetNotificationKpisAsync_AsksOutboxProxyDirectly() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + var probe = CreateTestProbe(); + service.SetNotificationOutbox(probe.Ref); + + var request = new NotificationKpiRequest("corr-k"); + var task = service.GetNotificationKpisAsync(request); + + var received = probe.ExpectMsg(); + Assert.Same(request, received); + var reply = new NotificationKpiResponse("corr-k", true, null, 3, 1, 0, 12, TimeSpan.FromMinutes(5)); + probe.Reply(reply); + + var result = await task; + Assert.Same(reply, result); + Assert.Equal(3, result.QueueDepth); + } + /// /// Stand-in for CentralCommunicationActor: verifies the message is wrapped /// in a SiteEnvelope targeting the requested site and replies with a typed