a0d9379a4f
- MockSiteStreamGrpcClient.SubscribeCalls and UnsubscribedCorrelationIds switched from bare List<T> to lock-guarded backing fields with snapshot accessors, eliminating the actor-thread/test-thread data race (matches the existing lock(events) pattern for ReceivedEvents) - AttributeKey and AlarmKey null-guard each component with ?? string.Empty so a null SourceReference/AlarmName/etc. cannot silently collide with an empty-string component in the dedup dictionary - On_Snapshot_Opens_GrpcStream renamed to On_Snapshot_Does_Not_Open_Additional_GrpcStream; assertion updated to confirm exactly one subscribe (the PreStart stream-first open) with no second subscribe after snapshot delivery - _stopped ordering in InstanceNotFound path moved after CleanupGrpc() for consistency with DebugStreamTerminated and ReceiveTimeout handlers
908 lines
38 KiB
C#
908 lines
38 KiB
C#
using Akka.Actor;
|
|
using Akka.TestKit;
|
|
using Akka.TestKit.Xunit2;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming;
|
|
using ZB.MOM.WW.ScadaBridge.Communication.Actors;
|
|
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
|
|
|
|
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc;
|
|
|
|
/// <summary>
|
|
/// Tests for DebugStreamBridgeActor with gRPC streaming integration.
|
|
/// </summary>
|
|
public class DebugStreamBridgeActorTests : TestKit
|
|
{
|
|
/// <summary>Site identifier the bridge actor under test is created with.</summary>
private const string SiteId = "site-alpha";

/// <summary>Unique name of the instance whose debug view is requested.</summary>
private const string InstanceName = "Site1.Pump01";

/// <summary>First gRPC endpoint passed to the bridge actor.</summary>
private const string GrpcNodeA = "http://localhost:5100";

/// <summary>Second gRPC endpoint passed to the bridge actor.</summary>
private const string GrpcNodeB = "http://localhost:5200";
|
|
|
|
/// <summary>
/// Starts the TestKit actor system with DEBUG logging and overrides the bridge
/// actor's static timing settings for the tests in this class.
/// </summary>
public DebugStreamBridgeActorTests() : base(@"akka.loglevel = DEBUG")
{
    // A long stability window means a stream is never considered "stable" mid-test
    // unless a test deliberately waits the window out.
    DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);

    // A very short reconnect delay keeps the reconnect scenarios fast.
    DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100);
}
|
|
|
|
/// <summary>Bundle of everything a test needs after a bridge actor has been spawned.</summary>
/// <param name="BridgeActor">Reference to the spawned actor under test.</param>
/// <param name="CommProbe">Probe whose reference was handed to the actor at construction.</param>
/// <param name="MockGrpcClient">Mock gRPC client served by the factory given to the actor.</param>
/// <param name="ReceivedEvents">Events captured by the onEvent callback; access it under <c>lock</c>.</param>
/// <param name="TerminatedFlag">Single-element array set to <c>true</c> by the onTerminated callback.</param>
private record TestContext(
    IActorRef BridgeActor, TestProbe CommProbe, MockSiteStreamGrpcClient MockGrpcClient,
    List<object> ReceivedEvents, bool[] TerminatedFlag);
|
|
|
|
/// <summary>
/// Spawns a <c>DebugStreamBridgeActor</c> wired to a fresh test probe, a mock gRPC
/// client (served through a mock factory) and two capture callbacks.
/// </summary>
/// <returns>
/// A <c>TestContext</c> holding the actor reference, the probe, the mock client, the
/// list that collects forwarded events and the single-element termination flag.
/// </returns>
private TestContext CreateBridgeActor()
{
    var commProbe = CreateTestProbe();
    var mockClient = new MockSiteStreamGrpcClient();
    var factory = new MockSiteStreamGrpcClientFactory(mockClient);
    var events = new List<object>();
    var terminated = new[] { false };

    // Both callbacks are invoked by the actor, not by the test thread, so each one
    // publishes its write explicitly: the event list is guarded by a lock (tests take
    // the same lock when reading) and the termination flag is written with
    // Volatile.Write so the store cannot be delayed or reordered past later writes.
    Action<object> onEvent = evt => { lock (events) { events.Add(evt); } };
    Action onTerminated = () => Volatile.Write(ref terminated[0], true);

    // Props.Create takes the constructor arguments positionally; keep this order in
    // sync with the DebugStreamBridgeActor constructor.
    var props = Props.Create(typeof(DebugStreamBridgeActor),
        SiteId,
        InstanceName,
        "corr-1",
        commProbe.Ref,
        onEvent,
        onTerminated,
        factory,
        GrpcNodeA,
        GrpcNodeB);

    var actor = Sys.ActorOf(props);
    return new TestContext(actor, commProbe, mockClient, events, terminated);
}
|
|
|
|
[Fact]
public void On_InstanceNotFound_Snapshot_Forwards_To_OnEvent_Tears_Down_Stream_And_Terminates()
{
    // M2.11, revised for the M2.18 stream-first lifecycle: the gRPC subscription is
    // opened up-front in PreStart, so when the site answers with InstanceNotFound=true
    // the bridge actor has to
    //   (a) pass the not-found snapshot to _onEvent, so DebugStreamService's TCS
    //       resolves and the caller can inspect the flag,
    //   (b) tear down the already-open gRPC stream (unsubscribe the correlation it
    //       just opened) instead of entering pass-through, and
    //   (c) stop itself cleanly.
    var wait = TimeSpan.FromSeconds(3);
    var ctx = CreateBridgeActor();
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>(); // initial subscribe envelope

    // Stream-first: the subscription exists before any snapshot is sent.
    AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, wait);

    var missingInstance = new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow,
        InstanceNotFound: true);

    Watch(ctx.BridgeActor);
    ctx.BridgeActor.Tell(missingInstance);

    // (a) the callback sees exactly the not-found snapshot.
    AwaitCondition(() => DeliveredCount() == 1, wait);
    object first;
    lock (ctx.ReceivedEvents)
    {
        first = ctx.ReceivedEvents[0];
    }
    Assert.True(Assert.IsType<DebugViewSnapshot>(first).InstanceNotFound);

    // (b) the stream opened in PreStart is unsubscribed rather than left running.
    AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Contains("corr-1"), wait);

    // (c) the actor stops.
    ExpectTerminated(ctx.BridgeActor, wait);
}
|
|
|
|
[Fact]
public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient()
{
    // Spawning the actor is the only stimulus; the probe handed to it must then
    // receive a subscribe request wrapped in a SiteEnvelope for this site.
    var ctx = CreateBridgeActor();

    var received = ctx.CommProbe.ExpectMsg<SiteEnvelope>();

    Assert.Equal(SiteId, received.SiteId);

    // Assert.IsType returns the value already cast, so no separate cast is needed.
    var subscribe = Assert.IsType<SubscribeDebugViewRequest>(received.Message);
    Assert.Equal(InstanceName, subscribe.InstanceUniqueName);
    Assert.Equal("corr-1", subscribe.CorrelationId);
}
|
|
|
|
[Fact]
public void On_Snapshot_Forwards_To_OnEvent_Callback()
{
    // A regular snapshot told to the actor must come out of the onEvent callback.
    var ctx = CreateBridgeActor();
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();

    var emptySnapshot = new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow);
    ctx.BridgeActor.Tell(emptySnapshot);

    AwaitCondition(() => DeliveredCount() == 1, TimeSpan.FromSeconds(3));

    object first;
    lock (ctx.ReceivedEvents)
    {
        first = ctx.ReceivedEvents[0];
    }
    Assert.IsType<DebugViewSnapshot>(first);
}
|
|
|
|
[Fact]
public void On_Snapshot_Does_Not_Open_Additional_GrpcStream()
{
    // M2.18 stream-first: PreStart opens the gRPC subscription BEFORE the snapshot
    // arrives. Delivering the snapshot only switches the actor to pass-through; it
    // must not subscribe a second time, so exactly one subscribe call (the PreStart
    // one) may exist afterwards.
    var wait = TimeSpan.FromSeconds(3);
    var ctx = CreateBridgeActor();
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();

    // The stream is already open before any snapshot is sent.
    AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, wait);

    var emptySnapshot = new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow);
    ctx.BridgeActor.Tell(emptySnapshot);

    // Once the snapshot has been forwarded there is still only the one subscribe.
    AwaitCondition(() => DeliveredCount() == 1, wait);

    var onlySubscribe = Assert.Single(ctx.MockGrpcClient.SubscribeCalls);
    Assert.Equal("corr-1", onlySubscribe.CorrelationId);
    Assert.Equal(InstanceName, onlySubscribe.InstanceUniqueName);
}
|
|
|
|
[Fact]
public void Events_From_GrpcCallback_Forwarded_To_OnEvent()
{
    // An event pushed through the gRPC subscription's OnEvent delegate must reach
    // the test's onEvent callback, after the snapshot.
    var wait = TimeSpan.FromSeconds(3);
    var ctx = CreateBridgeActor();
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();

    var emptySnapshot = new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow);
    ctx.BridgeActor.Tell(emptySnapshot);
    AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, wait);

    // Simulate a live gRPC event by invoking the delegate the actor registered.
    var liveChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
    ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(liveChange);

    // Two deliveries expected: the snapshot, then the attribute change.
    AwaitCondition(() => DeliveredCount() == 2, wait);

    object second;
    lock (ctx.ReceivedEvents)
    {
        second = ctx.ReceivedEvents[1];
    }
    Assert.IsType<AttributeValueChanged>(second);
}
|
|
|
|
[Fact]
public void On_GrpcError_Reconnects_To_Other_Node()
{
    // A stream error must trigger a fresh subscribe that reuses the same
    // correlation ID. With the single shared mock client only the second subscribe
    // call is observable here, not which endpoint it targeted.
    var ctx = CreateBridgeActor();
    var grpc = ctx.MockGrpcClient;

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();

    var emptySnapshot = new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow);
    ctx.BridgeActor.Tell(emptySnapshot);
    AwaitCondition(() => grpc.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

    // Break the first stream.
    grpc.SubscribeCalls[0].OnError(new Exception("Stream broken"));

    // The actor resubscribes.
    AwaitCondition(() => grpc.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
    Assert.Equal("corr-1", grpc.SubscribeCalls[1].CorrelationId);
}
|
|
|
|
[Fact]
public void On_GrpcError_Unsubscribes_Old_Stream_Before_Reconnect()
{
    // Communication-002 regression: when reconnecting, the previous stream has to
    // be unsubscribed, otherwise the old node keeps a zombie relay actor /
    // subscription alive.
    var ctx = CreateBridgeActor();
    var grpc = ctx.MockGrpcClient;

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();

    var emptySnapshot = new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow);
    ctx.BridgeActor.Tell(emptySnapshot);
    AwaitCondition(() => grpc.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

    // A gRPC error forces a reconnect.
    grpc.SubscribeCalls[0].OnError(new Exception("Stream broken"));

    // The resubscribe happens...
    AwaitCondition(() => grpc.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));

    // ...and by then the prior correlation ID has been unsubscribed, so the old
    // node's relay actor is released instead of being left behind.
    Assert.Contains("corr-1", grpc.UnsubscribedCorrelationIds);
}
|
|
|
|
[Fact]
public void After_MaxRetries_Terminates()
{
    // Four consecutive stream errors (the initial stream plus three retries) must
    // exhaust the retry budget: the actor stops and the onTerminated callback fires.
    var ctx = CreateBridgeActor();
    var grpc = ctx.MockGrpcClient;

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();

    var emptySnapshot = new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow);

    Watch(ctx.BridgeActor);
    ctx.BridgeActor.Tell(emptySnapshot);
    AwaitCondition(() => grpc.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

    // Errors 1..3: each one fails the newest stream and is answered by a resubscribe.
    for (var attempt = 1; attempt <= 3; attempt++)
    {
        // Copy into a per-iteration local so the lambda does not capture the loop variable.
        var expectedCalls = attempt + 1;
        grpc.SubscribeCalls[attempt - 1].OnError(new Exception($"Error {attempt}"));
        AwaitCondition(() => grpc.SubscribeCalls.Count == expectedCalls, TimeSpan.FromSeconds(5));
    }

    // The fourth error exceeds the maximum number of retries.
    grpc.SubscribeCalls[3].OnError(new Exception("Error 4"));

    ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
    Assert.True(ctx.TerminatedFlag[0]);
}
|
|
|
|
[Fact]
public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe()
{
    // StopDebugStream must produce three observable effects: an unsubscribe envelope
    // on the probe, an unsubscribe on the gRPC client, and the actor stopping.
    var ctx = CreateBridgeActor();
    var grpc = ctx.MockGrpcClient;

    ctx.CommProbe.ExpectMsg<SiteEnvelope>(); // subscribe

    var emptySnapshot = new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow);

    Watch(ctx.BridgeActor);
    ctx.BridgeActor.Tell(emptySnapshot);
    AwaitCondition(() => grpc.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

    ctx.BridgeActor.Tell(new StopDebugStream());

    // Unsubscribe request goes out through the probe.
    var unsubscribeEnvelope = ctx.CommProbe.ExpectMsg<SiteEnvelope>();
    Assert.IsType<UnsubscribeDebugViewRequest>(unsubscribeEnvelope.Message);

    // The gRPC subscription is released as well.
    AwaitCondition(() => grpc.UnsubscribedCorrelationIds.Count > 0, TimeSpan.FromSeconds(3));
    Assert.Contains("corr-1", grpc.UnsubscribedCorrelationIds);

    // Finally the actor stops itself.
    ExpectTerminated(ctx.BridgeActor);
}
|
|
|
|
[Fact]
public void DebugStreamTerminated_Stops_Actor_Idempotently()
{
    // A DebugStreamTerminated message must stop the actor and fire onTerminated.
    // NOTE(review): only a single message is sent, so repeated delivery is not
    // exercised here despite the "Idempotently" in the name.
    var ctx = CreateBridgeActor();
    ctx.CommProbe.ExpectMsg<SiteEnvelope>();

    var terminatedNotice = new DebugStreamTerminated(SiteId, "corr-1");

    Watch(ctx.BridgeActor);
    ctx.BridgeActor.Tell(terminatedNotice);

    ExpectTerminated(ctx.BridgeActor);
    Assert.True(ctx.TerminatedFlag[0]);
}
|
|
|
|
[Fact]
public void FlappingStream_DeliveringEventsBetweenFailures_StillTerminatesAfterMaxRetries()
{
    // Communication-008 regression: a stream that repeatedly connects, delivers one
    // event and then fails must still hit MaxRetries. A received event no longer
    // resets the retry count (only the stability window does). Previously every
    // event reset _retryCount, so a flapping site reconnected forever and the debug
    // session never ended.
    var ctx = CreateBridgeActor();
    var grpc = ctx.MockGrpcClient;

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();

    var emptySnapshot = new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow);

    Watch(ctx.BridgeActor);
    ctx.BridgeActor.Tell(emptySnapshot);
    AwaitCondition(() => grpc.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

    var tempChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);

    // Three flaps: one event, then a failure. Under the old logic each event would
    // have refilled the retry budget and prevented termination.
    for (var flap = 1; flap <= 3; flap++)
    {
        var stream = grpc.SubscribeCalls[flap - 1];
        stream.OnEvent(tempChange);
        stream.OnError(new Exception($"Flap {flap}"));

        // Per-iteration copy so the lambda does not capture the loop variable.
        var expectedCalls = flap + 1;
        AwaitCondition(() => grpc.SubscribeCalls.Count == expectedCalls, TimeSpan.FromSeconds(5));
    }

    // The fourth failure (after the three retries) exceeds MaxRetries.
    grpc.SubscribeCalls[3].OnEvent(tempChange);
    grpc.SubscribeCalls[3].OnError(new Exception("Flap 4"));

    ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
    Assert.True(ctx.TerminatedFlag[0]);
}
|
|
|
|
[Fact]
public void On_GrpcError_Reconnects_To_Other_Node_Endpoint()
{
    // Communication-015 regression: run the bridge actor through a node flip using
    // an endpoint-aware factory (a distinct mock client per endpoint). The first
    // subscribe goes to NodeA; after a gRPC error the reconnect has to go through a
    // client bound to the *NodeB* endpoint.
    var commProbe = CreateTestProbe();
    var factory = new EndpointTrackingGrpcClientFactory();
    var events = new List<object>();
    var terminated = new[] { false };

    Action<object> onEvent = evt => { lock (events) { events.Add(evt); } };
    Action onTerminated = () => terminated[0] = true;

    // Resolve the client through the factory on every check, exactly when needed.
    int SubscribeCountOn(string endpoint) => factory.ClientFor(endpoint).SubscribeCalls.Count;

    var props = Props.Create(typeof(DebugStreamBridgeActor),
        SiteId,
        InstanceName,
        "corr-1",
        commProbe.Ref,
        onEvent,
        onTerminated,
        factory,
        GrpcNodeA,
        GrpcNodeB);

    var actor = Sys.ActorOf(props);
    commProbe.ExpectMsg<SiteEnvelope>();

    actor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow));

    // The initial subscribe lands on NodeA.
    AwaitCondition(() => SubscribeCountOn(GrpcNodeA) == 1, TimeSpan.FromSeconds(3));

    // Failing NodeA's stream makes the bridge flip to NodeB.
    factory.ClientFor(GrpcNodeA).SubscribeCalls[0].OnError(new Exception("NodeA down"));

    // The reconnect must show up on the client bound to the NodeB endpoint.
    AwaitCondition(() => SubscribeCountOn(GrpcNodeB) == 1, TimeSpan.FromSeconds(5));
    Assert.Equal("corr-1", factory.ClientFor(GrpcNodeB).SubscribeCalls[0].CorrelationId);
}
|
|
|
|
// ---------------------------------------------------------------------
|
|
// M2.18 (#26) — stream-first + replay/dedup
|
|
// ---------------------------------------------------------------------
|
|
|
|
[Fact]
public void PreStart_Opens_GrpcStream_Before_Snapshot_Arrives()
{
    // M2.18: PreStart must open the gRPC subscription (stream-first) BEFORE the
    // snapshot is delivered, so live events already flow while the snapshot is
    // being built and is in transit. The old lifecycle subscribed only after the
    // snapshot had arrived and lost the events of that gap window.
    var ctx = CreateBridgeActor();
    ctx.CommProbe.ExpectMsg<SiteEnvelope>(); // initial subscribe envelope

    // No snapshot has been sent, yet the stream is expected to be open.
    AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

    var opened = ctx.MockGrpcClient.SubscribeCalls[0];
    Assert.Equal("corr-1", opened.CorrelationId);
    Assert.Equal(InstanceName, opened.InstanceUniqueName);

    // The actor is buffering, not delivering: _onEvent must not have fired.
    lock (ctx.ReceivedEvents)
    {
        Assert.Empty(ctx.ReceivedEvents);
    }
}
|
|
|
|
[Fact]
public void GapWindow_Event_Buffered_Before_Snapshot_Is_Delivered_Exactly_Once_After_Snapshot()
{
    // M2.18: an event that arrives DURING the snapshot window (before the snapshot
    // has been delivered) is buffered and then flushed exactly once AFTER it.
    var wait = TimeSpan.FromSeconds(3);
    var ctx = CreateBridgeActor();
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();
    AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, wait);

    // The live event precedes the snapshot and its entity ("Pressure") is absent
    // from the snapshot, so it is a genuine gap-window event that must survive.
    var pressureChange = new AttributeValueChanged(InstanceName, "IO", "Pressure", 99.9, "Good",
        DateTimeOffset.UtcNow);
    ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(pressureChange);

    // Still buffering: nothing has reached _onEvent.
    lock (ctx.ReceivedEvents)
    {
        Assert.Empty(ctx.ReceivedEvents);
    }

    ctx.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow));

    // Expected order: the snapshot, then the buffered event, exactly once.
    AwaitCondition(() => DeliveredCount() == 2, wait);

    object[] delivered;
    lock (ctx.ReceivedEvents)
    {
        delivered = ctx.ReceivedEvents.ToArray();
    }
    Assert.IsType<DebugViewSnapshot>(delivered[0]);
    Assert.Equal("Pressure", Assert.IsType<AttributeValueChanged>(delivered[1]).AttributeName);
}
|
|
|
|
[Fact]
public void Buffered_Event_Already_Reflected_In_Snapshot_Is_Dropped()
{
    // M2.18 dedup: when a buffered event's entity is in the snapshot with an equal
    // or newer timestamp (buffered.Timestamp <= snapshot.Timestamp), the snapshot
    // already reflects it and the buffered event must be DROPPED.
    var wait = TimeSpan.FromSeconds(3);
    var ctx = CreateBridgeActor();
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();
    AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, wait);

    var stamp = DateTimeOffset.UtcNow;

    // "Temp" arrives on the stream at `stamp` and gets buffered.
    var bufferedTemp = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", stamp);
    ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(bufferedTemp);

    // The snapshot holds "Temp" at the SAME timestamp, which makes the buffered
    // event a duplicate.
    var snapshotAttributes = new List<AttributeValueChanged>
    {
        new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", stamp),
    };
    var snapshot = new DebugViewSnapshot(
        InstanceName,
        snapshotAttributes,
        new List<AlarmStateChanged>(),
        stamp);
    ctx.BridgeActor.Tell(snapshot);

    // Only the snapshot comes through.
    AwaitCondition(() => DeliveredCount() == 1, wait);

    // Short grace period so a wrongly flushed duplicate would have time to show up.
    Thread.Sleep(200);

    object[] delivered;
    lock (ctx.ReceivedEvents)
    {
        delivered = ctx.ReceivedEvents.ToArray();
    }
    Assert.Single(delivered);
    Assert.IsType<DebugViewSnapshot>(delivered[0]);
}
|
|
|
|
[Fact]
public void Buffered_Event_Strictly_Newer_Than_Snapshot_Entity_Is_Delivered()
{
    // M2.18 dedup: a buffered event that is strictly newer than the snapshot's entry
    // for the same entity (buffered.Timestamp > snapshot.Timestamp) is NOT a
    // duplicate and must be DELIVERED after the snapshot.
    var wait = TimeSpan.FromSeconds(3);
    var ctx = CreateBridgeActor();
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();
    AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, wait);

    var snapshotStamp = DateTimeOffset.UtcNow;
    var laterStamp = snapshotStamp.AddMilliseconds(1);

    // Buffered "Temp" carries a timestamp strictly after the snapshot's "Temp".
    var bufferedTemp = new AttributeValueChanged(InstanceName, "IO", "Temp", 50.0, "Good", laterStamp);
    ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(bufferedTemp);

    var snapshotAttributes = new List<AttributeValueChanged>
    {
        new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", snapshotStamp),
    };
    var snapshot = new DebugViewSnapshot(
        InstanceName,
        snapshotAttributes,
        new List<AlarmStateChanged>(),
        snapshotStamp);
    ctx.BridgeActor.Tell(snapshot);

    // Expected: the snapshot first, then the strictly newer buffered event.
    AwaitCondition(() => DeliveredCount() == 2, wait);

    object[] delivered;
    lock (ctx.ReceivedEvents)
    {
        delivered = ctx.ReceivedEvents.ToArray();
    }
    Assert.IsType<DebugViewSnapshot>(delivered[0]);
    var flushed = Assert.IsType<AttributeValueChanged>(delivered[1]);
    Assert.Equal(50.0, flushed.Value);
    Assert.Equal(laterStamp, flushed.Timestamp);
}
|
|
|
|
[Fact]
public void Buffered_Alarm_Dedup_Uses_AlarmIdentity_And_Timestamp()
{
    // M2.18 dedup for alarms: identity = (instance, alarm name, source reference).
    // A buffered alarm that is older than or equal to the snapshot's alarm of the
    // same identity is dropped; a strictly newer one is delivered.
    var wait = TimeSpan.FromSeconds(3);
    var active = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active;
    var normal = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Normal;

    var ctx = CreateBridgeActor();
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();
    AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, wait);

    var stamp = DateTimeOffset.UtcNow;

    // Buffered: "PumpFault" at `stamp` (duplicate) and "Overheat" one millisecond
    // later (newer, must be delivered).
    var stalePumpFault = new AlarmStateChanged(InstanceName, "PumpFault", active, 500, stamp);
    var freshOverheat = new AlarmStateChanged(InstanceName, "Overheat", active, 700, stamp.AddMilliseconds(1));
    ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(stalePumpFault);
    ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(freshOverheat);

    // The snapshot lists BOTH alarms at `stamp`.
    var snapshotAlarms = new List<AlarmStateChanged>
    {
        new AlarmStateChanged(InstanceName, "PumpFault", active, 500, stamp),
        new AlarmStateChanged(InstanceName, "Overheat", normal, 0, stamp),
    };
    var snapshot = new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        snapshotAlarms,
        stamp);
    ctx.BridgeActor.Tell(snapshot);

    // Expected: the snapshot plus only the strictly newer "Overheat" alarm;
    // "PumpFault" is dropped.
    AwaitCondition(() => DeliveredCount() == 2, wait);

    // Grace period so a wrongly flushed "PumpFault" would have time to show up.
    Thread.Sleep(200);

    object[] delivered;
    lock (ctx.ReceivedEvents)
    {
        delivered = ctx.ReceivedEvents.ToArray();
    }
    Assert.Equal(2, delivered.Length);
    Assert.IsType<DebugViewSnapshot>(delivered[0]);
    var flushed = Assert.IsType<AlarmStateChanged>(delivered[1]);
    Assert.Equal("Overheat", flushed.AlarmName);
    Assert.Equal(700, flushed.Priority);
}
|
|
|
|
[Fact]
public void Buffered_Events_Flushed_In_Arrival_Order()
{
    // M2.18: several buffered events keep their arrival order when flushed. None of
    // them is a duplicate, because their entities do not appear in the snapshot.
    var wait = TimeSpan.FromSeconds(3);
    var ctx = CreateBridgeActor();
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();
    AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, wait);

    var stamp = DateTimeOffset.UtcNow;
    var stream = ctx.MockGrpcClient.SubscribeCalls[0];

    // Arrival order: attribute "A", alarm "AlarmX", attribute "B".
    stream.OnEvent(new AttributeValueChanged(InstanceName, "IO", "A", 1, "Good", stamp));
    stream.OnEvent(new AlarmStateChanged(InstanceName, "AlarmX",
        ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 100, stamp));
    stream.OnEvent(new AttributeValueChanged(InstanceName, "IO", "B", 2, "Good", stamp));

    ctx.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        stamp));

    // Snapshot plus the three buffered events.
    AwaitCondition(() => DeliveredCount() == 4, wait);

    object[] delivered;
    lock (ctx.ReceivedEvents)
    {
        delivered = ctx.ReceivedEvents.ToArray();
    }
    Assert.IsType<DebugViewSnapshot>(delivered[0]);
    Assert.Equal("A", Assert.IsType<AttributeValueChanged>(delivered[1]).AttributeName);
    Assert.Equal("AlarmX", Assert.IsType<AlarmStateChanged>(delivered[2]).AlarmName);
    Assert.Equal("B", Assert.IsType<AttributeValueChanged>(delivered[3]).AttributeName);
}
|
|
|
|
[Fact]
public void PassThrough_After_Flush_Delivers_Subsequent_Events_Immediately()
{
    // M2.18: once the snapshot has been delivered and the buffer flushed, the actor
    // is in pass-through, so later events go straight to _onEvent (no buffering, no
    // duplicate).
    var wait = TimeSpan.FromSeconds(3);
    var ctx = CreateBridgeActor();
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();
    AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, wait);

    ctx.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow));

    AwaitCondition(() => DeliveredCount() == 1, wait);

    // An event after the snapshot must be delivered immediately and exactly once.
    var laterChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good",
        DateTimeOffset.UtcNow);
    ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(laterChange);

    AwaitCondition(() => DeliveredCount() == 2, wait);

    object second;
    lock (ctx.ReceivedEvents)
    {
        second = ctx.ReceivedEvents[1];
    }
    Assert.IsType<AttributeValueChanged>(second);
}
|
|
|
|
[Fact]
public void InstanceNotFound_After_StreamFirst_Tears_Down_Stream_And_Does_Not_PassThrough()
{
    // M2.18 + M2.11: with stream-first the gRPC subscription is already open when an
    // InstanceNotFound snapshot arrives. The bridge must tear that stream down
    // (unsubscribe the correlation it just opened), deliver the not-found snapshot,
    // NOT enter pass-through, and stop cleanly.
    var wait = TimeSpan.FromSeconds(3);
    var ctx = CreateBridgeActor();
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();

    // The stream is opened up-front.
    AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, wait);

    var missingInstance = new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow,
        InstanceNotFound: true);

    Watch(ctx.BridgeActor);
    ctx.BridgeActor.Tell(missingInstance);

    // The not-found snapshot is delivered.
    AwaitCondition(() => DeliveredCount() == 1, wait);
    object first;
    lock (ctx.ReceivedEvents)
    {
        first = ctx.ReceivedEvents[0];
    }
    Assert.True(Assert.IsType<DebugViewSnapshot>(first).InstanceNotFound);

    // The stream opened in PreStart is unsubscribed.
    AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Contains("corr-1"), wait);

    // The actor stops.
    ExpectTerminated(ctx.BridgeActor, wait);

    // No pass-through: an event pushed after the stop must not be delivered.
    var lateChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 1, "Good", DateTimeOffset.UtcNow);
    ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(lateChange);

    // Grace period so a wrongly forwarded event would have time to show up.
    Thread.Sleep(200);

    lock (ctx.ReceivedEvents)
    {
        Assert.Single(ctx.ReceivedEvents);
    }
}
|
|
|
|
[Fact]
public void Reconnect_During_Buffering_Phase_Keeps_Buffering_Until_Snapshot()
{
    // M2.18: a gRPC error and reconnect BEFORE the snapshot arrives must leave the
    // actor in the buffering phase. Events on the new stream are still buffered and
    // only flushed when the snapshot finally arrives.
    var ctx = CreateBridgeActor();
    var grpc = ctx.MockGrpcClient;
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();
    AwaitCondition(() => grpc.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

    // The stream fails before any snapshot: the actor reconnects while still buffering.
    grpc.SubscribeCalls[0].OnError(new Exception("pre-snapshot blip"));
    AwaitCondition(() => grpc.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));

    // An event on the reconnected stream is buffered, since no snapshot was delivered yet.
    var lateChange = new AttributeValueChanged(InstanceName, "IO", "Late", 7, "Good",
        DateTimeOffset.UtcNow);
    grpc.SubscribeCalls[1].OnEvent(lateChange);
    lock (ctx.ReceivedEvents)
    {
        Assert.Empty(ctx.ReceivedEvents);
    }

    ctx.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow));

    // Expected: the snapshot, then the event that was buffered across the reconnect.
    AwaitCondition(() => DeliveredCount() == 2, TimeSpan.FromSeconds(3));

    object[] delivered;
    lock (ctx.ReceivedEvents)
    {
        delivered = ctx.ReceivedEvents.ToArray();
    }
    Assert.IsType<DebugViewSnapshot>(delivered[0]);
    Assert.Equal("Late", Assert.IsType<AttributeValueChanged>(delivered[1]).AttributeName);
}
|
|
|
|
[Fact]
public void Reconnect_After_Snapshot_Resumes_PassThrough_Not_Buffering()
{
    // M2.18: a reconnect in mid-session (the snapshot was already delivered) must
    // resume pass-through. The snapshot is a one-time thing, so events on the
    // reconnected stream are delivered immediately instead of being buffered again.
    var ctx = CreateBridgeActor();
    var grpc = ctx.MockGrpcClient;
    int DeliveredCount() { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count; } }

    ctx.CommProbe.ExpectMsg<SiteEnvelope>();
    AwaitCondition(() => grpc.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

    ctx.BridgeActor.Tell(new DebugViewSnapshot(
        InstanceName,
        new List<AttributeValueChanged>(),
        new List<AlarmStateChanged>(),
        DateTimeOffset.UtcNow));
    AwaitCondition(() => DeliveredCount() == 1, TimeSpan.FromSeconds(3));

    // The stream fails after the snapshot and the actor reconnects.
    grpc.SubscribeCalls[0].OnError(new Exception("mid-session blip"));
    AwaitCondition(() => grpc.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));

    // An event on the reconnected stream is passed through right away.
    var resumedChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 9, "Good",
        DateTimeOffset.UtcNow);
    grpc.SubscribeCalls[1].OnEvent(resumedChange);

    AwaitCondition(() => DeliveredCount() == 2, TimeSpan.FromSeconds(3));

    object second;
    lock (ctx.ReceivedEvents)
    {
        second = ctx.ReceivedEvents[1];
    }
    Assert.Equal("Temp", Assert.IsType<AttributeValueChanged>(second).AttributeName);
}
|
|
|
|
[Fact]
public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow()
{
    // Communication-008: a stream that stays connected for the whole stability
    // window earns its retry budget back, so a later transient fault starts from
    // a fresh set of retries instead of being charged against the old budget.
    const int FailureCount = 5;
    var settleDelay = TimeSpan.FromMilliseconds(450); // longer than the 300ms stability window

    DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromMilliseconds(300);
    try
    {
        var harness = CreateBridgeActor();
        harness.CommProbe.ExpectMsg<SiteEnvelope>();

        var emptySnapshot = new DebugViewSnapshot(
            InstanceName,
            new List<AttributeValueChanged>(),
            new List<AlarmStateChanged>(),
            DateTimeOffset.UtcNow);

        Watch(harness.BridgeActor);
        harness.BridgeActor.Tell(emptySnapshot);
        AwaitCondition(() => harness.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));

        // Five failures in a row. Each replacement stream is left up long enough
        // (the mock stream only completes on cancel) for the stability window to
        // elapse and reset the retry budget before the next failure is injected.
        for (var failure = 1; failure <= FailureCount; failure++)
        {
            Thread.Sleep(settleDelay);

            var currentStream = harness.MockGrpcClient.SubscribeCalls[failure - 1];
            currentStream.OnError(new Exception($"Error {failure}"));

            // Every failure must be followed by exactly one new subscribe call.
            var streamsAfterReconnect = failure + 1;
            AwaitCondition(
                () => harness.MockGrpcClient.SubscribeCalls.Count == streamsAfterReconnect,
                TimeSpan.FromSeconds(5));
        }

        // The well-spaced failures must NOT have terminated the actor: each
        // reconnect recovered its retry budget once the stability window passed.
        Assert.False(harness.TerminatedFlag[0]);
    }
    finally
    {
        // Restore the long window configured by the test-class constructor.
        DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30);
    }
}
|
|
}
|
|
|
|
/// <summary>
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
/// <para>
/// <b>Thread safety:</b> <see cref="SubscribeCalls"/> and
/// <see cref="UnsubscribedCorrelationIds"/> are written from the actor/background thread
/// (via <see cref="SubscribeAsync"/> and <see cref="Unsubscribe"/>) and read from the test
/// thread (via <c>AwaitCondition</c> / assertions). All access goes through a shared lock
/// to match the <c>lock (events)</c> pattern used for <c>ctx.ReceivedEvents</c>.
/// </para>
/// </summary>
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
{
    private readonly object _lock = new();
    private readonly List<MockSubscription> _subscribeCalls = new();
    private readonly List<string> _unsubscribedCorrelationIds = new();

    /// <summary>Returns a snapshot of subscribe calls, taken under the internal lock.</summary>
    public List<MockSubscription> SubscribeCalls
    {
        get
        {
            lock (_lock)
            {
                return _subscribeCalls.ToList();
            }
        }
    }

    /// <summary>Returns a snapshot of unsubscribed correlation IDs, taken under the internal lock.</summary>
    public List<string> UnsubscribedCorrelationIds
    {
        get
        {
            lock (_lock)
            {
                return _unsubscribedCorrelationIds.ToList();
            }
        }
    }

    /// <summary>Creates a mock client with no recorded calls.</summary>
    public MockSiteStreamGrpcClient() : base()
    {
    }

    /// <summary>
    /// Records the call as a <see cref="MockSubscription"/> and returns a task that stays
    /// pending until <paramref name="ct"/> is cancelled, simulating a long-running stream.
    /// </summary>
    /// <param name="correlationId">Correlation id recorded on the subscription.</param>
    /// <param name="instanceUniqueName">Instance name recorded on the subscription.</param>
    /// <param name="onEvent">Event callback; tests invoke it through <see cref="SubscribeCalls"/>.</param>
    /// <param name="onError">Error callback; tests invoke it through <see cref="SubscribeCalls"/>.</param>
    /// <param name="ct">Token whose cancellation completes the returned task.</param>
    /// <returns>A task that completes successfully once <paramref name="ct"/> is cancelled.</returns>
    public override Task SubscribeAsync(
        string correlationId,
        string instanceUniqueName,
        Action<object> onEvent,
        Action<Exception> onError,
        CancellationToken ct)
    {
        var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
        lock (_lock)
        {
            _subscribeCalls.Add(subscription);
        }

        // RunContinuationsAsynchronously keeps continuations of the returned task from
        // running inline inside the cancellation callback, i.e. on whichever thread
        // happens to call Cancel() on the token source.
        var streamCompletion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        ct.Register(() => streamCompletion.TrySetResult());
        return streamCompletion.Task;
    }

    /// <summary>Records <paramref name="correlationId"/> as unsubscribed; performs no other work.</summary>
    /// <param name="correlationId">Correlation id passed by the caller.</param>
    public override void Unsubscribe(string correlationId)
    {
        lock (_lock)
        {
            _unsubscribedCorrelationIds.Add(correlationId);
        }
    }
}
|
|
|
|
/// <summary>
/// Captures the arguments of a single <c>SubscribeAsync</c> call made against the mock
/// gRPC client, so a test can later drive the stream by invoking the stored callbacks.
/// </summary>
/// <param name="CorrelationId">Correlation id passed to the subscribe call.</param>
/// <param name="InstanceUniqueName">Instance name passed to the subscribe call.</param>
/// <param name="OnEvent">Event callback supplied by the subscriber.</param>
/// <param name="OnError">Error callback supplied by the subscriber.</param>
/// <param name="CancellationToken">Cancellation token supplied by the subscriber.</param>
internal record MockSubscription(
    string CorrelationId, string InstanceUniqueName,
    Action<object> OnEvent, Action<Exception> OnError,
    CancellationToken CancellationToken);
|
|
|
|
/// <summary>
/// Factory that always returns the pre-configured mock client.
/// <para>
/// <b>Thread safety:</b> <see cref="GetOrCreate"/> appends to the endpoint log and
/// <see cref="RequestedEndpoints"/> reads it; both go through a private lock so a test
/// can read the log while another thread is calling <see cref="GetOrCreate"/>. This mirrors
/// the lock-guarded snapshot accessors on <see cref="MockSiteStreamGrpcClient"/>.
/// </para>
/// </summary>
internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory
{
    private readonly object _lock = new();
    private readonly MockSiteStreamGrpcClient _mockClient;
    private readonly List<string> _requestedEndpoints = new();

    /// <summary>
    /// Returns a snapshot, taken under the internal lock, of every endpoint passed to
    /// <see cref="GetOrCreate"/> so far, in call order. The returned list is a copy:
    /// mutating it does not affect the factory's internal log.
    /// </summary>
    public List<string> RequestedEndpoints
    {
        get
        {
            lock (_lock)
            {
                return new List<string>(_requestedEndpoints);
            }
        }
    }

    /// <summary>Creates a factory that hands out <paramref name="mockClient"/> for every request.</summary>
    /// <param name="mockClient">The client instance returned by every <see cref="GetOrCreate"/> call.</param>
    public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient)
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
        _mockClient = mockClient;
    }

    /// <summary>
    /// Records <paramref name="grpcEndpoint"/> and returns the pre-configured mock client.
    /// <paramref name="siteIdentifier"/> is ignored.
    /// </summary>
    /// <param name="siteIdentifier">Unused by this mock.</param>
    /// <param name="grpcEndpoint">Endpoint appended to <see cref="RequestedEndpoints"/>.</param>
    /// <returns>The mock client supplied to the constructor.</returns>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        // List<T> is not safe for a concurrent writer and reader, so guard the append.
        lock (_lock)
        {
            _requestedEndpoints.Add(grpcEndpoint);
        }

        return _mockClient;
    }
}
|
|
|
|
/// <summary>
/// Endpoint-aware mock factory. Each distinct endpoint gets its own
/// <see cref="MockSiteStreamGrpcClient"/>, mirroring the real factory's corrected
/// NodeA→NodeB failover behaviour so that node-flip coverage is meaningful
/// (Communication-015).
/// </summary>
internal class EndpointTrackingGrpcClientFactory : SiteStreamGrpcClientFactory
{
    // One mock client per endpoint string, created lazily on first request.
    private readonly System.Collections.Concurrent.ConcurrentDictionary<string, MockSiteStreamGrpcClient> _clientsByEndpoint =
        new System.Collections.Concurrent.ConcurrentDictionary<string, MockSiteStreamGrpcClient>();

    /// <summary>Creates a factory with no clients; they are created on demand.</summary>
    public EndpointTrackingGrpcClientFactory()
        : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
    {
    }

    /// <summary>
    /// Returns the mock client associated with <paramref name="endpoint"/>, creating and
    /// storing a new one if the endpoint has not been seen before.
    /// </summary>
    /// <param name="endpoint">Endpoint string used as the lookup key.</param>
    /// <returns>The mock client stored for that endpoint.</returns>
    public MockSiteStreamGrpcClient ClientFor(string endpoint)
    {
        return _clientsByEndpoint.GetOrAdd(endpoint, key => new MockSiteStreamGrpcClient());
    }

    /// <summary>
    /// Resolves the client purely by <paramref name="grpcEndpoint"/>;
    /// <paramref name="siteIdentifier"/> is ignored.
    /// </summary>
    /// <param name="siteIdentifier">Unused by this mock.</param>
    /// <param name="grpcEndpoint">Endpoint whose mock client is returned.</param>
    /// <returns>The same instance <see cref="ClientFor"/> returns for that endpoint.</returns>
    public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
    {
        return ClientFor(grpcEndpoint);
    }
}
|