From a0d9379a4f5a4f2249997655b33ae2e55555971b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 16 Jun 2026 07:41:41 -0400 Subject: [PATCH] =?UTF-8?q?fix(debug-stream):=20M2.18=20review=20nits=20?= =?UTF-8?q?=E2=80=94=20thread-safe=20test=20mock=20+=20AlarmKey=20null-gua?= =?UTF-8?q?rd=20+=20rename=20stale=20test=20(#26)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - MockSiteStreamGrpcClient.SubscribeCalls and UnsubscribedCorrelationIds switched from bare List to lock-guarded backing fields with snapshot accessors, eliminating the actor-thread/test-thread data race (matches the existing lock(events) pattern for ReceivedEvents) - AttributeKey and AlarmKey null-guard each component with ?? string.Empty so a null SourceReference/AlarmName/etc. cannot silently collide with an empty-string component in the dedup dictionary - On_Snapshot_Opens_GrpcStream renamed to On_Snapshot_Does_Not_Open_Additional_GrpcStream; assertion updated to confirm exactly one subscribe (the PreStart stream-first open) with no second subscribe after snapshot delivery - _stopped ordering in InstanceNotFound path moved after CleanupGrpc() for consistency with DebugStreamTerminated and ReceiveTimeout handlers --- .../Actors/DebugStreamBridgeActor.cs | 24 ++++++++--- .../Grpc/DebugStreamBridgeActorTests.cs | 42 ++++++++++++++----- 2 files changed, 51 insertions(+), 15 deletions(-) diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs index 941b38f8..63a3da61 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs @@ -147,11 +147,13 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers { _log.Warning("Instance {0} is not deployed on site; terminating debug stream", _instanceUniqueName); - _stopped = true; // M2.18: the stream-first subscription opened in PreStart is for a // non-deployed instance — cancel it (and any buffered gap events are // discarded with the actor). No pass-through. + // _stopped is set AFTER CleanupGrpc() to match the ordering in the + // DebugStreamTerminated and ReceiveTimeout handlers (cosmetic consistency). CleanupGrpc(); + _stopped = true; _preSnapshotBuffer.Clear(); _onEvent(snapshot); // resolves the snapshot TCS with InstanceNotFound=true // Note: after Context.Stop(Self) below the actor is dead. DebugStreamService @@ -358,17 +360,29 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers /// private const char KeyDelimiter = '\u0000'; - /// Per-entity dedup key for an attribute change. + /// + /// Per-entity dedup key for an attribute change. Each nullable component is guarded + /// with ?? string.Empty so a null can never silently collide with another + /// key via (e.g. two entries with null AttributePath + /// would otherwise share a key with any entry whose AttributePath is the empty string). + /// private static string AttributeKey(AttributeValueChanged a) => - string.Concat(a.InstanceUniqueName, KeyDelimiter, a.AttributePath, KeyDelimiter, a.AttributeName); + string.Concat( + a.InstanceUniqueName ?? string.Empty, KeyDelimiter, + a.AttributePath ?? string.Empty, KeyDelimiter, + a.AttributeName ?? string.Empty); /// /// Per-entity dedup key for an alarm change. Includes /// so native per-condition alarms (which share an AlarmName but differ by source - /// reference) are not conflated; empty for computed alarms. + /// reference) are not conflated; empty for computed alarms. Each nullable component is + /// guarded with ?? string.Empty to prevent silent key collisions. /// private static string AlarmKey(AlarmStateChanged al) => - string.Concat(al.InstanceUniqueName, KeyDelimiter, al.AlarmName, KeyDelimiter, al.SourceReference); + string.Concat( + al.InstanceUniqueName ?? string.Empty, KeyDelimiter, + al.AlarmName ?? string.Empty, KeyDelimiter, + al.SourceReference ?? string.Empty); /// protected override void PreStart() diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs index 619b05c5..6ea2faff 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs @@ -138,11 +138,18 @@ public class DebugStreamBridgeActorTests : TestKit } [Fact] - public void On_Snapshot_Opens_GrpcStream() + public void On_Snapshot_Does_Not_Open_Additional_GrpcStream() { + // M2.18 stream-first: the gRPC subscription is opened in PreStart, BEFORE the + // snapshot arrives. After the snapshot is delivered the actor switches to + // pass-through — it must NOT open a second subscription. Exactly ONE subscribe + // call should have been made (the PreStart one). var ctx = CreateBridgeActor(); ctx.CommProbe.ExpectMsg(); + // Verify the stream is already open before the snapshot. + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + var snapshot = new DebugViewSnapshot( InstanceName, new List(), @@ -151,11 +158,12 @@ public class DebugStreamBridgeActorTests : TestKit ctx.BridgeActor.Tell(snapshot); - AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); - - var call = ctx.MockGrpcClient.SubscribeCalls[0]; - Assert.Equal("corr-1", call.CorrelationId); - Assert.Equal(InstanceName, call.InstanceUniqueName); + // After snapshot delivery, still exactly ONE subscribe — no additional stream opened. + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } }, + TimeSpan.FromSeconds(3)); + var singleCall = Assert.Single(ctx.MockGrpcClient.SubscribeCalls); + Assert.Equal("corr-1", singleCall.CorrelationId); + Assert.Equal(InstanceName, singleCall.InstanceUniqueName); } [Fact] @@ -801,11 +809,25 @@ public class DebugStreamBridgeActorTests : TestKit /// /// Mock gRPC client that records SubscribeAsync and Unsubscribe calls. +/// +/// Thread safety: and +/// are written from the actor/background thread +/// (via and ) and read from the test +/// thread (via AwaitCondition / assertions). All access goes through a shared lock +/// to match the lock (events) pattern used for ctx.ReceivedEvents. +/// /// internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient { - public List SubscribeCalls { get; } = new(); - public List UnsubscribedCorrelationIds { get; } = new(); + private readonly object _lock = new(); + private readonly List _subscribeCalls = new(); + private readonly List _unsubscribedCorrelationIds = new(); + + /// Returns a snapshot of subscribe calls, taken under the internal lock. + public List SubscribeCalls { get { lock (_lock) { return _subscribeCalls.ToList(); } } } + + /// Returns a snapshot of unsubscribed correlation IDs, taken under the internal lock. + public List UnsubscribedCorrelationIds { get { lock (_lock) { return _unsubscribedCorrelationIds.ToList(); } } } private MockSiteStreamGrpcClient(bool _) : base() { } @@ -821,7 +843,7 @@ internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient CancellationToken ct) { var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct); - SubscribeCalls.Add(subscription); + lock (_lock) { _subscribeCalls.Add(subscription); } // Return a task that completes when cancelled (simulates long-running stream) var tcs = new TaskCompletionSource(); @@ -831,7 +853,7 @@ internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient public override void Unsubscribe(string correlationId) { - UnsubscribedCorrelationIds.Add(correlationId); + lock (_lock) { _unsubscribedCorrelationIds.Add(correlationId); } } }