diff --git a/code-reviews/Communication/findings.md b/code-reviews/Communication/findings.md index 62370c9..84896fb 100644 --- a/code-reviews/Communication/findings.md +++ b/code-reviews/Communication/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 10 | +| Open findings | 8 | ## Summary @@ -98,7 +98,7 @@ references `Communication-001`. |--|--| | Severity | High | | Category | Error handling & resilience | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:170`, `src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:143` | **Description** @@ -126,7 +126,14 @@ the gRPC cancellation reaches the site and stops the relay actor. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). Root cause confirmed against source: +`HandleGrpcError` flipped `_useNodeA` and scheduled `OpenGrpcStream` without ever +unsubscribing the failed stream, leaving the old node's `StreamRelayActor` zombie until +TCP/keepalive timeout. Fix: `HandleGrpcError` now resolves the client for the +*previous* endpoint (before flipping `_useNodeA`) and calls `Unsubscribe(_correlationId)` +on it, so the local CTS is cancelled and gRPC cancellation reaches the still-alive site. +Regression test `DebugStreamBridgeActorTests.On_GrpcError_Unsubscribes_Old_Stream_Before_Reconnect` +fails against the pre-fix code and passes after. ### Communication-003 — SiteStreamGrpcClient subscription map overwritten without disposal; reconnect can cancel the wrong stream @@ -134,7 +141,7 @@ _Unresolved._ |--|--| | Severity | High | | Category | Concurrency & thread safety | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs:77`, `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs:106` | **Description** @@ -161,7 +168,18 @@ caller-supplied correlation ID. 
**Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). Root cause confirmed against source: the +inline `_subscriptions[correlationId] = cts` overwrote a prior CTS without +cancel/dispose (leak), and the `finally`'s `TryRemove(correlationId, out _)` removed by +key only — a racing reconnect's live CTS could be removed by the prior call's `finally`, +orphaning the live stream. Fix: extracted two internal helpers used by `SubscribeAsync` +— `RegisterSubscription` cancels+disposes any existing CTS for the correlation ID before +inserting, and `RemoveSubscription` uses the `ConcurrentDictionary.TryRemove(KeyValuePair)` +overload so it removes only the CTS that call created (mirroring `SiteStreamGrpcServer`'s +`StreamEntry` pattern). Regression tests +`SiteStreamGrpcClientTests.RegisterSubscription_ReusedCorrelationId_CancelsAndDisposesPriorCts` +and `SiteStreamGrpcClientTests.RemoveSubscription_OnlyRemovesOwnCts_NotAReplacement` +fail against the pre-fix logic and pass after. ### Communication-004 — Coordinator actors declare no SupervisorStrategy (design requires Resume) diff --git a/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs b/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs index 5e14aab..7dc6cfb 100644 --- a/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs +++ b/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs @@ -183,6 +183,15 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers return; } + // Unsubscribe the failed stream on the *previous* endpoint before reconnecting. + // This cancels the local subscription CTS and -- where the channel is still + // alive -- propagates gRPC cancellation to the site so its SiteStreamGrpcServer + // stops the StreamRelayActor for this correlation ID, rather than leaving a + // zombie relay actor until TCP RST / keepalive eventually detects the loss. + var previousEndpoint = _useNodeA ? 
_grpcNodeAAddress : _grpcNodeBAddress;
+        var previousClient = _grpcFactory.GetOrCreate(_siteIdentifier, previousEndpoint);
+        previousClient.Unsubscribe(_correlationId);
+
         // Flip to the other node
         _useNodeA = !_useNodeA;
 
diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs
index b9a9dcb..da211f3 100644
--- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs
+++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs
@@ -57,6 +57,61 @@ public class SiteStreamGrpcClient : IAsyncDisposable
         _subscriptions[correlationId] = cts;
     }
 
+    /// <summary>
+    /// Registers <paramref name="cts"/> as the live subscription for
+    /// <paramref name="correlationId"/>. If a different CTS already holds that slot
+    /// (a reconnect race where two <see cref="SubscribeAsync"/> calls briefly share
+    /// an ID), it is atomically swapped out and then cancelled and disposed so it
+    /// cannot leak. Registering the CTS that is already stored is a no-op.
+    /// Internal for testability.
+    /// </summary>
+    /// <param name="correlationId">Correlation ID that keys the subscription.</param>
+    /// <param name="cts">Cancellation source created by the registering call.</param>
+    internal void RegisterSubscription(string correlationId, CancellationTokenSource cts)
+    {
+        // A read followed by a separate indexer write is not atomic: two racing
+        // registrations could both cancel/dispose the same prior CTS, or one could
+        // overwrite the other's CTS without cancelling it. Retry an atomic
+        // add-or-swap instead, so the caller that displaces a CTS cleans it up.
+        while (true)
+        {
+            if (!_subscriptions.TryGetValue(correlationId, out var prior))
+            {
+                if (_subscriptions.TryAdd(correlationId, cts))
+                {
+                    return;
+                }
+
+                continue; // Another caller added an entry first; re-read it.
+            }
+
+            if (ReferenceEquals(prior, cts))
+            {
+                return;
+            }
+
+            if (_subscriptions.TryUpdate(correlationId, cts, prior))
+            {
+                prior.Cancel();
+                prior.Dispose();
+                return;
+            }
+        }
+    }
+
+    /// <summary>
+    /// Removes the subscription entry for a correlation ID only if the stored CTS is
+    /// exactly the one supplied. A racing replacement stream may already own the slot,
+    /// in which case this is a no-op. Internal for testability.
+    /// </summary>
+    /// <param name="correlationId">Correlation ID that keys the subscription.</param>
+    /// <param name="cts">Cancellation source the caller registered earlier.</param>
+    internal void RemoveSubscription(string correlationId, CancellationTokenSource cts)
+    {
+        // KeyValuePair.Create infers the type arguments from its operands.
+        _subscriptions.TryRemove(KeyValuePair.Create(correlationId, cts));
+    }
+
     ///
     /// Opens a server-streaming subscription for a specific instance.
     /// This is a long-running async method; the caller launches it as a background task.
@@ -74,7 +100,7 @@ public class SiteStreamGrpcClient : IAsyncDisposable
             throw new InvalidOperationException("Cannot subscribe on a test-only client.");
 
         var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
-        _subscriptions[correlationId] = cts;
+        RegisterSubscription(correlationId, cts);
 
         var request = new InstanceStreamRequest
         {
@@ -103,7 +129,8 @@ public class SiteStreamGrpcClient : IAsyncDisposable
         }
         finally
         {
-            _subscriptions.TryRemove(correlationId, out _);
+            // Remove only our own entry -- a racing reconnect may already own the slot.
+            RemoveSubscription(correlationId, cts);
         }
     }
 
diff --git a/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs
index c713c82..d4d3c51 100644
--- a/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs
+++ b/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs
@@ -159,6 +159,34 @@ public class DebugStreamBridgeActorTests : TestKit
         Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[1].CorrelationId);
     }
 
+    [Fact]
+    public void On_GrpcError_Unsubscribes_Old_Stream_Before_Reconnect()
+    {
+        // Communication-002 regression: a reconnect must unsubscribe the previous
+        // stream so the old node does not keep a zombie relay actor / subscription.
+        var ctx = CreateBridgeActor();
+        ctx.CommProbe.ExpectMsg(); // NOTE(review): type argument looks stripped from this copy -- confirm against the repo
+
+        var snapshot = new DebugViewSnapshot(
+            InstanceName,
+            new List(), // NOTE(review): element type missing here and on the next line -- confirm against the repo
+            new List(),
+            DateTimeOffset.UtcNow);
+
+        ctx.BridgeActor.Tell(snapshot);
+        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
+        // Simulate gRPC error → reconnect
+        ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));
+
+        // Should resubscribe...
+        AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
+
+        // ...and must have unsubscribed the prior correlation ID so the old node's
+        // relay actor is released rather than left zombie.
+        Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);
+    }
+
     [Fact]
     public void After_MaxRetries_Terminates()
     {
diff --git a/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientTests.cs
index b737927..92739e6 100644
--- a/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientTests.cs
+++ b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientTests.cs
@@ -176,4 +176,49 @@ public class SiteStreamGrpcClientTests
         Assert.True(cts1.IsCancellationRequested);
         Assert.True(cts2.IsCancellationRequested);
     }
+
+    // --- Communication-003 regression tests ---
+
+    [Fact]
+    public void RegisterSubscription_ReusedCorrelationId_CancelsAndDisposesPriorCts()
+    {
+        // Two SubscribeAsync calls briefly sharing a correlation ID (reconnect race).
+        // Inserting the second must cancel + dispose the first so it does not leak.
+        var client = SiteStreamGrpcClient.CreateForTesting();
+
+        var first = new CancellationTokenSource();
+        var second = new CancellationTokenSource();
+
+        client.RegisterSubscription("corr-shared", first);
+        client.RegisterSubscription("corr-shared", second);
+
+        Assert.True(first.IsCancellationRequested);
+        // Disposed CTS throws ObjectDisposedException when its token is touched.
+        Assert.Throws<ObjectDisposedException>(() => _ = first.Token);
+
+        // The second (live) CTS must remain intact.
+        Assert.False(second.IsCancellationRequested);
+    }
+
+    [Fact]
+    public void RemoveSubscription_OnlyRemovesOwnCts_NotAReplacement()
+    {
+        // First call's finally must NOT remove the second call's live entry.
+        var client = SiteStreamGrpcClient.CreateForTesting();
+
+        var first = new CancellationTokenSource();
+        var second = new CancellationTokenSource();
+
+        client.RegisterSubscription("corr-shared", first);
+        // A racing second SubscribeAsync replaces the entry.
+        client.RegisterSubscription("corr-shared", second);
+
+        // The first call's finally runs and tries to remove its (already-replaced) entry.
+        client.RemoveSubscription("corr-shared", first);
+
+        // The live (second) subscription must still be cancellable via Unsubscribe.
+        Assert.False(second.IsCancellationRequested);
+        client.Unsubscribe("corr-shared");
+        Assert.True(second.IsCancellationRequested);
+    }
 }