Code Review — Communication
| Field | Value |
|---|---|
| Module | src/ScadaLink.Communication |
| Design doc | docs/requirements/Component-Communication.md |
| Status | Reviewed |
| Last reviewed | 2026-05-16 |
| Reviewer | claude-agent |
| Commit reviewed | 9c60592 |
| Open findings | 3 |
Summary
The Communication module is generally well-structured and matches the design doc's
two-transport model (ClusterClient for command/control, gRPC server-streaming for
real-time data). The actors keep mutable state on the actor thread, use PipeTo for
async work, and the gRPC server/client lifecycle is mostly disciplined. However the
review found several High and Medium issues clustered around two themes:
(a) gRPC subscription bookkeeping races — SiteStreamGrpcClient overwrites and
removes subscription entries by correlation ID without disposal or ownership checks,
so reconnect cycles leak CancellationTokenSource instances and can cancel the wrong stream;
and (b) missing supervision strategy on the coordinator actors, contrary to the
CLAUDE.md "Resume for coordinator actors" decision. Design-doc adherence is otherwise
good. Test coverage is broad for happy paths but has gaps around failover, cache
mutation races, and the snapshot-timeout cleanup path.
Checklist coverage
| # | Category | Examined | Notes |
|---|---|---|---|
| 1 | Correctness & logic bugs | ✓ | Snapshot-timeout orphan, reconnect not calling CleanupGrpc, subscription-map races. |
| 2 | Akka.NET conventions | ✓ | No supervision strategy on coordinators; Sender captured in async-launched closure path. |
| 3 | Concurrency & thread safety | ✓ | SiteStreamGrpcClient._subscriptions overwrite/remove race; _siteClients field reassignment unused but non-readonly. |
| 4 | Error handling & resilience | ✓ | gRPC reconnect leaks server-side relay; LoadSiteAddressesFromDb swallows DB failures silently. |
| 5 | Security | ✓ | No findings in module code. DebugStreamHub auth lives outside this module (Central UI). |
| 6 | Performance & resource management | ✓ | Orphaned subscriptions/CTS leaks; SiteStreamGrpcClientFactory.Dispose blocks on async. |
| 7 | Design-document adherence | ✓ | GrpcMaxStreamLifetime / keepalive options defined but never applied; hard-coded values used instead. |
| 8 | Code organization & conventions | ✓ | Options pattern correct; minor: public records declared in actor files. No structural issues. |
| 9 | Testing coverage | ✓ | No tests for snapshot-timeout cleanup, address-cache refresh races, or gRPC server reconnect-leak. |
| 10 | Documentation & comments | ✓ | XML comment on DebugStreamBridgeActor says "Persistent actor" — it is not an Akka.Persistence actor. |
Findings
Communication-001 — Early stream termination escapes StartStreamAsync's narrow exception handling
| Severity | Medium |
| Category | Error handling & resilience |
| Status | Resolved |
| Location | src/ScadaLink.Communication/DebugStreamService.cs:130-143 |
Re-triaged 2026-05-16: originally filed Critical, claiming an orphaned bridge actor
and a multi-minute site-side resource leak on every snapshot timeout. On verification
that impact does not occur: DebugStreamBridgeActor calls CleanupGrpc() and
Context.Stop(Self) on every path that invokes onTerminated (site disconnect, gRPC
max-retries, ReceiveTimeout), so it always self-terminates and releases its gRPC
subscription; and the pure-timeout path does reach StopStream, which also stops it.
The genuine defect described below is an error-handling gap, not a leak — severity
corrected to Medium.
Description
StartStreamAsync awaits the initial snapshot inside a try whose only handler is
catch (OperationCanceledException). When the stream terminates before the snapshot
arrives, onTerminatedWrapper completes the await via
snapshotTcs.TrySetException(new InvalidOperationException(...)). That
InvalidOperationException is not an OperationCanceledException, so it escapes the
catch entirely: the caller (Blazor debug view / SignalR hub) receives a raw,
untranslated exception, and StartStreamAsync performs no teardown of its own on that
path — it relies implicitly on the bridge actor self-terminating. Cleanup from the
service side is therefore not deterministic, and the failure surfaced to the caller is
not a meaningful, documented result.
Recommendation
In StartStreamAsync, catch any exception from the snapshot await, deterministically
tear down the bridge actor (Tell(StopDebugStream) via the local actor reference, since
a racing onTerminatedWrapper may already have removed the session entry), and translate
the failure into a meaningful exception for the caller.
Resolution
Resolved 2026-05-16. The catch (OperationCanceledException)-only block in
StartStreamAsync was replaced with catch (Exception): it removes the session entry,
sends StopDebugStream to the bridge actor via the local reference (idempotent — the
actor may already be stopping itself), and throws a descriptive exception —
TimeoutException for the 30s timeout, otherwise an InvalidOperationException that
names the instance/site and wraps the underlying cause. Regression test
DebugStreamServiceTests.StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException
fails against the pre-fix code and passes after. Fixed by the commit whose message
references Communication-001.
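The shape of this fix can be sketched with plain BCL types. This is an illustration under stated assumptions, not the module's code: SnapshotAwaiter, AwaitSnapshotAsync, the teardown delegate, and the message texts are hypothetical stand-ins. Only the TimeoutException / InvalidOperationException split and the "tear down on every failure" rule come from the resolution above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SnapshotAwaiter
{
    // Hypothetical helper: waits for the initial snapshot and, on ANY failure,
    // runs the (idempotent) teardown callback before throwing a translated exception.
    public static async Task<T> AwaitSnapshotAsync<T>(
        Task<T> snapshot, TimeSpan timeout, Action teardown, string instance, string site)
    {
        try
        {
            // Task.WaitAsync throws TimeoutException if the timeout elapses first.
            return await snapshot.WaitAsync(timeout);
        }
        catch (Exception ex)
        {
            // Deterministic cleanup on every failure path, not only on timeout.
            teardown();

            // Assumption: a cancellation-based timeout is reported the same way
            // as a WaitAsync timeout.
            if (ex is TimeoutException or OperationCanceledException)
            {
                throw new TimeoutException(
                    $"No snapshot from '{instance}' on site '{site}' within {timeout.TotalSeconds:0}s.", ex);
            }

            // Anything else (e.g. the stream terminated before the snapshot)
            // is wrapped so the caller sees which instance/site failed and why.
            throw new InvalidOperationException(
                $"Debug stream for '{instance}' on site '{site}' ended before the snapshot arrived.", ex);
        }
    }
}
```

In the real service the teardown delegate would correspond to removing the session entry and sending StopDebugStream to the bridge actor. Both are described above as safe to repeat, which is why the sketch requires the callback to be idempotent.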
Communication-002 — gRPC reconnect does not unsubscribe the previous stream, leaking site-side relay actors
| Severity | High |
| Category | Error handling & resilience |
| Status | Resolved |
| Location | src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:170, src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:143 |
Description
On a gRPC stream error, HandleGrpcError increments the retry count, flips
_useNodeA, and schedules OpenGrpcStream. OpenGrpcStream cancels and disposes
_grpcCts and starts a fresh SubscribeInstance call — but it never calls
client.Unsubscribe(_correlationId) on the old node's client, and the site-side
SiteStreamGrpcServer keys active streams by correlation_id only. Because the new
subscription goes to the other node (_useNodeA flipped), the old node's
SiteStreamGrpcServer still has an active stream + StreamRelayActor +
SiteStreamManager subscription for that correlation ID. The old node only learns the
client is gone via TCP RST or keepalive — exactly the failure mode that triggered the
reconnect (network partition / silent node), so detection may take ~25s or may never happen. Each
reconnect can therefore leave a zombie relay actor on the failed node. CleanupGrpc
(which does call Unsubscribe) is only invoked on terminal paths, not between
reconnect attempts.
Recommendation
Before reconnecting in HandleGrpcError / at the top of OpenGrpcStream, call
Unsubscribe(_correlationId) on the client for the previous endpoint (the one that
just failed) so the local CTS is cancelled and — where the channel is still alive —
the gRPC cancellation reaches the site and stops the relay actor.
Resolution
Resolved 2026-05-16 (commit pending). Root cause confirmed against source:
HandleGrpcError flipped _useNodeA and scheduled OpenGrpcStream without ever
unsubscribing the failed stream, leaving the old node's StreamRelayActor zombie until
TCP/keepalive timeout. Fix: HandleGrpcError now resolves the client for the
previous endpoint (before flipping _useNodeA) and calls Unsubscribe(_correlationId)
on it, so the local CTS is cancelled and gRPC cancellation reaches the still-alive site.
Regression test DebugStreamBridgeActorTests.On_GrpcError_Unsubscribes_Old_Stream_Before_Reconnect
fails against the pre-fix code and passes after.
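The ordering that matters here (unsubscribe on the failed endpoint first, flip second) can be shown in isolation. The sketch below is a simplified model, not the actor: IStreamClientSketch, RecordingClient, and NodeFailoverSketch are hypothetical names, and the real actor resolves its clients through the factory rather than holding two references.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for the per-endpoint gRPC client.
public interface IStreamClientSketch
{
    void Unsubscribe(string correlationId);
}

// Test double that records which correlation IDs were unsubscribed.
public sealed class RecordingClient : IStreamClientSketch
{
    public List<string> Unsubscribed { get; } = new();
    public void Unsubscribe(string correlationId) => Unsubscribed.Add(correlationId);
}

public sealed class NodeFailoverSketch
{
    private readonly IStreamClientSketch _nodeA;
    private readonly IStreamClientSketch _nodeB;
    private readonly string _correlationId;
    private bool _useNodeA = true;

    public NodeFailoverSketch(IStreamClientSketch nodeA, IStreamClientSketch nodeB, string correlationId)
    {
        _nodeA = nodeA;
        _nodeB = nodeB;
        _correlationId = correlationId;
    }

    public IStreamClientSketch Current => _useNodeA ? _nodeA : _nodeB;

    // Called on a stream error. Returns the client to reconnect through.
    public IStreamClientSketch HandleStreamError()
    {
        // Resolve and unsubscribe the endpoint that just failed BEFORE flipping,
        // so the old node's stream is cancelled rather than left as a zombie.
        Current.Unsubscribe(_correlationId);

        _useNodeA = !_useNodeA;
        return Current;
    }
}
```

If the flip happened first, Current would already point at the new node and the unsubscribe would target a stream that does not exist yet, which is the ordering bug this finding describes.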
Communication-003 — SiteStreamGrpcClient subscription map overwritten without disposal; reconnect can cancel the wrong stream
| Severity | High |
| Category | Concurrency & thread safety |
| Status | Resolved |
| Location | src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs:77, src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs:106 |
Description
SubscribeAsync does _subscriptions[correlationId] = cts; (line 77),
unconditionally overwriting any existing entry for that correlation ID without
cancelling or disposing the previous CancellationTokenSource. The finally block
then does _subscriptions.TryRemove(correlationId, out _) (line 106) which removes
the entry by key only, regardless of which CTS is stored. Because
DebugStreamBridgeActor reuses the same _correlationId across reconnect attempts
(and SiteStreamGrpcClientFactory returns the same SiteStreamGrpcClient for a site
even after a node flip), two SubscribeAsync calls can briefly share a correlation
ID. The first call's finally then removes the second call's CTS entry, so a later
Unsubscribe(correlationId) finds nothing and the live stream is never cancelled — an
orphan. Conversely the overwritten CTS is leaked (never disposed).
Recommendation
When inserting, cancel+dispose any prior CTS for that correlation ID. In the finally,
remove only if the stored CTS is the one this call created (use the
TryRemove(KeyValuePair) overload, mirroring what SiteStreamGrpcServer already does
with StreamEntry). Consider keying subscriptions by a per-call GUID rather than the
caller-supplied correlation ID.
Resolution
Resolved 2026-05-16 (commit pending). Root cause confirmed against source: the
inline _subscriptions[correlationId] = cts overwrote a prior CTS without
cancel/dispose (leak), and the finally's TryRemove(correlationId, out _) removed by
key only — a racing reconnect's live CTS could be removed by the prior call's finally,
orphaning the live stream. Fix: extracted two internal helpers used by SubscribeAsync
— RegisterSubscription cancels+disposes any existing CTS for the correlation ID before
inserting, and RemoveSubscription uses the ConcurrentDictionary.TryRemove(KeyValuePair)
overload so it removes only the CTS that call created (mirroring SiteStreamGrpcServer's
StreamEntry pattern). Regression tests
SiteStreamGrpcClientTests.RegisterSubscription_ReusedCorrelationId_CancelsAndDisposesPriorCts
and SiteStreamGrpcClientTests.RemoveSubscription_OnlyRemovesOwnCts_NotAReplacement
fail against the pre-fix logic and pass after.
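The two helpers can be sketched against ConcurrentDictionary directly. This is a minimal model under assumptions: SubscriptionRegistry and its method names are hypothetical (the module's helpers are RegisterSubscription and RemoveSubscription, whose exact signatures are not shown in this review). The TryUpdate and TryRemove(KeyValuePair) calls are standard .NET APIs.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public sealed class SubscriptionRegistry
{
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _subscriptions = new();

    // Registers a fresh CTS for the correlation ID. If an entry already exists,
    // it is swapped out atomically and then cancelled and disposed, so the
    // previous stream is stopped instead of leaked.
    public CancellationTokenSource Register(string correlationId)
    {
        var cts = new CancellationTokenSource();
        while (true)
        {
            if (_subscriptions.TryGetValue(correlationId, out var previous))
            {
                // Replace only if the entry is still the one we observed;
                // otherwise another caller won the race and we retry.
                if (!_subscriptions.TryUpdate(correlationId, cts, previous)) continue;
                previous.Cancel();
                previous.Dispose();
                return cts;
            }

            if (_subscriptions.TryAdd(correlationId, cts)) return cts;
        }
    }

    // Removes the entry only if it still holds the CTS this call created.
    // A stale 'finally' therefore cannot remove a replacement's live CTS.
    public bool Remove(string correlationId, CancellationTokenSource cts)
    {
        var removed = _subscriptions.TryRemove(
            new KeyValuePair<string, CancellationTokenSource>(correlationId, cts));
        if (removed) cts.Dispose();
        return removed;
    }

    // Cancels whatever stream currently owns the correlation ID.
    public bool Unsubscribe(string correlationId)
    {
        if (!_subscriptions.TryRemove(correlationId, out var cts)) return false;
        cts.Cancel();
        cts.Dispose();
        return true;
    }
}
```

In this sketch, whichever caller takes a CTS out of the map (by replacing it, removing it, or unsubscribing) is the only one that disposes it, so no CTS is cancelled after disposal.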
Communication-004 — Coordinator actors declare no SupervisorStrategy (design requires Resume)
| Severity | Medium |
| Category | Akka.NET conventions |
| Status | Resolved |
| Location | src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs:42, src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs:22 |
Description
CLAUDE.md ("Explicit supervision strategies: Resume for coordinator actors, Stop for
short-lived execution actors") requires coordinator actors to use an explicit Resume
supervision strategy. CentralCommunicationActor and SiteCommunicationActor are
long-lived coordinators (they own the per-site ClusterClient map, debug
subscriptions, in-progress deployments) but neither overrides SupervisorStrategy.
They fall back to the Akka default (OneForOneStrategy with Restart). A child fault
(e.g. in a ClusterClient child of CentralCommunicationActor created by
DefaultSiteClientFactory) would Restart that child under the default strategy. An
exception in the coordinator itself is handled by its parent's strategy, and a
restart there would silently wipe _siteClients, _debugSubscriptions, and
_inProgressDeployments. The design intent is Resume so transient child faults do not
discard in-memory state.
Recommendation
Override SupervisorStrategy on both actors to return an explicit
OneForOneStrategy with Directive.Resume (or the project's standard coordinator
strategy), matching the documented decision and other coordinator actors.
Resolution
Resolved 2026-05-16 (commit pending). Root cause confirmed: neither
CentralCommunicationActor nor SiteCommunicationActor overrode SupervisorStrategy,
so child faults fell back to the Akka default (Restart). Note that an actor's own
SupervisorStrategy governs its children — a transient child fault would Restart
the child and discard its in-memory state, contrary to the CLAUDE.md "Resume for
coordinator actors" decision. Fix: both actors now override SupervisorStrategy() to
return a OneForOneStrategy with an unbounded Decider resolving to Directive.Resume
(mirroring DataConnectionManagerActor). Regression tests
CoordinatorSupervisionTests.CentralCommunicationActor_SupervisorStrategy_IsResume and
CoordinatorSupervisionTests.SiteCommunicationActor_SupervisorStrategy_IsResume fail
against the pre-fix code (decider yields Restart) and pass after.
Communication-005 — gRPC keepalive and max-stream-lifetime options are defined but never applied
| Severity | Medium |
| Category | Design-document adherence |
| Status | Resolved |
| Location | src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs:25, src/ScadaLink.Communication/CommunicationOptions.cs:36 |
Description
CommunicationOptions exposes GrpcKeepAlivePingDelay, GrpcKeepAlivePingTimeout,
GrpcMaxStreamLifetime, and GrpcMaxConcurrentStreams, and the design doc's
"gRPC Connection Keepalive" section explicitly states these are configurable. However
SiteStreamGrpcClient's constructor hard-codes KeepAlivePingDelay = TimeSpan.FromSeconds(15) and KeepAlivePingTimeout = TimeSpan.FromSeconds(10)
instead of reading the options. GrpcMaxStreamLifetime (the documented "Session
timeout — 4 hours" third layer of dead-client detection) is not referenced anywhere
— SiteStreamGrpcServer.SubscribeInstance creates a linked CTS from the call
cancellation token only, with no CancelAfter. The 4-hour zombie-stream safety net
described in the design doc does not exist in code. GrpcMaxConcurrentStreams is also
not wired to the server (SiteStreamGrpcServer takes a maxConcurrentStreams
constructor parameter defaulting to 100, but nothing binds the option to it).
Recommendation
Flow CommunicationOptions into SiteStreamGrpcClient and SiteStreamGrpcServer
(via the factory / DI). Apply GrpcKeepAlivePingDelay / GrpcKeepAlivePingTimeout to
the SocketsHttpHandler, bind GrpcMaxConcurrentStreams to the server's limit, and
implement the GrpcMaxStreamLifetime session timeout with CancelAfter on the
server-side stream CTS — or, if the 4-hour cap is intentionally dropped, remove the
option and update the design doc.
Resolution
Resolved 2026-05-16 (commit pending). Root cause confirmed: SiteStreamGrpcClient
hard-coded the keepalive values, GrpcMaxStreamLifetime was referenced nowhere, and
GrpcMaxConcurrentStreams was never bound to the server. Fix (scoped to
src/ScadaLink.Communication): SiteStreamGrpcClient gained a constructor taking
CommunicationOptions and now applies GrpcKeepAlivePingDelay/GrpcKeepAlivePingTimeout
to its SocketsHttpHandler; SiteStreamGrpcClientFactory gained an
IOptions<CommunicationOptions> DI constructor and flows the options into every client
it creates; SiteStreamGrpcServer gained an IOptions<CommunicationOptions> DI
constructor that binds GrpcMaxConcurrentStreams and implements the documented 4-hour
session timeout via CancellationTokenSource.CancelAfter(GrpcMaxStreamLifetime) on the
per-stream CTS. The Host's existing AddSingleton<SiteStreamGrpcServer>() registration
resolves the new DI constructor via greedy resolution — no Host change required.
Regression tests GrpcOptionsWiringTests.SiteStreamGrpcClient_AppliesKeepAliveFromOptions,
GrpcOptionsWiringTests.SiteStreamGrpcClientFactory_FlowsOptionsToCreatedClients, and
GrpcOptionsWiringTests.SiteStreamGrpcServer_BindsMaxConcurrentStreamsAndLifetimeFromOptions
exercise the wiring (they require the new members to even compile).
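A minimal sketch of the two client/server-side mechanics involved, using only BCL types. StreamOptionsSketch is a hypothetical mirror of the option names quoted in this finding, and its default values are assumptions (15s/10s are the previously hard-coded values, 4 hours is the documented session timeout). GrpcOptionsWiring is likewise illustrative, not the module's API.

```csharp
using System;
using System.Net.Http;
using System.Threading;

// Hypothetical mirror of the relevant CommunicationOptions properties.
public sealed class StreamOptionsSketch
{
    public TimeSpan GrpcKeepAlivePingDelay { get; set; } = TimeSpan.FromSeconds(15);
    public TimeSpan GrpcKeepAlivePingTimeout { get; set; } = TimeSpan.FromSeconds(10);
    public TimeSpan GrpcMaxStreamLifetime { get; set; } = TimeSpan.FromHours(4);
}

public static class GrpcOptionsWiring
{
    // Client side: keepalive values come from options instead of literals.
    public static SocketsHttpHandler CreateHandler(StreamOptionsSketch options) => new()
    {
        KeepAlivePingDelay = options.GrpcKeepAlivePingDelay,
        KeepAlivePingTimeout = options.GrpcKeepAlivePingTimeout,
    };

    // Server side: the per-stream CTS is linked to the call's token and also
    // cancels itself once the configured maximum stream lifetime elapses.
    public static CancellationTokenSource CreateStreamCts(
        StreamOptionsSketch options, CancellationToken callToken)
    {
        var cts = CancellationTokenSource.CreateLinkedTokenSource(callToken);
        cts.CancelAfter(options.GrpcMaxStreamLifetime);
        return cts;
    }
}
```

The linked source is cancelled by whichever comes first: the caller cancelling the gRPC call, or the lifetime cap expiring. That gives the session-timeout layer without a separate timer.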
Communication-006 — Site address load failures are silently swallowed, leaving a stale cache
| Severity | Medium |
| Category | Error handling & resilience |
| Status | Resolved |
| Location | src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs:204 |
Description
LoadSiteAddressesFromDb runs the repository query inside Task.Run(...).PipeTo(self).
If GetAllSitesAsync throws (database unavailable, transient connection error), the
faulted task is piped to Self as a Status.Failure. CentralCommunicationActor has
no Receive<Status.Failure> handler, so the failure becomes an unhandled message
(logged at debug, not surfaced) and the periodic refresh silently fails. If the
first startup load fails the actor runs with an empty _siteClients map — every
SiteEnvelope is dropped (line 187) and every Ask times out with no indication of the
root cause.
Recommendation
Add a Receive<Status.Failure> handler that logs the load failure at Warning/Error
level so operators can distinguish "site has no addresses configured" from "database
is down". Optionally surface a health metric for repeated load failures.
Resolution
Resolved 2026-05-16 (commit pending). Root cause confirmed: a faulted
LoadSiteAddressesFromDb task is piped to Self as a Status.Failure, but the actor
had no handler for it — the failure became an unhandled message (debug-level only) and
the periodic refresh failed silently. Fix: added a Receive<Status.Failure> handler
that logs the load failure at Warning with the underlying exception as the cause, so
operators can distinguish a missing-addresses configuration from a database outage.
Regression test
CentralCommunicationActorTests.LoadSiteAddressesFailure_IsLoggedNotSilentlySwallowed
(repository query throws) asserts the Warning is emitted — it produces no warning
against the pre-fix code and passes after.
Communication-007 — SiteStreamGrpcClientFactory.Dispose blocks on async work (sync-over-async)
| Severity | Medium |
| Category | Performance & resource management |
| Status | Resolved |
| Location | src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs:53 |
Description
Dispose() calls DisposeAsync().AsTask().GetAwaiter().GetResult(). This is the
classic sync-over-async pattern: it blocks the calling thread until all per-site
SiteStreamGrpcClient.DisposeAsync calls complete. If Dispose is invoked from a
context with a single-threaded synchronization context or from DI container shutdown
on a constrained thread pool, this can deadlock or stall host shutdown. The class
already implements IAsyncDisposable.
Recommendation
Prefer registering and disposing the factory through IAsyncDisposable only (modern
.NET DI honours it for singletons). If a synchronous Dispose must remain, dispose
the underlying GrpcChannels directly (synchronous) rather than blocking on the async
path, or document why blocking is safe here.
Resolution
Resolved 2026-05-16 (commit pending). Root cause confirmed: Dispose() called
DisposeAsync().AsTask().GetAwaiter().GetResult(), the classic sync-over-async pattern.
Fix: SiteStreamGrpcClient now also implements IDisposable with a synchronous
Dispose() that releases its CancellationTokenSources and underlying GrpcChannel
directly (all of that teardown is inherently synchronous); SiteStreamGrpcClientFactory.Dispose()
now disposes each cached client via that synchronous path with no blocking on the async
path. A CreateClient seam was extracted so the test can substitute a tracking client
while still exercising the factory's real caching/disposal machinery. Regression test
SiteStreamGrpcClientFactoryDisposeTests.Dispose_DisposesClientsSynchronously_NotViaAsyncPath
fails against the pre-fix code (clients disposed via DisposeAsync) and passes after;
Dispose_DoesNotDeadlock_UnderSingleThreadedSynchronizationContext guards the stall path.
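The pattern in this fix, a synchronous Dispose that never waits on the async path, can be sketched as follows. StreamClientSketch and StreamClientFactorySketch are hypothetical stand-ins. The real client also owns a GrpcChannel, which is omitted here to keep the example dependency-free.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class StreamClientSketch : IDisposable, IAsyncDisposable
{
    private readonly CancellationTokenSource _cts = new();
    private int _disposed;

    public bool IsDisposed => Volatile.Read(ref _disposed) == 1;

    // All teardown here is inherently synchronous, so Dispose does the work.
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) == 1) return; // idempotent
        _cts.Cancel();
        _cts.Dispose();
    }

    // The async path delegates to the sync one, never the other way round.
    public ValueTask DisposeAsync()
    {
        Dispose();
        return ValueTask.CompletedTask;
    }
}

public sealed class StreamClientFactorySketch : IDisposable
{
    private readonly ConcurrentDictionary<string, StreamClientSketch> _clients = new();

    public StreamClientSketch GetOrCreate(string site) =>
        _clients.GetOrAdd(site, _ => new StreamClientSketch());

    // No GetAwaiter().GetResult(): each cached client is disposed synchronously.
    public void Dispose()
    {
        foreach (var site in _clients.Keys)
        {
            if (_clients.TryRemove(site, out var client)) client.Dispose();
        }
    }
}
```

Because nothing in Dispose blocks on a Task, it cannot deadlock on a single-threaded synchronization context, which is the stall path the second regression test guards.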
Communication-008 — Reconnect retry-count reset can mask a flapping stream indefinitely
| Severity | Medium |
| Category | Correctness & logic bugs |
| Status | Resolved |
| Location | src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:71, src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:174 |
Description
_retryCount is reset to 0 every time a single AttributeValueChanged or
AlarmStateChanged event is received (lines 72, 77). Combined with MaxRetries = 3,
a stream that connects, delivers exactly one event, then fails — repeatedly — will
reconnect forever. The design doc states "max 3 retries, terminate the session if all
retries fail"; the current logic only terminates after 3 consecutive failures with
zero intervening events, so a flapping site never trips the limit and the debug
session (and its site-side relay) lives on indefinitely. The ReceiveTimeout orphan
net is also reset by every received message, so it does not bound this case either.
Recommendation
Either reset _retryCount only after the stream has been stably connected for some
minimum duration (e.g. a timer armed on stream open, cancelled on the next error), or
keep a separate cumulative reconnect counter / time window that bounds total
reconnects regardless of intervening events.
Resolution
Resolved 2026-05-16 (commit pending). Root cause confirmed: _retryCount was reset to
0 on every received AttributeValueChanged/AlarmStateChanged, so a stream that
connected, delivered one event, then failed — repeatedly — never tripped MaxRetries.
Fix (recommendation option a): the per-event reset was removed; instead OpenGrpcStream
arms a single StabilityWindow timer (60s default, internal-settable for tests), and
only when it fires (GrpcStreamStable) — i.e. the stream stayed up long enough to be
considered recovered — is _retryCount reset. HandleGrpcError cancels that timer, so
a stream that fails before the window elapses does not recover its retry budget. A
flapping stream therefore terminates after MaxRetries regardless of intervening
events. Regression test
DebugStreamBridgeActorTests.FlappingStream_DeliveringEventsBetweenFailures_StillTerminatesAfterMaxRetries
fails against the pre-fix code (actor never terminates) and passes after;
RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow verifies the budget
is recovered after a stable interval. The pre-existing test that codified the buggy
per-event reset (Grpc_Error_Resets_RetryCount_On_Successful_Event) was replaced.
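The retry accounting described above reduces to a small state machine. The sketch below models it without Akka timers: RetryBudget and its method names are hypothetical, OnStable stands in for the GrpcStreamStable timer message, and the exact boundary (terminate once the count exceeds MaxRetries) is an assumption, since the review does not quote the comparison.

```csharp
using System;

// Hypothetical model of the bridge actor's retry budget.
public sealed class RetryBudget
{
    private readonly int _maxRetries;
    private bool _stabilityTimerArmed;

    public RetryBudget(int maxRetries) => _maxRetries = maxRetries;

    public int RetryCount { get; private set; }

    // Stream (re)opened: arm the stability timer.
    public void OnStreamOpened() => _stabilityTimerArmed = true;

    // Received events deliberately leave the budget untouched.
    public void OnEvent() { }

    // Stability timer fired: the stream stayed up long enough, so the
    // budget is restored. Ignored if an error already cancelled the timer.
    public void OnStable()
    {
        if (!_stabilityTimerArmed) return;
        _stabilityTimerArmed = false;
        RetryCount = 0;
    }

    // Stream error: cancel the pending timer and consume one retry.
    // Returns true to reconnect, false when the session should terminate.
    public bool OnError()
    {
        _stabilityTimerArmed = false;
        RetryCount++;
        return RetryCount <= _maxRetries;
    }
}
```

With a budget of 3, a stream that opens, delivers an event, and fails on every cycle is allowed three reconnects and terminates on the fourth error. A stream that reaches OnStable between failures gets its full budget back.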
Communication-009 — _siteClients field is mutable and reassignable; cache update is not atomic on failure
| Severity | Low |
| Category | Concurrency & thread safety |
| Status | Open |
| Location | src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs:53, src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs:240 |
Description
_siteClients is a non-readonly Dictionary field. It is only mutated on the actor
thread (correct), but the field is needlessly reassignable, and
HandleSiteAddressCacheLoaded mutates it in place across several loops. If
ActorPath.Parse throws on a malformed address mid-loop (e.g. a site row with a
garbage NodeAAddress), the method aborts partway through, having already stopped
some ClusterClients and added others — leaving the cache partially updated with no
recovery until the next 60s refresh. The actor's other mutable collections
(_debugSubscriptions, _inProgressDeployments) are correctly readonly.
Recommendation
Mark _siteClients readonly. Validate/parse all addresses up front (or wrap
ActorPath.Parse in a try/catch that logs and skips the bad site) so a single
malformed site record cannot abort the whole refresh and leave a half-updated cache.
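One way to structure this is a two-phase refresh: parse every row into a local collection first, then mutate the cache only after parsing has finished. The sketch below is illustrative only. SiteAddressRefresh is a hypothetical name, Uri.TryCreate stands in for ActorPath.Parse, and the cache holds Uri values rather than ClusterClient references.

```csharp
using System;
using System.Collections.Generic;

public static class SiteAddressRefresh
{
    public static void Refresh(
        Dictionary<string, Uri> cache,
        IEnumerable<(string SiteId, string Address)> rows,
        Action<string, string> onInvalid)
    {
        // Phase 1: parse everything without touching the cache.
        var parsed = new Dictionary<string, Uri>();
        var seen = new HashSet<string>();
        foreach (var (siteId, address) in rows)
        {
            seen.Add(siteId);
            if (Uri.TryCreate(address, UriKind.Absolute, out var uri))
                parsed[siteId] = uri;
            else
                onInvalid(siteId, address); // log and skip the bad row
        }

        // Phase 2: apply. Nothing in this phase can throw on bad input,
        // so the cache is never left half-updated.
        foreach (var siteId in new List<string>(cache.Keys))
        {
            // Drop only sites that no longer exist in the source at all.
            if (!seen.Contains(siteId)) cache.Remove(siteId);
        }
        foreach (var (siteId, uri) in parsed)
        {
            cache[siteId] = uri;
        }
    }
}
```

In this sketch a site whose new address fails to parse keeps its previous cache entry. That is a policy choice made for the example, not something the review prescribes; dropping the entry instead would be equally easy once parsing is separated from mutation.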
Resolution
Unresolved.
Communication-010 — DebugStreamBridgeActor XML doc incorrectly describes it as a "Persistent actor"
| Severity | Low |
| Category | Documentation & comments |
| Status | Open |
| Location | src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:10 |
Description
The class summary opens with "Persistent actor (one per active debug session)...".
The actor derives from ReceiveActor, not a persistent actor base class, holds no
PersistenceId, and writes no journal/snapshot. "Persistent" is misleading — debug
sessions are explicitly "session-based and temporary" per the design doc. A reader
could assume state survives restart, which it does not.
Recommendation
Reword the summary to "Long-lived (per active debug session) actor on the central side..." or similar, removing the word "Persistent".
Resolution
Unresolved.
Communication-011 — No test coverage for snapshot-timeout cleanup, address-cache failure, or gRPC reconnect leak
| Severity | Low |
| Category | Testing coverage |
| Status | Open |
| Location | tests/ScadaLink.Communication.Tests/ (module-wide) |
Description
The test suite covers happy-path routing, handler-not-registered failures, heartbeat bumping, cache refresh, and gRPC bridge reconnect/retry. However, several critical paths identified in this review have no coverage:
- The DebugStreamService.StartStreamAsync snapshot-timeout path (Communication-001): no test verifies bridge actor / site subscription teardown on timeout, nor the onTerminated-before-snapshot race that throws a non-OperationCanceledException.
- CentralCommunicationActor behaviour when LoadSiteAddressesFromDb faults (Communication-006): RefreshSiteAddresses_UpdatesCache only exercises success.
- SiteStreamGrpcClient subscription-map overwrite/removal race (Communication-003) and gRPC reconnect not unsubscribing the old node (Communication-002).
- A malformed NodeAAddress aborting HandleSiteAddressCacheLoaded (Communication-009).
Recommendation
Add tests for: snapshot timeout / pre-snapshot termination cleanup; address-load
failure logging and empty-cache behaviour; reusing a correlation ID across
SubscribeAsync calls; and a malformed site address during cache refresh.
Resolution
Unresolved.