From 123e3e48b9c8af83cefca94202fb207bf285d975 Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Wed, 29 Apr 2026 15:39:21 -0400
Subject: [PATCH] =?UTF-8?q?PR=204.5=20=E2=80=94=20ReconnectSupervisor?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

State machine that drives GalaxyDriver's recovery from gw transport failure.
Healthy → TransportLost → Reopening → Replaying → Healthy. Drivers report
failure signals; the supervisor runs reopen + replay with capped exponential
backoff (default 500ms → 30s) until both succeed.

Files:

- Runtime/ReconnectSupervisor.cs — state machine with snapshot, change event,
  last-error tracking, and a one-attempt-at-a-time recovery loop. Idempotent
  ReportTransportFailure: repeated failure reports during an in-flight
  recovery do not spawn parallel loops. Reopen + replay are caller-supplied
  callbacks (the driver injects them in the wire-up PR); reopen re-Registers
  the gw session, replay re-establishes every active subscription via gw's
  ReplaySubscriptionsCommand (mxaccessgw issue gw-3) or the SubscribeBulk
  fallback. Dispose cancels the loop cleanly.
- Public StateTransition record + IsDegraded predicate the driver maps to
  DriverState.Degraded for health snapshots.

Wiring (GalaxyDriver subscribes the supervisor to its EventPump's
transport-failure signal, exposes IsDegraded through GetHealth(), routes
reopen/replay callbacks through GalaxyMxSession + SubscriptionRegistry) lands
in PR 4.W to avoid conflict with the parallel host-probe track (PR 4.7) and
align the wire-up with the rest of Phase 4's plumbing.

9 supervisor tests (full state-machine traversal, retry-until-success on both
reopen and replay failures, idempotent failure reports, last-error
propagation, Dispose mid-recovery, post-dispose throws, fast-path Healthy
WaitForHealthy).

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .../Runtime/ReconnectSupervisor.cs            | 268 ++++++++++++++++++
 .../Runtime/ReconnectSupervisorTests.cs       | 173 +++++++++++
 2 files changed, 441 insertions(+)
 create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/ReconnectSupervisor.cs
 create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/ReconnectSupervisorTests.cs

diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/ReconnectSupervisor.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/ReconnectSupervisor.cs
new file mode 100644
index 0000000..a65e7d1
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/ReconnectSupervisor.cs
@@ -0,0 +1,268 @@
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
+
+/// <summary>
+/// Coordinates GalaxyDriver's recovery from gateway transport failure. Drives a
+/// state machine — Healthy → TransportLost → Reopening → Replaying → Healthy —
+/// and exposes the current state through a snapshot + change event so the
+/// driver's DriverHealth reflects Degraded while we're not in Healthy.
+/// </summary>
+/// <remarks>
+/// <para>
+/// The supervisor doesn't own the session, the subscription registry, or the
+/// event pump. It receives transport-failure signals from the rest of the
+/// driver (EventPump throws, a gw RPC raises, the heartbeat times out), runs
+/// a one-attempt-at-a-time recovery loop, and lets the rest of the driver
+/// continue serving cached state during recovery.
+/// </para>
+/// <para>
+/// Reopen: caller-supplied callback that re-opens the gw session +
+/// re-Registers the MXAccess client. Throws on failure.
+/// </para>
+/// <para>
+/// Replay: caller-supplied callback that re-establishes every active
+/// subscription. Production wraps gw's ReplaySubscriptionsCommand
+/// (mxaccessgw issue gw-3); when that's not available, the callback falls
+/// back to walking the SubscriptionRegistry and re-issuing SubscribeBulk for
+/// every tracked tag.
+/// </para>
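+/// <para>
+/// A minimal wiring sketch (the real wire-up lands in PR 4.W; the
+/// ReopenAndRegisterAsync, ReplayAllAsync, and TransportFailed member names
+/// below are illustrative placeholders, not the actual GalaxyMxSession /
+/// SubscriptionRegistry / EventPump surface):
+/// <code>
+/// var supervisor = new ReconnectSupervisor(
+///     reopen: ct => session.ReopenAndRegisterAsync(ct),
+///     replay: ct => registry.ReplayAllAsync(ct));
+/// pump.TransportFailed += (_, ex) => supervisor.ReportTransportFailure(ex);
+/// </code>
+/// </para>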
+/// <para>
+/// Backoff is capped exponential — first retry after
+/// <see cref="ReconnectOptions.InitialBackoff"/>, doubled per failed attempt,
+/// capped at <see cref="ReconnectOptions.MaxBackoff"/>. Persistent failures
+/// hold the supervisor in Reopening indefinitely; the supervisor never
+/// gives up on its own — operators / Phase 6.4 soak handle that policy.
+/// </para>
+/// </remarks>
+public sealed class ReconnectSupervisor : IDisposable
+{
+    /// <summary>Recovery state machine.</summary>
+    public enum State
+    {
+        Healthy,
+        TransportLost,
+        Reopening,
+        Replaying,
+    }
+
+    private readonly Func<CancellationToken, Task> _reopen;
+    private readonly Func<CancellationToken, Task> _replay;
+    private readonly ReconnectOptions _options;
+    private readonly ILogger _logger;
+    private readonly Func<int, TimeSpan, TimeSpan, TimeSpan>? _backoffDelay;
+
+    private readonly Lock _stateLock = new();
+    private State _state = State.Healthy;
+    private string? _lastError;
+    private DateTime? _lastTransitionUtc;
+
+    private Task? _recoveryLoop;
+    private CancellationTokenSource? _loopCts;
+    private bool _disposed;
+
+    /// <summary>Fires after every state transition.</summary>
+    public event EventHandler<StateTransition>? StateChanged;
+
+    public ReconnectSupervisor(
+        Func<CancellationToken, Task> reopen,
+        Func<CancellationToken, Task> replay,
+        ReconnectOptions? options = null,
+        ILogger? logger = null,
+        Func<int, TimeSpan, TimeSpan, TimeSpan>? backoffDelay = null)
+    {
+        _reopen = reopen ?? throw new ArgumentNullException(nameof(reopen));
+        _replay = replay ?? throw new ArgumentNullException(nameof(replay));
+        _options = options ?? new ReconnectOptions();
+        _logger = logger ?? NullLogger.Instance;
+        _backoffDelay = backoffDelay;
+    }
+
+    /// <summary>Current state. Healthy = fully recovered + subscriptions live.</summary>
+    public State CurrentState
+    {
+        get { lock (_stateLock) return _state; }
+    }
+
+    /// <summary>True when CurrentState != Healthy. Drivers map this to DriverState.Degraded.</summary>
+    public bool IsDegraded
+    {
+        get { lock (_stateLock) return _state != State.Healthy; }
+    }
+
+    public string? LastError
+    {
+        get { lock (_stateLock) return _lastError; }
+    }
+
+    public DateTime? LastTransitionUtc
+    {
+        get { lock (_stateLock) return _lastTransitionUtc; }
+    }
+
+    /// <summary>
+    /// Notify the supervisor that a gw transport failure has been observed. Idempotent —
+    /// repeated calls during an in-flight recovery do not start a parallel loop. The
+    /// first call spawns a background task that drives reopen → replay until it
+    /// succeeds or is cancelled.
+    /// </summary>
+    public void ReportTransportFailure(Exception cause)
+    {
+        ArgumentNullException.ThrowIfNull(cause);
+        ObjectDisposedException.ThrowIf(_disposed, this);
+
+        lock (_stateLock)
+        {
+            _lastError = cause.Message;
+            if (_state != State.Healthy)
+            {
+                // Already recovering — nothing else to do.
+                _logger.LogDebug("Transport failure reported during {State}: {Message}", _state, cause.Message);
+                return;
+            }
+
+            TransitionLocked(State.TransportLost, cause.Message);
+
+            // Capture the new CTS locally so the deferred lambda can't observe a
+            // later cycle's (or a nulled-out) _loopCts field.
+            var cts = new CancellationTokenSource();
+            _loopCts = cts;
+            _recoveryLoop = Task.Run(() => RecoveryLoopAsync(cts.Token));
+        }
+    }
+
+    /// <summary>
+    /// Wait until the current recovery cycle reaches Healthy or the supplied token
+    /// is cancelled. Returns immediately when already Healthy. Useful for tests
+    /// and for orchestration that wants to gate calls on recovery completing.
+    /// </summary>
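+    /// <example>
+    /// A usage sketch (the 30-second budget is illustrative):
+    /// <code>
+    /// using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+    /// // Surfaces as TaskCanceledException if the budget expires mid-recovery.
+    /// await supervisor.WaitForHealthyAsync(cts.Token);
+    /// </code>
+    /// </example>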
+ /// + public async Task WaitForHealthyAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested && IsDegraded) + { + await Task.Delay(50, cancellationToken).ConfigureAwait(false); + } + } + + private async Task RecoveryLoopAsync(CancellationToken ct) + { + var attempt = 0; + while (!ct.IsCancellationRequested) + { + attempt++; + if (attempt > 1) + { + var delay = ComputeBackoff(attempt); + _logger.LogInformation( + "Galaxy reconnect attempt {Attempt} — waiting {Delay} before retry", attempt, delay); + try { await Task.Delay(delay, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + } + + // === Reopening phase === + lock (_stateLock) TransitionLocked(State.Reopening, _lastError); + + try + { + await _reopen(ct).ConfigureAwait(false); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) { return; } + catch (Exception ex) + { + _logger.LogWarning(ex, "Galaxy reopen failed (attempt {Attempt})", attempt); + lock (_stateLock) { _lastError = ex.Message; } + continue; // back to backoff + retry + } + + // === Replaying phase === + lock (_stateLock) TransitionLocked(State.Replaying, _lastError); + + try + { + await _replay(ct).ConfigureAwait(false); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) { return; } + catch (Exception ex) + { + _logger.LogWarning(ex, "Galaxy replay failed (attempt {Attempt})", attempt); + lock (_stateLock) { _lastError = ex.Message; } + continue; // back to backoff + retry + } + + // === Done === + lock (_stateLock) + { + _lastError = null; + TransitionLocked(State.Healthy, null); + } + _logger.LogInformation("Galaxy reconnect succeeded after {Attempt} attempt(s)", attempt); + return; + } + } + + private TimeSpan ComputeBackoff(int attempt) + { + if (_backoffDelay is not null) + return _backoffDelay(attempt, _options.InitialBackoff, _options.MaxBackoff); + + // Standard capped exponential — InitialBackoff * 2^(attempt-2), capped at MaxBackoff. + // Attempt 2 → InitialBackoff, attempt 3 → 2x, attempt 4 → 4x, etc. + var multiplier = Math.Min(1L << Math.Max(0, attempt - 2), int.MaxValue); + var ticks = _options.InitialBackoff.Ticks * multiplier; + if (ticks <= 0 || ticks > _options.MaxBackoff.Ticks) ticks = _options.MaxBackoff.Ticks; + return TimeSpan.FromTicks(ticks); + } + + private void TransitionLocked(State next, string? cause) + { + if (next == _state) return; + var previous = _state; + _state = next; + _lastTransitionUtc = DateTime.UtcNow; + var transition = new StateTransition(previous, next, cause, _lastTransitionUtc.Value); + try { StateChanged?.Invoke(this, transition); } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Galaxy reconnect StateChanged handler threw — continuing."); + } + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + CancellationTokenSource? cts; + Task? loop; + lock (_stateLock) { cts = _loopCts; loop = _recoveryLoop; _loopCts = null; _recoveryLoop = null; } + + cts?.Cancel(); + if (loop is not null) + { + try { loop.GetAwaiter().GetResult(); } catch { /* shutdown */ } + } + cts?.Dispose(); + } +} + +/// +/// One state transition observed by the supervisor. +/// +public sealed record StateTransition( + ReconnectSupervisor.State Previous, + ReconnectSupervisor.State Next, + string? Cause, + DateTime AtUtc); + +/// +/// Knobs for the supervisor's backoff. 
+
+    private void TransitionLocked(State next, string? cause)
+    {
+        if (next == _state) return;
+        var previous = _state;
+        _state = next;
+        _lastTransitionUtc = DateTime.UtcNow;
+        var transition = new StateTransition(previous, next, cause, _lastTransitionUtc.Value);
+        try { StateChanged?.Invoke(this, transition); }
+        catch (Exception ex)
+        {
+            _logger.LogWarning(ex,
+                "Galaxy reconnect StateChanged handler threw — continuing.");
+        }
+    }
+
+    public void Dispose()
+    {
+        if (_disposed) return;
+        _disposed = true;
+
+        CancellationTokenSource? cts;
+        Task? loop;
+        lock (_stateLock) { cts = _loopCts; loop = _recoveryLoop; _loopCts = null; _recoveryLoop = null; }
+
+        cts?.Cancel();
+        if (loop is not null)
+        {
+            try { loop.GetAwaiter().GetResult(); } catch { /* shutdown */ }
+        }
+        cts?.Dispose();
+    }
+}
+
+/// <summary>
+/// One state transition observed by the supervisor.
+/// </summary>
+public sealed record StateTransition(
+    ReconnectSupervisor.State Previous,
+    ReconnectSupervisor.State Next,
+    string? Cause,
+    DateTime AtUtc);
+
+/// <summary>
+/// Knobs for the supervisor's backoff. The corresponding backoff settings on the driver
+/// options record (PR 4.0) map onto this — they're separate types so the supervisor
+/// can be exercised in tests without the full driver options surface.
+/// </summary>
+public sealed record ReconnectOptions(
+    TimeSpan? InitialBackoffOverride = null,
+    TimeSpan? MaxBackoffOverride = null)
+{
+    public TimeSpan InitialBackoff => InitialBackoffOverride ?? TimeSpan.FromMilliseconds(500);
+    public TimeSpan MaxBackoff => MaxBackoffOverride ?? TimeSpan.FromSeconds(30);
+}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/ReconnectSupervisorTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/ReconnectSupervisorTests.cs
new file mode 100644
index 0000000..a2aa121
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/ReconnectSupervisorTests.cs
@@ -0,0 +1,173 @@
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
+
+/// <summary>
+/// Tests for <see cref="ReconnectSupervisor"/>'s state machine + backoff. Each
+/// scenario drives the supervisor with controllable reopen/replay callbacks and
+/// observes the resulting state transitions.
+/// </summary>
+public sealed class ReconnectSupervisorTests
+{
+    private const int WaitMs = 2_000;
+
+    private static ReconnectOptions FastBackoff() =>
+        new(InitialBackoffOverride: TimeSpan.FromMilliseconds(5),
+            MaxBackoffOverride: TimeSpan.FromMilliseconds(20));
+
+    [Fact]
+    public void InitialState_IsHealthy()
+    {
+        using var sup = new ReconnectSupervisor(_ => Task.CompletedTask, _ => Task.CompletedTask, FastBackoff());
+        sup.CurrentState.ShouldBe(ReconnectSupervisor.State.Healthy);
+        sup.IsDegraded.ShouldBeFalse();
+        sup.LastError.ShouldBeNull();
+    }
+
+    [Fact]
+    public async Task ReportTransportFailure_DrivesThroughReopenReplay_BackToHealthy()
+    {
+        var transitions = new List<StateTransition>();
+        var lockObj = new object();
+
+        using var sup = new ReconnectSupervisor(
+            reopen: _ => Task.CompletedTask,
+            replay: _ => Task.CompletedTask,
+            options: FastBackoff());
+
+        sup.StateChanged += (_, t) => { lock (lockObj) transitions.Add(t); };
+
+        sup.ReportTransportFailure(new IOException("transport drop"));
+        await sup.WaitForHealthyAsync(new CancellationTokenSource(WaitMs).Token);
+
+        // Expected sequence: Healthy → TransportLost → Reopening → Replaying → Healthy.
+        IReadOnlyList<StateTransition> snapshot;
+        lock (lockObj) snapshot = [.. transitions];
+        var states = snapshot.Select(t => t.Next).ToArray();
+
+        states.ShouldContain(ReconnectSupervisor.State.TransportLost);
+        states.ShouldContain(ReconnectSupervisor.State.Reopening);
+        states.ShouldContain(ReconnectSupervisor.State.Replaying);
+        states[^1].ShouldBe(ReconnectSupervisor.State.Healthy);
+        sup.IsDegraded.ShouldBeFalse();
+    }
+
+    [Fact]
+    public async Task ReopenFailure_RetriesUntilSuccess_StaysInReopeningBetweenAttempts()
+    {
+        var attempts = 0;
+        using var sup = new ReconnectSupervisor(
+            reopen: _ => { attempts++; return attempts < 3 ? Task.FromException(new IOException("nope")) : Task.CompletedTask; },
+            replay: _ => Task.CompletedTask,
+            options: FastBackoff());
+
+        sup.ReportTransportFailure(new IOException("kick off"));
+        await sup.WaitForHealthyAsync(new CancellationTokenSource(WaitMs).Token);
+
+        attempts.ShouldBe(3);
+        sup.CurrentState.ShouldBe(ReconnectSupervisor.State.Healthy);
+        sup.LastError.ShouldBeNull(); // cleared on Healthy transition
+    }
+
+    [Fact]
+    public async Task ReplayFailure_RetriesEntireCycle()
+    {
+        var reopens = 0;
+        var replays = 0;
+        using var sup = new ReconnectSupervisor(
+            reopen: _ => { reopens++; return Task.CompletedTask; },
+            replay: _ => { replays++; return replays < 2 ? Task.FromException(new IOException("replay nope")) : Task.CompletedTask; },
+            options: FastBackoff());
+
+        sup.ReportTransportFailure(new IOException("kick off"));
+        await sup.WaitForHealthyAsync(new CancellationTokenSource(WaitMs).Token);
+
+        // First cycle: reopen succeeds, replay fails. Second cycle: both succeed.
+        reopens.ShouldBe(2);
+        replays.ShouldBe(2);
+        sup.CurrentState.ShouldBe(ReconnectSupervisor.State.Healthy);
+    }
+
+    [Fact]
+    public async Task RepeatedFailureReports_DuringRecovery_DoNotSpawnParallelLoops()
+    {
+        var attempts = 0;
+        using var sup = new ReconnectSupervisor(
+            reopen: async ct =>
+            {
+                attempts++;
+                await Task.Delay(50, ct).ConfigureAwait(false);
+            },
+            replay: _ => Task.CompletedTask,
+            options: FastBackoff());
+
+        sup.ReportTransportFailure(new IOException("first"));
+        // Fire several more reports while reopen is in flight.
+        for (var i = 0; i < 5; i++)
+        {
+            sup.ReportTransportFailure(new IOException($"rapid-{i}"));
+        }
+
+        await sup.WaitForHealthyAsync(new CancellationTokenSource(WaitMs).Token);
+
+        // One Reopen call regardless of how many failures arrived during recovery.
+        attempts.ShouldBe(1);
+    }
+
+    [Fact]
+    public async Task LastError_ReflectsMostRecentFailureCause()
+    {
+        using var sup = new ReconnectSupervisor(
+            reopen: _ => Task.FromException(new IOException("reopen broke")),
+            replay: _ => Task.CompletedTask,
+            options: new ReconnectOptions(
+                InitialBackoffOverride: TimeSpan.FromMilliseconds(5),
+                MaxBackoffOverride: TimeSpan.FromMilliseconds(10)));
+
+        sup.ReportTransportFailure(new IOException("initial"));
+
+        // Allow the loop to attempt at least twice.
+        await Task.Delay(100);
+        sup.LastError.ShouldNotBeNull();
+        sup.LastError.ShouldContain("reopen broke"); // updated by the loop's failed reopen attempts
+    }
+
+    [Fact]
+    public async Task Dispose_CancelsRunningRecoveryLoop_Cleanly()
+    {
+        var cancelled = false;
+        var sup = new ReconnectSupervisor(
+            reopen: async ct =>
+            {
+                try { await Task.Delay(10_000, ct).ConfigureAwait(false); }
+                catch (OperationCanceledException) { cancelled = true; throw; }
+            },
+            replay: _ => Task.CompletedTask,
+            options: FastBackoff());
+
+        sup.ReportTransportFailure(new IOException("kick off"));
+        await Task.Delay(50); // let the loop start the long reopen
+        Should.NotThrow(() => sup.Dispose());
+        cancelled.ShouldBeTrue();
+    }
+
+    [Fact]
+    public void ReportTransportFailure_AfterDispose_Throws()
+    {
+        var sup = new ReconnectSupervisor(_ => Task.CompletedTask, _ => Task.CompletedTask, FastBackoff());
+        sup.Dispose();
+        Should.Throw<ObjectDisposedException>(() => sup.ReportTransportFailure(new IOException("x")));
+    }
+
+    [Fact]
+    public async Task WaitForHealthy_ReturnsImmediately_WhenAlreadyHealthy()
+    {
+        using var sup = new ReconnectSupervisor(_ => Task.CompletedTask, _ => Task.CompletedTask, FastBackoff());
+        // No failure reported — should be Healthy from the start.
+        var deadline = DateTime.UtcNow.AddMilliseconds(50);
+        await sup.WaitForHealthyAsync(new CancellationTokenSource(50).Token);
+        DateTime.UtcNow.ShouldBeLessThan(deadline.AddMilliseconds(100)); // returned promptly
+    }
+}