fix(communication): resolve Communication-001 — early stream termination handling

DebugStreamService.StartStreamAsync awaited the initial debug snapshot inside
a try whose only handler was catch (OperationCanceledException). When the
stream terminated before the snapshot arrived, onTerminatedWrapper completed
the await with an InvalidOperationException that escaped the catch — the
caller got a raw, untranslated exception and the service did no teardown of
its own on that path.

Replaced with catch (Exception): it removes the session entry, sends
StopDebugStream to the bridge actor via the local reference (deterministic
teardown, idempotent), and throws a descriptive exception — TimeoutException
for the 30s timeout, otherwise an InvalidOperationException naming the
instance/site and wrapping the cause.

Re-triaged Critical -> Medium: the originally-claimed multi-minute site-side
resource leak does not occur (the bridge actor self-terminates on every
onTerminated path). Adds the first DebugStreamService test, which fails
against the pre-fix code.
This commit is contained in:
Joseph Doherty
2026-05-16 18:32:52 -04:00
parent 239bee3bc4
commit a9ceba00d0
4 changed files with 146 additions and 44 deletions

View File

@@ -8,7 +8,7 @@
| Last reviewed | 2026-05-16 | | Last reviewed | 2026-05-16 |
| Reviewer | claude-agent | | Reviewer | claude-agent |
| Commit reviewed | `9c60592` | | Commit reviewed | `9c60592` |
| Open findings | 11 | | Open findings | 10 |
## Summary ## Summary
@@ -16,9 +16,7 @@ The Communication module is generally well-structured and matches the design doc
two-transport model (ClusterClient for command/control, gRPC server-streaming for two-transport model (ClusterClient for command/control, gRPC server-streaming for
real-time data). The actors keep mutable state on the actor thread, use `PipeTo` for real-time data). The actors keep mutable state on the actor thread, use `PipeTo` for
async work, and the gRPC server/client lifecycle is mostly disciplined. However the async work, and the gRPC server/client lifecycle is mostly disciplined. However the
review found one Critical issue (a `TimeoutException` from `DebugStreamService` leaves review found several High and Medium issues clustered around two themes:
an orphaned bridge actor and an active site-side subscription, leaking resources on
every snapshot timeout) and several High/Medium issues clustered around two themes:
**(a) gRPC subscription bookkeeping races** — `SiteStreamGrpcClient` overwrites and **(a) gRPC subscription bookkeeping races** — `SiteStreamGrpcClient` overwrites and
removes subscription entries by correlation ID without disposal or ownership checks, removes subscription entries by correlation ID without disposal or ownership checks,
so reconnect cycles leak `CancellationTokenSource`es and can cancel the wrong stream; so reconnect cycles leak `CancellationTokenSource`es and can cancel the wrong stream;
@@ -44,43 +42,55 @@ mutation races, and the snapshot-timeout cleanup path.
## Findings ## Findings
### Communication-001 — Snapshot timeout leaves orphaned bridge actor and site subscription ### Communication-001 — Early stream termination escapes StartStreamAsync's narrow exception handling
| | | | | |
|--|--| |--|--|
| Severity | Critical | | Severity | Medium |
| Category | Performance & resource management | | Category | Error handling & resilience |
| Status | Open | | Status | Resolved |
| Location | `src/ScadaLink.Communication/DebugStreamService.cs:139`, `src/ScadaLink.Communication/DebugStreamService.cs:149` | | Location | `src/ScadaLink.Communication/DebugStreamService.cs:130-143` |
**Re-triaged 2026-05-16:** originally filed Critical, claiming an orphaned bridge actor
and a multi-minute site-side resource leak on every snapshot timeout. On verification
that impact does **not** occur: `DebugStreamBridgeActor` calls `CleanupGrpc()` and
`Context.Stop(Self)` on every path that invokes `onTerminated` (site disconnect, gRPC
max-retries, `ReceiveTimeout`), so it always self-terminates and releases its gRPC
subscription; and the pure-timeout path does reach `StopStream`, which also stops it.
The genuine defect described below is an error-handling gap, not a leak — severity
corrected to Medium.
**Description** **Description**
When `StartStreamAsync` times out waiting for the initial snapshot it calls `StartStreamAsync` awaits the initial snapshot inside a `try` whose only handler is
`StopStream(sessionId)` and throws. `StopStream` only sends `StopDebugStream` to the `catch (OperationCanceledException)`. When the stream terminates before the snapshot
bridge actor **if the session is still in `_sessions`**. But the bridge actor was added arrives, `onTerminatedWrapper` completes the await via
to `_sessions` at line 124 and is only removed by `onTerminatedWrapper`. The serious `snapshotTcs.TrySetException(new InvalidOperationException(...))`. That
case is the race where `onTerminatedWrapper` fires first (e.g. site disconnect arrives `InvalidOperationException` is not an `OperationCanceledException`, so it escapes the
during the wait): `snapshotTcs.TrySetException` completes the await with an catch entirely: the caller (Blazor debug view / SignalR hub) receives a raw,
`InvalidOperationException` rather than `OperationCanceledException`, which is **not** untranslated exception, and `StartStreamAsync` performs no teardown of its own on that
caught by the `catch (OperationCanceledException)` block. The exception propagates path — it relies implicitly on the bridge actor self-terminating. Cleanup from the
uncaught, `StopStream` is never reached, and if the bridge actor is instead orphaned service side is therefore not deterministic, and the failure surfaced to the caller is
(snapshot never arrives, site silent, no terminate) the only cleanup is the 5-minute not a meaningful, documented result.
`ReceiveTimeout` in the actor — meaning a site-side `StreamRelayActor` and gRPC stream
can stay alive for up to 5 minutes after the central caller has given up. Combined with
the 30s timeout, every transient snapshot delay leaks site resources for minutes.
**Recommendation** **Recommendation**
In `StartStreamAsync`, wrap the `await` so that *any* failure or cancellation In `StartStreamAsync`, catch any exception from the snapshot await, deterministically
deterministically calls `StopStream(sessionId)` (e.g. `try/catch (Exception)` or a tear down the bridge actor (`Tell(StopDebugStream)` via the local actor reference, since
`finally` that stops the session when the result was not returned). Ensure a racing `onTerminatedWrapper` may already have removed the session entry), and translate
`StopStream` is idempotent and always sends `StopDebugStream` even if the session was the failure into a meaningful exception for the caller.
already removed, so the bridge actor (and its site-side subscription) is torn down
promptly rather than waiting for the orphan `ReceiveTimeout`.
**Resolution** **Resolution**
_Unresolved._ Resolved 2026-05-16. The `catch (OperationCanceledException)`-only block in
`StartStreamAsync` was replaced with `catch (Exception)`: it removes the session entry,
sends `StopDebugStream` to the bridge actor via the local reference (idempotent — the
actor may already be stopping itself), and throws a descriptive exception —
`TimeoutException` for the 30s timeout, otherwise an `InvalidOperationException` that
names the instance/site and wraps the underlying cause. Regression test
`DebugStreamServiceTests.StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException`
fails against the pre-fix code and passes after. Fixed by the commit whose message
references `Communication-001`.
### Communication-002 — gRPC reconnect does not unsubscribe the previous stream, leaking site-side relay actors ### Communication-002 — gRPC reconnect does not unsubscribe the previous stream, leaking site-side relay actors

View File

@@ -30,15 +30,15 @@ code-reviews/
All 19 modules were reviewed at commit `9c60592` (241 findings: 6 Critical, 46 High, All 19 modules were reviewed at commit `9c60592` (241 findings: 6 Critical, 46 High,
100 Medium, 89 Low). The tables below track what remains **open** as findings are 100 Medium, 89 Low). The tables below track what remains **open** as findings are
resolved. resolved and re-triaged.
| Severity | Open findings | | Severity | Open findings |
|----------|---------------| |----------|---------------|
| Critical | 5 | | Critical | 4 |
| High | 46 | | High | 46 |
| Medium | 100 | | Medium | 100 |
| Low | 89 | | Low | 89 |
| **Total** | **240** | | **Total** | **239** |
## Module Status ## Module Status
@@ -48,7 +48,7 @@ resolved.
| [CLI](CLI/findings.md) | 2026-05-16 | `9c60592` | 0/1/6/6 | 13 | 13 | | [CLI](CLI/findings.md) | 2026-05-16 | `9c60592` | 0/1/6/6 | 13 | 13 |
| [ClusterInfrastructure](ClusterInfrastructure/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/3 | 8 | 8 | | [ClusterInfrastructure](ClusterInfrastructure/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/3 | 8 | 8 |
| [Commons](Commons/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 | 12 | | [Commons](Commons/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 | 12 |
| [Communication](Communication/findings.md) | 2026-05-16 | `9c60592` | 1/2/5/3 | 11 | 11 | | [Communication](Communication/findings.md) | 2026-05-16 | `9c60592` | 0/2/5/3 | 10 | 11 |
| [ConfigurationDatabase](ConfigurationDatabase/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/6 | 11 | 11 | | [ConfigurationDatabase](ConfigurationDatabase/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/6 | 11 | 11 |
| [DataConnectionLayer](DataConnectionLayer/findings.md) | 2026-05-16 | `9c60592` | 0/4/6/2 | 12 | 13 | | [DataConnectionLayer](DataConnectionLayer/findings.md) | 2026-05-16 | `9c60592` | 0/4/6/2 | 12 | 13 |
| [DeploymentManager](DeploymentManager/findings.md) | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 | 14 | | [DeploymentManager](DeploymentManager/findings.md) | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 | 14 |
@@ -71,12 +71,11 @@ Resolved findings drop off this list but remain recorded in their module's
`findings.md` (see [REVIEW-PROCESS.md](REVIEW-PROCESS.md) §4§5). Full detail — `findings.md` (see [REVIEW-PROCESS.md](REVIEW-PROCESS.md) §4§5). Full detail —
description, location, recommendation — lives in the module's `findings.md`. description, location, recommendation — lives in the module's `findings.md`.
### Critical (5) ### Critical (4)
| ID | Module | Title | | ID | Module | Title |
|----|--------|-------| |----|--------|-------|
| CentralUI-001 | [CentralUI](CentralUI/findings.md) | Test Run sandbox executes arbitrary C# with no trust-model enforcement | | CentralUI-001 | [CentralUI](CentralUI/findings.md) | Test Run sandbox executes arbitrary C# with no trust-model enforcement |
| Communication-001 | [Communication](Communication/findings.md) | Snapshot timeout leaves orphaned bridge actor and site subscription |
| ExternalSystemGateway-001 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | No S&F delivery handler registered; cached calls and writes can never be delivered | | ExternalSystemGateway-001 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | No S&F delivery handler registered; cached calls and writes can never be delivered |
| NotificationService-001 | [NotificationService](NotificationService/findings.md) | Buffered notifications are never retried (no S&F delivery handler) | | NotificationService-001 | [NotificationService](NotificationService/findings.md) | Buffered notifications are never retried (no S&F delivery handler) |
| StoreAndForward-001 | [StoreAndForward](StoreAndForward/findings.md) | Replication to standby is never triggered by the active node | | StoreAndForward-001 | [StoreAndForward](StoreAndForward/findings.md) | Replication to standby is never triggered by the active node |

View File

@@ -127,21 +127,37 @@ public class DebugStreamService
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct); using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(30)); timeoutCts.CancelAfter(TimeSpan.FromSeconds(30));
DebugViewSnapshot snapshot;
try try
{ {
var snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token); snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token);
}
catch (Exception ex)
{
// Any failure before the snapshot arrives — the 30s timeout, or the stream
// terminating early (site disconnect / gRPC failure, surfaced by
// onTerminatedWrapper as an InvalidOperationException) — must deterministically
// tear down the bridge actor and its site-side subscription. Use the local
// actor reference: a racing onTerminatedWrapper may already have removed the
// session, which would make StopStream a no-op. StopDebugStream is idempotent
// (the actor may already be stopping itself).
_sessions.TryRemove(sessionId, out _);
bridgeActor.Tell(new StopDebugStream());
if (ex is OperationCanceledException)
throw new TimeoutException(
$"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.");
throw new InvalidOperationException(
$"Debug stream for {instanceUniqueName} on site {siteIdentifier} terminated before a snapshot was received.",
ex);
}
_logger.LogInformation("Debug stream {SessionId} started for {Instance} on site {Site}", _logger.LogInformation("Debug stream {SessionId} started for {Instance} on site {Site}",
sessionId, instanceUniqueName, siteIdentifier); sessionId, instanceUniqueName, siteIdentifier);
return new DebugStreamSession(sessionId, snapshot); return new DebugStreamSession(sessionId, snapshot);
} }
catch (OperationCanceledException)
{
StopStream(sessionId);
throw new TimeoutException($"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.");
}
}
/// <summary> /// <summary>
/// Stops an active debug stream session. /// Stops an active debug stream session.

View File

@@ -0,0 +1,77 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ScadaLink.Commons.Entities.Instances;
using ScadaLink.Commons.Entities.Sites;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Communication;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests;
/// <summary>
/// Tests for DebugStreamService session lifecycle.
/// </summary>
public class DebugStreamServiceTests : TestKit
{
[Fact]
public async Task StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException()
{
// Regression test for Communication-001. When the debug stream terminates before
// the initial snapshot arrives, StartStreamAsync used to let the raw
// InvalidOperationException from onTerminatedWrapper escape its
// OperationCanceledException-only catch — the caller saw an untranslated exception
// and the failure path did not deterministically tear the bridge actor down.
// The fix catches any failure, tells the bridge actor StopDebugStream, and throws
// a descriptive exception that names the instance and wraps the underlying cause.
var instance = new Instance("Site1.Pump01") { Id = 7, SiteId = 3 };
var site = new Site("Site One", "site-1")
{
Id = 3,
GrpcNodeAAddress = "http://localhost:5100",
GrpcNodeBAddress = "http://localhost:5200"
};
var instanceRepo = Substitute.For<ITemplateEngineRepository>();
instanceRepo.GetInstanceByIdAsync(7, Arg.Any<CancellationToken>()).Returns(instance);
var siteRepo = Substitute.For<ISiteRepository>();
siteRepo.GetSiteByIdAsync(3, Arg.Any<CancellationToken>()).Returns(site);
var services = new ServiceCollection();
services.AddScoped(_ => instanceRepo);
services.AddScoped(_ => siteRepo);
using var provider = services.BuildServiceProvider();
var commProbe = CreateTestProbe();
var commService = new CommunicationService(
Options.Create(new CommunicationOptions()),
NullLogger<CommunicationService>.Instance);
commService.SetCommunicationActor(commProbe.Ref);
using var grpcFactory = new SiteStreamGrpcClientFactory(NullLoggerFactory.Instance);
var service = new DebugStreamService(
commService, provider, grpcFactory, NullLogger<DebugStreamService>.Instance);
service.SetActorSystem(Sys);
// Act — start the stream; it blocks awaiting the initial snapshot.
var startTask = service.StartStreamAsync(instanceId: 7, onEvent: _ => { }, onTerminated: () => { });
// The bridge actor's PreStart sends SubscribeDebugViewRequest to the comm actor;
// the envelope's sender is the bridge actor itself.
commProbe.ExpectMsg<SiteEnvelope>(TimeSpan.FromSeconds(5));
var bridgeActor = commProbe.LastSender;
// Simulate the site terminating the stream before any snapshot is delivered.
bridgeActor.Tell(new DebugStreamTerminated("site-1", "corr"));
// Assert — a descriptive exception that names the instance and wraps the cause,
// not the raw "terminated before snapshot received" InvalidOperationException.
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => startTask);
Assert.Contains("Site1.Pump01", ex.Message);
Assert.NotNull(ex.InnerException);
}
}