using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests.Grpc;
/// <summary>
/// Tests for DebugStreamBridgeActor with gRPC streaming integration.
/// </summary>
public class DebugStreamBridgeActorTests : TestKit
{
// Site identifier handed to the bridge actor; tests assert it on the outgoing envelope.
private const string SiteId = "site-alpha";
// Instance unique name handed to the bridge actor; asserted on subscribe requests and gRPC subscribe calls.
private const string InstanceName = "Site1.Pump01";
// The two gRPC endpoints handed to the bridge actor, node A first and node B second.
private const string GrpcNodeA = "http://localhost:5100";
private const string GrpcNodeB = "http://localhost:5200";
// Starts the TestKit actor system with DEBUG logging and overrides the bridge actor's
// static timing settings for the tests in this class.
// NOTE(review): both settings are static and are not restored here — confirm no other
// test class running in the same process depends on their defaults.
public DebugStreamBridgeActorTests() : base(@"akka.loglevel = DEBUG")
{
    // Keep the stability window far longer than any single test, so a stream only
    // counts as "stable" in a test that shortens the window on purpose.
    DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);

    // Reconnect almost immediately so retry-driven tests finish quickly.
    DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100);
}
// Everything a test needs to drive and observe one bridge actor, as built by CreateBridgeActor:
//   BridgeActor    - reference to the DebugStreamBridgeActor under test.
//   CommProbe      - test probe passed to the actor; tests read the actor's outgoing envelopes from it.
//   MockGrpcClient - the mock gRPC client returned by the factory given to the actor.
//   ReceivedEvents - events captured by the onEvent callback; accessed under lock on the list itself.
//   TerminatedFlag - single-element array whose [0] is set to true by the onTerminated callback.
private record TestContext(
IActorRef BridgeActor,
TestProbe CommProbe,
MockSiteStreamGrpcClient MockGrpcClient,
List ReceivedEvents,
bool[] TerminatedFlag);
// Builds a DebugStreamBridgeActor wired to a fresh test probe, a single mock gRPC client
// (served by a factory that returns it for every endpoint) and callbacks that record
// delivered events and termination. Returns the actor together with those observers.
private TestContext CreateBridgeActor()
{
    var probe = CreateTestProbe();
    var grpcClient = new MockSiteStreamGrpcClient();
    var captured = new List();
    var stopped = new[] { false };

    // Sets the flag that tests read through TestContext.TerminatedFlag.
    Action onTerminated = () => stopped[0] = true;

    // Appends under a lock on the list; tests take the same lock when reading it.
    Action onEvent = evt =>
    {
        lock (captured)
        {
            captured.Add(evt);
        }
    };

    var bridge = Sys.ActorOf(Props.Create(
        typeof(DebugStreamBridgeActor),
        SiteId,
        InstanceName,
        "corr-1",
        probe.Ref,
        onEvent,
        onTerminated,
        new MockSiteStreamGrpcClientFactory(grpcClient),
        GrpcNodeA,
        GrpcNodeB));

    return new TestContext(bridge, probe, grpcClient, captured, stopped);
}
// On start-up the actor must send the comm probe an envelope addressed to the site,
// carrying a SubscribeDebugViewRequest for the configured instance and correlation ID.
[Fact]
public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient()
{
    var probe = CreateBridgeActor().CommProbe;

    var envelope = probe.ExpectMsg();

    Assert.Equal(SiteId, envelope.SiteId);
    Assert.IsType(envelope.Message);

    var subscribe = (SubscribeDebugViewRequest)envelope.Message;
    Assert.Equal(InstanceName, subscribe.InstanceUniqueName);
    Assert.Equal("corr-1", subscribe.CorrelationId);
}
// A DebugViewSnapshot told to the actor must reach the onEvent callback exactly once.
[Fact]
public void On_Snapshot_Forwards_To_OnEvent_Callback()
{
    var harness = CreateBridgeActor();
    var received = harness.ReceivedEvents;
    harness.CommProbe.ExpectMsg(); // consume the initial subscribe envelope

    harness.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List(),
        new List(),
        DateTimeOffset.UtcNow));

    // The callback appends under a lock on the list, so read it under the same lock.
    AwaitCondition(
        () =>
        {
            lock (received)
            {
                return received.Count == 1;
            }
        },
        TimeSpan.FromSeconds(3));

    lock (received)
    {
        Assert.IsType(received[0]);
    }
}
// Receiving the snapshot must trigger exactly one gRPC subscribe call carrying the
// actor's correlation ID and instance name.
[Fact]
public void On_Snapshot_Opens_GrpcStream()
{
    var harness = CreateBridgeActor();
    var streams = harness.MockGrpcClient.SubscribeCalls;
    harness.CommProbe.ExpectMsg(); // consume the initial subscribe envelope

    harness.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List(),
        new List(),
        DateTimeOffset.UtcNow));

    AwaitCondition(() => streams.Count == 1, TimeSpan.FromSeconds(3));

    var opened = streams[0];
    Assert.Equal("corr-1", opened.CorrelationId);
    Assert.Equal(InstanceName, opened.InstanceUniqueName);
}
// An event pushed through the gRPC subscription's onEvent callback must be forwarded
// to the actor's onEvent callback, after the snapshot that opened the stream.
[Fact]
public void Events_From_GrpcCallback_Forwarded_To_OnEvent()
{
    var harness = CreateBridgeActor();
    var streams = harness.MockGrpcClient.SubscribeCalls;
    var received = harness.ReceivedEvents;
    harness.CommProbe.ExpectMsg(); // consume the initial subscribe envelope

    harness.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List(),
        new List(),
        DateTimeOffset.UtcNow));
    AwaitCondition(() => streams.Count == 1, TimeSpan.FromSeconds(3));

    // Play the role of the gRPC stream delivering one attribute change.
    streams[0].OnEvent(
        new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow));

    // Two captured events are expected: the snapshot first, then the attribute change.
    AwaitCondition(
        () =>
        {
            lock (received)
            {
                return received.Count == 2;
            }
        },
        TimeSpan.FromSeconds(3));

    lock (received)
    {
        Assert.IsType(received[1]);
    }
}
// A gRPC stream error must lead to a second subscribe call that reuses the same
// correlation ID. This test uses the single shared mock client, so it checks only that
// a resubscribe happens, not which endpoint it targets.
[Fact]
public void On_GrpcError_Reconnects_To_Other_Node()
{
    var harness = CreateBridgeActor();
    var streams = harness.MockGrpcClient.SubscribeCalls;
    harness.CommProbe.ExpectMsg(); // consume the initial subscribe envelope

    harness.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List(),
        new List(),
        DateTimeOffset.UtcNow));
    AwaitCondition(() => streams.Count == 1, TimeSpan.FromSeconds(3));

    // Break the first stream through its error callback.
    streams[0].OnError(new Exception("Stream broken"));

    // The actor must subscribe again.
    AwaitCondition(() => streams.Count == 2, TimeSpan.FromSeconds(5));
    Assert.Equal("corr-1", streams[1].CorrelationId);
}
// Communication-002 regression: when the actor reconnects after a stream error it must
// unsubscribe the previous stream, so the old node is not left holding a zombie relay
// actor / subscription.
[Fact]
public void On_GrpcError_Unsubscribes_Old_Stream_Before_Reconnect()
{
    var harness = CreateBridgeActor();
    var grpc = harness.MockGrpcClient;
    harness.CommProbe.ExpectMsg(); // consume the initial subscribe envelope

    harness.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List(),
        new List(),
        DateTimeOffset.UtcNow));
    AwaitCondition(() => grpc.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

    // Fail the stream to force a reconnect.
    grpc.SubscribeCalls[0].OnError(new Exception("Stream broken"));

    // Wait for the resubscribe...
    AwaitCondition(() => grpc.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));

    // ...by which point the prior correlation ID must have been unsubscribed.
    Assert.Contains("corr-1", grpc.UnsubscribedCorrelationIds);
}
// Four consecutive stream errors (the initial stream plus three retries) must stop the
// actor and invoke the onTerminated callback.
[Fact]
public void After_MaxRetries_Terminates()
{
    var harness = CreateBridgeActor();
    var streams = harness.MockGrpcClient.SubscribeCalls;
    harness.CommProbe.ExpectMsg(); // consume the initial subscribe envelope

    Watch(harness.BridgeActor);
    harness.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List(),
        new List(),
        DateTimeOffset.UtcNow));
    AwaitCondition(() => streams.Count == 1, TimeSpan.FromSeconds(3));

    // Each of the first three failures must be answered with a new subscribe call.
    string[] retriedFailures = { "Error 1", "Error 2", "Error 3" };
    for (var index = 0; index < retriedFailures.Length; index++)
    {
        streams[index].OnError(new Exception(retriedFailures[index]));

        var expectedCalls = index + 2;
        AwaitCondition(() => streams.Count == expectedCalls, TimeSpan.FromSeconds(5));
    }

    // The fourth failure exceeds the retry budget.
    streams[3].OnError(new Exception("Error 4"));

    ExpectTerminated(harness.BridgeActor, TimeSpan.FromSeconds(5));
    Assert.True(harness.TerminatedFlag[0]);
}
// StopDebugStream must make the actor send an unsubscribe envelope to the comm probe,
// unsubscribe the gRPC stream for its correlation ID, and then stop itself.
[Fact]
public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe()
{
    var harness = CreateBridgeActor();
    var grpc = harness.MockGrpcClient;
    harness.CommProbe.ExpectMsg(); // subscribe

    Watch(harness.BridgeActor);
    harness.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List(),
        new List(),
        DateTimeOffset.UtcNow));
    AwaitCondition(() => grpc.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

    harness.BridgeActor.Tell(new StopDebugStream());

    // Unsubscribe sent through the comm probe.
    var envelope = harness.CommProbe.ExpectMsg();
    Assert.IsType(envelope.Message);

    // Unsubscribe issued on the gRPC client.
    AwaitCondition(() => grpc.UnsubscribedCorrelationIds.Count > 0, TimeSpan.FromSeconds(3));
    Assert.Contains("corr-1", grpc.UnsubscribedCorrelationIds);

    // The actor stops itself.
    ExpectTerminated(harness.BridgeActor);
}
// A DebugStreamTerminated message must stop the actor and invoke the onTerminated callback.
// NOTE(review): the message is sent only once here, so the "idempotently" in the test
// name is not exercised by this test — confirm whether a repeated send should be covered.
[Fact]
public void DebugStreamTerminated_Stops_Actor_Idempotently()
{
    var harness = CreateBridgeActor();
    var bridge = harness.BridgeActor;
    harness.CommProbe.ExpectMsg(); // consume the initial subscribe envelope

    Watch(bridge);
    bridge.Tell(new DebugStreamTerminated(SiteId, "corr-1"));

    ExpectTerminated(bridge);
    Assert.True(harness.TerminatedFlag[0]);
}
// Communication-008 regression: a stream that repeatedly connects, delivers an event and
// then fails must still exhaust MaxRetries. A received event no longer resets the retry
// count (only the stability window does); previously every event reset it, so a flapping
// site reconnected forever and the debug session never ended.
[Fact]
public void FlappingStream_DeliveringEventsBetweenFailures_StillTerminatesAfterMaxRetries()
{
    var harness = CreateBridgeActor();
    var streams = harness.MockGrpcClient.SubscribeCalls;
    harness.CommProbe.ExpectMsg(); // consume the initial subscribe envelope

    Watch(harness.BridgeActor);
    harness.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List(),
        new List(),
        DateTimeOffset.UtcNow));
    AwaitCondition(() => streams.Count == 1, TimeSpan.FromSeconds(3));

    var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);

    // Three flaps: each stream delivers one event and then errors. Under the old logic
    // each delivered event would have refilled the retry budget.
    for (var round = 0; round < 3; round++)
    {
        var stream = streams[round];
        stream.OnEvent(attrChange);
        stream.OnError(new Exception($"Flap {round + 1}"));

        var expectedCalls = round + 2;
        AwaitCondition(() => streams.Count == expectedCalls, TimeSpan.FromSeconds(5));
    }

    // The fourth failure, after the three retries, must exceed MaxRetries.
    var lastStream = streams[3];
    lastStream.OnEvent(attrChange);
    lastStream.OnError(new Exception("Flap 4"));

    ExpectTerminated(harness.BridgeActor, TimeSpan.FromSeconds(5));
    Assert.True(harness.TerminatedFlag[0]);
}
// Communication-015 regression: with an endpoint-aware factory (one mock client per
// endpoint) the first subscribe must target NodeA, and after a gRPC error the reconnect
// must go through the client bound to the NodeB endpoint.
[Fact]
public void On_GrpcError_Reconnects_To_Other_Node_Endpoint()
{
    var commProbe = CreateTestProbe();
    var factory = new EndpointTrackingGrpcClientFactory();
    var events = new List();
    var terminated = new[] { false };

    Action onEvent = evt =>
    {
        lock (events)
        {
            events.Add(evt);
        }
    };
    Action onTerminated = () => terminated[0] = true;

    var actor = Sys.ActorOf(Props.Create(
        typeof(DebugStreamBridgeActor),
        SiteId,
        InstanceName,
        "corr-1",
        commProbe.Ref,
        onEvent,
        onTerminated,
        factory,
        GrpcNodeA,
        GrpcNodeB));
    commProbe.ExpectMsg(); // consume the initial subscribe envelope

    actor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List(),
        new List(),
        DateTimeOffset.UtcNow));

    // The first subscribe must land on the NodeA client.
    var nodeA = factory.ClientFor(GrpcNodeA);
    AwaitCondition(() => nodeA.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

    // Fail NodeA's stream so the bridge flips to the other node.
    nodeA.SubscribeCalls[0].OnError(new Exception("NodeA down"));

    // The reconnect must arrive on the client bound to the NodeB endpoint.
    var nodeB = factory.ClientFor(GrpcNodeB);
    AwaitCondition(() => nodeB.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(5));
    Assert.Equal("corr-1", nodeB.SubscribeCalls[0].CorrelationId);
}
// Communication-008: once a stream has stayed connected for the stability window, the
// retry budget is recovered, so a later transient fault gets a fresh set of retries
// instead of being counted against the old budget.
[Fact]
public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow()
{
    // Shorten the static stability window for this test only; restored in finally.
    DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromMilliseconds(300);
    try
    {
        var harness = CreateBridgeActor();
        var streams = harness.MockGrpcClient.SubscribeCalls;
        harness.CommProbe.ExpectMsg(); // consume the initial subscribe envelope

        Watch(harness.BridgeActor);
        harness.BridgeActor.Tell(new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow));
        AwaitCondition(() => streams.Count == 1, TimeSpan.FromSeconds(3));

        // Five failures, each injected only after the current stream has been up for
        // longer than the stability window (the mock stream completes only on cancel),
        // so the retry budget is reset before every failure.
        for (var failure = 0; failure < 5; failure++)
        {
            Thread.Sleep(450); // exceed the 300ms stability window

            streams[failure].OnError(new Exception($"Error {failure + 1}"));

            var expectedCalls = failure + 2;
            AwaitCondition(() => streams.Count == expectedCalls, TimeSpan.FromSeconds(5));
        }

        // More failures than the retry limit, yet no termination: each reconnect
        // recovered its budget once the stability window had elapsed.
        Assert.False(harness.TerminatedFlag[0]);
    }
    finally
    {
        DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
    }
}
}
/// <summary>
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
/// </summary>
/// <remarks>
/// NOTE(review): both recording lists are plain lists. Tests in this file poll them via
/// AwaitCondition while SubscribeAsync / Unsubscribe append to them, presumably from a
/// different thread — confirm whether they need synchronization.
/// </remarks>
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
{
    /// <summary>One entry per SubscribeAsync call, in call order.</summary>
    public List SubscribeCalls { get; } = new();

    /// <summary>Correlation IDs passed to Unsubscribe, in call order.</summary>
    public List UnsubscribedCorrelationIds { get; } = new();

    // The unused private (bool) constructor was dead code and has been removed.
    public MockSiteStreamGrpcClient() : base()
    {
    }

    /// <summary>
    /// Records the call and returns a task that completes only when <paramref name="ct"/>
    /// is cancelled, simulating a long-running stream. The callbacks are stored on the
    /// recorded subscription so tests can invoke them to simulate events and errors.
    /// </summary>
    /// <param name="correlationId">Correlation ID stored on the recorded subscription.</param>
    /// <param name="instanceUniqueName">Instance name stored on the recorded subscription.</param>
    /// <param name="onEvent">Event callback stored on the recorded subscription.</param>
    /// <param name="onError">Error callback stored on the recorded subscription.</param>
    /// <param name="ct">Token whose cancellation completes the returned task.</param>
    /// <returns>A task that completes once <paramref name="ct"/> is cancelled.</returns>
    public override Task SubscribeAsync(
        string correlationId,
        string instanceUniqueName,
        Action onEvent,
        Action onError,
        CancellationToken ct)
    {
        SubscribeCalls.Add(new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct));

        // RunContinuationsAsynchronously stops code awaiting the returned task from
        // resuming inline on whichever thread cancels the token.
        var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        ct.Register(() => completion.TrySetResult());
        return completion.Task;
    }

    /// <summary>Records the correlation ID; nothing is actually torn down.</summary>
    /// <param name="correlationId">Correlation ID appended to UnsubscribedCorrelationIds.</param>
    public override void Unsubscribe(string correlationId)
    {
        UnsubscribedCorrelationIds.Add(correlationId);
    }
}
// One recorded SubscribeAsync call on MockSiteStreamGrpcClient. It holds the arguments
// exactly as they were passed in, so tests can assert on the IDs and invoke the stored
// OnEvent / OnError callbacks to simulate stream activity.
//   CorrelationId      - correlation ID passed to SubscribeAsync.
//   InstanceUniqueName - instance name passed to SubscribeAsync.
//   OnEvent            - event callback passed to SubscribeAsync.
//   OnError            - error callback passed to SubscribeAsync.
//   CancellationToken  - token passed to SubscribeAsync.
internal record MockSubscription(
string CorrelationId,
string InstanceUniqueName,
Action OnEvent,
Action OnError,
CancellationToken CancellationToken);
/// <summary>
/// Factory that always returns the pre-configured mock client, whatever site or endpoint
/// is requested, and records each requested endpoint.
/// </summary>
internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory
{
    private readonly MockSiteStreamGrpcClient _sharedClient;

    /// <param name="mockClient">The client every GetOrCreate call will return.</param>
    public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient)
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
        => _sharedClient = mockClient;

    /// <summary>Endpoints passed to GetOrCreate, in call order.</summary>
    public List RequestedEndpoints { get; } = new();

    /// <summary>Records the endpoint, then returns the shared mock client.</summary>
    /// <param name="siteIdentifier">Not used by this mock.</param>
    /// <param name="grpcEndpoint">Endpoint appended to RequestedEndpoints.</param>
    /// <returns>The mock client supplied to the constructor.</returns>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        RequestedEndpoints.Add(grpcEndpoint);

        return _sharedClient;
    }
}
/// <summary>
/// Endpoint-aware mock factory: hands out a distinct <see cref="MockSiteStreamGrpcClient"/>
/// per endpoint, mirroring the real factory's corrected NodeA→NodeB failover behaviour
/// so node-flip coverage is meaningful (Communication-015).
/// </summary>
internal class EndpointTrackingGrpcClientFactory : SiteStreamGrpcClientFactory
{
    // Mock clients keyed by endpoint; an entry is created the first time an endpoint is asked for.
    private readonly System.Collections.Concurrent.ConcurrentDictionary _clientsByEndpoint = new();

    public EndpointTrackingGrpcClientFactory()
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
    }

    /// <summary>
    /// Returns the mock client bound to <paramref name="endpoint"/>, creating it on first request.
    /// </summary>
    /// <param name="endpoint">Endpoint used as the lookup key.</param>
    /// <returns>The mock client stored for that endpoint.</returns>
    public MockSiteStreamGrpcClient ClientFor(string endpoint)
    {
        return _clientsByEndpoint.GetOrAdd(endpoint, unusedKey => new MockSiteStreamGrpcClient());
    }

    /// <summary>Returns the per-endpoint mock client; the site identifier is not used.</summary>
    /// <param name="siteIdentifier">Not used by this mock.</param>
    /// <param name="grpcEndpoint">Endpoint whose client is returned.</param>
    /// <returns>The same client ClientFor returns for that endpoint.</returns>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        return ClientFor(grpcEndpoint);
    }
}