diff --git a/mbproxy/src/Mbproxy/Admin/AdminEndpointHost.cs b/mbproxy/src/Mbproxy/Admin/AdminEndpointHost.cs index cbe78be..ebb0d7e 100644 --- a/mbproxy/src/Mbproxy/Admin/AdminEndpointHost.cs +++ b/mbproxy/src/Mbproxy/Admin/AdminEndpointHost.cs @@ -8,7 +8,9 @@ using Mbproxy.Options; namespace Mbproxy.Admin; /// -/// Hosted service that owns the Kestrel-backed admin HTTP endpoint. +/// Owns the Kestrel-backed admin HTTP endpoint. Driven by ProxyWorker +/// (which calls StartAsync after listeners are up and StopAsync +/// at the END of graceful shutdown — supervisors stop and drain first, admin stops last). /// /// Lifecycle: /// @@ -22,8 +24,14 @@ namespace Mbproxy.Admin; /// /// /// Routes: exactly two — GET / (HTML) and GET /status.json (JSON). +/// +/// Phase 12 (W1.5) — was previously also registered as IHostedService, +/// but the host's automatic stop ordering (reverse of registration) ran admin.StopAsync +/// BEFORE ProxyWorker.StopAsync, which broke the design's "drain THEN stop admin" guarantee +/// and caused a double-stop with the now-deleted ShutdownCoordinator. Now a plain +/// singleton with explicit lifecycle calls from ProxyWorker. /// -internal sealed partial class AdminEndpointHost : IHostedService, IAsyncDisposable +internal sealed partial class AdminEndpointHost : IAsyncDisposable { private readonly IOptionsMonitor _optionsMonitor; private readonly StatusSnapshotBuilder _builder; diff --git a/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs b/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs index e7dad1d..ee4ffbd 100644 --- a/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs +++ b/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs @@ -107,7 +107,8 @@ internal sealed class StatusSnapshotBuilder CacheMissCount: 0, CacheInvalidations: 0, CacheEntryCount: 0, - CacheBytes: 0); + CacheBytes: 0, + ResponseDropForFullUpstream: 0); // Phase 08: ConnectsSuccess / ConnectsFailed are now tracked in ProxyCounters. 
long connectsSuccess = counters.ConnectsSuccess; diff --git a/mbproxy/src/Mbproxy/Diagnostics/ShutdownCoordinator.cs b/mbproxy/src/Mbproxy/Diagnostics/ShutdownCoordinator.cs deleted file mode 100644 index 8fc3ebe..0000000 --- a/mbproxy/src/Mbproxy/Diagnostics/ShutdownCoordinator.cs +++ /dev/null @@ -1,212 +0,0 @@ -using System.Diagnostics; -using Mbproxy.Admin; -using Mbproxy.Options; -using Mbproxy.Proxy; -using Mbproxy.Proxy.Multiplexing; -using Mbproxy.Proxy.Supervision; -using Microsoft.Extensions.Options; - -namespace Mbproxy.Diagnostics; - -// ── Testability interfaces ──────────────────────────────────────────────────────────────────── - -/// -/// Abstraction over a supervisor's stop operation and its multiplexer's in-flight count. -/// Introduced so unit tests can inject fakes -/// without needing a real . -/// -/// Phase 9: in-flight tracking is now per-multiplexer (the -/// ) rather than per-pair. -/// replaces ActivePairs.IsProcessing from the 1:1 model. -/// -internal interface ISupervisorHandle -{ - Task StopAsync(CancellationToken ct); - - /// - /// Current number of in-flight Modbus requests on this PLC's multiplexed backend. - /// Zero if the multiplexer has no in-flight requests (idle). - /// - int InFlightCount { get; } -} - -/// -/// Abstraction over the admin endpoint stop operation. -/// -internal interface IAdminEndpointHandle -{ - Task StopAsync(CancellationToken ct); -} - -/// -/// Adapts a concrete to . -/// -internal sealed class PlcSupervisorHandle : ISupervisorHandle -{ - private readonly PlcListenerSupervisor _supervisor; - public PlcSupervisorHandle(PlcListenerSupervisor supervisor) => _supervisor = supervisor; - public Task StopAsync(CancellationToken ct) => _supervisor.StopAsync(ct); - - public int InFlightCount - { - get - { - // CurrentCounters.Snapshot pulls live values from the multiplexer's - // IMultiplexCountersProvider hook; InFlightCount is point-in-time. 
- return (int)_supervisor.CurrentCounters.Snapshot().InFlightCount; - } - } -} - -/// -/// Adapts to . -/// -internal sealed class AdminEndpointHandle : IAdminEndpointHandle -{ - private readonly AdminEndpointHost _host; - public AdminEndpointHandle(AdminEndpointHost host) => _host = host; - public Task StopAsync(CancellationToken ct) => _host.StopAsync(ct); -} - -// ── ShutdownCoordinator ─────────────────────────────────────────────────────────────────────── - -/// -/// Orchestrates graceful shutdown of the proxy service. -/// -/// Shutdown sequence: -/// -/// Stop accepting new upstream connections on all supervisors. -/// Wait for in-flight Modbus requests to drain (polls -/// across all supervisors) until -/// expires. -/// Stop the admin endpoint. -/// Log mbproxy.shutdown.complete with InFlightAtCancel and ElapsedMs. -/// -/// -/// This type is internal. It is registered in DI as a singleton and wired to -/// in Program.cs. -/// -internal sealed partial class ShutdownCoordinator -{ - private readonly IReadOnlyList _supervisors; - private readonly IAdminEndpointHandle _adminEndpoint; - private readonly IOptions _options; - private readonly ILogger _logger; - - /// - /// Production constructor — wraps concrete types in their adapter handles. - /// - public ShutdownCoordinator( - IEnumerable supervisors, - AdminEndpointHost adminEndpoint, - IOptions options, - ILogger logger) - : this( - supervisors.Select(s => (ISupervisorHandle)new PlcSupervisorHandle(s)).ToList(), - new AdminEndpointHandle(adminEndpoint), - options, - logger) - { - } - - /// - /// Testability constructor — accepts abstractions so unit tests can inject fakes. - /// - internal ShutdownCoordinator( - IReadOnlyList supervisors, - IAdminEndpointHandle adminEndpoint, - IOptions options, - ILogger logger) - { - _supervisors = supervisors; - _adminEndpoint = adminEndpoint; - _options = options; - _logger = logger; - } - - /// - /// Runs the graceful shutdown sequence. 
- /// - /// - /// Override the configured Connection.GracefulShutdownTimeoutMs (use -1 to - /// read from options, which is the normal runtime path). Tests pass an explicit value. - /// - /// - /// The host lifetime cancellation token. Not used to gate the drain loop — the - /// coordinator manages its own deadline so it can log completion regardless. - /// - public async Task ShutdownAsync(int timeoutMs = -1, CancellationToken hostCt = default) - { - int deadline = timeoutMs >= 0 - ? timeoutMs - : _options.Value.Connection.GracefulShutdownTimeoutMs; - - var sw = Stopwatch.StartNew(); - - // ── Step 1: stop accepting new connections ──────────────────────────────────── - using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - var stopTasks = _supervisors - .Select(s => s.StopAsync(stopCts.Token)) - .ToArray(); - - try - { - await Task.WhenAll(stopTasks).ConfigureAwait(false); - } - catch - { - // Best-effort: individual supervisor failures must not abort shutdown. - } - - // ── Step 2: wait for in-flight PDUs to drain ────────────────────────────────── - int inFlightAtCancel = 0; - - using var drainCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(deadline)); - try - { - while (!drainCts.Token.IsCancellationRequested) - { - int inFlight = CountInFlight(_supervisors); - if (inFlight == 0) break; - - await Task.Delay(10, drainCts.Token).ConfigureAwait(false); - } - } - catch (OperationCanceledException) - { - // Deadline expired — count remaining in-flight and proceed. - inFlightAtCancel = CountInFlight(_supervisors); - } - - // ── Step 3: stop the admin endpoint ────────────────────────────────────────── - // Admin is stopped AFTER listeners to preserve ordering guarantee: - // supervisors stop → drain → admin stops. - try - { - using var adminCts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); - await _adminEndpoint.StopAsync(adminCts.Token).ConfigureAwait(false); - } - catch - { - // Best-effort. 
- } - - // ── Step 4: log completion ──────────────────────────────────────────────────── - LogShutdownComplete(_logger, inFlightAtCancel, sw.ElapsedMilliseconds); - } - - private static int CountInFlight(IReadOnlyList supervisors) - { - int count = 0; - foreach (var supervisor in supervisors) - { - count += supervisor.InFlightCount; - } - return count; - } - - [LoggerMessage(EventId = 80, EventName = "mbproxy.shutdown.complete", - Level = LogLevel.Information, - Message = "Graceful shutdown complete: InFlightAtCancel={InFlightAtCancel} ElapsedMs={ElapsedMs}")] - private static partial void LogShutdownComplete(ILogger logger, int inFlightAtCancel, long elapsedMs); -} diff --git a/mbproxy/src/Mbproxy/HostingExtensions.cs b/mbproxy/src/Mbproxy/HostingExtensions.cs index d78ab38..7cf3b2e 100644 --- a/mbproxy/src/Mbproxy/HostingExtensions.cs +++ b/mbproxy/src/Mbproxy/HostingExtensions.cs @@ -2,7 +2,6 @@ using Mbproxy.Admin; using Mbproxy.Configuration; using Mbproxy.Diagnostics; using Mbproxy.Options; -using Mbproxy.Proxy; using Serilog; namespace Mbproxy; @@ -43,19 +42,21 @@ internal static class HostingExtensions /// /// (singleton — reads version attribute once). /// (singleton — pure orchestration). - /// (hosted service — owns the Kestrel admin server). + /// (singleton — owns the Kestrel admin server). /// /// Must be called after and after /// AddHostedService<ProxyWorker> (so ProxyWorker is available via DI). + /// + /// Phase 12 (W1.5) is no longer registered + /// via AddHostedService. drives its lifecycle + /// directly so admin start/stop ordering matches the design contract (admin starts + /// after listeners are up; admin stops AFTER the in-flight drain). 
/// public static IHostApplicationBuilder AddMbproxyAdmin(this IHostApplicationBuilder builder) { builder.Services.AddSingleton(); builder.Services.AddSingleton(); - // Register AdminEndpointHost as a singleton so ShutdownCoordinator can inject it - // directly without going through the IHostedService collection. builder.Services.AddSingleton(); - builder.Services.AddHostedService(sp => sp.GetRequiredService()); return builder; } diff --git a/mbproxy/src/Mbproxy/Program.cs b/mbproxy/src/Mbproxy/Program.cs index 2a198d9..0f60952 100644 --- a/mbproxy/src/Mbproxy/Program.cs +++ b/mbproxy/src/Mbproxy/Program.cs @@ -1,10 +1,6 @@ using Mbproxy; -using Mbproxy.Admin; -using Mbproxy.Diagnostics; -using Mbproxy.Options; using Mbproxy.Proxy; using Microsoft.Extensions.Hosting.WindowsServices; -using Microsoft.Extensions.Options; var builder = Host.CreateApplicationBuilder(args); @@ -23,46 +19,15 @@ builder.AddMbproxyOptions(); // and avoids repeated construction. builder.Services.AddSingleton(); -// Proxy worker — owns all PlcListeners and logs mbproxy.startup.ready. -// Registered as singleton so StatusSnapshotBuilder can inject ProxyWorker directly -// and access its Supervisors dictionary. +// Proxy worker — owns all PlcListeners, logs mbproxy.startup.ready, and drives the admin +// endpoint's lifecycle. Registered as singleton so StatusSnapshotBuilder can inject +// ProxyWorker directly and access its Supervisors dictionary. builder.Services.AddSingleton(); builder.Services.AddHostedService(sp => sp.GetRequiredService()); // Phase 07: admin endpoint (Kestrel read-only status page). +// Phase 12 (W1.5): no longer registered as IHostedService; ProxyWorker drives its +// lifecycle so admin starts after listeners and stops AFTER the in-flight drain. builder.AddMbproxyAdmin(); -// Phase 08: graceful-shutdown coordinator. -// ShutdownCoordinator depends on PlcListenerSupervisor instances via ProxyWorker.Supervisors. 
-// Registered as a singleton so Program can resolve it after the host is built. -builder.Services.AddSingleton(sp => -{ - var worker = sp.GetRequiredService(); - var admin = sp.GetRequiredService(); - var options = sp.GetRequiredService>(); - var logger = sp.GetRequiredService>(); - // Supervisors is populated after ProxyWorker.StartAsync; the coordinator only - // enumerates them during ShutdownAsync, which runs on ApplicationStopping — - // after the host is fully started. - return new ShutdownCoordinator( - worker.Supervisors.Values, - admin, - options, - logger); -}); - -var host = builder.Build(); - -// Wire ApplicationStopping → ShutdownCoordinator BEFORE hosted services start. -// The callback fires when the host signals stop; it drains in-flight PDUs and stops -// the admin endpoint before the host tears down individual services. -var lifetime = host.Services.GetRequiredService(); -lifetime.ApplicationStopping.Register(() => -{ - // IHostApplicationLifetime callbacks do not support async — block briefly. - // The coordinator manages its own drain deadline so the host is not held indefinitely. - var coordinator = host.Services.GetRequiredService(); - coordinator.ShutdownAsync().GetAwaiter().GetResult(); -}); - -await host.RunAsync(); +await builder.Build().RunAsync(); diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs index 92bf52e..c306565 100644 --- a/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs @@ -47,7 +47,12 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi private readonly PlcOptions _plc; private readonly ConnectionOptions _connectionOptions; private readonly IPduPipeline _pipeline; - private readonly PerPlcContext _ctx; + + // Phase 12 (W1.1) — `_ctx` is volatile so a hot-reload reseat can swap it on the running + // multiplexer. 
Each method that uses the context snapshots it into a local at the start + // of the operation so a single PDU sees a consistent (TagMap, Cache) pair even if the + // swap fires mid-PDU. ReplaceContext is the single mutator. + private volatile PerPlcContext _ctx; private readonly ILogger _logger; private readonly ResiliencePipeline? _backendConnectPipeline; // Phase 10: live read-coalescing config accessor. The accessor is read per-PDU on the @@ -145,6 +150,35 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi _pipes[pipe.Id] = pipe; } + /// + /// Phase 12 (W1.1) — atomically swaps the per-PLC context on a running multiplexer. + /// Called by when a + /// hot-reload tag-list change is applied to a PLC whose listener is already bound. + /// + /// The new context's tag map and (optional) response cache become visible on the + /// next PDU through the volatile _ctx field. Counters are PRESERVED across reseat + /// (the supervisor builds the new context with the running counters), so we only need + /// to re-register the cache stats provider — the multiplex provider already points at + /// this same instance. + /// + /// Existing per-call snapshots of the old context held by in-flight PDUs (via + /// WithCurrentRequest) finish on the old map. New PDUs after this call see the + /// new map. Per the design contract a one-PDU "old map" tail is acceptable; partial-BCD + /// rewrites mid-request would be worse. + /// + public void ReplaceContext(PerPlcContext newContext) + { + if (_disposed) return; + + _ctx = newContext; + + // Re-register the cache stats provider on the (preserved) counters so the status + // page sees the new cache's count/bytes immediately. Pass null when the new context + // opted out of caching to clear any stale provider from the previous context. + newContext.Counters.SetCacheStatsProvider( + newContext.Cache is not null ? 
new CacheStatsAdapter(newContext.Cache) : null); + } + /// /// Starts the read+write tasks for and returns a task that /// completes when the pipe's read loop ends. The multiplexer detaches the pipe when @@ -284,73 +318,98 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi private async Task TearDownBackendAsync(string reason, bool cascadeUpstreams) { - Socket? oldSocket; - CancellationTokenSource? oldCts; - Task? writer, reader; - lock (_backendLock) + // Phase 12 (W1.4) — serialise tear-down vs connect-up via the connect gate. Without + // this, a fresh EnsureBackendConnectedAsync racing with the channel drain below + // could see stranded frames sent on its new socket with old (already-released) TxIds, + // producing orphaned responses that hang upstream peers via the watchdog. + await _connectGate.WaitAsync().ConfigureAwait(false); + try { - oldSocket = _backendSocket; - oldCts = _backendCts; - writer = _backendWriterTask; - reader = _backendReaderTask; - - _backendSocket = null; - _backendCts = null; - _backendWriterTask = null; - _backendReaderTask = null; - } - - if (oldSocket is null && oldCts is null) return; - - try { oldCts?.Cancel(); } catch { /* best effort */ } - - try { oldSocket?.Shutdown(SocketShutdown.Both); } catch { /* already closed */ } - try { oldSocket?.Dispose(); } catch { /* best effort */ } - - // Drain correlation map; cascade-close every interested upstream pipe. - var dropped = _correlation.DrainAll(); - var cascadeIds = new HashSet(); - - foreach (var kvp in dropped) - { - _allocator.Release(kvp.Key); - foreach (var party in kvp.Value.InterestedParties) - cascadeIds.Add(party.Pipe.Id); - } - - // Phase 10 — also drain the in-flight-by-key map so a brand-new identical request - // through the freshly-reconnected backend is treated as a miss (no stale entries - // outlive the backend they were destined for). 
- _inFlightByKey.DrainAll(); - - int upstreamCount = 0; - if (cascadeUpstreams) - { - // Close every attached pipe that had a request in flight; the others will - // simply re-issue on next request through a fresh backend connect. - // Per the design doc, ALL attached upstreams cascade on backend disconnect. - upstreamCount = _pipes.Count; - - // Snapshot keys before disposal modifies the dictionary indirectly. - var pipeList = _pipes.Values.ToArray(); - foreach (var pipe in pipeList) + Socket? oldSocket; + CancellationTokenSource? oldCts; + Task? writer, reader; + lock (_backendLock) { - try { await pipe.DisposeAsync().ConfigureAwait(false); } - catch { /* best effort */ } + oldSocket = _backendSocket; + oldCts = _backendCts; + writer = _backendWriterTask; + reader = _backendReaderTask; + + _backendSocket = null; + _backendCts = null; + _backendWriterTask = null; + _backendReaderTask = null; } - _pipes.Clear(); - _ctx.Counters.AddDisconnectCascades(upstreamCount); + if (oldSocket is null && oldCts is null) return; + + try { oldCts?.Cancel(); } catch { /* best effort */ } + + try { oldSocket?.Shutdown(SocketShutdown.Both); } catch { /* already closed */ } + try { oldSocket?.Dispose(); } catch { /* best effort */ } + + // Drain the correlation map and release each dropped request's TxId to the allocator. + var dropped = _correlation.DrainAll(); + + foreach (var kvp in dropped) + { + _allocator.Release(kvp.Key); + } + + // Phase 10 — also drain the in-flight-by-key map so a brand-new identical request + // through the freshly-reconnected backend is treated as a miss (no stale entries + // outlive the backend they were destined for). + _inFlightByKey.DrainAll(); + + int upstreamCount = 0; + if (cascadeUpstreams) + { + // Close every attached pipe — including those with no request in flight. + // Per the design doc, ALL attached upstreams cascade on backend disconnect, + // so the count recorded below is the full attached-pipe count. 
+ upstreamCount = _pipes.Count; + + // Snapshot the pipes before disposal modifies the dictionary indirectly. + var pipeList = _pipes.Values.ToArray(); + foreach (var pipe in pipeList) + { + try { await pipe.DisposeAsync().ConfigureAwait(false); } + catch { /* best effort */ } + } + _pipes.Clear(); + + _ctx.Counters.AddDisconnectCascades(upstreamCount); + } + + // Phase 12 (W1.4) — drain any stranded frames left in the outbound channel by + // the writer task that just faulted/cancelled and release their proxy TxIds back + // to the allocator. The backend CTS was cancelled above to stop the writer, and when + // cascadeUpstreams is true the upstream pipes have been cascaded (no more enqueues), + // so the channel state is stable in that case. + int strandedDropped = 0; + while (_outboundChannel.Reader.TryRead(out byte[]? stranded)) + { + if (stranded.Length >= 2) + { + ushort strandedTxId = (ushort)((stranded[0] << 8) | stranded[1]); + _allocator.Release(strandedTxId); + } + strandedDropped++; + } + + // Best-effort join. + try { if (writer is not null) await writer.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ } + try { if (reader is not null) await reader.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ } + + oldCts?.Dispose(); + + if (upstreamCount > 0 || dropped.Count > 0 || strandedDropped > 0) + MultiplexerLogEvents.BackendDisconnected(_logger, _plc.Name, upstreamCount, dropped.Count + strandedDropped, reason); + } + finally + { + _connectGate.Release(); } - - // Best-effort join. 
- try { if (writer is not null) await writer.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ } - try { if (reader is not null) await reader.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ } - - oldCts?.Dispose(); - - if (upstreamCount > 0 || dropped.Count > 0) - MultiplexerLogEvents.BackendDisconnected(_logger, _plc.Name, upstreamCount, dropped.Count, reason); } // ── Backend writer / reader tasks ───────────────────────────────────────── @@ -513,6 +572,13 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi // Phase 9: always exactly one party. Phase 10: N parties (read coalescing). // Note: the InFlightByKey TryRemove above (for FC03/FC04) guarantees no // further attaches can occur — the parties list is now a stable snapshot. + // + // Phase 12 (W1.3) — non-blocking fan-out via `TrySendResponse`. The + // single backend reader task must NEVER `await` a per-upstream channel + // write: a wedged upstream (full bounded response channel) would otherwise + // stall the reader and starve every other client on this PLC. A drop here + // is recorded via `responseDropForFullUpstream`; the wedged upstream loses + // its own response and will be reaped by its own socket-close path. foreach (var party in inFlight.InterestedParties) { if (!party.Pipe.IsAlive) @@ -542,7 +608,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi outFrame[0] = (byte)(party.OriginalTxId >> 8); outFrame[1] = (byte)(party.OriginalTxId & 0xFF); - await party.Pipe.SendResponseAsync(outFrame, ct).ConfigureAwait(false); + if (!party.Pipe.TrySendResponse(outFrame)) + { + _ctx.Counters.IncrementResponseDropForFullUpstream(); + } } } @@ -723,12 +792,38 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi if (inFlightForSend is null) { - // The factory hit the allocator-saturation path or a duplicate-key race. 
- // Surface a Modbus exception 04 to the upstream and clean up. + // Phase 12 (W1.2) — the factory hit the allocator-saturation path or a + // duplicate-key race and stored a stub `InFlightRequest` under `key`. Late + // attachers may have joined the stub between the factory call and this + // cleanup; we must deliver the saturation exception to ALL of them, not just + // the leader, otherwise the late attachers wait forever for a response that + // never comes (the stub has no proxy TxId, so no backend round-trip will + // ever fire). MultiplexerLogEvents.Saturated(_logger, _plc.Name, pipe.RemoteEp?.ToString() ?? "?"); - byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4); - _inFlightByKey.TryRemove(key, out _); - await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false); + + if (_inFlightByKey.TryRemove(key, out var stub)) + { + foreach (var party in stub.InterestedParties) + { + byte[] excFrame = BuildExceptionFrame(party.OriginalTxId, unitId, fcByte, exceptionCode: 4); + try + { + await party.Pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false); + } + catch + { + // Best-effort delivery. A dead pipe will be collected by its own + // socket close path; nothing more we can do here. + } + } + } + else + { + // The stub was already removed by another path (extremely unlikely, but + // defensive). Surface the exception to the original requester. 
+ byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4); + await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false); + } return; } diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs index 9d490b4..8a77ddb 100644 --- a/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs @@ -224,6 +224,26 @@ internal sealed partial class UpstreamPipe : IAsyncDisposable } } + /// + /// Phase 12 (W1.3) — non-blocking response enqueue. Returns true when the frame + /// was queued for delivery, false when the pipe is dead OR the response channel + /// is full. Used by the per-PLC backend reader's fan-out loop so a single wedged + /// upstream cannot stall responses to peers sharing the same backend socket — without + /// this, a full _responseChannel on one pipe would block the reader task. + /// + /// A false return indicates the frame is the multiplexer's responsibility + /// to drop and (optionally) account for via a counter. The wedged upstream's socket + /// will eventually time out and close on its own; its read loop will then dispose the + /// pipe and the multiplexer's correlation/coalescing entries will be reaped naturally. + /// + public bool TrySendResponse(byte[] frame) + { + if (!IsAlive) + return false; + + return _responseChannel.Writer.TryWrite(frame); + } + /// /// Closes the pipe: cancels the read+write loops and shuts down the socket. Idempotent. /// diff --git a/mbproxy/src/Mbproxy/Proxy/PlcListener.cs b/mbproxy/src/Mbproxy/Proxy/PlcListener.cs index c236d45..64e90cd 100644 --- a/mbproxy/src/Mbproxy/Proxy/PlcListener.cs +++ b/mbproxy/src/Mbproxy/Proxy/PlcListener.cs @@ -48,6 +48,13 @@ internal sealed partial class PlcListener : IAsyncDisposable public IReadOnlyCollection ActiveUpstreams => _multiplexer?.AttachedPipes ?? 
Array.Empty(); + /// + /// Phase 12 (W1.1) — exposes the running multiplexer so a hot-reload reseat can swap + /// the per-PLC context on the live instance. null between StopAsync and a fresh + /// start; callers must null-check. + /// + internal PlcMultiplexer? Multiplexer => _multiplexer; + public PlcListener( PlcOptions plc, ConnectionOptions connectionOptions, diff --git a/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs b/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs index 3ed9a69..313b91a 100644 --- a/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs +++ b/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs @@ -124,7 +124,15 @@ public sealed record CounterSnapshot( /// Phase 11 — point-in-time approximation of cached PDU bytes for this PLC. Sum of /// across entries. Read on the snapshot path. /// - long CacheBytes); + long CacheBytes, + /// + /// Phase 12 (W1.3) — cumulative count of backend response frames the per-PLC reader + /// task dropped because the destination upstream pipe's bounded response channel was + /// full. A non-zero value indicates one or more upstream clients are not draining their + /// socket fast enough to keep up with the backend; the wedged client loses its own + /// responses but its peers on the same PLC continue to receive theirs. + /// + long ResponseDropForFullUpstream); /// /// Thread-safe per-PLC counters backed by longs. @@ -169,6 +177,12 @@ internal sealed class ProxyCounters private long _cacheMissCount; private long _cacheInvalidations; + // Phase 12 (W1.3) — backend-reader fan-out drop counter. Increments when the reader + // task tried to enqueue a response to an upstream pipe whose bounded response channel + // was full. Without the non-blocking enqueue this would deadlock the reader; with it + // we drop and account. + private long _responseDropForFullUpstream; + // Phase 11 — live cache state pulled from a per-PLC ResponseCache on each snapshot. 
// The multiplexer registers a single provider via SetCacheStatsProvider so the status // page sees current entry-count / bytes without a separate poll. @@ -293,6 +307,13 @@ internal sealed class ProxyCounters public void AddCacheInvalidations(int n) => Interlocked.Add(ref _cacheInvalidations, n); + /// + /// Phase 12 (W1.3) — records one backend response frame dropped because the destination + /// upstream pipe's response channel was full. + /// + public void IncrementResponseDropForFullUpstream() + => Interlocked.Increment(ref _responseDropForFullUpstream); + /// /// Phase 11 — wires the per-PLC as the live stats /// source for the snapshot path. Pass null to detach during disposal. @@ -422,7 +443,8 @@ internal sealed class ProxyCounters CacheMissCount: Interlocked.Read(ref _cacheMissCount), CacheInvalidations: Interlocked.Read(ref _cacheInvalidations), CacheEntryCount: cacheEntries, - CacheBytes: cacheBytes); + CacheBytes: cacheBytes, + ResponseDropForFullUpstream: Interlocked.Read(ref _responseDropForFullUpstream)); } } diff --git a/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs b/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs index 8d177d9..96468d9 100644 --- a/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs +++ b/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs @@ -1,3 +1,5 @@ +using System.Diagnostics; +using Mbproxy.Admin; using Mbproxy.Bcd; using Mbproxy.Configuration; using Mbproxy.Options; @@ -5,7 +7,6 @@ using Mbproxy.Proxy.Cache; using Mbproxy.Proxy.Multiplexing; using Mbproxy.Proxy.Supervision; using Microsoft.Extensions.Options; -using Polly; namespace Mbproxy.Proxy; @@ -34,6 +35,16 @@ internal sealed partial class ProxyWorker : BackgroundService private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; private readonly ConfigReconciler _reconciler; + // Phase 12 (W1.5) — admin endpoint is no longer IHostedService; ProxyWorker drives its + // lifecycle directly so the design's "drain THEN stop admin" ordering is honoured. 
+ // + // Resolved LAZILY (in ExecuteAsync) rather than in the constructor because the DI graph + // is circular: AdminEndpointHost → StatusSnapshotBuilder → ProxyWorker. A constructor + // GetService() during ProxyWorker's own construction returns null + // silently. Lazy resolution sidesteps the cycle — by the time ExecuteAsync runs the DI + // container is fully built. + private readonly IServiceProvider _services; + private AdminEndpointHost? _admin; // Phase 06: supervisors are now managed jointly by ProxyWorker (initial bootstrap) // and ConfigReconciler (subsequent hot-reload changes). The dictionary is shared @@ -52,13 +63,16 @@ internal sealed partial class ProxyWorker : BackgroundService IPduPipeline pipeline, ILogger logger, ILoggerFactory loggerFactory, - ConfigReconciler reconciler) + ConfigReconciler reconciler, + IServiceProvider services) { _options = options; _pipeline = pipeline; _logger = logger; _loggerFactory = loggerFactory; _reconciler = reconciler; + _services = services; + // Phase 12 (W1.5) — admin endpoint resolved lazily in ExecuteAsync (see field comment). } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -188,17 +202,58 @@ internal sealed partial class ProxyWorker : BackgroundService int boundCount = _supervisors.Values.Count(s => s.Snapshot().State == SupervisorState.Bound); LogStartupReady(_logger, boundCount, plcsConfigured); + // Phase 12 (W1.5) — start the admin endpoint AFTER listeners are bound so the + // status page can never observe the service in a "no PLCs configured yet" state. + // The admin endpoint is no longer registered as IHostedService (the host's reverse + // stop order would tear it down BEFORE drain). ProxyWorker drives both ends. + // + // Resolution happens here, not in the constructor — the DI graph is circular + // (admin → StatusSnapshotBuilder → ProxyWorker) and a constructor-time lookup + // returns null silently. 
+        _admin = _services.GetService<AdminEndpointHost>();
+        if (_admin is not null)
+        {
+            try
+            {
+                await _admin.StartAsync(stoppingToken).ConfigureAwait(false);
+            }
+            catch (Exception ex)
+            {
+                _logger.LogError(ex, "Admin endpoint failed to start: {Message}", ex.Message);
+            }
+        }
+
         // ── 6. Keep the worker alive until the host signals stop ─────────────────────
         // Supervisors run their own background loops; ExecuteAsync just waits.
         await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(false);
     }
 
+    /// <summary>
+    /// Phase 12 (W1.5) — graceful shutdown sequence (replaces the deleted
+    /// <c>ShutdownCoordinator</c>):
+    /// <list type="number">
+    ///   <item>Cancel <see cref="ExecuteAsync"/> via <c>base.StopAsync</c>.</item>
+    ///   <item>Stop all supervisors with a 5 s hard deadline (no new connections; existing
+    ///   pipes are cascaded by teardown).</item>
+    ///   <item>Wait for in-flight PDUs to drain, bounded by the live
+    ///   <see cref="ConnectionOptions.GracefulShutdownTimeoutMs"/> (read fresh from
+    ///   <see cref="IOptionsMonitor{TOptions}.CurrentValue"/> so a hot-reloaded value is
+    ///   honoured at stop time) and by the host's own stop token.</item>
+    ///   <item>Stop the admin endpoint LAST so the status page survives the drain phase
+    ///   and an operator polling it sees the in-flight count fall to zero.</item>
+    ///   <item>Dispose every supervisor to release sockets, channels, and watchdog timers.</item>
+    /// </list>
+    /// Logs <c>mbproxy.shutdown.complete</c> on the way out with the in-flight count sampled
+    /// after the drain wait (zero on a clean shutdown, positive when forced cancel).
+    /// </summary>
     public override async Task StopAsync(CancellationToken cancellationToken)
     {
         // Cancel ExecuteAsync first.
         await base.StopAsync(cancellationToken).ConfigureAwait(false);
 
-        // Stop all supervisors in parallel with a 5-second hard deadline.
+        var sw = Stopwatch.StartNew();
+
+        // ── 1. Stop accepting new connections ─────────────────────────────────────────
         using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
         using var linked = CancellationTokenSource.CreateLinkedTokenSource(
             stopCts.Token, cancellationToken);
@@ -216,10 +271,78 @@ internal sealed partial class ProxyWorker : BackgroundService
             // Best effort — don't let individual supervisor failures block shutdown.
         }
 
+        // ── 2. Drain in-flight PDUs ───────────────────────────────────────────────────
+        // Reads the current configured deadline so a hot-reloaded
+        // GracefulShutdownTimeoutMs is honoured at stop time, not frozen at process start.
+        int drainDeadlineMs = _options.CurrentValue.Connection.GracefulShutdownTimeoutMs;
+
+        if (drainDeadlineMs > 0)
+        {
+            // Linked to the host's token (like the supervisor stop above) so a forced
+            // host shutdown ends the drain wait instead of running out the full deadline.
+            using var drainCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+            drainCts.CancelAfter(TimeSpan.FromMilliseconds(drainDeadlineMs));
+            try
+            {
+                while (CountInFlight() > 0)
+                {
+                    await Task.Delay(10, drainCts.Token).ConfigureAwait(false);
+                }
+            }
+            catch (OperationCanceledException)
+            {
+                // Deadline or host cancellation reached; the leftover count is sampled below.
+            }
+        }
+
+        // Sampled once after the wait so every exit path (clean drain, deadline, host
+        // cancellation, or a drain disabled with a deadline <= 0) reports what is abandoned.
+        int inFlightAtCancel = CountInFlight();
+
+        // ── 3. Stop admin endpoint LAST ───────────────────────────────────────────────
+        if (_admin is not null)
+        {
+            try
+            {
+                using var adminCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
+                await _admin.StopAsync(adminCts.Token).ConfigureAwait(false);
+            }
+            catch (Exception ex)
+            {
+                // Best-effort: a failed admin stop must not abort supervisor disposal.
+                _logger.LogWarning(ex, "Admin endpoint failed to stop cleanly: {Message}", ex.Message);
+            }
+        }
+
+        // ── 4. Dispose supervisors (releases sockets, channels, watchdog timers) ─────
         foreach (var supervisor in _supervisors.Values)
-            await supervisor.DisposeAsync().ConfigureAwait(false);
+        {
+            try
+            {
+                await supervisor.DisposeAsync().ConfigureAwait(false);
+            }
+            catch (Exception ex)
+            {
+                // One failing supervisor must not strand the others' sockets and timers,
+                // nor skip the shutdown-complete log below.
+                _logger.LogWarning(ex, "Supervisor dispose failed during shutdown: {Message}", ex.Message);
+            }
+        }
 
         _supervisors.Clear();
+
+        LogShutdownComplete(_logger, inFlightAtCancel, sw.ElapsedMilliseconds);
+    }
+
+    /// <summary>
+    /// Sums the point-in-time in-flight request count across every supervisor's counters.
+    /// </summary>
+    private int CountInFlight()
+    {
+        int total = 0;
+        foreach (var supervisor in _supervisors.Values)
+            total += (int)supervisor.CurrentCounters.Snapshot().InFlightCount;
+        return total;
     }
 
     // ── Logging ───────────────────────────────────────────────────────────────────────────
@@ -247,4 +370,10 @@ internal sealed partial class ProxyWorker : BackgroundService
         Level = LogLevel.Error,
         Message = "Failed to bind listener: Plc={Plc} Port={Port} Reason={Reason}")]
     private static partial void LogBindFailed(ILogger logger, string plc, int port, string reason);
+
+    // Phase 12 (W1.5) — moved here from the deleted ShutdownCoordinator.
+ [LoggerMessage(EventId = 80, EventName = "mbproxy.shutdown.complete", + Level = LogLevel.Information, + Message = "Graceful shutdown complete: InFlightAtCancel={InFlightAtCancel} ElapsedMs={ElapsedMs}")] + private static partial void LogShutdownComplete(ILogger logger, int inFlightAtCancel, long elapsedMs); } diff --git a/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs b/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs index ceae9c5..d3f986d 100644 --- a/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs +++ b/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs @@ -180,35 +180,40 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable RecoveryAttempts: Interlocked.CompareExchange(ref _recoveryAttempts, 0, 0)); /// - /// Atomically swaps the per-PLC context (tag map) without restarting the listener. + /// Atomically swaps the per-PLC context (tag map + optional response cache) on the + /// running listener AND its live multiplexer. /// - /// Transition window: there is a brief overlap where the old - /// is running its accept loop with the old context while the - /// new context reference is being written. The volatile write ensures that the very - /// next PlcListener constructed inside the Polly loop (on any subsequent fault - /// recovery) picks up . Existing in-flight upstream pipes - /// served by the current multiplexer keep their reference to the context captured at - /// multiplexer construction time; they finish on the old map. New connections after - /// this call use the new map. This is the correct design — partial-BCD rewrites - /// mid-request would be worse than a one-request gap. + /// Phase 12 (W1.1) — previously this method only updated the supervisor's + /// _currentContext slot, which meant the running + /// kept using the OLD context (it captured the reference at construction). A reload + /// only became visible on the next listener fault. 
Now the swap propagates into the + /// running mux via , so the very next PDU + /// sees the new tag map / new cache. Counters are preserved (the new context carries + /// the same ProxyCounters instance) so operator history is not reset. /// - /// This method is intentionally lightweight: it performs only the volatile write - /// and returns immediately. The parameter is present for API - /// symmetry with start/stop and to accommodate future async expansion. + /// Old cache lifecycle: the supervisor disposes the outgoing context's + /// cache AFTER the multiplexer has been swapped to the new context. By that point no + /// more reads or writes can land on the old cache. Per the design contract, any + /// tag-list change drops the entire PLC cache. /// public Task ReplaceContextAsync(PerPlcContext newCtx, CancellationToken ct) { - // Phase 11: dispose the outgoing context's response cache (if any) so its - // eviction loop terminates. The "any tag-list reload flushes the affected PLC's - // whole cache" doctrine is satisfied here — the new context constructs its own - // fresh cache, the old cache is dropped wholesale. var oldCache = _currentContext?.Cache; - // Volatile write: the next PlcListener created in RunSupervisorAsync will see - // the new context. The accept loop itself does not hold a direct reference to - // _currentContext — it was captured at PlcListener construction time. + // Volatile write: the next PlcListener created in RunSupervisorAsync (on any + // subsequent fault recovery) will pick up newCtx through this slot. _currentContext = newCtx; + // Phase 12 (W1.1) — push the swap into the running multiplexer so existing + // connections see the new tag map / new cache on their next PDU. _currentListener + // may be null between Polly retry attempts; in that case the next listener built + // inside the Polly loop will pick up newCtx through _currentContext above. 
+ _currentListener?.Multiplexer?.ReplaceContext(newCtx); + + // Phase 12 (W1.1 + W2.8 prereq) — drop the outgoing cache AFTER the swap so the + // running multiplexer can no longer reach it. Dispose stops the eviction loop and + // releases the timer. (The cache.flushed log event is W2.8 work; this Wave-1 fix + // is the "no longer in use, safe to drop" piece.) if (oldCache is not null && !ReferenceEquals(oldCache, newCtx.Cache)) oldCache.Dispose(); diff --git a/mbproxy/tests/Mbproxy.Tests/Diagnostics/ShutdownCoordinatorTests.cs b/mbproxy/tests/Mbproxy.Tests/Diagnostics/ShutdownCoordinatorTests.cs deleted file mode 100644 index b65d2fa..0000000 --- a/mbproxy/tests/Mbproxy.Tests/Diagnostics/ShutdownCoordinatorTests.cs +++ /dev/null @@ -1,177 +0,0 @@ -using Mbproxy.Diagnostics; -using Mbproxy.Options; -using Mbproxy.Proxy; -using Microsoft.Extensions.Logging.Abstractions; -using Shouldly; -using Xunit; - -namespace Mbproxy.Tests.Diagnostics; - -/// -/// Unit tests for . -/// All tests use the internal testability constructor with fake handles. -/// -[Trait("Category", "Unit")] -public sealed class ShutdownCoordinatorTests -{ - // ── Fake implementations ────────────────────────────────────────────────────────────────── - - private sealed class FakeAdminHandle : IAdminEndpointHandle - { - public bool StopCalled { get; private set; } - public int StopCallOrder { get; private set; } - private readonly Func? _orderSource; - - public FakeAdminHandle(Func? orderSource = null) => _orderSource = orderSource; - - public Task StopAsync(CancellationToken ct) - { - StopCalled = true; - StopCallOrder = _orderSource?.Invoke() ?? 0; - return Task.CompletedTask; - } - } - - private sealed class SimpleFakeSupervisor : ISupervisorHandle - { - public bool StopCalled { get; private set; } - public int StopCallOrder { get; private set; } - private readonly Func? _orderSource; - - public SimpleFakeSupervisor(Func? 
orderSource = null) => _orderSource = orderSource; - - public Task StopAsync(CancellationToken ct) - { - StopCalled = true; - StopCallOrder = _orderSource?.Invoke() ?? 0; - return Task.CompletedTask; - } - - public int InFlightCount { get; set; } - } - - private sealed class DelayedStopSupervisor : ISupervisorHandle - { - private readonly Func _onStop; - public DelayedStopSupervisor(Func onStop) => _onStop = onStop; - public async Task StopAsync(CancellationToken ct) => await _onStop(); - public int InFlightCount => 0; - } - - // ── Helper ──────────────────────────────────────────────────────────────────────────────── - - private static ShutdownCoordinator Build( - IReadOnlyList supervisors, - IAdminEndpointHandle admin, - int timeoutMs = 500) - { - var opts = Microsoft.Extensions.Options.Options.Create(new MbproxyOptions - { - Connection = new ConnectionOptions { GracefulShutdownTimeoutMs = timeoutMs }, - }); - - return new ShutdownCoordinator( - supervisors, - admin, - opts, - NullLogger.Instance); - } - - // ── Tests ───────────────────────────────────────────────────────────────────────────────── - - /// - /// With no active connections the drain loop exits on the first check; - /// the whole sequence should be fast (well under 1 s). 
- /// - [Fact] - public async Task Shutdown_NoActiveConnections_CompletesImmediately() - { - var supervisor = new SimpleFakeSupervisor(); - var admin = new FakeAdminHandle(); - var coord = Build([supervisor], admin, timeoutMs: 5000); - - var sw = System.Diagnostics.Stopwatch.StartNew(); - await coord.ShutdownAsync(timeoutMs: 5000, TestContext.Current.CancellationToken); - sw.Stop(); - - sw.ElapsedMilliseconds.ShouldBeLessThan(1000, - "Shutdown with no active connections should complete quickly"); - - supervisor.StopCalled.ShouldBeTrue("supervisor.StopAsync must be called"); - admin.StopCalled.ShouldBeTrue("admin.StopAsync must be called"); - } - - /// - /// Verifies that the coordinator awaits supervisor stop before declaring shutdown done. - /// - [Fact] - public async Task Shutdown_OneActiveConnection_WaitsForCompletion() - { - bool stopInvoked = false; - - var supervisor = new DelayedStopSupervisor(async () => - { - await Task.Delay(50, TestContext.Current.CancellationToken); - stopInvoked = true; - }); - - var admin = new FakeAdminHandle(); - var coord = Build([supervisor], admin, timeoutMs: 2000); - - await coord.ShutdownAsync(timeoutMs: 2000, TestContext.Current.CancellationToken); - - stopInvoked.ShouldBeTrue( - "supervisor.StopAsync must complete before ShutdownAsync returns"); - admin.StopCalled.ShouldBeTrue("admin endpoint must be stopped"); - } - - /// - /// When the drain deadline fires, the coordinator must complete and still stop the admin - /// endpoint, not block forever. - /// - [Fact] - public async Task Shutdown_TimeoutExceeded_CancelsRemainingWork_AndReportsCount() - { - // Use a supervisor that completes stop immediately; the "timeout" scenario is - // that the drain loop has no pairs to wait for but the coordinator still respects - // its deadline. With zero in-flight pairs, the coordinator exits the drain phase - // immediately, which we verify with a fast elapsed time. 
- var supervisor = new SimpleFakeSupervisor(); - var admin = new FakeAdminHandle(); - - // Short drain timeout — verify the coordinator finishes promptly. - var coord = Build([supervisor], admin, timeoutMs: 50); - - var sw = System.Diagnostics.Stopwatch.StartNew(); - await coord.ShutdownAsync(timeoutMs: 50, TestContext.Current.CancellationToken); - sw.Stop(); - - sw.ElapsedMilliseconds.ShouldBeLessThan(1000, - "Coordinator must complete shortly after the drain timeout with zero in-flight pairs"); - - admin.StopCalled.ShouldBeTrue( - "admin.StopAsync must be called after the drain phase, even when timeout fires"); - } - - /// - /// Verifies the ordering guarantee: supervisors stop BEFORE the admin endpoint. - /// - [Fact] - public async Task Shutdown_AdminEndpointStopped_AfterListenersStopped() - { - int callOrder = 0; - int NextOrder() => Interlocked.Increment(ref callOrder); - - var supervisor = new SimpleFakeSupervisor(NextOrder); - var admin = new FakeAdminHandle(NextOrder); - var coord = Build([supervisor], admin, timeoutMs: 500); - - await coord.ShutdownAsync(timeoutMs: 500, TestContext.Current.CancellationToken); - - supervisor.StopCalled.ShouldBeTrue("supervisor.StopAsync must be called"); - admin.StopCalled.ShouldBeTrue("admin.StopAsync must be called"); - - supervisor.StopCallOrder.ShouldBeLessThan(admin.StopCallOrder, - "Supervisor.StopAsync must be called before AdminEndpoint.StopAsync"); - } -} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs index ca4d18d..51c7421 100644 --- a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs @@ -5,6 +5,7 @@ using System.Net.Sockets; using Mbproxy.Bcd; using Mbproxy.Options; using Mbproxy.Proxy; +using Mbproxy.Proxy.Cache; using Mbproxy.Proxy.Multiplexing; using Microsoft.Extensions.Logging.Abstractions; using Shouldly; 
@@ -623,4 +624,140 @@ public sealed class PlcMultiplexerTests
             }
         }
     }
+
+    // ── Phase 12 Wave-1 regression tests ──────────────────────────────────────
+
+    /// <summary>
+    /// W1.1 — verifies that <see cref="PlcMultiplexer.ReplaceContext"/> swaps the live
+    /// per-PLC context on the running multiplexer, so the very next PDU's BCD rewriter
+    /// uses the new tag map (not the captured-at-construction map). Before W1.1 this
+    /// scenario would silently keep using the old map until the listener faulted and the
+    /// supervisor's Polly loop reconstructed everything.
+    /// </summary>
+    [Fact]
+    public async Task ReplaceContext_NewTagMap_VisibleOnNextPdu()
+    {
+        int backendPort = PickFreePort();
+        await using var backend = new StubBackend(backendPort);
+        backend.FcResponseFactory = (fc, _, _, txId) =>
+            fc == 0x03 ? BuildFc03Response(txId, 1, 0x1234) : Array.Empty<byte>();
+
+        // Context 1 — tag at addr 100, BCD16. Wire 0x1234 decodes to decimal 1234.
+        var ctx1 = MakeContext("PLC1", BcdTag.Create(100, 16));
+        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
+        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx1);
+
+        var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
+        try
+        {
+            // Read 1 with original ctx — wire 0x1234 should be decoded to 1234 (= 0x04D2).
+            await client.SendAsync(BuildFc03ReadFrame(1, 100, 1), SocketFlags.None);
+            var rsp1 = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
+            ushort decoded1 = (ushort)((rsp1[9] << 8) | rsp1[10]);
+            decoded1.ShouldBe((ushort)1234, "with tag at 100, BCD wire 0x1234 must decode to decimal 1234");
+
+            // Swap to an empty tag map (counters preserved per the design's reseat contract).
+            var ctx2 = new PerPlcContext
+            {
+                PlcName = "PLC1",
+                TagMap = BcdTagMap.Empty,
+                Counters = ctx1.Counters,
+                Logger = NullLogger.Instance,
+            };
+            mux.ReplaceContext(ctx2);
+
+            // Read 2 with swapped ctx — no tag, raw 0x1234 must pass through unchanged.
+            await client.SendAsync(BuildFc03ReadFrame(2, 100, 1), SocketFlags.None);
+            var rsp2 = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
+            ushort raw2 = (ushort)((rsp2[9] << 8) | rsp2[10]);
+            raw2.ShouldBe((ushort)0x1234,
+                "after ReplaceContext to empty tag map, the next PDU must use the new map and pass 0x1234 through unchanged");
+        }
+        finally
+        {
+            client.Dispose();
+            await pipe.DisposeAsync();
+            listener.Stop();
+        }
+    }
+
+    /// <summary>
+    /// W1.1 — verifies that swapping in a fresh response cache via <see cref="PlcMultiplexer.ReplaceContext"/>
+    /// makes the running multiplexer consult the NEW cache for subsequent reads, not the
+    /// old cache that was disposed by the supervisor. Without W1.1 the running mux would
+    /// keep its constructor-captured cache reference and either return stale entries or
+    /// hit a disposed cache.
+    /// </summary>
+    [Fact]
+    public async Task ReplaceContext_NewCache_NextReadGoesToBackend_NotOldCache()
+    {
+        int backendPort = PickFreePort();
+        await using var backend = new StubBackend(backendPort);
+        backend.FcResponseFactory = (fc, _, _, txId) =>
+            fc == 0x03 ? BuildFc03Response(txId, 1, (ushort)0x1111) : Array.Empty<byte>();
+
+        // Context 1 — cacheable tag at addr 200 with TTL 60_000 ms.
+        var tag = new BcdTag(200, 16, CacheTtlMs: 60_000);
+        var dict = new[] { tag }.ToDictionary(t => t.Address).ToFrozenDictionary();
+        var map = new BcdTagMap(dict);
+        var cache1 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000);
+        var ctx1 = new PerPlcContext
+        {
+            PlcName = "PLC1",
+            TagMap = map,
+            Counters = new ProxyCounters(),
+            Logger = NullLogger.Instance,
+            Cache = cache1,
+        };
+
+        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
+        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx1);
+        var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
+        // Declared outside the try block so the finally clause can dispose it too; the
+        // replacement cache is created mid-test and nothing else in this test releases it.
+        ResponseCache? cache2 = null;
+        try
+        {
+            // Read 1 — populates cache1 from backend.
+            await client.SendAsync(BuildFc03ReadFrame(1, 200, 1), SocketFlags.None);
+            _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
+            await Task.Delay(50, TestContext.Current.CancellationToken);
+            int afterFirst = backend.SeenProxyTxIds.Count;
+            cache1.Count.ShouldBe(1, "cache1 must contain the first read");
+
+            // Read 2 — must hit cache1 (no backend traffic).
+            await client.SendAsync(BuildFc03ReadFrame(2, 200, 1), SocketFlags.None);
+            _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
+            backend.SeenProxyTxIds.Count.ShouldBe(afterFirst, "cache hit must not produce backend traffic");
+
+            // Swap in a brand-new (empty) cache via ReplaceContext.
+            cache2 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000);
+            var ctx2 = new PerPlcContext
+            {
+                PlcName = "PLC1",
+                TagMap = map,
+                Counters = ctx1.Counters,
+                Logger = NullLogger.Instance,
+                Cache = cache2,
+            };
+            mux.ReplaceContext(ctx2);
+
+            // Read 3 — old cache had the entry, but mux now uses cache2 which is empty,
+            // so the next read MUST go to the backend.
+            await client.SendAsync(BuildFc03ReadFrame(3, 200, 1), SocketFlags.None);
+            _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
+            await Task.Delay(50, TestContext.Current.CancellationToken);
+            backend.SeenProxyTxIds.Count.ShouldBe(afterFirst + 1,
+                "after ReplaceContext, the running multiplexer must consult the NEW cache (empty) — not the old one (still warm)");
+            cache2.Count.ShouldBe(1, "the new cache should be populated by the post-swap read");
+        }
+        finally
+        {
+            client.Dispose();
+            await pipe.DisposeAsync();
+            listener.Stop();
+            cache1.Dispose();
+            cache2?.Dispose();
+        }
+    }
 }
diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/UpstreamPipeTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/UpstreamPipeTests.cs
new file mode 100644
index 0000000..96a1c61
--- /dev/null
+++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/UpstreamPipeTests.cs
@@ -0,0 +1,103 @@
+using System.Net;
+using System.Net.Sockets;
+using Mbproxy.Proxy.Multiplexing;
+using Microsoft.Extensions.Logging.Abstractions;
+using Shouldly;
+using Xunit;
+
+namespace Mbproxy.Tests.Proxy.Multiplexing;
+
+/// <summary>
+/// Unit tests for <see cref="UpstreamPipe"/>'s response-channel contract — particularly
+/// the Phase 12 (W1.3) non-blocking <see cref="UpstreamPipe.TrySendResponse"/> enqueue
+/// added so the per-PLC backend reader cannot be stalled by one slow upstream client.
+/// </summary>
+[Trait("Category", "Unit")]
+public sealed class UpstreamPipeTests
+{
+    // ── Helpers ───────────────────────────────────────────────────────────────
+
+    private static async Task<(Socket clientSide, Socket serverSide)> AcceptedSocketPairAsync()
+    {
+        // Build a loopback listener and connect a client to get a real socket pair.
+        var listener = new TcpListener(IPAddress.Loopback, 0);
+        listener.Start();
+        try
+        {
+            int port = ((IPEndPoint)listener.LocalEndpoint).Port;
+            var clientSide = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
+            var connectTask = clientSide.ConnectAsync(IPAddress.Loopback, port);
+            var serverSide = await listener.AcceptSocketAsync();
+            await connectTask;
+            return (clientSide, serverSide);
+        }
+        finally
+        {
+            listener.Stop();
+        }
+    }
+
+    // ── Tests ─────────────────────────────────────────────────────────────────
+
+    /// <summary>
+    /// W1.3 — when no write-loop is draining the response channel, repeated
+    /// <see cref="UpstreamPipe.TrySendResponse"/> calls must succeed up to the channel's
+    /// bounded capacity and return <c>false</c> on every subsequent call without blocking.
+    /// This is the non-blocking contract the per-PLC backend reader relies on.
+    /// </summary>
+    [Fact]
+    public async Task TrySendResponse_WhenChannelFull_ReturnsFalse_WithoutBlocking()
+    {
+        var (client, server) = await AcceptedSocketPairAsync();
+        try
+        {
+            // Construct the pipe but do NOT call RunWriteLoopAsync — the channel will not
+            // be drained, so it fills after `ResponseChannelCapacity` (= 16) writes.
+            // `await using` releases the pipe even when an assertion below throws.
+            await using var pipe = new UpstreamPipe(server, "TEST", NullLogger.Instance);
+
+            int successes = 0;
+            int failures = 0;
+
+            for (int i = 0; i < 100; i++)
+            {
+                bool ok = pipe.TrySendResponse(new byte[] { 0, 0 });
+                if (ok) successes++;
+                else failures++;
+            }
+
+            successes.ShouldBe(16,
+                "the channel's bounded capacity is 16; first 16 writes must succeed");
+            failures.ShouldBe(84,
+                "after capacity is reached, every further TrySendResponse must return false (not block)");
+        }
+        finally
+        {
+            try { client.Dispose(); } catch { }
+            try { server.Dispose(); } catch { }
+        }
+    }
+
+    /// <summary>
+    /// W1.3 — once the pipe has been disposed, <see cref="UpstreamPipe.TrySendResponse"/>
+    /// returns <c>false</c> regardless of channel state, never throws.
+    /// </summary>
+    [Fact]
+    public async Task TrySendResponse_AfterDispose_ReturnsFalse()
+    {
+        var (client, server) = await AcceptedSocketPairAsync();
+        try
+        {
+            var pipe = new UpstreamPipe(server, "TEST", NullLogger.Instance);
+            await pipe.DisposeAsync();
+
+            bool ok = pipe.TrySendResponse(new byte[] { 0, 0 });
+            ok.ShouldBeFalse("a disposed pipe must reject sends without throwing");
+        }
+        finally
+        {
+            try { client.Dispose(); } catch { }
+            try { server.Dispose(); } catch { }
+        }
+    }
+}