diff --git a/tests/ScadaLink.Communication.Tests/Grpc/CleanupVerificationTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/CleanupVerificationTests.cs new file mode 100644 index 0000000..c665238 --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/Grpc/CleanupVerificationTests.cs @@ -0,0 +1,129 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Grpc.Core; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.Communication.Tests.Grpc; + +/// +/// Verifies that after a gRPC stream is cancelled, the SiteStreamManager +/// subscription is properly cleaned up with no leaked subscriptions. +/// +public class CleanupVerificationTests : TestKit +{ + [Fact] + public async Task Stream_Cancellation_CleansUp_SiteStreamManager_Subscription() + { + // Arrange: create server with mock subscriber that tracks subscribe/remove calls + var subscriber = Substitute.For(); + var subscribeCalled = false; + var removeCalled = false; + IActorRef? subscribedActor = null; + + subscriber.Subscribe(Arg.Any(), Arg.Any()) + .Returns(ci => + { + subscribeCalled = true; + subscribedActor = ci.Arg(); + return "sub-cleanup-test"; + }); + + subscriber.When(x => x.RemoveSubscriber(Arg.Any())) + .Do(_ => removeCalled = true); + + var logger = NullLogger.Instance; + var server = new SiteStreamGrpcServer(subscriber, logger); + server.SetReady(Sys); + + var cts = new CancellationTokenSource(); + var context = Substitute.For(); + context.CancellationToken.Returns(cts.Token); + + var writer = Substitute.For>(); + + var request = new InstanceStreamRequest + { + CorrelationId = "corr-cleanup-verify", + InstanceUniqueName = "Site1.TestInst" + }; + + // Act: start a stream, wait for it to register, then cancel + var streamTask = Task.Run(() => server.SubscribeInstance(request, writer, context)); + + await WaitForConditionAsync(() => subscribeCalled); + Assert.True(subscribeCalled, "Subscribe should have been called"); + Assert.Equal(1, server.ActiveStreamCount); + + cts.Cancel(); + await streamTask; + + // Assert: verify cleanup + Assert.True(removeCalled, "RemoveSubscriber should have been called after cancellation"); + Assert.Equal(0, server.ActiveStreamCount); + + // Verify the same actor that was subscribed is the one that was removed + subscriber.Received(1).RemoveSubscriber(subscribedActor!); + } + + [Fact] + public async Task Multiple_Streams_Cancelled_AllCleanedUp() + { + var subscriber = Substitute.For(); + var removeCount = 0; + + subscriber.Subscribe(Arg.Any(), Arg.Any()) + .Returns("sub-multi"); + + subscriber.When(x => x.RemoveSubscriber(Arg.Any())) + .Do(_ => Interlocked.Increment(ref removeCount)); + + var logger = NullLogger.Instance; + var server = new SiteStreamGrpcServer(subscriber, logger); + server.SetReady(Sys); + + // Start 3 streams + var ctsList = new List(); + var tasks = new List(); + + for (var i = 0; i < 3; i++) + { + var cts = new CancellationTokenSource(); + ctsList.Add(cts); + var ctx = Substitute.For(); + ctx.CancellationToken.Returns(cts.Token); + var w = Substitute.For>(); + + var req = new InstanceStreamRequest + { + CorrelationId = $"corr-multi-{i}", + InstanceUniqueName = $"Site1.Inst{i}" + }; + + tasks.Add(Task.Run(() => server.SubscribeInstance(req, w, ctx))); + } + + await WaitForConditionAsync(() => server.ActiveStreamCount == 3); + + // Cancel all + foreach (var cts in ctsList) + cts.Cancel(); + + await Task.WhenAll(tasks); + + Assert.Equal(0, server.ActiveStreamCount); + Assert.Equal(3, removeCount); + } + + private static async Task WaitForConditionAsync(Func condition, int timeoutMs = 5000) + { + var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs); + while (!condition() && DateTime.UtcNow < deadline) + { + await Task.Delay(25); + } + + Assert.True(condition(), $"Condition not met within {timeoutMs}ms"); + } +} diff --git a/tests/ScadaLink.Communication.Tests/Grpc/NoClusterClientStreamingRegressionTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/NoClusterClientStreamingRegressionTests.cs new file mode 100644 index 0000000..28dcf33 --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/Grpc/NoClusterClientStreamingRegressionTests.cs @@ -0,0 +1,51 @@ +using System.Reflection; +using ScadaLink.Communication.Actors; + +namespace ScadaLink.Communication.Tests.Grpc; + +/// +/// Regression tests ensuring that the old ClusterClient-based debug streaming +/// path is not reintroduced. Debug streaming now flows through gRPC. +/// +/// Note: The DebugStreamEvent type-does-not-exist check lives in +/// ScadaLink.Commons.Tests/ArchitecturalConstraintTests.cs and is not +/// duplicated here. +/// +public class NoClusterClientStreamingRegressionTests +{ + [Fact] + public void CentralCommunicationActor_DoesNotHave_HandleDebugStreamEvent() + { + var type = typeof(CentralCommunicationActor); + var method = type.GetMethod("HandleDebugStreamEvent", + BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public); + Assert.Null(method); + } + + [Fact] + public void SiteCommunicationActor_DoesNotHave_HandleDebugStreamEvent() + { + var type = typeof(SiteCommunicationActor); + var method = type.GetMethod("HandleDebugStreamEvent", + BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public); + Assert.Null(method); + } + + [Fact] + public void CentralCommunicationActor_DoesNotHave_ForwardDebugStreamEvent() + { + var type = typeof(CentralCommunicationActor); + var method = type.GetMethod("ForwardDebugStreamEvent", + BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public); + Assert.Null(method); + } + + [Fact] + public void Communication_Assembly_DoesNotContain_DebugStreamEvent_Type() + { + // DebugStreamEvent should not exist in the Communication assembly either + var assembly = typeof(CentralCommunicationActor).Assembly; + var type = assembly.GetTypes().FirstOrDefault(t => t.Name == "DebugStreamEvent"); + Assert.Null(type); + } +} diff --git a/tests/ScadaLink.Communication.Tests/Grpc/ProtoContractTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/ProtoContractTests.cs new file mode 100644 index 0000000..26bc7eb --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/Grpc/ProtoContractTests.cs @@ -0,0 +1,79 @@ +using Google.Protobuf.WellKnownTypes; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.Communication.Tests.Grpc; + +/// +/// Guardrail tests that verify all oneof variants in SiteStreamEvent have +/// corresponding conversion handlers. Adding a new proto field without +/// implementing the conversion will cause these tests to fail. +/// +public class ProtoContractTests +{ + /// + /// The set of EventOneofCase values we handle in ConvertToDomainEvent. + /// Update this array when adding a new oneof variant. + /// + private static readonly SiteStreamEvent.EventOneofCase[] HandledCases = + [ + SiteStreamEvent.EventOneofCase.AttributeChanged, + SiteStreamEvent.EventOneofCase.AlarmChanged + ]; + + [Fact] + public void AllOneofVariants_HaveConversionHandlers() + { + var allCases = System.Enum.GetValues() + .Where(c => c != SiteStreamEvent.EventOneofCase.None) + .ToArray(); + + Assert.Equal(allCases.Length, HandledCases.Length); + foreach (var c in allCases) + Assert.Contains(c, HandledCases); + } + + [Theory] + [InlineData(SiteStreamEvent.EventOneofCase.AttributeChanged)] + [InlineData(SiteStreamEvent.EventOneofCase.AlarmChanged)] + public void ConvertToDomainEvent_HandlesAllOneofVariants(SiteStreamEvent.EventOneofCase eventCase) + { + var evt = CreateTestEvent(eventCase); + var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt); + Assert.NotNull(result); + } + + private static SiteStreamEvent CreateTestEvent(SiteStreamEvent.EventOneofCase eventCase) + { + var ts = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow); + + return eventCase switch + { + SiteStreamEvent.EventOneofCase.AttributeChanged => new SiteStreamEvent + { + CorrelationId = "test", + AttributeChanged = new AttributeValueUpdate + { + InstanceUniqueName = "Site1.Inst1", + AttributePath = "Path", + AttributeName = "Attr", + Value = "42", + Quality = Quality.Good, + Timestamp = ts + } + }, + SiteStreamEvent.EventOneofCase.AlarmChanged => new SiteStreamEvent + { + CorrelationId = "test", + AlarmChanged = new AlarmStateUpdate + { + InstanceUniqueName = "Site1.Inst1", + AlarmName = "HighTemp", + State = AlarmStateEnum.AlarmStateActive, + Priority = 1, + Timestamp = ts + } + }, + _ => throw new ArgumentOutOfRangeException(nameof(eventCase), eventCase, "Unhandled event case") + }; + } +}