fix(debug-stream): M2.18 review nits — thread-safe test mock + AlarmKey null-guard + rename stale test (#26)
- MockSiteStreamGrpcClient.SubscribeCalls and UnsubscribedCorrelationIds switched from bare List<T> to lock-guarded backing fields with snapshot accessors, eliminating the actor-thread/test-thread data race (matches the existing lock(events) pattern for ReceivedEvents) - AttributeKey and AlarmKey null-guard each component with ?? string.Empty so a null SourceReference/AlarmName/etc. cannot silently collide with an empty-string component in the dedup dictionary - On_Snapshot_Opens_GrpcStream renamed to On_Snapshot_Does_Not_Open_Additional_GrpcStream; assertion updated to confirm exactly one subscribe (the PreStart stream-first open) with no second subscribe after snapshot delivery - _stopped ordering in InstanceNotFound path moved after CleanupGrpc() for consistency with DebugStreamTerminated and ReceiveTimeout handlers
This commit is contained in:
@@ -147,11 +147,13 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
|
|||||||
{
|
{
|
||||||
_log.Warning("Instance {0} is not deployed on site; terminating debug stream",
|
_log.Warning("Instance {0} is not deployed on site; terminating debug stream",
|
||||||
_instanceUniqueName);
|
_instanceUniqueName);
|
||||||
_stopped = true;
|
|
||||||
// M2.18: the stream-first subscription opened in PreStart is for a
|
// M2.18: the stream-first subscription opened in PreStart is for a
|
||||||
// non-deployed instance — cancel it (and any buffered gap events are
|
// non-deployed instance — cancel it (and any buffered gap events are
|
||||||
// discarded with the actor). No pass-through.
|
// discarded with the actor). No pass-through.
|
||||||
|
// _stopped is set AFTER CleanupGrpc() to match the ordering in the
|
||||||
|
// DebugStreamTerminated and ReceiveTimeout handlers (cosmetic consistency).
|
||||||
CleanupGrpc();
|
CleanupGrpc();
|
||||||
|
_stopped = true;
|
||||||
_preSnapshotBuffer.Clear();
|
_preSnapshotBuffer.Clear();
|
||||||
_onEvent(snapshot); // resolves the snapshot TCS with InstanceNotFound=true
|
_onEvent(snapshot); // resolves the snapshot TCS with InstanceNotFound=true
|
||||||
// Note: after Context.Stop(Self) below the actor is dead. DebugStreamService
|
// Note: after Context.Stop(Self) below the actor is dead. DebugStreamService
|
||||||
@@ -358,17 +360,29 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
private const char KeyDelimiter = '\u0000';
|
private const char KeyDelimiter = '\u0000';
|
||||||
|
|
||||||
/// <summary>Per-entity dedup key for an attribute change.</summary>
|
/// <summary>
|
||||||
|
/// Per-entity dedup key for an attribute change. Each nullable component is guarded
|
||||||
|
/// with <c>?? string.Empty</c> so a null can never silently collide with another
|
||||||
|
/// key via <see cref="string.Concat"/> (e.g. two entries with null AttributePath
|
||||||
|
/// would otherwise share a key with any entry whose AttributePath is the empty string).
|
||||||
|
/// </summary>
|
||||||
private static string AttributeKey(AttributeValueChanged a) =>
|
private static string AttributeKey(AttributeValueChanged a) =>
|
||||||
string.Concat(a.InstanceUniqueName, KeyDelimiter, a.AttributePath, KeyDelimiter, a.AttributeName);
|
string.Concat(
|
||||||
|
a.InstanceUniqueName ?? string.Empty, KeyDelimiter,
|
||||||
|
a.AttributePath ?? string.Empty, KeyDelimiter,
|
||||||
|
a.AttributeName ?? string.Empty);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Per-entity dedup key for an alarm change. Includes <see cref="AlarmStateChanged.SourceReference"/>
|
/// Per-entity dedup key for an alarm change. Includes <see cref="AlarmStateChanged.SourceReference"/>
|
||||||
/// so native per-condition alarms (which share an AlarmName but differ by source
|
/// so native per-condition alarms (which share an AlarmName but differ by source
|
||||||
/// reference) are not conflated; empty for computed alarms.
|
/// reference) are not conflated; empty for computed alarms. Each nullable component is
|
||||||
|
/// guarded with <c>?? string.Empty</c> to prevent silent key collisions.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private static string AlarmKey(AlarmStateChanged al) =>
|
private static string AlarmKey(AlarmStateChanged al) =>
|
||||||
string.Concat(al.InstanceUniqueName, KeyDelimiter, al.AlarmName, KeyDelimiter, al.SourceReference);
|
string.Concat(
|
||||||
|
al.InstanceUniqueName ?? string.Empty, KeyDelimiter,
|
||||||
|
al.AlarmName ?? string.Empty, KeyDelimiter,
|
||||||
|
al.SourceReference ?? string.Empty);
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
protected override void PreStart()
|
protected override void PreStart()
|
||||||
|
|||||||
+32
-10
@@ -138,11 +138,18 @@ public class DebugStreamBridgeActorTests : TestKit
|
|||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public void On_Snapshot_Opens_GrpcStream()
|
public void On_Snapshot_Does_Not_Open_Additional_GrpcStream()
|
||||||
{
|
{
|
||||||
|
// M2.18 stream-first: the gRPC subscription is opened in PreStart, BEFORE the
|
||||||
|
// snapshot arrives. After the snapshot is delivered the actor switches to
|
||||||
|
// pass-through — it must NOT open a second subscription. Exactly ONE subscribe
|
||||||
|
// call should have been made (the PreStart one).
|
||||||
var ctx = CreateBridgeActor();
|
var ctx = CreateBridgeActor();
|
||||||
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
|
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
|
||||||
|
|
||||||
|
// Verify the stream is already open before the snapshot.
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
var snapshot = new DebugViewSnapshot(
|
var snapshot = new DebugViewSnapshot(
|
||||||
InstanceName,
|
InstanceName,
|
||||||
new List<AttributeValueChanged>(),
|
new List<AttributeValueChanged>(),
|
||||||
@@ -151,11 +158,12 @@ public class DebugStreamBridgeActorTests : TestKit
|
|||||||
|
|
||||||
ctx.BridgeActor.Tell(snapshot);
|
ctx.BridgeActor.Tell(snapshot);
|
||||||
|
|
||||||
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
|
// After snapshot delivery, still exactly ONE subscribe — no additional stream opened.
|
||||||
|
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
|
||||||
var call = ctx.MockGrpcClient.SubscribeCalls[0];
|
TimeSpan.FromSeconds(3));
|
||||||
Assert.Equal("corr-1", call.CorrelationId);
|
var singleCall = Assert.Single(ctx.MockGrpcClient.SubscribeCalls);
|
||||||
Assert.Equal(InstanceName, call.InstanceUniqueName);
|
Assert.Equal("corr-1", singleCall.CorrelationId);
|
||||||
|
Assert.Equal(InstanceName, singleCall.InstanceUniqueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -801,11 +809,25 @@ public class DebugStreamBridgeActorTests : TestKit
|
|||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
|
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
|
||||||
|
/// <para>
|
||||||
|
/// <b>Thread safety:</b> <see cref="SubscribeCalls"/> and
|
||||||
|
/// <see cref="UnsubscribedCorrelationIds"/> are written from the actor/background thread
|
||||||
|
/// (via <see cref="SubscribeAsync"/> and <see cref="Unsubscribe"/>) and read from the test
|
||||||
|
/// thread (via <c>AwaitCondition</c> / assertions). All access goes through a shared lock
|
||||||
|
/// to match the <c>lock (events)</c> pattern used for <c>ctx.ReceivedEvents</c>.
|
||||||
|
/// </para>
|
||||||
/// </summary>
|
/// </summary>
|
||||||
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
|
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
|
||||||
{
|
{
|
||||||
public List<MockSubscription> SubscribeCalls { get; } = new();
|
private readonly object _lock = new();
|
||||||
public List<string> UnsubscribedCorrelationIds { get; } = new();
|
private readonly List<MockSubscription> _subscribeCalls = new();
|
||||||
|
private readonly List<string> _unsubscribedCorrelationIds = new();
|
||||||
|
|
||||||
|
/// <summary>Returns a snapshot of subscribe calls, taken under the internal lock.</summary>
|
||||||
|
public List<MockSubscription> SubscribeCalls { get { lock (_lock) { return _subscribeCalls.ToList(); } } }
|
||||||
|
|
||||||
|
/// <summary>Returns a snapshot of unsubscribed correlation IDs, taken under the internal lock.</summary>
|
||||||
|
public List<string> UnsubscribedCorrelationIds { get { lock (_lock) { return _unsubscribedCorrelationIds.ToList(); } } }
|
||||||
|
|
||||||
private MockSiteStreamGrpcClient(bool _) : base() { }
|
private MockSiteStreamGrpcClient(bool _) : base() { }
|
||||||
|
|
||||||
@@ -821,7 +843,7 @@ internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
|
|||||||
CancellationToken ct)
|
CancellationToken ct)
|
||||||
{
|
{
|
||||||
var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
|
var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
|
||||||
SubscribeCalls.Add(subscription);
|
lock (_lock) { _subscribeCalls.Add(subscription); }
|
||||||
|
|
||||||
// Return a task that completes when cancelled (simulates long-running stream)
|
// Return a task that completes when cancelled (simulates long-running stream)
|
||||||
var tcs = new TaskCompletionSource();
|
var tcs = new TaskCompletionSource();
|
||||||
@@ -831,7 +853,7 @@ internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
|
|||||||
|
|
||||||
public override void Unsubscribe(string correlationId)
|
public override void Unsubscribe(string correlationId)
|
||||||
{
|
{
|
||||||
UnsubscribedCorrelationIds.Add(correlationId);
|
lock (_lock) { _unsubscribedCorrelationIds.Add(correlationId); }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user