feat(notification-outbox): add CommunicationService outbox methods
This commit is contained in:
@@ -8,6 +8,7 @@ using ScadaLink.Commons.Messages.Health;
|
|||||||
using ScadaLink.Commons.Messages.InboundApi;
|
using ScadaLink.Commons.Messages.InboundApi;
|
||||||
using ScadaLink.Commons.Messages.Integration;
|
using ScadaLink.Commons.Messages.Integration;
|
||||||
using ScadaLink.Commons.Messages.Lifecycle;
|
using ScadaLink.Commons.Messages.Lifecycle;
|
||||||
|
using ScadaLink.Commons.Messages.Notification;
|
||||||
using ScadaLink.Commons.Messages.RemoteQuery;
|
using ScadaLink.Commons.Messages.RemoteQuery;
|
||||||
using ScadaLink.Communication.Actors;
|
using ScadaLink.Communication.Actors;
|
||||||
|
|
||||||
@@ -23,6 +24,7 @@ public class CommunicationService
|
|||||||
private readonly CommunicationOptions _options;
|
private readonly CommunicationOptions _options;
|
||||||
private readonly ILogger<CommunicationService> _logger;
|
private readonly ILogger<CommunicationService> _logger;
|
||||||
private IActorRef? _centralCommunicationActor;
|
private IActorRef? _centralCommunicationActor;
|
||||||
|
private IActorRef? _notificationOutboxProxy;
|
||||||
|
|
||||||
public CommunicationService(
|
public CommunicationService(
|
||||||
IOptions<CommunicationOptions> options,
|
IOptions<CommunicationOptions> options,
|
||||||
@@ -40,6 +42,16 @@ public class CommunicationService
|
|||||||
_centralCommunicationActor = centralCommunicationActor;
|
_centralCommunicationActor = centralCommunicationActor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sets the notification-outbox singleton proxy reference. Called during actor
|
||||||
|
/// system startup. The outbox actor is central-local, so outbox calls Ask this
|
||||||
|
/// proxy directly (no SiteEnvelope routing).
|
||||||
|
/// </summary>
|
||||||
|
public void SetNotificationOutbox(IActorRef notificationOutboxProxy)
|
||||||
|
{
|
||||||
|
_notificationOutboxProxy = notificationOutboxProxy;
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Triggers an immediate refresh of the site address cache from the database.
|
/// Triggers an immediate refresh of the site address cache from the database.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -59,6 +71,15 @@ public class CommunicationService
|
|||||||
|
|
||||||
private IActorRef GetActor() => GetCommunicationActor();
|
private IActorRef GetActor() => GetCommunicationActor();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets the notification-outbox proxy reference. Throws if not yet initialized.
|
||||||
|
/// </summary>
|
||||||
|
private IActorRef GetNotificationOutbox()
|
||||||
|
{
|
||||||
|
return _notificationOutboxProxy
|
||||||
|
?? throw new InvalidOperationException("CommunicationService not initialized. NotificationOutbox proxy not set.");
|
||||||
|
}
|
||||||
|
|
||||||
// ── Pattern 1: Instance Deployment ──
|
// ── Pattern 1: Instance Deployment ──
|
||||||
|
|
||||||
public async Task<DeploymentStatusResponse> DeployInstanceAsync(
|
public async Task<DeploymentStatusResponse> DeployInstanceAsync(
|
||||||
@@ -230,6 +251,36 @@ public class CommunicationService
|
|||||||
return await GetActor().Ask<RouteToSetAttributesResponse>(
|
return await GetActor().Ask<RouteToSetAttributesResponse>(
|
||||||
envelope, _options.IntegrationTimeout, cancellationToken);
|
envelope, _options.IntegrationTimeout, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Notification Outbox (central-local actor — Asked directly, no SiteEnvelope) ──
|
||||||
|
|
||||||
|
public async Task<NotificationOutboxQueryResponse> QueryNotificationOutboxAsync(
|
||||||
|
NotificationOutboxQueryRequest request, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return await GetNotificationOutbox().Ask<NotificationOutboxQueryResponse>(
|
||||||
|
request, _options.QueryTimeout, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<RetryNotificationResponse> RetryNotificationAsync(
|
||||||
|
RetryNotificationRequest request, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return await GetNotificationOutbox().Ask<RetryNotificationResponse>(
|
||||||
|
request, _options.QueryTimeout, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<DiscardNotificationResponse> DiscardNotificationAsync(
|
||||||
|
DiscardNotificationRequest request, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return await GetNotificationOutbox().Ask<DiscardNotificationResponse>(
|
||||||
|
request, _options.QueryTimeout, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<NotificationKpiResponse> GetNotificationKpisAsync(
|
||||||
|
NotificationKpiRequest request, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return await GetNotificationOutbox().Ask<NotificationKpiResponse>(
|
||||||
|
request, _options.QueryTimeout, cancellationToken);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -288,6 +288,10 @@ akka {{
|
|||||||
// Hand the outbox proxy to the CentralCommunicationActor so forwarded
|
// Hand the outbox proxy to the CentralCommunicationActor so forwarded
|
||||||
// NotificationSubmit messages from sites are routed to the outbox singleton.
|
// NotificationSubmit messages from sites are routed to the outbox singleton.
|
||||||
centralCommActor.Tell(new RegisterNotificationOutbox(outboxProxy));
|
centralCommActor.Tell(new RegisterNotificationOutbox(outboxProxy));
|
||||||
|
|
||||||
|
// Hand the same proxy to the CommunicationService so the Central UI can
|
||||||
|
// Ask the outbox actor directly (query, retry, discard, KPIs).
|
||||||
|
commService?.SetNotificationOutbox(outboxProxy);
|
||||||
_logger.LogInformation("NotificationOutbox singleton created and registered with CentralCommunicationActor");
|
_logger.LogInformation("NotificationOutbox singleton created and registered with CentralCommunicationActor");
|
||||||
|
|
||||||
_logger.LogInformation("Central actors registered. CentralCommunicationActor created.");
|
_logger.LogInformation("Central actors registered. CentralCommunicationActor created.");
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ using Akka.TestKit.Xunit2;
|
|||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using ScadaLink.Commons.Messages.Deployment;
|
using ScadaLink.Commons.Messages.Deployment;
|
||||||
|
using ScadaLink.Commons.Messages.Notification;
|
||||||
|
|
||||||
namespace ScadaLink.Communication.Tests;
|
namespace ScadaLink.Communication.Tests;
|
||||||
|
|
||||||
@@ -72,6 +73,142 @@ public class CommunicationServiceTests : TestKit
|
|||||||
Assert.Equal("sha256:applied", response.AppliedRevisionHash);
|
Assert.Equal("sha256:applied", response.AppliedRevisionHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Notification Outbox: central-side outbox actor calls ──
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task QueryNotificationOutboxAsync_BeforeOutboxSet_Throws()
|
||||||
|
{
|
||||||
|
var service = new CommunicationService(
|
||||||
|
Options.Create(new CommunicationOptions()),
|
||||||
|
NullLogger<CommunicationService>.Instance);
|
||||||
|
|
||||||
|
await Assert.ThrowsAsync<InvalidOperationException>(() =>
|
||||||
|
service.QueryNotificationOutboxAsync(
|
||||||
|
new NotificationOutboxQueryRequest(
|
||||||
|
"corr-1", null, null, null, null, false, null, null, null, 1, 50)));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task RetryNotificationAsync_BeforeOutboxSet_Throws()
|
||||||
|
{
|
||||||
|
var service = new CommunicationService(
|
||||||
|
Options.Create(new CommunicationOptions()),
|
||||||
|
NullLogger<CommunicationService>.Instance);
|
||||||
|
|
||||||
|
await Assert.ThrowsAsync<InvalidOperationException>(() =>
|
||||||
|
service.RetryNotificationAsync(new RetryNotificationRequest("corr-1", "n1")));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DiscardNotificationAsync_BeforeOutboxSet_Throws()
|
||||||
|
{
|
||||||
|
var service = new CommunicationService(
|
||||||
|
Options.Create(new CommunicationOptions()),
|
||||||
|
NullLogger<CommunicationService>.Instance);
|
||||||
|
|
||||||
|
await Assert.ThrowsAsync<InvalidOperationException>(() =>
|
||||||
|
service.DiscardNotificationAsync(new DiscardNotificationRequest("corr-1", "n1")));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task GetNotificationKpisAsync_BeforeOutboxSet_Throws()
|
||||||
|
{
|
||||||
|
var service = new CommunicationService(
|
||||||
|
Options.Create(new CommunicationOptions()),
|
||||||
|
NullLogger<CommunicationService>.Instance);
|
||||||
|
|
||||||
|
await Assert.ThrowsAsync<InvalidOperationException>(() =>
|
||||||
|
service.GetNotificationKpisAsync(new NotificationKpiRequest("corr-1")));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task QueryNotificationOutboxAsync_AsksOutboxProxyDirectly()
|
||||||
|
{
|
||||||
|
// The outbox actor is central-local: the request must be Asked directly
|
||||||
|
// to the outbox proxy (no SiteEnvelope wrapping).
|
||||||
|
var service = new CommunicationService(
|
||||||
|
Options.Create(new CommunicationOptions()),
|
||||||
|
NullLogger<CommunicationService>.Instance);
|
||||||
|
var probe = CreateTestProbe();
|
||||||
|
service.SetNotificationOutbox(probe.Ref);
|
||||||
|
|
||||||
|
var request = new NotificationOutboxQueryRequest(
|
||||||
|
"corr-q", "Pending", null, null, null, true, "alarm", null, null, 2, 25);
|
||||||
|
var task = service.QueryNotificationOutboxAsync(request);
|
||||||
|
|
||||||
|
var received = probe.ExpectMsg<NotificationOutboxQueryRequest>();
|
||||||
|
Assert.Same(request, received);
|
||||||
|
var reply = new NotificationOutboxQueryResponse(
|
||||||
|
"corr-q", true, null, Array.Empty<NotificationSummary>(), 0);
|
||||||
|
probe.Reply(reply);
|
||||||
|
|
||||||
|
Assert.Same(reply, await task);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task RetryNotificationAsync_AsksOutboxProxyDirectly()
|
||||||
|
{
|
||||||
|
var service = new CommunicationService(
|
||||||
|
Options.Create(new CommunicationOptions()),
|
||||||
|
NullLogger<CommunicationService>.Instance);
|
||||||
|
var probe = CreateTestProbe();
|
||||||
|
service.SetNotificationOutbox(probe.Ref);
|
||||||
|
|
||||||
|
var request = new RetryNotificationRequest("corr-r", "n-7");
|
||||||
|
var task = service.RetryNotificationAsync(request);
|
||||||
|
|
||||||
|
var received = probe.ExpectMsg<RetryNotificationRequest>();
|
||||||
|
Assert.Same(request, received);
|
||||||
|
var reply = new RetryNotificationResponse("corr-r", true, null);
|
||||||
|
probe.Reply(reply);
|
||||||
|
|
||||||
|
Assert.Same(reply, await task);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DiscardNotificationAsync_AsksOutboxProxyDirectly()
|
||||||
|
{
|
||||||
|
var service = new CommunicationService(
|
||||||
|
Options.Create(new CommunicationOptions()),
|
||||||
|
NullLogger<CommunicationService>.Instance);
|
||||||
|
var probe = CreateTestProbe();
|
||||||
|
service.SetNotificationOutbox(probe.Ref);
|
||||||
|
|
||||||
|
var request = new DiscardNotificationRequest("corr-d", "n-9");
|
||||||
|
var task = service.DiscardNotificationAsync(request);
|
||||||
|
|
||||||
|
var received = probe.ExpectMsg<DiscardNotificationRequest>();
|
||||||
|
Assert.Same(request, received);
|
||||||
|
var reply = new DiscardNotificationResponse("corr-d", false, "already delivered");
|
||||||
|
probe.Reply(reply);
|
||||||
|
|
||||||
|
var result = await task;
|
||||||
|
Assert.Same(reply, result);
|
||||||
|
Assert.False(result.Success);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task GetNotificationKpisAsync_AsksOutboxProxyDirectly()
|
||||||
|
{
|
||||||
|
var service = new CommunicationService(
|
||||||
|
Options.Create(new CommunicationOptions()),
|
||||||
|
NullLogger<CommunicationService>.Instance);
|
||||||
|
var probe = CreateTestProbe();
|
||||||
|
service.SetNotificationOutbox(probe.Ref);
|
||||||
|
|
||||||
|
var request = new NotificationKpiRequest("corr-k");
|
||||||
|
var task = service.GetNotificationKpisAsync(request);
|
||||||
|
|
||||||
|
var received = probe.ExpectMsg<NotificationKpiRequest>();
|
||||||
|
Assert.Same(request, received);
|
||||||
|
var reply = new NotificationKpiResponse("corr-k", true, null, 3, 1, 0, 12, TimeSpan.FromMinutes(5));
|
||||||
|
probe.Reply(reply);
|
||||||
|
|
||||||
|
var result = await task;
|
||||||
|
Assert.Same(reply, result);
|
||||||
|
Assert.Equal(3, result.QueueDepth);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Stand-in for CentralCommunicationActor: verifies the message is wrapped
|
/// Stand-in for CentralCommunicationActor: verifies the message is wrapped
|
||||||
/// in a SiteEnvelope targeting the requested site and replies with a typed
|
/// in a SiteEnvelope targeting the requested site and replies with a typed
|
||||||
|
|||||||
Reference in New Issue
Block a user