using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming;
using ZB.MOM.WW.ScadaBridge.Communication.Actors;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc;
/// <summary>
/// Tests for DebugStreamBridgeActor with gRPC streaming integration.
/// Covers the ClusterClient subscribe/unsubscribe handshake, the stream-first gRPC
/// subscription (M2.18), gap-window buffering with snapshot dedup, reconnect /
/// node-flip behaviour, InstanceNotFound teardown, and retry-budget handling.
/// </summary>
public class DebugStreamBridgeActorTests : TestKit
{
    private const string SiteId = "site-alpha";
    private const string InstanceName = "Site1.Pump01";
    private const string GrpcNodeA = "http://localhost:5100";
    private const string GrpcNodeB = "http://localhost:5200";

    // xUnit creates a new instance of the test class for every test, so this
    // constructor runs before each test.
    public DebugStreamBridgeActorTests() : base(@"akka.loglevel = DEBUG")
    {
        // NOTE(review): ReconnectDelay / StabilityWindow are static settings on the
        // actor type. They are never restored to production defaults here, so any
        // other test class using DebugStreamBridgeActor (possibly running in
        // parallel) observes these values — confirm this is intended.
        // Use a very short reconnect delay for testing
        DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100);
        // Long stability window so streams are never considered "stable" mid-test
        // unless a test deliberately waits it out.
        DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
    }

    // Handles a test needs to drive and observe one bridge actor instance.
    // ReceivedEvents collects everything passed to the actor's onEvent callback
    // (always accessed under lock (ReceivedEvents)); TerminatedFlag is a
    // one-element array so the onTerminated lambda can flip it.
    private record TestContext(
        IActorRef BridgeActor,
        TestProbe CommProbe,
        MockSiteStreamGrpcClient MockGrpcClient,
        List ReceivedEvents,
        bool[] TerminatedFlag);

    /// <summary>
    /// Spawns a DebugStreamBridgeActor wired to a test probe (receives the
    /// ClusterClient-side subscribe/unsubscribe envelopes), a factory that always
    /// returns one shared mock gRPC client, and callbacks that record delivered
    /// events and the termination signal. Uses correlation ID "corr-1" and the
    /// GrpcNodeA / GrpcNodeB endpoints.
    /// </summary>
    private TestContext CreateBridgeActor()
    {
        var commProbe = CreateTestProbe();
        var mockClient = new MockSiteStreamGrpcClient();
        var factory = new MockSiteStreamGrpcClientFactory(mockClient);
        var events = new List();
        var terminated = new[] { false };
        // onEvent is presumably invoked from the actor's thread while the test
        // thread polls the list, hence the lock.
        Action onEvent = evt => { lock (events) { events.Add(evt); } };
        Action onTerminated = () => terminated[0] = true;
        // Props.Create(Type, args) binds constructor arguments positionally at
        // runtime, so this order must match DebugStreamBridgeActor's constructor.
        var props = Props.Create(typeof(DebugStreamBridgeActor),
            SiteId,
            InstanceName,
            "corr-1",
            commProbe.Ref,
            onEvent,
            onTerminated,
            factory,
            GrpcNodeA,
            GrpcNodeB);
        var actor = Sys.ActorOf(props);
        return new TestContext(actor, commProbe, mockClient, events, terminated);
    }

    [Fact]
    public void On_InstanceNotFound_Snapshot_Forwards_To_OnEvent_Tears_Down_Stream_And_Terminates()
    {
        // M2.11 (revised for M2.18 stream-first): the gRPC subscription is now opened
        // up-front in PreStart, so when the site reports InstanceNotFound=true the
        // bridge actor must
        // (a) forward the not-found snapshot to _onEvent so DebugStreamService's TCS
        // resolves and the caller can inspect the flag,
        // (b) tear DOWN the already-opened gRPC stream (Unsubscribe the just-opened
        // correlation) rather than enter pass-through, and
        // (c) stop itself cleanly.
        // NOTE(review): largely overlaps with
        // InstanceNotFound_After_StreamFirst_Tears_Down_Stream_And_Does_Not_PassThrough.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg(); // initial subscribe envelope
        // Stream-first: the gRPC subscription is opened before the snapshot arrives.
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        var notFoundSnapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow,
            InstanceNotFound: true);
        // Watch before Tell so the Terminated message cannot be missed.
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(notFoundSnapshot);
        // (a) _onEvent must receive the not-found snapshot
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            var received = Assert.IsType(ctx.ReceivedEvents[0]);
            Assert.True(received.InstanceNotFound);
        }
        // (b) the just-opened gRPC stream is torn down (not left running / no pass-through)
        AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Contains("corr-1"),
            TimeSpan.FromSeconds(3));
        // (c) actor terminates cleanly
        ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(3));
    }

    // The first thing the actor does is send a SubscribeDebugViewRequest for this
    // site/instance/correlation through the communication probe.
    [Fact]
    public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient()
    {
        var ctx = CreateBridgeActor();
        var envelope = ctx.CommProbe.ExpectMsg();
        Assert.Equal(SiteId, envelope.SiteId);
        Assert.IsType(envelope.Message);
        var req = (SubscribeDebugViewRequest)envelope.Message;
        Assert.Equal(InstanceName, req.InstanceUniqueName);
        Assert.Equal("corr-1", req.CorrelationId);
    }

    // A normal snapshot is delivered to the onEvent callback.
    [Fact]
    public void On_Snapshot_Forwards_To_OnEvent_Callback()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents) { Assert.IsType(ctx.ReceivedEvents[0]); }
    }

    // Checks the gRPC subscription carries the right correlation ID and instance.
    // NOTE(review): since M2.18 the stream is opened in PreStart (see
    // PreStart_Opens_GrpcStream_Before_Snapshot_Arrives), so this test no longer
    // proves that the snapshot triggers the open — consider renaming.
    [Fact]
    public void On_Snapshot_Opens_GrpcStream()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        var call = ctx.MockGrpcClient.SubscribeCalls[0];
        Assert.Equal("corr-1", call.CorrelationId);
        Assert.Equal(InstanceName, call.InstanceUniqueName);
    }

    // Events pushed through the recorded gRPC onEvent callback after the snapshot
    // reach the actor's onEvent callback.
    [Fact]
    public void Events_From_GrpcCallback_Forwarded_To_OnEvent()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        // Simulate gRPC event arriving via the onEvent callback
        var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(attrChange);
        // snapshot + attr change
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents) { Assert.IsType(ctx.ReceivedEvents[1]); }
    }

    // A gRPC error triggers a resubscribe with the same correlation ID. The shared
    // mock client cannot distinguish endpoints; see
    // On_GrpcError_Reconnects_To_Other_Node_Endpoint for the node-flip check.
    [Fact]
    public void On_GrpcError_Reconnects_To_Other_Node()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        // Simulate gRPC error
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));
        // Should resubscribe
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
        Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[1].CorrelationId);
    }

    [Fact]
    public void On_GrpcError_Unsubscribes_Old_Stream_Before_Reconnect()
    {
        // Communication-002 regression: a reconnect must unsubscribe the previous
        // stream so the old node does not keep a zombie relay actor / subscription.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        // Simulate gRPC error → reconnect
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));
        // Should resubscribe...
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
        // ...and must have unsubscribed the prior correlation ID so the old node's
        // relay actor is released rather than left zombie.
        Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);
    }

    // With the 30s stability window from the constructor, the retry budget is never
    // recovered mid-test, so four back-to-back failures exhaust it.
    [Fact]
    public void After_MaxRetries_Terminates()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        // 4 consecutive errors: initial + 3 retries, then terminate
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
        ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5));
        ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5));
        // Fourth error exceeds max retries
        ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4"));
        ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
        Assert.True(ctx.TerminatedFlag[0]);
    }

    // StopDebugStream must release both sides: the ClusterClient subscription
    // (second envelope on the probe) and the gRPC stream, then stop the actor.
    [Fact]
    public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg(); // subscribe
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        ctx.BridgeActor.Tell(new StopDebugStream());
        // Should send ClusterClient unsubscribe
        var envelope = ctx.CommProbe.ExpectMsg();
        Assert.IsType(envelope.Message);
        // Should unsubscribe gRPC
        AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Count > 0, TimeSpan.FromSeconds(3));
        Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);
        // Should stop self
        ExpectTerminated(ctx.BridgeActor);
    }

    // A site-side DebugStreamTerminated for our correlation stops the actor and
    // fires the onTerminated callback, even before any snapshot arrived.
    [Fact]
    public void DebugStreamTerminated_Stops_Actor_Idempotently()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(new DebugStreamTerminated(SiteId, "corr-1"));
        ExpectTerminated(ctx.BridgeActor);
        Assert.True(ctx.TerminatedFlag[0]);
    }

    [Fact]
    public void FlappingStream_DeliveringEventsBetweenFailures_StillTerminatesAfterMaxRetries()
    {
        // Communication-008 regression: a stream that connects, delivers an event,
        // then fails — repeatedly — must still trip MaxRetries. The retry count is
        // NO LONGER reset by a received event (only by the stability window). The
        // previous behaviour reset _retryCount on every event, so a flapping site
        // reconnected forever and the debug session lived on indefinitely.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
        // Flap: deliver one event then fail, three times. Each event would, under
        // the old buggy logic, reset the retry budget and prevent termination.
        for (var i = 0; i < 3; i++)
        {
            var call = ctx.MockGrpcClient.SubscribeCalls[i];
            call.OnEvent(attrChange);
            call.OnError(new Exception($"Flap {i + 1}"));
            // Copy to a local so the AwaitCondition lambda captures this iteration's value.
            var expected = i + 2;
            AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == expected, TimeSpan.FromSeconds(5));
        }
        // Fourth error (after the 3 retries) must exceed MaxRetries and terminate.
        ctx.MockGrpcClient.SubscribeCalls[3].OnEvent(attrChange);
        ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Flap 4"));
        ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
        Assert.True(ctx.TerminatedFlag[0]);
    }

    [Fact]
    public void On_GrpcError_Reconnects_To_Other_Node_Endpoint()
    {
        // Communication-015 regression: drive the bridge actor through a node flip
        // with an endpoint-aware factory (one distinct mock client per endpoint).
        // The first subscribe targets NodeA; after a gRPC error the bridge must
        // reconnect via a client bound to the *NodeB* endpoint.
        // Built by hand (not CreateBridgeActor) because it needs the endpoint-aware
        // factory instead of the single-client one.
        var commProbe = CreateTestProbe();
        var factory = new EndpointTrackingGrpcClientFactory();
        var events = new List();
        var terminated = new[] { false };
        var props = Props.Create(typeof(DebugStreamBridgeActor),
            SiteId, InstanceName, "corr-1", commProbe.Ref,
            (Action)(evt => { lock (events) { events.Add(evt); } }),
            (Action)(() => terminated[0] = true),
            factory, GrpcNodeA, GrpcNodeB);
        var actor = Sys.ActorOf(props);
        commProbe.ExpectMsg();
        actor.Tell(new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow));
        // First subscribe goes to NodeA.
        AwaitCondition(() => factory.ClientFor(GrpcNodeA).SubscribeCalls.Count == 1,
            TimeSpan.FromSeconds(3));
        // gRPC error → bridge flips to NodeB.
        factory.ClientFor(GrpcNodeA).SubscribeCalls[0].OnError(new Exception("NodeA down"));
        // The reconnect must reach a client bound to the NodeB endpoint.
        AwaitCondition(() => factory.ClientFor(GrpcNodeB).SubscribeCalls.Count == 1,
            TimeSpan.FromSeconds(5));
        Assert.Equal("corr-1", factory.ClientFor(GrpcNodeB).SubscribeCalls[0].CorrelationId);
    }

    // ---------------------------------------------------------------------
    // M2.18 (#26) — stream-first + replay/dedup
    // ---------------------------------------------------------------------

    [Fact]
    public void PreStart_Opens_GrpcStream_Before_Snapshot_Arrives()
    {
        // M2.18: the gRPC subscription must be opened in PreStart (stream-first),
        // BEFORE the snapshot is delivered, so live events start flowing during the
        // snapshot-build + network-transit window. The old lifecycle opened the
        // stream only after the snapshot arrived, losing gap-window events.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg(); // initial subscribe envelope
        // No snapshot sent yet — the stream must already be open.
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[0].CorrelationId);
        Assert.Equal(InstanceName, ctx.MockGrpcClient.SubscribeCalls[0].InstanceUniqueName);
        // _onEvent must NOT have fired — buffering, not delivering.
        lock (ctx.ReceivedEvents) { Assert.Empty(ctx.ReceivedEvents); }
    }

    [Fact]
    public void GapWindow_Event_Buffered_Before_Snapshot_Is_Delivered_Exactly_Once_After_Snapshot()
    {
        // M2.18: an event arriving DURING the snapshot window (before the snapshot
        // is delivered) is buffered, then flushed exactly once AFTER the snapshot.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        // Live event arrives BEFORE the snapshot — its entity is NOT in the snapshot,
        // so it is a genuine gap-window event that must survive.
        var gapEvent = new AttributeValueChanged(InstanceName, "IO", "Pressure", 99.9, "Good",
            DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(gapEvent);
        // While buffering, _onEvent has not fired.
        lock (ctx.ReceivedEvents) { Assert.Empty(ctx.ReceivedEvents); }
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        // snapshot then the buffered gap-window event, exactly once, in that order.
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[0]);
            var flushed = Assert.IsType(ctx.ReceivedEvents[1]);
            Assert.Equal("Pressure", flushed.AttributeName);
        }
    }

    [Fact]
    public void Buffered_Event_Already_Reflected_In_Snapshot_Is_Dropped()
    {
        // M2.18 dedup: a buffered event whose entity is in the snapshot with an equal
        // or newer snapshot timestamp (buffered.Timestamp <= snapshot.Timestamp) is
        // already reflected and must be DROPPED.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        var t0 = DateTimeOffset.UtcNow;
        // Buffered event for "Temp" at t0.
        var buffered = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", t0);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(buffered);
        // Snapshot already contains "Temp" at the SAME timestamp t0 → buffered is a dup.
        var snapAttr = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", t0);
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List { snapAttr },
            new List(),
            t0);
        ctx.BridgeActor.Tell(snapshot);
        // Only the snapshot is delivered; the buffered duplicate is dropped.
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));
        // Give a beat to ensure no extra (dropped) event sneaks through.
        Thread.Sleep(200);
        lock (ctx.ReceivedEvents)
        {
            Assert.Single(ctx.ReceivedEvents);
            Assert.IsType(ctx.ReceivedEvents[0]);
        }
    }

    [Fact]
    public void Buffered_Event_Strictly_Newer_Than_Snapshot_Entity_Is_Delivered()
    {
        // M2.18 dedup: a buffered event strictly newer than the snapshot's entry for
        // the same entity (buffered.Timestamp > snapshot.Timestamp) is NOT a dup and
        // must be DELIVERED after the snapshot.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        var snapTime = DateTimeOffset.UtcNow;
        var newerTime = snapTime.AddMilliseconds(1);
        // Buffered event for "Temp" strictly NEWER than the snapshot's "Temp".
        var buffered = new AttributeValueChanged(InstanceName, "IO", "Temp", 50.0, "Good", newerTime);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(buffered);
        var snapAttr = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", snapTime);
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List { snapAttr },
            new List(),
            snapTime);
        ctx.BridgeActor.Tell(snapshot);
        // snapshot then the strictly-newer buffered event.
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[0]);
            var flushed = Assert.IsType(ctx.ReceivedEvents[1]);
            Assert.Equal(50.0, flushed.Value);
            Assert.Equal(newerTime, flushed.Timestamp);
        }
    }

    [Fact]
    public void Buffered_Alarm_Dedup_Uses_AlarmIdentity_And_Timestamp()
    {
        // M2.18 dedup for alarms: identity = (instance, alarm name, source reference).
        // A buffered alarm older-or-equal to the snapshot's same-identity alarm is
        // dropped; a strictly-newer one is delivered.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        var t0 = DateTimeOffset.UtcNow;
        // Buffered: "PumpFault" at t0 (dup) and "Overheat" at t0+1ms (newer, deliver).
        var dupAlarm = new AlarmStateChanged(InstanceName, "PumpFault",
            ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 500, t0);
        var newerAlarm = new AlarmStateChanged(InstanceName, "Overheat",
            ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 700, t0.AddMilliseconds(1));
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(dupAlarm);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(newerAlarm);
        // Snapshot contains BOTH "PumpFault" and "Overheat" at t0.
        var snapPumpFault = new AlarmStateChanged(InstanceName, "PumpFault",
            ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 500, t0);
        var snapOverheat = new AlarmStateChanged(InstanceName, "Overheat",
            ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Normal, 0, t0);
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List { snapPumpFault, snapOverheat },
            t0);
        ctx.BridgeActor.Tell(snapshot);
        // snapshot + only the strictly-newer "Overheat" alarm (PumpFault dropped).
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        // Give a beat so a wrongly-delivered duplicate would show up as a third event.
        Thread.Sleep(200);
        lock (ctx.ReceivedEvents)
        {
            Assert.Equal(2, ctx.ReceivedEvents.Count);
            Assert.IsType(ctx.ReceivedEvents[0]);
            var flushed = Assert.IsType(ctx.ReceivedEvents[1]);
            Assert.Equal("Overheat", flushed.AlarmName);
            Assert.Equal(700, flushed.Priority);
        }
    }

    [Fact]
    public void Buffered_Events_Flushed_In_Arrival_Order()
    {
        // M2.18: ordering preserved across multiple buffered events (none are dups —
        // their entities are absent from the snapshot).
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        var baseTime = DateTimeOffset.UtcNow;
        var sub = ctx.MockGrpcClient.SubscribeCalls[0];
        sub.OnEvent(new AttributeValueChanged(InstanceName, "IO", "A", 1, "Good", baseTime));
        sub.OnEvent(new AlarmStateChanged(InstanceName, "AlarmX",
            ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 100, baseTime));
        sub.OnEvent(new AttributeValueChanged(InstanceName, "IO", "B", 2, "Good", baseTime));
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            baseTime);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 4; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[0]);
            Assert.Equal("A", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName);
            Assert.Equal("AlarmX", Assert.IsType(ctx.ReceivedEvents[2]).AlarmName);
            Assert.Equal("B", Assert.IsType(ctx.ReceivedEvents[3]).AttributeName);
        }
    }

    [Fact]
    public void PassThrough_After_Flush_Delivers_Subsequent_Events_Immediately()
    {
        // M2.18: after the snapshot+flush the actor switches to pass-through — later
        // events go straight to _onEvent (no buffering, no dup).
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));
        // Post-snapshot event — must be delivered immediately, exactly once.
        var postEvent = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good",
            DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(postEvent);
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[1]);
        }
    }

    [Fact]
    public void InstanceNotFound_After_StreamFirst_Tears_Down_Stream_And_Does_Not_PassThrough()
    {
        // M2.18 + M2.11: stream-first means the gRPC subscription is already open
        // when an InstanceNotFound snapshot arrives. The bridge must tear that stream
        // down (Unsubscribe the just-opened correlation), deliver the not-found
        // snapshot, NOT enter pass-through, and stop cleanly.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        // Stream opened up-front (stream-first).
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        var notFoundSnapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow,
            InstanceNotFound: true);
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(notFoundSnapshot);
        // Not-found snapshot delivered.
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.True(Assert.IsType(ctx.ReceivedEvents[0]).InstanceNotFound);
        }
        // The just-opened stream must be torn down.
        AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Contains("corr-1"),
            TimeSpan.FromSeconds(3));
        // Stops cleanly.
        ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(3));
        // No pass-through: an event arriving after the stop is not delivered.
        var late = new AttributeValueChanged(InstanceName, "IO", "Temp", 1, "Good", DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(late);
        Thread.Sleep(200);
        lock (ctx.ReceivedEvents) { Assert.Single(ctx.ReceivedEvents); }
    }

    [Fact]
    public void Reconnect_During_Buffering_Phase_Keeps_Buffering_Until_Snapshot()
    {
        // M2.18: a gRPC error/reconnect BEFORE the snapshot arrives must remain in the
        // buffering phase — events on the new stream are still buffered, then flushed
        // when the snapshot finally arrives.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        // Error before snapshot → reconnect (still buffering).
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("pre-snapshot blip"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
        // Event on the reconnected stream — still buffered (snapshot not yet delivered).
        var gapEvent = new AttributeValueChanged(InstanceName, "IO", "Late", 7, "Good",
            DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(gapEvent);
        lock (ctx.ReceivedEvents) { Assert.Empty(ctx.ReceivedEvents); }
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        // snapshot + the event buffered across the reconnect.
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[0]);
            Assert.Equal("Late", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName);
        }
    }

    [Fact]
    public void Reconnect_After_Snapshot_Resumes_PassThrough_Not_Buffering()
    {
        // M2.18: a mid-session reconnect (after the snapshot was already delivered)
        // must resume pass-through — the snapshot is a one-time thing and events on
        // the reconnected stream are delivered immediately, not re-buffered.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        var snapshot = new DebugViewSnapshot(
            InstanceName,
            new List(),
            new List(),
            DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));
        // Mid-session reconnect.
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("mid-session blip"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
        // Event on the reconnected stream — delivered immediately (pass-through).
        var postEvent = new AttributeValueChanged(InstanceName, "IO", "Temp", 9, "Good",
            DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(postEvent);
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.Equal("Temp", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName);
        }
    }

    [Fact]
    public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow()
    {
        // Communication-008: after a stream has been connected for the stability
        // window, the retry budget is recovered — a later transient fault then gets
        // a fresh set of retries rather than being counted against the old budget.
        // The static window is shortened before the actor is created and put back to
        // the constructor's 30s value in the finally block.
        DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromMilliseconds(300);
        try
        {
            var ctx = CreateBridgeActor();
            ctx.CommProbe.ExpectMsg();
            var snapshot = new DebugViewSnapshot(
                InstanceName,
                new List(),
                new List(),
                DateTimeOffset.UtcNow);
            Watch(ctx.BridgeActor);
            ctx.BridgeActor.Tell(snapshot);
            AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
            // Five failures — more than the four that terminate the actor in
            // After_MaxRetries_Terminates — but each new stream stays up long enough
            // (the mock stream only completes on cancel) for the stability window to
            // elapse and reset the retry budget before the next failure.
            for (var i = 0; i < 5; i++)
            {
                Thread.Sleep(450); // exceed the 300ms stability window
                ctx.MockGrpcClient.SubscribeCalls[i].OnError(new Exception($"Error {i + 1}"));
                var expected = i + 2;
                AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == expected, TimeSpan.FromSeconds(5));
            }
            // Five well-spaced failures did NOT terminate the actor because each
            // reconnect recovered its retry budget after the stability window.
            Assert.False(ctx.TerminatedFlag[0]);
        }
        finally
        {
            DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
        }
    }
}
/// <summary>
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
/// It never produces stream traffic on its own: tests simulate events and failures
/// by invoking the OnEvent / OnError callbacks captured in each recorded
/// MockSubscription.
/// </summary>
/// <remarks>
/// NOTE(review): the recording lists are plain lists. They are presumably appended
/// from the actor's thread while tests poll them from the test thread without a
/// lock; the tests only poll Count/Contains and index after awaiting.
/// </remarks>
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
{
    /// <summary>Every SubscribeAsync call, in call order.</summary>
    public List SubscribeCalls { get; } = new();

    /// <summary>Correlation IDs passed to Unsubscribe, in call order.</summary>
    public List UnsubscribedCorrelationIds { get; } = new();

    public MockSiteStreamGrpcClient() : base()
    {
    }

    /// <summary>
    /// Records the subscription and returns a task that stays pending until
    /// <paramref name="ct"/> is cancelled, simulating a long-running server stream.
    /// </summary>
    public override Task SubscribeAsync(
        string correlationId,
        string instanceUniqueName,
        Action onEvent,
        Action onError,
        CancellationToken ct)
    {
        var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
        SubscribeCalls.Add(subscription);

        // Return a task that completes when cancelled (simulates long-running stream).
        // RunContinuationsAsynchronously: the caller cancels the token, and without
        // this option any continuation attached to the returned task would run inline
        // inside the cancellation callback on the cancelling thread.
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        ct.Register(() => tcs.TrySetResult());
        return tcs.Task;
    }

    /// <summary>Records the correlation ID; does not cancel any recorded subscription.</summary>
    public override void Unsubscribe(string correlationId)
    {
        UnsubscribedCorrelationIds.Add(correlationId);
    }
}
/// <summary>
/// One recorded SubscribeAsync call: the arguments the bridge actor supplied,
/// including the callbacks tests invoke to simulate stream events and errors,
/// and the token that ends the simulated stream.
/// </summary>
internal record MockSubscription(
    string CorrelationId, string InstanceUniqueName,
    Action OnEvent, Action OnError,
    CancellationToken CancellationToken);
/// <summary>
/// Factory stub that ignores the requested site and endpoint and always hands back
/// the single mock client supplied at construction. It records every requested
/// endpoint in order.
/// </summary>
internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory
{
    private readonly MockSiteStreamGrpcClient _sharedClient;

    public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient)
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
        _sharedClient = mockClient;
    }

    /// <summary>Endpoints passed to GetOrCreate, in call order.</summary>
    public List RequestedEndpoints { get; } = new();

    /// <summary>
    /// Records <paramref name="grpcEndpoint"/> and returns the shared mock client,
    /// whatever site or endpoint was asked for.
    /// </summary>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        RequestedEndpoints.Add(grpcEndpoint);
        return _sharedClient;
    }
}
/// <summary>
/// Endpoint-aware mock factory: hands out a distinct MockSiteStreamGrpcClient per
/// endpoint. This mirrors the real factory's corrected NodeA→NodeB failover behaviour,
/// so node-flip coverage is meaningful (Communication-015).
/// </summary>
internal class EndpointTrackingGrpcClientFactory : SiteStreamGrpcClientFactory
{
    // One lazily created mock client per endpoint. GetOrAdd always returns the single
    // stored instance for a key, whether the actor or the test touches it first.
    private readonly System.Collections.Concurrent.ConcurrentDictionary _clientsByEndpoint = new();

    public EndpointTrackingGrpcClientFactory()
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
    }

    /// <summary>
    /// Returns the mock client bound to <paramref name="endpoint"/>, creating it on
    /// first use. Tests use this to inspect the subscriptions that reached a node.
    /// </summary>
    public MockSiteStreamGrpcClient ClientFor(string endpoint)
    {
        return _clientsByEndpoint.GetOrAdd(endpoint, static _ => new MockSiteStreamGrpcClient());
    }

    /// <summary>Ignores the site and returns the client for the requested endpoint.</summary>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        return ClientFor(grpcEndpoint);
    }
}