286 lines
12 KiB
C#
286 lines
12 KiB
C#
using Akka.Actor;
|
|
using Akka.Cluster.Tools.Client;
|
|
using Akka.TestKit.Xunit2;
|
|
using ScadaLink.Commons.Messages.Deployment;
|
|
using ScadaLink.Commons.Messages.Lifecycle;
|
|
using ScadaLink.Commons.Messages.Integration;
|
|
using ScadaLink.Commons.Messages.Notification;
|
|
using ScadaLink.Commons.Messages.RemoteQuery;
|
|
using ScadaLink.Communication.Actors;
|
|
|
|
namespace ScadaLink.Communication.Tests;
|
|
|
|
/// <summary>
|
|
/// WP-4: Tests for SiteCommunicationActor message routing to local actors.
|
|
/// </summary>
|
|
public class SiteCommunicationActorTests : TestKit
|
|
{
|
|
private readonly CommunicationOptions _options = new();
|
|
|
|
public SiteCommunicationActorTests()
|
|
: base(@"akka.loglevel = DEBUG")
|
|
{
|
|
}
|
|
|
|
[Fact]
|
|
public void DeployCommand_ForwardedToDeploymentManager()
|
|
{
|
|
var dmProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
var command = new DeployInstanceCommand(
|
|
"dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow);
|
|
siteActor.Tell(command);
|
|
|
|
dmProbe.ExpectMsg<DeployInstanceCommand>(msg => msg.DeploymentId == "dep1");
|
|
}
|
|
|
|
[Fact]
|
|
public void LifecycleCommands_ForwardedToDeploymentManager()
|
|
{
|
|
var dmProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
siteActor.Tell(new DisableInstanceCommand("cmd1", "inst1", DateTimeOffset.UtcNow));
|
|
dmProbe.ExpectMsg<DisableInstanceCommand>();
|
|
|
|
siteActor.Tell(new EnableInstanceCommand("cmd2", "inst1", DateTimeOffset.UtcNow));
|
|
dmProbe.ExpectMsg<EnableInstanceCommand>();
|
|
|
|
siteActor.Tell(new DeleteInstanceCommand("cmd3", "inst1", DateTimeOffset.UtcNow));
|
|
dmProbe.ExpectMsg<DeleteInstanceCommand>();
|
|
}
|
|
|
|
[Fact]
|
|
public void DeploymentStateQuery_ForwardedToDeploymentManager()
|
|
{
|
|
// DeploymentManager-006: the site-before-redeploy query travels over the
|
|
// ClusterClient command/control transport and is routed to the local
|
|
// Deployment Manager, which owns the deployed-config store.
|
|
var dmProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
var request = new DeploymentStateQueryRequest("corr-q", "inst1", DateTimeOffset.UtcNow);
|
|
siteActor.Tell(request);
|
|
|
|
dmProbe.ExpectMsg<DeploymentStateQueryRequest>(msg => msg.CorrelationId == "corr-q");
|
|
}
|
|
|
|
[Fact]
|
|
public void IntegrationCall_WithoutHandler_ReturnsFailure()
|
|
{
|
|
var dmProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
var request = new IntegrationCallRequest(
|
|
"corr1", "site1", "inst1", "ExtSys1", "GetData",
|
|
new Dictionary<string, object?>(), DateTimeOffset.UtcNow);
|
|
|
|
siteActor.Tell(request);
|
|
|
|
ExpectMsg<IntegrationCallResponse>(msg =>
|
|
!msg.Success && msg.ErrorMessage == "Integration handler not available");
|
|
}
|
|
|
|
[Fact]
|
|
public void IntegrationCall_WithHandler_ForwardedToHandler()
|
|
{
|
|
var dmProbe = CreateTestProbe();
|
|
var handlerProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
// Register integration handler
|
|
siteActor.Tell(new RegisterLocalHandler(LocalHandlerType.Integration, handlerProbe.Ref));
|
|
|
|
var request = new IntegrationCallRequest(
|
|
"corr1", "site1", "inst1", "ExtSys1", "GetData",
|
|
new Dictionary<string, object?>(), DateTimeOffset.UtcNow);
|
|
|
|
siteActor.Tell(request);
|
|
handlerProbe.ExpectMsg<IntegrationCallRequest>(msg => msg.CorrelationId == "corr1");
|
|
}
|
|
|
|
[Fact]
|
|
public void NotificationSubmit_WithCentralClient_ForwardedToCentralAndAckRoutedBack()
|
|
{
|
|
// The site forwards a buffered notification to central over the ClusterClient
|
|
// command/control transport; the central ack must route back to the original
|
|
// sender (the S&F forwarder's Ask), not to the SiteCommunicationActor.
|
|
var dmProbe = CreateTestProbe();
|
|
var centralClientProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
siteActor.Tell(new RegisterCentralClient(centralClientProbe.Ref));
|
|
|
|
var submit = new NotificationSubmit(
|
|
"notif-1", "Operators", "Subj", "Body", "site1", "inst1", "alarmScript",
|
|
DateTimeOffset.UtcNow);
|
|
siteActor.Tell(submit);
|
|
|
|
// Central client (acting as ClusterClient) receives a ClusterClient.Send wrapping
|
|
// the NotificationSubmit, addressed to the central communication actor. Fish past
|
|
// any periodic HeartbeatMessage the actor's timer may interleave.
|
|
var send = centralClientProbe.FishForMessage<ClusterClient.Send>(
|
|
s => s.Message is NotificationSubmit);
|
|
Assert.Equal("/user/central-communication", send.Path);
|
|
var forwarded = Assert.IsType<NotificationSubmit>(send.Message);
|
|
Assert.Equal("notif-1", forwarded.NotificationId);
|
|
|
|
// The ack is sent to the ClusterClient.Send's Sender — replying as that probe
|
|
// must land back at the test actor (the original Tell sender).
|
|
centralClientProbe.Reply(new NotificationSubmitAck("notif-1", Accepted: true, Error: null));
|
|
ExpectMsg<NotificationSubmitAck>(ack => ack.NotificationId == "notif-1" && ack.Accepted);
|
|
}
|
|
|
|
[Fact]
|
|
public void NotificationSubmit_WithoutCentralClient_RepliesWithNonAccepted()
|
|
{
|
|
// No ClusterClient registered yet: the submit cannot be forwarded, so the actor
|
|
// replies with a non-accepted ack and the S&F forwarder treats it as transient.
|
|
var dmProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
var submit = new NotificationSubmit(
|
|
"notif-2", "Operators", "Subj", "Body", "site1", null, null,
|
|
DateTimeOffset.UtcNow);
|
|
siteActor.Tell(submit);
|
|
|
|
ExpectMsg<NotificationSubmitAck>(ack => ack.NotificationId == "notif-2" && !ack.Accepted);
|
|
}
|
|
|
|
[Fact]
|
|
public void NotificationStatusQuery_WithCentralClient_ForwardedToCentralAndResponseRoutedBack()
|
|
{
|
|
// Notify.Status(id) issues a NotificationStatusQuery; the site actor forwards it
|
|
// to central over the ClusterClient command/control transport and the central
|
|
// response must route back to the original sender (the helper's Ask).
|
|
var dmProbe = CreateTestProbe();
|
|
var centralClientProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
siteActor.Tell(new RegisterCentralClient(centralClientProbe.Ref));
|
|
|
|
var query = new NotificationStatusQuery("corr-99", "notif-1");
|
|
siteActor.Tell(query);
|
|
|
|
var send = centralClientProbe.FishForMessage<ClusterClient.Send>(
|
|
s => s.Message is NotificationStatusQuery);
|
|
Assert.Equal("/user/central-communication", send.Path);
|
|
var forwarded = Assert.IsType<NotificationStatusQuery>(send.Message);
|
|
Assert.Equal("notif-1", forwarded.NotificationId);
|
|
|
|
// The response is sent to the ClusterClient.Send's Sender — replying as that
|
|
// probe must land back at the test actor (the original Tell sender).
|
|
centralClientProbe.Reply(new NotificationStatusResponse(
|
|
"corr-99", Found: true, Status: "Delivered", RetryCount: 0,
|
|
LastError: null, DeliveredAt: DateTimeOffset.UtcNow));
|
|
ExpectMsg<NotificationStatusResponse>(r => r.CorrelationId == "corr-99" && r.Found);
|
|
}
|
|
|
|
[Fact]
|
|
public void NotificationStatusQuery_WithoutCentralClient_RepliesWithNotFound()
|
|
{
|
|
// No ClusterClient registered yet: the query cannot reach central, so the actor
|
|
// replies Found: false. Notify.Status then falls back to the site S&F buffer.
|
|
var dmProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
siteActor.Tell(new NotificationStatusQuery("corr-100", "notif-2"));
|
|
|
|
ExpectMsg<NotificationStatusResponse>(
|
|
r => r.CorrelationId == "corr-100" && !r.Found);
|
|
}
|
|
|
|
[Fact]
|
|
public void EventLogQuery_WithoutHandler_ReturnsFailure()
|
|
{
|
|
var dmProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
var request = new EventLogQueryRequest(
|
|
"corr1", "site1", null, null, null, null, null, null, null, 25, DateTimeOffset.UtcNow);
|
|
|
|
siteActor.Tell(request);
|
|
|
|
ExpectMsg<EventLogQueryResponse>(msg => !msg.Success);
|
|
}
|
|
|
|
// ── Task 5 (#22): central→site Retry/Discard relay for parked cached calls ──
|
|
|
|
[Fact]
|
|
public void RetryParkedOperation_WithHandler_ForwardedToParkedMessageHandler()
|
|
{
|
|
var dmProbe = CreateTestProbe();
|
|
var handlerProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
siteActor.Tell(new RegisterLocalHandler(LocalHandlerType.ParkedMessages, handlerProbe.Ref));
|
|
|
|
var id = Commons.Types.TrackedOperationId.New();
|
|
siteActor.Tell(new RetryParkedOperation("corr-rp", id));
|
|
|
|
handlerProbe.ExpectMsg<RetryParkedOperation>(msg =>
|
|
msg.CorrelationId == "corr-rp" && msg.TrackedOperationId.Equals(id));
|
|
}
|
|
|
|
[Fact]
|
|
public void DiscardParkedOperation_WithHandler_ForwardedToParkedMessageHandler()
|
|
{
|
|
var dmProbe = CreateTestProbe();
|
|
var handlerProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
siteActor.Tell(new RegisterLocalHandler(LocalHandlerType.ParkedMessages, handlerProbe.Ref));
|
|
|
|
var id = Commons.Types.TrackedOperationId.New();
|
|
siteActor.Tell(new DiscardParkedOperation("corr-dp", id));
|
|
|
|
handlerProbe.ExpectMsg<DiscardParkedOperation>(msg =>
|
|
msg.CorrelationId == "corr-dp" && msg.TrackedOperationId.Equals(id));
|
|
}
|
|
|
|
[Fact]
|
|
public void RetryParkedOperation_WithoutHandler_RepliesNotAppliedAck()
|
|
{
|
|
// No parked-message handler registered — the relay must get a definitive
|
|
// non-applied ack, not silence (the SiteCallAuditActor's Ask must not
|
|
// hang and then mis-report site-unreachable when the site IS reachable).
|
|
var dmProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
siteActor.Tell(new RetryParkedOperation("corr-no-handler", Commons.Types.TrackedOperationId.New()));
|
|
|
|
var ack = ExpectMsg<ParkedOperationActionAck>();
|
|
Assert.Equal("corr-no-handler", ack.CorrelationId);
|
|
Assert.False(ack.Applied);
|
|
Assert.NotNull(ack.ErrorMessage);
|
|
}
|
|
|
|
[Fact]
|
|
public void DiscardParkedOperation_WithoutHandler_RepliesNotAppliedAck()
|
|
{
|
|
var dmProbe = CreateTestProbe();
|
|
var siteActor = Sys.ActorOf(Props.Create(() =>
|
|
new SiteCommunicationActor("site1", _options, dmProbe.Ref)));
|
|
|
|
siteActor.Tell(new DiscardParkedOperation("corr-no-handler", Commons.Types.TrackedOperationId.New()));
|
|
|
|
var ack = ExpectMsg<ParkedOperationActionAck>();
|
|
Assert.False(ack.Applied);
|
|
Assert.NotNull(ack.ErrorMessage);
|
|
}
|
|
}
|