diff --git a/code-reviews/Driver.Galaxy/findings.md b/code-reviews/Driver.Galaxy/findings.md
index c25159d..2143d27 100644
--- a/code-reviews/Driver.Galaxy/findings.md
+++ b/code-reviews/Driver.Galaxy/findings.md
@@ -7,7 +7,7 @@
| Review date | 2026-05-22 |
| Commit reviewed | `76d35d1` |
| Status | Reviewed |
-| Open findings | 14 |
+| Open findings | 13 |
## Checklist coverage
@@ -33,13 +33,13 @@
| Severity | Critical |
| Category | Error handling & resilience |
| Location | `Runtime/EventPump.cs:128`, `GalaxyDriver.cs:222` |
-| Status | Open |
+| Status | Resolved |
**Description:** The `ReconnectSupervisor` is constructed in `BuildProductionRuntimeAsync` and exposes `ReportTransportFailure(Exception)` as the only entry point that starts the reopen -> replay recovery loop. Nothing in the driver ever calls `ReportTransportFailure` (a repo-wide search finds only the declaration). When the gateway `StreamEvents` stream faults, `EventPump.RunAsync` catches the exception, logs "reconnect supervisor (PR 4.5) handles restart", completes the channel, and exits — but the supervisor is never told. The result: a transient gateway transport drop permanently kills the event stream. Data-change notifications stop, no reconnect/replay runs, and `GetHealth()` keeps reporting `Healthy` because `_supervisor.IsDegraded` stays false. This is a production outage with no self-recovery.
**Recommendation:** Wire the EventPump (and any gw RPC that observes a transport fault) to call `_supervisor.ReportTransportFailure(ex)`. The simplest path: give `EventPump` a fault callback (or expose a `StreamFaulted` event) that `GalaxyDriver` subscribes to and forwards to the supervisor. The supervisor's `ReopenAsync`/`ReplayAsync` must also restart the EventPump itself (see Driver.Galaxy-008).
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-22 — added an optional `onStreamFault` callback to `EventPump`; `RunAsync`'s stream-fault catch block now invokes it, and `GalaxyDriver.EnsureEventPumpStarted` wires it to `OnEventPumpStreamFault` which forwards the cause to `ReconnectSupervisor.ReportTransportFailure`, so a transient gw transport drop now drives reopen → replay. Regression coverage in `EventPumpStreamFaultTests`. Note: the EventPump itself is still not restarted on reconnect — that pump-restart gap remains tracked under Driver.Galaxy-008.
### Driver.Galaxy-002
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs
index e10bedf..2bade95 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs
@@ -732,13 +732,43 @@ public sealed class GalaxyDriver
_eventPump = new EventPump(
_subscriber!, _subscriptions, _logger,
channelCapacity: _options.MxAccess.EventPumpChannelCapacity,
- clientName: _options.MxAccess.ClientName);
+ clientName: _options.MxAccess.ClientName,
+ onStreamFault: OnEventPumpStreamFault);
_eventPump.OnDataChange += OnPumpDataChange;
_eventPump.Start();
return _eventPump;
}
}
+ /// <summary>
+ /// Receives stream faults from the <see cref="EventPump"/> (transient gateway drop,
+ /// network blip, gw restart) and hands the cause to the <see cref="ReconnectSupervisor"/>
+ /// so it drives reopen → replay. Without this hand-off a transient transport drop
+ /// permanently kills the event stream and GetHealth() keeps reporting Healthy.
+ /// </summary>
+ /// <param name="cause">Exception reported by the pump for the faulted stream.</param>
+ private void OnEventPumpStreamFault(Exception cause)
+ {
+     if (_supervisor is { } reconnect)
+     {
+         try
+         {
+             reconnect.ReportTransportFailure(cause);
+         }
+         catch (ObjectDisposedException)
+         {
+             // Supervisor already disposed, i.e. the driver is shutting down — shutdown noise.
+         }
+
+         return;
+     }
+
+     // No production runtime (skeleton / injected-seam path): nothing to drive, so only log.
+     _logger.LogWarning(cause,
+         "GalaxyDriver {InstanceId} event stream faulted but no reconnect supervisor is wired.",
+         _driverInstanceId);
+ }
+
// ===== IAlarmSource =====
///
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs
index 5cfe796..be505a7 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs
@@ -16,9 +16,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
///
///
///
-/// One pump per connected . Reconnect lives in PR 4.5's
-/// supervisor; on transport failure here we log + propagate so the supervisor can
-/// decide whether to restart.
+/// One pump per connected . Reconnect lives in the
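+/// <see cref="ReconnectSupervisor"/>; on transport failure here we log and invoke
+/// the optional <c>onStreamFault</c> callback so the owner (<c>GalaxyDriver</c>) can
+/// forward the fault to <see cref="ReconnectSupervisor.ReportTransportFailure"/>
+/// and the supervisor can drive reopen → replay.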
///
///
/// PR 6.2 — the network-read loop and the listener-fanout loop are decoupled by a
@@ -50,6 +52,7 @@ internal sealed class EventPump : IAsyncDisposable
private readonly SubscriptionRegistry _registry;
private readonly ILogger _logger;
private readonly Func _handleFactory;
+ private readonly Action? _onStreamFault;
private readonly Channel _channel;
private readonly KeyValuePair _clientTag;
private readonly CancellationTokenSource _cts = new();
@@ -66,12 +69,14 @@ internal sealed class EventPump : IAsyncDisposable
ILogger? logger = null,
Func? handleFactory = null,
int channelCapacity = DefaultChannelCapacity,
- string? clientName = null)
+ string? clientName = null,
+ Action? onStreamFault = null)
{
_subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
_logger = logger ?? NullLogger.Instance;
_handleFactory = handleFactory ?? (id => new GalaxySubscriptionHandle(id));
+ _onStreamFault = onStreamFault;
if (channelCapacity < 1)
{
@@ -127,7 +132,20 @@ internal sealed class EventPump : IAsyncDisposable
catch (Exception ex)
{
_logger.LogWarning(ex,
- "Galaxy EventPump loop ended with an exception — reconnect supervisor (PR 4.5) handles restart.");
+ "Galaxy EventPump loop ended with an exception — notifying reconnect supervisor.");
+
+ // The gw StreamEvents stream faulted. Signal the reconnect supervisor so it
+ // drives reopen → replay. Without this the stream silently dies and a
+ // transient gateway drop permanently stops data-change notifications.
+ if (_onStreamFault is not null)
+ {
+ try { _onStreamFault(ex); }
+ catch (Exception cbEx)
+ {
+ _logger.LogWarning(cbEx,
+ "Galaxy EventPump stream-fault callback threw — supervisor may not have been notified.");
+ }
+ }
}
finally
{
diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs
new file mode 100644
index 0000000..680829e
--- /dev/null
+++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs
@@ -0,0 +1,118 @@
+using System.Threading.Channels;
+using MxGateway.Contracts.Proto;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
+
+/// <summary>
+/// Regression coverage for Driver.Galaxy-001 (Critical): when the gw StreamEvents
+/// stream faults, the <see cref="EventPump"/> must notify the reconnect supervisor
+/// rather than silently logging and exiting. Without the <c>onStreamFault</c>
+/// hand-off a transient gateway transport drop permanently kills the event stream.
+/// </summary>
+public sealed class EventPumpStreamFaultTests
+{
+ private const int WaitMs = 2_000;
+
+ [Fact]
+ public async Task StreamFault_InvokesOnStreamFaultCallback_WithTheCause()
+ {
+     var subscriber = new FaultingSubscriber();
+     var registry = new SubscriptionRegistry();
+     var faultObserved = new TaskCompletionSource<Exception>(
+         TaskCreationOptions.RunContinuationsAsynchronously);
+
+     await using var pump = new EventPump(
+         subscriber, registry, channelCapacity: 4, clientName: "FaultTest",
+         onStreamFault: ex => faultObserved.TrySetResult(ex));
+     pump.Start();
+
+     // Fault the fake gw stream; the pump is expected to hand the cause to the callback.
+     subscriber.FaultStream(new IOException("simulated gateway transport drop"));
+
+     var completed = await Task.WhenAny(faultObserved.Task, Task.Delay(WaitMs));
+     completed.ShouldBe(faultObserved.Task,
+         "EventPump must invoke onStreamFault when the gw StreamEvents stream faults");
+     (await faultObserved.Task).ShouldBeOfType<IOException>();
+ }
+
+ [Fact]
+ public async Task StreamFault_DrivesReconnectSupervisorReopenReplay()
+ {
+     // End-to-end: a faulting EventPump wired to a real ReconnectSupervisor must drive reopen → replay.
+     var subscriber = new FaultingSubscriber();
+     var registry = new SubscriptionRegistry();
+
+     var reopenRan = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+     var replayRan = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+     using var supervisor = new ReconnectSupervisor(
+         reopen: _ => { reopenRan.TrySetResult(); return Task.CompletedTask; },
+         replay: _ => { replayRan.TrySetResult(); return Task.CompletedTask; },
+         options: new ReconnectOptions(
+             InitialBackoffOverride: TimeSpan.FromMilliseconds(5),
+             MaxBackoffOverride: TimeSpan.FromMilliseconds(20)));
+
+     await using var pump = new EventPump(
+         subscriber, registry, channelCapacity: 4, clientName: "FaultTest",
+         onStreamFault: supervisor.ReportTransportFailure);
+     pump.Start();
+
+     supervisor.CurrentState.ShouldBe(ReconnectSupervisor.State.Healthy);
+     subscriber.FaultStream(new IOException("simulated gateway transport drop"));
+
+     (await Task.WhenAny(reopenRan.Task, Task.Delay(WaitMs))).ShouldBe(reopenRan.Task,
+         "stream fault must trigger the supervisor's reopen path");
+     (await Task.WhenAny(replayRan.Task, Task.Delay(WaitMs))).ShouldBe(replayRan.Task,
+         "stream fault must trigger the supervisor's replay path");
+
+     // Own the timeout source so its timer is released when the test scope ends.
+     using var healthyTimeout = new CancellationTokenSource(WaitMs);
+     await supervisor.WaitForHealthyAsync(healthyTimeout.Token);
+     supervisor.IsDegraded.ShouldBeFalse();
+ }
+
+ [Fact]
+ public async Task CleanShutdown_DoesNotInvokeOnStreamFault()
+ {
+     var callbackFired = false;
+     var registry = new SubscriptionRegistry();
+     var subscriber = new FaultingSubscriber();
+
+     var pump = new EventPump(
+         subscriber, registry, channelCapacity: 4, clientName: "FaultTest",
+         onStreamFault: _ => callbackFired = true);
+     pump.Start();
+
+     // Dispose without ever faulting the stream: a shutdown cancellation is not a
+     // transport fault, so the callback (and hence the supervisor) must stay untouched.
+     await pump.DisposeAsync();
+
+     callbackFired.ShouldBeFalse("clean shutdown must not be reported as a transport fault");
+ }
+
+ /// <summary>
+ /// <see cref="IGalaxySubscriber"/> fake whose StreamEvents stream is backed by an unbounded channel and
+ /// can be faulted on demand via <see cref="FaultStream"/>, so the EventPump's RunAsync catch path is exercised.
+ /// </summary>
+ private sealed class FaultingSubscriber : IGalaxySubscriber
+ {
+ private readonly Channel _stream =
+ Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true });
+
+ public Task> SubscribeBulkAsync(
+ IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
+ => Task.FromResult>([]);
+
+ public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken)
+ => Task.CompletedTask;
+
+ public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken)
+ => _stream.Reader.ReadAllAsync(cancellationToken);
+
+ /// <summary>Completes the channel writer with <paramref name="cause"/> so the pump's await foreach throws.</summary>
+ public void FaultStream(Exception cause) => _stream.Writer.TryComplete(cause);
+ }
+}