From 4df8737c86f2adfb7998734e977fd590186ce523 Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Fri, 22 May 2026 05:54:33 -0400
Subject: [PATCH] fix(driver-galaxy): wire event-stream faults to the reconnect
 supervisor (Driver.Galaxy-001)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The ReconnectSupervisor was constructed but its trigger
ReportTransportFailure was never called. When the gateway StreamEvents
stream faulted, EventPump just logged and exited — the supervisor was
never notified, so a transient gateway drop permanently stopped
data-change notifications while GetHealth() still reported Healthy.

EventPump gains an optional onStreamFault callback invoked from its
stream-fault catch block (not on clean shutdown). GalaxyDriver wires it
to ReconnectSupervisor.ReportTransportFailure so a transport drop drives
reopen → replay. This is the minimal fix for -001; the
pump-restart-on-reopen gap remains tracked as Driver.Galaxy-008.

Regression tests cover the callback being invoked on fault, the
end-to-end supervisor reopen/replay, and that a clean shutdown does not
fire it. Driver.Galaxy suite: 206/206 pass.

Resolves code-review finding Driver.Galaxy-001 (Critical).

Co-Authored-By: Claude Opus 4.7 (1M context) --- code-reviews/Driver.Galaxy/findings.md | 6 +- .../GalaxyDriver.cs | 32 ++++- .../Runtime/EventPump.cs | 28 ++++- .../Runtime/EventPumpStreamFaultTests.cs | 118 ++++++++++++++++++ 4 files changed, 175 insertions(+), 9 deletions(-) create mode 100644 tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs diff --git a/code-reviews/Driver.Galaxy/findings.md b/code-reviews/Driver.Galaxy/findings.md index c25159d..2143d27 100644 --- a/code-reviews/Driver.Galaxy/findings.md +++ b/code-reviews/Driver.Galaxy/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-22 | | Commit reviewed | `76d35d1` | | Status | Reviewed | -| Open findings | 14 | +| Open findings | 13 | ## Checklist coverage @@ -33,13 +33,13 @@ | Severity | Critical | | Category | Error handling & resilience | | Location | `Runtime/EventPump.cs:128`, `GalaxyDriver.cs:222` | -| Status | Open | +| Status | Resolved | **Description:** The `ReconnectSupervisor` is constructed in `BuildProductionRuntimeAsync` and exposes `ReportTransportFailure(Exception)` as the only entry point that starts the reopen -> replay recovery loop. Nothing in the driver ever calls `ReportTransportFailure` (a repo-wide search finds only the declaration). When the gateway `StreamEvents` stream faults, `EventPump.RunAsync` catches the exception, logs "reconnect supervisor (PR 4.5) handles restart", completes the channel, and exits — but the supervisor is never told. The result: a transient gateway transport drop permanently kills the event stream. Data-change notifications stop, no reconnect/replay runs, and `GetHealth()` keeps reporting `Healthy` because `_supervisor.IsDegraded` stays false. This is a production outage with no self-recovery. **Recommendation:** Wire the EventPump (and any gw RPC that observes a transport fault) to call `_supervisor.ReportTransportFailure(ex)`. 
The simplest path: give `EventPump` a fault callback (or expose a `StreamFaulted` event) that `GalaxyDriver` subscribes to and forwards to the supervisor. The supervisor's `ReopenAsync`/`ReplayAsync` must also restart the EventPump itself (see Driver.Galaxy-008). -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-22 — added an optional `onStreamFault` callback to `EventPump`; `RunAsync`'s stream-fault catch block now invokes it, and `GalaxyDriver.EnsureEventPumpStarted` wires it to `OnEventPumpStreamFault` which forwards the cause to `ReconnectSupervisor.ReportTransportFailure`, so a transient gw transport drop now drives reopen → replay. Regression coverage in `EventPumpStreamFaultTests`. Note: the EventPump itself is still not restarted on reconnect — that pump-restart gap remains tracked under Driver.Galaxy-008. ### Driver.Galaxy-002 diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs index e10bedf..2bade95 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs @@ -732,13 +732,43 @@ public sealed class GalaxyDriver _eventPump = new EventPump( _subscriber!, _subscriptions, _logger, channelCapacity: _options.MxAccess.EventPumpChannelCapacity, - clientName: _options.MxAccess.ClientName); + clientName: _options.MxAccess.ClientName, + onStreamFault: OnEventPumpStreamFault); _eventPump.OnDataChange += OnPumpDataChange; _eventPump.Start(); return _eventPump; } } + /// + /// Stream-fault callback for the . The gw StreamEvents + /// stream faulted (transient gateway drop, network blip, gw restart). Forward + /// the cause to the so it drives reopen → + /// replay; without this hand-off a transient transport drop permanently kills + /// the event stream and GetHealth() keeps reporting Healthy. 
+ /// + private void OnEventPumpStreamFault(Exception cause) + { + var supervisor = _supervisor; + if (supervisor is null) + { + // No production runtime (skeleton / injected-seam path) — nothing to drive. + _logger.LogWarning(cause, + "GalaxyDriver {InstanceId} event stream faulted but no reconnect supervisor is wired.", + _driverInstanceId); + return; + } + + try + { + supervisor.ReportTransportFailure(cause); + } + catch (ObjectDisposedException) + { + // Driver is being disposed — the stream fault is just shutdown noise. + } + } + // ===== IAlarmSource ===== /// diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs index 5cfe796..be505a7 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs @@ -16,9 +16,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; /// /// /// -/// One pump per connected . Reconnect lives in PR 4.5's -/// supervisor; on transport failure here we log + propagate so the supervisor can -/// decide whether to restart. +/// One pump per connected . Reconnect lives in the +/// ; on transport failure here we log and invoke +/// the optional onStreamFault callback so the owner (GalaxyDriver) can +/// forward the fault to +/// and the supervisor can drive reopen → replay. /// /// /// PR 6.2 — the network-read loop and the listener-fanout loop are decoupled by a @@ -50,6 +52,7 @@ internal sealed class EventPump : IAsyncDisposable private readonly SubscriptionRegistry _registry; private readonly ILogger _logger; private readonly Func _handleFactory; + private readonly Action? _onStreamFault; private readonly Channel _channel; private readonly KeyValuePair _clientTag; private readonly CancellationTokenSource _cts = new(); @@ -66,12 +69,14 @@ internal sealed class EventPump : IAsyncDisposable ILogger? logger = null, Func? 
handleFactory = null, int channelCapacity = DefaultChannelCapacity, - string? clientName = null) + string? clientName = null, + Action? onStreamFault = null) { _subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber)); _registry = registry ?? throw new ArgumentNullException(nameof(registry)); _logger = logger ?? NullLogger.Instance; _handleFactory = handleFactory ?? (id => new GalaxySubscriptionHandle(id)); + _onStreamFault = onStreamFault; if (channelCapacity < 1) { @@ -127,7 +132,20 @@ internal sealed class EventPump : IAsyncDisposable catch (Exception ex) { _logger.LogWarning(ex, - "Galaxy EventPump loop ended with an exception — reconnect supervisor (PR 4.5) handles restart."); + "Galaxy EventPump loop ended with an exception — notifying reconnect supervisor."); + + // The gw StreamEvents stream faulted. Signal the reconnect supervisor so it + // drives reopen → replay. Without this the stream silently dies and a + // transient gateway drop permanently stops data-change notifications. 
+ if (_onStreamFault is not null) + { + try { _onStreamFault(ex); } + catch (Exception cbEx) + { + _logger.LogWarning(cbEx, + "Galaxy EventPump stream-fault callback threw — supervisor may not have been notified."); + } + } } finally { diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs new file mode 100644 index 0000000..680829e --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs @@ -0,0 +1,118 @@ +using System.Threading.Channels; +using MxGateway.Contracts.Proto; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime; + +/// +/// Regression coverage for Driver.Galaxy-001 (Critical): when the gw StreamEvents +/// stream faults, the must notify the reconnect supervisor +/// rather than silently logging and exiting. Without the onStreamFault +/// hand-off a transient gateway transport drop permanently kills the event stream. +/// +public sealed class EventPumpStreamFaultTests +{ + private const int WaitMs = 2_000; + + [Fact] + public async Task StreamFault_InvokesOnStreamFaultCallback_WithTheCause() + { + var subscriber = new FaultingSubscriber(); + var registry = new SubscriptionRegistry(); + var faultObserved = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + + await using var pump = new EventPump( + subscriber, registry, channelCapacity: 4, clientName: "FaultTest", + onStreamFault: ex => faultObserved.TrySetResult(ex)); + pump.Start(); + + // Drop the gw stream — RunAsync's await foreach throws. 
+ subscriber.FaultStream(new IOException("simulated gateway transport drop")); + + var completed = await Task.WhenAny(faultObserved.Task, Task.Delay(WaitMs)); + completed.ShouldBe(faultObserved.Task, + "EventPump must invoke onStreamFault when the gw StreamEvents stream faults"); + (await faultObserved.Task).ShouldBeOfType(); + } + + [Fact] + public async Task StreamFault_DrivesReconnectSupervisorReopenReplay() + { + // End-to-end: a faulting EventPump wired to a real ReconnectSupervisor must + // drive the supervisor through its reopen → replay recovery loop. + var subscriber = new FaultingSubscriber(); + var registry = new SubscriptionRegistry(); + + var reopenRan = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var replayRan = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using var supervisor = new ReconnectSupervisor( + reopen: _ => { reopenRan.TrySetResult(); return Task.CompletedTask; }, + replay: _ => { replayRan.TrySetResult(); return Task.CompletedTask; }, + options: new ReconnectOptions( + InitialBackoffOverride: TimeSpan.FromMilliseconds(5), + MaxBackoffOverride: TimeSpan.FromMilliseconds(20))); + + await using var pump = new EventPump( + subscriber, registry, channelCapacity: 4, clientName: "FaultTest", + onStreamFault: supervisor.ReportTransportFailure); + pump.Start(); + + supervisor.CurrentState.ShouldBe(ReconnectSupervisor.State.Healthy); + + subscriber.FaultStream(new IOException("simulated gateway transport drop")); + + (await Task.WhenAny(reopenRan.Task, Task.Delay(WaitMs))).ShouldBe(reopenRan.Task, + "stream fault must trigger the supervisor's reopen path"); + (await Task.WhenAny(replayRan.Task, Task.Delay(WaitMs))).ShouldBe(replayRan.Task, + "stream fault must trigger the supervisor's replay path"); + + await supervisor.WaitForHealthyAsync(new CancellationTokenSource(WaitMs).Token); + supervisor.IsDegraded.ShouldBeFalse(); + } + + [Fact] + public async Task 
CleanShutdown_DoesNotInvokeOnStreamFault() + { + var subscriber = new FaultingSubscriber(); + var registry = new SubscriptionRegistry(); + var faulted = false; + + var pump = new EventPump( + subscriber, registry, channelCapacity: 4, clientName: "FaultTest", + onStreamFault: _ => faulted = true); + pump.Start(); + + // Graceful disposal cancels the loop — that is OperationCanceledException, + // not a transport fault, and must NOT trip the supervisor. + await pump.DisposeAsync(); + + faulted.ShouldBeFalse("clean shutdown must not be reported as a transport fault"); + } + + /// + /// fake whose StreamEvents stream can be faulted + /// on demand so the EventPump's RunAsync catch path is exercised. + /// + private sealed class FaultingSubscriber : IGalaxySubscriber + { + private readonly Channel _stream = + Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + + public Task> SubscribeBulkAsync( + IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) + => Task.FromResult>([]); + + public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) + => Task.CompletedTask; + + public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) + => _stream.Reader.ReadAllAsync(cancellationToken); + + /// Fault the stream so the pump's await foreach throws. + public void FaultStream(Exception cause) => _stream.Writer.TryComplete(cause); + } +}