using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;

namespace ScadaLink.Communication.Tests.Grpc;

/// <summary>
/// Tests for DebugStreamBridgeActor with gRPC streaming integration.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments in this copy of the file appear to have been
/// lost (e.g. the T in <c>ExpectMsg&lt;T&gt;()</c>, <c>List&lt;T&gt;</c>, <c>Action&lt;T&gt;</c>,
/// <c>Assert.IsType&lt;T&gt;</c>); as written, calls such as <c>ExpectMsg()</c> followed by
/// <c>envelope.SiteId</c> cannot compile. The code below reproduces this copy verbatim —
/// restore the type arguments from version control before building.
/// </remarks>
public class DebugStreamBridgeActorTests : TestKit
{
    private const string SiteId = "site-alpha";
    private const string InstanceName = "Site1.Pump01";
    private const string GrpcNodeA = "http://localhost:5100";
    private const string GrpcNodeB = "http://localhost:5200";

    /// <summary>
    /// Creates the test actor system with DEBUG logging and sets the bridge actor's timing
    /// knobs to test-friendly values.
    /// </summary>
    public DebugStreamBridgeActorTests() : base(@"akka.loglevel = DEBUG")
    {
        // NOTE(review): ReconnectDelay and StabilityWindow are assigned through the type name,
        // i.e. they are static. Any other test class that uses DebugStreamBridgeActor and runs
        // in parallel with this one may observe these values.
        // Use a very short reconnect delay for testing
        DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100);
        // Long stability window so streams are never considered "stable" mid-test
        // unless a test deliberately waits it out.
        DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
    }

    /// <summary>
    /// Everything a test needs to drive and observe one bridge actor.
    /// </summary>
    /// <param name="BridgeActor">The DebugStreamBridgeActor under test.</param>
    /// <param name="CommProbe">Probe passed to the bridge as its outbound target; receives the
    /// subscribe/unsubscribe envelopes the bridge sends toward the site.</param>
    /// <param name="MockGrpcClient">The single mock gRPC client every endpoint resolves to.</param>
    /// <param name="ReceivedEvents">Events delivered to the bridge's onEvent callback; lock the
    /// list before reading, as the callback adds under that same lock.</param>
    /// <param name="TerminatedFlag">One-element array so the onTerminated lambda can set
    /// element 0 by reference.</param>
    private record TestContext(
        IActorRef BridgeActor,
        TestProbe CommProbe,
        MockSiteStreamGrpcClient MockGrpcClient,
        List ReceivedEvents,
        bool[] TerminatedFlag);

    /// <summary>
    /// Spawns a DebugStreamBridgeActor for <see cref="InstanceName"/> / correlation "corr-1",
    /// wired to a fresh probe, a <see cref="MockSiteStreamGrpcClientFactory"/> and both gRPC
    /// node endpoints.
    /// </summary>
    private TestContext CreateBridgeActor()
    {
        var commProbe = CreateTestProbe();
        var mockClient = new MockSiteStreamGrpcClient();
        var factory = new MockSiteStreamGrpcClientFactory(mockClient);
        var events = new List();
        var terminated = new[] { false };
        // The callback may be invoked off the test thread, hence the lock on the shared list.
        Action onEvent = evt => { lock (events) { events.Add(evt); } };
        // NOTE(review): written by the bridge's callback and read by the test thread without a
        // lock or Volatile read; consider Volatile.Read/Write if this flakes.
        Action onTerminated = () => terminated[0] = true;
        var props = Props.Create(typeof(DebugStreamBridgeActor),
            SiteId, InstanceName, "corr-1", commProbe.Ref, onEvent, onTerminated,
            factory, GrpcNodeA, GrpcNodeB);
        var actor = Sys.ActorOf(props);
        return new TestContext(actor, commProbe, mockClient, events, terminated);
    }

    [Fact]
    public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient()
    {
        var ctx = CreateBridgeActor();

        // The first outbound envelope must be a subscribe request for this instance/correlation.
        var envelope = ctx.CommProbe.ExpectMsg();
        Assert.Equal(SiteId, envelope.SiteId);
        Assert.IsType(envelope.Message);
        var req = (SubscribeDebugViewRequest)envelope.Message;
        Assert.Equal(InstanceName, req.InstanceUniqueName);
        Assert.Equal("corr-1", req.CorrelationId);
    }

    [Fact]
    public void On_Snapshot_Forwards_To_OnEvent_Callback()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);

        AwaitCondition(() =>
        {
            lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; }
        }, TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[0]);
        }
    }

    [Fact]
    public void On_Snapshot_Opens_GrpcStream()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);

        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var call = ctx.MockGrpcClient.SubscribeCalls[0];
        Assert.Equal("corr-1", call.CorrelationId);
        Assert.Equal(InstanceName, call.InstanceUniqueName);
    }

    [Fact]
    public void Events_From_GrpcCallback_Forwarded_To_OnEvent()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // Simulate gRPC event arriving via the onEvent callback
        var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(attrChange);

        // snapshot + attr change
        AwaitCondition(() =>
        {
            lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; }
        }, TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[1]);
        }
    }

    [Fact]
    public void On_GrpcError_Reconnects_To_Other_Node()
    {
        // NOTE(review): MockSiteStreamGrpcClientFactory returns the same mock client for every
        // endpoint, so this test only proves that a resubscribe happens after an error. The
        // NodeA -> NodeB flip itself is covered by On_GrpcError_Reconnects_To_Other_Node_Endpoint.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // Simulate gRPC error
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));

        // Should resubscribe
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
        Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[1].CorrelationId);
    }

    [Fact]
    public void On_GrpcError_Unsubscribes_Old_Stream_Before_Reconnect()
    {
        // Communication-002 regression: a reconnect must unsubscribe the previous
        // stream so the old node does not keep a zombie relay actor / subscription.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // Simulate gRPC error → reconnect
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));

        // Should resubscribe...
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));

        // ...and must have unsubscribed the prior correlation ID so the old node's
        // relay actor is released rather than left zombie.
        Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);
    }

    [Fact]
    public void After_MaxRetries_Terminates()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // 4 consecutive errors: initial + 3 retries, then terminate
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
        ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5));
        ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5));

        // Fourth error exceeds max retries
        ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4"));
        ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
        Assert.True(ctx.TerminatedFlag[0]);
    }

    [Fact]
    public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg(); // subscribe

        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        ctx.BridgeActor.Tell(new StopDebugStream());

        // Should send ClusterClient unsubscribe
        // NOTE(review): the expected message type argument was lost in this copy; presumably
        // the unsubscribe counterpart of SubscribeDebugViewRequest — confirm.
        var envelope = ctx.CommProbe.ExpectMsg();
        Assert.IsType(envelope.Message);

        // Should unsubscribe gRPC
        AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Count > 0, TimeSpan.FromSeconds(3));
        Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);

        // Should stop self
        ExpectTerminated(ctx.BridgeActor);
    }

    [Fact]
    public void DebugStreamTerminated_Stops_Actor_Idempotently()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        Watch(ctx.BridgeActor);

        // A site-side termination notice for this correlation must stop the bridge and fire
        // the onTerminated callback.
        ctx.BridgeActor.Tell(new DebugStreamTerminated(SiteId, "corr-1"));

        ExpectTerminated(ctx.BridgeActor);
        Assert.True(ctx.TerminatedFlag[0]);
    }

    [Fact]
    public void FlappingStream_DeliveringEventsBetweenFailures_StillTerminatesAfterMaxRetries()
    {
        // Communication-008 regression: a stream that connects, delivers an event,
        // then fails — repeatedly — must still trip MaxRetries. The retry count is
        // NO LONGER reset by a received event (only by the stability window). The
        // previous behaviour reset _retryCount on every event, so a flapping site
        // reconnected forever and the debug session lived on indefinitely.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);

        // Flap: deliver one event then fail, three times. Each event would, under
        // the old buggy logic, reset the retry budget and prevent termination.
        for (var i = 0; i < 3; i++)
        {
            var call = ctx.MockGrpcClient.SubscribeCalls[i];
            call.OnEvent(attrChange);
            call.OnError(new Exception($"Flap {i + 1}"));
            // Copy to a local so the lambda below captures this iteration's value.
            var expected = i + 2;
            AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == expected, TimeSpan.FromSeconds(5));
        }

        // Fourth error (after the 3 retries) must exceed MaxRetries and terminate.
        ctx.MockGrpcClient.SubscribeCalls[3].OnEvent(attrChange);
        ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Flap 4"));
        ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
        Assert.True(ctx.TerminatedFlag[0]);
    }

    [Fact]
    public void On_GrpcError_Reconnects_To_Other_Node_Endpoint()
    {
        // Communication-015 regression: drive the bridge actor through a node flip
        // with an endpoint-aware factory (one distinct mock client per endpoint).
        // The first subscribe targets NodeA; after a gRPC error the bridge must
        // reconnect via a client bound to the *NodeB* endpoint.
        var commProbe = CreateTestProbe();
        var factory = new EndpointTrackingGrpcClientFactory();
        var events = new List();
        var terminated = new[] { false };
        var props = Props.Create(typeof(DebugStreamBridgeActor),
            SiteId, InstanceName, "corr-1", commProbe.Ref,
            (Action)(evt => { lock (events) { events.Add(evt); } }),
            (Action)(() => terminated[0] = true),
            factory, GrpcNodeA, GrpcNodeB);
        var actor = Sys.ActorOf(props);
        commProbe.ExpectMsg();

        actor.Tell(new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow));

        // First subscribe goes to NodeA.
        AwaitCondition(() => factory.ClientFor(GrpcNodeA).SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // gRPC error → bridge flips to NodeB.
        factory.ClientFor(GrpcNodeA).SubscribeCalls[0].OnError(new Exception("NodeA down"));

        // The reconnect must reach a client bound to the NodeB endpoint.
        AwaitCondition(() => factory.ClientFor(GrpcNodeB).SubscribeCalls.Count == 1, TimeSpan.FromSeconds(5));
        Assert.Equal("corr-1", factory.ClientFor(GrpcNodeB).SubscribeCalls[0].CorrelationId);
    }

    [Fact]
    public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow()
    {
        // Communication-008: after a stream has been connected for the stability
        // window, the retry budget is recovered — a later transient fault then gets
        // a fresh set of retries rather than being counted against the old budget.
        // The static window is shortened for this test only and restored in finally.
        DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromMilliseconds(300);
        try
        {
            var ctx = CreateBridgeActor();
            ctx.CommProbe.ExpectMsg();

            var snapshot = new DebugViewSnapshot(
                InstanceName,
                new List(),
                new List(),
                DateTimeOffset.UtcNow);
            Watch(ctx.BridgeActor);
            ctx.BridgeActor.Tell(snapshot);
            AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

            // Five failures — each new stream stays up long enough (the mock
            // stream only completes on cancel) for the stability window to elapse
            // and reset the retry budget before the next failure. This relies on
            // real wall-clock waits, so the test takes roughly 5 × 450 ms.
            for (var i = 0; i < 5; i++)
            {
                Thread.Sleep(450); // exceed the 300ms stability window
                ctx.MockGrpcClient.SubscribeCalls[i].OnError(new Exception($"Error {i + 1}"));
                var expected = i + 2;
                AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == expected, TimeSpan.FromSeconds(5));
            }

            // Five well-spaced failures did NOT terminate the actor because each
            // reconnect recovered its retry budget after the stability window.
            // NOTE(review): only the onTerminated flag is checked; the watched actor's
            // liveness itself is not asserted (e.g. via ExpectNoMsg for Terminated).
            Assert.False(ctx.TerminatedFlag[0]);
        }
        finally
        {
            DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
        }
    }
}

/// <summary>
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
/// </summary>
/// <remarks>
/// Recording lists are written under a lock on the list instance itself (the same idiom the
/// tests use for their received-events list), because SubscribeAsync/Unsubscribe may be called
/// from the bridge actor's thread while the test thread polls the lists. Callers that enumerate
/// a list while the actor may still be running should take the same lock.
/// NOTE(review): the generic type arguments on these members were missing from the reviewed
/// copy and were restored from how the tests use them (events collected as objects, errors
/// raised as <see cref="Exception"/>). Confirm Action&lt;object&gt; / Action&lt;Exception&gt;
/// against the base SiteStreamGrpcClient.SubscribeAsync signature.
/// </remarks>
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
{
    /// <summary>Every SubscribeAsync call, in call order.</summary>
    public List<MockSubscription> SubscribeCalls { get; } = new();

    /// <summary>Every correlation ID passed to Unsubscribe, in call order.</summary>
    public List<string> UnsubscribedCorrelationIds { get; } = new();

    public MockSiteStreamGrpcClient() : base() { }

    /// <summary>
    /// Records the subscription (including its callbacks, so tests can push events/errors)
    /// and returns a task that stays pending until <paramref name="ct"/> is cancelled,
    /// simulating a long-running server stream.
    /// </summary>
    public override Task SubscribeAsync(
        string correlationId,
        string instanceUniqueName,
        Action<object> onEvent,
        Action<Exception> onError,
        CancellationToken ct)
    {
        var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
        lock (SubscribeCalls)
        {
            SubscribeCalls.Add(subscription);
        }

        // Cancellation callbacks run synchronously on the thread that calls Cancel() on the
        // token source. RunContinuationsAsynchronously keeps the caller's await continuation
        // from running inline inside that Cancel() call. This matches a real stream, which
        // completes on its own thread.
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        ct.Register(() => tcs.TrySetResult());
        return tcs.Task;
    }

    /// <summary>Records <paramref name="correlationId"/>; performs no other work.</summary>
    public override void Unsubscribe(string correlationId)
    {
        lock (UnsubscribedCorrelationIds)
        {
            UnsubscribedCorrelationIds.Add(correlationId);
        }
    }
}

/// <summary>
/// One recorded SubscribeAsync call. The callbacks are kept so tests can drive the stream
/// (deliver events via <see cref="OnEvent"/>, fail it via <see cref="OnError"/>).
/// </summary>
internal record MockSubscription(
    string CorrelationId,
    string InstanceUniqueName,
    Action<object> OnEvent,
    Action<Exception> OnError,
    CancellationToken CancellationToken);

/// <summary>
/// Factory that always returns the pre-configured mock client, whatever endpoint is requested.
/// </summary>
internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory
{
    private readonly MockSiteStreamGrpcClient _mockClient;

    /// <summary>Endpoints passed to GetOrCreate, in call order (written under a lock on the list).</summary>
    public List<string> RequestedEndpoints { get; } = new();

    public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient)
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
        _mockClient = mockClient;
    }

    /// <summary>Records the requested endpoint and returns the shared mock client.</summary>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        lock (RequestedEndpoints)
        {
            RequestedEndpoints.Add(grpcEndpoint);
        }

        return _mockClient;
    }
}

/// <summary>
/// Endpoint-aware mock factory: hands out a distinct <see cref="MockSiteStreamGrpcClient"/>
/// per endpoint, mirroring the real factory's corrected NodeA→NodeB failover behaviour
/// so node-flip coverage is meaningful (Communication-015).
/// </summary>
internal class EndpointTrackingGrpcClientFactory : SiteStreamGrpcClientFactory
{
    private readonly System.Collections.Concurrent.ConcurrentDictionary<string, MockSiteStreamGrpcClient> _byEndpoint = new();

    public EndpointTrackingGrpcClientFactory()
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
    }

    /// <summary>
    /// Returns the mock client bound to <paramref name="endpoint"/>, creating it on first use.
    /// The same endpoint always yields the same instance.
    /// </summary>
    public MockSiteStreamGrpcClient ClientFor(string endpoint) =>
        _byEndpoint.GetOrAdd(endpoint, _ => new MockSiteStreamGrpcClient());

    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint) =>
        ClientFor(grpcEndpoint);
}