479 lines
18 KiB
C#
479 lines
18 KiB
C#
using Akka.Actor;
|
|
using Akka.TestKit;
|
|
using Akka.TestKit.Xunit2;
|
|
using ScadaLink.Commons.Messages.DebugView;
|
|
using ScadaLink.Commons.Messages.Streaming;
|
|
using ScadaLink.Communication.Actors;
|
|
using ScadaLink.Communication.Grpc;
|
|
|
|
namespace ScadaLink.Communication.Tests.Grpc;
|
|
|
|
/// <summary>
/// Tests for <see cref="DebugStreamBridgeActor"/> with gRPC streaming integration.
/// The areas covered are:
/// <list type="bullet">
/// <item>the ClusterClient subscribe/unsubscribe requests sent to the comm actor;</item>
/// <item>snapshot and gRPC event forwarding to the onEvent callback;</item>
/// <item>reconnects (including the NodeA to NodeB flip) after gRPC errors;</item>
/// <item>the MaxRetries budget and its recovery after the stability window.</item>
/// </list>
/// </summary>
public class DebugStreamBridgeActorTests : TestKit
{
    private const string SiteId = "site-alpha";
    private const string InstanceName = "Site1.Pump01";
    private const string CorrelationId = "corr-1";
    private const string GrpcNodeA = "http://localhost:5100";
    private const string GrpcNodeB = "http://localhost:5200";

    /// <summary>Wait used for the first subscribe and for event delivery.</summary>
    private static readonly TimeSpan InitialWait = TimeSpan.FromSeconds(3);

    /// <summary>Wait used for anything that has to sit out a reconnect delay.</summary>
    private static readonly TimeSpan ReconnectWait = TimeSpan.FromSeconds(5);

    public DebugStreamBridgeActorTests() : base(@"akka.loglevel = DEBUG")
    {
        // Use a very short reconnect delay for testing.
        DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100);

        // Long stability window so streams are never considered "stable" mid-test
        // unless a test deliberately waits it out.
        // NOTE(review): both settings are static on the actor type and are not
        // restored after each test. Confirm that no other test class exercising
        // the bridge actor runs in parallel with this one.
        DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
    }

    /// <summary>
    /// Handles to one bridge actor under test.
    /// <see cref="ReceivedEvents"/> is guarded by locking on the list itself.
    /// <see cref="TerminatedFlag"/>[0] is set by the onTerminated callback.
    /// </summary>
    private record TestContext(
        IActorRef BridgeActor,
        TestProbe CommProbe,
        MockSiteStreamGrpcClient MockGrpcClient,
        List<object> ReceivedEvents,
        bool[] TerminatedFlag);

    /// <summary>
    /// Builds props for a bridge actor targeting <see cref="GrpcNodeA"/> and <see cref="GrpcNodeB"/>.
    /// Received events are appended to <paramref name="events"/> under its lock.
    /// Termination is recorded in <paramref name="terminated"/>[0].
    /// </summary>
    private static Props BridgeProps(
        IActorRef commActor,
        SiteStreamGrpcClientFactory factory,
        List<object> events,
        bool[] terminated)
    {
        Action<object> onEvent = evt => { lock (events) { events.Add(evt); } };

        // The callback runs on the actor's side, while the test thread reads the
        // flag. Volatile write/read gives the test a fresh value.
        Action onTerminated = () => Volatile.Write(ref terminated[0], true);

        return Props.Create(typeof(DebugStreamBridgeActor),
            SiteId,
            InstanceName,
            CorrelationId,
            commActor,
            onEvent,
            onTerminated,
            factory,
            GrpcNodeA,
            GrpcNodeB);
    }

    /// <summary>
    /// Spawns a bridge actor whose factory always hands back one shared mock gRPC client.
    /// </summary>
    private TestContext CreateBridgeActor()
    {
        var commProbe = CreateTestProbe();
        var mockClient = new MockSiteStreamGrpcClient();
        var events = new List<object>();
        var terminated = new[] { false };

        var actor = Sys.ActorOf(BridgeProps(
            commProbe.Ref, new MockSiteStreamGrpcClientFactory(mockClient), events, terminated));

        return new TestContext(actor, commProbe, mockClient, events, terminated);
    }

    /// <summary>An empty initial snapshot for <see cref="InstanceName"/>.</summary>
    private static DebugViewSnapshot NewSnapshot() => new(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow);

    /// <summary>A single attribute change used to simulate a streamed gRPC event.</summary>
    private static AttributeValueChanged NewAttributeChange() =>
        new(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);

    /// <summary>
    /// Delivers the initial snapshot, then waits for the bridge to open its first gRPC stream.
    /// </summary>
    private void OpenStream(TestContext ctx)
    {
        ctx.BridgeActor.Tell(NewSnapshot());
        AwaitSubscribeCalls(ctx.MockGrpcClient, 1, InitialWait);
    }

    /// <summary>Waits until <paramref name="client"/> has seen exactly <paramref name="expected"/> subscribes.</summary>
    private void AwaitSubscribeCalls(MockSiteStreamGrpcClient client, int expected, TimeSpan max) =>
        AwaitCondition(() => client.SubscribeCalls.Count == expected, max);

    /// <summary>Waits until exactly <paramref name="expected"/> events reached the onEvent callback.</summary>
    private void AwaitReceivedEvents(TestContext ctx, int expected) =>
        AwaitCondition(() => ReceivedEventCount(ctx) == expected, InitialWait);

    private static int ReceivedEventCount(TestContext ctx)
    {
        lock (ctx.ReceivedEvents)
        {
            return ctx.ReceivedEvents.Count;
        }
    }

    private static object ReceivedEventAt(TestContext ctx, int index)
    {
        lock (ctx.ReceivedEvents)
        {
            return ctx.ReceivedEvents[index];
        }
    }

    private static bool WasTerminated(TestContext ctx) => Volatile.Read(ref ctx.TerminatedFlag[0]);

    [Fact]
    public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient()
    {
        var ctx = CreateBridgeActor();

        var envelope = ctx.CommProbe.ExpectMsg<SiteEnvelope>();

        Assert.Equal(SiteId, envelope.SiteId);
        var req = Assert.IsType<SubscribeDebugViewRequest>(envelope.Message);
        Assert.Equal(InstanceName, req.InstanceUniqueName);
        Assert.Equal(CorrelationId, req.CorrelationId);
    }

    [Fact]
    public void On_Snapshot_Forwards_To_OnEvent_Callback()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg<SiteEnvelope>();

        ctx.BridgeActor.Tell(NewSnapshot());

        AwaitReceivedEvents(ctx, 1);
        Assert.IsType<DebugViewSnapshot>(ReceivedEventAt(ctx, 0));
    }

    [Fact]
    public void On_Snapshot_Opens_GrpcStream()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg<SiteEnvelope>();

        OpenStream(ctx);

        var call = ctx.MockGrpcClient.SubscribeCalls[0];
        Assert.Equal(CorrelationId, call.CorrelationId);
        Assert.Equal(InstanceName, call.InstanceUniqueName);
    }

    [Fact]
    public void Events_From_GrpcCallback_Forwarded_To_OnEvent()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg<SiteEnvelope>();
        OpenStream(ctx);

        // Simulate a gRPC event arriving via the onEvent callback.
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(NewAttributeChange());

        // Expect the snapshot plus the attribute change.
        AwaitReceivedEvents(ctx, 2);
        Assert.IsType<AttributeValueChanged>(ReceivedEventAt(ctx, 1));
    }

    [Fact]
    public void On_GrpcError_Reconnects_To_Other_Node()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg<SiteEnvelope>();
        OpenStream(ctx);

        // Simulate a gRPC error.
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));

        // The bridge should resubscribe under the same correlation ID.
        AwaitSubscribeCalls(ctx.MockGrpcClient, 2, ReconnectWait);
        Assert.Equal(CorrelationId, ctx.MockGrpcClient.SubscribeCalls[1].CorrelationId);
    }

    [Fact]
    public void On_GrpcError_Unsubscribes_Old_Stream_Before_Reconnect()
    {
        // Communication-002 regression: a reconnect must unsubscribe the previous
        // stream so the old node does not keep a zombie relay actor / subscription.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg<SiteEnvelope>();
        OpenStream(ctx);

        // Simulate a gRPC error, which triggers a reconnect.
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));

        // The bridge should resubscribe...
        AwaitSubscribeCalls(ctx.MockGrpcClient, 2, ReconnectWait);

        // ...and must have unsubscribed the prior correlation ID, so the old node's
        // relay actor is released rather than left as a zombie.
        Assert.Contains(CorrelationId, ctx.MockGrpcClient.UnsubscribedCorrelationIds);
    }

    [Fact]
    public void After_MaxRetries_Terminates()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg<SiteEnvelope>();
        Watch(ctx.BridgeActor);
        OpenStream(ctx);

        // The initial attempt and the first two retries each fail and trigger a reconnect.
        for (var attempt = 0; attempt < 3; attempt++)
        {
            ctx.MockGrpcClient.SubscribeCalls[attempt].OnError(new Exception($"Error {attempt + 1}"));
            AwaitSubscribeCalls(ctx.MockGrpcClient, attempt + 2, ReconnectWait);
        }

        // The fourth consecutive error exceeds the retry budget.
        ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4"));

        ExpectTerminated(ctx.BridgeActor, ReconnectWait);
        Assert.True(WasTerminated(ctx));
    }

    [Fact]
    public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg<SiteEnvelope>(); // subscribe
        Watch(ctx.BridgeActor);
        OpenStream(ctx);

        ctx.BridgeActor.Tell(new StopDebugStream());

        // Should send a ClusterClient unsubscribe.
        var envelope = ctx.CommProbe.ExpectMsg<SiteEnvelope>();
        Assert.IsType<UnsubscribeDebugViewRequest>(envelope.Message);

        // Should unsubscribe the gRPC stream.
        AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Count > 0, InitialWait);
        Assert.Contains(CorrelationId, ctx.MockGrpcClient.UnsubscribedCorrelationIds);

        // Should stop itself.
        ExpectTerminated(ctx.BridgeActor);
    }

    [Fact]
    public void DebugStreamTerminated_Stops_Actor_Idempotently()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg<SiteEnvelope>();
        Watch(ctx.BridgeActor);

        ctx.BridgeActor.Tell(new DebugStreamTerminated(SiteId, CorrelationId));

        ExpectTerminated(ctx.BridgeActor);
        Assert.True(WasTerminated(ctx));
    }

    [Fact]
    public void FlappingStream_DeliveringEventsBetweenFailures_StillTerminatesAfterMaxRetries()
    {
        // Communication-008 regression: a stream that connects, delivers an event
        // and then fails, over and over, must still trip MaxRetries.
        // A received event NO LONGER resets the retry count; only the stability
        // window does. The previous behaviour reset _retryCount on every event, so
        // a flapping site reconnected forever and the debug session never ended.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg<SiteEnvelope>();
        Watch(ctx.BridgeActor);
        OpenStream(ctx);

        var attrChange = NewAttributeChange();

        // Flap three times: deliver one event, then fail. Under the old buggy logic
        // each event would reset the retry budget and prevent termination.
        for (var attempt = 0; attempt < 3; attempt++)
        {
            var call = ctx.MockGrpcClient.SubscribeCalls[attempt];
            call.OnEvent(attrChange);
            call.OnError(new Exception($"Flap {attempt + 1}"));
            AwaitSubscribeCalls(ctx.MockGrpcClient, attempt + 2, ReconnectWait);
        }

        // The fourth error, after the three retries, must exceed MaxRetries and terminate.
        ctx.MockGrpcClient.SubscribeCalls[3].OnEvent(attrChange);
        ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Flap 4"));

        ExpectTerminated(ctx.BridgeActor, ReconnectWait);
        Assert.True(WasTerminated(ctx));
    }

    [Fact]
    public void On_GrpcError_Reconnects_To_Other_Node_Endpoint()
    {
        // Communication-015 regression: drive the bridge actor through a node flip
        // using an endpoint-aware factory that gives one distinct mock client per
        // endpoint. The first subscribe targets NodeA. After a gRPC error the bridge
        // must reconnect through a client bound to the *NodeB* endpoint.
        var commProbe = CreateTestProbe();
        var factory = new EndpointTrackingGrpcClientFactory();
        var actor = Sys.ActorOf(BridgeProps(commProbe.Ref, factory, new List<object>(), new[] { false }));
        commProbe.ExpectMsg<SiteEnvelope>();

        actor.Tell(NewSnapshot());

        // The first subscribe goes to NodeA.
        var nodeA = factory.ClientFor(GrpcNodeA);
        AwaitSubscribeCalls(nodeA, 1, InitialWait);

        // A gRPC error makes the bridge flip to NodeB.
        nodeA.SubscribeCalls[0].OnError(new Exception("NodeA down"));

        // The reconnect must reach a client bound to the NodeB endpoint.
        var nodeB = factory.ClientFor(GrpcNodeB);
        AwaitSubscribeCalls(nodeB, 1, ReconnectWait);
        Assert.Equal(CorrelationId, nodeB.SubscribeCalls[0].CorrelationId);
    }

    [Fact]
    public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow()
    {
        // Communication-008: once a stream has stayed connected for the stability
        // window, the retry budget is recovered. A later transient fault then gets a
        // fresh set of retries instead of being counted against the old budget.
        DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromMilliseconds(300);
        try
        {
            var ctx = CreateBridgeActor();
            ctx.CommProbe.ExpectMsg<SiteEnvelope>();
            Watch(ctx.BridgeActor);
            OpenStream(ctx);

            // Five failures: back-to-back, this many would exhaust the retry budget
            // (see After_MaxRetries_Terminates). Here each new stream stays up past
            // the stability window before it fails, because the mock stream only
            // completes on cancel. That recovers the budget before the next failure.
            for (var attempt = 0; attempt < 5; attempt++)
            {
                Thread.Sleep(450); // exceed the 300ms stability window
                ctx.MockGrpcClient.SubscribeCalls[attempt].OnError(new Exception($"Error {attempt + 1}"));
                AwaitSubscribeCalls(ctx.MockGrpcClient, attempt + 2, ReconnectWait);
            }

            // The actor must still be alive. The watch must not deliver Terminated,
            // and the onTerminated callback must not have fired.
            ExpectNoMsg(TimeSpan.FromMilliseconds(200));
            Assert.False(WasTerminated(ctx));
        }
        finally
        {
            DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
        }
    }
}

/// <summary>
/// Mock gRPC client that records <see cref="SubscribeAsync"/> and <see cref="Unsubscribe"/> calls.
/// The bridge actor may call into it on its own threads while a test polls it from
/// the test thread. All recording is therefore done under a private lock, and the
/// public lists are point-in-time snapshot copies.
/// </summary>
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
{
    private readonly object _gate = new();
    private readonly List<MockSubscription> _subscribeCalls = new();
    private readonly List<string> _unsubscribedCorrelationIds = new();

    public MockSiteStreamGrpcClient() : base()
    {
    }

    /// <summary>
    /// Snapshot of every subscribe call recorded so far, in call order.
    /// Each read returns a fresh copy, so mutating it does not affect the mock.
    /// </summary>
    public List<MockSubscription> SubscribeCalls
    {
        get
        {
            lock (_gate)
            {
                return new List<MockSubscription>(_subscribeCalls);
            }
        }
    }

    /// <summary>
    /// Snapshot of every correlation ID passed to <see cref="Unsubscribe"/>, in call order.
    /// Each read returns a fresh copy, so mutating it does not affect the mock.
    /// </summary>
    public List<string> UnsubscribedCorrelationIds
    {
        get
        {
            lock (_gate)
            {
                return new List<string>(_unsubscribedCorrelationIds);
            }
        }
    }

    /// <summary>
    /// Records the call and returns a task that completes only when
    /// <paramref name="ct"/> is cancelled. This simulates a long-running server stream.
    /// </summary>
    public override Task SubscribeAsync(
        string correlationId,
        string instanceUniqueName,
        Action<object> onEvent,
        Action<Exception> onError,
        CancellationToken ct)
    {
        var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
        lock (_gate)
        {
            _subscribeCalls.Add(subscription);
        }

        // The cancellation callback runs on the thread that cancels the token.
        // Forcing asynchronous continuations keeps code awaiting this task from
        // running inline on that thread.
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        ct.Register(() => tcs.TrySetResult());
        return tcs.Task;
    }

    /// <summary>Records the correlation ID being unsubscribed.</summary>
    public override void Unsubscribe(string correlationId)
    {
        lock (_gate)
        {
            _unsubscribedCorrelationIds.Add(correlationId);
        }
    }
}

/// <summary>
/// The arguments of one recorded <c>SubscribeAsync</c> call on the mock gRPC client.
/// A test can inspect what the bridge asked for, or drive the simulated stream by
/// invoking <c>OnEvent</c> / <c>OnError</c> directly.
/// </summary>
/// <param name="CorrelationId">Correlation ID the bridge subscribed with.</param>
/// <param name="InstanceUniqueName">Instance the stream was opened for.</param>
/// <param name="OnEvent">Event callback handed to the client.</param>
/// <param name="OnError">Error callback handed to the client.</param>
/// <param name="CancellationToken">Token that ends the simulated stream when cancelled.</param>
internal record MockSubscription(
    string CorrelationId, string InstanceUniqueName,
    Action<object> OnEvent, Action<Exception> OnError,
    CancellationToken CancellationToken);

/// <summary>
/// Factory that always returns the pre-configured mock client, whatever the site
/// or endpoint, and records every endpoint it was asked for.
/// <c>GetOrCreate</c> may be called from the bridge actor while a test reads
/// <see cref="RequestedEndpoints"/>, so the recording is lock-protected.
/// </summary>
internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory
{
    private readonly object _gate = new();
    private readonly MockSiteStreamGrpcClient _mockClient;
    private readonly List<string> _requestedEndpoints = new();

    /// <param name="mockClient">The client handed out for every request.</param>
    /// <exception cref="ArgumentNullException"><paramref name="mockClient"/> is null.</exception>
    public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient)
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
        _mockClient = mockClient ?? throw new ArgumentNullException(nameof(mockClient));
    }

    /// <summary>
    /// Snapshot of every endpoint requested so far, in call order.
    /// Each read returns a fresh copy, so mutating it does not affect the factory.
    /// </summary>
    public List<string> RequestedEndpoints
    {
        get
        {
            lock (_gate)
            {
                return new List<string>(_requestedEndpoints);
            }
        }
    }

    /// <summary>Records <paramref name="grpcEndpoint"/> and returns the shared mock client.</summary>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        lock (_gate)
        {
            _requestedEndpoints.Add(grpcEndpoint);
        }

        return _mockClient;
    }
}

/// <summary>
/// Endpoint-aware mock factory. It hands out one <see cref="MockSiteStreamGrpcClient"/>
/// per distinct endpoint, created on first request, so a NodeA to NodeB failover
/// lands on a different client and node-flip coverage is meaningful
/// (Communication-015).
/// </summary>
internal class EndpointTrackingGrpcClientFactory : SiteStreamGrpcClientFactory
{
    private readonly System.Collections.Concurrent.ConcurrentDictionary<string, MockSiteStreamGrpcClient> _clientsByEndpoint = new();

    public EndpointTrackingGrpcClientFactory()
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
    }

    /// <summary>
    /// Returns the mock client bound to <paramref name="endpoint"/>, creating it if
    /// this endpoint has not been seen yet.
    /// </summary>
    public MockSiteStreamGrpcClient ClientFor(string endpoint)
    {
        return _clientsByEndpoint.GetOrAdd(endpoint, static _ => new MockSiteStreamGrpcClient());
    }

    /// <summary>Resolves by endpoint only; <paramref name="siteIdentifier"/> is ignored.</summary>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        return ClientFor(grpcEndpoint);
    }
}
