using Akka.Actor;
using Akka.TestKit.Xunit2;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ScadaLink.Communication.Grpc;

namespace ScadaLink.Communication.Tests.Grpc;

/// <summary>
/// Verifies that after a gRPC stream is cancelled, the SiteStreamManager
/// subscription is properly cleaned up with no leaked subscriptions.
/// </summary>
/// <remarks>
/// NOTE(review): in this view of the file several generic type arguments appear to be
/// missing (for example on <c>Substitute.For</c>, <c>Arg.Any</c>, <c>ci.Arg</c>,
/// <c>NullLogger</c>, <c>List</c> and <c>Func</c>). The code tokens are reproduced
/// exactly as seen; confirm the type arguments against the original before building.
/// </remarks>
public class CleanupVerificationTests : TestKit
{
    /// <summary>
    /// Starts a single <c>SubscribeInstance</c> call, cancels its token, and checks that
    /// <c>RemoveSubscriber</c> was invoked once with the actor captured from
    /// <c>Subscribe</c> and that <c>ActiveStreamCount</c> goes from 1 back to 0.
    /// </summary>
    [Fact]
    public async Task Stream_Cancellation_CleansUp_SiteStreamManager_Subscription()
    {
        // Arrange: create server with mock subscriber that tracks subscribe/remove calls
        var subscriber = Substitute.For();
        var subscribeCalled = false;
        var removeCalled = false;
        IActorRef? subscribedActor = null;
        // The Subscribe stub records that it ran and captures the actor argument so the
        // final assertion can compare it with the actor passed to RemoveSubscriber.
        subscriber.Subscribe(Arg.Any(), Arg.Any())
            .Returns(ci =>
            {
                subscribeCalled = true;
                subscribedActor = ci.Arg();
                return "sub-cleanup-test";
            });
        subscriber.When(x => x.RemoveSubscriber(Arg.Any()))
            .Do(_ => removeCalled = true);

        var logger = NullLogger.Instance;
        var server = new SiteStreamGrpcServer(subscriber, logger);
        // Sys is the actor system supplied by the TestKit base class.
        server.SetReady(Sys);

        // The mocked call context hands out this token, so cancelling the source below
        // is what the server observes as cancellation of the call.
        var cts = new CancellationTokenSource();
        var context = Substitute.For();
        context.CancellationToken.Returns(cts.Token);
        var writer = Substitute.For>();
        var request = new InstanceStreamRequest
        {
            CorrelationId = "corr-cleanup-verify",
            InstanceUniqueName = "Site1.TestInst"
        };

        // Act: start a stream, wait for it to register, then cancel
        // The call runs on the thread pool and is only awaited after cts.Cancel();
        // presumably it does not complete until the token is cancelled - TODO confirm.
        var streamTask = Task.Run(() => server.SubscribeInstance(request, writer, context));
        await WaitForConditionAsync(() => subscribeCalled);
        Assert.True(subscribeCalled, "Subscribe should have been called");
        Assert.Equal(1, server.ActiveStreamCount);
        cts.Cancel();
        await streamTask;

        // Assert: verify cleanup
        Assert.True(removeCalled, "RemoveSubscriber should have been called after cancellation");
        Assert.Equal(0, server.ActiveStreamCount);

        // Verify the same actor that was subscribed is the one that was removed
        subscriber.Received(1).RemoveSubscriber(subscribedActor!);
    }

    /// <summary>
    /// Starts three <c>SubscribeInstance</c> calls, each with its own cancellation
    /// source, cancels all of them, and checks that <c>ActiveStreamCount</c> returns to 0
    /// and that <c>RemoveSubscriber</c> was invoked three times.
    /// </summary>
    [Fact]
    public async Task Multiple_Streams_Cancelled_AllCleanedUp()
    {
        var subscriber = Substitute.For();
        var removeCount = 0;
        subscriber.Subscribe(Arg.Any(), Arg.Any())
            .Returns("sub-multi");
        // Interlocked is used because the removals may be reported from different
        // thread-pool threads (each stream is started with Task.Run below).
        subscriber.When(x => x.RemoveSubscriber(Arg.Any()))
            .Do(_ => Interlocked.Increment(ref removeCount));

        var logger = NullLogger.Instance;
        var server = new SiteStreamGrpcServer(subscriber, logger);
        server.SetReady(Sys);

        // Start 3 streams
        var ctsList = new List();
        var tasks = new List();
        for (var i = 0; i < 3; i++)
        {
            // Everything the lambda below captures is declared inside the loop body,
            // so each stream gets its own context, writer and request.
            var cts = new CancellationTokenSource();
            ctsList.Add(cts);
            var ctx = Substitute.For();
            ctx.CancellationToken.Returns(cts.Token);
            var w = Substitute.For>();
            var req = new InstanceStreamRequest
            {
                CorrelationId = $"corr-multi-{i}",
                InstanceUniqueName = $"Site1.Inst{i}"
            };
            tasks.Add(Task.Run(() => server.SubscribeInstance(req, w, ctx)));
        }

        // Do not cancel until the server reports all three streams as active.
        await WaitForConditionAsync(() => server.ActiveStreamCount == 3);

        // Cancel all
        foreach (var cts in ctsList) cts.Cancel();
        await Task.WhenAll(tasks);

        Assert.Equal(0, server.ActiveStreamCount);
        Assert.Equal(3, removeCount);
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every 25 ms until it returns true or
    /// <paramref name="timeoutMs"/> milliseconds have elapsed, then asserts that the
    /// condition holds.
    /// </summary>
    /// <param name="condition">Predicate to evaluate repeatedly.</param>
    /// <param name="timeoutMs">Maximum time to keep polling, in milliseconds (default 5000).</param>
    /// <returns>A task that completes once polling stops and the final assertion has run.</returns>
    private static async Task WaitForConditionAsync(Func condition, int timeoutMs = 5000)
    {
        var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
        while (!condition() && DateTime.UtcNow < deadline)
        {
            await Task.Delay(25);
        }
        // Re-evaluate once more so a timeout surfaces as a test failure with a message.
        Assert.True(condition(), $"Condition not met within {timeoutMs}ms");
    }
}