Files
wwtools/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs
T
Joseph Doherty 0868613890 mbproxy: add keepalive / connection monitoring
The DL205/DL260 ECOM emits no TCP keepalives, so an idle backend socket
can be silently dropped by a middlebox (switch, firewall, NAT) after
2-5 minutes. Enable OS SO_KEEPALIVE on backend and accepted upstream
sockets, and drive a periodic synthetic FC03 heartbeat on each idle
backend socket so a dead path is detected before a real client request
hits it. Controlled by Connection.Keepalive (ON by default).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 09:40:54 -04:00

1365 lines
67 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net.Sockets;
using System.Threading.Channels;
using Mbproxy.Options;
using Mbproxy.Proxy.Cache;
using Polly;
namespace Mbproxy.Proxy.Multiplexing;
/// <summary>
/// Owner of the single backend TCP connection to one PLC. Multiplexes many
/// <see cref="UpstreamPipe"/> instances onto that one socket by rewriting MBAP transaction
/// IDs so concurrent in-flight requests from different upstream clients remain
/// distinguishable on the shared wire. The multiplexer:
///
/// <list type="bullet">
/// <item><description>Opens and re-opens the backend socket through a Polly retry pipeline
/// that matches the <see cref="ResilienceOptions.BackendConnect"/> profile.</description></item>
/// <item><description>Runs one backend writer task that drains <see cref="_outboundChannel"/>
/// into the backend socket (single writer; no socket-level synchronisation needed).</description></item>
/// <item><description>Runs one backend reader task that decodes MBAP frames from the backend,
/// looks each frame up in the <see cref="CorrelationMap"/>, restores each interested
/// party's original TxId, and hands the frame to that party's
/// <see cref="UpstreamPipe._responseChannel"/>.</description></item>
/// <item><description>Cascades a backend disconnect by closing every attached pipe and
/// freeing every allocated proxy TxId, then waits for the next upstream request to
/// arrive (which triggers a fresh backend connect via Polly).</description></item>
/// </list>
///
/// <para><b>Threading invariants:</b> a single backend writer touches the backend socket
/// for sends; a single backend reader touches the same socket for receives. Per-upstream
/// read tasks call <see cref="OnUpstreamFrameAsync"/>, which allocates a proxy TxId, queues
/// the request frame into <see cref="_outboundChannel"/>, and returns. Upstream-side writes
/// flow through each pipe's response channel — never directly through this class.</para>
///
/// <para><b>Lifecycle:</b> the multiplexer is created with the backend offline. The first
/// <see cref="OnUpstreamFrameAsync"/> call (or the first <see cref="Attach"/> if you prefer
/// eager-start) triggers backend connect through the Polly pipeline. Subsequent in-flight
/// requests reuse the same socket. <see cref="DisposeAsync"/> tears down the backend
/// socket, the writer/reader tasks, and every attached pipe.</para>
/// </summary>
internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvider
{
private const int OutboundChannelCapacity = 256;
private readonly PlcOptions _plc;
private readonly ConnectionOptions _connectionOptions;
private readonly IPduPipeline _pipeline;
// `_ctx` is volatile so a hot-reload reseat can swap it on the running
// multiplexer. Each method that uses the context snapshots it into a local at the start
// of the operation so a single PDU sees a consistent (TagMap, Cache) pair even if the
// swap fires mid-PDU. ReplaceContext is the single mutator.
private volatile PerPlcContext _ctx;
private readonly ILogger<PlcMultiplexer> _logger;
private readonly ResiliencePipeline? _backendConnectPipeline;
// Live read-coalescing config accessor. The accessor is read per-PDU on the
// request path so a hot-reload of `Mbproxy.Resilience.ReadCoalescing.Enabled`
// propagates immediately. Production wires this to
// `() => optionsMonitor.CurrentValue.Resilience.ReadCoalescing`. Tests default to a
// fresh `ReadCoalescingOptions()` (Enabled = true, MaxParties = 32).
private readonly Func<ReadCoalescingOptions> _coalescingOptions;
// Live keepalive config accessor. Read at backend-connect time (TCP SO_KEEPALIVE) and
// on each heartbeat-loop tick (idle threshold + probe address) so a hot-reload of
// `Connection.Keepalive` propagates without a listener restart. Production wires this
// to `() => optionsMonitor.CurrentValue.Connection.Keepalive`; the fallback reads the
// construction-time `ConnectionOptions` snapshot.
private readonly Func<KeepaliveOptions> _keepaliveOptions;
private readonly TxIdAllocator _allocator = new();
private readonly CorrelationMap _correlation = new();
private readonly InFlightByKeyMap _inFlightByKey = new();
private readonly Channel<byte[]> _outboundChannel = Channel.CreateBounded<byte[]>(
new BoundedChannelOptions(OutboundChannelCapacity)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false,
});
// Attached pipes — used by the status page and by coalescing fan-out.
// ConcurrentDictionary keyed on UpstreamPipe.Id for O(1) detach.
private readonly ConcurrentDictionary<Guid, UpstreamPipe> _pipes = new();
// Lifecycle plumbing. Backend tasks share a CTS; cascading disconnect cancels it,
// which terminates both the writer and reader tasks. The next call to
// EnsureBackendConnectedAsync constructs a fresh CTS and a fresh backend socket.
private readonly object _backendLock = new();
private Socket? _backendSocket;
private CancellationTokenSource? _backendCts;
private Task? _backendWriterTask;
private Task? _backendReaderTask;
private Task? _backendHeartbeatTask;
// UTC ticks of the last backend socket activity (a send OR a received frame). Updated
// by the writer and reader tasks; read by the heartbeat loop to decide whether the
// socket has been idle long enough to warrant a probe. Interlocked for cross-task
// coherence.
private long _lastBackendActivityTicks;
// Unit ID of the most recent upstream request. The synthetic heartbeat reuses it so
// the probe targets the same Modbus unit the real clients successfully talk to.
// Defaults to 0 until the first upstream frame is seen; by the time a heartbeat can
// fire the backend socket exists, which means at least one upstream frame arrived.
private int _lastSeenUnitId;
private readonly CancellationTokenSource _disposeCts = new();
// Volatile so the disposing thread's write is observed by every hot-path reader
// (OnUpstreamFrameAsync, ReplaceContext, Attach, etc.) without a separate fence.
// On x86/x64 plain reads happen to give acquire-release semantics, so this is
// defense for ARM hosts and future portability.
private volatile bool _disposed;
private Task? _watchdogTask;
public PlcMultiplexer(
PlcOptions plc,
ConnectionOptions connectionOptions,
IPduPipeline pipeline,
PerPlcContext perPlcContext,
ILogger<PlcMultiplexer> logger,
ResiliencePipeline? backendConnectPipeline = null,
Func<ReadCoalescingOptions>? coalescingOptions = null,
Func<KeepaliveOptions>? keepaliveOptions = null)
{
_plc = plc;
_connectionOptions = connectionOptions;
_pipeline = pipeline;
_ctx = perPlcContext;
_logger = logger;
_backendConnectPipeline = backendConnectPipeline;
_coalescingOptions = coalescingOptions ?? (static () => new ReadCoalescingOptions());
_keepaliveOptions = keepaliveOptions ?? (() => _connectionOptions.Keepalive);
// Register the per-PLC cache as the live stats source for the snapshot path.
// Cache may be null when the per-PLC context has not been wired with one
// (every tag uncached, or unit tests).
if (_ctx.Cache is not null)
_ctx.Counters.SetCacheStatsProvider(new CacheStatsAdapter(_ctx.Cache));
// Register this multiplexer as the live telemetry source for the PLC's counters.
_ctx.Counters.SetMultiplexProvider(this);
// Spin up the per-request timeout watchdog. It scans the correlation map at a fixed
// interval and times out any in-flight request older than BackendRequestTimeoutMs.
// Critical for: lost responses, dead-PLC paths, and backends that mis-echo TxIds
// (e.g. pymodbus 3.13.0's concurrent-multiplexed-request bug — see test files).
_watchdogTask = Task.Run(() => RunRequestTimeoutWatchdogAsync(_disposeCts.Token), CancellationToken.None);
}
// ── IMultiplexCountersProvider ────────────────────────────────────────────
public long InFlightCount => _allocator.InFlightCount;
public long TxIdWraps => _allocator.WrapCount;
public long BackendQueueDepth => _outboundChannel.Reader.Count;
// ── Public surface ────────────────────────────────────────────────────────
/// <summary>
/// Read-only collection of currently-attached upstream pipes. Used by the status page.
/// </summary>
public IReadOnlyCollection<UpstreamPipe> AttachedPipes => _pipes.Values.ToArray();
/// <summary>
/// Attaches an upstream pipe to this multiplexer. The caller is responsible for
/// running the pipe's read+write loops (typically via <see cref="StartPipeAsync"/>)
/// which wires the pipe's OnFrame callback back into <see cref="OnUpstreamFrameAsync"/>.
/// </summary>
public void Attach(UpstreamPipe pipe)
{
if (_disposed)
throw new ObjectDisposedException(nameof(PlcMultiplexer));
_pipes[pipe.Id] = pipe;
}
/// <summary>
/// Atomically swaps the per-PLC context on a running multiplexer. Called by
/// <see cref="Supervision.PlcListenerSupervisor.ReplaceContextAsync"/> when a
/// hot-reload tag-list change is applied to a PLC whose listener is already bound.
///
/// <para>The new context's tag map and (optional) response cache become visible on the
/// next PDU through the volatile <c>_ctx</c> field. Counters are PRESERVED across reseat
/// (the supervisor builds the new context with the running counters), so we only need
/// to re-register the cache stats provider — the multiplex provider already points at
/// this same instance.</para>
///
/// <para>Existing per-call snapshots of the old context held by in-flight PDUs (via
/// <c>WithCurrentRequest</c>) finish on the old map. New PDUs after this call see the
/// new map. Per the design contract a one-PDU "old map" tail is acceptable; partial-BCD
/// rewrites mid-request would be worse.</para>
/// </summary>
public void ReplaceContext(PerPlcContext newContext)
{
if (_disposed) return;
// Provider FIRST, then _ctx. The status page's snapshot path reads
// `_cacheStatsProvider` independently of `_ctx`. If we swapped `_ctx` first, a
// snapshot taken in the gap between the two writes would still hold the OLD
// adapter wrapping the OLD cache — which the supervisor is about to dispose
// (`PlcListenerSupervisor.ReplaceContextAsync` runs `oldCache.Dispose()` after we
// return). Setting the provider first means snapshots in the swap window read
// either (old provider, old ctx) or (new provider, new ctx) — both coherent —
// never (old provider after old cache disposed).
//
// In the typical reseat case `oldContext.Counters == newContext.Counters` (the
// reconciler preserves counters across reseat), so this updates the same instance
// both paths share. The order still matters because the snapshot reads the
// provider field, not the per-context counters reference.
newContext.Counters.SetCacheStatsProvider(
newContext.Cache is not null ? new CacheStatsAdapter(newContext.Cache) : null);
_ctx = newContext;
}
/// <summary>
/// Starts the read+write tasks for <paramref name="pipe"/> and returns a task that
/// completes when the pipe's read loop ends. The multiplexer detaches the pipe when
/// its read loop returns.
/// </summary>
public Task StartPipeAsync(UpstreamPipe pipe, CancellationToken ct)
{
Attach(pipe);
// The write loop runs to completion when the pipe is disposed or the channel
// completes. We don't await it directly — it's joined inside DisposeAsync of the pipe.
_ = Task.Run(() => pipe.RunWriteLoopAsync(ct), CancellationToken.None);
var readLoop = pipe.RunReadLoopAsync(
(frame, frameCt) => OnUpstreamFrameAsync(pipe, frame, frameCt),
ct);
// When the pipe's read loop finishes, detach it. Don't dispose it here; the
// listener (or the cascade walker) owns disposal.
_ = readLoop.ContinueWith(prev =>
{
_pipes.TryRemove(pipe.Id, out _);
}, TaskScheduler.Default);
return readLoop;
}
/// <summary>
/// Tears down the multiplexer: closes the backend connection, cancels both backend
/// tasks, drains every in-flight correlation entry, and closes every attached pipe.
/// </summary>
public async ValueTask DisposeAsync()
{
if (_disposed) return;
_disposed = true;
// Stop the counters provider link so a status snapshot during teardown doesn't
// see live-but-soon-to-be-empty internal state.
_ctx.Counters.SetMultiplexProvider(null);
_ctx.Counters.SetCacheStatsProvider(null);
await _disposeCts.CancelAsync().ConfigureAwait(false);
// Best-effort join the watchdog so its in-flight log/dispatch settles before tests
// assert on counter state.
if (_watchdogTask is not null)
{
try { await _watchdogTask.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); }
catch { /* swallow */ }
}
await TearDownBackendAsync("disposing", cascadeUpstreams: true).ConfigureAwait(false);
_outboundChannel.Writer.TryComplete();
// Dispose all attached pipes.
foreach (var pipe in _pipes.Values)
{
try { await pipe.DisposeAsync().ConfigureAwait(false); } catch { /* best effort */ }
}
_pipes.Clear();
// Guard the CTS dispose against a watchdog tick that raced past the WaitAsync
// above (e.g. a slow Task.Delay completion observing cancellation late). Also
// dispose the connect-gate semaphore.
try { _disposeCts.Dispose(); } catch (ObjectDisposedException) { /* already disposed */ }
try { _connectGate.Dispose(); } catch (ObjectDisposedException) { /* already disposed */ }
}
// ── Backend connect / teardown ────────────────────────────────────────────
private async Task<bool> EnsureBackendConnectedAsync(CancellationToken ct)
{
if (_disposed) return false;
// Fast path: already connected.
if (_backendSocket is { Connected: true } && _backendCts is { IsCancellationRequested: false })
return true;
// Serialise concurrent connect attempts from many upstream pipes.
await _connectGate.WaitAsync(ct).ConfigureAwait(false);
try
{
// Re-check after acquiring the gate.
if (_backendSocket is { Connected: true } && _backendCts is { IsCancellationRequested: false })
return true;
// Build a fresh backend socket and Polly-connect.
var backend = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
{ NoDelay = true };
SocketKeepalive.Apply(backend, _keepaliveOptions());
try
{
if (_backendConnectPipeline is not null)
{
await _backendConnectPipeline.ExecuteAsync(async attemptToken =>
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(attemptToken);
cts.CancelAfter(_connectionOptions.BackendConnectTimeoutMs);
await backend.ConnectAsync(_plc.Host, _plc.Port, cts.Token).ConfigureAwait(false);
}, ct).ConfigureAwait(false);
}
else
{
using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
connectCts.CancelAfter(_connectionOptions.BackendConnectTimeoutMs);
await backend.ConnectAsync(_plc.Host, _plc.Port, connectCts.Token).ConfigureAwait(false);
}
}
catch (Exception ex)
{
string reason = ex is OperationCanceledException
? $"Backend connect timed out or cancelled after {_connectionOptions.BackendConnectTimeoutMs} ms"
: ex.Message;
MultiplexerLogEvents.BackendFailed(_logger, _plc.Name, reason);
_ctx.Counters.IncrementConnectFailed();
backend.Dispose();
return false;
}
// Successful connect. Wire up the backend tasks.
var cts2 = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token);
lock (_backendLock)
{
_backendSocket = backend;
_backendCts = cts2;
// Seed the idle timer so the heartbeat loop measures idleness from connect.
Interlocked.Exchange(ref _lastBackendActivityTicks, DateTime.UtcNow.Ticks);
_backendWriterTask = Task.Run(() => RunBackendWriterAsync(backend, cts2.Token), CancellationToken.None);
_backendReaderTask = Task.Run(() => RunBackendReaderAsync(backend, cts2.Token), CancellationToken.None);
_backendHeartbeatTask = Task.Run(() => RunBackendHeartbeatAsync(cts2.Token), CancellationToken.None);
}
_ctx.Counters.IncrementConnectSuccess();
MultiplexerLogEvents.BackendConnected(_logger, _plc.Name, _plc.Host, _plc.Port);
return true;
}
finally
{
_connectGate.Release();
}
}
private readonly SemaphoreSlim _connectGate = new(1, 1);
private async Task TearDownBackendAsync(string reason, bool cascadeUpstreams)
{
// Serialise tear-down vs connect-up via the connect gate. Without this, a fresh
// EnsureBackendConnectedAsync racing with the channel drain below could see
// stranded frames sent on its new socket with old (already-released) TxIds,
// producing orphaned responses that hang upstream peers via the watchdog.
//
// Bounded wait: a long Polly-wrapped EnsureBackendConnectedAsync against an
// unreachable host can hold the gate for the full BackendConnectTimeoutMs *
// MaxAttempts window, blocking DisposeAsync (and therefore ProxyWorker.StopAsync)
// for that duration. A 2 s teardown deadline bounds disposal latency; if the gate
// is unavailable we proceed best-effort without it (the worst-case consequence is
// one orphaned in-flight cycle on the dying backend, which the upstream watchdog
// will surface as exception 0x0B).
//
// KNOWN RACE on the gate-not-held path: a concurrent EnsureBackendConnectedAsync
// that DOES hold the gate may TryAllocate a TxId that collides (after wraparound
// in the allocator's forward scan) with a TxId we're about to release from the
// channel-drain step below. The double-release would mark the new request's slot
// as free even though it's legitimately in-flight, allowing the next allocation
// to reuse the same slot and CorrelationMap.TryAdd to fail (silent request drop).
// Probability is very low (requires gate timeout + new accept landing during
// cascade + TxId collision in a 65,536-slot space); the only consequence is one
// dropped request that the client retries. Accepted as best-effort behaviour.
bool gateHeld = false;
try
{
using var teardownCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
await _connectGate.WaitAsync(teardownCts.Token).ConfigureAwait(false);
gateHeld = true;
}
catch (OperationCanceledException)
{
// Best-effort: proceed without the gate. Concurrent connect attempts will
// observe _disposed (or the now-null _backendSocket) and short-circuit.
}
catch (ObjectDisposedException)
{
// _connectGate already disposed — TearDown is racing past DisposeAsync.
// Skip the body entirely; there's nothing useful to do at this point.
return;
}
try
{
Socket? oldSocket;
CancellationTokenSource? oldCts;
Task? writer, reader, heartbeat;
lock (_backendLock)
{
oldSocket = _backendSocket;
oldCts = _backendCts;
writer = _backendWriterTask;
reader = _backendReaderTask;
heartbeat = _backendHeartbeatTask;
_backendSocket = null;
_backendCts = null;
_backendWriterTask = null;
_backendReaderTask = null;
_backendHeartbeatTask = null;
}
if (oldSocket is null && oldCts is null) return;
try { oldCts?.Cancel(); } catch { /* best effort */ }
try { oldSocket?.Shutdown(SocketShutdown.Both); } catch { /* already closed */ }
try { oldSocket?.Dispose(); } catch { /* best effort */ }
// Drain correlation map; cascade-close every interested upstream pipe.
var dropped = _correlation.DrainAll();
foreach (var kvp in dropped)
{
_allocator.Release(kvp.Key);
}
// Also drain the in-flight-by-key map so a brand-new identical request through
// the freshly-reconnected backend is treated as a miss (no stale entries
// outlive the backend they were destined for).
_inFlightByKey.DrainAll();
int upstreamCount = 0;
if (cascadeUpstreams)
{
// Close every attached pipe that had a request in flight; the others will
// simply re-issue on next request through a fresh backend connect.
// Per docs/Architecture/ConnectionModel.md, ALL attached upstreams cascade on backend disconnect.
upstreamCount = _pipes.Count;
// Snapshot keys before disposal modifies the dictionary indirectly.
var pipeList = _pipes.Values.ToArray();
foreach (var pipe in pipeList)
{
try { await pipe.DisposeAsync().ConfigureAwait(false); }
catch { /* best effort */ }
}
_pipes.Clear();
_ctx.Counters.AddDisconnectCascades(upstreamCount);
}
// Drain any stranded frames left in the outbound channel by the writer task
// that just faulted/cancelled. Release their proxy TxIds back to the
// allocator. By the time we reach this line the writer has stopped reading
// from the channel (cancelled CTS) and the upstream pipes have been cascaded
// (no more enqueues), so the channel state is stable.
int strandedDropped = 0;
while (_outboundChannel.Reader.TryRead(out byte[]? stranded))
{
if (stranded.Length >= 2)
{
ushort strandedTxId = (ushort)((stranded[0] << 8) | stranded[1]);
_allocator.Release(strandedTxId);
}
strandedDropped++;
}
// Best-effort join.
try { if (writer is not null) await writer.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ }
try { if (reader is not null) await reader.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ }
try { if (heartbeat is not null) await heartbeat.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ }
oldCts?.Dispose();
if (upstreamCount > 0 || dropped.Count > 0 || strandedDropped > 0)
MultiplexerLogEvents.BackendDisconnected(_logger, _plc.Name, upstreamCount, dropped.Count + strandedDropped, reason);
}
finally
{
// Only release if we acquired — best-effort path may have skipped.
if (gateHeld)
{
try { _connectGate.Release(); }
catch (ObjectDisposedException) { /* dispose race — harmless */ }
}
}
}
// ── Backend writer / reader tasks ─────────────────────────────────────────
private async Task RunBackendWriterAsync(Socket backend, CancellationToken ct)
{
try
{
await foreach (var frame in _outboundChannel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
{
int sent = 0;
while (sent < frame.Length)
{
int n = await backend.SendAsync(
frame.AsMemory(sent, frame.Length - sent),
SocketFlags.None,
ct).ConfigureAwait(false);
if (n == 0) throw new SocketException((int)SocketError.ConnectionReset);
sent += n;
}
// A send counts as backend activity — it suppresses the idle heartbeat.
Interlocked.Exchange(ref _lastBackendActivityTicks, DateTime.UtcNow.Ticks);
}
}
catch (OperationCanceledException)
{
// Normal teardown.
}
catch (Exception ex)
{
// Backend failure — cascade. Skip if disposal is already in progress;
// DisposeAsync runs an explicit TearDown and the fire-and-forget here would
// race against it, hitting a disposed _connectGate and producing an
// unobserved-task exception.
if (!_disposeCts.IsCancellationRequested)
_ = TearDownBackendAsync($"writer fault: {ex.Message}", cascadeUpstreams: true);
}
}
private async Task RunBackendReaderAsync(Socket backend, CancellationToken ct)
{
byte[] headerBuf = new byte[MbapFrame.HeaderSize];
try
{
while (!ct.IsCancellationRequested)
{
if (!await FillAsync(backend, headerBuf, 0, MbapFrame.HeaderSize, ct).ConfigureAwait(false))
break;
if (!MbapFrame.TryParseHeader(headerBuf.AsSpan(),
out ushort proxyTxId, out _, out ushort length, out _))
break;
if (length < 1)
{
// Degenerate frame — drop.
continue;
}
int pduBodyLen = length - 1;
if (pduBodyLen > MbapFrame.MaxPduBodySize)
{
// Frame too large — backend is misbehaving; force teardown.
_logger.LogWarning(
"Oversized backend frame: Plc={Plc} PduBody={Body} > Max={Max}",
_plc.Name, pduBodyLen, MbapFrame.MaxPduBodySize);
break;
}
byte[] frame = new byte[MbapFrame.HeaderSize + pduBodyLen];
Buffer.BlockCopy(headerBuf, 0, frame, 0, MbapFrame.HeaderSize);
if (!await FillAsync(backend, frame, MbapFrame.HeaderSize, pduBodyLen, ct).ConfigureAwait(false))
break;
// A received frame counts as backend activity — it suppresses (and, for a
// heartbeat response, satisfies) the idle heartbeat.
Interlocked.Exchange(ref _lastBackendActivityTicks, DateTime.UtcNow.Ticks);
if (!_correlation.TryRemove(proxyTxId, out var inFlight))
{
// No correlation entry — either a stale response after cascade, or
// the PLC sent something unsolicited. Drop the frame.
continue;
}
// Free the allocator slot immediately so it can be reused.
_allocator.Release(proxyTxId);
// Keepalive heartbeat response — the probe came back, the backend is alive.
// The activity timestamp was already refreshed above. There is no upstream
// party, no cache eligibility, and no rewriting to do: drop the payload and
// skip the EWMA update so the synthetic probe never pollutes the
// client-facing round-trip metric.
if (inFlight.IsHeartbeat)
continue;
// For FC03/FC04 reads, also clear the coalescing-by-key entry so a
// brand-new identical request issued AFTER this response is treated as a
// miss (opens a fresh round-trip). The TryRemove is best-effort: a
// watchdog timeout or cascade may have already removed it.
if (inFlight.Fc is 0x03 or 0x04)
{
var coalKey = new CoalescingKey(inFlight.UnitId, inFlight.Fc,
inFlight.StartAddress, inFlight.Qty);
_inFlightByKey.TryRemove(coalKey, out _);
}
// Update EWMA round-trip from when we sent the request. UpdateRoundTripEwma
// expects Stopwatch ticks; convert from the wall-clock SentAtUtc timestamp.
long ticks = (long)((DateTimeOffset.UtcNow - inFlight.SentAtUtc).TotalSeconds * Stopwatch.Frequency);
if (ticks > 0)
_ctx.Counters.UpdateRoundTripEwma(ticks);
// Apply the BCD rewriter on the response. Build a per-call context clone
// that carries CurrentRequest so the rewriter can decode FC03/04 slots.
var responseCtx = _ctx.WithCurrentRequest(inFlight);
_pipeline.Process(
MbapDirection.ResponseToClient,
frame.AsSpan(0, MbapFrame.HeaderSize),
frame.AsSpan(MbapFrame.HeaderSize, pduBodyLen),
responseCtx);
// Post-rewriter cache update:
// * FC03/FC04 successful responses are stored when the request was
// cache-eligible (resolvedTtlMs > 0).
// * FC06/FC16 successful responses invalidate every cached entry whose
// address range overlaps the write.
//
// Exception bit comes from the post-rewriter buffer (the rewriter never
// touches the FC byte today, but reading from inFlight.Fc would lose the
// exception bit). The base FC for routing decisions uses inFlight.Fc —
// the request side knows what was sent.
if (_ctx.Cache is { } postCache)
{
byte fcInResponse = frame[MbapFrame.HeaderSize];
bool isException = (fcInResponse & 0x80) != 0;
if (!isException)
{
if (inFlight.Fc is 0x03 or 0x04 && inFlight.ResolvedCacheTtlMs > 0)
{
// Snapshot the post-rewriter PDU body so the cached entry is
// independent of this frame's lifetime.
byte[] pduSnapshot = new byte[pduBodyLen];
Buffer.BlockCopy(frame, MbapFrame.HeaderSize, pduSnapshot, 0, pduBodyLen);
var cacheKey = new CacheKey(
inFlight.UnitId, inFlight.Fc,
inFlight.StartAddress, inFlight.Qty);
var now = DateTimeOffset.UtcNow;
var entry = new CacheEntry(
PduBytes: pduSnapshot,
CachedAtUtc: now,
ExpiresAtUtc: now.AddMilliseconds(inFlight.ResolvedCacheTtlMs),
Length: pduSnapshot.Length,
LastUsedTick: 0); // ResponseCache.Set stamps the real tick
postCache.Set(cacheKey, entry);
CacheLogEvents.Store(_logger, _plc.Name,
inFlight.UnitId, inFlight.Fc,
inFlight.StartAddress, inFlight.Qty,
inFlight.ResolvedCacheTtlMs);
}
else if (inFlight.Fc is 0x06 or 0x10)
{
// The design contract "invalidations during a recovering
// listener state are skipped" is upheld IMPLICITLY here:
// invalidation only fires inside the backend reader task when
// a non-exception FC06/FC16 response arrives. A `Recovering`
// listener has no backend reader (the multiplexer is torn
// down between recovery attempts), so no response can land
// here, so no invalidation. The gating is structural, not
// conditional. If a future change ever produces a write
// response off the live backend, an explicit recovering-state
// check would need to be added.
int invalidated = postCache.Invalidate(
inFlight.UnitId, inFlight.StartAddress, inFlight.Qty);
if (invalidated > 0)
{
_ctx.Counters.AddCacheInvalidations(invalidated);
CacheLogEvents.Invalidated(_logger, _plc.Name,
inFlight.UnitId, inFlight.StartAddress, inFlight.Qty,
invalidated);
}
}
}
}
// Fan out to each interested party with their original TxId restored.
// Without coalescing there is exactly one party; with coalescing there
// are N. The InFlightByKey TryRemove above (for FC03/FC04) guarantees no
// further attaches can occur — the parties list is now a stable snapshot.
//
// Non-blocking fan-out via `TrySendResponse`. The single backend reader
// task must NEVER `await` a per-upstream channel write: a wedged upstream
// (full bounded response channel) would otherwise stall the reader and
// starve every other client on this PLC. A drop here is recorded via
// `responseDropForFullUpstream`; the wedged upstream loses its own
// response and will be reaped by its own socket-close path.
foreach (var party in inFlight.InterestedParties)
{
if (!party.Pipe.IsAlive)
{
// Record the dead-upstream skip only for FC03/FC04 (the only
// function codes that take the coalescing path). For
// non-coalescing FCs this branch is silent.
if (inFlight.Fc is 0x03 or 0x04
&& inFlight.InterestedParties.Count > 1)
{
_ctx.Counters.IncrementCoalescedResponseToDeadUpstream();
CoalescingLogEvents.DeadUpstream(
_logger, _plc.Name, inFlight.UnitId, inFlight.Fc,
inFlight.StartAddress, inFlight.Qty);
}
continue;
}
// The frame buffer is private to this iteration; if there are
// multiple coalesced parties, each gets its own copy with its own
// original TxId patched in. The single-party case reuses the buffer
// directly as the common-case fast path.
byte[] outFrame = inFlight.InterestedParties.Count == 1
? frame
: (byte[])frame.Clone();
outFrame[0] = (byte)(party.OriginalTxId >> 8);
outFrame[1] = (byte)(party.OriginalTxId & 0xFF);
if (!party.Pipe.TrySendResponse(outFrame))
{
_ctx.Counters.IncrementResponseDropForFullUpstream();
}
else
{
// Count outbound bytes per delivered party. With coalescing, one
// backend response fans out to N parties and produces
// N × frame.Length bytes leaving the proxy upstream-side.
_ctx.Counters.AddBytes(up: 0, down: outFrame.Length);
}
}
}
// Reader exited cleanly — backend closed by remote. Cascade. Skip if
// dispose is already in progress (see writer-side comment above).
if (!_disposeCts.IsCancellationRequested)
_ = TearDownBackendAsync("backend reader EOF", cascadeUpstreams: true);
}
catch (OperationCanceledException)
{
// Normal teardown.
}
catch (Exception ex)
{
if (!_disposeCts.IsCancellationRequested)
_ = TearDownBackendAsync($"reader fault: {ex.Message}", cascadeUpstreams: true);
}
}
// ── Upstream → multiplexer entry point ────────────────────────────────────
private async ValueTask OnUpstreamFrameAsync(UpstreamPipe pipe, byte[] frame, CancellationToken ct)
{
if (_disposed) return;
if (frame.Length < MbapFrame.HeaderSize)
return;
if (!MbapFrame.TryParseHeader(frame.AsSpan(0, MbapFrame.HeaderSize),
out ushort originalTxId, out _, out _, out byte unitId))
return;
// Remember the unit ID so the backend keepalive heartbeat probes the same Modbus
// unit the real clients are known to reach successfully.
Volatile.Write(ref _lastSeenUnitId, unitId);
// Count inbound bytes from the upstream client. Surfaces in bytes.upstreamIn on
// the status page. Counted ONCE per parsed frame regardless of subsequent
// routing (cache hit, coalesce, backend round-trip, exception).
_ctx.Counters.AddBytes(up: frame.Length, down: 0);
// Parse the PDU FC + start/qty. FC03/FC04 reads use start/qty for the coalescing
// key and for the cache lookup. FC06 writes carry [addr][value]; we treat qty as
// 1 for invalidation. FC16 carries [start][qty][byteCount]...; qty is the write
// span used for cache invalidation. FC06/FC16 start/qty drive cache invalidation
// by overlap rather than exact key.
int pduOffset = MbapFrame.HeaderSize;
byte fcByte = frame.Length > pduOffset ? frame[pduOffset] : (byte)0;
ushort startAddr = 0;
ushort qty = 0;
if (fcByte is 0x03 or 0x04 && frame.Length >= pduOffset + 5)
{
startAddr = (ushort)((frame[pduOffset + 1] << 8) | frame[pduOffset + 2]);
qty = (ushort)((frame[pduOffset + 3] << 8) | frame[pduOffset + 4]);
}
else if (fcByte == 0x06 && frame.Length >= pduOffset + 5)
{
// FC06 = Write Single Register. PDU: [fc=06][addrHi][addrLo][valHi][valLo].
// For cache invalidation we represent this as qty=1 at addr.
startAddr = (ushort)((frame[pduOffset + 1] << 8) | frame[pduOffset + 2]);
qty = 1;
}
else if (fcByte == 0x10 && frame.Length >= pduOffset + 5)
{
// FC16 = Write Multiple Registers. PDU: [fc=10][startHi][startLo][qtyHi][qtyLo][byteCount]...
startAddr = (ushort)((frame[pduOffset + 1] << 8) | frame[pduOffset + 2]);
qty = (ushort)((frame[pduOffset + 3] << 8) | frame[pduOffset + 4]);
}
// Response-cache path. Cache check happens BEFORE coalescing AND before we
// attempt to bring up the backend connection. A hit short-circuits everything,
// including the EnsureBackendConnectedAsync call — operators with all reads
// cached and the backend down still get served (the cache survives backend
// disconnects per the design contract). The cache only fires for FC03/FC04 and
// only when the read range's resolved TTL > 0.
int resolvedCacheTtlMs = 0;
if (fcByte is 0x03 or 0x04 && _ctx.Cache is { } responseCache)
{
resolvedCacheTtlMs = _ctx.TagMap.ResolveCacheTtlMs(startAddr, qty);
if (resolvedCacheTtlMs > 0)
{
var cacheKey = new CacheKey(unitId, fcByte, startAddr, qty);
if (responseCache.TryGet(cacheKey, out var cached))
{
_ctx.Counters.IncrementCacheHit();
CacheLogEvents.Hit(_logger, _plc.Name, unitId, fcByte, startAddr, qty);
byte[] hitFrame = BuildCacheHitFrame(originalTxId, unitId, cached.PduBytes);
await pipe.SendResponseAsync(hitFrame, ct).ConfigureAwait(false);
// Outbound bytes for cache-hit response.
_ctx.Counters.AddBytes(up: 0, down: hitFrame.Length);
return;
}
// Per design contract: "miss" = "fell through to coalescing/backend".
// When two upstream peers issue the same cache-eligible read, both increment
// CacheMiss; only one then opens a backend round-trip (the second coalesces
// onto the first via the InFlightByKey path below). So `CacheMiss` does NOT
// equal "produced a backend round-trip" — it equals "did not find a fresh
// cache entry". The identity `Hit + Miss = cache-eligible requests` holds.
_ctx.Counters.IncrementCacheMiss();
CacheLogEvents.Miss(_logger, _plc.Name, unitId, fcByte, startAddr, qty);
}
}
// Ensure backend is connected. Failure here means we cannot service the request;
// close the upstream pipe.
if (!await EnsureBackendConnectedAsync(ct).ConfigureAwait(false))
{
try { await pipe.DisposeAsync().ConfigureAwait(false); } catch { /* best effort */ }
return;
}
// Read-coalescing path. Only FC03/FC04 are coalescable; only when the feature
// is enabled in the live config. If the late-arriving request matches an
// already-in-flight peer, we attach to the existing entry and skip the backend
// round-trip entirely. The existing entry's response will fan out to both parties.
var coalescingOpts = _coalescingOptions();
if (fcByte is 0x03 or 0x04 && coalescingOpts.Enabled)
{
var key = new CoalescingKey(unitId, fcByte, startAddr, qty);
var newParty = new InterestedParty(pipe, originalTxId);
// The factory allocates a proxy TxId, builds the InFlightRequest with a
// mutable List<InterestedParty>, and adds to the correlation map. We
// deliberately do NOT enqueue to the outbound channel inside the factory —
// that's done outside the InFlightByKey lock to keep the lock scope tight
// and to avoid holding the lock across an async send.
//
// proxyTxIdForSend / inFlightForSend communicate the factory's allocation
// back out of the lock so the post-lock code can finish the send.
ushort proxyTxIdForSend = 0;
InFlightRequest? inFlightForSend = null;
_inFlightByKey.AttachOrCreate(
key,
newParty,
factory: () =>
{
if (!_allocator.TryAllocate(out ushort proxyTxId))
{
// Saturation — record an empty placeholder InFlightRequest that the
// caller will detect via inFlightForSend == null. We can't easily
// signal failure through the bool return, so we leave the saturation
// exception delivery to the caller.
return new InFlightRequest(
UnitId: unitId,
Fc: fcByte,
StartAddress: startAddr,
Qty: qty,
InterestedParties: new List<InterestedParty> { newParty },
SentAtUtc: DateTimeOffset.UtcNow,
ResolvedCacheTtlMs: resolvedCacheTtlMs);
}
var partyList = new List<InterestedParty>(capacity: 1) { newParty };
var inFlight = new InFlightRequest(
UnitId: unitId,
Fc: fcByte,
StartAddress: startAddr,
Qty: qty,
InterestedParties: partyList,
SentAtUtc: DateTimeOffset.UtcNow,
ResolvedCacheTtlMs: resolvedCacheTtlMs);
if (!_correlation.TryAdd(proxyTxId, inFlight))
{
_allocator.Release(proxyTxId);
_logger.LogError(
"CorrelationMap.TryAdd failed for already-free proxyTxId {ProxyTxId}",
proxyTxId);
// Return the stub anyway; outer code detects via inFlightForSend == null.
return inFlight;
}
_ctx.Counters.ObserveInFlight(_allocator.InFlightCount);
proxyTxIdForSend = proxyTxId;
inFlightForSend = inFlight;
return inFlight;
},
maxParties: coalescingOpts.MaxParties,
out _,
out bool wasNew);
if (!wasNew)
{
// Coalesce hit: attached to an existing in-flight entry. No backend traffic.
_ctx.Counters.IncrementCoalescedHit();
CoalescingLogEvents.Hit(_logger, _plc.Name, unitId, fcByte, startAddr, qty,
partyCount: _inFlightByKey.Count);
return;
}
// Coalesce miss: this request did not attach to an in-flight peer. Per the
// design contract `coalescedHitCount + coalescedMissCount = total FC03/FC04`,
// so even saturation-failure paths (factory below returns null inFlightForSend)
// count as a miss — every FC03/FC04 entered the coalescing path exactly once.
// "Miss" here means "did not coalesce", NOT "produced a backend round-trip".
_ctx.Counters.IncrementCoalescedMiss();
CoalescingLogEvents.Miss(_logger, _plc.Name, unitId, fcByte, startAddr, qty);
if (inFlightForSend is null)
{
// The factory hit the allocator-saturation path or a duplicate-key race
// and stored a stub `InFlightRequest` under `key`. Late attachers may
// have joined the stub between the factory call and this cleanup; we
// must deliver the saturation exception to ALL of them, not just the
// leader, otherwise the late attachers wait forever for a response that
// never comes (the stub has no proxy TxId, so no backend round-trip will
// ever fire).
MultiplexerLogEvents.Saturated(_logger, _plc.Name, pipe.RemoteEp?.ToString() ?? "?");
if (_inFlightByKey.TryRemove(key, out var stub))
{
// Non-blocking delivery via TrySendResponse — the per-PLC fan-out
// path must never await per-pipe writes (a wedged late-attacher's
// full bounded channel would otherwise stall delivery to its peers).
foreach (var party in stub.InterestedParties)
{
byte[] excFrame = BuildExceptionFrame(party.OriginalTxId, unitId, fcByte, exceptionCode: 4);
if (!party.Pipe.TrySendResponse(excFrame))
_ctx.Counters.IncrementResponseDropForFullUpstream();
else
_ctx.Counters.AddBytes(up: 0, down: excFrame.Length);
}
}
else
{
// The stub was already removed by another path (extremely unlikely,
// but defensive). Surface the exception to the original requester.
byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4);
if (!pipe.TrySendResponse(excFrame))
_ctx.Counters.IncrementResponseDropForFullUpstream();
else
_ctx.Counters.AddBytes(up: 0, down: excFrame.Length);
}
return;
}
// Apply the BCD rewriter on the request, then send to the backend. We are now
// OUTSIDE the InFlightByKey lock — late attaches arriving after this point will
// attach to the same entry while it sits in the channel/wire.
var requestCtx = _ctx.WithCurrentRequest(inFlightForSend);
_pipeline.Process(
MbapDirection.RequestToBackend,
frame.AsSpan(0, MbapFrame.HeaderSize),
frame.AsSpan(MbapFrame.HeaderSize, frame.Length - MbapFrame.HeaderSize),
requestCtx);
frame[0] = (byte)(proxyTxIdForSend >> 8);
frame[1] = (byte)(proxyTxIdForSend & 0xFF);
try
{
await _outboundChannel.Writer.WriteAsync(frame, ct).ConfigureAwait(false);
}
catch (ChannelClosedException)
{
if (_correlation.TryRemove(proxyTxIdForSend, out _))
_allocator.Release(proxyTxIdForSend);
_inFlightByKey.TryRemove(key, out _);
}
return;
}
// Non-coalescing path (FC06/FC16 writes, FC03/04 with coalescing disabled, or
// any other FC). Every request gets its own proxy TxId and its own backend
// round-trip.
if (!_allocator.TryAllocate(out ushort proxyTxIdFc))
{
MultiplexerLogEvents.Saturated(_logger, _plc.Name, pipe.RemoteEp?.ToString() ?? "?");
byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4);
await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false);
_ctx.Counters.AddBytes(up: 0, down: excFrame.Length);
return;
}
var partyListNc = new List<InterestedParty>(capacity: 1) { new InterestedParty(pipe, originalTxId) };
var inFlightNc = new InFlightRequest(
UnitId: unitId,
Fc: fcByte,
StartAddress: startAddr,
Qty: qty,
InterestedParties: partyListNc,
SentAtUtc: DateTimeOffset.UtcNow,
ResolvedCacheTtlMs: resolvedCacheTtlMs);
if (!_correlation.TryAdd(proxyTxIdFc, inFlightNc))
{
// Should be impossible: the allocator just guaranteed proxyTxId is free.
_allocator.Release(proxyTxIdFc);
_logger.LogError("CorrelationMap.TryAdd failed for already-free proxyTxId {ProxyTxId}", proxyTxIdFc);
return;
}
// Even when the coalescing path is bypassed (e.g. coalescing disabled for
// FC03/04), we still report the request as a Miss so Hit + Miss = total
// FC03/FC04 requests across snapshots. FC06/FC16 are not counted here (they
// are not coalescable in any sense).
if (fcByte is 0x03 or 0x04)
_ctx.Counters.IncrementCoalescedMiss();
// Peak in-flight tracking.
_ctx.Counters.ObserveInFlight(_allocator.InFlightCount);
// Apply the BCD rewriter on the request.
var requestCtxNc = _ctx.WithCurrentRequest(inFlightNc);
_pipeline.Process(
MbapDirection.RequestToBackend,
frame.AsSpan(0, MbapFrame.HeaderSize),
frame.AsSpan(MbapFrame.HeaderSize, frame.Length - MbapFrame.HeaderSize),
requestCtxNc);
// Overwrite the MBAP TxId with the proxy TxId.
frame[0] = (byte)(proxyTxIdFc >> 8);
frame[1] = (byte)(proxyTxIdFc & 0xFF);
// Enqueue for the backend writer task.
try
{
await _outboundChannel.Writer.WriteAsync(frame, ct).ConfigureAwait(false);
}
catch (ChannelClosedException)
{
// Channel completed during shutdown — release the proxy TxId.
if (_correlation.TryRemove(proxyTxIdFc, out _))
_allocator.Release(proxyTxIdFc);
}
}
// ── Per-request timeout watchdog ──────────────────────────────────────────
/// <summary>
/// Periodically scans the correlation map for in-flight requests whose response has
/// not arrived within <see cref="ConnectionOptions.BackendRequestTimeoutMs"/>. For each
/// stale entry: removes it from the map, frees its allocator slot, and delivers a
/// Modbus exception (code 0x0B / Gateway Target Device Failed To Respond) to each
/// interested party with the original TxId restored.
///
/// <para><b>Why this exists.</b> In a multiplexed connection model a single missing
/// or mis-routed response would otherwise leak a correlation entry forever and hang
/// the upstream pipe indefinitely. Real-world causes: PLC drops a response, network
/// packet loss, backend that mis-echoes MBAP TxIds.</para>
/// </summary>
private async Task RunRequestTimeoutWatchdogAsync(CancellationToken ct)
{
// Tick at ~quarter of the request timeout for responsive cleanup, but cap to a
// 100 ms floor so the watchdog doesn't busy-wake on very small timeouts.
int tickMs = Math.Max(100, _connectionOptions.BackendRequestTimeoutMs / 4);
try
{
while (!ct.IsCancellationRequested)
{
await Task.Delay(tickMs, ct).ConfigureAwait(false);
var threshold = DateTimeOffset.UtcNow.AddMilliseconds(-_connectionOptions.BackendRequestTimeoutMs);
var stale = _correlation.SnapshotOlderThan(threshold);
if (stale.Count == 0) continue;
foreach (var kvp in stale)
{
ushort proxyTxId = kvp.Key;
// Try to claim the entry; if another path (response, cascade) already removed it,
// skip — no work to do.
if (!_correlation.TryRemove(proxyTxId, out var req))
continue;
_allocator.Release(proxyTxId);
// Keepalive heartbeat that never came back. The backend is no longer
// answering Modbus even though the socket may still look connected —
// tear it down proactively (cascading every attached pipe) so the
// failure is found here, during idle, instead of corrupting the next
// real client request. There is no upstream party to send a 0x0B to.
if (req.IsHeartbeat)
{
long hbElapsedMs = (long)(DateTimeOffset.UtcNow - req.SentAtUtc).TotalMilliseconds;
KeepaliveLogEvents.HeartbeatTimeout(_logger, _plc.Name, proxyTxId, hbElapsedMs);
_ctx.Counters.IncrementBackendHeartbeatFailed();
_ctx.Counters.IncrementBackendIdleDisconnect();
KeepaliveLogEvents.BackendIdleDisconnect(_logger, _plc.Name, hbElapsedMs);
if (!_disposeCts.IsCancellationRequested)
_ = TearDownBackendAsync("keepalive heartbeat timeout", cascadeUpstreams: true);
continue;
}
// Also clear the coalescing-by-key entry. A late attach that raced
// in just before the watchdog claim will still receive the 0x0B
// exception via this entry's InterestedParties list (List<T>
// mutations happen before fan-out begins).
if (req.Fc is 0x03 or 0x04)
{
var coalKey = new CoalescingKey(req.UnitId, req.Fc, req.StartAddress, req.Qty);
_inFlightByKey.TryRemove(coalKey, out _);
}
long elapsedMs = (long)(DateTimeOffset.UtcNow - req.SentAtUtc).TotalMilliseconds;
foreach (var party in req.InterestedParties)
{
MultiplexerLogEvents.RequestTimeout(
_logger, _plc.Name, proxyTxId, party.OriginalTxId, req.Fc, elapsedMs);
if (!party.Pipe.IsAlive)
continue;
// Deliver Modbus exception 0x0B (Gateway Target Device Failed To Respond)
// to the upstream client. This lets the client's library raise a clean
// ModbusException rather than hanging on a timeout.
byte[] excFrame = BuildExceptionFrame(party.OriginalTxId, req.UnitId, req.Fc, exceptionCode: 0x0B);
try
{
await party.Pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false);
_ctx.Counters.AddBytes(up: 0, down: excFrame.Length);
}
catch
{
// Best-effort delivery; if the pipe is going down, the client
// discovers the failure through its own socket close path.
}
}
}
}
}
catch (OperationCanceledException)
{
// Normal teardown.
}
catch (Exception ex)
{
_logger.LogError(ex, "Request-timeout watchdog faulted: Plc={Plc}", _plc.Name);
}
}
// ── Backend keepalive heartbeat ───────────────────────────────────────────
/// <summary>
/// Backend keepalive heartbeat loop. Started alongside the writer/reader on each
/// successful connect and cancelled with them on teardown. While the backend socket
/// has been idle (no send or receive) for longer than
/// <see cref="KeepaliveOptions.BackendHeartbeatIdleMs"/>, it issues a synthetic FC03
/// qty=1 read so the path stays warm against middlebox idle-drop and a backend that is
/// connected-but-not-answering is detected here rather than on the next client request.
///
/// <para>The probe response is consumed by <see cref="RunBackendReaderAsync"/> (which
/// recognises <see cref="InFlightRequest.IsHeartbeat"/> and drops it); a probe that
/// never returns is timed out by <see cref="RunRequestTimeoutWatchdogAsync"/>, which
/// tears the backend down. The heartbeat keeps an <i>existing</i> backend warm — it
/// never resurrects a dead one (reconnect stays gated on the next upstream frame).</para>
/// </summary>
private async Task RunBackendHeartbeatAsync(CancellationToken ct)
{
try
{
while (!ct.IsCancellationRequested)
{
var ka = _keepaliveOptions();
int idleMs = Math.Max(1000, ka.BackendHeartbeatIdleMs);
// Tick at a quarter of the idle window so a freshly-elapsed idle period is
// noticed promptly, floored at 500 ms so the loop never busy-wakes.
int tickMs = Math.Max(500, idleMs / 4);
await Task.Delay(tickMs, ct).ConfigureAwait(false);
if (!ka.Enabled)
continue;
long lastTicks = Interlocked.Read(ref _lastBackendActivityTicks);
double idleElapsedMs =
(DateTime.UtcNow - new DateTime(lastTicks, DateTimeKind.Utc)).TotalMilliseconds;
if (idleElapsedMs < idleMs)
continue;
SendHeartbeat(ka);
}
}
catch (OperationCanceledException)
{
// Normal teardown.
}
catch (Exception ex)
{
_logger.LogError(ex, "Backend heartbeat loop faulted: Plc={Plc}", _plc.Name);
}
}
/// <summary>
/// Builds and enqueues one synthetic FC03 qty=1 heartbeat request onto the backend
/// outbound channel. The correlation entry is flagged <see cref="InFlightRequest.IsHeartbeat"/>
/// so the reader and watchdog treat it specially; it carries no interested parties and
/// bypasses the coalescing and cache paths entirely.
/// </summary>
private void SendHeartbeat(KeepaliveOptions ka)
{
// A saturated TxId space means the backend is busy (65,536 requests in flight),
// which is the opposite of idle — skip this tick rather than force a probe.
if (!_allocator.TryAllocate(out ushort proxyTxId))
return;
byte unitId = (byte)Volatile.Read(ref _lastSeenUnitId);
ushort address = (ushort)ka.BackendHeartbeatProbeAddress;
var inFlight = new InFlightRequest(
UnitId: unitId,
Fc: 0x03,
StartAddress: address,
Qty: 1,
InterestedParties: Array.Empty<InterestedParty>(),
SentAtUtc: DateTimeOffset.UtcNow,
ResolvedCacheTtlMs: 0,
IsHeartbeat: true);
if (!_correlation.TryAdd(proxyTxId, inFlight))
{
_allocator.Release(proxyTxId);
return;
}
byte[] frame = BuildHeartbeatFrame(proxyTxId, unitId, address);
// Non-blocking enqueue: if the channel is full the backend is not idle (a race), and
// if it is completed the backend is tearing down — either way, undo and skip.
if (!_outboundChannel.Writer.TryWrite(frame))
{
if (_correlation.TryRemove(proxyTxId, out _))
_allocator.Release(proxyTxId);
return;
}
_ctx.Counters.IncrementBackendHeartbeatSent();
KeepaliveLogEvents.HeartbeatSent(_logger, _plc.Name, proxyTxId, address);
}
/// <summary>
/// Builds a 12-byte MBAP-framed FC03 (Read Holding Registers) request reading one
/// register at <paramref name="address"/> — the keepalive heartbeat probe PDU.
/// </summary>
private static byte[] BuildHeartbeatFrame(ushort proxyTxId, byte unitId, ushort address)
{
// PDU = [fc=03][addrHi][addrLo][qtyHi][qtyLo]. MBAP length = UnitId(1) + PDU(5) = 6.
var frame = new byte[MbapFrame.HeaderSize + 5];
frame[0] = (byte)(proxyTxId >> 8);
frame[1] = (byte)(proxyTxId & 0xFF);
frame[2] = 0; frame[3] = 0; // ProtocolId
frame[4] = 0; frame[5] = 6; // Length
frame[6] = unitId;
frame[7] = 0x03; // FC03 Read Holding Registers
frame[8] = (byte)(address >> 8);
frame[9] = (byte)(address & 0xFF);
frame[10] = 0; frame[11] = 1; // Qty = 1
return frame;
}
// ── Helpers ───────────────────────────────────────────────────────────────
/// <summary>
/// Adapter exposing a <see cref="Cache.ResponseCache"/>'s Count / ApproximateBytes as
/// <see cref="ICacheStatsProvider"/> for the snapshot path. Kept as a sealed class so
/// the cache type itself doesn't need to take an interface dependency on
/// <see cref="ICacheStatsProvider"/>.
/// </summary>
private sealed class CacheStatsAdapter : ICacheStatsProvider
{
private readonly Cache.ResponseCache _cache;
public CacheStatsAdapter(Cache.ResponseCache cache) => _cache = cache;
public long EntryCount => _cache.Count;
public long ApproximateBytes => _cache.ApproximateBytes;
}
private static async Task<bool> FillAsync(
Socket socket, byte[] buf, int offset, int count, CancellationToken ct)
{
int remaining = count;
while (remaining > 0)
{
int n = await socket.ReceiveAsync(
buf.AsMemory(offset + (count - remaining), remaining),
SocketFlags.None, ct).ConfigureAwait(false);
if (n == 0) return false;
remaining -= n;
}
return true;
}
/// <summary>
/// Builds an MBAP-framed response from cached PDU bytes for the given upstream
/// party. The cache stores POST-rewriter PDU bodies (no MBAP); each hit stamps a
/// fresh MBAP header carrying the requesting party's original TxId so the response
/// looks indistinguishable from a fresh backend reply.
/// </summary>
private static byte[] BuildCacheHitFrame(ushort originalTxId, byte unitId, byte[] cachedPdu)
{
// Length field covers UnitId(1) + PDU body. Capped by Modbus spec at 253-byte PDU.
int pduLen = cachedPdu.Length;
ushort length = (ushort)(1 + pduLen);
var frame = new byte[MbapFrame.HeaderSize + pduLen];
frame[0] = (byte)(originalTxId >> 8);
frame[1] = (byte)(originalTxId & 0xFF);
frame[2] = 0; frame[3] = 0;
frame[4] = (byte)(length >> 8);
frame[5] = (byte)(length & 0xFF);
frame[6] = unitId;
Buffer.BlockCopy(cachedPdu, 0, frame, MbapFrame.HeaderSize, pduLen);
return frame;
}
private static byte[] BuildExceptionFrame(ushort originalTxId, byte unitId, byte fc, byte exceptionCode)
{
// Modbus exception PDU = [fc | 0x80][exceptionCode].
// MBAP length covers UnitId (1) + PDU (2) = 3.
var frame = new byte[MbapFrame.HeaderSize + 2];
frame[0] = (byte)(originalTxId >> 8);
frame[1] = (byte)(originalTxId & 0xFF);
frame[2] = 0; // ProtocolId
frame[3] = 0;
frame[4] = 0; // Length high
frame[5] = 3; // Length low: UnitId(1) + ExFc(1) + ExCode(1)
frame[6] = unitId;
frame[7] = (byte)(fc | 0x80);
frame[8] = exceptionCode;
return frame;
}
}