using Akka.Actor; using Akka.TestKit; using Akka.TestKit.Xunit2; using ScadaLink.Commons.Messages.DebugView; using ScadaLink.Commons.Messages.Streaming; using ScadaLink.Communication.Actors; using ScadaLink.Communication.Grpc; namespace ScadaLink.Communication.Tests.Grpc; /// /// Tests for DebugStreamBridgeActor with gRPC streaming integration. /// public class DebugStreamBridgeActorTests : TestKit { private const string SiteId = "site-alpha"; private const string InstanceName = "Site1.Pump01"; private const string GrpcNodeA = "http://localhost:5100"; private const string GrpcNodeB = "http://localhost:5200"; public DebugStreamBridgeActorTests() : base(@"akka.loglevel = DEBUG") { // Use a very short reconnect delay for testing DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100); } private record TestContext( IActorRef BridgeActor, TestProbe CommProbe, MockSiteStreamGrpcClient MockGrpcClient, List ReceivedEvents, bool[] TerminatedFlag); private TestContext CreateBridgeActor() { var commProbe = CreateTestProbe(); var mockClient = new MockSiteStreamGrpcClient(); var factory = new MockSiteStreamGrpcClientFactory(mockClient); var events = new List(); var terminated = new[] { false }; Action onEvent = evt => { lock (events) { events.Add(evt); } }; Action onTerminated = () => terminated[0] = true; var props = Props.Create(typeof(DebugStreamBridgeActor), SiteId, InstanceName, "corr-1", commProbe.Ref, onEvent, onTerminated, factory, GrpcNodeA, GrpcNodeB); var actor = Sys.ActorOf(props); return new TestContext(actor, commProbe, mockClient, events, terminated); } [Fact] public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient() { var ctx = CreateBridgeActor(); var envelope = ctx.CommProbe.ExpectMsg(); Assert.Equal(SiteId, envelope.SiteId); Assert.IsType(envelope.Message); var req = (SubscribeDebugViewRequest)envelope.Message; Assert.Equal(InstanceName, req.InstanceUniqueName); Assert.Equal("corr-1", req.CorrelationId); } [Fact] public void On_Snapshot_Forwards_To_OnEvent_Callback() { var ctx = CreateBridgeActor(); ctx.CommProbe.ExpectMsg(); var snapshot = new DebugViewSnapshot( InstanceName, new List(), new List(), DateTimeOffset.UtcNow); ctx.BridgeActor.Tell(snapshot); AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } }, TimeSpan.FromSeconds(3)); lock (ctx.ReceivedEvents) { Assert.IsType(ctx.ReceivedEvents[0]); } } [Fact] public void On_Snapshot_Opens_GrpcStream() { var ctx = CreateBridgeActor(); ctx.CommProbe.ExpectMsg(); var snapshot = new DebugViewSnapshot( InstanceName, new List(), new List(), DateTimeOffset.UtcNow); ctx.BridgeActor.Tell(snapshot); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); var call = ctx.MockGrpcClient.SubscribeCalls[0]; Assert.Equal("corr-1", call.CorrelationId); Assert.Equal(InstanceName, call.InstanceUniqueName); } [Fact] public void Events_From_GrpcCallback_Forwarded_To_OnEvent() { var ctx = CreateBridgeActor(); ctx.CommProbe.ExpectMsg(); var snapshot = new DebugViewSnapshot( InstanceName, new List(), new List(), DateTimeOffset.UtcNow); ctx.BridgeActor.Tell(snapshot); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); // Simulate gRPC event arriving via the onEvent callback var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow); ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(attrChange); // snapshot + attr change AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } }, TimeSpan.FromSeconds(3)); lock (ctx.ReceivedEvents) { Assert.IsType(ctx.ReceivedEvents[1]); } } [Fact] public void On_GrpcError_Reconnects_To_Other_Node() { var ctx = CreateBridgeActor(); ctx.CommProbe.ExpectMsg(); var snapshot = new DebugViewSnapshot( InstanceName, new List(), new List(), DateTimeOffset.UtcNow); ctx.BridgeActor.Tell(snapshot); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); // Simulate gRPC error ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken")); // Should resubscribe AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5)); Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[1].CorrelationId); } [Fact] public void After_MaxRetries_Terminates() { var ctx = CreateBridgeActor(); ctx.CommProbe.ExpectMsg(); var snapshot = new DebugViewSnapshot( InstanceName, new List(), new List(), DateTimeOffset.UtcNow); Watch(ctx.BridgeActor); ctx.BridgeActor.Tell(snapshot); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); // 4 consecutive errors: initial + 3 retries, then terminate ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1")); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5)); ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2")); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5)); ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3")); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5)); // Fourth error exceeds max retries ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4")); ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5)); Assert.True(ctx.TerminatedFlag[0]); } [Fact] public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe() { var ctx = CreateBridgeActor(); ctx.CommProbe.ExpectMsg(); // subscribe var snapshot = new DebugViewSnapshot( InstanceName, new List(), new List(), DateTimeOffset.UtcNow); Watch(ctx.BridgeActor); ctx.BridgeActor.Tell(snapshot); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); ctx.BridgeActor.Tell(new StopDebugStream()); // Should send ClusterClient unsubscribe var envelope = ctx.CommProbe.ExpectMsg(); Assert.IsType(envelope.Message); // Should unsubscribe gRPC AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Count > 0, TimeSpan.FromSeconds(3)); Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds); // Should stop self ExpectTerminated(ctx.BridgeActor); } [Fact] public void DebugStreamTerminated_Stops_Actor_Idempotently() { var ctx = CreateBridgeActor(); ctx.CommProbe.ExpectMsg(); Watch(ctx.BridgeActor); ctx.BridgeActor.Tell(new DebugStreamTerminated(SiteId, "corr-1")); ExpectTerminated(ctx.BridgeActor); Assert.True(ctx.TerminatedFlag[0]); } [Fact] public void Grpc_Error_Resets_RetryCount_On_Successful_Event() { var ctx = CreateBridgeActor(); ctx.CommProbe.ExpectMsg(); var snapshot = new DebugViewSnapshot( InstanceName, new List(), new List(), DateTimeOffset.UtcNow); ctx.BridgeActor.Tell(snapshot); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); // First error → retry 1 ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1")); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5)); // Simulate successful event (resets retry count) var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow); ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(attrChange); AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } }, TimeSpan.FromSeconds(3)); // Now another 3 errors should be tolerated (retry count was reset) ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2")); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5)); ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3")); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5)); ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4")); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 5, TimeSpan.FromSeconds(5)); // Still alive — 3 retries from the second failure point succeeded } } /// /// Mock gRPC client that records SubscribeAsync and Unsubscribe calls. /// internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient { public List SubscribeCalls { get; } = new(); public List UnsubscribedCorrelationIds { get; } = new(); private MockSiteStreamGrpcClient(bool _) : base() { } public MockSiteStreamGrpcClient() : base() { } public override Task SubscribeAsync( string correlationId, string instanceUniqueName, Action onEvent, Action onError, CancellationToken ct) { var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct); SubscribeCalls.Add(subscription); // Return a task that completes when cancelled (simulates long-running stream) var tcs = new TaskCompletionSource(); ct.Register(() => tcs.TrySetResult()); return tcs.Task; } public override void Unsubscribe(string correlationId) { UnsubscribedCorrelationIds.Add(correlationId); } } internal record MockSubscription( string CorrelationId, string InstanceUniqueName, Action OnEvent, Action OnError, CancellationToken CancellationToken); /// /// Factory that always returns the pre-configured mock client. /// internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory { private readonly MockSiteStreamGrpcClient _mockClient; public List RequestedEndpoints { get; } = new(); public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient) : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance) { _mockClient = mockClient; } public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint) { RequestedEndpoints.Add(grpcEndpoint); return _mockClient; } }