diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs
index 941b38f8..63a3da61 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs
@@ -147,11 +147,13 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
{
_log.Warning("Instance {0} is not deployed on site; terminating debug stream",
_instanceUniqueName);
- _stopped = true;
// M2.18: the stream-first subscription opened in PreStart is for a
// non-deployed instance — cancel it (and any buffered gap events are
// discarded with the actor). No pass-through.
+ // _stopped is set AFTER CleanupGrpc() to match the ordering in the
+ // DebugStreamTerminated and ReceiveTimeout handlers (cosmetic consistency).
CleanupGrpc();
+ _stopped = true;
_preSnapshotBuffer.Clear();
_onEvent(snapshot); // resolves the snapshot TCS with InstanceNotFound=true
// Note: after Context.Stop(Self) below the actor is dead. DebugStreamService
@@ -358,17 +360,29 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
///
private const char KeyDelimiter = '\u0000';
- /// Per-entity dedup key for an attribute change.
+ ///
+ /// Per-entity dedup key for an attribute change. Each nullable component is guarded
+ /// with ?? string.Empty so a null can never silently collide with another
+ /// key via (e.g. two entries with null AttributePath
+ /// would otherwise share a key with any entry whose AttributePath is the empty string).
+ ///
private static string AttributeKey(AttributeValueChanged a) =>
- string.Concat(a.InstanceUniqueName, KeyDelimiter, a.AttributePath, KeyDelimiter, a.AttributeName);
+ string.Concat(
+ a.InstanceUniqueName ?? string.Empty, KeyDelimiter,
+ a.AttributePath ?? string.Empty, KeyDelimiter,
+ a.AttributeName ?? string.Empty);
///
/// Per-entity dedup key for an alarm change. Includes
/// so native per-condition alarms (which share an AlarmName but differ by source
- /// reference) are not conflated; empty for computed alarms.
+ /// reference) are not conflated; empty for computed alarms. Each nullable component is
+ /// guarded with ?? string.Empty to prevent silent key collisions.
///
private static string AlarmKey(AlarmStateChanged al) =>
- string.Concat(al.InstanceUniqueName, KeyDelimiter, al.AlarmName, KeyDelimiter, al.SourceReference);
+ string.Concat(
+ al.InstanceUniqueName ?? string.Empty, KeyDelimiter,
+ al.AlarmName ?? string.Empty, KeyDelimiter,
+ al.SourceReference ?? string.Empty);
///
protected override void PreStart()
diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs
index 619b05c5..6ea2faff 100644
--- a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs
+++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs
@@ -138,11 +138,18 @@ public class DebugStreamBridgeActorTests : TestKit
}
[Fact]
- public void On_Snapshot_Opens_GrpcStream()
+ public void On_Snapshot_Does_Not_Open_Additional_GrpcStream()
{
+ // M2.18 stream-first: the gRPC subscription is opened in PreStart, BEFORE the
+ // snapshot arrives. After the snapshot is delivered the actor switches to
+ // pass-through — it must NOT open a second subscription. Exactly ONE subscribe
+ // call should have been made (the PreStart one).
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg();
+ // Verify the stream is already open before the snapshot.
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
var snapshot = new DebugViewSnapshot(
InstanceName,
new List(),
@@ -151,11 +158,12 @@ public class DebugStreamBridgeActorTests : TestKit
ctx.BridgeActor.Tell(snapshot);
- AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
-
- var call = ctx.MockGrpcClient.SubscribeCalls[0];
- Assert.Equal("corr-1", call.CorrelationId);
- Assert.Equal(InstanceName, call.InstanceUniqueName);
+ // After snapshot delivery, still exactly ONE subscribe — no additional stream opened.
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
+ TimeSpan.FromSeconds(3));
+ var singleCall = Assert.Single(ctx.MockGrpcClient.SubscribeCalls);
+ Assert.Equal("corr-1", singleCall.CorrelationId);
+ Assert.Equal(InstanceName, singleCall.InstanceUniqueName);
}
[Fact]
@@ -801,11 +809,25 @@ public class DebugStreamBridgeActorTests : TestKit
///
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
+///
+/// Thread safety: and
+/// are written from the actor/background thread
+/// (via and ) and read from the test
+/// thread (via AwaitCondition / assertions). All access goes through a shared lock
+/// to match the lock (events) pattern used for ctx.ReceivedEvents.
+///
///
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
{
- public List SubscribeCalls { get; } = new();
- public List UnsubscribedCorrelationIds { get; } = new();
+ private readonly object _lock = new();
+ private readonly List _subscribeCalls = new();
+ private readonly List _unsubscribedCorrelationIds = new();
+
+ /// Returns a snapshot of subscribe calls, taken under the internal lock.
+ public List SubscribeCalls { get { lock (_lock) { return _subscribeCalls.ToList(); } } }
+
+ /// Returns a snapshot of unsubscribed correlation IDs, taken under the internal lock.
+ public List UnsubscribedCorrelationIds { get { lock (_lock) { return _unsubscribedCorrelationIds.ToList(); } } }
private MockSiteStreamGrpcClient(bool _) : base() { }
@@ -821,7 +843,7 @@ internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
CancellationToken ct)
{
var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
- SubscribeCalls.Add(subscription);
+ lock (_lock) { _subscribeCalls.Add(subscription); }
// Return a task that completes when cancelled (simulates long-running stream)
var tcs = new TaskCompletionSource();
@@ -831,7 +853,7 @@ internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
public override void Unsubscribe(string correlationId)
{
- UnsubscribedCorrelationIds.Add(correlationId);
+ lock (_lock) { _unsubscribedCorrelationIds.Add(correlationId); }
}
}