using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ScadaLink.Commons.Entities.Instances;
using ScadaLink.Commons.Entities.Sites;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Communication;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;

namespace ScadaLink.Communication.Tests;

/// <summary>
/// Tests for DebugStreamService session lifecycle.
/// </summary>
public class DebugStreamServiceTests : TestKit
{
    /// <summary>
    /// Starts a debug stream, terminates it before any snapshot is delivered, and
    /// checks that the awaiting caller receives an exception whose message contains
    /// the instance name and whose inner exception is set.
    /// </summary>
    [Fact]
    public async Task StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException()
    {
        // Regression test for Communication-001. When the debug stream terminates before
        // the initial snapshot arrives, StartStreamAsync used to let the raw
        // InvalidOperationException from onTerminatedWrapper escape its
        // OperationCanceledException-only catch — the caller saw an untranslated exception
        // and the failure path did not deterministically tear the bridge actor down.
        // The fix catches any failure, tells the bridge actor StopDebugStream, and throws
        // a descriptive exception that names the instance and wraps the underlying cause.

        // NOTE(review): in the text under review, Substitute.For, Arg.Any,
        // NullLogger.Instance, ExpectMsg and Assert.ThrowsAsync appear without generic
        // type arguments. They are reproduced as-is; confirm against the repository copy
        // and restore the type arguments if they were lost.

        // Identifiers shared between the fixture entities, the stubs and the assertions.
        const int PumpInstanceId = 7;
        const int PumpSiteId = 3;
        const string PumpInstanceName = "Site1.Pump01";

        // Arrange — an instance that belongs to a site exposing two gRPC node addresses.
        var instance = new Instance(PumpInstanceName)
        {
            Id = PumpInstanceId,
            SiteId = PumpSiteId
        };
        var site = new Site("Site One", "site-1")
        {
            Id = PumpSiteId,
            GrpcNodeAAddress = "http://localhost:5100",
            GrpcNodeBAddress = "http://localhost:5200"
        };

        // Repository stubs hand back the fixture entities for the ids above.
        var instanceRepo = Substitute.For();
        instanceRepo.GetInstanceByIdAsync(PumpInstanceId, Arg.Any()).Returns(instance);

        var siteRepo = Substitute.For();
        siteRepo.GetSiteByIdAsync(PumpSiteId, Arg.Any()).Returns(site);

        // The service receives the stubs through a scoped service provider.
        var services = new ServiceCollection();
        services.AddScoped(_ => instanceRepo);
        services.AddScoped(_ => siteRepo);
        using var provider = services.BuildServiceProvider();

        // A test probe stands in for the communication actor so the bridge actor's
        // outgoing message (and its sender) can be observed.
        var commProbe = CreateTestProbe();
        var commService = new CommunicationService(
            Options.Create(new CommunicationOptions()),
            NullLogger.Instance);
        commService.SetCommunicationActor(commProbe.Ref);

        using var grpcFactory = new SiteStreamGrpcClientFactory(NullLoggerFactory.Instance);

        var service = new DebugStreamService(
            commService,
            provider,
            grpcFactory,
            NullLogger.Instance);
        service.SetActorSystem(Sys);

        // Act — start the stream; it blocks awaiting the initial snapshot.
        var startTask = service.StartStreamAsync(
            instanceId: PumpInstanceId,
            onEvent: _ => { },
            onTerminated: () => { });

        // The bridge actor's PreStart sends SubscribeDebugViewRequest to the comm actor;
        // the envelope's sender is the bridge actor itself.
        commProbe.ExpectMsg(TimeSpan.FromSeconds(5));
        var bridgeActor = commProbe.LastSender;

        // Simulate the site terminating the stream before any snapshot is delivered.
        bridgeActor.Tell(new DebugStreamTerminated("site-1", "corr"));

        // Assert — a descriptive exception that names the instance and wraps the cause,
        // not the raw "terminated before snapshot received" InvalidOperationException.
        var ex = await Assert.ThrowsAsync(() => startTask);
        Assert.Contains(PumpInstanceName, ex.Message);
        Assert.NotNull(ex.InnerException);
    }
}