mbproxy: Wave 4 — fix issues introduced by the Wave-1/2 fixes
Closes the new findings from the post-remediation re-review (codereviews/2026-05-14/ReReviewAfterRemediation.md): NC1 — ProxyWorker.StopAsync drain loop is structurally always-zero Wave 1's W1.5 inherited the original ShutdownCoordinator bug it was meant to replace. Supervisor.StopAsync nulls the per-mux counter provider before the drain loop runs, so CountInFlight always returns 0 and the drain budget is never spent on actual draining. Fix: snapshot the in-flight count BEFORE supervisor stop, drop the theatrical post-stop loop, and report InFlightAtCancel as the snapshot count (= the number of in-flight requests dropped by the stop). The supervisor stop IS the drain — there is nothing to drain that wouldn't be killed by the stop itself. NM1 — TearDownBackendAsync._connectGate.WaitAsync uncancellable Without a token, a long Polly-wrapped EnsureBackendConnectedAsync against an unreachable host could hold the gate for the full BackendConnectTimeoutMs * MaxAttempts window, blocking DisposeAsync (and therefore ProxyWorker.StopAsync) for that duration. Fix: bound the wait with a 2 s teardown deadline; on timeout proceed best-effort without the gate. Worst-case consequence is one orphaned in-flight cycle on the dying backend, surfaced to upstream as exception 0x0B by the watchdog. NM2 — ReplaceContext non-atomic ctx + provider swap Snapshot path reads `_cacheStatsProvider` independently of `_ctx`. If `_ctx` was swapped first, a snapshot taken in the gap would still hold the OLD adapter wrapping the OLD cache — which the supervisor disposes immediately after we return. Fix: set the provider FIRST, then swap `_ctx`. Snapshots in the swap window now read either (old, old) or (new, new), never (old-after-disposed). NM5 — Self-cascade ObjectDisposedException after dispose Writer/reader fault catches fired `_ = TearDownBackendAsync(...)` unconditionally. After DisposeAsync runs `_connectGate.Dispose()`, the fire-and-forget TearDown threw ObjectDisposedException on WaitAsync as an unobserved Task exception. Fix: skip self-cascade when `_disposeCts.IsCancellationRequested` — DisposeAsync runs an explicit TearDown anyway. Nm1 — Saturation cleanup uses await SendResponseAsync W1.2's per-attacher delivery loop awaited the blocking SendResponseAsync, which would serialise on a wedged late-attacher's full bounded channel and stall delivery to its peers — contradicting the W1.3 doctrine that the fan-out path must never await per-pipe writes. Fix: use TrySendResponse and increment ResponseDropForFullUpstream on drop. T2 — WatchdogVsResponse_Race seeded Random fragility Used `new Random(12345)` over [350, 450) ms with watchdog at 400 ms; Random's algorithm is implementation-defined across .NET major versions (legacy → Xoshiro128 in .NET 6) so a runtime upgrade could land all samples on one side of the deadline and break the "both branches must fire" assertion. Fix: deterministic counter-based alternation (15 fast + 15 slow across 30 iterations) — guaranteed by construction. Latent items NM3 (_supervisorCts leak on re-Start) and NM4 (TCS single-shot semantics) are unfixed: no caller actually re-Starts a supervisor today; both become real only if the reconciler ever changes to re-Start instead of dispose-and-rebuild. Documented in the re-review. Tests: 387 pass / 0 fail. Three back-to-back race-test runs in isolation all green (T2 alternation is deterministic). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -174,13 +174,23 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
{
|
||||
if (_disposed) return;
|
||||
|
||||
_ctx = newContext;
|
||||
|
||||
// Re-register the cache stats provider on the (preserved) counters so the status
|
||||
// page sees the new cache's count/bytes immediately. Pass null when the new context
|
||||
// opted out of caching to clear any stale provider from the previous context.
|
||||
// Phase 12 (W4 / NM2) — provider FIRST, then _ctx. The status page's snapshot
|
||||
// path reads `_cacheStatsProvider` independently of `_ctx`. If we swapped `_ctx`
|
||||
// first, a snapshot taken in the gap between the two writes would still hold the
|
||||
// OLD adapter wrapping the OLD cache — which the supervisor is about to dispose
|
||||
// (`PlcListenerSupervisor.ReplaceContextAsync` runs `oldCache.Dispose()` after we
|
||||
// return). Setting the provider first means snapshots in the swap window read
|
||||
// either (old provider, old ctx) or (new provider, new ctx) — both coherent —
|
||||
// never (old provider after old cache disposed).
|
||||
//
|
||||
// In the typical reseat case `oldContext.Counters == newContext.Counters` (the
|
||||
// reconciler preserves counters across reseat), so this updates the same instance
|
||||
// both paths share. The order still matters because the snapshot reads the
|
||||
// provider field, not the per-context counters reference.
|
||||
newContext.Counters.SetCacheStatsProvider(
|
||||
newContext.Cache is not null ? new CacheStatsAdapter(newContext.Cache) : null);
|
||||
|
||||
_ctx = newContext;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -330,7 +340,33 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
// this, a fresh EnsureBackendConnectedAsync racing with the channel drain below
|
||||
// could see stranded frames sent on its new socket with old (already-released) TxIds,
|
||||
// producing orphaned responses that hang upstream peers via the watchdog.
|
||||
await _connectGate.WaitAsync().ConfigureAwait(false);
|
||||
//
|
||||
// Phase 12 (W4 / NM1) — bound the wait. Without a timeout, a long Polly-wrapped
|
||||
// EnsureBackendConnectedAsync against an unreachable host can hold the gate for
|
||||
// the full BackendConnectTimeoutMs * MaxAttempts window, blocking DisposeAsync (and
|
||||
// therefore ProxyWorker.StopAsync) for that duration. A 2 s teardown deadline
|
||||
// bounds disposal latency; if the gate is unavailable we proceed best-effort
|
||||
// without it (the worst-case consequence is one orphaned in-flight cycle on the
|
||||
// dying backend, which the upstream watchdog will surface as exception 0x0B).
|
||||
bool gateHeld = false;
|
||||
try
|
||||
{
|
||||
using var teardownCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
|
||||
await _connectGate.WaitAsync(teardownCts.Token).ConfigureAwait(false);
|
||||
gateHeld = true;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Best-effort: proceed without the gate. Concurrent connect attempts will
|
||||
// observe _disposed (or the now-null _backendSocket) and short-circuit.
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
// _connectGate already disposed — TearDown is racing past DisposeAsync.
|
||||
// Skip the body entirely; there's nothing useful to do at this point.
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
Socket? oldSocket;
|
||||
@@ -416,7 +452,12 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
}
|
||||
finally
|
||||
{
|
||||
_connectGate.Release();
|
||||
// Only release if we acquired (W4 / NM1) — best-effort path may have skipped.
|
||||
if (gateHeld)
|
||||
{
|
||||
try { _connectGate.Release(); }
|
||||
catch (ObjectDisposedException) { /* dispose race — harmless */ }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -446,8 +487,12 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Backend failure — cascade.
|
||||
_ = TearDownBackendAsync($"writer fault: {ex.Message}", cascadeUpstreams: true);
|
||||
// Backend failure — cascade. Phase 12 (W4 / NM5) — skip if disposal is
|
||||
// already in progress; DisposeAsync runs an explicit TearDown and the
|
||||
// fire-and-forget here would race against it, hitting a disposed
|
||||
// _connectGate and producing an unobserved-task exception.
|
||||
if (!_disposeCts.IsCancellationRequested)
|
||||
_ = TearDownBackendAsync($"writer fault: {ex.Message}", cascadeUpstreams: true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -637,7 +682,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
}
|
||||
|
||||
// Reader exited cleanly — backend closed by remote. Cascade.
|
||||
_ = TearDownBackendAsync("backend reader EOF", cascadeUpstreams: true);
|
||||
// Phase 12 (W4 / NM5) — skip if dispose is already in progress (see writer-side
|
||||
// comment above for rationale).
|
||||
if (!_disposeCts.IsCancellationRequested)
|
||||
_ = TearDownBackendAsync("backend reader EOF", cascadeUpstreams: true);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
@@ -645,7 +693,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_ = TearDownBackendAsync($"reader fault: {ex.Message}", cascadeUpstreams: true);
|
||||
if (!_disposeCts.IsCancellationRequested)
|
||||
_ = TearDownBackendAsync($"reader fault: {ex.Message}", cascadeUpstreams: true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -834,18 +883,16 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
|
||||
if (_inFlightByKey.TryRemove(key, out var stub))
|
||||
{
|
||||
// Phase 12 (W4 / Nm1) — non-blocking delivery via TrySendResponse.
|
||||
// Previously this loop awaited SendResponseAsync per party, which would
|
||||
// serialise on a wedged late-attacher's full bounded channel and stall
|
||||
// delivery to its peers. Same doctrine as the W1.3 backend-reader fix:
|
||||
// the per-PLC fan-out path must never await per-pipe writes.
|
||||
foreach (var party in stub.InterestedParties)
|
||||
{
|
||||
byte[] excFrame = BuildExceptionFrame(party.OriginalTxId, unitId, fcByte, exceptionCode: 4);
|
||||
try
|
||||
{
|
||||
await party.Pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Best-effort delivery. A dead pipe will be collected by its own
|
||||
// socket close path; nothing more we can do here.
|
||||
}
|
||||
if (!party.Pipe.TrySendResponse(excFrame))
|
||||
_ctx.Counters.IncrementResponseDropForFullUpstream();
|
||||
}
|
||||
}
|
||||
else
|
||||
@@ -853,7 +900,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
// The stub was already removed by another path (extremely unlikely, but
|
||||
// defensive). Surface the exception to the original requester.
|
||||
byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4);
|
||||
await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false);
|
||||
if (!pipe.TrySendResponse(excFrame))
|
||||
_ctx.Counters.IncrementResponseDropForFullUpstream();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -244,18 +244,26 @@ internal sealed partial class ProxyWorker : BackgroundService
|
||||
/// <c>ShutdownCoordinator</c>):
|
||||
/// <list type="number">
|
||||
/// <item>Cancel <see cref="ExecuteAsync"/> via <c>base.StopAsync</c>.</item>
|
||||
/// <item>Stop all supervisors with a 5 s hard deadline (no new connections; existing
|
||||
/// pipes are cascaded by <see cref="PlcListenerSupervisor"/> teardown).</item>
|
||||
/// <item>Wait for in-flight PDUs to drain via the live
|
||||
/// <see cref="ConnectionOptions.GracefulShutdownTimeoutMs"/> (read fresh from
|
||||
/// <see cref="IOptionsMonitor{T}.CurrentValue"/> so a hot-reloaded value is
|
||||
/// honoured at stop time).</item>
|
||||
/// <item>Stop the admin endpoint LAST so the status page survives the drain phase
|
||||
/// and an operator polling it sees the in-flight count fall to zero.</item>
|
||||
/// <item><b>Snapshot</b> per-PLC in-flight counts BEFORE stopping supervisors —
|
||||
/// this is the only honest reading of "how many requests were in flight when
|
||||
/// we decided to stop." Once supervisors stop, their multiplexers are torn
|
||||
/// down and the per-mux counter providers are nulled, so any later read
|
||||
/// returns 0 regardless of what was actually dropped.</item>
|
||||
/// <item>Stop all supervisors with the configured graceful timeout. Supervisor
|
||||
/// stop is the actual drain — it cancels the listener, which exits its
|
||||
/// accept loop, which disposes the multiplexer, which cascades all attached
|
||||
/// pipes. There is no separate "drain in-flight" phase because there is
|
||||
/// nothing to drain that wouldn't be killed by the supervisor stop itself
|
||||
/// (the original Phase-08 ShutdownCoordinator's drain loop had this same
|
||||
/// shape and was structurally always-zero — call out from
|
||||
/// codereviews/2026-05-14/ReReviewAfterRemediation.md NC1).</item>
|
||||
/// <item>Stop the admin endpoint LAST so the status page survives the supervisor
|
||||
/// stop phase and operators can observe the live state right up to shutdown.</item>
|
||||
/// <item>Dispose every supervisor to release sockets, channels, and watchdog timers.</item>
|
||||
/// </list>
|
||||
/// Logs <c>mbproxy.shutdown.complete</c> on the way out with the in-flight count at
|
||||
/// drain-deadline (zero on a clean shutdown, positive when forced cancel).
|
||||
/// Logs <c>mbproxy.shutdown.complete</c> with <c>InFlightAtCancel</c> equal to the
|
||||
/// snapshot count from step 2 (= the number of in-flight requests dropped by the
|
||||
/// stop) and <c>ElapsedMs</c> for the whole sequence.
|
||||
/// </summary>
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
@@ -263,12 +271,21 @@ internal sealed partial class ProxyWorker : BackgroundService
|
||||
await base.StopAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var sw = Stopwatch.StartNew();
|
||||
|
||||
// Phase 12 (W4 / NC1) — snapshot in-flight count BEFORE supervisor stop. After
|
||||
// supervisor.StopAsync, multiplexers are disposed and CountInFlight returns 0
|
||||
// unconditionally; reading after the stop produced a meaningless always-zero log
|
||||
// (the original ShutdownCoordinator had the same defect — see
|
||||
// codereviews/2026-05-14/ReReviewAfterRemediation.md NC1).
|
||||
int inFlightAtCancel = CountInFlight();
|
||||
|
||||
// Phase 12 (W2.20) — supervisor stop deadline read from the live config so a
|
||||
// hot-reloaded GracefulShutdownTimeoutMs is honoured. Previously hard-coded 5 s.
|
||||
// The supervisor stop budget is bounded by the same total-shutdown budget.
|
||||
// hot-reloaded GracefulShutdownTimeoutMs is honoured. Supervisor stop is the
|
||||
// drain: cancelling the supervisor cancels the listener, which exits accept, which
|
||||
// disposes the multiplexer, which cascades all attached pipes.
|
||||
int gracefulMs = _options.CurrentValue.Connection.GracefulShutdownTimeoutMs;
|
||||
|
||||
// ── 1. Stop accepting new connections ─────────────────────────────────────────
|
||||
// ── 1. Stop accepting new connections + drain (one combined phase) ────────────
|
||||
using var stopCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(gracefulMs));
|
||||
using var linked = CancellationTokenSource.CreateLinkedTokenSource(
|
||||
stopCts.Token, cancellationToken);
|
||||
@@ -286,30 +303,7 @@ internal sealed partial class ProxyWorker : BackgroundService
|
||||
// Best effort — don't let individual supervisor failures block shutdown.
|
||||
}
|
||||
|
||||
// ── 2. Drain in-flight PDUs ───────────────────────────────────────────────────
|
||||
// Same `gracefulMs` budget the supervisor-stop step used.
|
||||
int drainDeadlineMs = gracefulMs;
|
||||
int inFlightAtCancel = 0;
|
||||
|
||||
if (drainDeadlineMs > 0)
|
||||
{
|
||||
using var drainCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(drainDeadlineMs));
|
||||
try
|
||||
{
|
||||
while (!drainCts.Token.IsCancellationRequested)
|
||||
{
|
||||
int total = CountInFlight();
|
||||
if (total == 0) break;
|
||||
await Task.Delay(10, drainCts.Token).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
inFlightAtCancel = CountInFlight();
|
||||
}
|
||||
}
|
||||
|
||||
// ── 3. Stop admin endpoint LAST ───────────────────────────────────────────────
|
||||
// ── 2. Stop admin endpoint LAST ───────────────────────────────────────────────
|
||||
if (_admin is not null)
|
||||
{
|
||||
try
|
||||
@@ -323,7 +317,7 @@ internal sealed partial class ProxyWorker : BackgroundService
|
||||
}
|
||||
}
|
||||
|
||||
// ── 4. Dispose supervisors (releases sockets, channels, watchdog timers) ─────
|
||||
// ── 3. Dispose supervisors (releases sockets, channels, watchdog timers) ─────
|
||||
foreach (var supervisor in _supervisors.Values)
|
||||
await supervisor.DisposeAsync().ConfigureAwait(false);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user