using System.Net;
using System.Net.Sockets;
using System.Threading.Channels;
using Mbproxy.Options;
namespace Mbproxy.Proxy.Multiplexing;
/// <summary>
/// One accepted upstream client socket, exposed as an asynchronous frame pipe to the
/// owning multiplexer. The pipe reads complete MBAP frames from the upstream socket and
/// hands each frame to a multiplexer-supplied <c>onFrame</c> callback; it also owns a
/// bounded response channel that the multiplexer fills and the pipe's write loop drains
/// to send response frames back to the upstream client.
/// </summary>
/// <remarks>
/// <para>
/// <b>Lifecycle:</b> constructed when an upstream connection is accepted; attached
/// to the multiplexer; runs its read loop until the upstream socket closes, the pipe is
/// disposed, or the multiplexer cascades a backend disconnect.
/// </para>
/// <para>
/// <b>Concurrency model:</b> each pipe runs exactly two tasks — a read task and a
/// write task. The read task drives the multiplexer (one frame at a time, which preserves
/// the per-upstream-client one-in-flight invariant); the write task drains the response
/// channel and writes each frame to the socket. No third task ever touches the socket.
/// </para>
/// <para>
/// <b>One-in-flight-per-upstream:</b> the read loop processes frames sequentially, so a
/// single upstream cannot pipeline multiple PDUs through this pipe: its next frame is not
/// read until the callback for the previous one has returned. Clients are still served
/// concurrently with each other, because the multiplexer can have multiple distinct
/// <c>onFrame</c> calls outstanding from different upstream pipes.
/// </para>
/// </remarks>
internal sealed partial class UpstreamPipe : IAsyncDisposable
{
// Capacity 16: enough to buffer responses while the upstream's TCP send buffer drains,
// small enough that backpressure kicks in on a wedged consumer. Once the pipe stops being
// alive (see IsAlive), SendResponseAsync / TrySendResponse refuse new frames, so responses
// bound for a dead upstream are dropped before they ever enter the channel.
private const int ResponseChannelCapacity = 16;

// The accepted client socket. Read by the read loop, written by the write loop.
private readonly Socket _upstream;
private readonly ILogger _logger;

// PLC name used purely as a log/diagnostic label.
private readonly string _plcName;

// Response frames queued for the upstream client. Each item is one complete frame as a
// byte[]; the write loop is the only reader.
private readonly Channel<byte[]> _responseChannel = Channel.CreateBounded<byte[]>(
    new BoundedChannelOptions(ResponseChannelCapacity)
    {
        FullMode = BoundedChannelFullMode.Wait, // backpressure, not drop
        SingleReader = true,
        SingleWriter = false, // multiplexer adds; potential future paths too
    });

// Internal CTS lets the multiplexer signal "drop this pipe now" without waiting for
// the upstream socket to close cleanly.
private readonly CancellationTokenSource _cts = new();

// Volatile so writes from DisposeAsync are observed by IsAlive / TrySendResponse on
// other threads without a fence.
private volatile bool _disposed;

// Per-pipe forwarded-PDU counter. Read by the status page.
private long _pdusForwardedCount;
/// <summary>Stable identity for status-page reporting and cascade cleanup.</summary>
/// <remarks>A fresh <see cref="Guid"/> generated once per pipe instance.</remarks>
public Guid Id { get; } = Guid.NewGuid();
/// <summary>
/// The upstream client's remote endpoint, captured at construction.
/// <c>null</c> when the socket's remote endpoint is not an <see cref="IPEndPoint"/>.
/// </summary>
public IPEndPoint? RemoteEp { get; }
/// <summary>
/// UTC time at which the upstream socket was accepted (sampled when this pipe object
/// is created).
/// </summary>
public DateTimeOffset ConnectedAtUtc { get; } = DateTimeOffset.UtcNow;
/// <summary>
/// Number of request PDUs read from this upstream and forwarded into the multiplexer.
/// The read loop bumps the counter once a complete frame has been received, immediately
/// before handing that frame to the frame callback.
/// </summary>
/// <remarks>Read with <see cref="Interlocked.Read(ref long)"/>, so the value is never torn.</remarks>
public long PdusForwardedCount
{
    get { return Interlocked.Read(ref _pdusForwardedCount); }
}
/// <summary>
/// <c>true</c> until the pipe has been disposed or its internal cancellation source has
/// been cancelled; <c>false</c> afterwards.
/// </summary>
/// <remarks>
/// NOTE(review): the read and write loops in this part of the class return on socket
/// faults without touching either flag, so a faulted pipe keeps reporting alive until
/// something disposes it — presumably the owner does so when a loop task completes; confirm.
/// </remarks>
public bool IsAlive
{
    get { return !(_disposed || _cts.IsCancellationRequested); }
}
/// <summary>
/// Wraps an accepted upstream client socket: disables Nagle, optionally enables TCP
/// keepalive, records the remote endpoint and logs the connection.
/// </summary>
/// <param name="upstream">The accepted client socket. The pipe shuts it down and disposes it in <see cref="DisposeAsync"/>.</param>
/// <param name="plcName">PLC name used as a label in log events.</param>
/// <param name="logger">Logger for connection and protocol diagnostics.</param>
/// <param name="keepalive">Keepalive settings to apply to the socket, or <c>null</c> to leave keepalive untouched.</param>
public UpstreamPipe(Socket upstream, string plcName, ILogger logger, KeepaliveOptions? keepalive = null)
{
    _plcName = plcName;
    _logger = logger;
    _upstream = upstream;

    _upstream.NoDelay = true;

    // OS-level TCP keepalive on the accepted client socket: a half-open/dead client
    // (gone without a TCP FIN) then faults the read loop and gets reaped, instead of
    // leaking a pipe + correlation slots until the proxy next tries to write to it.
    if (keepalive is { } keepaliveSettings)
    {
        SocketKeepalive.Apply(_upstream, keepaliveSettings);
    }

    RemoteEp = upstream.RemoteEndPoint as IPEndPoint;

    MultiplexerLogEvents.ClientConnected(_logger, _plcName, RemoteEp?.ToString() ?? "?");
}
/// <summary>
/// Runs the read side of the pipe. Reads complete MBAP frames from the upstream socket
/// and invokes <paramref name="onFrame"/> once per frame, awaiting each call before
/// reading the next frame.
/// </summary>
/// <remarks>
/// <para>The loop returns (without throwing) when:</para>
/// <list type="bullet">
/// <item><description>The upstream closes the connection (a zero-byte receive, whether before or in the middle of a frame).</description></item>
/// <item><description>The header is rejected by <c>MbapFrame.TryParseHeader</c>, the MBAP length is below 2, or the PDU body exceeds <c>MbapFrame.MaxPduBodySize</c>; each case is logged as a warning.</description></item>
/// <item><description><paramref name="ct"/> or the pipe's internal token is cancelled, or the pipe has already been disposed.</description></item>
/// <item><description>The socket faults (<see cref="SocketException"/> / <see cref="ObjectDisposedException"/>).</description></item>
/// </list>
/// <para>
/// Any other exception thrown by <paramref name="onFrame"/> propagates to the caller.
/// </para>
/// <para>
/// Only the 7-byte header buffer is reused between iterations. <paramref name="onFrame"/>
/// receives a freshly allocated <c>byte[]</c> for every frame, so the multiplexer may
/// retain it without copying.
/// </para>
/// </remarks>
/// <param name="onFrame">Callback receiving the full frame (header + PDU body) and the loop's cancellation token.</param>
/// <param name="ct">External cancellation; linked with the pipe's internal token.</param>
/// <returns>A task that completes when the read loop has ended.</returns>
public async Task RunReadLoopAsync(
    Func<byte[], CancellationToken, Task> onFrame,
    CancellationToken ct)
{
    // 7-byte header + max 253-byte PDU body = 260 bytes per frame.
    byte[] headerBuf = new byte[MbapFrame.HeaderSize];
    try
    {
        // Created inside the try on purpose: _cts.Token throws ObjectDisposedException
        // once DisposeAsync has run, and a pipe that was disposed before its loop
        // started must take the same quiet exit as one disposed mid-read.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, _cts.Token);
        var token = linked.Token;

        while (!token.IsCancellationRequested)
        {
            // Read the 7-byte MBAP header.
            if (!await FillAsync(_upstream, headerBuf, 0, MbapFrame.HeaderSize, token).ConfigureAwait(false))
                return; // EOF — upstream went away.

            if (!MbapFrame.TryParseHeader(headerBuf.AsSpan(),
                    out _, out _, out ushort length, out _))
            {
                // Logged like the other rejection paths below, so a dropped client
                // always leaves a trace.
                _logger.LogWarning(
                    "Malformed upstream frame: Plc={Plc} MBAP header rejected — closing pipe",
                    _plcName);
                return;
            }

            if (length < 2)
            {
                // A valid MBAP Length covers at least UnitId(1) + FC(1) = 2 bytes. A
                // frame claiming less is malformed Modbus — there is no FC to route on
                // and no PDU to forward. Close the upstream rather than allocate a
                // proxy TxId and push a 7-byte garbage frame at the backend (review N1).
                _logger.LogWarning(
                    "Malformed upstream frame: Plc={Plc} MbapLength={Length} < 2 — closing pipe",
                    _plcName, length);
                return;
            }

            // Length counts the UnitId byte too, and that byte is already in headerBuf.
            int pduBodyLen = length - 1;
            if (pduBodyLen > MbapFrame.MaxPduBodySize)
            {
                // Frame too large for the buffer — close the upstream.
                _logger.LogWarning(
                    "Oversized upstream frame: Plc={Plc} PduBody={Body} > Max={Max}",
                    _plcName, pduBodyLen, MbapFrame.MaxPduBodySize);
                return;
            }

            // Allocate a fresh frame buffer per PDU; the multiplexer retains it.
            byte[] frame = new byte[MbapFrame.HeaderSize + pduBodyLen];
            Buffer.BlockCopy(headerBuf, 0, frame, 0, MbapFrame.HeaderSize);
            if (!await FillAsync(_upstream, frame, MbapFrame.HeaderSize, pduBodyLen, token)
                    .ConfigureAwait(false))
                return; // EOF mid-frame.

            Interlocked.Increment(ref _pdusForwardedCount);
            await onFrame(frame, token).ConfigureAwait(false);
        }
    }
    catch (OperationCanceledException)
    {
        // Normal shutdown.
    }
    catch (SocketException)
    {
        // Upstream socket closed by remote end — normal.
    }
    catch (ObjectDisposedException)
    {
        // Socket or CTS disposed by DisposeAsync — normal.
    }
}
/// <summary>
/// Runs the write side of the pipe. Drains the response channel and writes each queued
/// frame, in order, to the upstream socket.
/// </summary>
/// <remarks>
/// Returns (without throwing) when the channel completes and has been drained, when
/// <paramref name="ct"/> or the pipe's internal token is cancelled, when the pipe has
/// already been disposed, or when the upstream socket fails.
/// </remarks>
/// <param name="ct">External cancellation; linked with the pipe's internal token.</param>
/// <returns>A task that completes when the write loop has ended.</returns>
public async Task RunWriteLoopAsync(CancellationToken ct)
{
    try
    {
        // Created inside the try on purpose: _cts.Token throws ObjectDisposedException
        // once DisposeAsync has run, and a pipe that was disposed before its loop
        // started must exit as quietly as one disposed mid-write.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, _cts.Token);
        var token = linked.Token;

        await foreach (var frame in _responseChannel.Reader.ReadAllAsync(token).ConfigureAwait(false))
        {
            await SendAllAsync(_upstream, frame.AsMemory(), token).ConfigureAwait(false);
        }
    }
    catch (OperationCanceledException)
    {
        // Normal shutdown.
    }
    catch (SocketException)
    {
        // Upstream remote closed — normal.
    }
    catch (ObjectDisposedException)
    {
        // Socket or CTS disposed elsewhere — normal.
    }
}
/// <summary>
/// Enqueues <paramref name="frame"/> for delivery on the upstream socket. If the
/// response channel is full, the call waits for space.
/// </summary>
/// <remarks>
/// When the pipe is no longer alive the frame is dropped and the call returns at once.
/// The frame is also dropped — silently, with no exception — if the channel is closed
/// while the write is pending or if <paramref name="ct"/> is cancelled.
/// </remarks>
/// <param name="frame">Complete response frame to send.</param>
/// <param name="ct">Cancels the wait for channel capacity.</param>
public async ValueTask SendResponseAsync(byte[] frame, CancellationToken ct)
{
    if (IsAlive)
    {
        try
        {
            await _responseChannel.Writer.WriteAsync(frame, ct).ConfigureAwait(false);
        }
        catch (Exception ex) when (ex is ChannelClosedException or OperationCanceledException)
        {
            // ChannelClosedException: pipe disposed mid-write.
            // OperationCanceledException: caller cancelled.
            // Either way the response is dropped silently.
        }
    }
}
/// <summary>
/// Non-blocking response enqueue. Returns <c>true</c> when the frame was queued for
/// delivery, <c>false</c> when the pipe is dead OR the response channel is full.
/// </summary>
/// <remarks>
/// <para>
/// Intended for the per-PLC backend reader's fan-out loop, so that a single wedged
/// upstream cannot stall responses to peers sharing the same backend socket: a full
/// response channel on one pipe yields <c>false</c> here instead of blocking the reader.
/// </para>
/// <para>
/// On a <c>false</c> return the frame has not been queued; dropping it (and optionally
/// counting the drop) is the caller's responsibility. Reaping the wedged pipe itself is
/// not done here — presumably the owner disposes it once its socket fails; confirm
/// against the multiplexer.
/// </para>
/// </remarks>
/// <param name="frame">Complete response frame to send.</param>
/// <returns><c>true</c> if the frame was accepted into the response channel.</returns>
public bool TrySendResponse(byte[] frame)
    => IsAlive && _responseChannel.Writer.TryWrite(frame);
// 0 until the first DisposeAsync call claims teardown, 1 afterwards. Kept separate from
// the volatile _disposed flag because a plain check-then-set on a bool is not atomic:
// two concurrent callers could both pass the check and the loser would then touch an
// already-disposed CTS / socket.
private int _disposeGate;

/// <summary>
/// Closes the pipe: completes the response channel, cancels the read+write loops,
/// shuts down and disposes the socket, and logs the disconnect.
/// </summary>
/// <remarks>
/// Idempotent and safe to call concurrently — exactly one caller performs the teardown;
/// every other call returns immediately, possibly before that teardown has finished.
/// </remarks>
public async ValueTask DisposeAsync()
{
    if (Interlocked.Exchange(ref _disposeGate, 1) != 0)
        return;

    _disposed = true;

    // No further responses are accepted; TryComplete simply returns false if the
    // channel was already completed.
    _responseChannel.Writer.TryComplete();

    try
    {
        await _cts.CancelAsync().ConfigureAwait(false);
    }
    finally
    {
        // Release the socket and the CTS even if a cancellation callback threw.
        try
        {
            _upstream.Shutdown(SocketShutdown.Both);
        }
        catch (SocketException)
        {
            // Already closed or never fully connected.
        }
        catch (ObjectDisposedException)
        {
            // Already disposed.
        }

        _upstream.Dispose();
        _cts.Dispose();
    }

    string remoteStr = RemoteEp?.ToString() ?? "?";
    MultiplexerLogEvents.ClientDisconnected(_logger, _plcName, remoteStr, "Pipe disposed");
}
// ── Low-level I/O helpers ─────────────────────────────────────────────────────

/// <summary>
/// Receives exactly <paramref name="count"/> bytes from <paramref name="socket"/> into
/// <paramref name="buf"/>, starting at <paramref name="offset"/>, looping over partial reads.
/// </summary>
/// <param name="socket">Socket to read from.</param>
/// <param name="buf">Destination buffer.</param>
/// <param name="offset">Index in <paramref name="buf"/> of the first byte to write.</param>
/// <param name="count">Number of bytes required; zero completes immediately with <c>true</c>.</param>
/// <param name="ct">Cancels a pending receive.</param>
/// <returns>
/// <c>true</c> once all bytes have arrived; <c>false</c> if the peer closed the connection
/// first (a zero-byte receive), whether or not part of the data had already been read.
/// </returns>
private static async Task<bool> FillAsync(
    Socket socket, byte[] buf, int offset, int count, CancellationToken ct)
{
    int filled = 0;
    while (filled < count)
    {
        int received = await socket.ReceiveAsync(
            buf.AsMemory(offset + filled, count - filled),
            SocketFlags.None,
            ct).ConfigureAwait(false);

        // EOF (pre-frame or mid-frame) — caller treats both the same.
        if (received == 0)
            return false;

        filled += received;
    }

    return true;
}
/// <summary>
/// Sends the whole of <paramref name="memory"/> on <paramref name="socket"/>, looping
/// over partial sends until nothing is left.
/// </summary>
/// <param name="socket">Socket to write to.</param>
/// <param name="memory">Bytes to send; an empty buffer is a no-op.</param>
/// <param name="ct">Cancels a pending send.</param>
/// <exception cref="SocketException">
/// Raised with <see cref="SocketError.ConnectionReset"/> if a send reports zero bytes
/// written (treated as a dead connection so the loop cannot spin), or by the socket itself.
/// </exception>
private static async Task SendAllAsync(Socket socket, ReadOnlyMemory<byte> memory, CancellationToken ct)
{
    while (!memory.IsEmpty)
    {
        int sent = await socket.SendAsync(memory, SocketFlags.None, ct).ConfigureAwait(false);
        if (sent == 0)
            throw new SocketException((int)SocketError.ConnectionReset);

        memory = memory[sent..];
    }
}
}