fix(communication): resolve Communication-009,010,011 — atomic site-cache refresh, XML doc correction, test coverage

This commit is contained in:
Joseph Doherty
2026-05-16 22:04:21 -04:00
parent c07f524ca4
commit 0b4c1563aa
4 changed files with 88 additions and 12 deletions

View File

@@ -8,7 +8,7 @@
| Last reviewed | 2026-05-16 | | Last reviewed | 2026-05-16 |
| Reviewer | claude-agent | | Reviewer | claude-agent |
| Commit reviewed | `9c60592` | | Commit reviewed | `9c60592` |
| Open findings | 3 | | Open findings | 0 |
## Summary ## Summary
@@ -405,7 +405,7 @@ per-event reset (`Grpc_Error_Resets_RetryCount_On_Successful_Event`) was replace
|--|--| |--|--|
| Severity | Low | | Severity | Low |
| Category | Concurrency & thread safety | | Category | Concurrency & thread safety |
| Status | Open | | Status | Resolved |
| Location | `src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs:53`, `src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs:240` | | Location | `src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs:53`, `src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs:240` |
**Description** **Description**
@@ -427,7 +427,16 @@ malformed site record cannot abort the whole refresh and leave a half-updated ca
**Resolution** **Resolution**
_Unresolved._ Resolved 2026-05-16 (commit pending). Root cause confirmed: `_siteClients` was a
non-`readonly` field, and `HandleSiteAddressCacheLoaded`'s add/update loop called
`ActorPath.Parse` per site with no guard — a malformed `NodeAAddress` threw mid-loop and
aborted the refresh, leaving the cache half-updated until the next 60s cycle. Fix:
`_siteClients` is now `readonly`, and the per-site `ActorPath.Parse` is wrapped in a
try/catch that logs the bad address at Warning and `continue`s to the next site, so a
single garbage row cannot starve other sites of their ClusterClient. Regression test
`CentralCommunicationActorTests.MalformedSiteAddress_DoesNotAbortRefresh_OtherSitesStillRegistered`
(bad site ordered before a good one) fails against the pre-fix code (good site never
registered) and passes after.
### Communication-010 — `DebugStreamBridgeActor` XML doc incorrectly describes it as a "Persistent actor" ### Communication-010 — `DebugStreamBridgeActor` XML doc incorrectly describes it as a "Persistent actor"
@@ -435,7 +444,7 @@ _Unresolved._
|--|--| |--|--|
| Severity | Low | | Severity | Low |
| Category | Documentation & comments | | Category | Documentation & comments |
| Status | Open | | Status | Resolved |
| Location | `src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:10` | | Location | `src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:10` |
**Description** **Description**
@@ -453,7 +462,13 @@ side..." or similar, removing the word "Persistent".
**Resolution** **Resolution**
_Unresolved._ Resolved 2026-05-16 (commit pending). Root cause confirmed: the class summary opened
with "Persistent actor (one per active debug session)..." but the actor derives from
`ReceiveActor`, holds no `PersistenceId`, and writes no journal/snapshot. Fix
(documentation only — no behaviour change, so no regression test): the summary was
reworded to "Long-lived (one per active debug session) actor on the central side. Debug
sessions are session-based and temporary — this actor holds no persisted state and does
not derive from an Akka.Persistence base class; its state does not survive a restart."
### Communication-011 — No test coverage for snapshot-timeout cleanup, address-cache failure, or gRPC reconnect leak ### Communication-011 — No test coverage for snapshot-timeout cleanup, address-cache failure, or gRPC reconnect leak
@@ -461,7 +476,7 @@ _Unresolved._
|--|--| |--|--|
| Severity | Low | | Severity | Low |
| Category | Testing coverage | | Category | Testing coverage |
| Status | Open | | Status | Resolved |
| Location | `tests/ScadaLink.Communication.Tests/` (module-wide) | | Location | `tests/ScadaLink.Communication.Tests/` (module-wide) |
**Description** **Description**
@@ -487,4 +502,20 @@ failure logging and empty-cache behaviour; reusing a correlation ID across
**Resolution** **Resolution**
_Unresolved._ Resolved 2026-05-16 (commit pending). This is a meta-coverage finding; every gap it
enumerates is now covered by a regression test (each fails against its pre-fix code and
passes after):
- Snapshot timeout / pre-snapshot termination (Communication-001) —
`DebugStreamServiceTests.StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException`.
- gRPC reconnect not unsubscribing the old node (Communication-002) —
`DebugStreamBridgeActorTests.On_GrpcError_Unsubscribes_Old_Stream_Before_Reconnect`.
- `SiteStreamGrpcClient` subscription-map overwrite/removal race (Communication-003) —
`SiteStreamGrpcClientTests.RegisterSubscription_ReusedCorrelationId_CancelsAndDisposesPriorCts`
and `RemoveSubscription_OnlyRemovesOwnCts_NotAReplacement`.
- `LoadSiteAddressesFromDb` fault (Communication-006) —
`CentralCommunicationActorTests.LoadSiteAddressesFailure_IsLoggedNotSilentlySwallowed`.
- Malformed `NodeAAddress` aborting `HandleSiteAddressCacheLoaded` (Communication-009) —
`CentralCommunicationActorTests.MalformedSiteAddress_DoesNotAbortRefresh_OtherSitesStillRegistered`
(added with this finding's resolution).
The full module suite (`dotnet test tests/ScadaLink.Communication.Tests`) is green at
111 passing tests.

View File

@@ -50,7 +50,7 @@ public class CentralCommunicationActor : ReceiveActor
/// Maps SiteIdentifier → (ClusterClient actor, set of contact address strings). /// Maps SiteIdentifier → (ClusterClient actor, set of contact address strings).
/// Refreshed periodically via RefreshSiteAddresses. /// Refreshed periodically via RefreshSiteAddresses.
/// </summary> /// </summary>
private Dictionary<string, (IActorRef Client, ImmutableHashSet<string> ContactAddresses)> _siteClients = new(); private readonly Dictionary<string, (IActorRef Client, ImmutableHashSet<string> ContactAddresses)> _siteClients = new();
/// <summary> /// <summary>
/// Tracks active debug view subscriptions: correlationId → (siteId, subscriber). /// Tracks active debug view subscriptions: correlationId → (siteId, subscriber).
@@ -262,9 +262,23 @@ public class CentralCommunicationActor : ReceiveActor
// Add or update // Add or update
foreach (var (siteId, addresses) in msg.SiteContacts) foreach (var (siteId, addresses) in msg.SiteContacts)
{ {
var contactPaths = addresses // Communication-009: parse all addresses up front inside a try/catch so a
.Select(a => ActorPath.Parse($"{a}/system/receptionist")) // single malformed site row cannot abort the whole refresh loop and leave
.ToImmutableHashSet(); // the cache half-updated. A bad site is logged and skipped; others proceed.
ImmutableHashSet<ActorPath> contactPaths;
try
{
contactPaths = addresses
.Select(a => ActorPath.Parse($"{a}/system/receptionist"))
.ToImmutableHashSet();
}
catch (Exception ex)
{
_log.Warning(ex,
"Malformed contact address for site {0}; skipping this site in the refresh "
+ "(other sites are unaffected)", siteId);
continue;
}
var contactStrings = addresses.ToImmutableHashSet(); var contactStrings = addresses.ToImmutableHashSet();

View File

@@ -7,7 +7,9 @@ using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Actors; namespace ScadaLink.Communication.Actors;
/// <summary> /// <summary>
/// Persistent actor (one per active debug session) on the central side. /// Long-lived (one per active debug session) actor on the central side. Debug sessions
/// are session-based and temporary — this actor holds no persisted state and does not
/// derive from an Akka.Persistence base class; its state does not survive a restart.
/// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor /// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor
/// as the Sender) to get the initial snapshot. After receiving the snapshot, opens a gRPC /// as the Sender) to get the initial snapshot. After receiving the snapshot, opens a gRPC
/// server-streaming subscription via SiteStreamGrpcClient for ongoing events. /// server-streaming subscription via SiteStreamGrpcClient for ongoing events.

View File

@@ -222,6 +222,35 @@ public class CentralCommunicationActorTests : TestKit
}); });
} }
[Fact]
public void MalformedSiteAddress_DoesNotAbortRefresh_OtherSitesStillRegistered()
{
// Regression test for Communication-009. HandleSiteAddressCacheLoaded calls
// ActorPath.Parse for every site in a single loop. A malformed NodeAAddress
// throws inside that loop; before the fix the whole refresh aborted partway
// through, leaving the cache half-updated (some sites registered, others not).
// The fix wraps the parse in a try/catch that logs and skips the bad site so
// a single garbage row cannot starve every other site of its ClusterClient.
var goodSite = CreateSite("good-site", "akka.tcp://scadalink@host1:8082");
// A garbage address that ActorPath.Parse rejects.
var badSite = CreateSite("bad-site", "this is not a valid actor path !!!");
// Order the bad site first so a non-resilient loop aborts before reaching good-site.
var (actor, _, siteProbes) = CreateActorWithMockRepo(new[] { badSite, goodSite });
Thread.Sleep(1000);
// good-site must still be registered and routable despite bad-site failing to parse.
var cmd = new DeployInstanceCommand(
"dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow);
actor.Tell(new SiteEnvelope("good-site", cmd));
Assert.True(siteProbes.ContainsKey("good-site"),
"good-site should have a ClusterClient even though bad-site's address is malformed");
var msg = siteProbes["good-site"].ExpectMsg<ClusterClient.Send>();
Assert.Equal("dep1", ((DeployInstanceCommand)msg.Message).DeploymentId);
}
[Fact] [Fact]
public void BothContactPoints_UsedInSingleClient() public void BothContactPoints_UsedInSingleClient()
{ {