@@ -48,14 +48,14 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
private readonly ConnectionOptions _connectionOptions ;
private readonly IPduPipeline _pipeline ;
// Phase 12 (W1.1) — `_ctx` is volatile so a hot-reload reseat can swap it on the running
// `_ctx` is volatile so a hot-reload reseat can swap it on the running
// multiplexer. Each method that uses the context snapshots it into a local at the start
// of the operation so a single PDU sees a consistent (TagMap, Cache) pair even if the
// swap fires mid-PDU. ReplaceContext is the single mutator.
private volatile PerPlcContext _ctx ;
private readonly ILogger < PlcMultiplexer > _logger ;
private readonly ResiliencePipeline ? _backendConnectPipeline ;
// Phase 10: live read-coalescing config accessor. The accessor is read per-PDU on the
// Live read-coalescing config accessor. The accessor is read per-PDU on the
// request path so a hot-reload of `Mbproxy.Resilience.ReadCoalescing.Enabled`
// propagates immediately. Production wires this to
// `() => optionsMonitor.CurrentValue.Resilience.ReadCoalescing`. Tests default to a
@@ -74,8 +74,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
SingleWriter = false ,
} ) ;
// Attached pipes — Phase 9 needs the list for the status page; Phase 10 will need it for
// coalescing (fan-out). ConcurrentDictionary keyed on UpstreamPipe.Id for O(1) detach.
// Attached pipes — used by the status page and by coalescing fan-out.
// ConcurrentDictionary keyed on UpstreamPipe.Id for O(1) detach.
private readonly ConcurrentDictionary < Guid , UpstreamPipe > _pipes = new ( ) ;
// Lifecycle plumbing. Backend tasks share a CTS; cascading disconnect cancels it,
@@ -88,10 +88,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
private Task ? _backendReaderTask ;
private readonly CancellationTokenSource _disposeCts = new ( ) ;
// Phase 12 (W2.2) — volatile so the disposing thread's write is observed by every
// hot-path reader (OnUpstreamFrameAsync, ReplaceContext, Attach, etc.) without a
// separate fence. On x86/x64 plain reads happen to give acquire-release semantics, so
// this is defense for ARM hosts and future portability.
// Volatile so the disposing thread's write is observed by every hot-path reader
// (OnUpstreamFrameAsync, ReplaceContext, Attach, etc.) without a separate fence.
// On x86/x64 plain reads happen to give acquire-release semantics, so this is
// defense for ARM hosts and future portability.
private volatile bool _disposed ;
private Task ? _watchdogTask ;
@@ -112,8 +112,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
_backendConnectPipeline = backendConnectPipeline ;
_coalescingOptions = coalescingOptions ? ? ( static ( ) = > new ReadCoalescingOptions ( ) ) ;
// Phase 11 — register the per-PLC cache as the live stats source for the snapshot
// path. Cache may be null when the per-PLC context has not been wired with one
// Register the per-PLC cache as the live stats source for the snapshot path.
// Cache may be null when the per-PLC context has not been wired with one
// (every tag uncached, or unit tests).
if ( _ctx . Cache is not null )
_ctx . Counters . SetCacheStatsProvider ( new CacheStatsAdapter ( _ctx . Cache ) ) ;
@@ -155,8 +155,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
}
/// <summary>
/// Phase 12 (W1.1) — atomically swaps the per-PLC context on a running multiplexer.
/// Called by <see cref="Supervision.PlcListenerSupervisor.ReplaceContextAsync"/> when a
/// Atomically swaps the per-PLC context on a running multiplexer. Called by
/// <see cref="Supervision.PlcListenerSupervisor.ReplaceContextAsync"/> when a
/// hot-reload tag-list change is applied to a PLC whose listener is already bound.
///
/// <para>The new context's tag map and (optional) response cache become visible on the
@@ -174,10 +174,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
{
if ( _disposed ) return ;
// Phase 12 (W4 / NM2) — provider FIRST, then _ctx. The status page's snapshot
// path reads `_cacheStatsProvider` independently of `_ctx`. If we swapped `_ctx`
// first, a snapshot taken in the gap between the two writes would still hold the
// OLD adapter wrapping the OLD cache — which the supervisor is about to dispose
// Provider FIRST, then _ctx. The status page's snapshot path reads
// `_cacheStatsProvider` independently of `_ctx`. If we swapped `_ctx` first, a
// snapshot taken in the gap between the two writes would still hold the OLD
// adapter wrapping the OLD cache — which the supervisor is about to dispose
// (`PlcListenerSupervisor.ReplaceContextAsync` runs `oldCache.Dispose()` after we
// return). Setting the provider first means snapshots in the swap window read
// either (old provider, old ctx) or (new provider, new ctx) — both coherent —
@@ -254,9 +254,9 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
}
_pipes . Clear ( ) ;
// Phase 12 (W2.5, W2.6) — guard the CTS dispose against a watchdog tick that
// raced past the WaitAsync above (e.g. a slow Task.Delay completion observing
// cancellation late). Also dispose the connect-gate semaphore.
// Guard the CTS dispose against a watchdog tick that raced past the WaitAsync
// above (e.g. a slow Task.Delay completion observing cancellation late). Also
// dispose the connect-gate semaphore.
try { _disposeCts . Dispose ( ) ; } catch ( ObjectDisposedException ) { /* already disposed */ }
try { _connectGate . Dispose ( ) ; } catch ( ObjectDisposedException ) { /* already disposed */ }
}
@@ -336,30 +336,28 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
private async Task TearDownBackendAsync ( string reason , bool cascadeUpstreams )
{
// Phase 12 (W1.4) — serialise tear-down vs connect-up via the connect gate. Without
// this, a fresh EnsureBackendConnectedAsync racing with the channel drain below
// could see stranded frames sent on its new socket with old (already-released) TxIds,
// Serialise tear-down vs connect-up via the connect gate. Without this, a fresh
// EnsureBackendConnectedAsync racing with the channel drain below could see
// stranded frames sent on its new socket with old (already-released) TxIds,
// producing orphaned responses that hang upstream peers via the watchdog.
//
// Phase 12 (W4 / NM1) — bound the wait. Without a timeout, a long Polly-wrapped
// EnsureBackendConnectedAsync against an unreachable host can hold the gate for
// the full BackendConnectTimeoutMs * MaxAttempts window, blocking DisposeAsync (and
// therefore ProxyWorker.StopAsync) for that duration. A 2 s teardown deadline
// bounds disposal latency; if the gate is unavailable we proceed best-effort
// without it (the worst-case consequence is one orphaned in-flight cycle on the
// dying backend, which the upstream watchdog will surface as exception 0x0B).
// Bounded wait: a long Polly-wrapped EnsureBackendConnectedAsync against an
// unreachable host can hold the gate for the full BackendConnectTimeoutMs *
// MaxAttempts window, blocking DisposeAsync (and therefore ProxyWorker.StopAsync)
// for that duration. A 2 s teardown deadline bounds disposal latency; if the gate
// is unavailable we proceed best-effort without it (the worst-case consequence is
// one orphaned in-flight cycle on the dying backend, which the upstream watchdog
// will surface as exception 0x0B).
//
// Phase 12 (W5 / m1) — KNOWN RACE on the gate-not-held path: a concurrent
// EnsureBackendConnectedAsync that DOES hold the gate may TryAllocate a TxId
// that collides (after wraparound in the allocator's forward scan) with a TxId
// we're about to release from the channel-drain step below. The double-release
// would mark the new request's slot as free even though it's legitimately
// in-flight, allowing the next allocation to reuse the same slot and
// CorrelationMap.TryAdd to fail (silent request drop). Probability is very low
// (requires gate timeout + new accept landing during cascade + TxId collision in
// a 65,536-slot space); the only consequence is one dropped request that the
// client retries. Documented as accepted best-effort behaviour in
// codereviews/2026-05-14/ReReviewAfterRemediation.md (m1).
// KNOWN RACE on the gate-not-held path: a concurrent EnsureBackendConnectedAsync
// that DOES hold the gate may TryAllocate a TxId that collides (after wraparound
// in the allocator's forward scan) with a TxId we're about to release from the
// channel-drain step below. The double-release would mark the new request's slot
// as free even though it's legitimately in-flight, allowing the next allocation
// to reuse the same slot and CorrelationMap.TryAdd to fail (silent request drop).
// Probability is very low (requires gate timeout + new accept landing during
// cascade + TxId collision in a 65,536-slot space); the only consequence is one
// dropped request that the client retries. Accepted as best-effort behaviour.
bool gateHeld = false ;
try
{
@@ -412,8 +410,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
_allocator . Release ( kvp . Key ) ;
}
// Phase 10 — also drain the in-flight-by-key map so a brand-new identical request
// through the freshly-reconnected backend is treated as a miss (no stale entries
// Also drain the in-flight-by-key map so a brand-new identical request through
// the freshly-reconnected backend is treated as a miss (no stale entries
// outlive the backend they were destined for).
_inFlightByKey . DrainAll ( ) ;
@@ -437,11 +435,11 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
_ctx . Counters . AddDisconnectCascades ( upstreamCount ) ;
}
// Phase 12 (W1.4) — drain any stranded frames left in the outbound channel by
// the writer task that just faulted/cancelled. Release their proxy TxIds back
// to the allocator. By the time we reach this line the writer has stopped
// reading from the channel (cancelled CTS) and the upstream pipes have been
// cascaded (no more enqueues), so the channel state is stable.
// Drain any stranded frames left in the outbound channel by the writer task
// that just faulted/cancelled. Release their proxy TxIds back to the
// allocator. By the time we reach this line the writer has stopped reading
// from the channel (cancelled CTS) and the upstream pipes have been cascaded
// (no more enqueues), so the channel state is stable.
int strandedDropped = 0 ;
while ( _outboundChannel . Reader . TryRead ( out byte [ ] ? stranded ) )
{
@@ -464,7 +462,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
}
finally
{
// Only release if we acquired (W4 / NM1) — best-effort path may have skipped.
// Only release if we acquired — best-effort path may have skipped.
if ( gateHeld )
{
try { _connectGate . Release ( ) ; }
@@ -499,10 +497,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
}
catch ( Exception ex )
{
// Backend failure — cascade. Phase 12 (W4 / NM5) — skip if disposal is
// already in progress; DisposeAsync runs an explicit TearDown and the
// fire-and-forget here would race against it, hitting a disposed
// _connectGate and producing an unobserved-task exception.
// Backend failure — cascade. Skip if disposal is already in progress;
// DisposeAsync runs an explicit TearDown and the fire-and-forget here would
// race against it, hitting a disposed _connectGate and producing an
// unobserved-task exception.
if ( ! _disposeCts . IsCancellationRequested )
_ = TearDownBackendAsync ( $"writer fault: {ex.Message}" , cascadeUpstreams : true ) ;
}
@@ -554,10 +552,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
// Free the allocator slot immediately so it can be reused.
_allocator . Release ( proxyTxId ) ;
// Phase 10 — for FC03/FC04 reads, also clear the coalescing-by-key entry so
// a brand-new identical request issued AFTER this response is treated as a
// miss (opens a fresh round-trip). The TryRemove is best-effort: a watchdog
// timeout or cascade may have already removed it.
// For FC03/FC04 reads, also clear the coalescing-by-key entry so a
// brand-new identical request issued AFTER this response is treated as a
// miss (opens a fresh round-trip). The TryRemove is best-effort: a
// watchdog timeout or cascade may have already removed it.
if ( inFlight . Fc is 0x03 or 0x04 )
{
var coalKey = new CoalescingKey ( inFlight . UnitId , inFlight . Fc ,
@@ -580,16 +578,16 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
frame . AsSpan ( MbapFrame . HeaderSize , pduBodyLen ) ,
responseCtx ) ;
// Phase 11 — post-rewriter cache update:
// Post-rewriter cache update:
// * FC03/FC04 successful responses are stored when the request was
// cache-eligible (resolvedTtlMs > 0).
// * FC06/FC16 successful responses invalidate every cached entry whose
// address range overlaps the write.
//
// Phase 12 (W2.7) — exception bit comes from the post-rewriter buffer
// (the rewriter never touches the FC byte today, but reading from
// inFlight.Fc would lose the exception bit). The base FC for routing
// decisions uses inFlight.Fc — the request side knows what was sent.
// Exception bit comes from the post-rewriter buffer (the rewriter never
// touches the FC byte today, but reading from inFlight.Fc would lose the
// exception bit). The base FC for routing decisions uses inFlight.Fc —
// the request side knows what was sent.
if ( _ctx . Cache is { } postCache )
{
byte fcInResponse = frame [ MbapFrame . HeaderSize ] ;
@@ -623,16 +621,16 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
}
else if ( inFlight . Fc is 0x06 or 0x10 )
{
// Phase 12 (W2.9) — the design contract "invalidations during a
// recovering listener state are skipped" (design.md:203) is
// upheld IMPLICITLY here: invalidation only fires inside the
// backend reader task when a non-exception FC06/FC16 response
// arrives. A `Recovering` listener has no backend reader (the
// multiplexer is torn down between recovery attempts), so no
// response can land here, so no invalidation. The gating is
// structural, not conditional. If a future change ever produces
// a write response off the live backend, an explicit recovering-
// state check would need to be added.
// The design contract "invalidations during a recovering
// listener state are skipped" is upheld IMPLICITLY here:
// invalidation only fires inside the backend reader task when
// a non-exception FC06/FC16 response arrives. A `Recovering`
// listener has no backend reader (the multiplexer is torn
// down between recovery attempts), so no response can land
// here, so no invalidation. The gating is structural, not
// conditional. If a future change ever produces a write
// response off the live backend, an explicit recovering-state
// check would need to be added.
int invalidated = postCache . Invalidate (
inFlight . UnitId , inFlight . StartAddress , inFlight . Qty ) ;
if ( invalidated > 0 )
@@ -647,23 +645,23 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
}
// Fan out to each interested party with their original TxId restored.
// Phase 9: always exactly one party. Phase 10: N parties (read coalescing).
// Note: the InFlightByKey TryRemove above (for FC03/FC04) guarantees no
// Without coalescing there is exactly one party; with coalescing there
// are N. The InFlightByKey TryRemove above (for FC03/FC04) guarantees no
// further attaches can occur — the parties list is now a stable snapshot.
//
// Phase 12 (W1.3) — non-blocking fan-out via `TrySendResponse`. The
// single backend reader task must NEVER `await` a per-upstream channel
// write: a wedged upstream (full bounded response channel) would otherwise
// stall the reader and starve every other client on this PLC. A drop here
// is recorded via `responseDropForFullUpstream`; the wedged upstream loses
// its own response and will be reaped by its own socket-close path.
// Non-blocking fan-out via `TrySendResponse`. The single backend reader
// task must NEVER `await` a per-upstream channel write: a wedged upstream
// (full bounded response channel) would otherwise stall the reader and
// starve every other client on this PLC. A drop here is recorded via
// `responseDropForFullUpstream`; the wedged upstream loses its own
// response and will be reaped by its own socket-close path.
foreach ( var party in inFlight . InterestedParties )
{
if ( ! party . Pipe . IsAlive )
{
// Phase 10 — record the dead-upstream skip only for FC03/FC04 (the
// only function codes that take the coalescing path). For non-
// coalescing FCs this branch is silent — the Phase-9 behaviour.
// Record the dead-upstream skip only for FC03/FC04 (the only
// function codes that take the coalescing path). For
// non-coalescing FCs this branch is silent.
if ( inFlight . Fc is 0x03 or 0x04
& & inFlight . InterestedParties . Count > 1 )
{
@@ -675,10 +673,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
continue ;
}
// The frame buffer is private to this iteration; if there are multiple
// parties (Phase 10), each gets its own copy with its own original TxId
// patched in. Phase 9 always has Count == 1, so the single-buffer path
// is the common case; we copy to keep Phase-10 forward compatibility.
// The frame buffer is private to this iteration; if there are
// multiple coalesced parties, each gets its own copy with its own
// original TxId patched in. The single-party case reuses the buffer
// directly as the common-case fast path.
byte [ ] outFrame = inFlight . InterestedParties . Count = = 1
? frame
: ( byte [ ] ) frame . Clone ( ) ;
@@ -692,17 +690,16 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
}
else
{
// Phase 12 (W6) — count outbound bytes per delivered party.
// With coalescing, one backend response fans out to N parties and
// produces N × frame.Length bytes leaving the proxy upstream-side.
// Count outbound bytes per delivered party. With coalescing, one
// backend response fans out to N parties and produces
// N × frame.Length bytes leaving the proxy upstream-side.
_ctx . Counters . AddBytes ( up : 0 , down : outFrame . Length ) ;
}
}
}
// Reader exited cleanly — backend closed by remote. Cascade.
// Phase 12 (W4 / NM5) — skip if dispose is already in progress (see writer-side
// comment above for rationale).
// Reader exited cleanly — backend closed by remote. Cascade. Skip if
// dispose is already in progress (see writer-side comment above).
if ( ! _disposeCts . IsCancellationRequested )
_ = TearDownBackendAsync ( "backend reader EOF" , cascadeUpstreams : true ) ;
}
@@ -730,16 +727,16 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
out ushort originalTxId , out _ , out _ , out byte unitId ) )
return ;
// Phase 12 (W6) — count inbound bytes from the upstream client. Surfaces in
// bytes.upstreamIn on the status page. Counted ONCE per parsed frame regardless
// of subsequent routing (cache hit, coalesce, backend round-trip, exception).
// Count inbound bytes from the upstream client. Surfaces in bytes.upstreamIn on
// the status page. Counted ONCE per parsed frame regardless of subsequent
// routing (cache hit, coalesce, backend round-trip, exception).
_ctx . Counters . AddBytes ( up : frame . Length , down : 0 ) ;
// Parse the PDU FC + start/qty. FC03/FC04 reads use start/qty for the coalescing key
// and (Phase 11) for the cache lookup. FC06 writes carry [addr][value]; we treat qty
// as 1 for invalidation. FC16 carries [start][qty][byteCount]...; qty is the write
// span used for cache invalidation. Phase 11: FC06/FC16 start/qty drive cache
// invalidation by overlap rather than exact key.
// Parse the PDU FC + start/qty. FC03/FC04 reads use start/qty for the coalescing
// key and for the cache lookup. FC06 writes carry [addr][value]; we treat qty as
// 1 for invalidation. FC16 carries [start][qty][byteCount]...; qty is the write
// span used for cache invalidation. FC06/FC16 start/qty drive cache invalidation
// by overlap rather than exact key.
int pduOffset = MbapFrame . HeaderSize ;
byte fcByte = frame . Length > pduOffset ? frame [ pduOffset ] : ( byte ) 0 ;
ushort startAddr = 0 ;
@@ -763,12 +760,12 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
qty = ( ushort ) ( ( frame [ pduOffset + 3 ] < < 8 ) | frame [ pduOffset + 4 ] ) ;
}
// Phase 11 — response-cache path. Cache check happens BEFORE coalescing AND before
// we attempt to bring up the backend connection. A hit short-circuits everything,
// including the EnsureBackendConnectedAsync call — operators with all reads cached
// and the backend down still get served (the cache survives backend disconnects per
// the design contract). The cache only fires for FC03/FC04 and only when the read
// range's resolved TTL > 0.
// Response-cache path. Cache check happens BEFORE coalescing AND before we
// attempt to bring up the backend connection. A hit short-circuits everything,
// including the EnsureBackendConnectedAsync call — operators with all reads
// cached and the backend down still get served (the cache survives backend
// disconnects per the design contract). The cache only fires for FC03/FC04 and
// only when the read range's resolved TTL > 0.
int resolvedCacheTtlMs = 0 ;
if ( fcByte is 0x03 or 0x04 & & _ctx . Cache is { } responseCache )
{
@@ -783,7 +780,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
byte [ ] hitFrame = BuildCacheHitFrame ( originalTxId , unitId , cached . PduBytes ) ;
await pipe . SendResponseAsync ( hitFrame , ct ) . ConfigureAwait ( false ) ;
// Phase 12 (W6) — outbound bytes for cache-hit response.
// Outbound bytes for cache-hit response.
_ctx . Counters . AddBytes ( up : 0 , down : hitFrame . Length ) ;
return ;
}
@@ -800,16 +797,15 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
}
// Ensure backend is connected. Failure here means we cannot service the request;
// close the upstream pipe (consistent with the 1:1 model's behaviour on connect
// failure).
// close the upstream pipe.
if ( ! await EnsureBackendConnectedAsync ( ct ) . ConfigureAwait ( false ) )
{
try { await pipe . DisposeAsync ( ) . ConfigureAwait ( false ) ; } catch { /* best effort */ }
return ;
}
// Phase 10 — read-coalescing path. Only FC03/FC04 are coalescable; only when the
// feature is enabled in the live config. If the late-arriving request matches an
// Read-coalescing path. Only FC03/FC04 are coalescable; only when the feature
// is enabled in the live config. If the late-arriving request matches an
// already-in-flight peer, we attach to the existing entry and skip the backend
// round-trip entirely. The existing entry's response will fan out to both parties.
var coalescingOpts = _coalescingOptions ( ) ;
@@ -818,14 +814,14 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
var key = new CoalescingKey ( unitId , fcByte , startAddr , qty ) ;
var newParty = new InterestedParty ( pipe , originalTxId ) ;
// The factory does the Phase-9 work: allocate a proxy TxId, build the
// InFlightRequest with a mutable List<InterestedParty>, add to the correlation
// map. We deliberately do NOT enqueue to the outbound channel inside the
// factory — that's done outside the InFlightByKey lock to keep the lock
// scope tight and to avoid holding the lock across an async send.
// The factory allocates a proxy TxId, builds the InFlightRequest with a
// mutable List<InterestedParty>, and adds to the correlation map. We
// deliberately do NOT enqueue to the outbound channel inside the factory —
// that's done outside the InFlightByKey lock to keep the lock scope tight
// and to avoid holding the lock across an async send.
//
// proxyTxIdForSend / inFlightForSend communicate the factory's allocation back
// out of the lock so the post-lock code can finish the send.
// proxyTxIdForSend / inFlightForSend communicate the factory's allocation
// back out of the lock so the post-lock code can finish the send.
ushort proxyTxIdForSend = 0 ;
InFlightRequest ? inFlightForSend = null ;
@@ -898,40 +894,38 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
if ( inFlightForSend is null )
{
// Phase 12 (W1.2) — the factory hit the allocator-saturation path or a
// duplicate-key race and stored a stub `InFlightRequest` under `key`. Late
// attachers may have joined the stub between the factory call and this
// cleanup; we must deliver the saturation exception to ALL of them, not just
// the leader, otherwise the late attachers wait forever for a response that
// The factory hit the allocator-saturation path or a duplicate-key race
// and stored a stub `InFlightRequest` under `key`. Late attachers may
// have joined the stub between the factory call and this cleanup; we
// must deliver the saturation exception to ALL of them, not just the
// leader, otherwise the late attachers wait forever for a response that
// never comes (the stub has no proxy TxId, so no backend round-trip will
// ever fire).
MultiplexerLogEvents . Saturated ( _logger , _plc . Name , pipe . RemoteEp ? . ToString ( ) ? ? "?" ) ;
if ( _inFlightByKey . TryRemove ( key , out var stub ) )
{
// Phase 12 (W4 / Nm1) — non-blocking delivery via TrySendResponse.
// Previously this loop awaited SendResponseAsync per party, which would
// serialise on a wedged late-attacher's full bounded channel and stall
// delivery to its peers. Same doctrine as the W1.3 backend-reader fix:
// the per-PLC fan-out path must never await per-pipe writes.
// Non-blocking delivery via TrySendResponse — the per-PLC fan-out
// path must never await per-pipe writes (a wedged late-attacher's
// full bounded channel would otherwise stall delivery to its peers).
foreach ( var party in stub . InterestedParties )
{
byte [ ] excFrame = BuildExceptionFrame ( party . OriginalTxId , unitId , fcByte , exceptionCode : 4 ) ;
if ( ! party . Pipe . TrySendResponse ( excFrame ) )
_ctx . Counters . IncrementResponseDropForFullUpstream ( ) ;
else
_ctx . Counters . AddBytes ( up : 0 , down : excFrame . Length ) ; // W6
_ctx . Counters . AddBytes ( up : 0 , down : excFrame . Length ) ;
}
}
else
{
// The stub was already removed by another path (extremely unlikely, but
// defensive). Surface the exception to the original requester.
// The stub was already removed by another path (extremely unlikely,
// but defensive). Surface the exception to the original requester.
byte [ ] excFrame = BuildExceptionFrame ( originalTxId , unitId , fcByte , exceptionCode : 4 ) ;
if ( ! pipe . TrySendResponse ( excFrame ) )
_ctx . Counters . IncrementResponseDropForFullUpstream ( ) ;
else
_ctx . Counters . AddBytes ( up : 0 , down : excFrame . Length ) ; // W6
_ctx . Counters . AddBytes ( up : 0 , down : excFrame . Length ) ;
}
return ;
}
@@ -962,16 +956,16 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
return ;
}
// Non-coalescing path (FC06/FC16 writes, FC03/04 with coalescing disabled, or any
// other FC). This is the Phase-9 path verbatim — every request gets its own proxy
// TxId and its own backend round-trip.
// Non-coalescing path (FC06/FC16 writes, FC03/04 with coalescing disabled, or
// any other FC). Every request gets its own proxy TxId and its own backend
// round-trip.
if ( ! _allocator . TryAllocate ( out ushort proxyTxIdFc ) )
{
MultiplexerLogEvents . Saturated ( _logger , _plc . Name , pipe . RemoteEp ? . ToString ( ) ? ? "?" ) ;
byte [ ] excFrame = BuildExceptionFrame ( originalTxId , unitId , fcByte , exceptionCode : 4 ) ;
await pipe . SendResponseAsync ( excFrame , ct ) . ConfigureAwait ( false ) ;
_ctx . Counters . AddBytes ( up : 0 , down : excFrame . Length ) ; // W6
_ctx . Counters . AddBytes ( up : 0 , down : excFrame . Length ) ;
return ;
}
@@ -993,10 +987,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
return ;
}
// Phase 10 — even when the coalescing path is bypassed (e.g. coalescing disabled
// for FC03/04), we still report the request as a Miss so Hit + Miss = total
// FC03/FC04 requests across snapshots. FC06/FC16 are not counted here (they are
// not coalescable in any sense).
// Even when the coalescing path is bypassed (e.g. coalescing disabled for
// FC03/04), we still report the request as a Miss so Hit + Miss = total
// FC03/FC04 requests across snapshots. FC06/FC16 are not counted here (they
// are not coalescable in any sense).
if ( fcByte is 0x03 or 0x04 )
_ctx . Counters . IncrementCoalescedMiss ( ) ;
@@ -1037,12 +1031,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
/// Modbus exception (code 0x0B / Gateway Target Device Failed To Respond) to each
/// interested party with the original TxId restored.
///
/// <para><b>Why this exists.</b> In the 1:1 connection model, a lost response would
/// fault the dedicated backend socket and the upstream pair would close. The multiplexed
/// model needs an explicit per-request timer because a single missing or mis-routed
/// response would otherwise leak a correlation entry forever and hang the upstream
/// pipe indefinitely. Real-world causes: PLC drops a response, network packet loss,
/// backend that mis-echoes MBAP TxIds.</para>
/// <para><b>Why this exists.</b> In a multiplexed connection model, a single missing
/// or mis-routed response would otherwise leak a correlation entry forever and hang
/// the upstream pipe indefinitely. Real-world causes: PLC drops a response, network
/// packet loss, backend that mis-echoes MBAP TxIds.</para>
/// </summary>
private async Task RunRequestTimeoutWatchdogAsync ( CancellationToken ct )
{
@@ -1070,10 +1062,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
_allocator . Release ( proxyTxId ) ;
// Phase 10 — also clear the coalescing-by-key entry. A late attach that
// raced in just before the watchdog claim will still receive the 0x0B
// exception via this entry's InterestedParties list (List<T> mutations
// happen before fan-out begins).
// Also clear the coalescing-by-key entry. A late attach that raced
// in just before the watchdog claim will still receive the 0x0B
// exception via this entry's InterestedParties list (List<T>
// mutations happen before fan-out begins).
if ( req . Fc is 0x03 or 0x04 )
{
var coalKey = new CoalescingKey ( req . UnitId , req . Fc , req . StartAddress , req . Qty ) ;
@@ -1097,7 +1089,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
try
{
await party . Pipe . SendResponseAsync ( excFrame , ct ) . ConfigureAwait ( false ) ;
_ctx . Counters . AddBytes ( up : 0 , down : excFrame . Length ) ; // W6
_ctx . Counters . AddBytes ( up : 0 , down : excFrame . Length ) ;
}
catch
{
@@ -1150,10 +1142,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
}
/// <summary>
/// Phase 11 — builds an MBAP-framed response from cached PDU bytes for the given
/// upstream party. The cache stores POST-rewriter PDU bodies (no MBAP); each hit
/// stamps a fresh MBAP header carrying the requesting party's original TxId so the
/// response looks indistinguishable from a fresh backend reply.
/// Builds an MBAP-framed response from cached PDU bytes for the given upstream
/// party. The cache stores POST-rewriter PDU bodies (no MBAP); each hit stamps a
/// fresh MBAP header carrying the requesting party's original TxId so the response
/// looks indistinguishable from a fresh backend reply.
/// </summary>
private static byte [ ] BuildCacheHitFrame ( ushort originalTxId , byte unitId , byte [ ] cachedPdu )
{