using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests.Grpc;
/// <summary>
/// Tests for DebugStreamBridgeActor with gRPC streaming integration.
/// Each test creates the actor with a TestProbe that receives its outbound
/// envelopes and a mock gRPC client/factory, then drives it with messages and
/// simulated stream callbacks.
/// </summary>
/// <remarks>
/// NOTE(review): as shown here, generic type arguments seem to be missing
/// (for example on List, Action, ExpectMsg and Assert.IsType) — confirm against
/// the checked-in file before compiling.
/// </remarks>
public class DebugStreamBridgeActorTests : TestKit
{
// Site identifier passed to the actor and asserted on the outbound envelope.
private const string SiteId = "site-alpha";
// Instance name used in the subscribe request and in every snapshot/event built by the tests.
private const string InstanceName = "Site1.Pump01";
// The two gRPC endpoints handed to the actor as its last two constructor arguments.
private const string GrpcNodeA = "http://localhost:5100";
private const string GrpcNodeB = "http://localhost:5200";
// Configures the test actor system with DEBUG logging and shortens the actor's
// timing settings. Both settings are assigned through the type name, i.e. they
// are static and are not restored when the test class is disposed.
public DebugStreamBridgeActorTests() : base(@"akka.loglevel = DEBUG")
{
// Use a very short reconnect delay for testing
DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100);
// Long stability window so streams are never considered "stable" mid-test
// unless a test deliberately waits it out.
DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
}
// Bundle of everything a test needs to observe: the actor under test, the probe
// that receives its outbound messages, the mock gRPC client, the list filled by
// the onEvent callback, and a one-element array set to true by onTerminated.
private record TestContext(
IActorRef BridgeActor,
TestProbe CommProbe,
MockSiteStreamGrpcClient MockGrpcClient,
List ReceivedEvents,
bool[] TerminatedFlag);
// Creates a DebugStreamBridgeActor wired to a fresh probe, mock client and
// recording callbacks, using the fixed correlation ID "corr-1".
private TestContext CreateBridgeActor()
{
var commProbe = CreateTestProbe();
var mockClient = new MockSiteStreamGrpcClient();
var factory = new MockSiteStreamGrpcClientFactory(mockClient);
var events = new List();
// Single-element array so the lambda below can flip the flag and the test can read it later.
var terminated = new[] { false };
// The callback locks the list; tests take the same lock before reading it.
Action onEvent = evt => { lock (events) { events.Add(evt); } };
Action onTerminated = () => terminated[0] = true;
// Argument order must match the actor's constructor, which is not visible in this file.
var props = Props.Create(typeof(DebugStreamBridgeActor),
SiteId,
InstanceName,
"corr-1",
commProbe.Ref,
onEvent,
onTerminated,
factory,
GrpcNodeA,
GrpcNodeB);
var actor = Sys.ActorOf(props);
return new TestContext(actor, commProbe, mockClient, events, terminated);
}
// On start the actor must send an envelope for SiteId carrying a
// SubscribeDebugViewRequest with the instance name and correlation ID.
[Fact]
public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient()
{
var ctx = CreateBridgeActor();
var envelope = ctx.CommProbe.ExpectMsg();
Assert.Equal(SiteId, envelope.SiteId);
Assert.IsType(envelope.Message);
var req = (SubscribeDebugViewRequest)envelope.Message;
Assert.Equal(InstanceName, req.InstanceUniqueName);
Assert.Equal("corr-1", req.CorrelationId);
}
// A snapshot told to the actor must reach the onEvent callback exactly once.
[Fact]
public void On_Snapshot_Forwards_To_OnEvent_Callback()
{
var ctx = CreateBridgeActor();
// Consume the initial subscribe envelope so it does not interfere with later expectations.
ctx.CommProbe.ExpectMsg();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List(),
new List(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
TimeSpan.FromSeconds(3));
lock (ctx.ReceivedEvents) { Assert.IsType(ctx.ReceivedEvents[0]); }
}
// Receiving the snapshot must trigger exactly one SubscribeAsync call on the
// mock client, carrying the actor's correlation ID and instance name.
[Fact]
public void On_Snapshot_Opens_GrpcStream()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List(),
new List(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
var call = ctx.MockGrpcClient.SubscribeCalls[0];
Assert.Equal("corr-1", call.CorrelationId);
Assert.Equal(InstanceName, call.InstanceUniqueName);
}
// An event pushed through the recorded gRPC onEvent callback must end up in
// the test's onEvent list after the snapshot.
[Fact]
public void Events_From_GrpcCallback_Forwarded_To_OnEvent()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List(),
new List(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// Simulate gRPC event arriving via the onEvent callback
var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(attrChange);
// snapshot + attr change
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
TimeSpan.FromSeconds(3));
lock (ctx.ReceivedEvents) { Assert.IsType(ctx.ReceivedEvents[1]); }
}
// A stream error must lead to a second SubscribeAsync call with the same correlation ID.
// NOTE(review): the test name mentions the other node, but only the resubscribe
// and its correlation ID are asserted; the requested endpoint is not checked.
[Fact]
public void On_GrpcError_Reconnects_To_Other_Node()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List(),
new List(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// Simulate gRPC error
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));
// Should resubscribe
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[1].CorrelationId);
}
// After a stream error, the mock must have seen an Unsubscribe for "corr-1"
// by the time the second SubscribeAsync call has been recorded.
[Fact]
public void On_GrpcError_Unsubscribes_Old_Stream_Before_Reconnect()
{
// Communication-002 regression: a reconnect must unsubscribe the previous
// stream so the old node does not keep a zombie relay actor / subscription.
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List(),
new List(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// Simulate gRPC error → reconnect
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));
// Should resubscribe...
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
// ...and must have unsubscribed the prior correlation ID so the old node's
// relay actor is released rather than left zombie.
Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);
}
// Four back-to-back stream errors must stop the actor and invoke onTerminated.
[Fact]
public void After_MaxRetries_Terminates()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List(),
new List(),
DateTimeOffset.UtcNow);
// Watch before driving the actor so the Terminated notification cannot be missed.
Watch(ctx.BridgeActor);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// 4 consecutive errors: initial + 3 retries, then terminate
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5));
ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5));
// Fourth error exceeds max retries
ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4"));
ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
Assert.True(ctx.TerminatedFlag[0]);
}
// StopDebugStream must produce an outbound envelope, an Unsubscribe for
// "corr-1" on the mock gRPC client, and termination of the actor.
[Fact]
public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg(); // subscribe
var snapshot = new DebugViewSnapshot(
InstanceName,
new List(),
new List(),
DateTimeOffset.UtcNow);
Watch(ctx.BridgeActor);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
ctx.BridgeActor.Tell(new StopDebugStream());
// Should send ClusterClient unsubscribe
var envelope = ctx.CommProbe.ExpectMsg();
Assert.IsType(envelope.Message);
// Should unsubscribe gRPC
AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Count > 0, TimeSpan.FromSeconds(3));
Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);
// Should stop self
ExpectTerminated(ctx.BridgeActor);
}
// A DebugStreamTerminated message must stop the actor and invoke onTerminated.
// NOTE(review): despite the name, only one DebugStreamTerminated is sent here;
// repeated delivery is not exercised by this test.
[Fact]
public void DebugStreamTerminated_Stops_Actor_Idempotently()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg();
Watch(ctx.BridgeActor);
ctx.BridgeActor.Tell(new DebugStreamTerminated(SiteId, "corr-1"));
ExpectTerminated(ctx.BridgeActor);
Assert.True(ctx.TerminatedFlag[0]);
}
// Delivering an event before each failure must not prevent termination after
// the fourth failure.
[Fact]
public void FlappingStream_DeliveringEventsBetweenFailures_StillTerminatesAfterMaxRetries()
{
// Communication-008 regression: a stream that connects, delivers an event,
// then fails — repeatedly — must still trip MaxRetries. The retry count is
// NO LONGER reset by a received event (only by the stability window). The
// previous behaviour reset _retryCount on every event, so a flapping site
// reconnected forever and the debug session lived on indefinitely.
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List(),
new List(),
DateTimeOffset.UtcNow);
Watch(ctx.BridgeActor);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
// Flap: deliver one event then fail, three times. Each event would, under
// the old buggy logic, reset the retry budget and prevent termination.
for (var i = 0; i < 3; i++)
{
var call = ctx.MockGrpcClient.SubscribeCalls[i];
call.OnEvent(attrChange);
call.OnError(new Exception($"Flap {i + 1}"));
// Copy into a local so the lambda below does not capture the loop variable.
var expected = i + 2;
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == expected, TimeSpan.FromSeconds(5));
}
// Fourth error (after the 3 retries) must exceed MaxRetries and terminate.
ctx.MockGrpcClient.SubscribeCalls[3].OnEvent(attrChange);
ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Flap 4"));
ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
Assert.True(ctx.TerminatedFlag[0]);
}
// Five failures, each preceded by a pause longer than the (shortened)
// stability window, must each produce a resubscribe and must not terminate
// the actor.
[Fact]
public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow()
{
// Communication-008: after a stream has been connected for the stability
// window, the retry budget is recovered — a later transient fault then gets
// a fresh set of retries rather than being counted against the old budget.
// Overrides the 30 s value set in the constructor; restored in the finally block.
DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromMilliseconds(300);
try
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List(),
new List(),
DateTimeOffset.UtcNow);
Watch(ctx.BridgeActor);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// Five failures — but each new stream stays up long enough (the mock
// stream only completes on cancel) for the stability window to elapse
// and reset the retry budget before the next failure.
for (var i = 0; i < 5; i++)
{
Thread.Sleep(450); // exceed the 300ms stability window
ctx.MockGrpcClient.SubscribeCalls[i].OnError(new Exception($"Error {i + 1}"));
var expected = i + 2;
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == expected, TimeSpan.FromSeconds(5));
}
// Five well-spaced failures did NOT terminate the actor because each
// reconnect recovered its retry budget after the stability window.
Assert.False(ctx.TerminatedFlag[0]);
}
finally
{
// Put the static setting back to the value the constructor uses.
DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
}
}
}
/// <summary>
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
/// </summary>
/// <remarks>
/// The recorded calls are written by whichever thread calls into the mock and
/// are read by the test thread (for example while polling with AwaitCondition),
/// so the backing lists are guarded by a private lock and the public properties
/// return point-in-time copies. Mutating a returned list does not affect the mock.
/// NOTE(review): the delegate parameter types on SubscribeAsync must match the
/// base-class signature, which is not visible in this file — confirm.
/// </remarks>
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
{
    // Guards both backing lists; never exposed.
    private readonly object _gate = new();
    private readonly List<MockSubscription> _subscribeCalls = new();
    private readonly List<string> _unsubscribedCorrelationIds = new();

    /// <summary>
    /// Snapshot of every SubscribeAsync call recorded so far, in call order.
    /// </summary>
    public List<MockSubscription> SubscribeCalls
    {
        get
        {
            lock (_gate)
            {
                return new List<MockSubscription>(_subscribeCalls);
            }
        }
    }

    /// <summary>
    /// Snapshot of every correlation ID passed to Unsubscribe so far, in call order.
    /// </summary>
    public List<string> UnsubscribedCorrelationIds
    {
        get
        {
            lock (_gate)
            {
                return new List<string>(_unsubscribedCorrelationIds);
            }
        }
    }

    public MockSiteStreamGrpcClient() : base()
    {
    }

    /// <summary>
    /// Records the call and returns a task that stays pending until
    /// <paramref name="ct"/> is cancelled, simulating a long-running stream.
    /// </summary>
    /// <param name="correlationId">Correlation ID of the subscription.</param>
    /// <param name="instanceUniqueName">Instance the stream is opened for.</param>
    /// <param name="onEvent">Callback the test invokes to simulate a stream event.</param>
    /// <param name="onError">Callback the test invokes to simulate a stream failure.</param>
    /// <param name="ct">Token whose cancellation completes the returned task.</param>
    /// <returns>A task that completes successfully once <paramref name="ct"/> is cancelled.</returns>
    public override Task SubscribeAsync(
        string correlationId,
        string instanceUniqueName,
        Action<object> onEvent,
        Action<Exception> onError,
        CancellationToken ct)
    {
        var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
        lock (_gate)
        {
            _subscribeCalls.Add(subscription);
        }

        // Return a task that completes when cancelled (simulates long-running stream).
        // RunContinuationsAsynchronously keeps awaiting code from running inline on
        // the thread that cancels the token.
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        ct.Register(() => tcs.TrySetResult());
        return tcs.Task;
    }

    /// <summary>
    /// Records that an unsubscribe was requested for <paramref name="correlationId"/>.
    /// </summary>
    public override void Unsubscribe(string correlationId)
    {
        lock (_gate)
        {
            _unsubscribedCorrelationIds.Add(correlationId);
        }
    }
}
/// <summary>
/// Captures the arguments of a single SubscribeAsync call made on the mock
/// client so a test can inspect them and fire the stream callbacks itself.
/// </summary>
/// <param name="CorrelationId">Correlation ID passed to SubscribeAsync.</param>
/// <param name="InstanceUniqueName">Instance name passed to SubscribeAsync.</param>
/// <param name="OnEvent">Event callback; invoke it with a payload to simulate an incoming stream event.</param>
/// <param name="OnError">Error callback; invoke it with an exception to simulate a stream failure.</param>
/// <param name="CancellationToken">Token supplied by the caller for the stream.</param>
internal record MockSubscription(
    string CorrelationId,
    string InstanceUniqueName,
    Action<object> OnEvent,
    Action<Exception> OnError,
    CancellationToken CancellationToken);
/// <summary>
/// Factory that always returns the pre-configured mock client.
/// </summary>
internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory
{
    private readonly MockSiteStreamGrpcClient _mockClient;

    /// <summary>
    /// Every gRPC endpoint passed to <see cref="GetOrCreate"/>, in call order.
    /// </summary>
    public List<string> RequestedEndpoints { get; } = new();

    /// <summary>
    /// Creates the factory around the client that every lookup will return.
    /// </summary>
    /// <param name="mockClient">Client handed back by <see cref="GetOrCreate"/>.</param>
    public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient)
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
        _mockClient = mockClient;
    }

    /// <summary>
    /// Records the requested endpoint and returns the shared mock client;
    /// <paramref name="siteIdentifier"/> is ignored.
    /// </summary>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        RequestedEndpoints.Add(grpcEndpoint);
        return _mockClient;
    }
}