From 31a6995d247f4c16df87ed1107e009b8c144a86e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 16 May 2026 20:58:03 -0400 Subject: [PATCH] =?UTF-8?q?fix(communication):=20resolve=20Communication-0?= =?UTF-8?q?04..008=20=E2=80=94=20Resume=20supervision,=20gRPC=20option=20w?= =?UTF-8?q?iring,=20address-load=20logging,=20sync=20dispose,=20flap=20det?= =?UTF-8?q?ection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- code-reviews/Communication/findings.md | 80 +++++++++-- .../Actors/CentralCommunicationActor.cs | 28 ++++ .../Actors/DebugStreamBridgeActor.cs | 48 +++++-- .../Actors/SiteCommunicationActor.cs | 18 +++ .../Grpc/SiteStreamGrpcClient.cs | 57 +++++++- .../Grpc/SiteStreamGrpcClientFactory.cs | 49 +++++-- .../Grpc/SiteStreamGrpcServer.cs | 39 ++++++ .../CentralCommunicationActorTests.cs | 25 ++++ .../CoordinatorSupervisionTests.cs | 83 +++++++++++ .../Grpc/DebugStreamBridgeActorTests.cs | 84 +++++++++--- .../Grpc/GrpcOptionsWiringTests.cs | 67 +++++++++ ...SiteStreamGrpcClientFactoryDisposeTests.cs | 129 ++++++++++++++++++ 12 files changed, 656 insertions(+), 51 deletions(-) create mode 100644 tests/ScadaLink.Communication.Tests/CoordinatorSupervisionTests.cs create mode 100644 tests/ScadaLink.Communication.Tests/Grpc/GrpcOptionsWiringTests.cs create mode 100644 tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryDisposeTests.cs diff --git a/code-reviews/Communication/findings.md b/code-reviews/Communication/findings.md index 84896fb..3df4d73 100644 --- a/code-reviews/Communication/findings.md +++ b/code-reviews/Communication/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 8 | +| Open findings | 3 | ## Summary @@ -187,7 +187,7 @@ fail against the pre-fix logic and pass after. 
|--|--| | Severity | Medium | | Category | Akka.NET conventions | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs:42`, `src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs:22` | **Description** @@ -212,7 +212,17 @@ strategy), matching the documented decision and other coordinator actors. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). Root cause confirmed: neither +`CentralCommunicationActor` nor `SiteCommunicationActor` overrode `SupervisorStrategy`, +so child faults fell back to the Akka default (`Restart`). Note that an actor's own +`SupervisorStrategy` governs its *children* — a transient child fault would `Restart` +the child and discard its in-memory state, contrary to the CLAUDE.md "Resume for +coordinator actors" decision. Fix: both actors now override `SupervisorStrategy()` to +return a `OneForOneStrategy` with an unbounded `Decider` resolving to `Directive.Resume` +(mirroring `DataConnectionManagerActor`). Regression tests +`CoordinatorSupervisionTests.CentralCommunicationActor_SupervisorStrategy_IsResume` and +`CoordinatorSupervisionTests.SiteCommunicationActor_SupervisorStrategy_IsResume` fail +against the pre-fix code (decider yields `Restart`) and pass after. ### Communication-005 — gRPC keepalive and max-stream-lifetime options are defined but never applied @@ -220,7 +230,7 @@ _Unresolved._ |--|--| | Severity | Medium | | Category | Design-document adherence | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs:25`, `src/ScadaLink.Communication/CommunicationOptions.cs:36` | **Description** @@ -249,7 +259,22 @@ option and update the design doc. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). 
Root cause confirmed: `SiteStreamGrpcClient` +hard-coded the keepalive values, `GrpcMaxStreamLifetime` was referenced nowhere, and +`GrpcMaxConcurrentStreams` was never bound to the server. Fix (scoped to +`src/ScadaLink.Communication`): `SiteStreamGrpcClient` gained a constructor taking +`CommunicationOptions` and now applies `GrpcKeepAlivePingDelay`/`GrpcKeepAlivePingTimeout` +to its `SocketsHttpHandler`; `SiteStreamGrpcClientFactory` gained an +`IOptions<CommunicationOptions>` DI constructor and flows the options into every client +it creates; `SiteStreamGrpcServer` gained an `IOptions<CommunicationOptions>` DI +constructor that binds `GrpcMaxConcurrentStreams` and implements the documented 4-hour +session timeout via `CancellationTokenSource.CancelAfter(GrpcMaxStreamLifetime)` on the +per-stream CTS. The Host's existing `AddSingleton()` registration +resolves the new DI constructor via greedy resolution — no Host change required. +Regression tests `GrpcOptionsWiringTests.SiteStreamGrpcClient_AppliesKeepAliveFromOptions`, +`GrpcOptionsWiringTests.SiteStreamGrpcClientFactory_FlowsOptionsToCreatedClients`, and +`GrpcOptionsWiringTests.SiteStreamGrpcServer_BindsMaxConcurrentStreamsAndLifetimeFromOptions` +exercise the wiring (they require the new members to even compile). ### Communication-006 — Site address load failures are silently swallowed, leaving a stale cache @@ -257,7 +282,7 @@ _Unresolved._ |--|--| | Severity | Medium | | Category | Error handling & resilience | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs:204` | **Description** @@ -279,7 +304,16 @@ is down". Optionally surface a health metric for repeated load failures. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). 
Root cause confirmed: a faulted +`LoadSiteAddressesFromDb` task is piped to `Self` as a `Status.Failure`, but the actor +had no handler for it — the failure became an unhandled message (debug-level only) and +the periodic refresh failed silently. Fix: added a `Receive<Status.Failure>` handler +that logs the load failure at `Warning` with the underlying exception as the cause, so +operators can distinguish a missing-addresses configuration from a database outage. +Regression test +`CentralCommunicationActorTests.LoadSiteAddressesFailure_IsLoggedNotSilentlySwallowed` +(repository query throws) asserts the Warning is emitted — it produces no warning +against the pre-fix code and passes after. ### Communication-007 — `SiteStreamGrpcClientFactory.Dispose` blocks on async work (sync-over-async) @@ -287,7 +321,7 @@ _Unresolved._ |--|--| | Severity | Medium | | Category | Performance & resource management | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs:53` | **Description** @@ -308,7 +342,17 @@ path, or document why blocking is safe here. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). Root cause confirmed: `Dispose()` called +`DisposeAsync().AsTask().GetAwaiter().GetResult()`, the classic sync-over-async pattern. +Fix: `SiteStreamGrpcClient` now also implements `IDisposable` with a synchronous +`Dispose()` that releases its CancellationTokenSources and underlying `GrpcChannel` +directly (all of that teardown is inherently synchronous); `SiteStreamGrpcClientFactory.Dispose()` +now disposes each cached client via that synchronous path with no blocking on the async +path. A `CreateClient` seam was extracted so the test can substitute a tracking client +while still exercising the factory's real caching/disposal machinery. 
Regression test +`SiteStreamGrpcClientFactoryDisposeTests.Dispose_DisposesClientsSynchronously_NotViaAsyncPath` +fails against the pre-fix code (clients disposed via `DisposeAsync`) and passes after; +`Dispose_DoesNotDeadlock_UnderSingleThreadedSynchronizationContext` guards the stall path. ### Communication-008 — Reconnect retry-count reset can mask a flapping stream indefinitely @@ -316,7 +360,7 @@ _Unresolved._ |--|--| | Severity | Medium | | Category | Correctness & logic bugs | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:71`, `src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs:174` | **Description** @@ -339,7 +383,21 @@ reconnects regardless of intervening events. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). Root cause confirmed: `_retryCount` was reset to +0 on every received `AttributeValueChanged`/`AlarmStateChanged`, so a stream that +connected, delivered one event, then failed — repeatedly — never tripped `MaxRetries`. +Fix (recommendation option a): the per-event reset was removed; instead `OpenGrpcStream` +arms a single `StabilityWindow` timer (60s default, internal-settable for tests), and +only when it fires (`GrpcStreamStable`) — i.e. the stream stayed up long enough to be +considered recovered — is `_retryCount` reset. `HandleGrpcError` cancels that timer, so +a stream that fails before the window elapses does not recover its retry budget. A +flapping stream therefore terminates after `MaxRetries` regardless of intervening +events. Regression test +`DebugStreamBridgeActorTests.FlappingStream_DeliveringEventsBetweenFailures_StillTerminatesAfterMaxRetries` +fails against the pre-fix code (actor never terminates) and passes after; +`RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow` verifies the budget +is recovered after a stable interval. 
The pre-existing test that codified the buggy +per-event reset (`Grpc_Error_Resets_RetryCount_On_Successful_Event`) was replaced. ### Communication-009 — `_siteClients` field is mutable and reassignable; cache update is not atomic on failure diff --git a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs index 7d027df..5c8e0e7 100644 --- a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs @@ -84,6 +84,15 @@ public class CentralCommunicationActor : ReceiveActor // Periodic refresh trigger Receive(_ => LoadSiteAddressesFromDb()); + // Communication-006: a faulted LoadSiteAddressesFromDb task is piped here as a + // Status.Failure. Without this handler the failure was an unhandled message + // (debug-level only) and the refresh failed silently — operators could not + // distinguish "no sites configured" from "database is down". Log at Warning. + Receive(failure => + _log.Warning(failure.Cause, + "Failed to load site addresses from the database; the site ClusterClient " + + "cache was not refreshed and may be stale or empty")); + // Health monitoring: heartbeats and health reports from sites Receive(HandleHeartbeat); Receive(HandleSiteHealthReport); @@ -296,6 +305,25 @@ public class CentralCommunicationActor : ReceiveActor } } + /// + /// Coordinator supervision strategy (CLAUDE.md: "Resume for coordinator actors"). + /// CentralCommunicationActor is a long-lived coordinator that owns the per-site + /// ClusterClient map; a transient fault in a child (e.g. a ClusterClient child) + /// must Resume so the child's connection state is preserved rather than wiped by + /// a Restart. 
+ /// + protected override SupervisorStrategy SupervisorStrategy() + { + return new OneForOneStrategy( + maxNrOfRetries: -1, + withinTimeRange: Timeout.InfiniteTimeSpan, + decider: Decider.From(ex => + { + _log.Warning(ex, "Child actor of CentralCommunicationActor faulted, resuming (state preserved)"); + return Directive.Resume; + })); + } + protected override void PreStart() { _log.Info("CentralCommunicationActor started"); diff --git a/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs b/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs index 7dc6cfb..e2d7d9a 100644 --- a/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs +++ b/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs @@ -28,7 +28,19 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers private const int MaxRetries = 3; private const string ReconnectTimerKey = "grpc-reconnect"; + private const string StabilityTimerKey = "grpc-stability"; internal static TimeSpan ReconnectDelay { get; set; } = TimeSpan.FromSeconds(5); + + /// + /// How long a freshly-opened gRPC stream must stay up before its retry budget + /// is considered "recovered" and is reset to 0. + /// Communication-008: the retry count must NOT be reset by individual events — + /// a stream that connects, delivers one event, then fails repeatedly would + /// otherwise reconnect forever and never trip . Resetting + /// only after a stable interval bounds a flapping stream. 
+ /// + internal static TimeSpan StabilityWindow { get; set; } = TimeSpan.FromSeconds(60); + private int _retryCount; private bool _useNodeA = true; private bool _stopped; @@ -66,16 +78,21 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers OpenGrpcStream(); }); - // Domain events arriving via Self.Tell from gRPC callback - Receive(changed => - { - _retryCount = 0; // Successful event resets retry count - _onEvent(changed); - }); - Receive(changed => + // Domain events arriving via Self.Tell from gRPC callback. + // Communication-008: receiving an event must NOT reset _retryCount — a + // flapping stream that delivers a single event between failures would + // otherwise never trip MaxRetries. The retry budget is recovered only by + // GrpcStreamStable (a stream that has stayed up for StabilityWindow). + Receive(changed => _onEvent(changed)); + Receive(changed => _onEvent(changed)); + + // Stream has been stably connected for StabilityWindow — recover the + // retry budget so a future transient fault gets a fresh set of retries. + Receive(_ => { + if (_stopped) return; _retryCount = 0; - _onEvent(changed); + _log.Debug("gRPC stream for {0} stable, retry count reset", _instanceUniqueName); }); // gRPC stream error — attempt reconnection @@ -151,6 +168,10 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers _grpcCts?.Dispose(); _grpcCts = new CancellationTokenSource(); + // Arm the stability timer: if the stream stays up for StabilityWindow the + // retry budget is recovered (Communication-008). Cancelled by HandleGrpcError. 
+ Timers.StartSingleTimer(StabilityTimerKey, new GrpcStreamStable(), StabilityWindow); + var client = _grpcFactory.GetOrCreate(_siteIdentifier, endpoint); var self = Self; var ct = _grpcCts.Token; @@ -171,6 +192,10 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers { if (_stopped) return; + // The stream failed before reaching the stability window — its retry + // budget is NOT recovered (Communication-008). + Timers.Cancel(StabilityTimerKey); + _retryCount++; if (_retryCount > MaxRetries) @@ -239,3 +264,10 @@ internal record GrpcStreamError(Exception Exception); /// Internal message to trigger gRPC stream reconnection. /// internal record ReconnectGrpcStream; + +/// +/// Internal message indicating the current gRPC stream has been connected long +/// enough () to be considered +/// stable, so the reconnect retry budget can be recovered. +/// +internal record GrpcStreamStable; diff --git a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs index 0038e8f..6ce3610 100644 --- a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs @@ -177,6 +177,24 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers } + /// + /// Coordinator supervision strategy (CLAUDE.md: "Resume for coordinator actors"). + /// SiteCommunicationActor is a long-lived coordinator routing all message + /// patterns to local handlers; a transient fault in a child must Resume so the + /// child's in-memory state is preserved rather than discarded by a Restart. 
+ /// + protected override SupervisorStrategy SupervisorStrategy() + { + return new OneForOneStrategy( + maxNrOfRetries: -1, + withinTimeRange: Timeout.InfiniteTimeSpan, + decider: Decider.From(ex => + { + _log.Warning(ex, "Child actor of SiteCommunicationActor faulted, resuming (state preserved)"); + return Directive.Resume; + })); + } + protected override void PreStart() { _log.Info("SiteCommunicationActor started for site {0}", _siteId); diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs index da211f3..d3298de 100644 --- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs @@ -13,21 +13,45 @@ namespace ScadaLink.Communication.Grpc; /// SiteStreamGrpcServer. The central-side DebugStreamBridgeActor uses this /// to open server-streaming calls for individual instances. /// -public class SiteStreamGrpcClient : IAsyncDisposable +public class SiteStreamGrpcClient : IAsyncDisposable, IDisposable { private readonly GrpcChannel? _channel; private readonly SiteStreamService.SiteStreamServiceClient? _client; private readonly ILogger? _logger; private readonly ConcurrentDictionary _subscriptions = new(); + /// + /// The HTTP/2 keepalive ping delay actually applied to this client's channel. + /// Exposed for tests verifying that is honoured. + /// + internal TimeSpan KeepAlivePingDelay { get; } + + /// + /// The HTTP/2 keepalive ping timeout actually applied to this client's channel. + /// Exposed for tests verifying that is honoured. + /// + internal TimeSpan KeepAlivePingTimeout { get; } + public SiteStreamGrpcClient(string endpoint, ILogger logger) + : this(endpoint, logger, new CommunicationOptions()) { + } + + /// + /// Creates a client whose HTTP/2 keepalive is taken from + /// rather than hard-coded, satisfying the design doc's "gRPC Connection Keepalive" + /// section which states these values are configurable. 
+ /// + public SiteStreamGrpcClient(string endpoint, ILogger logger, CommunicationOptions options) + { + KeepAlivePingDelay = options.GrpcKeepAlivePingDelay; + KeepAlivePingTimeout = options.GrpcKeepAlivePingTimeout; _channel = GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions { HttpHandler = new SocketsHttpHandler { - KeepAlivePingDelay = TimeSpan.FromSeconds(15), - KeepAlivePingTimeout = TimeSpan.FromSeconds(10), + KeepAlivePingDelay = options.GrpcKeepAlivePingDelay, + KeepAlivePingTimeout = options.GrpcKeepAlivePingTimeout, KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always } }); @@ -205,7 +229,13 @@ public class SiteStreamGrpcClient : IAsyncDisposable _ => AlarmLevel.None }; - public async ValueTask DisposeAsync() + /// + /// Releases all subscription CancellationTokenSources and the underlying + /// gRPC channel. All teardown here is synchronous (CTS disposal and + /// ), so a synchronous + /// can release everything without sync-over-async blocking. + /// + private void ReleaseResources() { foreach (var cts in _subscriptions.Values) { @@ -214,9 +244,22 @@ public class SiteStreamGrpcClient : IAsyncDisposable } _subscriptions.Clear(); - if (_channel is not null) - _channel.Dispose(); + _channel?.Dispose(); + } - await ValueTask.CompletedTask; + public virtual ValueTask DisposeAsync() + { + ReleaseResources(); + return ValueTask.CompletedTask; + } + + /// + /// Synchronous disposal. All resources held by this client are released + /// synchronously, so callers (e.g. ) + /// need not block on the async disposal path. 
+ /// + public virtual void Dispose() + { + ReleaseResources(); } } diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs index 261e9d2..ed2e89d 100644 --- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs @@ -1,5 +1,6 @@ using System.Collections.Concurrent; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace ScadaLink.Communication.Grpc; @@ -12,22 +13,43 @@ public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable { private readonly ConcurrentDictionary _clients = new(); private readonly ILoggerFactory _loggerFactory; + private readonly CommunicationOptions _options; public SiteStreamGrpcClientFactory(ILoggerFactory loggerFactory) + : this(loggerFactory, Options.Create(new CommunicationOptions())) { - _loggerFactory = loggerFactory; } /// - /// Returns an existing client for the site or creates a new one. + /// DI constructor — flows into every created + /// so the configured gRPC keepalive settings + /// are applied rather than hard-coded defaults. + /// + public SiteStreamGrpcClientFactory(ILoggerFactory loggerFactory, IOptions options) + { + _loggerFactory = loggerFactory; + _options = options.Value; + } + + /// + /// Returns an existing client for the site or creates a new one. The new + /// client is created via and tracked so the + /// factory's / release it. /// public virtual SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint) { - return _clients.GetOrAdd(siteIdentifier, _ => - { - var logger = _loggerFactory.CreateLogger(); - return new SiteStreamGrpcClient(grpcEndpoint, logger); - }); + return _clients.GetOrAdd(siteIdentifier, _ => CreateClient(grpcEndpoint)); + } + + /// + /// Creates a single . 
Overridable so tests + /// can substitute a tracking client while still exercising the factory's real + /// caching and disposal machinery. + /// + protected virtual SiteStreamGrpcClient CreateClient(string grpcEndpoint) + { + var logger = _loggerFactory.CreateLogger(); + return new SiteStreamGrpcClient(grpcEndpoint, logger, _options); } /// @@ -50,8 +72,19 @@ public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable _clients.Clear(); } + /// + /// Synchronous disposal. Communication-007: this used to block on + /// DisposeAsync().AsTask().GetAwaiter().GetResult() (sync-over-async, + /// a stall/deadlock risk during host shutdown). Each + /// releases all of its resources + /// synchronously, so we dispose them directly with no async path. + /// public void Dispose() { - DisposeAsync().AsTask().GetAwaiter().GetResult(); + foreach (var client in _clients.Values) + { + client.Dispose(); + } + _clients.Clear(); } } diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs index b3c35fe..a17481c 100644 --- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs @@ -3,6 +3,7 @@ using System.Threading.Channels; using Akka.Actor; using Grpc.Core; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using GrpcStatus = Grpc.Core.Status; namespace ScadaLink.Communication.Grpc; @@ -19,6 +20,7 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase private readonly ILogger _logger; private readonly ConcurrentDictionary _activeStreams = new(); private readonly int _maxConcurrentStreams; + private readonly TimeSpan _maxStreamLifetime; private volatile bool _ready; private long _actorCounter; @@ -26,10 +28,36 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase ISiteStreamSubscriber streamSubscriber, ILogger logger, int maxConcurrentStreams = 100) + : 
this(streamSubscriber, logger, maxConcurrentStreams, TimeSpan.FromHours(4)) + { + } + + /// + /// DI constructor — binds + /// and so the documented + /// concurrency limit and the 4-hour zombie-stream session timeout are honoured + /// rather than hard-coded. + /// + public SiteStreamGrpcServer( + ISiteStreamSubscriber streamSubscriber, + ILogger logger, + IOptions options) + : this(streamSubscriber, logger, + options.Value.GrpcMaxConcurrentStreams, + options.Value.GrpcMaxStreamLifetime) + { + } + + private SiteStreamGrpcServer( + ISiteStreamSubscriber streamSubscriber, + ILogger logger, + int maxConcurrentStreams, + TimeSpan maxStreamLifetime) { _streamSubscriber = streamSubscriber; _logger = logger; _maxConcurrentStreams = maxConcurrentStreams; + _maxStreamLifetime = maxStreamLifetime; } /// @@ -49,6 +77,12 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase /// public int ActiveStreamCount => _activeStreams.Count; + /// Effective max concurrent stream limit. Exposed for tests. + internal int MaxConcurrentStreams => _maxConcurrentStreams; + + /// Effective per-stream session lifetime. Exposed for tests. + internal TimeSpan MaxStreamLifetime => _maxStreamLifetime; + public override async Task SubscribeInstance( InstanceStreamRequest request, IServerStreamWriter responseStream, @@ -69,6 +103,11 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase throw new RpcException(new GrpcStatus(StatusCode.ResourceExhausted, "Max concurrent streams reached")); using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken); + // Session timeout (design doc "gRPC Connection Keepalive": 4-hour third layer + // of dead-client detection) — forces a long-lived zombie stream to terminate + // even if keepalive PINGs never detect the loss. 
+ if (_maxStreamLifetime > TimeSpan.Zero && _maxStreamLifetime != Timeout.InfiniteTimeSpan) + streamCts.CancelAfter(_maxStreamLifetime); var entry = new StreamEntry(streamCts); _activeStreams[request.CorrelationId] = entry; diff --git a/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs index e0301f0..383895f 100644 --- a/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs +++ b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs @@ -197,6 +197,31 @@ public class CentralCommunicationActorTests : TestKit Assert.Equal("dep2", ((DeployInstanceCommand)msg2.Message).DeploymentId); } + [Fact] + public void LoadSiteAddressesFailure_IsLoggedNotSilentlySwallowed() + { + // Regression test for Communication-006. When the repository query throws, + // PipeTo delivers a Status.Failure to the actor. Without a Receive + // handler the failure becomes an unhandled message (debug-level only) and the + // periodic refresh fails silently — operators cannot tell "no addresses + // configured" from "database is down". The fix logs the failure at Warning. + var mockRepo = Substitute.For(); + mockRepo.GetAllSitesAsync(Arg.Any()) + .Returns>>(_ => throw new InvalidOperationException("database is down")); + + var services = new ServiceCollection(); + services.AddScoped(_ => mockRepo); + var sp = services.BuildServiceProvider(); + + var mockFactory = Substitute.For(); + + // The fix logs a Warning carrying the InvalidOperationException as the cause. 
+ EventFilter.Warning(contains: "Failed to load site addresses from the database").ExpectOne(() => + { + Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory))); + }); + } + [Fact] public void BothContactPoints_UsedInSingleClient() { diff --git a/tests/ScadaLink.Communication.Tests/CoordinatorSupervisionTests.cs b/tests/ScadaLink.Communication.Tests/CoordinatorSupervisionTests.cs new file mode 100644 index 0000000..3c74071 --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/CoordinatorSupervisionTests.cs @@ -0,0 +1,83 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using NSubstitute; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Communication.Actors; + +namespace ScadaLink.Communication.Tests; + +/// +/// Regression tests for Communication-004 — coordinator actors must declare an +/// explicit Resume supervision strategy per the CLAUDE.md decision +/// ("Resume for coordinator actors"). A child fault under the default +/// (Restart) strategy would wipe a child's in-memory state; the long-lived +/// coordinators own per-site ClusterClients and must not silently discard +/// their children on a transient fault. +/// +public class CoordinatorSupervisionTests : TestKit +{ + /// + /// Test-only subclass that exposes the protected + /// so the configured directive can be asserted directly. + /// + private sealed class CentralCommunicationActorProbe : CentralCommunicationActor + { + public CentralCommunicationActorProbe(IServiceProvider sp, ISiteClientFactory factory) + : base(sp, factory) { } + + public SupervisorStrategy GetSupervisorStrategy() => SupervisorStrategy(); + } + + /// + /// Test-only subclass that exposes the protected . 
+ /// + private sealed class SiteCommunicationActorProbe : SiteCommunicationActor + { + public SiteCommunicationActorProbe(string siteId, CommunicationOptions options, IActorRef dm) + : base(siteId, options, dm) { } + + public SupervisorStrategy GetSupervisorStrategy() => SupervisorStrategy(); + } + + private static IServiceProvider EmptyServiceProvider() + { + var mockRepo = Substitute.For(); + mockRepo.GetAllSitesAsync(Arg.Any()) + .Returns(new List()); + var services = new ServiceCollection(); + services.AddScoped(_ => mockRepo); + return services.BuildServiceProvider(); + } + + [Fact] + public void CentralCommunicationActor_SupervisorStrategy_IsResume() + { + var sp = EmptyServiceProvider(); + var factory = Substitute.For(); + + var actorRef = new Akka.TestKit.TestActorRef( + Sys, Props.Create(() => new CentralCommunicationActorProbe(sp, factory))); + + var strategy = actorRef.UnderlyingActor.GetSupervisorStrategy(); + + var oneForOne = Assert.IsType(strategy); + var directive = oneForOne.Decider.Decide(new InvalidOperationException("transient child fault")); + Assert.Equal(Directive.Resume, directive); + } + + [Fact] + public void SiteCommunicationActor_SupervisorStrategy_IsResume() + { + var dmProbe = CreateTestProbe(); + + var actorRef = new Akka.TestKit.TestActorRef( + Sys, Props.Create(() => new SiteCommunicationActorProbe("site1", new CommunicationOptions(), dmProbe.Ref))); + + var strategy = actorRef.UnderlyingActor.GetSupervisorStrategy(); + + var oneForOne = Assert.IsType(strategy); + var directive = oneForOne.Decider.Decide(new InvalidOperationException("transient child fault")); + Assert.Equal(Directive.Resume, directive); + } +} diff --git a/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs index d4d3c51..3618b4e 100644 --- a/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs +++ 
b/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs @@ -22,6 +22,9 @@ public class DebugStreamBridgeActorTests : TestKit { // Use a very short reconnect delay for testing DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100); + // Long stability window so streams are never considered "stable" mid-test + // unless a test deliberately waits it out. + DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30); } private record TestContext( @@ -264,8 +267,13 @@ public class DebugStreamBridgeActorTests : TestKit } [Fact] - public void Grpc_Error_Resets_RetryCount_On_Successful_Event() + public void FlappingStream_DeliveringEventsBetweenFailures_StillTerminatesAfterMaxRetries() { + // Communication-008 regression: a stream that connects, delivers an event, + // then fails — repeatedly — must still trip MaxRetries. The retry count is + // NO LONGER reset by a received event (only by the stability window). The + // previous behaviour reset _retryCount on every event, so a flapping site + // reconnected forever and the debug session lived on indefinitely. 
var ctx = CreateBridgeActor(); ctx.CommProbe.ExpectMsg(); @@ -275,30 +283,72 @@ public class DebugStreamBridgeActorTests : TestKit new List(), DateTimeOffset.UtcNow); + Watch(ctx.BridgeActor); ctx.BridgeActor.Tell(snapshot); AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); - // First error → retry 1 - ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1")); - AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5)); - - // Simulate successful event (resets retry count) var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow); - ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(attrChange); - AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } }, - TimeSpan.FromSeconds(3)); - // Now another 3 errors should be tolerated (retry count was reset) - ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2")); - AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5)); + // Flap: deliver one event then fail, three times. Each event would, under + // the old buggy logic, reset the retry budget and prevent termination. + for (var i = 0; i < 3; i++) + { + var call = ctx.MockGrpcClient.SubscribeCalls[i]; + call.OnEvent(attrChange); + call.OnError(new Exception($"Flap {i + 1}")); + var expected = i + 2; + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == expected, TimeSpan.FromSeconds(5)); + } - ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3")); - AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5)); + // Fourth error (after the 3 retries) must exceed MaxRetries and terminate. 
+ ctx.MockGrpcClient.SubscribeCalls[3].OnEvent(attrChange); + ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Flap 4")); - ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4")); - AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 5, TimeSpan.FromSeconds(5)); + ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5)); + Assert.True(ctx.TerminatedFlag[0]); + } - // Still alive — 3 retries from the second failure point succeeded + [Fact] + public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow() + { + // Communication-008: after a stream has been connected for the stability + // window, the retry budget is recovered — a later transient fault then gets + // a fresh set of retries rather than being counted against the old budget. + DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromMilliseconds(300); + try + { + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + + Watch(ctx.BridgeActor); + ctx.BridgeActor.Tell(snapshot); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + // Five failures — but each new stream stays up long enough (the mock + // stream only completes on cancel) for the stability window to elapse + // and reset the retry budget before the next failure. + for (var i = 0; i < 5; i++) + { + Thread.Sleep(450); // exceed the 300ms stability window + ctx.MockGrpcClient.SubscribeCalls[i].OnError(new Exception($"Error {i + 1}")); + var expected = i + 2; + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == expected, TimeSpan.FromSeconds(5)); + } + + // Five well-spaced failures did NOT terminate the actor because each + // reconnect recovered its retry budget after the stability window.
+ Assert.False(ctx.TerminatedFlag[0]); + } + finally + { + DebugStreamBridgeActor.StabilityWindow = TimeSpan.FromSeconds(30); + } } } diff --git a/tests/ScadaLink.Communication.Tests/Grpc/GrpcOptionsWiringTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/GrpcOptionsWiringTests.cs new file mode 100644 index 0000000..ce76421 --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/Grpc/GrpcOptionsWiringTests.cs @@ -0,0 +1,67 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using ScadaLink.Communication; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.Communication.Tests.Grpc; + +/// +/// Regression tests for Communication-005 — the gRPC keepalive and +/// max-stream-lifetime / max-concurrent-stream options defined on +/// <see cref="CommunicationOptions"/> must actually be applied to the +/// gRPC client and server rather than hard-coded. +/// +public class GrpcOptionsWiringTests +{ + [Fact] + public void SiteStreamGrpcClient_AppliesKeepAliveFromOptions() + { + var options = new CommunicationOptions + { + GrpcKeepAlivePingDelay = TimeSpan.FromSeconds(42), + GrpcKeepAlivePingTimeout = TimeSpan.FromSeconds(7) + }; + + var client = new SiteStreamGrpcClient( + "http://localhost:9999", NullLogger.Instance, options); + + Assert.Equal(TimeSpan.FromSeconds(42), client.KeepAlivePingDelay); + Assert.Equal(TimeSpan.FromSeconds(7), client.KeepAlivePingTimeout); + } + + [Fact] + public void SiteStreamGrpcClientFactory_FlowsOptionsToCreatedClients() + { + var options = new CommunicationOptions + { + GrpcKeepAlivePingDelay = TimeSpan.FromSeconds(33), + GrpcKeepAlivePingTimeout = TimeSpan.FromSeconds(11) + }; + + using var factory = new SiteStreamGrpcClientFactory( + NullLoggerFactory.Instance, Options.Create(options)); + + var client = factory.GetOrCreate("site1", "http://localhost:9999"); + + Assert.Equal(TimeSpan.FromSeconds(33), client.KeepAlivePingDelay); + Assert.Equal(TimeSpan.FromSeconds(11), client.KeepAlivePingTimeout); + } + +
[Fact] + public void SiteStreamGrpcServer_BindsMaxConcurrentStreamsAndLifetimeFromOptions() + { + var options = new CommunicationOptions + { + GrpcMaxConcurrentStreams = 250, + GrpcMaxStreamLifetime = TimeSpan.FromHours(2) + }; + + var subscriber = Substitute.For(); + var server = new SiteStreamGrpcServer( + subscriber, NullLogger.Instance, Options.Create(options)); + + Assert.Equal(250, server.MaxConcurrentStreams); + Assert.Equal(TimeSpan.FromHours(2), server.MaxStreamLifetime); + } +} diff --git a/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryDisposeTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryDisposeTests.cs new file mode 100644 index 0000000..1f93ec8 --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryDisposeTests.cs @@ -0,0 +1,129 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.Communication.Tests.Grpc; + +/// +/// Regression tests for Communication-007 — the factory's synchronous +/// <see cref="SiteStreamGrpcClientFactory.Dispose"/> must not block on the +/// async disposal path (sync-over-async). It must dispose each client through +/// the client's synchronous <see cref="SiteStreamGrpcClient.Dispose"/>. +/// +public class SiteStreamGrpcClientFactoryDisposeTests +{ + /// + /// Test client that records whether it was disposed via the sync or async path. + /// + private sealed class TrackingClient : SiteStreamGrpcClient + { + public bool SyncDisposeCalled { get; private set; } + public bool AsyncDisposeCalled { get; private set; } + + public override void Dispose() => SyncDisposeCalled = true; + + public override ValueTask DisposeAsync() + { + AsyncDisposeCalled = true; + return ValueTask.CompletedTask; + } + } + + /// + /// Test factory that hands out <see cref="TrackingClient"/> instances while + /// still exercising the base factory's real caching and disposal machinery.
+ /// + private sealed class TrackingFactory : SiteStreamGrpcClientFactory + { + private readonly ConcurrentBag _created = new(); + + public TrackingFactory() : base(NullLoggerFactory.Instance) { } + + public IReadOnlyCollection Created => _created.ToList(); + + protected override SiteStreamGrpcClient CreateClient(string grpcEndpoint) + { + var client = new TrackingClient(); + _created.Add(client); + return client; + } + } + + [Fact] + public void Dispose_DisposesClientsSynchronously_NotViaAsyncPath() + { + var factory = new TrackingFactory(); + factory.GetOrCreate("site-a", "http://localhost:5100"); + factory.GetOrCreate("site-b", "http://localhost:5200"); + + factory.Dispose(); + + Assert.NotEmpty(factory.Created); + Assert.All(factory.Created, c => + { + Assert.True(c.SyncDisposeCalled, "client should be disposed via synchronous Dispose()"); + Assert.False(c.AsyncDisposeCalled, "synchronous Dispose() must not route through DisposeAsync()"); + }); + } + + [Fact] + public void Dispose_DoesNotDeadlock_UnderSingleThreadedSynchronizationContext() + { + // A strict single-threaded SynchronizationContext: continuations posted to + // it are only pumped by the worker loop. Sync-over-async (blocking the only + // thread on an async continuation that needs that same thread) deadlocks here. + using var ctx = new SingleThreadSyncContext(); + Exception? captured = null; + var done = new ManualResetEventSlim(); + + ctx.Post(_ => + { + try + { + var factory = new SiteStreamGrpcClientFactory(NullLoggerFactory.Instance); + factory.GetOrCreate("site-a", "http://localhost:5100"); + factory.Dispose(); + } + catch (Exception ex) + { + captured = ex; + } + finally + { + done.Set(); + } + }, null); + + Assert.True(done.Wait(TimeSpan.FromSeconds(5)), + "factory.Dispose() did not complete — likely a sync-over-async deadlock"); + Assert.Null(captured); + } + + /// Minimal single-threaded synchronization context for the deadlock test. 
+ private sealed class SingleThreadSyncContext : SynchronizationContext, IDisposable + { + private readonly BlockingCollection<(SendOrPostCallback cb, object? state)> _queue = new(); + private readonly Thread _thread; + + public SingleThreadSyncContext() + { + _thread = new Thread(Run) { IsBackground = true }; + _thread.Start(); + } + + private void Run() + { + SetSynchronizationContext(this); + foreach (var (cb, state) in _queue.GetConsumingEnumerable()) + cb(state); + } + + public override void Post(SendOrPostCallback d, object? state) => _queue.Add((d, state)); + + public void Dispose() + { + _queue.CompleteAdding(); + _thread.Join(TimeSpan.FromSeconds(2)); + } + } +}