using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using NSubstitute;
using ScadaLink.Commons.Entities.Sites;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Communication;
using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Communication.Actors;
using ScadaLink.HealthMonitoring;
using Akka.TestKit;
namespace ScadaLink.Communication.Tests;
///
/// Tests for CentralCommunicationActor with per-site ClusterClient routing.
/// WP-4: Message routing via ClusterClient instances created per site.
/// WP-5: Connection failure and failover handling.
///
public class CentralCommunicationActorTests : TestKit
{
public CentralCommunicationActorTests() : base(@"akka.loglevel = DEBUG") { }
private (IActorRef actor, ISiteRepository mockRepo, Dictionary siteProbes) CreateActorWithMockRepo(
IEnumerable? sites = null)
{
var mockRepo = Substitute.For();
mockRepo.GetAllSitesAsync(Arg.Any())
.Returns(sites?.ToList() ?? new List());
var services = new ServiceCollection();
services.AddScoped(_ => mockRepo);
var sp = services.BuildServiceProvider();
var siteProbes = new Dictionary();
var mockFactory = Substitute.For();
mockFactory.Create(Arg.Any(), Arg.Any(), Arg.Any>())
.Returns(callInfo =>
{
var siteId = callInfo.ArgAt(1);
var probe = CreateTestProbe();
siteProbes[siteId] = probe;
return probe.Ref;
});
var actor = Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory)));
return (actor, mockRepo, siteProbes);
}
private (IActorRef actor, ISiteRepository mockRepo, Dictionary siteProbes, ISiteClientFactory mockFactory) CreateActorWithFactory(
IEnumerable? sites = null)
{
var mockRepo = Substitute.For();
mockRepo.GetAllSitesAsync(Arg.Any())
.Returns(sites?.ToList() ?? new List());
var services = new ServiceCollection();
services.AddScoped(_ => mockRepo);
var sp = services.BuildServiceProvider();
var siteProbes = new Dictionary();
var mockFactory = Substitute.For();
mockFactory.Create(Arg.Any(), Arg.Any(), Arg.Any>())
.Returns(callInfo =>
{
var siteId = callInfo.ArgAt(1);
var probe = CreateTestProbe();
siteProbes[siteId] = probe;
return probe.Ref;
});
var actor = Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory)));
return (actor, mockRepo, siteProbes, mockFactory);
}
private Site CreateSite(string identifier, string? nodeAAddress, string? nodeBAddress = null) =>
new("Test Site", identifier) { NodeAAddress = nodeAAddress, NodeBAddress = nodeBAddress };
[Fact]
public void ClusterClientRouting_RoutesToConfiguredSite()
{
var site = CreateSite("site1", "akka.tcp://scadalink@host:8082");
var (actor, _, siteProbes) = CreateActorWithMockRepo(new[] { site });
// Wait for auto-refresh (PreStart schedules with TimeSpan.Zero initial delay)
Thread.Sleep(1000);
var command = new DeployInstanceCommand(
"dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow);
actor.Tell(new SiteEnvelope("site1", command));
// The site1 probe (acting as ClusterClient) should receive a ClusterClient.Send
var msg = siteProbes["site1"].ExpectMsg();
Assert.Equal("/user/site-communication", msg.Path);
Assert.IsType(msg.Message);
Assert.Equal("dep1", ((DeployInstanceCommand)msg.Message).DeploymentId);
}
[Fact]
public void UnconfiguredSite_MessageIsDropped()
{
var (actor, _, _) = CreateActorWithMockRepo();
// Wait for auto-refresh
Thread.Sleep(1000);
var command = new DeployInstanceCommand(
"dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow);
actor.Tell(new SiteEnvelope("unknown-site", command));
ExpectNoMsg(TimeSpan.FromMilliseconds(200));
}
[Fact]
public void ConnectionLost_DebugStreamsKilled()
{
var site = CreateSite("site1", "akka.tcp://scadalink@host:8082");
var (actor, _, siteProbes) = CreateActorWithMockRepo(new[] { site });
// Wait for auto-refresh
Thread.Sleep(1000);
// Subscribe to debug view (tracks the subscription)
var subscriberProbe = CreateTestProbe();
var subRequest = new SubscribeDebugViewRequest("inst1", "corr-123");
actor.Tell(new SiteEnvelope("site1", subRequest), subscriberProbe.Ref);
// The ClusterClient probe receives the routed message
siteProbes["site1"].ExpectMsg();
// Simulate site disconnection
actor.Tell(new ConnectionStateChanged("site1", false, DateTimeOffset.UtcNow));
// The subscriber should receive a DebugStreamTerminated notification
subscriberProbe.ExpectMsg(
msg => msg.SiteId == "site1" && msg.CorrelationId == "corr-123");
}
[Fact]
public void Heartbeat_BumpsAggregatorTimestamp()
{
var mockRepo = Substitute.For();
mockRepo.GetAllSitesAsync(Arg.Any())
.Returns(new List());
var aggregator = Substitute.For();
var services = new ServiceCollection();
services.AddScoped(_ => mockRepo);
services.AddSingleton(aggregator);
var sp = services.BuildServiceProvider();
var siteClientFactory = Substitute.For();
var centralActor = Sys.ActorOf(
Props.Create(() => new CentralCommunicationActor(sp, siteClientFactory)));
var timestamp = DateTimeOffset.UtcNow;
centralActor.Tell(new HeartbeatMessage("site1", "host1", true, timestamp));
AwaitAssert(() => aggregator.Received(1).MarkHeartbeat("site1", timestamp));
}
[Fact]
public void RefreshSiteAddresses_UpdatesCache()
{
var site1 = CreateSite("site1", "akka.tcp://scadalink@host1:8082");
var (actor, mockRepo, siteProbes) = CreateActorWithMockRepo(new[] { site1 });
// Wait for initial load
Thread.Sleep(1000);
// Verify routing to site1 works
var cmd1 = new DeployInstanceCommand(
"dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow);
actor.Tell(new SiteEnvelope("site1", cmd1));
var msg1 = siteProbes["site1"].ExpectMsg();
Assert.Equal("dep1", ((DeployInstanceCommand)msg1.Message).DeploymentId);
// Update mock repo to return both sites
var site2 = CreateSite("site2", "akka.tcp://scadalink@host2:8082");
mockRepo.GetAllSitesAsync(Arg.Any())
.Returns(new List { site1, site2 });
// Refresh again
actor.Tell(new RefreshSiteAddresses());
Thread.Sleep(1000);
// Verify routing to site2 now works
var cmd2 = new DeployInstanceCommand(
"dep2", "inst2", "hash2", "{}", "admin", DateTimeOffset.UtcNow);
actor.Tell(new SiteEnvelope("site2", cmd2));
var msg2 = siteProbes["site2"].ExpectMsg();
Assert.Equal("dep2", ((DeployInstanceCommand)msg2.Message).DeploymentId);
}
[Fact]
public void LoadSiteAddressesFailure_IsLoggedNotSilentlySwallowed()
{
// Regression test for Communication-006. When the repository query throws,
// PipeTo delivers a Status.Failure to the actor. Without a Receive
// handler the failure becomes an unhandled message (debug-level only) and the
// periodic refresh fails silently — operators cannot tell "no addresses
// configured" from "database is down". The fix logs the failure at Warning.
var mockRepo = Substitute.For();
mockRepo.GetAllSitesAsync(Arg.Any())
.Returns>>(_ => throw new InvalidOperationException("database is down"));
var services = new ServiceCollection();
services.AddScoped(_ => mockRepo);
var sp = services.BuildServiceProvider();
var mockFactory = Substitute.For();
// The fix logs a Warning carrying the InvalidOperationException as the cause.
EventFilter.Warning(contains: "Failed to load site addresses from the database").ExpectOne(() =>
{
Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory)));
});
}
[Fact]
public void MalformedSiteAddress_DoesNotAbortRefresh_OtherSitesStillRegistered()
{
// Regression test for Communication-009. HandleSiteAddressCacheLoaded calls
// ActorPath.Parse for every site in a single loop. A malformed NodeAAddress
// throws inside that loop; before the fix the whole refresh aborted partway
// through, leaving the cache half-updated (some sites registered, others not).
// The fix wraps the parse in a try/catch that logs and skips the bad site so
// a single garbage row cannot starve every other site of its ClusterClient.
var goodSite = CreateSite("good-site", "akka.tcp://scadalink@host1:8082");
// A garbage address that ActorPath.Parse rejects.
var badSite = CreateSite("bad-site", "this is not a valid actor path !!!");
// Order the bad site first so a non-resilient loop aborts before reaching good-site.
var (actor, _, siteProbes) = CreateActorWithMockRepo(new[] { badSite, goodSite });
Thread.Sleep(1000);
// good-site must still be registered and routable despite bad-site failing to parse.
var cmd = new DeployInstanceCommand(
"dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow);
actor.Tell(new SiteEnvelope("good-site", cmd));
Assert.True(siteProbes.ContainsKey("good-site"),
"good-site should have a ClusterClient even though bad-site's address is malformed");
var msg = siteProbes["good-site"].ExpectMsg();
Assert.Equal("dep1", ((DeployInstanceCommand)msg.Message).DeploymentId);
}
private NotificationSubmit CreateSubmit(string id = "notif1") =>
new(id, "ops-list", "Subject", "Body", "site1", "inst1", "script.cs", DateTimeOffset.UtcNow);
[Fact]
public void NotificationSubmit_ForwardedToOutboxProxy_AckRoutesBackToSite()
{
var (actor, _, _) = CreateActorWithMockRepo();
var outboxProbe = CreateTestProbe();
actor.Tell(new RegisterNotificationOutbox(outboxProbe.Ref));
// A second probe stands in for the site's ClusterClient (the original Sender).
var siteProbe = CreateTestProbe();
var submit = CreateSubmit();
actor.Tell(submit, siteProbe.Ref);
// The outbox proxy receives the NotificationSubmit with the site as the sender,
// so an ack it sends routes straight back to the site, not the central actor.
outboxProbe.ExpectMsg(m => m.NotificationId == "notif1");
outboxProbe.Reply(new NotificationSubmitAck("notif1", Accepted: true, Error: null));
siteProbe.ExpectMsg(a => a.NotificationId == "notif1" && a.Accepted);
}
[Fact]
public void NotificationStatusQuery_ForwardedToOutboxProxy_ResponseRoutesBackToSite()
{
var (actor, _, _) = CreateActorWithMockRepo();
var outboxProbe = CreateTestProbe();
actor.Tell(new RegisterNotificationOutbox(outboxProbe.Ref));
var siteProbe = CreateTestProbe();
var query = new NotificationStatusQuery("corr1", "notif1");
actor.Tell(query, siteProbe.Ref);
outboxProbe.ExpectMsg(m => m.CorrelationId == "corr1");
outboxProbe.Reply(new NotificationStatusResponse(
"corr1", Found: true, Status: "Delivered", RetryCount: 0,
LastError: null, DeliveredAt: DateTimeOffset.UtcNow));
siteProbe.ExpectMsg(r => r.CorrelationId == "corr1" && r.Found);
}
[Fact]
public void NotificationSubmit_NoOutboxConfigured_RepliesNonAccepted()
{
var (actor, _, _) = CreateActorWithMockRepo();
// No RegisterNotificationOutbox sent — the proxy is null.
var submit = CreateSubmit();
actor.Tell(submit);
var ack = ExpectMsg();
Assert.Equal("notif1", ack.NotificationId);
Assert.False(ack.Accepted);
Assert.NotNull(ack.Error);
}
[Fact]
public void NotificationStatusQuery_NoOutboxConfigured_RepliesNotFound()
{
var (actor, _, _) = CreateActorWithMockRepo();
// No RegisterNotificationOutbox sent — the proxy is null.
var query = new NotificationStatusQuery("corr1", "notif1");
actor.Tell(query);
var response = ExpectMsg();
Assert.Equal("corr1", response.CorrelationId);
Assert.False(response.Found);
}
[Fact]
public void BothContactPoints_UsedInSingleClient()
{
var site = CreateSite("site1",
"akka.tcp://scadalink@host1:8082",
"akka.tcp://scadalink@host2:8082");
var (actor, _, siteProbes, mockFactory) = CreateActorWithFactory(new[] { site });
// Wait for auto-refresh
Thread.Sleep(1000);
// Verify the factory was called with 2 contact paths
mockFactory.Received(1).Create(
Arg.Any(),
Arg.Is("site1"),
Arg.Is>(paths => paths.Count == 2));
}
}