mbproxy: add in-flight read coalescing (Phase 10)

When two or more upstream clients send the same FC03/FC04 read while a
matching request is already in flight on the same PLC's multiplexed
backend socket, attach the late arrivals to the existing InFlightRequest
.InterestedParties list instead of opening a second backend round-trip.
The single backend response fans out to every attached party with each
party's original MBAP TxId restored individually. Zero post-response
staleness — coalescing operates entirely within the in-flight window
(microseconds to ~10 ms typical); the proxy is NOT a cache layer.

Headline mechanism:

- New record struct CoalescingKey(UnitId, Fc, StartAddress, Qty) keys
  the per-PLC InFlightByKeyMap. FC03 and FC04 are separate Modbus
  tables and never share a key; different unit IDs never coalesce;
  writes (FC06/FC16) bypass the coalescing path entirely.
- InFlightByKeyMap uses a simple lock around a Dictionary; atomic
  TryAttachOrCreate either appends a new party to the in-flight
  request's mutable List<InterestedParty> or invokes a factory to
  build a fresh entry. Per-entry MaxParties cap (default 32) bounds
  fan-out cost; past the cap, the next arrival opens a new entry.
- PlcMultiplexer.OnUpstreamFrameAsync takes the coalescing path for
  FC03/FC04 when Mbproxy.Resilience.ReadCoalescing.Enabled. The
  factory closure does the Phase-9 work (allocate TxId, add to
  CorrelationMap); the channel send happens AFTER returning from
  TryAttachOrCreate so the map lock is not held across the async send.
- Response fan-out in RunBackendReaderAsync removes the entry from
  InFlightByKeyMap before iterating InterestedParties, ensuring no
  concurrent attach can mutate the list during iteration.
- Cascade + watchdog paths also drain the key map so a stale entry
  cannot outlive its backend round-trip.

Counter accounting balance (per snapshot): CoalescedHitCount +
CoalescedMissCount equals total FC03 + FC04 requests since startup.
Even with coalescing disabled, every read still bumps Miss so dashboard
math stays balanced.

New surface (additive only):
- src/Mbproxy/Proxy/Multiplexing/CoalescingKey.cs
- src/Mbproxy/Proxy/Multiplexing/InFlightByKeyMap.cs
- src/Mbproxy/Proxy/Multiplexing/CoalescingLogEvents.cs
- ReadCoalescingOptions on ResilienceOptions
- CoalescedHitCount / CoalescedMissCount /
  CoalescedResponseToDeadUpstream counters surfaced on /status.json
  per PLC and as a compact "Coal" cell on the HTML status page.

Phase 9 test patch: TwoUpstreams_ProxyTxIds_AreDistinct_OnTheWire
previously read the same register from both clients (which now
coalesces). Patched to read two different addresses so the test still
proves distinct backend TxIds without violating the coalescing
contract.

Tests added: 24 new (19 unit + 5 E2E):
- CoalescingKeyTests (5)
- InFlightByKeyMapTests (6, includes concurrent stress)
- ReadCoalescingTests (8, stub-backend with deterministic delay)
- ReadCoalescingE2ETests (5, pymodbus simulator; coalescing-active
  during overlap is proven against the stub, not the sim, due to
  pymodbus 3.13's known concurrent-frame bug)

Total: 325 tests passing (282 unit + 43 E2E).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-14 02:26:06 -04:00
parent 56eee3c563
commit a2dba4bd07
25 changed files with 1888 additions and 52 deletions
@@ -49,9 +49,16 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
private readonly PerPlcContext _ctx;
private readonly ILogger<PlcMultiplexer> _logger;
private readonly ResiliencePipeline? _backendConnectPipeline;
// Phase 10: live read-coalescing config accessor. The accessor is read per-PDU on the
// request path so a hot-reload of `Mbproxy.Resilience.ReadCoalescing.Enabled`
// propagates immediately. Production wires this to
// `() => optionsMonitor.CurrentValue.Resilience.ReadCoalescing`. Tests default to a
// fresh `ReadCoalescingOptions()` (Enabled = true, MaxParties = 32).
private readonly Func<ReadCoalescingOptions> _coalescingOptions;
private readonly TxIdAllocator _allocator = new();
private readonly CorrelationMap _correlation = new();
private readonly InFlightByKeyMap _inFlightByKey = new();
private readonly Channel<byte[]> _outboundChannel = Channel.CreateBounded<byte[]>(
new BoundedChannelOptions(OutboundChannelCapacity)
@@ -84,7 +91,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
IPduPipeline pipeline,
PerPlcContext perPlcContext,
ILogger<PlcMultiplexer> logger,
ResiliencePipeline? backendConnectPipeline = null)
ResiliencePipeline? backendConnectPipeline = null,
Func<ReadCoalescingOptions>? coalescingOptions = null)
{
_plc = plc;
_connectionOptions = connectionOptions;
@@ -92,6 +100,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
_ctx = perPlcContext;
_logger = logger;
_backendConnectPipeline = backendConnectPipeline;
_coalescingOptions = coalescingOptions ?? (static () => new ReadCoalescingOptions());
// Register this multiplexer as the live telemetry source for the PLC's counters.
_ctx.Counters.SetMultiplexProvider(this);
@@ -301,6 +310,11 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
cascadeIds.Add(party.Pipe.Id);
}
// Phase 10 — also drain the in-flight-by-key map so a brand-new identical request
// through the freshly-reconnected backend is treated as a miss (no stale entries
// outlive the backend they were destined for).
_inFlightByKey.DrainAll();
int upstreamCount = 0;
if (cascadeUpstreams)
{
@@ -408,6 +422,17 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
// Free the allocator slot immediately so it can be reused.
_allocator.Release(proxyTxId);
// Phase 10 — for FC03/FC04 reads, also clear the coalescing-by-key entry so
// a brand-new identical request issued AFTER this response is treated as a
// miss (opens a fresh round-trip). The TryRemove is best-effort: a watchdog
// timeout or cascade may have already removed it.
if (inFlight.Fc is 0x03 or 0x04)
{
var coalKey = new CoalescingKey(inFlight.UnitId, inFlight.Fc,
inFlight.StartAddress, inFlight.Qty);
_inFlightByKey.TryRemove(coalKey, out _);
}
// Update EWMA round-trip from when we sent the request.
long elapsedMs = (DateTimeOffset.UtcNow - inFlight.SentAtUtc).Ticks * 100; // 100 ns per tick
// UpdateRoundTripEwma expects Stopwatch ticks, but we have wall-clock.
@@ -427,10 +452,25 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
// Fan out to each interested party with their original TxId restored.
// Phase 9: always exactly one party. Phase 10: N parties (read coalescing).
// Note: the InFlightByKey TryRemove above (for FC03/FC04) guarantees no
// further attaches can occur — the parties list is now a stable snapshot.
foreach (var party in inFlight.InterestedParties)
{
if (!party.Pipe.IsAlive)
{
// Phase 10 — record the dead-upstream skip only for FC03/FC04 (the
// only function codes that take the coalescing path). For non-
// coalescing FCs this branch is silent — the Phase-9 behaviour.
if (inFlight.Fc is 0x03 or 0x04
&& inFlight.InterestedParties.Count > 1)
{
_ctx.Counters.IncrementCoalescedResponseToDeadUpstream();
CoalescingLogEvents.DeadUpstream(
_logger, _plc.Name, inFlight.UnitId, inFlight.Fc,
inFlight.StartAddress, inFlight.Qty);
}
continue;
}
// The frame buffer is private to this iteration; if there are multiple
// parties (Phase 10), each gets its own copy with its own original TxId
@@ -482,20 +522,11 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
out ushort originalTxId, out _, out _, out byte unitId))
return;
if (!_allocator.TryAllocate(out ushort proxyTxId))
{
MultiplexerLogEvents.Saturated(_logger, _plc.Name, pipe.RemoteEp?.ToString() ?? "?");
// Synthesize Modbus exception 04 (Slave Device Failure).
byte fc = frame.Length > MbapFrame.HeaderSize ? frame[MbapFrame.HeaderSize] : (byte)0;
byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fc, exceptionCode: 4);
await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false);
return;
}
// Parse the PDU FC + start/qty (for FC03/04) so the response decoder has the
// correlation it needs.
// Parse the PDU FC + start/qty (for FC03/04) — needed for both the coalescing-key
// path and the response correlation slot. FC06/FC16 (writes) keep startAddr/qty = 0;
// they bypass coalescing entirely.
int pduOffset = MbapFrame.HeaderSize;
byte fcByte = frame[pduOffset];
byte fcByte = frame.Length > pduOffset ? frame[pduOffset] : (byte)0;
ushort startAddr = 0;
ushort qty = 0;
if (fcByte is 0x03 or 0x04 && frame.Length >= pduOffset + 5)
@@ -504,37 +535,175 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
qty = (ushort)((frame[pduOffset + 3] << 8) | frame[pduOffset + 4]);
}
var inFlight = new InFlightRequest(
// Phase 10 — read-coalescing path. Only FC03/FC04 are coalescable; only when the
// feature is enabled in the live config. If the late-arriving request matches an
// already-in-flight peer, we attach to the existing entry and skip the backend
// round-trip entirely. The existing entry's response will fan out to both parties.
var coalescingOpts = _coalescingOptions();
if (fcByte is 0x03 or 0x04 && coalescingOpts.Enabled)
{
var key = new CoalescingKey(unitId, fcByte, startAddr, qty);
var newParty = new InterestedParty(pipe, originalTxId);
// The factory does the Phase-9 work: allocate a proxy TxId, build the
// InFlightRequest with a mutable List<InterestedParty>, add to the correlation
// map. We deliberately do NOT enqueue to the outbound channel inside the
// factory — that's done outside the InFlightByKey lock to keep the lock
// scope tight and to avoid holding the lock across an async send.
//
// proxyTxIdForSend / inFlightForSend communicate the factory's allocation back
// out of the lock so the post-lock code can finish the send.
ushort proxyTxIdForSend = 0;
InFlightRequest? inFlightForSend = null;
_inFlightByKey.TryAttachOrCreate(
key,
newParty,
factory: () =>
{
if (!_allocator.TryAllocate(out ushort proxyTxId))
{
// Saturation — record an empty placeholder InFlightRequest that the
// caller will detect via inFlightForSend == null. We can't easily
// signal failure through the bool return, so we leave the saturation
// exception delivery to the caller.
return new InFlightRequest(
UnitId: unitId,
Fc: fcByte,
StartAddress: startAddr,
Qty: qty,
InterestedParties: new List<InterestedParty> { newParty },
SentAtUtc: DateTimeOffset.UtcNow);
}
var partyList = new List<InterestedParty>(capacity: 1) { newParty };
var inFlight = new InFlightRequest(
UnitId: unitId,
Fc: fcByte,
StartAddress: startAddr,
Qty: qty,
InterestedParties: partyList,
SentAtUtc: DateTimeOffset.UtcNow);
if (!_correlation.TryAdd(proxyTxId, inFlight))
{
_allocator.Release(proxyTxId);
_logger.LogError(
"CorrelationMap.TryAdd failed for already-free proxyTxId {ProxyTxId}",
proxyTxId);
// Return the stub anyway; outer code detects via inFlightForSend == null.
return inFlight;
}
_ctx.Counters.ObserveInFlight(_allocator.InFlightCount);
proxyTxIdForSend = proxyTxId;
inFlightForSend = inFlight;
return inFlight;
},
maxParties: coalescingOpts.MaxParties,
out _,
out bool wasNew);
if (!wasNew)
{
// Coalesce hit: attached to an existing in-flight entry. No backend traffic.
_ctx.Counters.IncrementCoalescedHit();
CoalescingLogEvents.Hit(_logger, _plc.Name, unitId, fcByte, startAddr, qty,
partyCount: _inFlightByKey.Count);
return;
}
// Coalesce miss: we just opened a fresh in-flight entry.
_ctx.Counters.IncrementCoalescedMiss();
CoalescingLogEvents.Miss(_logger, _plc.Name, unitId, fcByte, startAddr, qty);
if (inFlightForSend is null)
{
// The factory hit the allocator-saturation path or a duplicate-key race.
// Surface a Modbus exception 04 to the upstream and clean up.
MultiplexerLogEvents.Saturated(_logger, _plc.Name, pipe.RemoteEp?.ToString() ?? "?");
byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4);
_inFlightByKey.TryRemove(key, out _);
await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false);
return;
}
// Apply the BCD rewriter on the request, then send to the backend. We are now
// OUTSIDE the InFlightByKey lock — late attaches arriving after this point will
// attach to the same entry while it sits in the channel/wire.
var requestCtx = _ctx.WithCurrentRequest(inFlightForSend);
_pipeline.Process(
MbapDirection.RequestToBackend,
frame.AsSpan(0, MbapFrame.HeaderSize),
frame.AsSpan(MbapFrame.HeaderSize, frame.Length - MbapFrame.HeaderSize),
requestCtx);
frame[0] = (byte)(proxyTxIdForSend >> 8);
frame[1] = (byte)(proxyTxIdForSend & 0xFF);
try
{
await _outboundChannel.Writer.WriteAsync(frame, ct).ConfigureAwait(false);
}
catch (ChannelClosedException)
{
if (_correlation.TryRemove(proxyTxIdForSend, out _))
_allocator.Release(proxyTxIdForSend);
_inFlightByKey.TryRemove(key, out _);
}
return;
}
// Non-coalescing path (FC06/FC16 writes, FC03/04 with coalescing disabled, or any
// other FC). This is the Phase-9 path verbatim — every request gets its own proxy
// TxId and its own backend round-trip.
if (!_allocator.TryAllocate(out ushort proxyTxIdFc))
{
MultiplexerLogEvents.Saturated(_logger, _plc.Name, pipe.RemoteEp?.ToString() ?? "?");
byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4);
await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false);
return;
}
var partyListNc = new List<InterestedParty>(capacity: 1) { new InterestedParty(pipe, originalTxId) };
var inFlightNc = new InFlightRequest(
UnitId: unitId,
Fc: fcByte,
StartAddress: startAddr,
Qty: qty,
InterestedParties: [new InterestedParty(pipe, originalTxId)],
InterestedParties: partyListNc,
SentAtUtc: DateTimeOffset.UtcNow);
if (!_correlation.TryAdd(proxyTxId, inFlight))
if (!_correlation.TryAdd(proxyTxIdFc, inFlightNc))
{
// Should be impossible: the allocator just guaranteed proxyTxId is free.
_allocator.Release(proxyTxId);
_logger.LogError("CorrelationMap.TryAdd failed for already-free proxyTxId {ProxyTxId}", proxyTxId);
_allocator.Release(proxyTxIdFc);
_logger.LogError("CorrelationMap.TryAdd failed for already-free proxyTxId {ProxyTxId}", proxyTxIdFc);
return;
}
// Phase 10 — even when the coalescing path is bypassed (e.g. coalescing disabled
// for FC03/04), we still report the request as a Miss so Hit + Miss = total
// FC03/FC04 requests across snapshots. FC06/FC16 are not counted here (they are
// not coalescable in any sense).
if (fcByte is 0x03 or 0x04)
_ctx.Counters.IncrementCoalescedMiss();
// Peak in-flight tracking.
_ctx.Counters.ObserveInFlight(_allocator.InFlightCount);
// Apply the BCD rewriter on the request. Use a per-call context with CurrentRequest
// (the rewriter doesn't currently need it on request, but Phase 10 may).
var requestCtx = _ctx.WithCurrentRequest(inFlight);
// Apply the BCD rewriter on the request.
var requestCtxNc = _ctx.WithCurrentRequest(inFlightNc);
_pipeline.Process(
MbapDirection.RequestToBackend,
frame.AsSpan(0, MbapFrame.HeaderSize),
frame.AsSpan(MbapFrame.HeaderSize, frame.Length - MbapFrame.HeaderSize),
requestCtx);
requestCtxNc);
// Overwrite the MBAP TxId with the proxy TxId.
frame[0] = (byte)(proxyTxId >> 8);
frame[1] = (byte)(proxyTxId & 0xFF);
frame[0] = (byte)(proxyTxIdFc >> 8);
frame[1] = (byte)(proxyTxIdFc & 0xFF);
// Enqueue for the backend writer task.
try
@@ -544,8 +713,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
catch (ChannelClosedException)
{
// Channel completed during shutdown — release the proxy TxId.
if (_correlation.TryRemove(proxyTxId, out _))
_allocator.Release(proxyTxId);
if (_correlation.TryRemove(proxyTxIdFc, out _))
_allocator.Release(proxyTxIdFc);
}
}
@@ -591,6 +760,16 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
_allocator.Release(proxyTxId);
// Phase 10 — also clear the coalescing-by-key entry. A late attach that
// raced in just before the watchdog claim will still receive the 0x0B
// exception via this entry's InterestedParties list (List<T> mutations
// happen before fan-out begins).
if (req.Fc is 0x03 or 0x04)
{
var coalKey = new CoalescingKey(req.UnitId, req.Fc, req.StartAddress, req.Qty);
_inFlightByKey.TryRemove(coalKey, out _);
}
long elapsedMs = (long)(DateTimeOffset.UtcNow - req.SentAtUtc).TotalMilliseconds;
foreach (var party in req.InterestedParties)