using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming;
using ZB.MOM.WW.ScadaBridge.Communication.Actors;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;

namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc;

/// <summary>
/// Tests for DebugStreamBridgeActor with gRPC streaming integration.
/// </summary>
/// <remarks>
/// NOTE(review): the constructor (and one test) assign the static
/// <c>DebugStreamBridgeActor.ReconnectDelay</c> / <c>DebugStreamBridgeActor.StabilityWindow</c>
/// settings. Any other test class running in parallel that depends on those statics
/// can observe these values — confirm such classes share an xUnit collection.
/// </remarks>
public class DebugStreamBridgeActorTests : TestKit
{
    private const string SiteId = "site-alpha";
    private const string InstanceName = "Site1.Pump01";
    private const string GrpcNodeA = "http://localhost:5100";
    private const string GrpcNodeB = "http://localhost:5200";

    public DebugStreamBridgeActorTests()
        : base(@"akka.loglevel = DEBUG")
    {
        // Use a very short reconnect delay for testing
        DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100);

        // Long stability window so streams are never considered "stable" mid-test
        // unless a test deliberately waits it out.
        DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
    }

    /// <summary>
    /// Everything a test needs to drive one bridge actor: the actor, the probe that
    /// receives its outbound envelopes, the mock gRPC client, the events delivered to
    /// the actor's onEvent callback (always access under <c>lock</c>), and a
    /// one-element flag set by the onTerminated callback.
    /// </summary>
    private record TestContext(
        IActorRef BridgeActor,
        TestProbe CommProbe,
        MockSiteStreamGrpcClient MockGrpcClient,
        List ReceivedEvents,
        bool[] TerminatedFlag);

    /// <summary>
    /// Creates a bridge actor for correlation "corr-1" whose factory returns one shared
    /// <see cref="MockSiteStreamGrpcClient"/> for every endpoint, with callbacks that
    /// record delivered events and termination.
    /// </summary>
    private TestContext CreateBridgeActor()
    {
        var commProbe = CreateTestProbe();
        var mockClient = new MockSiteStreamGrpcClient();
        var factory = new MockSiteStreamGrpcClientFactory(mockClient);
        var events = new List();
        var terminated = new[] { false };

        // The callbacks may be invoked off the test thread, so the event list is
        // guarded by a lock that the assertions also take.
        Action onEvent = evt =>
        {
            lock (events)
            {
                events.Add(evt);
            }
        };
        Action onTerminated = () => terminated[0] = true;

        var props = Props.Create(typeof(DebugStreamBridgeActor),
            SiteId, InstanceName, "corr-1", commProbe.Ref, onEvent, onTerminated,
            factory, GrpcNodeA, GrpcNodeB);
        var actor = Sys.ActorOf(props);

        return new TestContext(actor, commProbe, mockClient, events, terminated);
    }

    [Fact]
    public void On_InstanceNotFound_Snapshot_Forwards_To_OnEvent_Tears_Down_Stream_And_Terminates()
    {
        // M2.11 (revised for M2.18 stream-first): the gRPC subscription is now opened
        // up-front in PreStart, so when the site reports InstanceNotFound=true the
        // bridge actor must
        //   (a) forward the not-found snapshot to _onEvent so DebugStreamService's TCS
        //       resolves and the caller can inspect the flag,
        //   (b) tear DOWN the already-opened gRPC stream (Unsubscribe the just-opened
        //       correlation) rather than enter pass-through, and
        //   (c) stop itself cleanly.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg(); // initial subscribe envelope

        // Stream-first: the gRPC subscription is opened before the snapshot arrives.
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var notFoundSnapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow,
            InstanceNotFound: true);

        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(notFoundSnapshot);

        // (a) _onEvent must receive the not-found snapshot
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            var received = Assert.IsType(ctx.ReceivedEvents[0]);
            Assert.True(received.InstanceNotFound);
        }

        // (b) the just-opened gRPC stream is torn down (not left running / no pass-through)
        AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Contains("corr-1"),
            TimeSpan.FromSeconds(3));

        // (c) actor terminates cleanly
        ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(3));
    }

    [Fact]
    public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient()
    {
        var ctx = CreateBridgeActor();

        var envelope = ctx.CommProbe.ExpectMsg();
        Assert.Equal(SiteId, envelope.SiteId);
        Assert.IsType(envelope.Message);
        var req = (SubscribeDebugViewRequest)envelope.Message;
        Assert.Equal(InstanceName, req.InstanceUniqueName);
        Assert.Equal("corr-1", req.CorrelationId);
    }

    [Fact]
    public void On_Snapshot_Forwards_To_OnEvent_Callback()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);

        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[0]);
        }
    }

    [Fact]
    public void On_Snapshot_Does_Not_Open_Additional_GrpcStream()
    {
        // M2.18 stream-first: the gRPC subscription is opened in PreStart, BEFORE the
        // snapshot arrives. After the snapshot is delivered the actor switches to
        // pass-through — it must NOT open a second subscription. Exactly ONE subscribe
        // call should have been made (the PreStart one).
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        // Verify the stream is already open before the snapshot.
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);

        // After snapshot delivery, still exactly ONE subscribe — no additional stream opened.
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));
        var singleCall = Assert.Single(ctx.MockGrpcClient.SubscribeCalls);
        Assert.Equal("corr-1", singleCall.CorrelationId);
        Assert.Equal(InstanceName, singleCall.InstanceUniqueName);
    }

    [Fact]
    public void Events_From_GrpcCallback_Forwarded_To_OnEvent()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);

        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // Simulate gRPC event arriving via the onEvent callback
        var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(attrChange);

        // snapshot + attr change
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[1]);
        }
    }

    [Fact]
    public void On_GrpcError_Reconnects_To_Other_Node()
    {
        // NOTE: CreateBridgeActor's factory returns the same mock for every endpoint,
        // so this test only proves that a resubscribe happens after an error. The
        // NodeA → NodeB endpoint flip itself is covered by
        // On_GrpcError_Reconnects_To_Other_Node_Endpoint.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);

        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // Simulate gRPC error
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));

        // Should resubscribe
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
        Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[1].CorrelationId);
    }

    [Fact]
    public void On_GrpcError_Unsubscribes_Old_Stream_Before_Reconnect()
    {
        // Communication-002 regression: a reconnect must unsubscribe the previous
        // stream so the old node does not keep a zombie relay actor / subscription.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);

        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // Simulate gRPC error → reconnect
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));

        // Should resubscribe...
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));

        // ...and must have unsubscribed the prior correlation ID so the old node's
        // relay actor is released rather than left zombie.
        Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);
    }

    [Fact]
    public void After_MaxRetries_Terminates()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(snapshot);

        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // 4 consecutive errors: initial + 3 retries, then terminate
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));

        ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5));

        ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5));

        // Fourth error exceeds max retries
        ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4"));

        ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
        Assert.True(ctx.TerminatedFlag[0]);
    }

    [Fact]
    public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe()
    {
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg(); // subscribe

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(snapshot);

        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        ctx.BridgeActor.Tell(new StopDebugStream());

        // Should send ClusterClient unsubscribe
        var envelope = ctx.CommProbe.ExpectMsg();
        Assert.IsType(envelope.Message);

        // Should unsubscribe gRPC
        AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Count > 0, TimeSpan.FromSeconds(3));
        Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);

        // Should stop self
        ExpectTerminated(ctx.BridgeActor);
    }

    [Fact]
    public void DebugStreamTerminated_Stops_Actor_Idempotently()
    {
        // NOTE(review): only a single DebugStreamTerminated is sent here, so repeated
        // delivery (the "idempotently" part of the name) is not actually exercised.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(new DebugStreamTerminated(SiteId, "corr-1"));

        ExpectTerminated(ctx.BridgeActor);
        Assert.True(ctx.TerminatedFlag[0]);
    }

    [Fact]
    public void FlappingStream_DeliveringEventsBetweenFailures_StillTerminatesAfterMaxRetries()
    {
        // Communication-008 regression: a stream that connects, delivers an event,
        // then fails — repeatedly — must still trip MaxRetries. The retry count is
        // NO LONGER reset by a received event (only by the stability window). The
        // previous behaviour reset _retryCount on every event, so a flapping site
        // reconnected forever and the debug session lived on indefinitely.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(snapshot);

        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);

        // Flap: deliver one event then fail, three times. Each event would, under
        // the old buggy logic, reset the retry budget and prevent termination.
        for (var i = 0; i < 3; i++)
        {
            var call = ctx.MockGrpcClient.SubscribeCalls[i];
            call.OnEvent(attrChange);
            call.OnError(new Exception($"Flap {i + 1}"));
            var expected = i + 2;
            AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == expected, TimeSpan.FromSeconds(5));
        }

        // Fourth error (after the 3 retries) must exceed MaxRetries and terminate.
        ctx.MockGrpcClient.SubscribeCalls[3].OnEvent(attrChange);
        ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Flap 4"));

        ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
        Assert.True(ctx.TerminatedFlag[0]);
    }

    [Fact]
    public void On_GrpcError_Reconnects_To_Other_Node_Endpoint()
    {
        // Communication-015 regression: drive the bridge actor through a node flip
        // with an endpoint-aware factory (one distinct mock client per endpoint).
        // The first subscribe targets NodeA; after a gRPC error the bridge must
        // reconnect via a client bound to the *NodeB* endpoint.
        var commProbe = CreateTestProbe();
        var factory = new EndpointTrackingGrpcClientFactory();
        var events = new List();
        var terminated = new[] { false };

        var props = Props.Create(typeof(DebugStreamBridgeActor),
            SiteId, InstanceName, "corr-1", commProbe.Ref,
            (Action)(evt => { lock (events) { events.Add(evt); } }),
            (Action)(() => terminated[0] = true),
            factory, GrpcNodeA, GrpcNodeB);
        var actor = Sys.ActorOf(props);

        commProbe.ExpectMsg();
        actor.Tell(new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow));

        // First subscribe goes to NodeA.
        AwaitCondition(() => factory.ClientFor(GrpcNodeA).SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // gRPC error → bridge flips to NodeB.
        factory.ClientFor(GrpcNodeA).SubscribeCalls[0].OnError(new Exception("NodeA down"));

        // The reconnect must reach a client bound to the NodeB endpoint.
        AwaitCondition(() => factory.ClientFor(GrpcNodeB).SubscribeCalls.Count == 1, TimeSpan.FromSeconds(5));
        Assert.Equal("corr-1", factory.ClientFor(GrpcNodeB).SubscribeCalls[0].CorrelationId);
    }

    // ---------------------------------------------------------------------
    // M2.18 (#26) — stream-first + replay/dedup
    // ---------------------------------------------------------------------

    [Fact]
    public void PreStart_Opens_GrpcStream_Before_Snapshot_Arrives()
    {
        // M2.18: the gRPC subscription must be opened in PreStart (stream-first),
        // BEFORE the snapshot is delivered, so live events start flowing during the
        // snapshot-build + network-transit window. The old lifecycle opened the
        // stream only after the snapshot arrived, losing gap-window events.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg(); // initial subscribe envelope

        // No snapshot sent yet — the stream must already be open.
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
        Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[0].CorrelationId);
        Assert.Equal(InstanceName, ctx.MockGrpcClient.SubscribeCalls[0].InstanceUniqueName);

        // _onEvent must NOT have fired — buffering, not delivering.
        lock (ctx.ReceivedEvents)
        {
            Assert.Empty(ctx.ReceivedEvents);
        }
    }

    [Fact]
    public void GapWindow_Event_Buffered_Before_Snapshot_Is_Delivered_Exactly_Once_After_Snapshot()
    {
        // M2.18: an event arriving DURING the snapshot window (before the snapshot
        // is delivered) is buffered, then flushed exactly once AFTER the snapshot.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // Live event arrives BEFORE the snapshot — its entity is NOT in the snapshot,
        // so it is a genuine gap-window event that must survive.
        var gapEvent = new AttributeValueChanged(InstanceName, "IO", "Pressure", 99.9, "Good", DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(gapEvent);

        // While buffering, _onEvent has not fired.
        lock (ctx.ReceivedEvents)
        {
            Assert.Empty(ctx.ReceivedEvents);
        }

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);

        // snapshot then the buffered gap-window event, exactly once, in that order.
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[0]);
            var flushed = Assert.IsType(ctx.ReceivedEvents[1]);
            Assert.Equal("Pressure", flushed.AttributeName);
        }
    }

    [Fact]
    public void Buffered_Event_Already_Reflected_In_Snapshot_Is_Dropped()
    {
        // M2.18 dedup: a buffered event whose entity is in the snapshot with an equal
        // or newer snapshot timestamp (buffered.Timestamp <= snapshot.Timestamp) is
        // already reflected and must be DROPPED.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var t0 = DateTimeOffset.UtcNow;

        // Buffered event for "Temp" at t0.
        var buffered = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", t0);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(buffered);

        // Snapshot already contains "Temp" at the SAME timestamp t0 → buffered is a dup.
        var snapAttr = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", t0);
        var snapshot = new DebugViewSnapshot(
            InstanceName, new List { snapAttr }, new List(), t0);
        ctx.BridgeActor.Tell(snapshot);

        // Only the snapshot is delivered; the buffered duplicate is dropped.
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));

        // Give a beat to ensure no extra (dropped) event sneaks through.
        Thread.Sleep(200);
        lock (ctx.ReceivedEvents)
        {
            Assert.Single(ctx.ReceivedEvents);
            Assert.IsType(ctx.ReceivedEvents[0]);
        }
    }

    [Fact]
    public void Buffered_Event_Strictly_Newer_Than_Snapshot_Entity_Is_Delivered()
    {
        // M2.18 dedup: a buffered event strictly newer than the snapshot's entry for
        // the same entity (buffered.Timestamp > snapshot.Timestamp) is NOT a dup and
        // must be DELIVERED after the snapshot.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var snapTime = DateTimeOffset.UtcNow;
        var newerTime = snapTime.AddMilliseconds(1);

        // Buffered event for "Temp" strictly NEWER than the snapshot's "Temp".
        var buffered = new AttributeValueChanged(InstanceName, "IO", "Temp", 50.0, "Good", newerTime);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(buffered);

        var snapAttr = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", snapTime);
        var snapshot = new DebugViewSnapshot(
            InstanceName, new List { snapAttr }, new List(), snapTime);
        ctx.BridgeActor.Tell(snapshot);

        // snapshot then the strictly-newer buffered event.
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[0]);
            var flushed = Assert.IsType(ctx.ReceivedEvents[1]);
            Assert.Equal(50.0, flushed.Value);
            Assert.Equal(newerTime, flushed.Timestamp);
        }
    }

    [Fact]
    public void Buffered_Alarm_Dedup_Uses_AlarmIdentity_And_Timestamp()
    {
        // M2.18 dedup for alarms: identity = (instance, alarm name, source reference).
        // A buffered alarm older-or-equal to the snapshot's same-identity alarm is
        // dropped; a strictly-newer one is delivered.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var t0 = DateTimeOffset.UtcNow;

        // Buffered: "PumpFault" at t0 (dup) and "Overheat" at t0+1ms (newer, deliver).
        var dupAlarm = new AlarmStateChanged(InstanceName, "PumpFault",
            ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 500, t0);
        var newerAlarm = new AlarmStateChanged(InstanceName, "Overheat",
            ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 700, t0.AddMilliseconds(1));
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(dupAlarm);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(newerAlarm);

        // Snapshot contains BOTH "PumpFault" and "Overheat" at t0.
        var snapPumpFault = new AlarmStateChanged(InstanceName, "PumpFault",
            ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 500, t0);
        var snapOverheat = new AlarmStateChanged(InstanceName, "Overheat",
            ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Normal, 0, t0);
        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List { snapPumpFault, snapOverheat }, t0);
        ctx.BridgeActor.Tell(snapshot);

        // snapshot + only the strictly-newer "Overheat" alarm (PumpFault dropped).
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        Thread.Sleep(200);
        lock (ctx.ReceivedEvents)
        {
            Assert.Equal(2, ctx.ReceivedEvents.Count);
            Assert.IsType(ctx.ReceivedEvents[0]);
            var flushed = Assert.IsType(ctx.ReceivedEvents[1]);
            Assert.Equal("Overheat", flushed.AlarmName);
            Assert.Equal(700, flushed.Priority);
        }
    }

    [Fact]
    public void Buffered_Events_Flushed_In_Arrival_Order()
    {
        // M2.18: ordering preserved across multiple buffered events (none are dups —
        // their entities are absent from the snapshot).
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var baseTime = DateTimeOffset.UtcNow;
        var sub = ctx.MockGrpcClient.SubscribeCalls[0];
        sub.OnEvent(new AttributeValueChanged(InstanceName, "IO", "A", 1, "Good", baseTime));
        sub.OnEvent(new AlarmStateChanged(InstanceName, "AlarmX",
            ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 100, baseTime));
        sub.OnEvent(new AttributeValueChanged(InstanceName, "IO", "B", 2, "Good", baseTime));

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), baseTime);
        ctx.BridgeActor.Tell(snapshot);

        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 4; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[0]);
            Assert.Equal("A", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName);
            Assert.Equal("AlarmX", Assert.IsType(ctx.ReceivedEvents[2]).AlarmName);
            Assert.Equal("B", Assert.IsType(ctx.ReceivedEvents[3]).AttributeName);
        }
    }

    [Fact]
    public void PassThrough_After_Flush_Delivers_Subsequent_Events_Immediately()
    {
        // M2.18: after the snapshot+flush the actor switches to pass-through — later
        // events go straight to _onEvent (no buffering, no dup).
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));

        // Post-snapshot event — must be delivered immediately, exactly once.
        var postEvent = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(postEvent);

        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[1]);
        }
    }

    [Fact]
    public void InstanceNotFound_After_StreamFirst_Tears_Down_Stream_And_Does_Not_PassThrough()
    {
        // M2.18 + M2.11: stream-first means the gRPC subscription is already open
        // when an InstanceNotFound snapshot arrives. The bridge must tear that stream
        // down (Unsubscribe the just-opened correlation), deliver the not-found
        // snapshot, NOT enter pass-through, and stop cleanly.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();

        // Stream opened up-front (stream-first).
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var notFoundSnapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow,
            InstanceNotFound: true);

        Watch(ctx.BridgeActor);
        ctx.BridgeActor.Tell(notFoundSnapshot);

        // Not-found snapshot delivered.
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.True(Assert.IsType(ctx.ReceivedEvents[0]).InstanceNotFound);
        }

        // The just-opened stream must be torn down.
        AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Contains("corr-1"),
            TimeSpan.FromSeconds(3));

        // Stops cleanly.
        ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(3));

        // No pass-through: an event arriving after the stop is not delivered.
        var late = new AttributeValueChanged(InstanceName, "IO", "Temp", 1, "Good", DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(late);
        Thread.Sleep(200);
        lock (ctx.ReceivedEvents)
        {
            Assert.Single(ctx.ReceivedEvents);
        }
    }

    [Fact]
    public void Reconnect_During_Buffering_Phase_Keeps_Buffering_Until_Snapshot()
    {
        // M2.18: a gRPC error/reconnect BEFORE the snapshot arrives must remain in the
        // buffering phase — events on the new stream are still buffered, then flushed
        // when the snapshot finally arrives.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // Error before snapshot → reconnect (still buffering).
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("pre-snapshot blip"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));

        // Event on the reconnected stream — still buffered (snapshot not yet delivered).
        var gapEvent = new AttributeValueChanged(InstanceName, "IO", "Late", 7, "Good", DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(gapEvent);
        lock (ctx.ReceivedEvents)
        {
            Assert.Empty(ctx.ReceivedEvents);
        }

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);

        // snapshot + the event buffered across the reconnect.
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.IsType(ctx.ReceivedEvents[0]);
            Assert.Equal("Late", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName);
        }
    }

    [Fact]
    public void Reconnect_After_Snapshot_Resumes_PassThrough_Not_Buffering()
    {
        // M2.18: a mid-session reconnect (after the snapshot was already delivered)
        // must resume pass-through — the snapshot is a one-time thing and events on
        // the reconnected stream are delivered immediately, not re-buffered.
        var ctx = CreateBridgeActor();
        ctx.CommProbe.ExpectMsg();
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        var snapshot = new DebugViewSnapshot(
            InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
        ctx.BridgeActor.Tell(snapshot);
        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
            TimeSpan.FromSeconds(3));

        // Mid-session reconnect.
        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("mid-session blip"));
        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));

        // Event on the reconnected stream — delivered immediately (pass-through).
        var postEvent = new AttributeValueChanged(InstanceName, "IO", "Temp", 9, "Good", DateTimeOffset.UtcNow);
        ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(postEvent);

        AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
            TimeSpan.FromSeconds(3));
        lock (ctx.ReceivedEvents)
        {
            Assert.Equal("Temp", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName);
        }
    }

    [Fact]
    public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow()
    {
        // Communication-008: after a stream has been connected for the stability
        // window, the retry budget is recovered — a later transient fault then gets
        // a fresh set of retries rather than being counted against the old budget.
        DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromMilliseconds(300);
        try
        {
            var ctx = CreateBridgeActor();
            ctx.CommProbe.ExpectMsg();

            var snapshot = new DebugViewSnapshot(
                InstanceName, new List(), new List(), DateTimeOffset.UtcNow);
            Watch(ctx.BridgeActor);
            ctx.BridgeActor.Tell(snapshot);

            AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

            // Five failures — more than the four consecutive errors that terminate the
            // actor in After_MaxRetries_Terminates — but each new stream stays up long
            // enough (the mock stream only completes on cancel) for the stability
            // window to elapse and reset the retry budget before the next failure.
            for (var i = 0; i < 5; i++)
            {
                Thread.Sleep(450); // exceed the 300ms stability window
                ctx.MockGrpcClient.SubscribeCalls[i].OnError(new Exception($"Error {i + 1}"));
                var expected = i + 2;
                AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == expected, TimeSpan.FromSeconds(5));
            }

            // Five well-spaced failures did NOT terminate the actor because each
            // reconnect recovered its retry budget after the stability window.
            Assert.False(ctx.TerminatedFlag[0]);
        }
        finally
        {
            // Restore the value the constructor installs for every other test.
            DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
        }
    }
}

/// <summary>
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
/// </summary>
/// <remarks>
/// <para>
/// Thread safety: <see cref="SubscribeCalls"/> and <see cref="UnsubscribedCorrelationIds"/>
/// are written from the actor/background thread (via <see cref="SubscribeAsync"/> and
/// <see cref="Unsubscribe"/>) and read from the test thread (via AwaitCondition /
/// assertions). All access goes through a shared lock to match the
/// <c>lock (events)</c> pattern used for <c>ctx.ReceivedEvents</c>.
/// </para>
/// </remarks>
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
{
    private readonly object _lock = new();
    private readonly List _subscribeCalls = new();
    private readonly List _unsubscribedCorrelationIds = new();

    /// <summary>Returns a snapshot of subscribe calls, taken under the internal lock.</summary>
    public List SubscribeCalls
    {
        get
        {
            lock (_lock)
            {
                return _subscribeCalls.ToList();
            }
        }
    }

    /// <summary>Returns a snapshot of unsubscribed correlation IDs, taken under the internal lock.</summary>
    public List UnsubscribedCorrelationIds
    {
        get
        {
            lock (_lock)
            {
                return _unsubscribedCorrelationIds.ToList();
            }
        }
    }

    public MockSiteStreamGrpcClient() : base() { }

    /// <summary>
    /// Records the subscription (including its callbacks, so tests can inject events
    /// and errors) and returns a task that stays pending until <paramref name="ct"/>
    /// is cancelled, simulating a long-running server stream.
    /// </summary>
    public override Task SubscribeAsync(
        string correlationId,
        string instanceUniqueName,
        Action onEvent,
        Action onError,
        CancellationToken ct)
    {
        var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
        lock (_lock)
        {
            _subscribeCalls.Add(subscription);
        }

        // Return a task that completes when cancelled (simulates long-running stream).
        // The registration callback runs synchronously on whichever thread calls
        // Cancel(); RunContinuationsAsynchronously stops the awaiting code from being
        // resumed inline inside that Cancel() call.
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        ct.Register(() => tcs.TrySetResult());
        return tcs.Task;
    }

    /// <summary>Records the correlation ID passed to Unsubscribe, under the internal lock.</summary>
    public override void Unsubscribe(string correlationId)
    {
        lock (_lock)
        {
            _unsubscribedCorrelationIds.Add(correlationId);
        }
    }
}

/// <summary>
/// One recorded <c>SubscribeAsync</c> call on <see cref="MockSiteStreamGrpcClient"/>,
/// including the callbacks tests invoke to simulate stream events and errors.
/// </summary>
internal record MockSubscription(
    string CorrelationId,
    string InstanceUniqueName,
    Action OnEvent,
    Action OnError,
    CancellationToken CancellationToken);

/// <summary>
/// Factory that always returns the pre-configured mock client.
/// </summary>
internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory
{
    private readonly MockSiteStreamGrpcClient _mockClient;

    // GetOrCreate may be called off the test thread, so the endpoint log is guarded
    // the same way MockSiteStreamGrpcClient guards its call lists.
    private readonly object _lock = new();
    private readonly List _requestedEndpoints = new();

    /// <summary>
    /// Returns a snapshot, taken under the internal lock, of every endpoint passed to
    /// <see cref="GetOrCreate"/> in call order.
    /// </summary>
    public List RequestedEndpoints
    {
        get
        {
            lock (_lock)
            {
                return _requestedEndpoints.ToList();
            }
        }
    }

    public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient)
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
        _mockClient = mockClient;
    }

    /// <summary>Records <paramref name="grpcEndpoint"/> and returns the shared mock client.</summary>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        lock (_lock)
        {
            _requestedEndpoints.Add(grpcEndpoint);
        }

        return _mockClient;
    }
}

/// <summary>
/// Endpoint-aware mock factory: hands out a distinct <see cref="MockSiteStreamGrpcClient"/>
/// per endpoint, mirroring the real factory's corrected NodeA→NodeB failover behaviour
/// so node-flip coverage is meaningful (Communication-015).
/// </summary>
internal class EndpointTrackingGrpcClientFactory : SiteStreamGrpcClientFactory
{
    // Lazily-populated map from gRPC endpoint to the mock client bound to it.
    private readonly System.Collections.Concurrent.ConcurrentDictionary<string, MockSiteStreamGrpcClient> _clientsByEndpoint = new();

    public EndpointTrackingGrpcClientFactory()
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
    }

    /// <summary>
    /// Returns the mock client bound to <paramref name="endpoint"/>, creating it on first
    /// request. Tests use this to inspect what the bridge actor did against a given node.
    /// </summary>
    public MockSiteStreamGrpcClient ClientFor(string endpoint)
    {
        return _clientsByEndpoint.GetOrAdd(endpoint, static _ => new MockSiteStreamGrpcClient());
    }

    /// <summary>
    /// Resolves a client purely by <paramref name="grpcEndpoint"/>;
    /// <paramref name="siteIdentifier"/> is ignored.
    /// </summary>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        return ClientFor(grpcEndpoint);
    }
}