diff --git a/code-reviews/Communication/findings.md b/code-reviews/Communication/findings.md index f3faf39..4d333f3 100644 --- a/code-reviews/Communication/findings.md +++ b/code-reviews/Communication/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-17 | | Reviewer | claude-agent | | Commit reviewed | `39d737e` | -| Open findings | 4 | +| Open findings | 0 | ## Summary @@ -543,7 +543,7 @@ The full module suite (`dotnet test tests/ScadaLink.Communication.Tests`) is gre |--|--| | Severity | High | | Category | Correctness & logic bugs | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs:39`, `src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:166` | **Description** @@ -582,7 +582,17 @@ targets the other endpoint. **Resolution** -_Unresolved._ +Resolved 2026-05-17 (commit pending). Root cause confirmed against source: +`GetOrCreate` was `_clients.GetOrAdd(siteIdentifier, …)` — keyed by site identifier +only, so the `grpcEndpoint` argument was honoured solely on first creation and the +NodeA→NodeB flip reconnected to the dead endpoint forever. Fix: +`SiteStreamGrpcClient` now exposes its bound `Endpoint`, and `GetOrCreate` compares +the cached client's endpoint against the requested one — on a mismatch it atomically +installs (via `ConcurrentDictionary.AddOrUpdate`) a fresh client for the new endpoint +and disposes the stale one, so a node flip actually moves to the surviving node. +Regression tests `SiteStreamGrpcClientFactoryTests.GetOrCreate_EndpointChanged_ReturnsClientBoundToNewEndpoint` +and `GetOrCreate_SameEndpoint_DoesNotDisposeOrRecreate` fail against the pre-fix +factory and pass after. ### Communication-013 — Site gRPC address changes are never applied; `RemoveSiteAsync` has no production caller @@ -590,7 +600,7 @@ _Unresolved._ |--|--| | Severity | Medium | | Category | Design-document adherence | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs:58` | **Description** @@ -619,7 +629,19 @@ the on-the-fly address-change requirement is intentionally dropped, remove **Resolution** -_Unresolved._ +Resolved 2026-05-17 (commit pending). The address-*change* staleness — the primary +impact ("never have working debug streaming until the central node is restarted") — +is fixed in-module by the Communication-012 change: `GetOrCreate` is now +endpoint-change-aware, so the next time `DebugStreamBridgeActor` requests a stream +with a corrected `GrpcNodeAAddress`/`GrpcNodeBAddress` the stale cached client is +disposed and replaced — no central restart needed and no external wiring required. +`RemoveSiteAsync` is retained as the disposal path for full site *removal* (a deleted +site record) and its doc comment now states that role explicitly; wiring a +delete-site callback belongs to the site-management flow in another module and is out +of this module's scope. Regression test +`SiteStreamGrpcClientFactoryTests.GetOrCreate_EndpointChanged_DisposesPriorClient` +fails against the pre-fix factory (stale client never disposed/replaced) and passes +after. ### Communication-014 — Untrusted gRPC `correlation_id` flows directly into an Akka actor name @@ -627,7 +649,7 @@ _Unresolved._ |--|--| | Severity | Low | | Category | Security | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs:124` | **Description** @@ -652,7 +674,17 @@ actor state / dictionary key. **Resolution** -_Unresolved._ +Resolved 2026-05-17 (commit pending). Root cause confirmed: `SubscribeInstance` fed +the off-the-wire `request.CorrelationId` straight into the `stream-relay-…` actor +name, so an id with `/`, whitespace, or other disallowed characters made `ActorOf` +throw `InvalidActorNameException` as an unhandled RPC fault. Fix: `SubscribeInstance` +now validates `CorrelationId` on entry — rejecting null/empty or any value failing +`ActorPath.IsValidPathElement` with `StatusCode.InvalidArgument` before any actor or +subscription state is created. Regression test +`SiteStreamGrpcServerTests.RejectsCorrelationIdThatIsNotActorNameSafe` (theory: +`/`-bearing, whitespace, empty, `$`-prefixed ids) fails against the pre-fix server +and passes after; `AcceptsActorNameSafeCorrelationId` confirms a normal GUID is still +accepted. ### Communication-015 — No test exercises the real gRPC client factory across a node flip @@ -660,7 +692,7 @@ _Unresolved._ |--|--| | Severity | Low | | Category | Testing coverage | -| Status | Open | +| Status | Resolved | | Location | `tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs:401`, `tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryTests.cs` | **Description** @@ -683,4 +715,14 @@ test's mock factory track the endpoint per call so node-flip coverage is meaning **Resolution** -_Unresolved._ +Resolved 2026-05-17 (commit pending). `SiteStreamGrpcClientFactoryTests` gained +`GetOrCreate_EndpointChanged_ReturnsClientBoundToNewEndpoint` and +`GetOrCreate_EndpointChanged_DisposesPriorClient`, which drive the *real* +`SiteStreamGrpcClientFactory` across a node flip and assert the second call yields a +client bound to the new endpoint with the stale one disposed — both fail against the +pre-fix factory and pass after (the Communication-012 fix). `DebugStreamBridgeActorTests` +gained `On_GrpcError_Reconnects_To_Other_Node_Endpoint`, which uses a new +`EndpointTrackingGrpcClientFactory` test double that hands out a distinct mock client +per endpoint (instead of one fixed mock regardless of endpoint), so the bridge actor's +NodeA→NodeB reconnect is now verified to actually target the NodeB endpoint rather +than being masked by an endpoint-agnostic mock. diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs index d3298de..08f6230 100644 --- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs @@ -20,6 +20,14 @@ public class SiteStreamGrpcClient : IAsyncDisposable, IDisposable private readonly ILogger? _logger; private readonly ConcurrentDictionary _subscriptions = new(); + /// + /// The gRPC endpoint (site node address) this client is bound to. The + /// compares this against the requested + /// endpoint so a NodeA→NodeB failover flip (or a site address edit) is honoured + /// rather than served stale from cache. + /// + public virtual string Endpoint { get; } = string.Empty; + /// /// The HTTP/2 keepalive ping delay actually applied to this client's channel. /// Exposed for tests verifying that is honoured. @@ -44,6 +52,7 @@ public class SiteStreamGrpcClient : IAsyncDisposable, IDisposable /// public SiteStreamGrpcClient(string endpoint, ILogger logger, CommunicationOptions options) { + Endpoint = endpoint; KeepAlivePingDelay = options.GrpcKeepAlivePingDelay; KeepAlivePingTimeout = options.GrpcKeepAlivePingTimeout; _channel = GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions @@ -67,6 +76,16 @@ public class SiteStreamGrpcClient : IAsyncDisposable, IDisposable { } + /// + /// Protected constructor for unit testing — records the endpoint without + /// opening a real gRPC channel, so endpoint-aware factory behaviour can be + /// exercised by test doubles. + /// + protected SiteStreamGrpcClient(string endpoint) + { + Endpoint = endpoint; + } + /// /// Creates a test-only instance that has no gRPC channel. Used to test /// Unsubscribe and Dispose behavior without needing a real endpoint. diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs index ed2e89d..2de8277 100644 --- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs @@ -32,13 +32,40 @@ public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable } /// - /// Returns an existing client for the site or creates a new one. The new - /// client is created via and tracked so the - /// factory's / release it. + /// Returns the cached client for the site, or creates a new one. If a client is + /// already cached but bound to a *different* — the + /// NodeA→NodeB failover flip, or a site whose gRPC address was edited — the stale + /// client is disposed and replaced with one bound to the requested endpoint. + /// Communication-012/013: keying purely by site identifier and ignoring the + /// endpoint on a cache hit defeated debug-stream node failover and meant a + /// corrected gRPC address never took effect without a central restart. /// public virtual SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint) { - return _clients.GetOrAdd(siteIdentifier, _ => CreateClient(grpcEndpoint)); + // Fast path: a client is cached and already bound to the requested endpoint. + if (_clients.TryGetValue(siteIdentifier, out var existing) && + string.Equals(existing.Endpoint, grpcEndpoint, StringComparison.Ordinal)) + { + return existing; + } + + // Either no client is cached, or the cached one is bound to a different + // endpoint. AddOrUpdate atomically installs a client for the requested + // endpoint; the prior (stale) client, if any, is disposed afterwards. + SiteStreamGrpcClient? stale = null; + var client = _clients.AddOrUpdate( + siteIdentifier, + _ => CreateClient(grpcEndpoint), + (_, current) => + { + if (string.Equals(current.Endpoint, grpcEndpoint, StringComparison.Ordinal)) + return current; + stale = current; + return CreateClient(grpcEndpoint); + }); + + stale?.Dispose(); + return client; } /// @@ -53,7 +80,11 @@ public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable } /// - /// Removes and disposes the client for the given site. + /// Removes and disposes the client for the given site. Site *address changes* are + /// now handled transparently by (it disposes and recreates + /// a client whose endpoint no longer matches). This method remains the disposal + /// path for full site *removal* — call it when a site record is deleted so its + /// cached gRPC client does not linger for the life of the process. /// public async Task RemoveSiteAsync(string siteIdentifier) { diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs index 9a62e46..02d0daa 100644 --- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs @@ -95,6 +95,18 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase if (!_ready) throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready")); + // Communication-014: correlation_id arrives off the wire on a public gRPC + // endpoint and is used (below) to compose an Akka actor name. Akka actor names + // have a restricted character set — a id containing '/', whitespace, or other + // disallowed characters would make ActorOf throw InvalidActorNameException, + // escaping as an unhandled RPC fault. Reject unsafe ids cleanly up front. + if (string.IsNullOrEmpty(request.CorrelationId) || + !ActorPath.IsValidPathElement(request.CorrelationId)) + { + throw new RpcException(new GrpcStatus( + StatusCode.InvalidArgument, "correlation_id is missing or not a valid identifier")); + } + // Duplicate prevention -- cancel existing stream for this correlationId if (_activeStreams.TryRemove(request.CorrelationId, out var existingEntry)) { diff --git a/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs index 3618b4e..b5fb06f 100644 --- a/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs +++ b/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs @@ -308,6 +308,46 @@ public class DebugStreamBridgeActorTests : TestKit Assert.True(ctx.TerminatedFlag[0]); } + [Fact] + public void On_GrpcError_Reconnects_To_Other_Node_Endpoint() + { + // Communication-015 regression: drive the bridge actor through a node flip + // with an endpoint-aware factory (one distinct mock client per endpoint). + // The first subscribe targets NodeA; after a gRPC error the bridge must + // reconnect via a client bound to the *NodeB* endpoint. + var commProbe = CreateTestProbe(); + var factory = new EndpointTrackingGrpcClientFactory(); + var events = new List(); + var terminated = new[] { false }; + + var props = Props.Create(typeof(DebugStreamBridgeActor), + SiteId, InstanceName, "corr-1", commProbe.Ref, + (Action)(evt => { lock (events) { events.Add(evt); } }), + (Action)(() => terminated[0] = true), + factory, GrpcNodeA, GrpcNodeB); + + var actor = Sys.ActorOf(props); + commProbe.ExpectMsg(); + + actor.Tell(new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow)); + + // First subscribe goes to NodeA. + AwaitCondition(() => factory.ClientFor(GrpcNodeA).SubscribeCalls.Count == 1, + TimeSpan.FromSeconds(3)); + + // gRPC error → bridge flips to NodeB. + factory.ClientFor(GrpcNodeA).SubscribeCalls[0].OnError(new Exception("NodeA down")); + + // The reconnect must reach a client bound to the NodeB endpoint. + AwaitCondition(() => factory.ClientFor(GrpcNodeB).SubscribeCalls.Count == 1, + TimeSpan.FromSeconds(5)); + Assert.Equal("corr-1", factory.ClientFor(GrpcNodeB).SubscribeCalls[0].CorrelationId); + } + [Fact] public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow() { @@ -415,3 +455,24 @@ internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory return _mockClient; } } + +/// +/// Endpoint-aware mock factory: hands out a distinct +/// per endpoint, mirroring the real factory's corrected NodeA→NodeB failover behaviour +/// so node-flip coverage is meaningful (Communication-015). +/// +internal class EndpointTrackingGrpcClientFactory : SiteStreamGrpcClientFactory +{ + private readonly System.Collections.Concurrent.ConcurrentDictionary _byEndpoint = new(); + + public EndpointTrackingGrpcClientFactory() + : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance) + { + } + + public MockSiteStreamGrpcClient ClientFor(string endpoint) => + _byEndpoint.GetOrAdd(endpoint, _ => new MockSiteStreamGrpcClient()); + + public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint) + => ClientFor(grpcEndpoint); +} diff --git a/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryTests.cs index fe5bb54..4642935 100644 --- a/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryTests.cs +++ b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryTests.cs @@ -63,4 +63,70 @@ public class SiteStreamGrpcClientFactoryTests // After dispose, creating new clients should work (new instances) // This tests that Dispose doesn't throw } + + [Fact] + public void GetOrCreate_EndpointChanged_ReturnsClientBoundToNewEndpoint() + { + // Communication-012 regression: when the same site is requested with a + // *different* endpoint (the NodeA→NodeB failover flip), the factory must + // hand back a client bound to the new endpoint, not the stale cached one. + using var factory = new TrackingEndpointFactory(); + + var nodeA = factory.GetOrCreate("site-a", "http://localhost:5100"); + var nodeB = factory.GetOrCreate("site-a", "http://localhost:5200"); + + Assert.NotSame(nodeA, nodeB); + Assert.Equal("http://localhost:5100", nodeA.Endpoint); + Assert.Equal("http://localhost:5200", nodeB.Endpoint); + } + + [Fact] + public void GetOrCreate_EndpointChanged_DisposesPriorClient() + { + // Communication-013 regression: a later edit to a site's gRPC address must + // invalidate (and dispose) the stale cached client, so the corrected + // endpoint takes effect without a central restart. + using var factory = new TrackingEndpointFactory(); + + var first = (TrackingEndpointClient)factory.GetOrCreate("site-a", "http://localhost:5100"); + var second = (TrackingEndpointClient)factory.GetOrCreate("site-a", "http://localhost:5200"); + + Assert.NotSame(first, second); + Assert.True(first.Disposed, "stale client for the old endpoint should be disposed"); + Assert.False(second.Disposed, "fresh client for the new endpoint should still be live"); + } + + [Fact] + public void GetOrCreate_SameEndpoint_DoesNotDisposeOrRecreate() + { + // Endpoint unchanged → the cached client is reused untouched. + using var factory = new TrackingEndpointFactory(); + + var first = (TrackingEndpointClient)factory.GetOrCreate("site-a", "http://localhost:5100"); + var second = (TrackingEndpointClient)factory.GetOrCreate("site-a", "http://localhost:5100"); + + Assert.Same(first, second); + Assert.False(first.Disposed); + } + + /// Test client that records its endpoint and disposal (no real channel). + private sealed class TrackingEndpointClient : SiteStreamGrpcClient + { + public TrackingEndpointClient(string endpoint) : base(endpoint) { } + public bool Disposed { get; private set; } + public override void Dispose() => Disposed = true; + public override ValueTask DisposeAsync() + { + Disposed = true; + return ValueTask.CompletedTask; + } + } + + /// Factory that hands out endpoint-tracking clients. + private sealed class TrackingEndpointFactory : SiteStreamGrpcClientFactory + { + public TrackingEndpointFactory() : base(NullLoggerFactory.Instance) { } + protected override SiteStreamGrpcClient CreateClient(string grpcEndpoint) + => new TrackingEndpointClient(grpcEndpoint); + } } diff --git a/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs index 3207031..438bca6 100644 --- a/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs +++ b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs @@ -207,6 +207,50 @@ public class SiteStreamGrpcServerTests : TestKit await streamTask; } + [Theory] + [InlineData("corr/with/slash")] + [InlineData("corr with space")] + [InlineData("")] + [InlineData("$weird")] + public async Task RejectsCorrelationIdThatIsNotActorNameSafe(string badCorrelationId) + { + // Communication-014 regression: a public gRPC SubscribeInstance must not feed + // an untrusted correlation_id straight into an Akka actor name. An unsafe id + // must be rejected cleanly with InvalidArgument rather than escaping as an + // unhandled InvalidActorNameException. + var server = CreateServer(); + server.SetReady(Sys); + + var writer = Substitute.For>(); + var context = CreateMockContext(); + + var ex = await Assert.ThrowsAsync( + () => server.SubscribeInstance(MakeRequest(badCorrelationId), writer, context)); + + Assert.Equal(StatusCode.InvalidArgument, ex.StatusCode); + Assert.Equal(0, server.ActiveStreamCount); + } + + [Fact] + public async Task AcceptsActorNameSafeCorrelationId() + { + // A normal GUID-style correlation id (what central always supplies) is accepted. + var server = CreateServer(); + server.SetReady(Sys); + + var cts = new CancellationTokenSource(); + var context = CreateMockContext(cts.Token); + var writer = Substitute.For>(); + + var streamTask = Task.Run(() => server.SubscribeInstance( + MakeRequest(Guid.NewGuid().ToString()), writer, context)); + + await WaitForConditionAsync(() => server.ActiveStreamCount == 1); + + cts.Cancel(); + await streamTask; + } + [Fact] public void SetReady_AllowsStreamCreation() {