fix(communication): resolve Communication-012..015 — endpoint-aware gRPC client cache, address-change recreation, correlation-id validation, node-flip tests
@@ -8,7 +8,7 @@
|
||||
| Last reviewed | 2026-05-17 |
|
||||
| Reviewer | claude-agent |
|
||||
| Commit reviewed | `39d737e` |
|
||||
-| Open findings | 4 |
+| Open findings | 0 |
|
||||
|
||||
## Summary
|
||||
|
||||
@@ -543,7 +543,7 @@ The full module suite (`dotnet test tests/ScadaLink.Communication.Tests`) is gre
|
||||
|--|--|
|
||||
| Severity | High |
|
||||
| Category | Correctness & logic bugs |
|
||||
-| Status | Open |
+| Status | Resolved |
|
||||
| Location | `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs:39`, `src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:166` |
|
||||
|
||||
**Description**
|
||||
@@ -582,7 +582,17 @@ targets the other endpoint.
|
||||
|
||||
**Resolution**
|
||||
|
||||
-_Unresolved._
+Resolved 2026-05-17 (commit pending). Root cause confirmed against source:
+`GetOrCreate` was `_clients.GetOrAdd(siteIdentifier, …)`, keyed by site identifier
+only, so the `grpcEndpoint` argument was honoured solely on first creation and the
+NodeA→NodeB flip reconnected to the dead endpoint forever. Fix:
+`SiteStreamGrpcClient` now exposes its bound `Endpoint`, and `GetOrCreate` compares
+the cached client's endpoint against the requested one. On a mismatch it atomically
+installs (via `ConcurrentDictionary.AddOrUpdate`) a fresh client for the new endpoint
+and disposes the stale one, so a node flip actually moves to the surviving node.
+Regression test `SiteStreamGrpcClientFactoryTests.GetOrCreate_EndpointChanged_ReturnsClientBoundToNewEndpoint`
+fails against the pre-fix factory and passes after; `GetOrCreate_SameEndpoint_DoesNotDisposeOrRecreate`
+covers the unchanged-endpoint path (the cached client is reused and not disposed).
|
||||
|
||||
### Communication-013 — Site gRPC address changes are never applied; `RemoveSiteAsync` has no production caller
|
||||
|
||||
@@ -590,7 +600,7 @@ _Unresolved._
|
||||
|--|--|
|
||||
| Severity | Medium |
|
||||
| Category | Design-document adherence |
|
||||
-| Status | Open |
+| Status | Resolved |
|
||||
| Location | `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs:58` |
|
||||
|
||||
**Description**
|
||||
@@ -619,7 +629,19 @@ the on-the-fly address-change requirement is intentionally dropped, remove
|
||||
|
||||
**Resolution**
|
||||
|
||||
-_Unresolved._
+Resolved 2026-05-17 (commit pending). The address-*change* staleness, whose primary
+impact was "never have working debug streaming until the central node is restarted",
+is fixed in-module by the Communication-012 change: `GetOrCreate` is now
+endpoint-change-aware, so the next time `DebugStreamBridgeActor` requests a stream
+with a corrected `GrpcNodeAAddress`/`GrpcNodeBAddress` the stale cached client is
+disposed and replaced, with no central restart and no external wiring required.
+`RemoveSiteAsync` is retained as the disposal path for full site *removal* (a deleted
+site record) and its doc comment now states that role explicitly; wiring a
+delete-site callback belongs to the site-management flow in another module and is out
+of this module's scope. Regression test
+`SiteStreamGrpcClientFactoryTests.GetOrCreate_EndpointChanged_DisposesPriorClient`
+fails against the pre-fix factory (stale client never disposed/replaced) and passes
+after.
|
||||
|
||||
### Communication-014 — Untrusted gRPC `correlation_id` flows directly into an Akka actor name
|
||||
|
||||
@@ -627,7 +649,7 @@ _Unresolved._
|
||||
|--|--|
|
||||
| Severity | Low |
|
||||
| Category | Security |
|
||||
-| Status | Open |
+| Status | Resolved |
|
||||
| Location | `src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs:124` |
|
||||
|
||||
**Description**
|
||||
@@ -652,7 +674,17 @@ actor state / dictionary key.
|
||||
|
||||
**Resolution**
|
||||
|
||||
-_Unresolved._
+Resolved 2026-05-17 (commit pending). Root cause confirmed: `SubscribeInstance` fed
+the off-the-wire `request.CorrelationId` straight into the `stream-relay-…` actor
+name, so an id with `/`, whitespace, or other disallowed characters made `ActorOf`
+throw `InvalidActorNameException` as an unhandled RPC fault. Fix: `SubscribeInstance`
+now validates `CorrelationId` on entry, rejecting null/empty or any value failing
+`ActorPath.IsValidPathElement` with `StatusCode.InvalidArgument` before any actor or
+subscription state is created. Regression test
+`SiteStreamGrpcServerTests.RejectsCorrelationIdThatIsNotActorNameSafe` (theory:
+`/`-bearing, whitespace, empty, `$`-prefixed ids) fails against the pre-fix server
+and passes after; `AcceptsActorNameSafeCorrelationId` confirms a normal GUID is still
+accepted.
|
||||
|
||||
### Communication-015 — No test exercises the real gRPC client factory across a node flip
|
||||
|
||||
@@ -660,7 +692,7 @@ _Unresolved._
|
||||
|--|--|
|
||||
| Severity | Low |
|
||||
| Category | Testing coverage |
|
||||
-| Status | Open |
+| Status | Resolved |
|
||||
| Location | `tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs:401`, `tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryTests.cs` |
|
||||
|
||||
**Description**
|
||||
@@ -683,4 +715,14 @@ test's mock factory track the endpoint per call so node-flip coverage is meaning
|
||||
|
||||
**Resolution**
|
||||
|
||||
-_Unresolved._
+Resolved 2026-05-17 (commit pending). `SiteStreamGrpcClientFactoryTests` gained
+`GetOrCreate_EndpointChanged_ReturnsClientBoundToNewEndpoint` and
+`GetOrCreate_EndpointChanged_DisposesPriorClient`, which drive the *real*
+`SiteStreamGrpcClientFactory` across a node flip and assert the second call yields a
+client bound to the new endpoint with the stale one disposed. Both fail against the
+pre-fix factory and pass after (the Communication-012 fix). `DebugStreamBridgeActorTests`
+gained `On_GrpcError_Reconnects_To_Other_Node_Endpoint`, which uses a new
+`EndpointTrackingGrpcClientFactory` test double that hands out a distinct mock client
+per endpoint (instead of one fixed mock regardless of endpoint), so the bridge actor's
+NodeA→NodeB reconnect is now verified to actually target the NodeB endpoint rather
+than being masked by an endpoint-agnostic mock.
|
||||
|
||||
@@ -20,6 +20,14 @@ public class SiteStreamGrpcClient : IAsyncDisposable, IDisposable
|
||||
private readonly ILogger? _logger;
|
||||
private readonly ConcurrentDictionary<string, CancellationTokenSource> _subscriptions = new();
|
||||
|
||||
/// <summary>
|
||||
/// The gRPC endpoint (site node address) this client is bound to. The
|
||||
/// <see cref="SiteStreamGrpcClientFactory"/> compares this against the requested
|
||||
/// endpoint so a NodeA→NodeB failover flip (or a site address edit) is honoured
|
||||
/// rather than served stale from cache.
|
||||
/// </summary>
|
||||
public virtual string Endpoint { get; } = string.Empty;
|
||||
|
||||
/// <summary>
|
||||
/// The HTTP/2 keepalive ping delay actually applied to this client's channel.
|
||||
/// Exposed for tests verifying that <see cref="CommunicationOptions"/> is honoured.
|
||||
@@ -44,6 +52,7 @@ public class SiteStreamGrpcClient : IAsyncDisposable, IDisposable
|
||||
/// </summary>
|
||||
public SiteStreamGrpcClient(string endpoint, ILogger logger, CommunicationOptions options)
|
||||
{
|
||||
Endpoint = endpoint;
|
||||
KeepAlivePingDelay = options.GrpcKeepAlivePingDelay;
|
||||
KeepAlivePingTimeout = options.GrpcKeepAlivePingTimeout;
|
||||
_channel = GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
|
||||
@@ -67,6 +76,16 @@ public class SiteStreamGrpcClient : IAsyncDisposable, IDisposable
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Protected constructor for unit testing — records the endpoint without
|
||||
/// opening a real gRPC channel, so endpoint-aware factory behaviour can be
|
||||
/// exercised by test doubles.
|
||||
/// </summary>
|
||||
protected SiteStreamGrpcClient(string endpoint)
|
||||
{
|
||||
Endpoint = endpoint;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a test-only instance that has no gRPC channel. Used to test
|
||||
/// Unsubscribe and Dispose behavior without needing a real endpoint.
|
||||
|
||||
@@ -32,13 +32,40 @@ public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable
|
||||
}
|
||||
|
||||
     /// <summary>
-    /// Returns an existing client for the site or creates a new one. The new
-    /// client is created via <see cref="CreateClient"/> and tracked so the
-    /// factory's <see cref="Dispose"/> / <see cref="DisposeAsync"/> release it.
+    /// Returns the cached client for the site, or creates a new one. If a client is
+    /// already cached but bound to a *different* <paramref name="grpcEndpoint"/> (the
+    /// NodeA→NodeB failover flip, or a site whose gRPC address was edited), the stale
+    /// client is disposed and replaced with one bound to the requested endpoint.
+    /// Communication-012/013: keying purely by site identifier and ignoring the
+    /// endpoint on a cache hit defeated debug-stream node failover and meant a
+    /// corrected gRPC address never took effect without a central restart.
     /// </summary>
     public virtual SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
     {
-        return _clients.GetOrAdd(siteIdentifier, _ => CreateClient(grpcEndpoint));
+        // Fast path: a client is cached and already bound to the requested endpoint.
+        if (_clients.TryGetValue(siteIdentifier, out var existing) &&
+            string.Equals(existing.Endpoint, grpcEndpoint, StringComparison.Ordinal))
+        {
+            return existing;
+        }
+
+        // Either no client is cached, or the cached one is bound to a different
+        // endpoint. AddOrUpdate atomically installs a client for the requested
+        // endpoint; the prior (stale) client, if any, is disposed afterwards.
+        SiteStreamGrpcClient? stale = null;
+        var client = _clients.AddOrUpdate(
+            siteIdentifier,
+            _ => CreateClient(grpcEndpoint),
+            (_, current) =>
+            {
+                if (string.Equals(current.Endpoint, grpcEndpoint, StringComparison.Ordinal))
+                    return current;
+                stale = current;
+                return CreateClient(grpcEndpoint);
+            });
+
+        stale?.Dispose();
+        return client;
     }
|
||||
|
||||
/// <summary>
|
||||
@@ -53,7 +80,11 @@ public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable
|
||||
}
|
||||
|
||||
     /// <summary>
-    /// Removes and disposes the client for the given site.
+    /// Removes and disposes the client for the given site. Site *address changes* are
+    /// now handled transparently by <see cref="GetOrCreate"/> (it disposes and recreates
+    /// a client whose endpoint no longer matches). This method remains the disposal
+    /// path for full site *removal*: call it when a site record is deleted so its
+    /// cached gRPC client does not linger for the life of the process.
     /// </summary>
     public async Task RemoveSiteAsync(string siteIdentifier)
     {
|
||||
|
||||
@@ -95,6 +95,18 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
||||
if (!_ready)
|
||||
throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready"));
|
||||
|
||||
// Communication-014: correlation_id arrives off the wire on a public gRPC
|
||||
// endpoint and is used (below) to compose an Akka actor name. Akka actor names
|
||||
// have a restricted character set: an id containing '/', whitespace, or other
|
||||
// disallowed characters would make ActorOf throw InvalidActorNameException,
|
||||
// escaping as an unhandled RPC fault. Reject unsafe ids cleanly up front.
|
||||
if (string.IsNullOrEmpty(request.CorrelationId) ||
|
||||
!ActorPath.IsValidPathElement(request.CorrelationId))
|
||||
{
|
||||
throw new RpcException(new GrpcStatus(
|
||||
StatusCode.InvalidArgument, "correlation_id is missing or not a valid identifier"));
|
||||
}
|
||||
|
||||
// Duplicate prevention -- cancel existing stream for this correlationId
|
||||
if (_activeStreams.TryRemove(request.CorrelationId, out var existingEntry))
|
||||
{
|
||||
|
||||
@@ -308,6 +308,46 @@ public class DebugStreamBridgeActorTests : TestKit
|
||||
Assert.True(ctx.TerminatedFlag[0]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void On_GrpcError_Reconnects_To_Other_Node_Endpoint()
|
||||
{
|
||||
// Communication-015 regression: drive the bridge actor through a node flip
|
||||
// with an endpoint-aware factory (one distinct mock client per endpoint).
|
||||
// The first subscribe targets NodeA; after a gRPC error the bridge must
|
||||
// reconnect via a client bound to the *NodeB* endpoint.
|
||||
var commProbe = CreateTestProbe();
|
||||
var factory = new EndpointTrackingGrpcClientFactory();
|
||||
var events = new List<object>();
|
||||
var terminated = new[] { false };
|
||||
|
||||
var props = Props.Create(typeof(DebugStreamBridgeActor),
|
||||
SiteId, InstanceName, "corr-1", commProbe.Ref,
|
||||
(Action<object>)(evt => { lock (events) { events.Add(evt); } }),
|
||||
(Action)(() => terminated[0] = true),
|
||||
factory, GrpcNodeA, GrpcNodeB);
|
||||
|
||||
var actor = Sys.ActorOf(props);
|
||||
commProbe.ExpectMsg<SiteEnvelope>();
|
||||
|
||||
actor.Tell(new DebugViewSnapshot(
|
||||
InstanceName,
|
||||
new List<AttributeValueChanged>(),
|
||||
new List<AlarmStateChanged>(),
|
||||
DateTimeOffset.UtcNow));
|
||||
|
||||
// First subscribe goes to NodeA.
|
||||
AwaitCondition(() => factory.ClientFor(GrpcNodeA).SubscribeCalls.Count == 1,
|
||||
TimeSpan.FromSeconds(3));
|
||||
|
||||
// gRPC error → bridge flips to NodeB.
|
||||
factory.ClientFor(GrpcNodeA).SubscribeCalls[0].OnError(new Exception("NodeA down"));
|
||||
|
||||
// The reconnect must reach a client bound to the NodeB endpoint.
|
||||
AwaitCondition(() => factory.ClientFor(GrpcNodeB).SubscribeCalls.Count == 1,
|
||||
TimeSpan.FromSeconds(5));
|
||||
Assert.Equal("corr-1", factory.ClientFor(GrpcNodeB).SubscribeCalls[0].CorrelationId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow()
|
||||
{
|
||||
@@ -415,3 +455,24 @@ internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory
|
||||
return _mockClient;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Endpoint-aware mock factory: hands out a distinct <see cref="MockSiteStreamGrpcClient"/>
|
||||
/// per endpoint, mirroring the real factory's corrected NodeA→NodeB failover behaviour
|
||||
/// so node-flip coverage is meaningful (Communication-015).
|
||||
/// </summary>
|
||||
internal class EndpointTrackingGrpcClientFactory : SiteStreamGrpcClientFactory
|
||||
{
|
||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, MockSiteStreamGrpcClient> _byEndpoint = new();
|
||||
|
||||
public EndpointTrackingGrpcClientFactory()
|
||||
: base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
|
||||
{
|
||||
}
|
||||
|
||||
public MockSiteStreamGrpcClient ClientFor(string endpoint) =>
|
||||
_byEndpoint.GetOrAdd(endpoint, _ => new MockSiteStreamGrpcClient());
|
||||
|
||||
public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
|
||||
=> ClientFor(grpcEndpoint);
|
||||
}
|
||||
|
||||
@@ -63,4 +63,70 @@ public class SiteStreamGrpcClientFactoryTests
|
||||
// After dispose, creating new clients should work (new instances)
|
||||
// This tests that Dispose doesn't throw
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetOrCreate_EndpointChanged_ReturnsClientBoundToNewEndpoint()
|
||||
{
|
||||
// Communication-012 regression: when the same site is requested with a
|
||||
// *different* endpoint (the NodeA→NodeB failover flip), the factory must
|
||||
// hand back a client bound to the new endpoint, not the stale cached one.
|
||||
using var factory = new TrackingEndpointFactory();
|
||||
|
||||
var nodeA = factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||
var nodeB = factory.GetOrCreate("site-a", "http://localhost:5200");
|
||||
|
||||
Assert.NotSame(nodeA, nodeB);
|
||||
Assert.Equal("http://localhost:5100", nodeA.Endpoint);
|
||||
Assert.Equal("http://localhost:5200", nodeB.Endpoint);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetOrCreate_EndpointChanged_DisposesPriorClient()
|
||||
{
|
||||
// Communication-013 regression: a later edit to a site's gRPC address must
|
||||
// invalidate (and dispose) the stale cached client, so the corrected
|
||||
// endpoint takes effect without a central restart.
|
||||
using var factory = new TrackingEndpointFactory();
|
||||
|
||||
var first = (TrackingEndpointClient)factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||
var second = (TrackingEndpointClient)factory.GetOrCreate("site-a", "http://localhost:5200");
|
||||
|
||||
Assert.NotSame(first, second);
|
||||
Assert.True(first.Disposed, "stale client for the old endpoint should be disposed");
|
||||
Assert.False(second.Disposed, "fresh client for the new endpoint should still be live");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetOrCreate_SameEndpoint_DoesNotDisposeOrRecreate()
|
||||
{
|
||||
// Endpoint unchanged → the cached client is reused untouched.
|
||||
using var factory = new TrackingEndpointFactory();
|
||||
|
||||
var first = (TrackingEndpointClient)factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||
var second = (TrackingEndpointClient)factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||
|
||||
Assert.Same(first, second);
|
||||
Assert.False(first.Disposed);
|
||||
}
|
||||
|
||||
/// <summary>Test client that records its endpoint and disposal (no real channel).</summary>
|
||||
private sealed class TrackingEndpointClient : SiteStreamGrpcClient
|
||||
{
|
||||
public TrackingEndpointClient(string endpoint) : base(endpoint) { }
|
||||
public bool Disposed { get; private set; }
|
||||
public override void Dispose() => Disposed = true;
|
||||
public override ValueTask DisposeAsync()
|
||||
{
|
||||
Disposed = true;
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Factory that hands out endpoint-tracking clients.</summary>
|
||||
private sealed class TrackingEndpointFactory : SiteStreamGrpcClientFactory
|
||||
{
|
||||
public TrackingEndpointFactory() : base(NullLoggerFactory.Instance) { }
|
||||
protected override SiteStreamGrpcClient CreateClient(string grpcEndpoint)
|
||||
=> new TrackingEndpointClient(grpcEndpoint);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,6 +207,50 @@ public class SiteStreamGrpcServerTests : TestKit
|
||||
await streamTask;
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("corr/with/slash")]
|
||||
[InlineData("corr with space")]
|
||||
[InlineData("")]
|
||||
[InlineData("$weird")]
|
||||
public async Task RejectsCorrelationIdThatIsNotActorNameSafe(string badCorrelationId)
|
||||
{
|
||||
// Communication-014 regression: a public gRPC SubscribeInstance must not feed
|
||||
// an untrusted correlation_id straight into an Akka actor name. An unsafe id
|
||||
// must be rejected cleanly with InvalidArgument rather than escaping as an
|
||||
// unhandled InvalidActorNameException.
|
||||
var server = CreateServer();
|
||||
server.SetReady(Sys);
|
||||
|
||||
var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
var context = CreateMockContext();
|
||||
|
||||
var ex = await Assert.ThrowsAsync<RpcException>(
|
||||
() => server.SubscribeInstance(MakeRequest(badCorrelationId), writer, context));
|
||||
|
||||
Assert.Equal(StatusCode.InvalidArgument, ex.StatusCode);
|
||||
Assert.Equal(0, server.ActiveStreamCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcceptsActorNameSafeCorrelationId()
|
||||
{
|
||||
// A normal GUID-style correlation id (what central always supplies) is accepted.
|
||||
var server = CreateServer();
|
||||
server.SetReady(Sys);
|
||||
|
||||
var cts = new CancellationTokenSource();
|
||||
var context = CreateMockContext(cts.Token);
|
||||
var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
|
||||
var streamTask = Task.Run(() => server.SubscribeInstance(
|
||||
MakeRequest(Guid.NewGuid().ToString()), writer, context));
|
||||
|
||||
await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
|
||||
|
||||
cts.Cancel();
|
||||
await streamTask;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetReady_AllowsStreamCreation()
|
||||
{
|
||||
|
||||