mbproxy: initial commit through Phase 9 (TxId multiplexing)

Adds the mbproxy service end-to-end. Phases 00-08 implement the
production-ready single-listener / 1:1-backend transparent Modbus TCP
proxy with bidirectional BCD rewriting for the ~54-PLC DL205/DL260
fleet. Phase 9 replaces the connection layer with a single backend
socket per PLC plus MBAP TxId rewriting, lifting the H2-ECOM100's
4-concurrent-client cap as an operational ceiling.

Phase 9 additions of note:
- PlcMultiplexer + UpstreamPipe + TxIdAllocator + CorrelationMap
- InFlightRequest with IReadOnlyList<InterestedParty> (load-bearing
  for Phase 10 read coalescing — do not collapse to a single field)
- Per-request watchdog: surfaces Modbus exception 0x0B to upstream
  on BackendRequestTimeoutMs, defending against lost responses,
  dead-PLC paths, and pymodbus 3.13.0's concurrent-multiplexed-
  request bug (its ServerRequestHandler.last_pdu state race)
- Status DTO + HTML gain inFlight / maxInFlight / txIdWraps /
  disconnectCascades / queueDepth (Tier 1.6 in docs/kpi.md)

Tests: 263 unit + 38 E2E. Multiplexer correctness under truly
concurrent backend traffic is proved against a stub backend in
PlcMultiplexerTests; MultiplexerE2ETests paces requests so pymodbus
3.13's single-PDU framer stays in known-good mode.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-14 01:49:35 -04:00
parent 2e937228a0
commit 56eee3c563
105 changed files with 18430 additions and 0 deletions
@@ -0,0 +1,82 @@
using System.Collections.Concurrent;
namespace Mbproxy.Proxy.Multiplexing;
/// <summary>
/// Thread-safe table of outstanding backend requests, keyed by the proxy-assigned MBAP
/// TxId. Entries are added on the multiplexer's per-upstream <c>OnFrame</c> path and
/// removed by the backend reader task once the matching response arrives.
///
/// <para>The store is a <see cref="ConcurrentDictionary{TKey, TValue}"/>. Phase 9's
/// single-writer / single-remover pattern does not strictly need it, but the
/// cascade-on-disconnect walk runs on a separate task and Phase 10 adds upstream-side
/// cancellation paths, so the concurrent primitive is kept for its negligible cost.</para>
/// </summary>
internal sealed class CorrelationMap
{
    private readonly ConcurrentDictionary<ushort, InFlightRequest> _entries = new();

    /// <summary>
    /// Registers <paramref name="req"/> under <paramref name="proxyTxId"/>.
    /// </summary>
    /// <returns>
    /// <c>false</c> when the key is already occupied. That indicates a programming error
    /// (the allocator must never hand out a key that is still in flight); callers should
    /// treat it as a fatal contract violation and drop the upstream connection.
    /// </returns>
    public bool TryAdd(ushort proxyTxId, InFlightRequest req)
    {
        return _entries.TryAdd(proxyTxId, req);
    }

    /// <summary>
    /// Takes the entry stored under <paramref name="proxyTxId"/> out of the map.
    /// </summary>
    /// <returns>
    /// <c>false</c> when nothing is stored under the key — expected during cascade
    /// cleanup and on stale-response paths. <paramref name="req"/> is only meaningful
    /// when the result is <c>true</c>.
    /// </returns>
    public bool TryRemove(ushort proxyTxId, out InFlightRequest req)
    {
        return _entries.TryRemove(proxyTxId, out req!);
    }

    /// <summary>How many requests are in flight right now.</summary>
    public int Count
    {
        get { return _entries.Count; }
    }

    /// <summary>
    /// Copies every in-flight request into a fresh array. Allocates; meant for
    /// diagnostics (cascade walk on backend disconnect; future drain-on-shutdown).
    /// </summary>
    public IReadOnlyCollection<InFlightRequest> Snapshot()
    {
        // Materialise so the caller holds a stable view detached from the live map.
        InFlightRequest[] copy = _entries.Values.ToArray();
        return copy;
    }

    /// <summary>
    /// Removes every entry and returns what was removed. The multiplexer's cascade path
    /// uses this when the backend socket dies, so it can close each interested upstream
    /// pipe and free each allocated proxy TxId.
    /// </summary>
    public IReadOnlyList<KeyValuePair<ushort, InFlightRequest>> DrainAll()
    {
        var removed = new List<KeyValuePair<ushort, InFlightRequest>>(_entries.Count);

        foreach (var (txId, _) in _entries)
        {
            // Another path may have claimed the entry since the enumerator saw it;
            // only report entries this call actually removed.
            if (!_entries.TryRemove(txId, out var request))
                continue;

            removed.Add(KeyValuePair.Create(txId, request));
        }

        return removed;
    }

    /// <summary>
    /// Lists the (proxyTxId, request) pairs whose <see cref="InFlightRequest.SentAtUtc"/>
    /// is at or before <paramref name="threshold"/>. Allocates a list; intended only for
    /// the periodic per-request timeout watchdog. Nothing is removed — the caller decides
    /// which entries to time out.
    /// </summary>
    public IReadOnlyList<KeyValuePair<ushort, InFlightRequest>> SnapshotOlderThan(DateTimeOffset threshold)
    {
        return _entries
            .Where(entry => entry.Value.SentAtUtc <= threshold)
            .ToList();
    }
}
@@ -0,0 +1,41 @@
namespace Mbproxy.Proxy.Multiplexing;
/// <summary>
/// One upstream party interested in a single backend round-trip. Carries the upstream
/// pipe to deliver the response to AND the original MBAP TxId that the party sent — the
/// multiplexer must rewrite the response's MBAP TxId back to <see cref="OriginalTxId"/>
/// before handing the frame to the pipe, so each upstream sees the proxy as transparent.
///
/// <para><b>Phase 9 invariant:</b> exactly one <see cref="InterestedParty"/> per
/// <see cref="InFlightRequest"/>. <b>Phase 10 (read coalescing)</b> reuses this exact
/// shape to fan-out a single backend response to multiple upstream parties. Do not
/// collapse this into a single field on <see cref="InFlightRequest"/>.</para>
/// </summary>
/// <param name="Pipe">Upstream pipe that receives the response (or a synthesized exception frame).</param>
/// <param name="OriginalTxId">MBAP TxId exactly as the upstream client sent it; patched back into bytes 0-1 of the response.</param>
internal sealed record InterestedParty(UpstreamPipe Pipe, ushort OriginalTxId);
/// <summary>
/// Per-backend-request correlation record. Stored in <see cref="CorrelationMap"/> keyed
/// by the proxy-assigned TxId; looked up by the backend reader task to:
/// <list type="bullet">
/// <item><description>Restore each interested party's original MBAP TxId before forwarding
/// the response upstream (transparent multiplexing contract).</description></item>
/// <item><description>Provide the BCD rewriter with the originating request's
/// <c>StartAddress</c> / <c>Qty</c> for FC03/FC04 response decoding — the response
/// PDU itself does not carry the start address.</description></item>
/// <item><description>Measure backend round-trip time via <see cref="SentAtUtc"/>
/// (replaces the per-pair stopwatch slot from the 1:1 model).</description></item>
/// </list>
///
/// <para><b>Phase 9:</b> <see cref="InterestedParties"/> always has exactly one element.
/// The list shape is the load-bearing seam that <b>Phase 10 — read coalescing</b> hooks
/// into to fan out a single PLC response to multiple upstream clients without further
/// refactor of the multiplexer's data model. Reviewer note: do <i>not</i> simplify back
/// to a single <c>UpstreamPipe</c> field.</para>
/// </summary>
/// <param name="UnitId">Unit identifier taken from the request's MBAP header.</param>
/// <param name="Fc">Function code — the first PDU byte of the request.</param>
/// <param name="StartAddress">Big-endian start address from the request PDU; the multiplexer fills it only for FC03/FC04 and leaves it 0 otherwise.</param>
/// <param name="Qty">Big-endian quantity from the request PDU; the multiplexer fills it only for FC03/FC04 and leaves it 0 otherwise.</param>
/// <param name="InterestedParties">Upstream parties that receive the response, each with its own original TxId.</param>
/// <param name="SentAtUtc">Wall-clock time captured when the record is created, i.e. before the frame is queued for the backend writer.</param>
internal sealed record InFlightRequest(
byte UnitId,
byte Fc,
ushort StartAddress,
ushort Qty,
IReadOnlyList<InterestedParty> InterestedParties,
DateTimeOffset SentAtUtc);
@@ -0,0 +1,121 @@
namespace Mbproxy.Proxy.Multiplexing;
/// <summary>
/// Source-generated <see cref="LoggerMessage"/> definitions for the TxId-multiplexing
/// connection layer. Event names are stable — do not rename without updating
/// docs/design.md's "Logging" event-name table.
///
/// <para>Event ids in this class run contiguously from 110 to 116. Every method takes the
/// target <c>ILogger</c> first, followed by the values bound to the message template's
/// placeholders in order.</para>
/// </summary>
internal static partial class MultiplexerLogEvents
{
/// <summary>
/// Emitted once per upstream client accept. Replaces the per-pair
/// <c>mbproxy.client.connected</c> event from the 1:1 model (same event name,
/// same property shape — operators' log queries are unchanged).
/// </summary>
[LoggerMessage(
EventId = 110,
EventName = "mbproxy.client.connected",
Level = LogLevel.Information,
Message = "Client connected: Plc={Plc} RemoteEp={RemoteEp}")]
public static partial void ClientConnected(
ILogger logger,
string plc,
string remoteEp);
/// <summary>
/// Emitted when an upstream pipe is closed (clean disconnect, fault, or cascade).
/// <c>Reason</c> is free-form text supplied by the caller.
/// </summary>
[LoggerMessage(
EventId = 111,
EventName = "mbproxy.client.disconnected",
Level = LogLevel.Information,
Message = "Client disconnected: Plc={Plc} RemoteEp={RemoteEp} Reason={Reason}")]
public static partial void ClientDisconnected(
ILogger logger,
string plc,
string remoteEp,
string reason);
/// <summary>
/// Emitted when the multiplexer successfully opens its single backend connection to a PLC.
/// </summary>
[LoggerMessage(
EventId = 112,
EventName = "mbproxy.multiplex.backend.connected",
Level = LogLevel.Information,
Message = "Backend multiplex connection up: Plc={Plc} Host={Host} Port={Port}")]
public static partial void BackendConnected(
ILogger logger,
string plc,
string host,
int port);
/// <summary>
/// Emitted when the multiplexer cascades a backend disconnect to all attached upstream
/// clients. <c>UpstreamCount</c> is the number of upstream pipes that were closed and
/// <c>InFlightCount</c> is the number of in-flight requests dropped.
/// </summary>
[LoggerMessage(
EventId = 113,
EventName = "mbproxy.multiplex.backend.disconnected",
Level = LogLevel.Warning,
Message = "Backend multiplex connection down: Plc={Plc} UpstreamCount={UpstreamCount} InFlightCount={InFlightCount} Reason={Reason}")]
public static partial void BackendDisconnected(
ILogger logger,
string plc,
int upstreamCount,
int inFlightCount,
string reason);
/// <summary>
/// Emitted each time the TxId allocator refuses to allocate — every slot in the 16-bit
/// space is currently in flight. The multiplexer responds to the upstream with a
/// Modbus exception (code 04 / Slave Device Failure). Realistically unreachable under
/// normal load (ECOM serializes at ~2-10 ms per request); a stress-only path.
/// </summary>
[LoggerMessage(
EventId = 114,
EventName = "mbproxy.multiplex.saturated",
Level = LogLevel.Error,
Message = "Multiplexer TxId space saturated — returning exception 04 to upstream: Plc={Plc} RemoteEp={RemoteEp}")]
public static partial void Saturated(
ILogger logger,
string plc,
string remoteEp);
/// <summary>
/// Emitted when the backend connect Polly pipeline fails. Mirrors the existing
/// <c>mbproxy.backend.failed</c> event from the 1:1 model so operators' alerts keep
/// working unchanged after Phase 9.
/// </summary>
[LoggerMessage(
EventId = 115,
EventName = "mbproxy.backend.failed",
Level = LogLevel.Warning,
Message = "Backend connect failed: Plc={Plc} Reason={Reason}")]
public static partial void BackendFailed(
ILogger logger,
string plc,
string reason);
/// <summary>
/// Emitted when the per-request watchdog times out an in-flight request whose response
/// never arrived within <c>BackendRequestTimeoutMs</c>. The upstream party receives a
/// Modbus exception (code 0x0B / Gateway Target Device Failed To Respond) and the
/// proxy TxId is freed. Causes include: PLC dropped the response, network packet loss,
/// or a backend that echoes the wrong MBAP TxId (e.g. pymodbus 3.13.0's
/// concurrent-multiplexed-request bug).
///
/// <para>Logged once per interested party, so <c>OriginalTxId</c> identifies the
/// upstream request while <c>ProxyTxId</c> identifies the shared backend request.</para>
/// </summary>
[LoggerMessage(
EventId = 116,
EventName = "mbproxy.multiplex.request.timeout",
Level = LogLevel.Warning,
Message = "In-flight request timed out: Plc={Plc} ProxyTxId={ProxyTxId} OriginalTxId={OriginalTxId} Fc={Fc} ElapsedMs={ElapsedMs}")]
public static partial void RequestTimeout(
ILogger logger,
string plc,
ushort proxyTxId,
ushort originalTxId,
byte fc,
long elapsedMs);
}
@@ -0,0 +1,664 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net.Sockets;
using System.Threading.Channels;
using Mbproxy.Options;
using Polly;
namespace Mbproxy.Proxy.Multiplexing;
/// <summary>
/// Owner of the single backend TCP connection to one PLC. Multiplexes many
/// <see cref="UpstreamPipe"/> instances onto that one socket by rewriting MBAP transaction
/// IDs so concurrent in-flight requests from different upstream clients remain
/// distinguishable on the shared wire. The multiplexer:
///
/// <list type="bullet">
/// <item><description>Opens and re-opens the backend socket through a Polly retry pipeline
/// that matches the <see cref="ResilienceOptions.BackendConnect"/> profile.</description></item>
/// <item><description>Runs one backend writer task that drains <see cref="_outboundChannel"/>
/// into the backend socket (single writer; no socket-level synchronisation needed).</description></item>
/// <item><description>Runs one backend reader task that decodes MBAP frames from the backend,
/// looks each frame up in the <see cref="CorrelationMap"/>, restores each interested
/// party's original TxId, and hands the frame to that party's
/// <see cref="UpstreamPipe._responseChannel"/>.</description></item>
/// <item><description>Cascades a backend disconnect by closing every attached pipe and
/// freeing every allocated proxy TxId, then waits for the next upstream request to
/// arrive (which triggers a fresh backend connect via Polly).</description></item>
/// </list>
///
/// <para><b>Threading invariants:</b> a single backend writer touches the backend socket
/// for sends; a single backend reader touches the same socket for receives. Per-upstream
/// read tasks call <see cref="OnUpstreamFrameAsync"/>, which allocates a proxy TxId, queues
/// the request frame into <see cref="_outboundChannel"/>, and returns. Upstream-side writes
/// flow through each pipe's response channel — never directly through this class.</para>
///
/// <para><b>Lifecycle:</b> the multiplexer is created with the backend offline. The first
/// <see cref="OnUpstreamFrameAsync"/> call (or the first <see cref="Attach"/> if you prefer
/// eager-start) triggers backend connect through the Polly pipeline. Subsequent in-flight
/// requests reuse the same socket. <see cref="DisposeAsync"/> tears down the backend
/// socket, the writer/reader tasks, and every attached pipe.</para>
/// </summary>
internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvider
{
// Capacity of the bounded queue between upstream read tasks and the backend writer.
// With FullMode = Wait (below), producers wait asynchronously once this many frames are queued.
private const int OutboundChannelCapacity = 256;
private readonly PlcOptions _plc;
private readonly ConnectionOptions _connectionOptions;
private readonly IPduPipeline _pipeline;
private readonly PerPlcContext _ctx;
private readonly ILogger<PlcMultiplexer> _logger;
// Optional Polly pipeline wrapped around backend connect; null means a single attempt.
private readonly ResiliencePipeline? _backendConnectPipeline;
private readonly TxIdAllocator _allocator = new();
private readonly CorrelationMap _correlation = new();
// Request frames (already carrying the proxy TxId) awaiting the backend writer.
// Many upstream read tasks write; exactly one backend writer task reads.
private readonly Channel<byte[]> _outboundChannel = Channel.CreateBounded<byte[]>(
new BoundedChannelOptions(OutboundChannelCapacity)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false,
});
// Attached pipes — Phase 9 needs the list for the status page; Phase 10 will need it for
// coalescing (fan-out). ConcurrentDictionary keyed on UpstreamPipe.Id for O(1) detach.
private readonly ConcurrentDictionary<Guid, UpstreamPipe> _pipes = new();
// Lifecycle plumbing. Backend tasks share a CTS; cascading disconnect cancels it,
// which terminates both the writer and reader tasks. The next call to
// EnsureBackendConnectedAsync constructs a fresh CTS and a fresh backend socket.
private readonly object _backendLock = new();
private Socket? _backendSocket;
private CancellationTokenSource? _backendCts;
private Task? _backendWriterTask;
private Task? _backendReaderTask;
// Cancelled by DisposeAsync; parent token of every backend CTS and of the watchdog.
private readonly CancellationTokenSource _disposeCts = new();
// NOTE(review): plain bool read and written from several tasks without a barrier —
// confirm that concurrent DisposeAsync calls are not expected.
private bool _disposed;
// Per-request timeout watchdog started by the constructor; joined in DisposeAsync.
private Task? _watchdogTask;
/// <summary>
/// Creates the multiplexer with the backend offline, publishes it as the live source
/// for the PLC's multiplex counters, and starts the per-request timeout watchdog.
/// </summary>
/// <param name="plc">Target PLC (name, host, port).</param>
/// <param name="connectionOptions">Connect / request timeout settings.</param>
/// <param name="pipeline">PDU pipeline applied to requests and responses.</param>
/// <param name="perPlcContext">Per-PLC context carrying the counters.</param>
/// <param name="logger">Logger for multiplexer events.</param>
/// <param name="backendConnectPipeline">Optional Polly pipeline wrapped around backend connect.</param>
public PlcMultiplexer(
    PlcOptions plc,
    ConnectionOptions connectionOptions,
    IPduPipeline pipeline,
    PerPlcContext perPlcContext,
    ILogger<PlcMultiplexer> logger,
    ResiliencePipeline? backendConnectPipeline = null)
{
    (_plc, _connectionOptions, _pipeline) = (plc, connectionOptions, pipeline);
    (_ctx, _logger, _backendConnectPipeline) = (perPlcContext, logger, backendConnectPipeline);

    // From here on the counters read in-flight / wrap / queue-depth values from this instance.
    _ctx.Counters.SetMultiplexProvider(this);

    // The watchdog periodically scans the correlation map and times out requests older
    // than BackendRequestTimeoutMs. It covers lost responses, dead-PLC paths, and
    // backends that mis-echo TxIds (e.g. pymodbus 3.13.0's concurrent-multiplexed-request
    // bug — see test files). Started last so every field above is already assigned.
    _watchdogTask = Task.Run(
        () => RunRequestTimeoutWatchdogAsync(_disposeCts.Token),
        CancellationToken.None);
}
// ── IMultiplexCountersProvider ────────────────────────────────────────────
/// <summary>Number of proxy TxIds currently allocated, as reported by the allocator.</summary>
public long InFlightCount
{
    get { return _allocator.InFlightCount; }
}

/// <summary>Wrap count reported by the TxId allocator.</summary>
public long TxIdWraps
{
    get { return _allocator.WrapCount; }
}

/// <summary>Number of request frames queued for the backend writer.</summary>
public long BackendQueueDepth
{
    get { return _outboundChannel.Reader.Count; }
}

// ── Public surface ────────────────────────────────────────────────────────

/// <summary>
/// Point-in-time copy of the currently-attached upstream pipes (a new array on every
/// read). Used by the status page.
/// </summary>
public IReadOnlyCollection<UpstreamPipe> AttachedPipes
{
    get { return _pipes.Values.ToArray(); }
}
/// <summary>
/// Registers <paramref name="pipe"/> with this multiplexer, replacing any pipe already
/// stored under the same id. Running the pipe's read+write loops remains the caller's
/// job (typically through <see cref="StartPipeAsync"/>, which routes the pipe's OnFrame
/// callback into <see cref="OnUpstreamFrameAsync"/>).
/// </summary>
/// <exception cref="ObjectDisposedException">The multiplexer has been disposed.</exception>
public void Attach(UpstreamPipe pipe)
{
    if (!_disposed)
    {
        _pipes[pipe.Id] = pipe;
        return;
    }

    throw new ObjectDisposedException(nameof(PlcMultiplexer));
}
/// <summary>
/// Attaches <paramref name="pipe"/>, launches its write loop in the background, and
/// runs its read loop with frames routed into <see cref="OnUpstreamFrameAsync"/>.
/// </summary>
/// <returns>
/// The read-loop task. Once it finishes (for any reason) the pipe is detached from the
/// multiplexer; the pipe itself is not disposed here.
/// </returns>
public Task StartPipeAsync(UpstreamPipe pipe, CancellationToken ct)
{
    Attach(pipe);

    // Fire-and-forget: the write loop ends when the pipe is disposed or its channel
    // completes, and is joined inside the pipe's own DisposeAsync.
    _ = Task.Run(() => pipe.RunWriteLoopAsync(ct), CancellationToken.None);

    var readLoop = pipe.RunReadLoopAsync(ForwardFrameAsync, ct);

    // Detach only. Disposal belongs to the listener (or the cascade walker).
    _ = readLoop.ContinueWith(
        finished => { _pipes.TryRemove(pipe.Id, out _); },
        TaskScheduler.Default);

    return readLoop;

    // Binds the pipe into the multiplexer's upstream entry point.
    ValueTask ForwardFrameAsync(byte[] frame, CancellationToken frameCt)
        => OnUpstreamFrameAsync(pipe, frame, frameCt);
}
/// <summary>
/// Tears down the multiplexer: closes the backend connection, cancels both backend
/// tasks, drains every in-flight correlation entry, and closes every attached pipe.
/// Calling it again after the first call has set the disposed flag is a no-op.
/// </summary>
public async ValueTask DisposeAsync()
{
// NOTE(review): check-then-set on a plain bool — two truly concurrent callers could both
// pass this guard; confirm callers serialise disposal.
if (_disposed) return;
_disposed = true;
// Stop the counters provider link so a status snapshot during teardown doesn't
// see live-but-soon-to-be-empty internal state.
_ctx.Counters.SetMultiplexProvider(null);
// Cancels the watchdog and, through the linked backend CTS, the writer/reader tasks.
await _disposeCts.CancelAsync().ConfigureAwait(false);
// Best-effort join the watchdog so its in-flight log/dispatch settles before tests
// assert on counter state.
if (_watchdogTask is not null)
{
try { await _watchdogTask.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); }
catch { /* swallow */ }
}
// Returns early when no backend was ever connected; the loop below still disposes
// attached pipes in that case.
await TearDownBackendAsync("disposing", cascadeUpstreams: true).ConfigureAwait(false);
// Later upstream writes now fail with ChannelClosedException, which the upstream
// entry point handles by releasing the proxy TxId.
_outboundChannel.Writer.TryComplete();
// Dispose all attached pipes.
foreach (var pipe in _pipes.Values)
{
try { await pipe.DisposeAsync().ConfigureAwait(false); } catch { /* best effort */ }
}
_pipes.Clear();
_disposeCts.Dispose();
}
// ── Backend connect / teardown ────────────────────────────────────────────
/// <summary>
/// Makes sure a live backend socket and its writer/reader tasks exist, connecting
/// (through the optional Polly pipeline) when they do not.
/// </summary>
/// <param name="ct">Cancels waiting for the connect gate and the connect itself.</param>
/// <returns>
/// <c>true</c> when the backend is connected on return; <c>false</c> when the
/// multiplexer is disposed or the connect failed (failure is logged and counted).
/// </returns>
/// <remarks>
/// Every connect attempt uses its own <see cref="Socket"/>. A socket whose connect was
/// cancelled (the per-attempt timeout) or failed cannot be relied on for another
/// connect, so reusing one instance across Polly retries would make every retry after
/// the first fail regardless of the PLC's state.
/// </remarks>
private async Task<bool> EnsureBackendConnectedAsync(CancellationToken ct)
{
    if (_disposed) return false;

    // Fast path: already connected.
    if (IsBackendUp()) return true;

    // Serialise concurrent connect attempts from many upstream pipes.
    await _connectGate.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        // Re-check after acquiring the gate: another caller may have connected meanwhile.
        if (IsBackendUp()) return true;

        Socket backend;
        try
        {
            backend = _backendConnectPipeline is not null
                ? await _backendConnectPipeline
                    .ExecuteAsync(attemptToken => ConnectOnceAsync(attemptToken), ct)
                    .ConfigureAwait(false)
                : await ConnectOnceAsync(ct).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            string reason = ex is OperationCanceledException
                ? $"Backend connect timed out or cancelled after {_connectionOptions.BackendConnectTimeoutMs} ms"
                : ex.Message;
            MultiplexerLogEvents.BackendFailed(_logger, _plc.Name, reason);
            _ctx.Counters.IncrementConnectFailed();
            return false;
        }

        // Successful connect. Wire up the backend tasks under a CTS that also trips
        // when the multiplexer itself is disposed.
        var backendCts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token);
        lock (_backendLock)
        {
            _backendSocket = backend;
            _backendCts = backendCts;
            _backendWriterTask = Task.Run(() => RunBackendWriterAsync(backend, backendCts.Token), CancellationToken.None);
            _backendReaderTask = Task.Run(() => RunBackendReaderAsync(backend, backendCts.Token), CancellationToken.None);
        }

        _ctx.Counters.IncrementConnectSuccess();
        MultiplexerLogEvents.BackendConnected(_logger, _plc.Name, _plc.Host, _plc.Port);
        return true;
    }
    finally
    {
        _connectGate.Release();
    }

    // True while a connected socket and a non-cancelled backend CTS are both in place.
    bool IsBackendUp()
        => _backendSocket is { Connected: true } && _backendCts is { IsCancellationRequested: false };

    // One connect attempt on a brand-new socket, bounded by BackendConnectTimeoutMs.
    // The socket is disposed if the attempt fails, so no handle leaks across retries.
    async ValueTask<Socket> ConnectOnceAsync(CancellationToken attemptToken)
    {
        var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
        { NoDelay = true };
        try
        {
            using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(attemptToken);
            timeoutCts.CancelAfter(_connectionOptions.BackendConnectTimeoutMs);
            await socket.ConnectAsync(_plc.Host, _plc.Port, timeoutCts.Token).ConfigureAwait(false);
            return socket;
        }
        catch
        {
            socket.Dispose();
            throw;
        }
    }
}
// Async mutex (initial count 1, max 1) that serialises backend connect attempts.
private readonly SemaphoreSlim _connectGate = new(1, 1);
/// <summary>
/// Detaches and closes the current backend connection, releases every in-flight proxy
/// TxId, and optionally closes every attached upstream pipe.
/// </summary>
/// <param name="reason">Free-form text included in the backend-disconnected log event.</param>
/// <param name="cascadeUpstreams">
/// When <c>true</c>, every attached pipe is disposed — not just those with a request
/// in flight — and the disconnect-cascade counter is advanced by that number.
/// </param>
/// <remarks>
/// Returns immediately when there is no backend socket or CTS to tear down, which makes
/// repeated calls harmless.
/// NOTE(review): the method acts on whichever backend is current when it runs, not on
/// the socket whose reader/writer reported the fault — confirm a late fault from an old
/// task cannot tear down a freshly established connection.
/// </remarks>
private async Task TearDownBackendAsync(string reason, bool cascadeUpstreams)
{
    Socket? oldSocket;
    CancellationTokenSource? oldCts;
    Task? writer, reader;

    // Take ownership of the backend state atomically so only one caller tears it down.
    lock (_backendLock)
    {
        oldSocket = _backendSocket;
        oldCts = _backendCts;
        writer = _backendWriterTask;
        reader = _backendReaderTask;
        _backendSocket = null;
        _backendCts = null;
        _backendWriterTask = null;
        _backendReaderTask = null;
    }

    if (oldSocket is null && oldCts is null) return;

    try { oldCts?.Cancel(); } catch { /* best effort */ }
    try { oldSocket?.Shutdown(SocketShutdown.Both); } catch { /* already closed */ }
    try { oldSocket?.Dispose(); } catch { /* best effort */ }

    // Drain the correlation map and free every proxy TxId that was in flight.
    var dropped = _correlation.DrainAll();
    foreach (var kvp in dropped)
        _allocator.Release(kvp.Key);

    int upstreamCount = 0;
    if (cascadeUpstreams)
    {
        // Per the design doc, ALL attached upstreams cascade on backend disconnect,
        // whether or not they had a request in flight.
        upstreamCount = _pipes.Count;

        // Snapshot before disposal modifies the dictionary indirectly.
        var pipeList = _pipes.Values.ToArray();
        foreach (var pipe in pipeList)
        {
            try { await pipe.DisposeAsync().ConfigureAwait(false); }
            catch { /* best effort */ }
        }
        _pipes.Clear();
        _ctx.Counters.AddDisconnectCascades(upstreamCount);
    }

    // Best-effort join, bounded so a stuck task cannot hang teardown.
    try { if (writer is not null) await writer.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ }
    try { if (reader is not null) await reader.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ }
    oldCts?.Dispose();

    // Stay quiet when nothing was actually affected.
    if (upstreamCount > 0 || dropped.Count > 0)
        MultiplexerLogEvents.BackendDisconnected(_logger, _plc.Name, upstreamCount, dropped.Count, reason);
}
// ── Backend writer / reader tasks ─────────────────────────────────────────
/// <summary>
/// Backend writer loop: takes request frames off the outbound channel and writes each
/// one fully to <paramref name="backend"/>, looping over partial sends. Ends quietly on
/// cancellation; any other failure triggers a cascading backend teardown.
/// </summary>
/// <param name="backend">Connected backend socket this task sends on.</param>
/// <param name="ct">Backend-lifetime token; cancelled on teardown.</param>
private async Task RunBackendWriterAsync(Socket backend, CancellationToken ct)
{
    try
    {
        await foreach (byte[] frame in _outboundChannel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
        {
            for (int offset = 0; offset < frame.Length;)
            {
                int written = await backend
                    .SendAsync(frame.AsMemory(offset), SocketFlags.None, ct)
                    .ConfigureAwait(false);

                // A zero-byte send is treated as a reset connection.
                if (written == 0)
                    throw new SocketException((int)SocketError.ConnectionReset);

                offset += written;
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Normal teardown.
    }
    catch (Exception ex)
    {
        // Backend failure — cascade. Fire-and-forget so this task can finish.
        _ = TearDownBackendAsync($"writer fault: {ex.Message}", cascadeUpstreams: true);
    }
}
/// <summary>
/// Backend reader loop: reads MBAP frames from <paramref name="backend"/>, matches each
/// to its in-flight request by proxy TxId, runs the response through the PDU pipeline,
/// restores each interested party's original TxId, and hands the frame to that party's
/// pipe. Responses without a correlation entry are dropped.
/// </summary>
/// <param name="backend">Connected backend socket this task receives on.</param>
/// <param name="ct">Backend-lifetime token; cancelled on teardown.</param>
/// <remarks>
/// Leaving the loop without cancellation (EOF, unparseable header, oversized frame) or
/// with a non-cancellation exception triggers a cascading backend teardown. A failure to
/// deliver a response to one upstream pipe does NOT: that is an upstream problem and
/// must not disconnect every other client sharing the backend.
/// </remarks>
private async Task RunBackendReaderAsync(Socket backend, CancellationToken ct)
{
    byte[] headerBuf = new byte[MbapFrame.HeaderSize];
    try
    {
        while (!ct.IsCancellationRequested)
        {
            if (!await FillAsync(backend, headerBuf, 0, MbapFrame.HeaderSize, ct).ConfigureAwait(false))
                break;

            if (!MbapFrame.TryParseHeader(headerBuf.AsSpan(),
                    out ushort proxyTxId, out _, out ushort length, out _))
                break;

            if (length < 1)
            {
                // Degenerate frame — drop.
                continue;
            }

            // MBAP length counts the unit id plus the PDU; the unit id is already in the header.
            int pduBodyLen = length - 1;
            if (pduBodyLen > MbapFrame.MaxPduBodySize)
            {
                // Frame too large — backend is misbehaving; force teardown.
                _logger.LogWarning(
                    "Oversized backend frame: Plc={Plc} PduBody={Body} > Max={Max}",
                    _plc.Name, pduBodyLen, MbapFrame.MaxPduBodySize);
                break;
            }

            byte[] frame = new byte[MbapFrame.HeaderSize + pduBodyLen];
            Buffer.BlockCopy(headerBuf, 0, frame, 0, MbapFrame.HeaderSize);
            if (!await FillAsync(backend, frame, MbapFrame.HeaderSize, pduBodyLen, ct).ConfigureAwait(false))
                break;

            if (!_correlation.TryRemove(proxyTxId, out var inFlight))
            {
                // No correlation entry — either a stale response after cascade, or
                // the PLC sent something unsolicited. Drop the frame.
                continue;
            }

            // Free the allocator slot immediately so it can be reused.
            _allocator.Release(proxyTxId);

            // Update the round-trip EWMA. The counter expects Stopwatch ticks while the
            // request carries a wall-clock timestamp, so convert once from one clock read.
            TimeSpan roundTrip = DateTimeOffset.UtcNow - inFlight.SentAtUtc;
            long ticks = (long)(roundTrip.TotalSeconds * Stopwatch.Frequency);
            if (ticks > 0)
                _ctx.Counters.UpdateRoundTripEwma(ticks);

            // Apply the BCD rewriter on the response. Build a per-call context clone
            // that carries CurrentRequest so the rewriter can decode FC03/04 slots.
            var responseCtx = _ctx.WithCurrentRequest(inFlight);
            _pipeline.Process(
                MbapDirection.ResponseToClient,
                frame.AsSpan(0, MbapFrame.HeaderSize),
                frame.AsSpan(MbapFrame.HeaderSize, pduBodyLen),
                responseCtx);

            // Fan out to each interested party with their original TxId restored.
            // Phase 9: always exactly one party. Phase 10: N parties (read coalescing).
            foreach (var party in inFlight.InterestedParties)
            {
                if (!party.Pipe.IsAlive)
                    continue;

                // With a single party the freshly-read buffer is handed over as-is;
                // with several, each gets its own copy so the TxId patch below cannot
                // clobber a frame already queued for another party.
                byte[] outFrame = inFlight.InterestedParties.Count == 1
                    ? frame
                    : (byte[])frame.Clone();
                outFrame[0] = (byte)(party.OriginalTxId >> 8);
                outFrame[1] = (byte)(party.OriginalTxId & 0xFF);

                try
                {
                    await party.Pipe.SendResponseAsync(outFrame, ct).ConfigureAwait(false);
                }
                catch (Exception ex) when (!(ex is OperationCanceledException && ct.IsCancellationRequested))
                {
                    // The pipe went away between the IsAlive check and the send. Only
                    // this upstream loses its response; the backend stays up. Our own
                    // cancellation is excluded by the filter and ends the loop as usual.
                    _logger.LogDebug(
                        ex,
                        "Dropped response for closed upstream pipe: Plc={Plc} OriginalTxId={OriginalTxId}",
                        _plc.Name, party.OriginalTxId);
                }
            }
        }

        // Loop left without an exception — EOF, bad header, or oversized frame. Cascade.
        _ = TearDownBackendAsync("backend reader EOF", cascadeUpstreams: true);
    }
    catch (OperationCanceledException)
    {
        // Normal teardown.
    }
    catch (Exception ex)
    {
        _ = TearDownBackendAsync($"reader fault: {ex.Message}", cascadeUpstreams: true);
    }
}
// ── Upstream → multiplexer entry point ────────────────────────────────────
/// <summary>
/// Entry point for one request frame from an upstream pipe: ensures the backend is
/// connected, allocates a proxy TxId, records the correlation entry, runs the request
/// through the PDU pipeline, rewrites the MBAP TxId, and queues the frame for the
/// backend writer.
/// </summary>
/// <param name="pipe">Upstream pipe the frame arrived on; receives the eventual response.</param>
/// <param name="frame">Complete MBAP frame (header plus PDU). Modified in place.</param>
/// <param name="ct">Cancels the connect wait and the enqueue.</param>
/// <remarks>
/// <list type="bullet">
/// <item><description>Backend connect failure: the upstream pipe is closed.</description></item>
/// <item><description>Frame without at least one PDU byte, or with an unparseable header:
/// silently dropped before any TxId is allocated.</description></item>
/// <item><description>TxId space saturated: the upstream gets Modbus exception 04.</description></item>
/// <item><description>Correlation slot unexpectedly occupied: logged as an error and the
/// upstream pipe is closed, as the <see cref="CorrelationMap.TryAdd"/> contract asks.</description></item>
/// <item><description>Enqueue cancelled or channel closed: the correlation entry and proxy
/// TxId are released immediately rather than left for the watchdog.</description></item>
/// </list>
/// </remarks>
private async ValueTask OnUpstreamFrameAsync(UpstreamPipe pipe, byte[] frame, CancellationToken ct)
{
    if (_disposed) return;

    // Ensure backend is connected. Failure here means we cannot service the request;
    // close the upstream pipe (consistent with the 1:1 model's behaviour on connect
    // failure).
    if (!await EnsureBackendConnectedAsync(ct).ConfigureAwait(false))
    {
        try { await pipe.DisposeAsync().ConfigureAwait(false); } catch { /* best effort */ }
        return;
    }

    // Need the full header AND the function-code byte. Rejecting header-only frames
    // here keeps the frame[pduOffset] read below in bounds and, because it happens
    // before allocation, cannot leak a proxy TxId.
    if (frame.Length <= MbapFrame.HeaderSize)
        return;

    if (!MbapFrame.TryParseHeader(frame.AsSpan(0, MbapFrame.HeaderSize),
            out ushort originalTxId, out _, out _, out byte unitId))
        return;

    int pduOffset = MbapFrame.HeaderSize;
    byte fcByte = frame[pduOffset];

    if (!_allocator.TryAllocate(out ushort proxyTxId))
    {
        MultiplexerLogEvents.Saturated(_logger, _plc.Name, pipe.RemoteEp?.ToString() ?? "?");

        // Synthesize Modbus exception 04 (Slave Device Failure).
        byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4);
        await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false);
        return;
    }

    // Capture start/qty for FC03/04 so the response decoder has the correlation it
    // needs — the response PDU does not carry the start address.
    ushort startAddr = 0;
    ushort qty = 0;
    if (fcByte is 0x03 or 0x04 && frame.Length >= pduOffset + 5)
    {
        startAddr = (ushort)((frame[pduOffset + 1] << 8) | frame[pduOffset + 2]);
        qty = (ushort)((frame[pduOffset + 3] << 8) | frame[pduOffset + 4]);
    }

    var inFlight = new InFlightRequest(
        UnitId: unitId,
        Fc: fcByte,
        StartAddress: startAddr,
        Qty: qty,
        InterestedParties: [new InterestedParty(pipe, originalTxId)],
        SentAtUtc: DateTimeOffset.UtcNow);

    if (!_correlation.TryAdd(proxyTxId, inFlight))
    {
        // Should be impossible: the allocator just guaranteed proxyTxId is free.
        _allocator.Release(proxyTxId);
        _logger.LogError("CorrelationMap.TryAdd failed for already-free proxyTxId {ProxyTxId}", proxyTxId);

        // Without a correlation entry no response can ever reach this client, so close
        // the pipe instead of leaving it waiting.
        try { await pipe.DisposeAsync().ConfigureAwait(false); } catch { /* best effort */ }
        return;
    }

    // Peak in-flight tracking.
    _ctx.Counters.ObserveInFlight(_allocator.InFlightCount);

    // Apply the BCD rewriter on the request. Use a per-call context with CurrentRequest
    // (the rewriter doesn't currently need it on request, but Phase 10 may).
    var requestCtx = _ctx.WithCurrentRequest(inFlight);
    _pipeline.Process(
        MbapDirection.RequestToBackend,
        frame.AsSpan(0, MbapFrame.HeaderSize),
        frame.AsSpan(MbapFrame.HeaderSize, frame.Length - MbapFrame.HeaderSize),
        requestCtx);

    // Overwrite the MBAP TxId with the proxy TxId (big-endian, bytes 0-1).
    frame[0] = (byte)(proxyTxId >> 8);
    frame[1] = (byte)(proxyTxId & 0xFF);

    // Enqueue for the backend writer task.
    try
    {
        await _outboundChannel.Writer.WriteAsync(frame, ct).ConfigureAwait(false);
    }
    catch (ChannelClosedException)
    {
        // Channel completed during shutdown — nothing will ever be sent.
        ReleaseSlot();
    }
    catch (OperationCanceledException)
    {
        // Cancelled while waiting for queue space: the frame was never queued, so free
        // the slot now and let the caller observe the cancellation.
        ReleaseSlot();
        throw;
    }

    // Frees the correlation entry and its proxy TxId unless another path already did.
    void ReleaseSlot()
    {
        if (_correlation.TryRemove(proxyTxId, out _))
            _allocator.Release(proxyTxId);
    }
}
// ── Per-request timeout watchdog ──────────────────────────────────────────
/// <summary>
/// Periodically scans the correlation map for in-flight requests whose response has
/// not arrived within <see cref="ConnectionOptions.BackendRequestTimeoutMs"/>. For each
/// stale entry: removes it from the map, frees its allocator slot, and delivers a
/// Modbus exception (code 0x0B / Gateway Target Device Failed To Respond) to each
/// interested party with the original TxId restored.
///
/// <para><b>Why this exists.</b> In the 1:1 connection model, a lost response would
/// fault the dedicated backend socket and the upstream pair would close. The multiplexed
/// model needs an explicit per-request timer because a single missing or mis-routed
/// response would otherwise leak a correlation entry forever and hang the upstream
/// pipe indefinitely. Real-world causes: PLC drops a response, network packet loss,
/// backend that mis-echoes MBAP TxIds.</para>
/// </summary>
/// <param name="ct">Stops the watchdog; cancelled when the multiplexer is disposed.</param>
private async Task RunRequestTimeoutWatchdogAsync(CancellationToken ct)
{
    // Lower bound on the scan interval so tiny timeouts don't make the loop busy-wake.
    const int MinTickMs = 100;

    // Scan at roughly a quarter of the request timeout for responsive cleanup, but
    // never more often than every MinTickMs (100 ms).
    int tickMs = Math.Max(MinTickMs, _connectionOptions.BackendRequestTimeoutMs / 4);
    try
    {
        while (!ct.IsCancellationRequested)
        {
            await Task.Delay(tickMs, ct).ConfigureAwait(false);

            var threshold = DateTimeOffset.UtcNow.AddMilliseconds(-_connectionOptions.BackendRequestTimeoutMs);
            var stale = _correlation.SnapshotOlderThan(threshold);
            if (stale.Count == 0) continue;

            foreach (var kvp in stale)
            {
                ushort proxyTxId = kvp.Key;

                // Try to claim the entry; if another path (response, cascade) already removed it,
                // skip — no work to do.
                if (!_correlation.TryRemove(proxyTxId, out var req))
                    continue;

                _allocator.Release(proxyTxId);
                long elapsedMs = (long)(DateTimeOffset.UtcNow - req.SentAtUtc).TotalMilliseconds;

                foreach (var party in req.InterestedParties)
                {
                    // Logged for every party, including those whose pipe is already gone.
                    MultiplexerLogEvents.RequestTimeout(
                        _logger, _plc.Name, proxyTxId, party.OriginalTxId, req.Fc, elapsedMs);

                    if (!party.Pipe.IsAlive)
                        continue;

                    // Deliver Modbus exception 0x0B (Gateway Target Device Failed To Respond)
                    // to the upstream client. This lets the client's library raise a clean
                    // ModbusException rather than hanging on a timeout.
                    byte[] excFrame = BuildExceptionFrame(party.OriginalTxId, req.UnitId, req.Fc, exceptionCode: 0x0B);
                    try
                    {
                        await party.Pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false);
                    }
                    catch
                    {
                        // Best-effort delivery; if the pipe is going down, the client
                        // discovers the failure through its own socket close path.
                    }
                }
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Normal teardown.
    }
    catch (Exception ex)
    {
        // The watchdog stops for good after an unexpected fault; make that visible.
        _logger.LogError(ex, "Request-timeout watchdog faulted: Plc={Plc}", _plc.Name);
    }
}
// ── Helpers ───────────────────────────────────────────────────────────────
/// <summary>
/// Receives from <paramref name="socket"/> until exactly <paramref name="count"/> bytes
/// have been written into <paramref name="buf"/> starting at <paramref name="offset"/>.
/// </summary>
/// <returns>
/// <c>true</c> once the requested span is full; <c>false</c> if a receive returns zero
/// bytes before that (the connection ended early).
/// </returns>
private static async Task<bool> FillAsync(
    Socket socket, byte[] buf, int offset, int count, CancellationToken ct)
{
    int filled = 0;
    while (filled < count)
    {
        // Ask only for the part of the span that is still empty.
        var window = buf.AsMemory(offset + filled, count - filled);
        int got = await socket.ReceiveAsync(window, SocketFlags.None, ct).ConfigureAwait(false);
        if (got == 0)
        {
            return false;
        }

        filled += got;
    }

    return true;
}
/// <summary>
/// Builds a Modbus TCP exception response: an MBAP header carrying
/// <paramref name="originalTxId"/> and <paramref name="unitId"/>, followed by the
/// two-byte exception PDU <c>[fc | 0x80][exceptionCode]</c>.
/// </summary>
private static byte[] BuildExceptionFrame(ushort originalTxId, byte unitId, byte fc, byte exceptionCode)
{
    const byte ExceptionFlag = 0x80;

    // MBAP length counts everything after the length field: UnitId(1) + ExFc(1) + ExCode(1).
    const byte MbapLength = 3;

    byte txIdHigh = (byte)(originalTxId >> 8);
    byte txIdLow = (byte)(originalTxId & 0xFF);

    var response = new byte[MbapFrame.HeaderSize + 2];

    // Transaction id, big-endian.
    response[0] = txIdHigh;
    response[1] = txIdLow;

    // Protocol id is written as zero.
    response[2] = 0;
    response[3] = 0;

    // Length, big-endian.
    response[4] = 0;
    response[5] = MbapLength;

    response[6] = unitId;

    // Exception PDU: function code with the high bit set, then the exception code.
    response[7] = (byte)(fc | ExceptionFlag);
    response[8] = exceptionCode;

    return response;
}
}
@@ -0,0 +1,142 @@
namespace Mbproxy.Proxy.Multiplexing;
/// <summary>
/// Allocates 16-bit MBAP transaction IDs (proxy TxIds) used to multiplex many upstream
/// clients onto a single shared backend connection per PLC. The allocator tracks which
/// IDs are currently in flight and scans forward from a rolling cursor to find the next
/// free slot, mimicking the natural cadence of Modbus clients while keeping reuse
/// distance maximally large in steady state.
///
/// <para>State is protected by a single <see cref="object"/> lock. Contention is
/// negligible in practice — the allocator is per-PLC and one PLC's wire rate is bounded
/// by the controller's internal scan time (a few ms per request on an H2-ECOM100).
/// The lock is preferred over a lock-free approach for readability and worst-case
/// determinism (Polly retries, cascade cleanup, and saturation paths must not race).</para>
///
/// <para><b>Memory:</b> <c>bool[65536]</c> (~64 KB) per PLC. With ~54 PLCs that is
/// ~3.4 MB total — well within budget for a service that already ships at ~30 MB working
/// set under load.</para>
///
/// <para><b>Wrap counter:</b> increments every time the rolling cursor rolls over
/// 0xFFFF → 0x0000 during a successful allocation — whether the rollover happens while
/// skipping in-use slots or when advancing past a freshly allocated 0xFFFF. Frequent
/// wraps indicate either very high churn or extreme in-flight depth and are surfaced as
/// a telemetry signal, not an error.</para>
/// </summary>
internal sealed class TxIdAllocator
{
    // 65,536 slots total — the full uint16 space.
    private const int SlotCount = 65536;

    private readonly object _lock = new();
    private readonly bool[] _inUse = new bool[SlotCount];
    private ushort _next;        // rolling cursor; 0 on construction
    private int _inFlightCount;  // 0..65536
    private long _wrapCount;     // monotonic; never resets

    /// <summary>
    /// Number of currently-in-flight proxy TxIds (i.e., allocated but not yet released).
    /// Read under the same lock that mutates it; the snapshot is a simple atomic read of
    /// an int but we still hold the lock for cross-field consistency with <c>_inUse</c>.
    /// </summary>
    public int InFlightCount
    {
        get
        {
            lock (_lock)
            {
                return _inFlightCount;
            }
        }
    }

    /// <summary>
    /// Number of times the rolling cursor has wrapped 0xFFFF → 0x0000 during a
    /// successful allocation since the allocator was constructed. Read without locking
    /// via <see cref="Interlocked.Read"/> for the hot status-page path.
    /// </summary>
    public long WrapCount => Interlocked.Read(ref _wrapCount);

    /// <summary>
    /// Attempts to allocate the next free proxy TxId.
    /// Returns <c>true</c> with <paramref name="id"/> set when an ID was allocated.
    /// Returns <c>false</c> when every slot in the 16-bit space is currently in use;
    /// the caller is responsible for emitting <c>mbproxy.multiplex.saturated</c> and
    /// returning a Modbus exception (code 04 / Slave Device Failure) to the upstream.
    /// </summary>
    /// <param name="id">The allocated proxy TxId on success; <c>0</c> on failure.</param>
    public bool TryAllocate(out ushort id)
    {
        lock (_lock)
        {
            if (_inFlightCount >= SlotCount)
            {
                id = 0;
                return false;
            }

            // Scan forward from _next for the next free slot. _inFlightCount < SlotCount
            // guarantees at least one free slot, so the loop finds one within at most
            // SlotCount probes even in the pathological full-minus-one case.
            int start = _next;
            for (int offset = 0; offset < SlotCount; offset++)
            {
                // Un-wrapped position: values >= SlotCount mean the scan has already
                // rolled past 0xFFFF while skipping in-use slots.
                int position = start + offset;
                ushort slot = unchecked((ushort)position);
                if (_inUse[slot])
                    continue;

                _inUse[slot] = true;
                _inFlightCount++;

                // The cursor travels from `start` to `position + 1`. It crossed the
                // 0xFFFF → 0x0000 boundary exactly when that end point reaches SlotCount
                // or beyond; a single scan can cross it at most once.
                if (position + 1 >= SlotCount)
                    Interlocked.Increment(ref _wrapCount);

                _next = unchecked((ushort)(position + 1));
                id = slot;
                return true;
            }

            // Defensive: should be unreachable given the InFlightCount check above.
            id = 0;
            return false;
        }
    }

    /// <summary>
    /// Releases a previously-allocated proxy TxId. Releasing an ID that is not currently
    /// allocated is a no-op (defensive: cascade-on-disconnect can call <see cref="Release"/>
    /// after a concurrent timeout path has already done so).
    /// </summary>
    /// <param name="id">The proxy TxId to return to the free pool.</param>
    public void Release(ushort id)
    {
        lock (_lock)
        {
            if (_inUse[id])
            {
                _inUse[id] = false;
                _inFlightCount--;
            }
        }
    }

    /// <summary>
    /// Test-only: returns whether the given proxy TxId is currently marked in use.
    /// Internal so it remains usable from unit tests via InternalsVisibleTo.
    /// </summary>
    internal bool IsAllocated(ushort id)
    {
        lock (_lock)
        {
            return _inUse[id];
        }
    }
}
@@ -0,0 +1,281 @@
using System.Net;
using System.Net.Sockets;
using System.Threading.Channels;
namespace Mbproxy.Proxy.Multiplexing;
/// <summary>
/// One accepted upstream client socket, exposed as an asynchronous frame pipe to the
/// owning <see cref="PlcMultiplexer"/>. The pipe reads complete MBAP frames from the
/// upstream socket and hands each frame to a multiplexer-supplied <c>onFrame</c> callback;
/// it also exposes a write channel that the multiplexer drains to send response frames
/// back to the upstream client.
///
/// <para><b>Lifecycle:</b> constructed by <see cref="PlcListener"/> on accept; attached
/// to the multiplexer; runs its read loop until the upstream socket closes, the pipe is
/// disposed, or the multiplexer cascades a backend disconnect.</para>
///
/// <para><b>Concurrency model:</b> each pipe runs exactly two tasks — a read task and a
/// write task. The read task drives the multiplexer (one frame at a time, which preserves
/// the per-upstream-client one-in-flight invariant); the write task drains
/// <see cref="_responseChannel"/> and writes each frame to the socket. No third task ever
/// touches the socket.</para>
///
/// <para><b>One-in-flight-per-upstream:</b> the read loop processes frames sequentially.
/// A multi-PDU-pipelined client would still get correct service because the multiplexer
/// can have multiple distinct <c>OnFrame</c> calls outstanding from <i>different</i>
/// upstream pipes; a single upstream cannot multi-PDU-pipeline itself.</para>
/// </summary>
internal sealed partial class UpstreamPipe : IAsyncDisposable
{
    // Capacity 16: enough to buffer responses while the upstream's TCP send buffer drains,
    // small enough that backpressure kicks in on a wedged consumer. Once the pipe is no
    // longer alive (see IsAlive), SendResponseAsync returns without enqueueing, so
    // responses bound for a dead upstream never enter the channel.
    private const int ResponseChannelCapacity = 16;

    private readonly Socket _upstream;
    private readonly ILogger _logger;
    private readonly string _plcName;

    private readonly Channel<byte[]> _responseChannel = Channel.CreateBounded<byte[]>(
        new BoundedChannelOptions(ResponseChannelCapacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // backpressure, not drop
            SingleReader = true,
            SingleWriter = false, // multiplexer adds; potential future paths too
        });

    // Internal CTS lets the multiplexer signal "drop this pipe now" without waiting for
    // the upstream socket to close cleanly.
    private readonly CancellationTokenSource _cts = new();

    private bool _disposed;

    // 0 until the first DisposeAsync caller claims teardown, 1 afterwards. Claimed with
    // Interlocked.Exchange so concurrent disposers cannot both run the teardown sequence.
    private int _disposeClaimed;

    // Phase 9: per-pipe forwarded-PDU counter (replaces the per-pair counter from the
    // 1:1 model). Read by the status page.
    private long _pdusForwardedCount;

    /// <summary>Stable identity for status-page reporting and cascade cleanup.</summary>
    public Guid Id { get; } = Guid.NewGuid();

    /// <summary>The upstream client's remote endpoint, captured at construction.</summary>
    public IPEndPoint? RemoteEp { get; }

    /// <summary>UTC time at which the upstream socket was accepted.</summary>
    public DateTimeOffset ConnectedAtUtc { get; } = DateTimeOffset.UtcNow;

    /// <summary>
    /// Number of request PDUs read from this upstream and handed to the multiplexer.
    /// Incremented by <see cref="RunReadLoopAsync"/> once a frame has been fully read,
    /// immediately before it is passed to the frame callback.
    /// </summary>
    public long PdusForwardedCount => Interlocked.Read(ref _pdusForwardedCount);

    /// <summary>
    /// <c>true</c> until <see cref="DisposeAsync"/> has started or the pipe's internal
    /// cancellation source has been cancelled.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the read and write loops in this class do not cancel the internal
    /// source when they exit on a socket fault, so this only turns <c>false</c> once the
    /// owner disposes the pipe — confirm the owner does so promptly after either loop ends.
    /// </remarks>
    public bool IsAlive => !_disposed && !_cts.IsCancellationRequested;

    /// <summary>
    /// Wraps an accepted upstream socket, enables <c>NoDelay</c> on it, captures its
    /// remote endpoint and logs the client connection.
    /// </summary>
    /// <param name="upstream">The accepted client socket; owned by the pipe from here on.</param>
    /// <param name="plcName">PLC name used in log events.</param>
    /// <param name="logger">Logger for connection and frame diagnostics.</param>
    public UpstreamPipe(Socket upstream, string plcName, ILogger logger)
    {
        _upstream = upstream;
        _upstream.NoDelay = true;
        RemoteEp = upstream.RemoteEndPoint as IPEndPoint;
        _plcName = plcName;
        _logger = logger;

        string remoteStr = RemoteEp?.ToString() ?? "?";
        MultiplexerLogEvents.ClientConnected(_logger, _plcName, remoteStr);
    }

    /// <summary>
    /// Runs the read side of the pipe. Reads complete MBAP frames from the upstream
    /// socket and invokes <paramref name="onFrame"/> for each. Returns when:
    /// <list type="bullet">
    /// <item><description>The upstream closes (a zero-byte receive at any point in a frame).</description></item>
    /// <item><description>The pipe is disposed or <paramref name="ct"/> is cancelled.</description></item>
    /// <item><description>The MBAP header fails to parse.</description></item>
    /// <item><description>A frame declares a PDU body larger than <c>MbapFrame.MaxPduBodySize</c> (logged as a warning).</description></item>
    /// <item><description>A socket or disposed-object exception is raised while reading or forwarding.</description></item>
    /// </list>
    /// Any other exception thrown by <paramref name="onFrame"/> propagates to the caller.
    ///
    /// <para>The header buffer is owned by this loop; <paramref name="onFrame"/> receives
    /// a fresh <see cref="byte"/>[] each call (the multiplexer needs to retain a copy to
    /// build <see cref="InFlightRequest"/>, so we don't try to share the buffer).</para>
    /// </summary>
    /// <param name="onFrame">Callback invoked with each complete frame, one at a time.</param>
    /// <param name="ct">External cancellation; linked with the pipe's internal source.</param>
    public async Task RunReadLoopAsync(
        Func<byte[], CancellationToken, ValueTask> onFrame,
        CancellationToken ct)
    {
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, _cts.Token);
        var token = linked.Token;

        // Scratch buffer for the 7-byte MBAP header, reused across frames.
        byte[] headerBuf = new byte[MbapFrame.HeaderSize];

        try
        {
            while (!token.IsCancellationRequested)
            {
                // Read the 7-byte MBAP header.
                if (!await FillAsync(_upstream, headerBuf, 0, MbapFrame.HeaderSize, token).ConfigureAwait(false))
                    return; // EOF — upstream went away.

                if (!MbapFrame.TryParseHeader(headerBuf.AsSpan(),
                        out _, out _, out ushort length, out _))
                    return;

                if (length == 0)
                {
                    // Length field claims no body — forward the header alone via a fresh buffer.
                    byte[] degenerate = new byte[MbapFrame.HeaderSize];
                    Buffer.BlockCopy(headerBuf, 0, degenerate, 0, MbapFrame.HeaderSize);

                    // Count before forwarding, same as the regular path below.
                    Interlocked.Increment(ref _pdusForwardedCount);
                    await onFrame(degenerate, token).ConfigureAwait(false);
                    continue;
                }

                // The length field counts the unit-id byte (already read with the header)
                // plus the PDU body.
                int pduBodyLen = length - 1;
                if (pduBodyLen > MbapFrame.MaxPduBodySize)
                {
                    // Frame too large for the buffer — close the upstream.
                    _logger.LogWarning(
                        "Oversized upstream frame: Plc={Plc} PduBody={Body} > Max={Max}",
                        _plcName, pduBodyLen, MbapFrame.MaxPduBodySize);
                    return;
                }

                // Allocate a fresh frame buffer per PDU; the multiplexer retains it.
                byte[] frame = new byte[MbapFrame.HeaderSize + pduBodyLen];
                Buffer.BlockCopy(headerBuf, 0, frame, 0, MbapFrame.HeaderSize);

                if (!await FillAsync(_upstream, frame, MbapFrame.HeaderSize, pduBodyLen, token)
                        .ConfigureAwait(false))
                    return;

                Interlocked.Increment(ref _pdusForwardedCount);
                await onFrame(frame, token).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown.
        }
        catch (SocketException)
        {
            // Upstream socket closed by remote end — normal.
        }
        catch (ObjectDisposedException)
        {
            // Socket disposed by write loop or DisposeAsync — normal.
        }
    }

    /// <summary>
    /// Runs the write side of the pipe. Drains <see cref="_responseChannel"/> and writes
    /// each frame to the upstream socket. Returns when the channel completes, the token
    /// is cancelled, or the upstream socket fails.
    /// </summary>
    /// <param name="ct">External cancellation; linked with the pipe's internal source.</param>
    public async Task RunWriteLoopAsync(CancellationToken ct)
    {
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, _cts.Token);
        var token = linked.Token;

        try
        {
            await foreach (var frame in _responseChannel.Reader.ReadAllAsync(token).ConfigureAwait(false))
            {
                await SendAllAsync(_upstream, frame.AsMemory(), token).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown.
        }
        catch (SocketException)
        {
            // Upstream remote closed — normal.
        }
        catch (ObjectDisposedException)
        {
            // Socket disposed elsewhere — normal.
        }
    }

    /// <summary>
    /// Enqueues <paramref name="frame"/> for delivery on the upstream socket. Returns
    /// immediately without enqueueing when the pipe is no longer alive. Otherwise waits
    /// for channel capacity; a closed channel or a cancelled <paramref name="ct"/> drops
    /// the frame silently.
    /// </summary>
    /// <param name="frame">The complete response frame to send.</param>
    /// <param name="ct">Cancels the wait for channel capacity.</param>
    public async ValueTask SendResponseAsync(byte[] frame, CancellationToken ct)
    {
        if (!IsAlive)
            return;

        try
        {
            await _responseChannel.Writer.WriteAsync(frame, ct).ConfigureAwait(false);
        }
        catch (ChannelClosedException)
        {
            // Pipe disposed mid-write — drop silently.
        }
        catch (OperationCanceledException)
        {
            // Caller cancelled — drop silently.
        }
    }

    /// <summary>
    /// Closes the pipe: completes the response channel, cancels the read+write loops and
    /// shuts down the socket. Idempotent, including under concurrent calls: only the first
    /// caller performs the teardown; every other call returns immediately.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // Claim teardown atomically. A plain bool check would let two racing callers
        // both proceed, and the loser could call CancelAsync on an already-disposed CTS.
        if (Interlocked.Exchange(ref _disposeClaimed, 1) != 0)
            return;

        _disposed = true;

        // Completing the writer releases any SendResponseAsync caller parked on a full channel.
        try { _responseChannel.Writer.TryComplete(); } catch { /* already complete */ }

        await _cts.CancelAsync().ConfigureAwait(false);

        try { _upstream.Shutdown(SocketShutdown.Both); } catch { /* already closed */ }
        _upstream.Dispose();
        _cts.Dispose();

        string remoteStr = RemoteEp?.ToString() ?? "?";
        MultiplexerLogEvents.ClientDisconnected(_logger, _plcName, remoteStr, "Pipe disposed");
    }

    // ── Low-level I/O helpers ─────────────────────────────────────────────────────

    /// <summary>
    /// Receives until exactly <paramref name="count"/> bytes have been written into
    /// <paramref name="buf"/> at <paramref name="offset"/>. Returns <c>false</c> if a
    /// receive yields zero bytes first; callers cannot tell an EOF on a frame boundary
    /// from one in the middle of a frame.
    /// </summary>
    private static async Task<bool> FillAsync(
        Socket socket, byte[] buf, int offset, int count, CancellationToken ct)
    {
        int remaining = count;
        while (remaining > 0)
        {
            int received = await socket.ReceiveAsync(
                buf.AsMemory(offset + (count - remaining), remaining),
                SocketFlags.None,
                ct).ConfigureAwait(false);

            if (received == 0)
                return false;

            remaining -= received;
        }

        return true;
    }

    /// <summary>
    /// Sends the whole of <paramref name="memory"/>, looping over partial sends. A send
    /// that reports zero bytes is surfaced as a connection-reset
    /// <see cref="SocketException"/>.
    /// </summary>
    private static async Task SendAllAsync(Socket socket, Memory<byte> memory, CancellationToken ct)
    {
        while (memory.Length > 0)
        {
            int sent = await socket.SendAsync(memory, SocketFlags.None, ct).ConfigureAwait(false);
            if (sent == 0) throw new SocketException((int)SocketError.ConnectionReset);
            memory = memory[sent..];
        }
    }
}