using System.Collections.Immutable; using Akka.Actor; using Akka.Cluster.Tools.Client; using Akka.TestKit.Xunit2; using Microsoft.Extensions.DependencyInjection; using NSubstitute; using ScadaLink.Commons.Entities.Sites; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Messages.Communication; using ScadaLink.Commons.Messages.Deployment; using ScadaLink.Commons.Messages.DebugView; using ScadaLink.Commons.Messages.Health; using ScadaLink.Communication.Actors; using ScadaLink.HealthMonitoring; using Akka.TestKit; namespace ScadaLink.Communication.Tests; /// /// Tests for CentralCommunicationActor with per-site ClusterClient routing. /// WP-4: Message routing via ClusterClient instances created per site. /// WP-5: Connection failure and failover handling. /// public class CentralCommunicationActorTests : TestKit { public CentralCommunicationActorTests() : base(@"akka.loglevel = DEBUG") { } private (IActorRef actor, ISiteRepository mockRepo, Dictionary siteProbes) CreateActorWithMockRepo( IEnumerable? sites = null) { var mockRepo = Substitute.For(); mockRepo.GetAllSitesAsync(Arg.Any()) .Returns(sites?.ToList() ?? new List()); var services = new ServiceCollection(); services.AddScoped(_ => mockRepo); var sp = services.BuildServiceProvider(); var siteProbes = new Dictionary(); var mockFactory = Substitute.For(); mockFactory.Create(Arg.Any(), Arg.Any(), Arg.Any>()) .Returns(callInfo => { var siteId = callInfo.ArgAt(1); var probe = CreateTestProbe(); siteProbes[siteId] = probe; return probe.Ref; }); var actor = Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory))); return (actor, mockRepo, siteProbes); } private (IActorRef actor, ISiteRepository mockRepo, Dictionary siteProbes, ISiteClientFactory mockFactory) CreateActorWithFactory( IEnumerable? sites = null) { var mockRepo = Substitute.For(); mockRepo.GetAllSitesAsync(Arg.Any()) .Returns(sites?.ToList() ?? new List()); var services = new ServiceCollection(); services.AddScoped(_ => mockRepo); var sp = services.BuildServiceProvider(); var siteProbes = new Dictionary(); var mockFactory = Substitute.For(); mockFactory.Create(Arg.Any(), Arg.Any(), Arg.Any>()) .Returns(callInfo => { var siteId = callInfo.ArgAt(1); var probe = CreateTestProbe(); siteProbes[siteId] = probe; return probe.Ref; }); var actor = Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory))); return (actor, mockRepo, siteProbes, mockFactory); } private Site CreateSite(string identifier, string? nodeAAddress, string? nodeBAddress = null) => new("Test Site", identifier) { NodeAAddress = nodeAAddress, NodeBAddress = nodeBAddress }; [Fact] public void ClusterClientRouting_RoutesToConfiguredSite() { var site = CreateSite("site1", "akka.tcp://scadalink@host:8082"); var (actor, _, siteProbes) = CreateActorWithMockRepo(new[] { site }); // Wait for auto-refresh (PreStart schedules with TimeSpan.Zero initial delay) Thread.Sleep(1000); var command = new DeployInstanceCommand( "dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow); actor.Tell(new SiteEnvelope("site1", command)); // The site1 probe (acting as ClusterClient) should receive a ClusterClient.Send var msg = siteProbes["site1"].ExpectMsg(); Assert.Equal("/user/site-communication", msg.Path); Assert.IsType(msg.Message); Assert.Equal("dep1", ((DeployInstanceCommand)msg.Message).DeploymentId); } [Fact] public void UnconfiguredSite_MessageIsDropped() { var (actor, _, _) = CreateActorWithMockRepo(); // Wait for auto-refresh Thread.Sleep(1000); var command = new DeployInstanceCommand( "dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow); actor.Tell(new SiteEnvelope("unknown-site", command)); ExpectNoMsg(TimeSpan.FromMilliseconds(200)); } [Fact] public void ConnectionLost_DebugStreamsKilled() { var site = CreateSite("site1", "akka.tcp://scadalink@host:8082"); var (actor, _, siteProbes) = CreateActorWithMockRepo(new[] { site }); // Wait for auto-refresh Thread.Sleep(1000); // Subscribe to debug view (tracks the subscription) var subscriberProbe = CreateTestProbe(); var subRequest = new SubscribeDebugViewRequest("inst1", "corr-123"); actor.Tell(new SiteEnvelope("site1", subRequest), subscriberProbe.Ref); // The ClusterClient probe receives the routed message siteProbes["site1"].ExpectMsg(); // Simulate site disconnection actor.Tell(new ConnectionStateChanged("site1", false, DateTimeOffset.UtcNow)); // The subscriber should receive a DebugStreamTerminated notification subscriberProbe.ExpectMsg( msg => msg.SiteId == "site1" && msg.CorrelationId == "corr-123"); } [Fact] public void Heartbeat_BumpsAggregatorTimestamp() { var mockRepo = Substitute.For(); mockRepo.GetAllSitesAsync(Arg.Any()) .Returns(new List()); var aggregator = Substitute.For(); var services = new ServiceCollection(); services.AddScoped(_ => mockRepo); services.AddSingleton(aggregator); var sp = services.BuildServiceProvider(); var siteClientFactory = Substitute.For(); var centralActor = Sys.ActorOf( Props.Create(() => new CentralCommunicationActor(sp, siteClientFactory))); var timestamp = DateTimeOffset.UtcNow; centralActor.Tell(new HeartbeatMessage("site1", "host1", true, timestamp)); AwaitAssert(() => aggregator.Received(1).MarkHeartbeat("site1", timestamp)); } [Fact] public void RefreshSiteAddresses_UpdatesCache() { var site1 = CreateSite("site1", "akka.tcp://scadalink@host1:8082"); var (actor, mockRepo, siteProbes) = CreateActorWithMockRepo(new[] { site1 }); // Wait for initial load Thread.Sleep(1000); // Verify routing to site1 works var cmd1 = new DeployInstanceCommand( "dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow); actor.Tell(new SiteEnvelope("site1", cmd1)); var msg1 = siteProbes["site1"].ExpectMsg(); Assert.Equal("dep1", ((DeployInstanceCommand)msg1.Message).DeploymentId); // Update mock repo to return both sites var site2 = CreateSite("site2", "akka.tcp://scadalink@host2:8082"); mockRepo.GetAllSitesAsync(Arg.Any()) .Returns(new List { site1, site2 }); // Refresh again actor.Tell(new RefreshSiteAddresses()); Thread.Sleep(1000); // Verify routing to site2 now works var cmd2 = new DeployInstanceCommand( "dep2", "inst2", "hash2", "{}", "admin", DateTimeOffset.UtcNow); actor.Tell(new SiteEnvelope("site2", cmd2)); var msg2 = siteProbes["site2"].ExpectMsg(); Assert.Equal("dep2", ((DeployInstanceCommand)msg2.Message).DeploymentId); } [Fact] public void LoadSiteAddressesFailure_IsLoggedNotSilentlySwallowed() { // Regression test for Communication-006. When the repository query throws, // PipeTo delivers a Status.Failure to the actor. Without a Receive // handler the failure becomes an unhandled message (debug-level only) and the // periodic refresh fails silently — operators cannot tell "no addresses // configured" from "database is down". The fix logs the failure at Warning. var mockRepo = Substitute.For(); mockRepo.GetAllSitesAsync(Arg.Any()) .Returns>>(_ => throw new InvalidOperationException("database is down")); var services = new ServiceCollection(); services.AddScoped(_ => mockRepo); var sp = services.BuildServiceProvider(); var mockFactory = Substitute.For(); // The fix logs a Warning carrying the InvalidOperationException as the cause. EventFilter.Warning(contains: "Failed to load site addresses from the database").ExpectOne(() => { Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory))); }); } [Fact] public void MalformedSiteAddress_DoesNotAbortRefresh_OtherSitesStillRegistered() { // Regression test for Communication-009. HandleSiteAddressCacheLoaded calls // ActorPath.Parse for every site in a single loop. A malformed NodeAAddress // throws inside that loop; before the fix the whole refresh aborted partway // through, leaving the cache half-updated (some sites registered, others not). // The fix wraps the parse in a try/catch that logs and skips the bad site so // a single garbage row cannot starve every other site of its ClusterClient. var goodSite = CreateSite("good-site", "akka.tcp://scadalink@host1:8082"); // A garbage address that ActorPath.Parse rejects. var badSite = CreateSite("bad-site", "this is not a valid actor path !!!"); // Order the bad site first so a non-resilient loop aborts before reaching good-site. var (actor, _, siteProbes) = CreateActorWithMockRepo(new[] { badSite, goodSite }); Thread.Sleep(1000); // good-site must still be registered and routable despite bad-site failing to parse. var cmd = new DeployInstanceCommand( "dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow); actor.Tell(new SiteEnvelope("good-site", cmd)); Assert.True(siteProbes.ContainsKey("good-site"), "good-site should have a ClusterClient even though bad-site's address is malformed"); var msg = siteProbes["good-site"].ExpectMsg(); Assert.Equal("dep1", ((DeployInstanceCommand)msg.Message).DeploymentId); } [Fact] public void BothContactPoints_UsedInSingleClient() { var site = CreateSite("site1", "akka.tcp://scadalink@host1:8082", "akka.tcp://scadalink@host2:8082"); var (actor, _, siteProbes, mockFactory) = CreateActorWithFactory(new[] { site }); // Wait for auto-refresh Thread.Sleep(1000); // Verify the factory was called with 2 contact paths mockFactory.Received(1).Create( Arg.Any(), Arg.Is("site1"), Arg.Is>(paths => paths.Count == 2)); } }