From a9ceba00d02ced8ef039b49e9999d8694705f17d Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Sat, 16 May 2026 18:32:52 -0400
Subject: [PATCH] =?UTF-8?q?fix(communication):=20resolve=20Communication-0?=
 =?UTF-8?q?01=20=E2=80=94=20early=20stream=20termination=20handling?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

DebugStreamService.StartStreamAsync awaited the initial debug snapshot inside
a try whose only handler was catch (OperationCanceledException). When the
stream terminated before the snapshot arrived, onTerminatedWrapper completed
the await with an InvalidOperationException that escaped the catch: the caller
got a raw, untranslated exception and the service did no teardown of its own
on that path.

Replaced with catch (Exception): it removes the session entry, sends
StopDebugStream to the bridge actor via the local reference (deterministic
teardown, idempotent), and throws a descriptive exception: TimeoutException
for the 30s timeout, otherwise an InvalidOperationException naming the
instance/site and wrapping the cause.

Re-triaged Critical -> Medium: the originally-claimed multi-minute site-side
resource leak does not occur (the bridge actor self-terminates on every
onTerminated path).

Adds the first DebugStreamService test, which fails against the pre-fix code.
--- code-reviews/Communication/findings.md | 68 +++++++++------- code-reviews/README.md | 11 ++- .../DebugStreamService.cs | 34 +++++--- .../DebugStreamServiceTests.cs | 77 +++++++++++++++++++ 4 files changed, 146 insertions(+), 44 deletions(-) create mode 100644 tests/ScadaLink.Communication.Tests/DebugStreamServiceTests.cs diff --git a/code-reviews/Communication/findings.md b/code-reviews/Communication/findings.md index 08e473f..62370c9 100644 --- a/code-reviews/Communication/findings.md +++ b/code-reviews/Communication/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 11 | +| Open findings | 10 | ## Summary @@ -16,9 +16,7 @@ The Communication module is generally well-structured and matches the design doc two-transport model (ClusterClient for command/control, gRPC server-streaming for real-time data). The actors keep mutable state on the actor thread, use `PipeTo` for async work, and the gRPC server/client lifecycle is mostly disciplined. However the -review found one Critical issue (a `TimeoutException` from `DebugStreamService` leaves -an orphaned bridge actor and an active site-side subscription, leaking resources on -every snapshot timeout) and several High/Medium issues clustered around two themes: +review found several High and Medium issues clustered around two themes: **(a) gRPC subscription bookkeeping races** — `SiteStreamGrpcClient` overwrites and removes subscription entries by correlation ID without disposal or ownership checks, so reconnect cycles leak `CancellationTokenSource`es and can cancel the wrong stream; @@ -44,43 +42,55 @@ mutation races, and the snapshot-timeout cleanup path. 
## Findings -### Communication-001 — Snapshot timeout leaves orphaned bridge actor and site subscription +### Communication-001 — Early stream termination escapes StartStreamAsync's narrow exception handling | | | |--|--| -| Severity | Critical | -| Category | Performance & resource management | -| Status | Open | -| Location | `src/ScadaLink.Communication/DebugStreamService.cs:139`, `src/ScadaLink.Communication/DebugStreamService.cs:149` | +| Severity | Medium | +| Category | Error handling & resilience | +| Status | Resolved | +| Location | `src/ScadaLink.Communication/DebugStreamService.cs:130-143` | + +**Re-triaged 2026-05-16:** originally filed Critical, claiming an orphaned bridge actor +and a multi-minute site-side resource leak on every snapshot timeout. On verification +that impact does **not** occur: `DebugStreamBridgeActor` calls `CleanupGrpc()` and +`Context.Stop(Self)` on every path that invokes `onTerminated` (site disconnect, gRPC +max-retries, `ReceiveTimeout`), so it always self-terminates and releases its gRPC +subscription; and the pure-timeout path does reach `StopStream`, which also stops it. +The genuine defect described below is an error-handling gap, not a leak — severity +corrected to Medium. **Description** -When `StartStreamAsync` times out waiting for the initial snapshot it calls -`StopStream(sessionId)` and throws. `StopStream` only sends `StopDebugStream` to the -bridge actor **if the session is still in `_sessions`**. But the bridge actor was added -to `_sessions` at line 124 and is only removed by `onTerminatedWrapper`. The serious -case is the race where `onTerminatedWrapper` fires first (e.g. site disconnect arrives -during the wait): `snapshotTcs.TrySetException` completes the await with an -`InvalidOperationException` rather than `OperationCanceledException`, which is **not** -caught by the `catch (OperationCanceledException)` block. 
The exception propagates -uncaught, `StopStream` is never reached, and if the bridge actor is instead orphaned -(snapshot never arrives, site silent, no terminate) the only cleanup is the 5-minute -`ReceiveTimeout` in the actor — meaning a site-side `StreamRelayActor` and gRPC stream -can stay alive for up to 5 minutes after the central caller has given up. Combined with -the 30s timeout, every transient snapshot delay leaks site resources for minutes. +`StartStreamAsync` awaits the initial snapshot inside a `try` whose only handler is +`catch (OperationCanceledException)`. When the stream terminates before the snapshot +arrives, `onTerminatedWrapper` completes the await via +`snapshotTcs.TrySetException(new InvalidOperationException(...))`. That +`InvalidOperationException` is not an `OperationCanceledException`, so it escapes the +catch entirely: the caller (Blazor debug view / SignalR hub) receives a raw, +untranslated exception, and `StartStreamAsync` performs no teardown of its own on that +path — it relies implicitly on the bridge actor self-terminating. Cleanup from the +service side is therefore not deterministic, and the failure surfaced to the caller is +not a meaningful, documented result. **Recommendation** -In `StartStreamAsync`, wrap the `await` so that *any* failure or cancellation -deterministically calls `StopStream(sessionId)` (e.g. `try/catch (Exception)` or a -`finally` that stops the session when the result was not returned). Ensure -`StopStream` is idempotent and always sends `StopDebugStream` even if the session was -already removed, so the bridge actor (and its site-side subscription) is torn down -promptly rather than waiting for the orphan `ReceiveTimeout`. 
+In `StartStreamAsync`, catch any exception from the snapshot await, deterministically +tear down the bridge actor (`Tell(StopDebugStream)` via the local actor reference, since +a racing `onTerminatedWrapper` may already have removed the session entry), and translate +the failure into a meaningful exception for the caller. **Resolution** -_Unresolved._ +Resolved 2026-05-16. The `catch (OperationCanceledException)`-only block in +`StartStreamAsync` was replaced with `catch (Exception)`: it removes the session entry, +sends `StopDebugStream` to the bridge actor via the local reference (idempotent — the +actor may already be stopping itself), and throws a descriptive exception — +`TimeoutException` for the 30s timeout, otherwise an `InvalidOperationException` that +names the instance/site and wraps the underlying cause. Regression test +`DebugStreamServiceTests.StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException` +fails against the pre-fix code and passes after. Fixed by the commit whose message +references `Communication-001`. ### Communication-002 — gRPC reconnect does not unsubscribe the previous stream, leaking site-side relay actors diff --git a/code-reviews/README.md b/code-reviews/README.md index 9fcf56d..fc57e07 100644 --- a/code-reviews/README.md +++ b/code-reviews/README.md @@ -30,15 +30,15 @@ code-reviews/ All 19 modules were reviewed at commit `9c60592` (241 findings: 6 Critical, 46 High, 100 Medium, 89 Low). The tables below track what remains **open** as findings are -resolved. +resolved and re-triaged. | Severity | Open findings | |----------|---------------| -| Critical | 5 | +| Critical | 4 | | High | 46 | | Medium | 100 | | Low | 89 | -| **Total** | **240** | +| **Total** | **239** | ## Module Status @@ -48,7 +48,7 @@ resolved. 
| [CLI](CLI/findings.md) | 2026-05-16 | `9c60592` | 0/1/6/6 | 13 | 13 | | [ClusterInfrastructure](ClusterInfrastructure/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/3 | 8 | 8 | | [Commons](Commons/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 | 12 | -| [Communication](Communication/findings.md) | 2026-05-16 | `9c60592` | 1/2/5/3 | 11 | 11 | +| [Communication](Communication/findings.md) | 2026-05-16 | `9c60592` | 0/2/5/3 | 10 | 11 | | [ConfigurationDatabase](ConfigurationDatabase/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/6 | 11 | 11 | | [DataConnectionLayer](DataConnectionLayer/findings.md) | 2026-05-16 | `9c60592` | 0/4/6/2 | 12 | 13 | | [DeploymentManager](DeploymentManager/findings.md) | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 | 14 | @@ -71,12 +71,11 @@ Resolved findings drop off this list but remain recorded in their module's `findings.md` (see [REVIEW-PROCESS.md](REVIEW-PROCESS.md) §4–§5). Full detail — description, location, recommendation — lives in the module's `findings.md`. 
-### Critical (5)
+### Critical (4)
 
 | ID | Module | Title |
 |----|--------|-------|
 | CentralUI-001 | [CentralUI](CentralUI/findings.md) | Test Run sandbox executes arbitrary C# with no trust-model enforcement |
-| Communication-001 | [Communication](Communication/findings.md) | Snapshot timeout leaves orphaned bridge actor and site subscription |
 | ExternalSystemGateway-001 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | No S&F delivery handler registered; cached calls and writes can never be delivered |
 | NotificationService-001 | [NotificationService](NotificationService/findings.md) | Buffered notifications are never retried (no S&F delivery handler) |
 | StoreAndForward-001 | [StoreAndForward](StoreAndForward/findings.md) | Replication to standby is never triggered by the active node |
diff --git a/src/ScadaLink.Communication/DebugStreamService.cs b/src/ScadaLink.Communication/DebugStreamService.cs
index b17197e..507e93e 100644
--- a/src/ScadaLink.Communication/DebugStreamService.cs
+++ b/src/ScadaLink.Communication/DebugStreamService.cs
@@ -127,20 +127,36 @@ public class DebugStreamService
         using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
         timeoutCts.CancelAfter(TimeSpan.FromSeconds(30));
 
+        DebugViewSnapshot snapshot;
         try
         {
-            var snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token);
-
-            _logger.LogInformation("Debug stream {SessionId} started for {Instance} on site {Site}",
-                sessionId, instanceUniqueName, siteIdentifier);
-
-            return new DebugStreamSession(sessionId, snapshot);
+            snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token);
         }
-        catch (OperationCanceledException)
+        catch (Exception ex)
         {
-            StopStream(sessionId);
-            throw new TimeoutException($"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.");
+            // Any failure before the snapshot arrives, whether the 30s timeout or the
+            // stream terminating early (site disconnect / gRPC failure, surfaced by
+            // onTerminatedWrapper as an InvalidOperationException), must deterministically
+            // tear down the bridge actor and its site-side subscription. Use the local
+            // actor reference: a racing onTerminatedWrapper may already have removed the
+            // session, which would make StopStream a no-op. StopDebugStream is idempotent
+            // (the actor may already be stopping itself).
+            _sessions.TryRemove(sessionId, out _);
+            bridgeActor.Tell(new StopDebugStream());
+
+            if (ex is OperationCanceledException)
+                throw new TimeoutException(
+                    $"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.");
+
+            throw new InvalidOperationException(
+                $"Debug stream for {instanceUniqueName} on site {siteIdentifier} terminated before a snapshot was received.",
+                ex);
         }
+
+        _logger.LogInformation("Debug stream {SessionId} started for {Instance} on site {Site}",
+            sessionId, instanceUniqueName, siteIdentifier);
+
+        return new DebugStreamSession(sessionId, snapshot);
     }
 
     ///
diff --git a/tests/ScadaLink.Communication.Tests/DebugStreamServiceTests.cs b/tests/ScadaLink.Communication.Tests/DebugStreamServiceTests.cs
new file mode 100644
index 0000000..1ddf2f9
--- /dev/null
+++ b/tests/ScadaLink.Communication.Tests/DebugStreamServiceTests.cs
@@ -0,0 +1,77 @@
+using Akka.Actor;
+using Akka.TestKit.Xunit2;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using NSubstitute;
+using ScadaLink.Commons.Entities.Instances;
+using ScadaLink.Commons.Entities.Sites;
+using ScadaLink.Commons.Interfaces.Repositories;
+using ScadaLink.Communication;
+using ScadaLink.Communication.Actors;
+using ScadaLink.Communication.Grpc;
+
+namespace ScadaLink.Communication.Tests;
+
+///
+/// Tests for DebugStreamService session lifecycle.
+///
+public class DebugStreamServiceTests : TestKit
+{
+    [Fact]
+    public async Task StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException()
+    {
+        // Regression test for Communication-001. When the debug stream terminates before
+        // the initial snapshot arrives, StartStreamAsync used to let the raw
+        // InvalidOperationException from onTerminatedWrapper escape its
+        // OperationCanceledException-only catch, so the caller saw an untranslated exception
+        // and the failure path did not deterministically tear the bridge actor down.
+        // The fix catches any failure, tells the bridge actor StopDebugStream, and throws
+        // a descriptive exception that names the instance and wraps the underlying cause.
+        var instance = new Instance("Site1.Pump01") { Id = 7, SiteId = 3 };
+        var site = new Site("Site One", "site-1")
+        {
+            Id = 3,
+            GrpcNodeAAddress = "http://localhost:5100",
+            GrpcNodeBAddress = "http://localhost:5200"
+        };
+
+        var instanceRepo = Substitute.For();
+        instanceRepo.GetInstanceByIdAsync(7, Arg.Any()).Returns(instance);
+        var siteRepo = Substitute.For();
+        siteRepo.GetSiteByIdAsync(3, Arg.Any()).Returns(site);
+
+        var services = new ServiceCollection();
+        services.AddScoped(_ => instanceRepo);
+        services.AddScoped(_ => siteRepo);
+        using var provider = services.BuildServiceProvider();
+
+        var commProbe = CreateTestProbe();
+        var commService = new CommunicationService(
+            Options.Create(new CommunicationOptions()),
+            NullLogger.Instance);
+        commService.SetCommunicationActor(commProbe.Ref);
+
+        using var grpcFactory = new SiteStreamGrpcClientFactory(NullLoggerFactory.Instance);
+        var service = new DebugStreamService(
+            commService, provider, grpcFactory, NullLogger.Instance);
+        service.SetActorSystem(Sys);
+
+        // Act: start the stream; it blocks awaiting the initial snapshot.
+        var startTask = service.StartStreamAsync(instanceId: 7, onEvent: _ => { }, onTerminated: () => { });
+
+        // The bridge actor's PreStart sends SubscribeDebugViewRequest to the comm actor;
+        // the envelope's sender is the bridge actor itself.
+        commProbe.ExpectMsg(TimeSpan.FromSeconds(5));
+        var bridgeActor = commProbe.LastSender;
+
+        // Simulate the site terminating the stream before any snapshot is delivered.
+        bridgeActor.Tell(new DebugStreamTerminated("site-1", "corr"));
+
+        // Assert: a descriptive exception that names the instance and wraps the cause,
+        // not the raw "terminated before snapshot received" InvalidOperationException.
+        var ex = await Assert.ThrowsAsync(() => startTask);
+        Assert.Contains("Site1.Pump01", ex.Message);
+        Assert.NotNull(ex.InnerException);
+    }
+}
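
Reviewer note: the catch-all teardown pattern this patch introduces can be exercised in isolation. The following is a minimal sketch, not the ScadaLink code. `AwaitSnapshotAsync` and its `teardown` callback are hypothetical stand-ins for the snapshot wait in `StartStreamAsync` and for the `_sessions.TryRemove` / `bridgeActor.Tell(new StopDebugStream())` pair, and the sketch assumes .NET 6 or later for `Task.WaitAsync`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Drive both failure paths: early termination, then a silent timeout.
var teardowns = 0;

var earlyExit = new TaskCompletionSource<string>();
earlyExit.TrySetException(
    new InvalidOperationException("terminated before snapshot received"));
Exception? earlyFailure = null;
try
{
    await AwaitSnapshotAsync(earlyExit, () => teardowns++, TimeSpan.FromSeconds(1));
}
catch (Exception ex)
{
    earlyFailure = ex;
}

var silent = new TaskCompletionSource<string>();
Exception? timeoutFailure = null;
try
{
    await AwaitSnapshotAsync(silent, () => teardowns++, TimeSpan.FromMilliseconds(50));
}
catch (Exception ex)
{
    timeoutFailure = ex;
}

Console.WriteLine($"early: {earlyFailure?.GetType().Name}, " +
                  $"timeout: {timeoutFailure?.GetType().Name}, teardowns: {teardowns}");

// Hypothetical helper: waits for the snapshot, and on ANY failure runs the
// teardown before translating the exception for the caller.
static async Task<string> AwaitSnapshotAsync(
    TaskCompletionSource<string> snapshotTcs,
    Action teardown,
    TimeSpan timeout,
    CancellationToken ct = default)
{
    using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    timeoutCts.CancelAfter(timeout);

    try
    {
        return await snapshotTcs.Task.WaitAsync(timeoutCts.Token);
    }
    catch (Exception ex)
    {
        // Teardown runs whether the wait timed out or the task faulted.
        teardown();

        if (ex is OperationCanceledException)
            throw new TimeoutException("Timed out waiting for the snapshot.");

        throw new InvalidOperationException(
            "Stream terminated before a snapshot was received.", ex);
    }
}
```

In this sketch both paths invoke the teardown exactly once, and the early-termination path keeps the original failure as `InnerException`, which is the property the regression test asserts. One behaviour worth noting: because the timeout source is linked to the caller's token, a caller-initiated cancellation is also reported as a `TimeoutException`, mirroring the patched code.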