using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ScadaLink.Commons.Entities.Instances;
using ScadaLink.Commons.Entities.Sites;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Communication;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests;
///
/// Tests for DebugStreamService session lifecycle.
///
public class DebugStreamServiceTests : TestKit
{
[Fact]
public async Task StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException()
{
// Regression test for Communication-001. When the debug stream terminates before
// the initial snapshot arrives, StartStreamAsync used to let the raw
// InvalidOperationException from onTerminatedWrapper escape its
// OperationCanceledException-only catch — the caller saw an untranslated exception
// and the failure path did not deterministically tear the bridge actor down.
// The fix catches any failure, tells the bridge actor StopDebugStream, and throws
// a descriptive exception that names the instance and wraps the underlying cause.
var instance = new Instance("Site1.Pump01") { Id = 7, SiteId = 3 };
var site = new Site("Site One", "site-1")
{
Id = 3,
GrpcNodeAAddress = "http://localhost:5100",
GrpcNodeBAddress = "http://localhost:5200"
};
var instanceRepo = Substitute.For();
instanceRepo.GetInstanceByIdAsync(7, Arg.Any()).Returns(instance);
var siteRepo = Substitute.For();
siteRepo.GetSiteByIdAsync(3, Arg.Any()).Returns(site);
var services = new ServiceCollection();
services.AddScoped(_ => instanceRepo);
services.AddScoped(_ => siteRepo);
using var provider = services.BuildServiceProvider();
var commProbe = CreateTestProbe();
var commService = new CommunicationService(
Options.Create(new CommunicationOptions()),
NullLogger.Instance);
commService.SetCommunicationActor(commProbe.Ref);
using var grpcFactory = new SiteStreamGrpcClientFactory(NullLoggerFactory.Instance);
var service = new DebugStreamService(
commService, provider, grpcFactory, NullLogger.Instance);
service.SetActorSystem(Sys);
// Act — start the stream; it blocks awaiting the initial snapshot.
var startTask = service.StartStreamAsync(instanceId: 7, onEvent: _ => { }, onTerminated: () => { });
// The bridge actor's PreStart sends SubscribeDebugViewRequest to the comm actor;
// the envelope's sender is the bridge actor itself.
commProbe.ExpectMsg(TimeSpan.FromSeconds(5));
var bridgeActor = commProbe.LastSender;
// Simulate the site terminating the stream before any snapshot is delivered.
bridgeActor.Tell(new DebugStreamTerminated("site-1", "corr"));
// Assert — a descriptive exception that names the instance and wraps the cause,
// not the raw "terminated before snapshot received" InvalidOperationException.
var ex = await Assert.ThrowsAsync(() => startTask);
Assert.Contains("Site1.Pump01", ex.Message);
Assert.NotNull(ex.InnerException);
}
}