Files
scadalink-design/tests/ScadaLink.Communication.Tests/DebugStreamServiceTests.cs
Joseph Doherty a9ceba00d0 fix(communication): resolve Communication-001 — early stream termination handling
DebugStreamService.StartStreamAsync awaited the initial debug snapshot inside
a try whose only handler was catch (OperationCanceledException). When the
stream terminated before the snapshot arrived, onTerminatedWrapper completed
the await with an InvalidOperationException that escaped the catch — the
caller got a raw, untranslated exception and the service did no teardown of
its own on that path.

Replaced that handler with catch (Exception), which removes the session entry,
sends StopDebugStream to the bridge actor via the local reference (deterministic,
idempotent teardown), and throws a descriptive exception — a TimeoutException
for the 30s timeout, otherwise an InvalidOperationException that names the
instance/site and wraps the cause.

Re-triaged Critical -> Medium: the originally-claimed multi-minute site-side
resource leak does not occur (the bridge actor self-terminates on every
onTerminated path). Adds the first DebugStreamService test, which fails
against the pre-fix code.
2026-05-16 18:32:52 -04:00

78 lines
3.5 KiB
C#

using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ScadaLink.Commons.Entities.Instances;
using ScadaLink.Commons.Entities.Sites;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Communication;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests;
/// <summary>
/// Session-lifecycle tests for <see cref="DebugStreamService"/>, driven through an
/// Akka.NET <see cref="TestKit"/> actor system with a probe standing in for the
/// communication actor.
/// </summary>
public class DebugStreamServiceTests : TestKit
{
    // Identifiers shared by the fake instance, the fake site and the repository stubs.
    private const int PumpInstanceId = 7;
    private const int PumpSiteId = 3;

    [Fact]
    public async Task StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException()
    {
        // Regression guard for Communication-001. Previously, a stream that ended before
        // its initial snapshot made StartStreamAsync surface the raw
        // InvalidOperationException raised by onTerminatedWrapper, because only
        // OperationCanceledException was caught; the bridge actor was also not torn down
        // deterministically on that path. The fixed service catches every failure,
        // sends StopDebugStream to the bridge actor, and rethrows a descriptive
        // exception that mentions the instance and carries the original cause.

        // Arrange
        var pumpInstance = new Instance("Site1.Pump01") { Id = PumpInstanceId, SiteId = PumpSiteId };
        var siteOne = new Site("Site One", "site-1")
        {
            Id = PumpSiteId,
            GrpcNodeAAddress = "http://localhost:5100",
            GrpcNodeBAddress = "http://localhost:5200",
        };
        using var provider = BuildRepositoryProvider(pumpInstance, siteOne);

        var communicationProbe = CreateTestProbe();
        var communication = CreateCommunicationService(communicationProbe.Ref);

        using var grpcClientFactory = new SiteStreamGrpcClientFactory(NullLoggerFactory.Instance);
        var debugStreams = new DebugStreamService(
            communication,
            provider,
            grpcClientFactory,
            NullLogger<DebugStreamService>.Instance);
        debugStreams.SetActorSystem(Sys);

        // Act — this call stays pending while it waits for the first snapshot.
        var pending = debugStreams.StartStreamAsync(
            instanceId: PumpInstanceId,
            onEvent: _ => { },
            onTerminated: () => { });

        // During PreStart the bridge actor sends a SubscribeDebugViewRequest envelope to
        // the communication actor, so the envelope's sender is the bridge itself.
        communicationProbe.ExpectMsg<SiteEnvelope>(TimeSpan.FromSeconds(5));
        IActorRef bridge = communicationProbe.LastSender;

        // Have the "site" end the stream before any snapshot has been delivered.
        bridge.Tell(new DebugStreamTerminated("site-1", "corr"));

        // Assert — the caller gets a translated exception that names the instance and
        // wraps the cause, rather than the bare "terminated before snapshot" error.
        var failure = await Assert.ThrowsAsync<InvalidOperationException>(() => pending);
        Assert.Contains("Site1.Pump01", failure.Message);
        Assert.NotNull(failure.InnerException);
    }

    /// <summary>
    /// Builds a service provider whose scoped repositories resolve the given instance
    /// and site by their fixed test ids.
    /// </summary>
    /// <param name="instance">Returned by <c>GetInstanceByIdAsync(PumpInstanceId, …)</c>.</param>
    /// <param name="site">Returned by <c>GetSiteByIdAsync(PumpSiteId, …)</c>.</param>
    /// <returns>The provider; the caller owns it and must dispose it.</returns>
    private static ServiceProvider BuildRepositoryProvider(Instance instance, Site site)
    {
        var templateRepository = Substitute.For<ITemplateEngineRepository>();
        templateRepository
            .GetInstanceByIdAsync(PumpInstanceId, Arg.Any<CancellationToken>())
            .Returns(instance);

        var siteRepository = Substitute.For<ISiteRepository>();
        siteRepository
            .GetSiteByIdAsync(PumpSiteId, Arg.Any<CancellationToken>())
            .Returns(site);

        var registrations = new ServiceCollection();
        registrations.AddScoped<ITemplateEngineRepository>(_ => templateRepository);
        registrations.AddScoped<ISiteRepository>(_ => siteRepository);
        return registrations.BuildServiceProvider();
    }

    /// <summary>
    /// Creates a <see cref="CommunicationService"/> with default options and a null
    /// logger, routed to the supplied communication actor.
    /// </summary>
    /// <param name="communicationActor">Actor that receives the service's outbound envelopes.</param>
    private static CommunicationService CreateCommunicationService(IActorRef communicationActor)
    {
        var communication = new CommunicationService(
            Options.Create(new CommunicationOptions()),
            NullLogger<CommunicationService>.Instance);
        communication.SetCommunicationActor(communicationActor);
        return communication;
    }
}