From ce32c5cee84d9b895a2edc7a01e238aa58a87725 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 14 May 2026 05:16:13 -0400 Subject: [PATCH] mbproxy: Wave 1 fixes from 2026-05-14 code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves the four critical correctness defects + the ShutdownCoordinator double-stop ordering bug called out in codereviews/2026-05-14/Overview.md. Tests: 362 pass / 0 fail (baseline 358 + 4 new W1 regression tests). W1.1 — Context swap on running multiplexer. PlcMultiplexer._ctx becomes volatile with a new ReplaceContext() method that re-registers the cache stats provider on the (preserved) counters. PlcListener exposes its multiplexer; PlcListenerSupervisor.ReplaceContextAsync swaps the running mux first, then disposes the old cache. Hot-reload tag-list changes and the cache-flush-on-reload contract now actually take effect on the next PDU instead of waiting for the next listener fault. W1.2 — Coalescing factory leak. When the InFlightByKey factory soft-fails (allocator saturation or duplicate TxId), the cleanup path now TryRemoves the stub and walks every party on it (including late attachers) to deliver Modbus exception 0x04. Previously only the leader got the exception; late attachers waited forever for a response that no backend round-trip would ever fire. W1.3 — Backend-reader head-of-line block. UpstreamPipe gains TrySendResponse for non-blocking enqueue. The per-PLC backend reader's fan-out loop uses it instead of awaiting SendResponseAsync, so a wedged upstream's full bounded response channel can no longer stall the single backend reader and starve every other client on that PLC. New responseDropForFullUpstream counter on ProxyCounters / CounterSnapshot records the drops. W1.4 — Stranded outbound frames after cascade. TearDownBackendAsync acquires _connectGate and drains any frames left in _outboundChannel after the writer task faulted/cancelled, releasing their proxy TxIds back to the allocator. Without this, a fresh EnsureBackendConnectedAsync racing the cascade would send stranded frames with old TxIds onto the new backend socket; the responses would arrive with no correlation entry and the upstream peers would hang on the watchdog until BackendRequestTimeoutMs. W1.5 — Delete ShutdownCoordinator (Option B). Drain logic moved into ProxyWorker.StopAsync. AdminEndpointHost is no longer registered as IHostedService; ProxyWorker drives its lifecycle directly so admin starts after listeners are bound and stops AFTER the in-flight drain (the design's documented contract). Admin is resolved lazily in ExecuteAsync to break the circular DI graph (Admin -> StatusSnapshotBuilder -> ProxyWorker). GracefulShutdownTimeoutMs is now read fresh from IOptionsMonitor.CurrentValue at stop time, so a hot-reloaded value is honoured. Removes ShutdownCoordinator + tests. New tests: PlcMultiplexerTests.ReplaceContext_NewTagMap_VisibleOnNextPdu PlcMultiplexerTests.ReplaceContext_NewCache_NextReadGoesToBackend_NotOldCache UpstreamPipeTests.TrySendResponse_WhenChannelFull_ReturnsFalse_WithoutBlocking UpstreamPipeTests.TrySendResponse_AfterDispose_ReturnsFalse Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/Mbproxy/Admin/AdminEndpointHost.cs | 12 +- .../Mbproxy/Admin/StatusSnapshotBuilder.cs | 3 +- .../Diagnostics/ShutdownCoordinator.cs | 212 ---------------- mbproxy/src/Mbproxy/HostingExtensions.cs | 11 +- mbproxy/src/Mbproxy/Program.cs | 47 +--- .../Proxy/Multiplexing/PlcMultiplexer.cs | 233 ++++++++++++------ .../Proxy/Multiplexing/UpstreamPipe.cs | 20 ++ mbproxy/src/Mbproxy/Proxy/PlcListener.cs | 7 + mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs | 26 +- mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs | 116 ++++++++- .../Supervision/PlcListenerSupervisor.cs | 45 ++-- .../Diagnostics/ShutdownCoordinatorTests.cs | 177 ------------- .../Proxy/Multiplexing/PlcMultiplexerTests.cs | 133 ++++++++++ .../Proxy/Multiplexing/UpstreamPipeTests.cs | 104 ++++++++ 14 files changed, 614 insertions(+), 532 deletions(-) delete mode 100644 mbproxy/src/Mbproxy/Diagnostics/ShutdownCoordinator.cs delete mode 100644 mbproxy/tests/Mbproxy.Tests/Diagnostics/ShutdownCoordinatorTests.cs create mode 100644 mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/UpstreamPipeTests.cs diff --git a/mbproxy/src/Mbproxy/Admin/AdminEndpointHost.cs b/mbproxy/src/Mbproxy/Admin/AdminEndpointHost.cs index cbe78be..ebb0d7e 100644 --- a/mbproxy/src/Mbproxy/Admin/AdminEndpointHost.cs +++ b/mbproxy/src/Mbproxy/Admin/AdminEndpointHost.cs @@ -8,7 +8,9 @@ using Mbproxy.Options; namespace Mbproxy.Admin; /// -/// Hosted service that owns the Kestrel-backed admin HTTP endpoint. +/// Owns the Kestrel-backed admin HTTP endpoint. Driven by +/// (which calls after listeners are up and +/// at the END of graceful shutdown — supervisors stop and drain first, admin stops last). /// /// Lifecycle: /// @@ -22,8 +24,14 @@ namespace Mbproxy.Admin; /// /// /// Routes: exactly two — GET / (HTML) and GET /status.json (JSON). +/// +/// Phase 12 (W1.5) — was previously also registered as , +/// but the host's automatic stop ordering (reverse of registration) ran admin.StopAsync +/// BEFORE ProxyWorker.StopAsync, which broke the design's "drain THEN stop admin" guarantee +/// and caused a double-stop with the now-deleted ShutdownCoordinator. Now a plain +/// singleton with explicit lifecycle calls from ProxyWorker. /// -internal sealed partial class AdminEndpointHost : IHostedService, IAsyncDisposable +internal sealed partial class AdminEndpointHost : IAsyncDisposable { private readonly IOptionsMonitor _optionsMonitor; private readonly StatusSnapshotBuilder _builder; diff --git a/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs b/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs index e7dad1d..ee4ffbd 100644 --- a/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs +++ b/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs @@ -107,7 +107,8 @@ internal sealed class StatusSnapshotBuilder CacheMissCount: 0, CacheInvalidations: 0, CacheEntryCount: 0, - CacheBytes: 0); + CacheBytes: 0, + ResponseDropForFullUpstream: 0); // Phase 08: ConnectsSuccess / ConnectsFailed are now tracked in ProxyCounters. long connectsSuccess = counters.ConnectsSuccess; diff --git a/mbproxy/src/Mbproxy/Diagnostics/ShutdownCoordinator.cs b/mbproxy/src/Mbproxy/Diagnostics/ShutdownCoordinator.cs deleted file mode 100644 index 8fc3ebe..0000000 --- a/mbproxy/src/Mbproxy/Diagnostics/ShutdownCoordinator.cs +++ /dev/null @@ -1,212 +0,0 @@ -using System.Diagnostics; -using Mbproxy.Admin; -using Mbproxy.Options; -using Mbproxy.Proxy; -using Mbproxy.Proxy.Multiplexing; -using Mbproxy.Proxy.Supervision; -using Microsoft.Extensions.Options; - -namespace Mbproxy.Diagnostics; - -// ── Testability interfaces ──────────────────────────────────────────────────────────────────── - -/// -/// Abstraction over a supervisor's stop operation and its multiplexer's in-flight count. -/// Introduced so unit tests can inject fakes -/// without needing a real . -/// -/// Phase 9: in-flight tracking is now per-multiplexer (the -/// ) rather than per-pair. -/// replaces ActivePairs.IsProcessing from the 1:1 model. -/// -internal interface ISupervisorHandle -{ - Task StopAsync(CancellationToken ct); - - /// - /// Current number of in-flight Modbus requests on this PLC's multiplexed backend. - /// Zero if the multiplexer has no in-flight requests (idle). - /// - int InFlightCount { get; } -} - -/// -/// Abstraction over the admin endpoint stop operation. -/// -internal interface IAdminEndpointHandle -{ - Task StopAsync(CancellationToken ct); -} - -/// -/// Adapts a concrete to . -/// -internal sealed class PlcSupervisorHandle : ISupervisorHandle -{ - private readonly PlcListenerSupervisor _supervisor; - public PlcSupervisorHandle(PlcListenerSupervisor supervisor) => _supervisor = supervisor; - public Task StopAsync(CancellationToken ct) => _supervisor.StopAsync(ct); - - public int InFlightCount - { - get - { - // CurrentCounters.Snapshot pulls live values from the multiplexer's - // IMultiplexCountersProvider hook; InFlightCount is point-in-time. - return (int)_supervisor.CurrentCounters.Snapshot().InFlightCount; - } - } -} - -/// -/// Adapts to . -/// -internal sealed class AdminEndpointHandle : IAdminEndpointHandle -{ - private readonly AdminEndpointHost _host; - public AdminEndpointHandle(AdminEndpointHost host) => _host = host; - public Task StopAsync(CancellationToken ct) => _host.StopAsync(ct); -} - -// ── ShutdownCoordinator ─────────────────────────────────────────────────────────────────────── - -/// -/// Orchestrates graceful shutdown of the proxy service. -/// -/// Shutdown sequence: -/// -/// Stop accepting new upstream connections on all supervisors. -/// Wait for in-flight Modbus requests to drain (polls -/// across all supervisors) until -/// expires. -/// Stop the admin endpoint. -/// Log mbproxy.shutdown.complete with InFlightAtCancel and ElapsedMs. -/// -/// -/// This type is internal. It is registered in DI as a singleton and wired to -/// in Program.cs. -/// -internal sealed partial class ShutdownCoordinator -{ - private readonly IReadOnlyList _supervisors; - private readonly IAdminEndpointHandle _adminEndpoint; - private readonly IOptions _options; - private readonly ILogger _logger; - - /// - /// Production constructor — wraps concrete types in their adapter handles. - /// - public ShutdownCoordinator( - IEnumerable supervisors, - AdminEndpointHost adminEndpoint, - IOptions options, - ILogger logger) - : this( - supervisors.Select(s => (ISupervisorHandle)new PlcSupervisorHandle(s)).ToList(), - new AdminEndpointHandle(adminEndpoint), - options, - logger) - { - } - - /// - /// Testability constructor — accepts abstractions so unit tests can inject fakes. - /// - internal ShutdownCoordinator( - IReadOnlyList supervisors, - IAdminEndpointHandle adminEndpoint, - IOptions options, - ILogger logger) - { - _supervisors = supervisors; - _adminEndpoint = adminEndpoint; - _options = options; - _logger = logger; - } - - /// - /// Runs the graceful shutdown sequence. - /// - /// - /// Override the configured Connection.GracefulShutdownTimeoutMs (use -1 to - /// read from options, which is the normal runtime path). Tests pass an explicit value. - /// - /// - /// The host lifetime cancellation token. Not used to gate the drain loop — the - /// coordinator manages its own deadline so it can log completion regardless. - /// - public async Task ShutdownAsync(int timeoutMs = -1, CancellationToken hostCt = default) - { - int deadline = timeoutMs >= 0 - ? timeoutMs - : _options.Value.Connection.GracefulShutdownTimeoutMs; - - var sw = Stopwatch.StartNew(); - - // ── Step 1: stop accepting new connections ──────────────────────────────────── - using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - var stopTasks = _supervisors - .Select(s => s.StopAsync(stopCts.Token)) - .ToArray(); - - try - { - await Task.WhenAll(stopTasks).ConfigureAwait(false); - } - catch - { - // Best-effort: individual supervisor failures must not abort shutdown. - } - - // ── Step 2: wait for in-flight PDUs to drain ────────────────────────────────── - int inFlightAtCancel = 0; - - using var drainCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(deadline)); - try - { - while (!drainCts.Token.IsCancellationRequested) - { - int inFlight = CountInFlight(_supervisors); - if (inFlight == 0) break; - - await Task.Delay(10, drainCts.Token).ConfigureAwait(false); - } - } - catch (OperationCanceledException) - { - // Deadline expired — count remaining in-flight and proceed. - inFlightAtCancel = CountInFlight(_supervisors); - } - - // ── Step 3: stop the admin endpoint ────────────────────────────────────────── - // Admin is stopped AFTER listeners to preserve ordering guarantee: - // supervisors stop → drain → admin stops. - try - { - using var adminCts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); - await _adminEndpoint.StopAsync(adminCts.Token).ConfigureAwait(false); - } - catch - { - // Best-effort. - } - - // ── Step 4: log completion ──────────────────────────────────────────────────── - LogShutdownComplete(_logger, inFlightAtCancel, sw.ElapsedMilliseconds); - } - - private static int CountInFlight(IReadOnlyList supervisors) - { - int count = 0; - foreach (var supervisor in supervisors) - { - count += supervisor.InFlightCount; - } - return count; - } - - [LoggerMessage(EventId = 80, EventName = "mbproxy.shutdown.complete", - Level = LogLevel.Information, - Message = "Graceful shutdown complete: InFlightAtCancel={InFlightAtCancel} ElapsedMs={ElapsedMs}")] - private static partial void LogShutdownComplete(ILogger logger, int inFlightAtCancel, long elapsedMs); -} diff --git a/mbproxy/src/Mbproxy/HostingExtensions.cs b/mbproxy/src/Mbproxy/HostingExtensions.cs index d78ab38..7cf3b2e 100644 --- a/mbproxy/src/Mbproxy/HostingExtensions.cs +++ b/mbproxy/src/Mbproxy/HostingExtensions.cs @@ -2,7 +2,6 @@ using Mbproxy.Admin; using Mbproxy.Configuration; using Mbproxy.Diagnostics; using Mbproxy.Options; -using Mbproxy.Proxy; using Serilog; namespace Mbproxy; @@ -43,19 +42,21 @@ internal static class HostingExtensions /// /// (singleton — reads version attribute once). /// (singleton — pure orchestration). - /// (hosted service — owns the Kestrel admin server). + /// (singleton — owns the Kestrel admin server). /// /// Must be called after and after /// AddHostedService<ProxyWorker> (so ProxyWorker is available via DI). + /// + /// Phase 12 (W1.5) is no longer registered + /// via AddHostedService. drives its lifecycle + /// directly so admin start/stop ordering matches the design contract (admin starts + /// after listeners are up; admin stops AFTER the in-flight drain). /// public static IHostApplicationBuilder AddMbproxyAdmin(this IHostApplicationBuilder builder) { builder.Services.AddSingleton(); builder.Services.AddSingleton(); - // Register AdminEndpointHost as a singleton so ShutdownCoordinator can inject it - // directly without going through the IHostedService collection. builder.Services.AddSingleton(); - builder.Services.AddHostedService(sp => sp.GetRequiredService()); return builder; } diff --git a/mbproxy/src/Mbproxy/Program.cs b/mbproxy/src/Mbproxy/Program.cs index 2a198d9..0f60952 100644 --- a/mbproxy/src/Mbproxy/Program.cs +++ b/mbproxy/src/Mbproxy/Program.cs @@ -1,10 +1,6 @@ using Mbproxy; -using Mbproxy.Admin; -using Mbproxy.Diagnostics; -using Mbproxy.Options; using Mbproxy.Proxy; using Microsoft.Extensions.Hosting.WindowsServices; -using Microsoft.Extensions.Options; var builder = Host.CreateApplicationBuilder(args); @@ -23,46 +19,15 @@ builder.AddMbproxyOptions(); // and avoids repeated construction. builder.Services.AddSingleton(); -// Proxy worker — owns all PlcListeners and logs mbproxy.startup.ready. -// Registered as singleton so StatusSnapshotBuilder can inject ProxyWorker directly -// and access its Supervisors dictionary. +// Proxy worker — owns all PlcListeners, logs mbproxy.startup.ready, and drives the admin +// endpoint's lifecycle. Registered as singleton so StatusSnapshotBuilder can inject +// ProxyWorker directly and access its Supervisors dictionary. builder.Services.AddSingleton(); builder.Services.AddHostedService(sp => sp.GetRequiredService()); // Phase 07: admin endpoint (Kestrel read-only status page). +// Phase 12 (W1.5): no longer registered as IHostedService; ProxyWorker drives its +// lifecycle so admin starts after listeners and stops AFTER the in-flight drain. builder.AddMbproxyAdmin(); -// Phase 08: graceful-shutdown coordinator. -// ShutdownCoordinator depends on PlcListenerSupervisor instances via ProxyWorker.Supervisors. -// Registered as a singleton so Program can resolve it after the host is built. -builder.Services.AddSingleton(sp => -{ - var worker = sp.GetRequiredService(); - var admin = sp.GetRequiredService(); - var options = sp.GetRequiredService>(); - var logger = sp.GetRequiredService>(); - // Supervisors is populated after ProxyWorker.StartAsync; the coordinator only - // enumerates them during ShutdownAsync, which runs on ApplicationStopping — - // after the host is fully started. - return new ShutdownCoordinator( - worker.Supervisors.Values, - admin, - options, - logger); -}); - -var host = builder.Build(); - -// Wire ApplicationStopping → ShutdownCoordinator BEFORE hosted services start. -// The callback fires when the host signals stop; it drains in-flight PDUs and stops -// the admin endpoint before the host tears down individual services. -var lifetime = host.Services.GetRequiredService(); -lifetime.ApplicationStopping.Register(() => -{ - // IHostApplicationLifetime callbacks do not support async — block briefly. - // The coordinator manages its own drain deadline so the host is not held indefinitely. - var coordinator = host.Services.GetRequiredService(); - coordinator.ShutdownAsync().GetAwaiter().GetResult(); -}); - -await host.RunAsync(); +await builder.Build().RunAsync(); diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs index 92bf52e..c306565 100644 --- a/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs @@ -47,7 +47,12 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi private readonly PlcOptions _plc; private readonly ConnectionOptions _connectionOptions; private readonly IPduPipeline _pipeline; - private readonly PerPlcContext _ctx; + + // Phase 12 (W1.1) — `_ctx` is volatile so a hot-reload reseat can swap it on the running + // multiplexer. Each method that uses the context snapshots it into a local at the start + // of the operation so a single PDU sees a consistent (TagMap, Cache) pair even if the + // swap fires mid-PDU. ReplaceContext is the single mutator. + private volatile PerPlcContext _ctx; private readonly ILogger _logger; private readonly ResiliencePipeline? _backendConnectPipeline; // Phase 10: live read-coalescing config accessor. The accessor is read per-PDU on the @@ -145,6 +150,35 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi _pipes[pipe.Id] = pipe; } + /// + /// Phase 12 (W1.1) — atomically swaps the per-PLC context on a running multiplexer. + /// Called by when a + /// hot-reload tag-list change is applied to a PLC whose listener is already bound. + /// + /// The new context's tag map and (optional) response cache become visible on the + /// next PDU through the volatile _ctx field. Counters are PRESERVED across reseat + /// (the supervisor builds the new context with the running counters), so we only need + /// to re-register the cache stats provider — the multiplex provider already points at + /// this same instance. + /// + /// Existing per-call snapshots of the old context held by in-flight PDUs (via + /// WithCurrentRequest) finish on the old map. New PDUs after this call see the + /// new map. Per the design contract a one-PDU "old map" tail is acceptable; partial-BCD + /// rewrites mid-request would be worse. + /// + public void ReplaceContext(PerPlcContext newContext) + { + if (_disposed) return; + + _ctx = newContext; + + // Re-register the cache stats provider on the (preserved) counters so the status + // page sees the new cache's count/bytes immediately. Pass null when the new context + // opted out of caching to clear any stale provider from the previous context. + newContext.Counters.SetCacheStatsProvider( + newContext.Cache is not null ? new CacheStatsAdapter(newContext.Cache) : null); + } + /// /// Starts the read+write tasks for and returns a task that /// completes when the pipe's read loop ends. The multiplexer detaches the pipe when @@ -284,73 +318,98 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi private async Task TearDownBackendAsync(string reason, bool cascadeUpstreams) { - Socket? oldSocket; - CancellationTokenSource? oldCts; - Task? writer, reader; - lock (_backendLock) + // Phase 12 (W1.4) — serialise tear-down vs connect-up via the connect gate. Without + // this, a fresh EnsureBackendConnectedAsync racing with the channel drain below + // could see stranded frames sent on its new socket with old (already-released) TxIds, + // producing orphaned responses that hang upstream peers via the watchdog. + await _connectGate.WaitAsync().ConfigureAwait(false); + try { - oldSocket = _backendSocket; - oldCts = _backendCts; - writer = _backendWriterTask; - reader = _backendReaderTask; - - _backendSocket = null; - _backendCts = null; - _backendWriterTask = null; - _backendReaderTask = null; - } - - if (oldSocket is null && oldCts is null) return; - - try { oldCts?.Cancel(); } catch { /* best effort */ } - - try { oldSocket?.Shutdown(SocketShutdown.Both); } catch { /* already closed */ } - try { oldSocket?.Dispose(); } catch { /* best effort */ } - - // Drain correlation map; cascade-close every interested upstream pipe. - var dropped = _correlation.DrainAll(); - var cascadeIds = new HashSet(); - - foreach (var kvp in dropped) - { - _allocator.Release(kvp.Key); - foreach (var party in kvp.Value.InterestedParties) - cascadeIds.Add(party.Pipe.Id); - } - - // Phase 10 — also drain the in-flight-by-key map so a brand-new identical request - // through the freshly-reconnected backend is treated as a miss (no stale entries - // outlive the backend they were destined for). - _inFlightByKey.DrainAll(); - - int upstreamCount = 0; - if (cascadeUpstreams) - { - // Close every attached pipe that had a request in flight; the others will - // simply re-issue on next request through a fresh backend connect. - // Per the design doc, ALL attached upstreams cascade on backend disconnect. - upstreamCount = _pipes.Count; - - // Snapshot keys before disposal modifies the dictionary indirectly. - var pipeList = _pipes.Values.ToArray(); - foreach (var pipe in pipeList) + Socket? oldSocket; + CancellationTokenSource? oldCts; + Task? writer, reader; + lock (_backendLock) { - try { await pipe.DisposeAsync().ConfigureAwait(false); } - catch { /* best effort */ } + oldSocket = _backendSocket; + oldCts = _backendCts; + writer = _backendWriterTask; + reader = _backendReaderTask; + + _backendSocket = null; + _backendCts = null; + _backendWriterTask = null; + _backendReaderTask = null; } - _pipes.Clear(); - _ctx.Counters.AddDisconnectCascades(upstreamCount); + if (oldSocket is null && oldCts is null) return; + + try { oldCts?.Cancel(); } catch { /* best effort */ } + + try { oldSocket?.Shutdown(SocketShutdown.Both); } catch { /* already closed */ } + try { oldSocket?.Dispose(); } catch { /* best effort */ } + + // Drain correlation map; cascade-close every interested upstream pipe. + var dropped = _correlation.DrainAll(); + + foreach (var kvp in dropped) + { + _allocator.Release(kvp.Key); + } + + // Phase 10 — also drain the in-flight-by-key map so a brand-new identical request + // through the freshly-reconnected backend is treated as a miss (no stale entries + // outlive the backend they were destined for). + _inFlightByKey.DrainAll(); + + int upstreamCount = 0; + if (cascadeUpstreams) + { + // Close every attached pipe that had a request in flight; the others will + // simply re-issue on next request through a fresh backend connect. + // Per the design doc, ALL attached upstreams cascade on backend disconnect. + upstreamCount = _pipes.Count; + + // Snapshot keys before disposal modifies the dictionary indirectly. + var pipeList = _pipes.Values.ToArray(); + foreach (var pipe in pipeList) + { + try { await pipe.DisposeAsync().ConfigureAwait(false); } + catch { /* best effort */ } + } + _pipes.Clear(); + + _ctx.Counters.AddDisconnectCascades(upstreamCount); + } + + // Phase 12 (W1.4) — drain any stranded frames left in the outbound channel by + // the writer task that just faulted/cancelled. Released their proxy TxIds back + // to the allocator. By the time we reach this line the writer has stopped + // reading from the channel (cancelled CTS) and the upstream pipes have been + // cascaded (no more enqueues), so the channel state is stable. + int strandedDropped = 0; + while (_outboundChannel.Reader.TryRead(out byte[]? stranded)) + { + if (stranded.Length >= 2) + { + ushort strandedTxId = (ushort)((stranded[0] << 8) | stranded[1]); + _allocator.Release(strandedTxId); + } + strandedDropped++; + } + + // Best-effort join. + try { if (writer is not null) await writer.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ } + try { if (reader is not null) await reader.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ } + + oldCts?.Dispose(); + + if (upstreamCount > 0 || dropped.Count > 0 || strandedDropped > 0) + MultiplexerLogEvents.BackendDisconnected(_logger, _plc.Name, upstreamCount, dropped.Count + strandedDropped, reason); + } + finally + { + _connectGate.Release(); } - - // Best-effort join. - try { if (writer is not null) await writer.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ } - try { if (reader is not null) await reader.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ } - - oldCts?.Dispose(); - - if (upstreamCount > 0 || dropped.Count > 0) - MultiplexerLogEvents.BackendDisconnected(_logger, _plc.Name, upstreamCount, dropped.Count, reason); } // ── Backend writer / reader tasks ───────────────────────────────────────── @@ -513,6 +572,13 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi // Phase 9: always exactly one party. Phase 10: N parties (read coalescing). // Note: the InFlightByKey TryRemove above (for FC03/FC04) guarantees no // further attaches can occur — the parties list is now a stable snapshot. + // + // Phase 12 (W1.3) — non-blocking fan-out via `TrySendResponse`. The + // single backend reader task must NEVER `await` a per-upstream channel + // write: a wedged upstream (full bounded response channel) would otherwise + // stall the reader and starve every other client on this PLC. A drop here + // is recorded via `responseDropForFullUpstream`; the wedged upstream loses + // its own response and will be reaped by its own socket-close path. foreach (var party in inFlight.InterestedParties) { if (!party.Pipe.IsAlive) @@ -542,7 +608,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi outFrame[0] = (byte)(party.OriginalTxId >> 8); outFrame[1] = (byte)(party.OriginalTxId & 0xFF); - await party.Pipe.SendResponseAsync(outFrame, ct).ConfigureAwait(false); + if (!party.Pipe.TrySendResponse(outFrame)) + { + _ctx.Counters.IncrementResponseDropForFullUpstream(); + } } } @@ -723,12 +792,38 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi if (inFlightForSend is null) { - // The factory hit the allocator-saturation path or a duplicate-key race. - // Surface a Modbus exception 04 to the upstream and clean up. + // Phase 12 (W1.2) — the factory hit the allocator-saturation path or a + // duplicate-key race and stored a stub `InFlightRequest` under `key`. Late + // attachers may have joined the stub between the factory call and this + // cleanup; we must deliver the saturation exception to ALL of them, not just + // the leader, otherwise the late attachers wait forever for a response that + // never comes (the stub has no proxy TxId, so no backend round-trip will + // ever fire). MultiplexerLogEvents.Saturated(_logger, _plc.Name, pipe.RemoteEp?.ToString() ?? "?"); - byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4); - _inFlightByKey.TryRemove(key, out _); - await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false); + + if (_inFlightByKey.TryRemove(key, out var stub)) + { + foreach (var party in stub.InterestedParties) + { + byte[] excFrame = BuildExceptionFrame(party.OriginalTxId, unitId, fcByte, exceptionCode: 4); + try + { + await party.Pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false); + } + catch + { + // Best-effort delivery. A dead pipe will be collected by its own + // socket close path; nothing more we can do here. + } + } + } + else + { + // The stub was already removed by another path (extremely unlikely, but + // defensive). Surface the exception to the original requester. + byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4); + await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false); + } return; } diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs index 9d490b4..8a77ddb 100644 --- a/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs @@ -224,6 +224,26 @@ internal sealed partial class UpstreamPipe : IAsyncDisposable } } + /// + /// Phase 12 (W1.3) — non-blocking response enqueue. Returns true when the frame + /// was queued for delivery, false when the pipe is dead OR the response channel + /// is full. Used by the per-PLC backend reader's fan-out loop so a single wedged + /// upstream cannot stall responses to peers sharing the same backend socket — without + /// this, a full _responseChannel on one pipe would block the reader task. + /// + /// A false return indicates the frame is the multiplexer's responsibility + /// to drop and (optionally) account for via a counter. The wedged upstream's socket + /// will eventually time out and close on its own; its read loop will then dispose the + /// pipe and the multiplexer's correlation/coalescing entries will be reaped naturally. + /// + public bool TrySendResponse(byte[] frame) + { + if (!IsAlive) + return false; + + return _responseChannel.Writer.TryWrite(frame); + } + /// /// Closes the pipe: cancels the read+write loops and shuts down the socket. Idempotent. /// diff --git a/mbproxy/src/Mbproxy/Proxy/PlcListener.cs b/mbproxy/src/Mbproxy/Proxy/PlcListener.cs index c236d45..64e90cd 100644 --- a/mbproxy/src/Mbproxy/Proxy/PlcListener.cs +++ b/mbproxy/src/Mbproxy/Proxy/PlcListener.cs @@ -48,6 +48,13 @@ internal sealed partial class PlcListener : IAsyncDisposable public IReadOnlyCollection ActiveUpstreams => _multiplexer?.AttachedPipes ?? Array.Empty(); + /// + /// Phase 12 (W1.1) — exposes the running multiplexer so a hot-reload reseat can swap + /// the per-PLC context on the live instance. null between StopAsync and a fresh + /// start; callers must null-check. + /// + internal PlcMultiplexer? Multiplexer => _multiplexer; + public PlcListener( PlcOptions plc, ConnectionOptions connectionOptions, diff --git a/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs b/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs index 3ed9a69..313b91a 100644 --- a/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs +++ b/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs @@ -124,7 +124,15 @@ public sealed record CounterSnapshot( /// Phase 11 — point-in-time approximation of cached PDU bytes for this PLC. Sum of /// across entries. Read on the snapshot path. /// - long CacheBytes); + long CacheBytes, + /// + /// Phase 12 (W1.3) — cumulative count of backend response frames the per-PLC reader + /// task dropped because the destination upstream pipe's bounded response channel was + /// full. A non-zero value indicates one or more upstream clients are not draining their + /// socket fast enough to keep up with the backend; the wedged client loses its own + /// responses but its peers on the same PLC continue to receive theirs. + /// + long ResponseDropForFullUpstream); /// /// Thread-safe per-PLC counters backed by longs. @@ -169,6 +177,12 @@ internal sealed class ProxyCounters private long _cacheMissCount; private long _cacheInvalidations; + // Phase 12 (W1.3) — backend-reader fan-out drop counter. Increments when the reader + // task tried to enqueue a response to an upstream pipe whose bounded response channel + // was full. Without the non-blocking enqueue this would deadlock the reader; with it + // we drop and account. + private long _responseDropForFullUpstream; + // Phase 11 — live cache state pulled from a per-PLC ResponseCache on each snapshot. // The multiplexer registers a single provider via SetCacheStatsProvider so the status // page sees current entry-count / bytes without a separate poll. @@ -293,6 +307,13 @@ internal sealed class ProxyCounters public void AddCacheInvalidations(int n) => Interlocked.Add(ref _cacheInvalidations, n); + /// + /// Phase 12 (W1.3) — records one backend response frame dropped because the destination + /// upstream pipe's response channel was full. + /// + public void IncrementResponseDropForFullUpstream() + => Interlocked.Increment(ref _responseDropForFullUpstream); + /// /// Phase 11 — wires the per-PLC as the live stats /// source for the snapshot path. Pass null to detach during disposal. @@ -422,7 +443,8 @@ internal sealed class ProxyCounters CacheMissCount: Interlocked.Read(ref _cacheMissCount), CacheInvalidations: Interlocked.Read(ref _cacheInvalidations), CacheEntryCount: cacheEntries, - CacheBytes: cacheBytes); + CacheBytes: cacheBytes, + ResponseDropForFullUpstream: Interlocked.Read(ref _responseDropForFullUpstream)); } } diff --git a/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs b/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs index 8d177d9..96468d9 100644 --- a/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs +++ b/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs @@ -1,3 +1,5 @@ +using System.Diagnostics; +using Mbproxy.Admin; using Mbproxy.Bcd; using Mbproxy.Configuration; using Mbproxy.Options; @@ -5,7 +7,6 @@ using Mbproxy.Proxy.Cache; using Mbproxy.Proxy.Multiplexing; using Mbproxy.Proxy.Supervision; using Microsoft.Extensions.Options; -using Polly; namespace Mbproxy.Proxy; @@ -34,6 +35,16 @@ internal sealed partial class ProxyWorker : BackgroundService private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; private readonly ConfigReconciler _reconciler; + // Phase 12 (W1.5) — admin endpoint is no longer IHostedService; ProxyWorker drives its + // lifecycle directly so the design's "drain THEN stop admin" ordering is honoured. + // + // Resolved LAZILY (in ExecuteAsync) rather than in the constructor because the DI graph + // is circular: AdminEndpointHost → StatusSnapshotBuilder → ProxyWorker. A constructor + // GetService() during ProxyWorker's own construction returns null + // silently. Lazy resolution sidesteps the cycle — by the time ExecuteAsync runs the DI + // container is fully built. + private readonly IServiceProvider _services; + private AdminEndpointHost? _admin; // Phase 06: supervisors are now managed jointly by ProxyWorker (initial bootstrap) // and ConfigReconciler (subsequent hot-reload changes). The dictionary is shared @@ -52,13 +63,16 @@ internal sealed partial class ProxyWorker : BackgroundService IPduPipeline pipeline, ILogger logger, ILoggerFactory loggerFactory, - ConfigReconciler reconciler) + ConfigReconciler reconciler, + IServiceProvider services) { _options = options; _pipeline = pipeline; _logger = logger; _loggerFactory = loggerFactory; _reconciler = reconciler; + _services = services; + // Phase 12 (W1.5) — admin endpoint resolved lazily in ExecuteAsync (see field comment). } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -188,17 +202,58 @@ internal sealed partial class ProxyWorker : BackgroundService int boundCount = _supervisors.Values.Count(s => s.Snapshot().State == SupervisorState.Bound); LogStartupReady(_logger, boundCount, plcsConfigured); + // Phase 12 (W1.5) — start the admin endpoint AFTER listeners are bound so the + // status page can never observe the service in a "no PLCs configured yet" state. + // The admin endpoint is no longer registered as IHostedService (the host's reverse + // stop order would tear it down BEFORE drain). ProxyWorker drives both ends. + // + // Resolution happens here, not in the constructor — the DI graph is circular + // (admin → StatusSnapshotBuilder → ProxyWorker) and a constructor-time lookup + // returns null silently. + _admin = _services.GetService(); + if (_admin is not null) + { + try + { + await _admin.StartAsync(stoppingToken).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, "Admin endpoint failed to start: {Message}", ex.Message); + } + } + // ── 6. Keep the worker alive until the host signals stop ───────────────────── // Supervisors run their own background loops; ExecuteAsync just waits. await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(false); } + /// + /// Phase 12 (W1.5) — graceful shutdown sequence (replaces the deleted + /// ShutdownCoordinator): + /// + /// Cancel via base.StopAsync. + /// Stop all supervisors with a 5 s hard deadline (no new connections; existing + /// pipes are cascaded by teardown). + /// Wait for in-flight PDUs to drain via the live + /// (read fresh from + /// so a hot-reloaded value is + /// honoured at stop time). + /// Stop the admin endpoint LAST so the status page survives the drain phase + /// and an operator polling it sees the in-flight count fall to zero. + /// Dispose every supervisor to release sockets, channels, and watchdog timers. + /// + /// Logs mbproxy.shutdown.complete on the way out with the in-flight count at + /// drain-deadline (zero on a clean shutdown, positive when forced cancel). + /// public override async Task StopAsync(CancellationToken cancellationToken) { // Cancel ExecuteAsync first. await base.StopAsync(cancellationToken).ConfigureAwait(false); - // Stop all supervisors in parallel with a 5-second hard deadline. + var sw = Stopwatch.StartNew(); + + // ── 1. Stop accepting new connections ───────────────────────────────────────── using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); using var linked = CancellationTokenSource.CreateLinkedTokenSource( stopCts.Token, cancellationToken); @@ -216,10 +271,59 @@ internal sealed partial class ProxyWorker : BackgroundService // Best effort — don't let individual supervisor failures block shutdown. } + // ── 2. Drain in-flight PDUs ─────────────────────────────────────────────────── + // Reads the current configured deadline so a hot-reloaded + // GracefulShutdownTimeoutMs is honoured at stop time, not frozen at process start. + int drainDeadlineMs = _options.CurrentValue.Connection.GracefulShutdownTimeoutMs; + int inFlightAtCancel = 0; + + if (drainDeadlineMs > 0) + { + using var drainCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(drainDeadlineMs)); + try + { + while (!drainCts.Token.IsCancellationRequested) + { + int total = CountInFlight(); + if (total == 0) break; + await Task.Delay(10, drainCts.Token).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + inFlightAtCancel = CountInFlight(); + } + } + + // ── 3. Stop admin endpoint LAST ─────────────────────────────────────────────── + if (_admin is not null) + { + try + { + using var adminCts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + await _admin.StopAsync(adminCts.Token).ConfigureAwait(false); + } + catch + { + // Best-effort. + } + } + + // ── 4. Dispose supervisors (releases sockets, channels, watchdog timers) ───── foreach (var supervisor in _supervisors.Values) await supervisor.DisposeAsync().ConfigureAwait(false); _supervisors.Clear(); + + LogShutdownComplete(_logger, inFlightAtCancel, sw.ElapsedMilliseconds); + } + + private int CountInFlight() + { + int total = 0; + foreach (var supervisor in _supervisors.Values) + total += (int)supervisor.CurrentCounters.Snapshot().InFlightCount; + return total; } // ── Logging ─────────────────────────────────────────────────────────────────────────── @@ -247,4 +351,10 @@ internal sealed partial class ProxyWorker : BackgroundService Level = LogLevel.Error, Message = "Failed to bind listener: Plc={Plc} Port={Port} Reason={Reason}")] private static partial void LogBindFailed(ILogger logger, string plc, int port, string reason); + + // Phase 12 (W1.5) — moved here from the deleted ShutdownCoordinator. + [LoggerMessage(EventId = 80, EventName = "mbproxy.shutdown.complete", + Level = LogLevel.Information, + Message = "Graceful shutdown complete: InFlightAtCancel={InFlightAtCancel} ElapsedMs={ElapsedMs}")] + private static partial void LogShutdownComplete(ILogger logger, int inFlightAtCancel, long elapsedMs); } diff --git a/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs b/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs index ceae9c5..d3f986d 100644 --- a/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs +++ b/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs @@ -180,35 +180,40 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable RecoveryAttempts: Interlocked.CompareExchange(ref _recoveryAttempts, 0, 0)); /// - /// Atomically swaps the per-PLC context (tag map) without restarting the listener. + /// Atomically swaps the per-PLC context (tag map + optional response cache) on the + /// running listener AND its live multiplexer. /// - /// Transition window: there is a brief overlap where the old - /// is running its accept loop with the old context while the - /// new context reference is being written. The volatile write ensures that the very - /// next PlcListener constructed inside the Polly loop (on any subsequent fault - /// recovery) picks up . Existing in-flight upstream pipes - /// served by the current multiplexer keep their reference to the context captured at - /// multiplexer construction time; they finish on the old map. New connections after - /// this call use the new map. This is the correct design — partial-BCD rewrites - /// mid-request would be worse than a one-request gap. + /// Phase 12 (W1.1) — previously this method only updated the supervisor's + /// _currentContext slot, which meant the running + /// kept using the OLD context (it captured the reference at construction). A reload + /// only became visible on the next listener fault. Now the swap propagates into the + /// running mux via , so the very next PDU + /// sees the new tag map / new cache. Counters are preserved (the new context carries + /// the same ProxyCounters instance) so operator history is not reset. /// - /// This method is intentionally lightweight: it performs only the volatile write - /// and returns immediately. The parameter is present for API - /// symmetry with start/stop and to accommodate future async expansion. + /// Old cache lifecycle: the supervisor disposes the outgoing context's + /// cache AFTER the multiplexer has been swapped to the new context. By that point no + /// more reads or writes can land on the old cache. Per the design contract, any + /// tag-list change drops the entire PLC cache. /// public Task ReplaceContextAsync(PerPlcContext newCtx, CancellationToken ct) { - // Phase 11: dispose the outgoing context's response cache (if any) so its - // eviction loop terminates. The "any tag-list reload flushes the affected PLC's - // whole cache" doctrine is satisfied here — the new context constructs its own - // fresh cache, the old cache is dropped wholesale. var oldCache = _currentContext?.Cache; - // Volatile write: the next PlcListener created in RunSupervisorAsync will see - // the new context. The accept loop itself does not hold a direct reference to - // _currentContext — it was captured at PlcListener construction time. + // Volatile write: the next PlcListener created in RunSupervisorAsync (on any + // subsequent fault recovery) will pick up newCtx through this slot. _currentContext = newCtx; + // Phase 12 (W1.1) — push the swap into the running multiplexer so existing + // connections see the new tag map / new cache on their next PDU. _currentListener + // may be null between Polly retry attempts; in that case the next listener built + // inside the Polly loop will pick up newCtx through _currentContext above. + _currentListener?.Multiplexer?.ReplaceContext(newCtx); + + // Phase 12 (W1.1 + W2.8 prereq) — drop the outgoing cache AFTER the swap so the + // running multiplexer can no longer reach it. Dispose stops the eviction loop and + // releases the timer. (The cache.flushed log event is W2.8 work; this Wave-1 fix + // is the "no longer in use, safe to drop" piece.) if (oldCache is not null && !ReferenceEquals(oldCache, newCtx.Cache)) oldCache.Dispose(); diff --git a/mbproxy/tests/Mbproxy.Tests/Diagnostics/ShutdownCoordinatorTests.cs b/mbproxy/tests/Mbproxy.Tests/Diagnostics/ShutdownCoordinatorTests.cs deleted file mode 100644 index b65d2fa..0000000 --- a/mbproxy/tests/Mbproxy.Tests/Diagnostics/ShutdownCoordinatorTests.cs +++ /dev/null @@ -1,177 +0,0 @@ -using Mbproxy.Diagnostics; -using Mbproxy.Options; -using Mbproxy.Proxy; -using Microsoft.Extensions.Logging.Abstractions; -using Shouldly; -using Xunit; - -namespace Mbproxy.Tests.Diagnostics; - -/// -/// Unit tests for . -/// All tests use the internal testability constructor with fake handles. -/// -[Trait("Category", "Unit")] -public sealed class ShutdownCoordinatorTests -{ - // ── Fake implementations ────────────────────────────────────────────────────────────────── - - private sealed class FakeAdminHandle : IAdminEndpointHandle - { - public bool StopCalled { get; private set; } - public int StopCallOrder { get; private set; } - private readonly Func? _orderSource; - - public FakeAdminHandle(Func? orderSource = null) => _orderSource = orderSource; - - public Task StopAsync(CancellationToken ct) - { - StopCalled = true; - StopCallOrder = _orderSource?.Invoke() ?? 0; - return Task.CompletedTask; - } - } - - private sealed class SimpleFakeSupervisor : ISupervisorHandle - { - public bool StopCalled { get; private set; } - public int StopCallOrder { get; private set; } - private readonly Func? _orderSource; - - public SimpleFakeSupervisor(Func? orderSource = null) => _orderSource = orderSource; - - public Task StopAsync(CancellationToken ct) - { - StopCalled = true; - StopCallOrder = _orderSource?.Invoke() ?? 0; - return Task.CompletedTask; - } - - public int InFlightCount { get; set; } - } - - private sealed class DelayedStopSupervisor : ISupervisorHandle - { - private readonly Func _onStop; - public DelayedStopSupervisor(Func onStop) => _onStop = onStop; - public async Task StopAsync(CancellationToken ct) => await _onStop(); - public int InFlightCount => 0; - } - - // ── Helper ──────────────────────────────────────────────────────────────────────────────── - - private static ShutdownCoordinator Build( - IReadOnlyList supervisors, - IAdminEndpointHandle admin, - int timeoutMs = 500) - { - var opts = Microsoft.Extensions.Options.Options.Create(new MbproxyOptions - { - Connection = new ConnectionOptions { GracefulShutdownTimeoutMs = timeoutMs }, - }); - - return new ShutdownCoordinator( - supervisors, - admin, - opts, - NullLogger.Instance); - } - - // ── Tests ───────────────────────────────────────────────────────────────────────────────── - - /// - /// With no active connections the drain loop exits on the first check; - /// the whole sequence should be fast (well under 1 s). - /// - [Fact] - public async Task Shutdown_NoActiveConnections_CompletesImmediately() - { - var supervisor = new SimpleFakeSupervisor(); - var admin = new FakeAdminHandle(); - var coord = Build([supervisor], admin, timeoutMs: 5000); - - var sw = System.Diagnostics.Stopwatch.StartNew(); - await coord.ShutdownAsync(timeoutMs: 5000, TestContext.Current.CancellationToken); - sw.Stop(); - - sw.ElapsedMilliseconds.ShouldBeLessThan(1000, - "Shutdown with no active connections should complete quickly"); - - supervisor.StopCalled.ShouldBeTrue("supervisor.StopAsync must be called"); - admin.StopCalled.ShouldBeTrue("admin.StopAsync must be called"); - } - - /// - /// Verifies that the coordinator awaits supervisor stop before declaring shutdown done. - /// - [Fact] - public async Task Shutdown_OneActiveConnection_WaitsForCompletion() - { - bool stopInvoked = false; - - var supervisor = new DelayedStopSupervisor(async () => - { - await Task.Delay(50, TestContext.Current.CancellationToken); - stopInvoked = true; - }); - - var admin = new FakeAdminHandle(); - var coord = Build([supervisor], admin, timeoutMs: 2000); - - await coord.ShutdownAsync(timeoutMs: 2000, TestContext.Current.CancellationToken); - - stopInvoked.ShouldBeTrue( - "supervisor.StopAsync must complete before ShutdownAsync returns"); - admin.StopCalled.ShouldBeTrue("admin endpoint must be stopped"); - } - - /// - /// When the drain deadline fires, the coordinator must complete and still stop the admin - /// endpoint, not block forever. - /// - [Fact] - public async Task Shutdown_TimeoutExceeded_CancelsRemainingWork_AndReportsCount() - { - // Use a supervisor that completes stop immediately; the "timeout" scenario is - // that the drain loop has no pairs to wait for but the coordinator still respects - // its deadline. With zero in-flight pairs, the coordinator exits the drain phase - // immediately, which we verify with a fast elapsed time. - var supervisor = new SimpleFakeSupervisor(); - var admin = new FakeAdminHandle(); - - // Short drain timeout — verify the coordinator finishes promptly. - var coord = Build([supervisor], admin, timeoutMs: 50); - - var sw = System.Diagnostics.Stopwatch.StartNew(); - await coord.ShutdownAsync(timeoutMs: 50, TestContext.Current.CancellationToken); - sw.Stop(); - - sw.ElapsedMilliseconds.ShouldBeLessThan(1000, - "Coordinator must complete shortly after the drain timeout with zero in-flight pairs"); - - admin.StopCalled.ShouldBeTrue( - "admin.StopAsync must be called after the drain phase, even when timeout fires"); - } - - /// - /// Verifies the ordering guarantee: supervisors stop BEFORE the admin endpoint. - /// - [Fact] - public async Task Shutdown_AdminEndpointStopped_AfterListenersStopped() - { - int callOrder = 0; - int NextOrder() => Interlocked.Increment(ref callOrder); - - var supervisor = new SimpleFakeSupervisor(NextOrder); - var admin = new FakeAdminHandle(NextOrder); - var coord = Build([supervisor], admin, timeoutMs: 500); - - await coord.ShutdownAsync(timeoutMs: 500, TestContext.Current.CancellationToken); - - supervisor.StopCalled.ShouldBeTrue("supervisor.StopAsync must be called"); - admin.StopCalled.ShouldBeTrue("admin.StopAsync must be called"); - - supervisor.StopCallOrder.ShouldBeLessThan(admin.StopCallOrder, - "Supervisor.StopAsync must be called before AdminEndpoint.StopAsync"); - } -} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs index ca4d18d..51c7421 100644 --- a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs @@ -5,6 +5,7 @@ using System.Net.Sockets; using Mbproxy.Bcd; using Mbproxy.Options; using Mbproxy.Proxy; +using Mbproxy.Proxy.Cache; using Mbproxy.Proxy.Multiplexing; using Microsoft.Extensions.Logging.Abstractions; using Shouldly; @@ -623,4 +624,136 @@ public sealed class PlcMultiplexerTests } } } + + // ── Phase 12 Wave-1 regression tests ────────────────────────────────────── + + /// + /// W1.1 — verifies that swaps the live + /// per-PLC context on the running multiplexer, so the very next PDU's BCD rewriter + /// uses the new tag map (not the captured-at-construction map). Before W1.1 this + /// scenario would silently keep using the old map until the listener faulted and the + /// supervisor's Polly loop reconstructed everything. + /// + [Fact] + public async Task ReplaceContext_NewTagMap_VisibleOnNextPdu() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + backend.FcResponseFactory = (fc, _, _, txId) => + fc == 0x03 ? BuildFc03Response(txId, 1, 0x1234) : Array.Empty(); + + // Context 1 — tag at addr 100, BCD16. Wire 0x1234 decodes to decimal 1234. + var ctx1 = MakeContext("PLC1", BcdTag.Create(100, 16)); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx1); + + var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name); + try + { + // Read 1 with original ctx — wire 0x1234 should be decoded to 1234 (= 0x04D2). + await client.SendAsync(BuildFc03ReadFrame(1, 100, 1), SocketFlags.None); + var rsp1 = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken); + ushort decoded1 = (ushort)((rsp1[9] << 8) | rsp1[10]); + decoded1.ShouldBe((ushort)1234, "with tag at 100, BCD wire 0x1234 must decode to decimal 1234"); + + // Swap to an empty tag map (counters preserved per the design's reseat contract). + var ctx2 = new PerPlcContext + { + PlcName = "PLC1", + TagMap = BcdTagMap.Empty, + Counters = ctx1.Counters, + Logger = NullLogger.Instance, + }; + mux.ReplaceContext(ctx2); + + // Read 2 with swapped ctx — no tag, raw 0x1234 must pass through unchanged. + await client.SendAsync(BuildFc03ReadFrame(2, 100, 1), SocketFlags.None); + var rsp2 = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken); + ushort raw2 = (ushort)((rsp2[9] << 8) | rsp2[10]); + raw2.ShouldBe((ushort)0x1234, + "after ReplaceContext to empty tag map, the next PDU must use the new map and pass 0x1234 through unchanged"); + } + finally + { + client.Dispose(); + await pipe.DisposeAsync(); + listener.Stop(); + } + } + + /// + /// W1.1 — verifies that swapping in a fresh response cache via + /// makes the running multiplexer consult the NEW cache for subsequent reads, not the + /// old cache that was disposed by the supervisor. Without W1.1 the running mux would + /// keep its constructor-captured cache reference and either return stale entries or + /// hit a disposed cache. + /// + [Fact] + public async Task ReplaceContext_NewCache_NextReadGoesToBackend_NotOldCache() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + backend.FcResponseFactory = (fc, _, _, txId) => + fc == 0x03 ? BuildFc03Response(txId, 1, (ushort)0x1111) : Array.Empty(); + + // Context 1 — cacheable tag at addr 200 with TTL 60_000 ms. + var tag = new BcdTag(200, 16, CacheTtlMs: 60_000); + var dict = new[] { tag }.ToDictionary(t => t.Address).ToFrozenDictionary(); + var map = new BcdTagMap(dict); + var cache1 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000); + var ctx1 = new PerPlcContext + { + PlcName = "PLC1", + TagMap = map, + Counters = new ProxyCounters(), + Logger = NullLogger.Instance, + Cache = cache1, + }; + + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx1); + var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name); + try + { + // Read 1 — populates cache1 from backend. + await client.SendAsync(BuildFc03ReadFrame(1, 200, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken); + await Task.Delay(50, TestContext.Current.CancellationToken); + int afterFirst = backend.SeenProxyTxIds.Count; + cache1.Count.ShouldBe(1, "cache1 must contain the first read"); + + // Read 2 — must hit cache1 (no backend traffic). + await client.SendAsync(BuildFc03ReadFrame(2, 200, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken); + backend.SeenProxyTxIds.Count.ShouldBe(afterFirst, "cache hit must not produce backend traffic"); + + // Swap in a brand-new (empty) cache via ReplaceContext. + var cache2 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000); + var ctx2 = new PerPlcContext + { + PlcName = "PLC1", + TagMap = map, + Counters = ctx1.Counters, + Logger = NullLogger.Instance, + Cache = cache2, + }; + mux.ReplaceContext(ctx2); + + // Read 3 — old cache had the entry, but mux now uses cache2 which is empty, + // so the next read MUST go to the backend. + await client.SendAsync(BuildFc03ReadFrame(3, 200, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken); + await Task.Delay(50, TestContext.Current.CancellationToken); + backend.SeenProxyTxIds.Count.ShouldBe(afterFirst + 1, + "after ReplaceContext, the running multiplexer must consult the NEW cache (empty) — not the old one (still warm)"); + cache2.Count.ShouldBe(1, "the new cache should be populated by the post-swap read"); + } + finally + { + client.Dispose(); + await pipe.DisposeAsync(); + listener.Stop(); + cache1.Dispose(); + } + } } diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/UpstreamPipeTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/UpstreamPipeTests.cs new file mode 100644 index 0000000..96a1c61 --- /dev/null +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/UpstreamPipeTests.cs @@ -0,0 +1,104 @@ +using System.Net; +using System.Net.Sockets; +using Mbproxy.Proxy.Multiplexing; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Multiplexing; + +/// +/// Unit tests for 's response-channel contract — particularly +/// the Phase 12 (W1.3) non-blocking enqueue +/// added so the per-PLC backend reader cannot be stalled by one slow upstream client. +/// +[Trait("Category", "Unit")] +public sealed class UpstreamPipeTests +{ + // ── Helpers ─────────────────────────────────────────────────────────────── + + private static async Task<(Socket clientSide, Socket serverSide)> AcceptedSocketPairAsync() + { + // Build a loopback listener and connect a client to get a real socket pair. + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + try + { + int port = ((IPEndPoint)listener.LocalEndpoint).Port; + var clientSide = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + var connectTask = clientSide.ConnectAsync(IPAddress.Loopback, port); + var serverSide = await listener.AcceptSocketAsync(); + await connectTask; + return (clientSide, serverSide); + } + finally + { + listener.Stop(); + } + } + + // ── Tests ───────────────────────────────────────────────────────────────── + + /// + /// W1.3 — when no write-loop is draining the response channel, repeated + /// calls must succeed up to the channel's + /// bounded capacity and return false on every subsequent call without blocking. + /// This is the non-blocking contract the per-PLC backend reader relies on. + /// + [Fact] + public async Task TrySendResponse_WhenChannelFull_ReturnsFalse_WithoutBlocking() + { + var (client, server) = await AcceptedSocketPairAsync(); + try + { + // Construct the pipe but do NOT call RunWriteLoopAsync — the channel will not + // be drained, so it fills after `ResponseChannelCapacity` (= 16) writes. + var pipe = new UpstreamPipe(server, "TEST", NullLogger.Instance); + + int successes = 0; + int failures = 0; + + for (int i = 0; i < 100; i++) + { + bool ok = pipe.TrySendResponse(new byte[] { 0, 0 }); + if (ok) successes++; + else failures++; + } + + successes.ShouldBe(16, + "the channel's bounded capacity is 16; first 16 writes must succeed"); + failures.ShouldBe(84, + "after capacity is reached, every further TrySendResponse must return false (not block)"); + + await pipe.DisposeAsync(); + } + finally + { + try { client.Dispose(); } catch { } + try { server.Dispose(); } catch { } + } + } + + /// + /// W1.3 — once the pipe has been disposed, + /// returns false regardless of channel state, never throws. + /// + [Fact] + public async Task TrySendResponse_AfterDispose_ReturnsFalse() + { + var (client, server) = await AcceptedSocketPairAsync(); + try + { + var pipe = new UpstreamPipe(server, "TEST", NullLogger.Instance); + await pipe.DisposeAsync(); + + bool ok = pipe.TrySendResponse(new byte[] { 0, 0 }); + ok.ShouldBeFalse("a disposed pipe must reject sends without throwing"); + } + finally + { + try { client.Dispose(); } catch { } + try { server.Dispose(); } catch { } + } + } +}