using System.Collections.Concurrent;
using System.Collections.Frozen;
using System.Net;
using System.Net.Sockets;
using Mbproxy.Bcd;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Multiplexing;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Proxy.Multiplexing;
/// <summary>
/// Phase-10 unit tests for read coalescing against a stub backend (real sockets, no
/// simulator). The stub gives us deterministic control over backend response timing so
/// the "overlapping in-flight" window is large enough for late requests to actually
/// coalesce. The pymodbus simulator cannot be used here — its known concurrent-MBAP-frame
/// bug would invalidate the proxy-TxId echo path that coalescing relies on.
/// </summary>
[Trait("Category", "Unit")]
public sealed class ReadCoalescingTests
{
// ── Frame builders / readers ─────────────────────────────────────────────────
/// <summary>
/// Asks the OS for an unused loopback TCP port by binding a throwaway listener to port 0,
/// reading the port it was given, and stopping the listener again.
/// </summary>
/// <returns>The port number the throwaway listener was bound to.</returns>
private static int PickFreePort()
{
    TcpListener probe = new(IPAddress.Loopback, 0);
    probe.Start();
    var boundEndpoint = (IPEndPoint)probe.LocalEndpoint;
    int freePort = boundEndpoint.Port;
    probe.Stop();
    return freePort;
}
/// <summary>
/// Reads exactly <paramref name="count"/> bytes from <paramref name="s"/>, looping until
/// the buffer is full because a single receive may return fewer bytes than requested.
/// </summary>
/// <param name="s">Connected socket to read from.</param>
/// <param name="count">Number of bytes to read; 0 returns an empty array without touching the socket.</param>
/// <param name="ct">Cancels the pending receive.</param>
/// <returns>A new array of length <paramref name="count"/> holding the bytes read.</returns>
/// <exception cref="IOException">A receive returned 0 bytes before the buffer was full.</exception>
private static async Task<byte[]> ReadExactAsync(Socket s, int count, CancellationToken ct)
{
    var buf = new byte[count];
    int read = 0;
    while (read < count)
    {
        int n = await s.ReceiveAsync(buf.AsMemory(read, count - read), SocketFlags.None, ct);
        // A zero-byte receive on a stream socket means the peer has shut down its send side.
        if (n == 0) throw new IOException("EOF");
        read += n;
    }
    return buf;
}
/// <summary>
/// Reads one Modbus/TCP frame from <paramref name="s"/>: the 7-byte header first, then as
/// many further bytes as the big-endian length field at header offsets 4-5 announces,
/// minus one for the unit-id byte that is already part of the header.
/// </summary>
/// <param name="s">Connected socket to read from.</param>
/// <param name="ct">Cancels the pending receives.</param>
/// <returns>Header and body concatenated into one array (7 bytes when the length field is 0 or 1).</returns>
/// <exception cref="IOException">Propagated from <c>ReadExactAsync</c> when the peer closes mid-frame.</exception>
private static async Task<byte[]> ReadOneFrameAsync(Socket s, CancellationToken ct)
{
    const int HeaderLen = 7;
    var header = await ReadExactAsync(s, HeaderLen, ct);
    ushort length = (ushort)((header[4] << 8) | header[5]);

    // A malformed length of 0 would give a body length of -1, i.e. a 6-byte frame buffer
    // that the 7-byte header copy below overruns. Clamp so such a frame is header-only.
    int bodyLen = Math.Max(0, length - 1);

    var frame = new byte[HeaderLen + bodyLen];
    Buffer.BlockCopy(header, 0, frame, 0, HeaderLen);
    if (bodyLen > 0)
    {
        var body = await ReadExactAsync(s, bodyLen, ct);
        Buffer.BlockCopy(body, 0, frame, HeaderLen, bodyLen);
    }
    return frame;
}
/// <summary>
/// Builds a 12-byte Modbus/TCP request with function code 0x03: big-endian transaction id,
/// zero protocol id, length 6, unit id, function code, then big-endian start address and quantity.
/// </summary>
/// <param name="txId">Transaction id placed in bytes 0-1.</param>
/// <param name="start">Start address placed in bytes 8-9.</param>
/// <param name="qty">Register quantity placed in bytes 10-11.</param>
/// <param name="unit">Unit id placed in byte 6.</param>
private static byte[] BuildFc03(ushort txId, ushort start, ushort qty, byte unit = 1)
{
    var frame = new byte[12];
    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    // Bytes 2-4 (protocol id and high length byte) stay zero.
    frame[5] = 0x06;
    frame[6] = unit;
    frame[7] = 0x03;
    frame[8] = (byte)(start >> 8);
    frame[9] = (byte)(start & 0xFF);
    frame[10] = (byte)(qty >> 8);
    frame[11] = (byte)(qty & 0xFF);
    return frame;
}
/// <summary>
/// Builds a 12-byte Modbus/TCP request with function code 0x04; the layout is the same as
/// the FC03 request apart from the function-code byte.
/// </summary>
/// <param name="txId">Transaction id placed in bytes 0-1.</param>
/// <param name="start">Start address placed in bytes 8-9.</param>
/// <param name="qty">Register quantity placed in bytes 10-11.</param>
/// <param name="unit">Unit id placed in byte 6.</param>
private static byte[] BuildFc04(ushort txId, ushort start, ushort qty, byte unit = 1)
{
    var frame = new byte[12];
    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    // Bytes 2-4 (protocol id and high length byte) stay zero.
    frame[5] = 0x06;
    frame[6] = unit;
    frame[7] = 0x04;
    frame[8] = (byte)(start >> 8);
    frame[9] = (byte)(start & 0xFF);
    frame[10] = (byte)(qty >> 8);
    frame[11] = (byte)(qty & 0xFF);
    return frame;
}
/// <summary>
/// Builds a 12-byte Modbus/TCP request with function code 0x06: big-endian transaction id,
/// zero protocol id, length 6, unit id, function code, then big-endian address and value.
/// </summary>
/// <param name="txId">Transaction id placed in bytes 0-1.</param>
/// <param name="addr">Register address placed in bytes 8-9.</param>
/// <param name="value">Register value placed in bytes 10-11.</param>
/// <param name="unit">Unit id placed in byte 6.</param>
private static byte[] BuildFc06(ushort txId, ushort addr, ushort value, byte unit = 1)
{
    var frame = new byte[12];
    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    // Bytes 2-4 (protocol id and high length byte) stay zero.
    frame[5] = 0x06;
    frame[6] = unit;
    frame[7] = 0x06;
    frame[8] = (byte)(addr >> 8);
    frame[9] = (byte)(addr & 0xFF);
    frame[10] = (byte)(value >> 8);
    frame[11] = (byte)(value & 0xFF);
    return frame;
}
/// <summary>
/// Builds a Modbus/TCP response frame with function code 0x03 carrying
/// <paramref name="regs"/> as big-endian 16-bit values: 7-byte header, function code,
/// byte count, then two bytes per register.
/// </summary>
/// <param name="txId">Transaction id placed in bytes 0-1.</param>
/// <param name="unit">Unit id placed in byte 6.</param>
/// <param name="regs">Register values to encode; may be empty.</param>
/// <returns>A frame of 9 + 2 * <c>regs.Length</c> bytes.</returns>
private static byte[] BuildFc03Response(ushort txId, byte unit, params ushort[] regs)
{
    int dataBytes = regs.Length * 2;
    int pduLen = 2 + dataBytes;                 // function code + byte count + register data
    ushort mbapLength = (ushort)(1 + pduLen);   // the length field also counts the unit id

    var frame = new byte[7 + pduLen];
    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    frame[2] = 0;
    frame[3] = 0;
    frame[4] = (byte)(mbapLength >> 8);
    frame[5] = (byte)(mbapLength & 0xFF);
    frame[6] = unit;
    frame[7] = 0x03;
    frame[8] = (byte)dataBytes;

    int offset = 9;
    foreach (ushort reg in regs)
    {
        frame[offset++] = (byte)(reg >> 8);
        frame[offset++] = (byte)(reg & 0xFF);
    }
    return frame;
}
/// <summary>
/// Builds the 12-byte Modbus/TCP response for function code 0x06, carrying
/// <paramref name="addr"/> and <paramref name="value"/> big-endian after the header.
/// </summary>
/// <param name="txId">Transaction id placed in bytes 0-1.</param>
/// <param name="unit">Unit id placed in byte 6.</param>
/// <param name="addr">Register address placed in bytes 8-9.</param>
/// <param name="value">Register value placed in bytes 10-11.</param>
private static byte[] BuildFc06Response(ushort txId, byte unit, ushort addr, ushort value)
    =>
    [
        (byte)(txId >> 8), (byte)(txId & 0xFF),     // transaction id
        0, 0,                                       // protocol id
        0, 6,                                       // length
        unit, 0x06,                                 // unit id, function code
        (byte)(addr >> 8), (byte)(addr & 0xFF),
        (byte)(value >> 8), (byte)(value & 0xFF),
    ];
// ── Holding-the-response stub backend ─────────────────────────────────────
/// <summary>
/// Stub backend that delays its response by <see cref="ResponseDelayMs"/>. The delay
/// gives the test a deterministic in-flight window so a second client's identical
/// request actually overlaps the first request's wire-time. Records every proxy TxId
/// it sees so the test can count distinct backend round-trips.
/// </summary>
private sealed class DelayedStubBackend : IAsyncDisposable
{
    /// <summary>Loopback port the listener is bound to (the OS-assigned port when constructed with 0).</summary>
    public int Port { get; }

    /// <summary>Milliseconds each response is held back; read when the delayed response task runs.</summary>
    public int ResponseDelayMs { get; set; } = 200;

    /// <summary>Transaction id of every request frame received, in arrival order.</summary>
    public ConcurrentQueue<ushort> SeenProxyTxIds { get; } = new();

    /// <summary>Number of request frames received so far.</summary>
    public int RequestCount => SeenProxyTxIds.Count;

    private readonly TcpListener _listener;
    private readonly CancellationTokenSource _cts = new();
    private readonly List<Task> _clientTasks = new();

    /// <summary>Starts listening on loopback <paramref name="port"/> and begins accepting connections.</summary>
    /// <param name="port">TCP port to bind; 0 lets the OS choose one, reported through <see cref="Port"/>.</param>
    public DelayedStubBackend(int port)
    {
        _listener = new TcpListener(IPAddress.Loopback, port);
        _listener.Start();
        // Read the port back from the live listener so that port 0 yields a usable value.
        Port = ((IPEndPoint)_listener.LocalEndpoint).Port;
        _ = AcceptLoop();
    }

    /// <summary>Accepts connections until cancelled, handing each one to its own handler task.</summary>
    private async Task AcceptLoop()
    {
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                Socket s = await _listener.AcceptSocketAsync(_cts.Token);
                var t = Task.Run(() => HandleAsync(s));
                lock (_clientTasks) _clientTasks.Add(t);
            }
        }
        catch { } // best-effort stub: cancellation or listener shutdown simply ends the loop
    }

    /// <summary>
    /// Reads request frames from one connection, records each TxId, and schedules a delayed
    /// reply for FC 0x03/0x04/0x06. Other function codes get no reply.
    /// </summary>
    private async Task HandleAsync(Socket s)
    {
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                var req = await ReadOneFrameAsync(s, _cts.Token);
                if (req.Length < 8) break;
                ushort txId = (ushort)((req[0] << 8) | req[1]);
                byte unit = req[6];
                byte fc = req[7];
                SeenProxyTxIds.Enqueue(txId);
                // Schedule the response asynchronously so the next request (from a
                // second client) can race onto the multiplexer while this one is
                // still in flight.
                _ = Task.Run(async () =>
                {
                    try
                    {
                        await Task.Delay(ResponseDelayMs, _cts.Token);
                        byte[] response;
                        if (fc == 0x03 || fc == 0x04)
                        {
                            // Default register value 0x1234 (BCD 1234).
                            response = BuildFc03Response(txId, unit, 0x1234);
                            response[7] = fc; // restore actual FC byte
                        }
                        else if (fc == 0x06)
                        {
                            // Echo the written address and value back.
                            ushort addr = (ushort)((req[8] << 8) | req[9]);
                            ushort val = (ushort)((req[10] << 8) | req[11]);
                            response = BuildFc06Response(txId, unit, addr, val);
                        }
                        else { return; }
                        await s.SendAsync(response, SocketFlags.None, _cts.Token);
                    }
                    catch { } // best-effort stub: the socket may be gone or the stub cancelled
                });
            }
        }
        catch { } // best-effort stub: EOF, cancellation and socket errors all end this connection
        finally { try { s.Dispose(); } catch { } }
    }

    /// <summary>
    /// Cancels all loops, stops the listener, and waits up to two seconds for the
    /// per-connection handler tasks before disposing the cancellation source.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await _cts.CancelAsync();
        try { _listener.Stop(); } catch { }
        Task[] snap;
        lock (_clientTasks) snap = _clientTasks.ToArray();
        try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { }
        _cts.Dispose();
    }
}
// ── Mux construction / client helpers ────────────────────────────────────
/// <summary>
/// Creates a <c>PerPlcContext</c> for <paramref name="name"/> with fresh counters and a
/// null logger. The tag map is <c>BcdTagMap.Empty</c> when no tags are supplied, otherwise
/// a map built from the tags keyed by their <c>Address</c>.
/// </summary>
/// <param name="name">Value assigned to <c>PlcName</c>.</param>
/// <param name="tags">Tags to register.</param>
private static PerPlcContext MakeContext(string name, params BcdTag[] tags)
{
    var tagsByAddress = tags.ToDictionary(tag => tag.Address).ToFrozenDictionary();
    var tagMap = tagsByAddress.Count == 0
        ? BcdTagMap.Empty
        : new BcdTagMap(tagsByAddress);

    var context = new PerPlcContext
    {
        PlcName = name,
        TagMap = tagMap,
        Counters = new ProxyCounters(),
        Logger = NullLogger.Instance,
    };
    return context;
}
/// <summary>
/// Constructs a <c>PlcMultiplexer</c> with a new <c>BcdPduPipeline</c>, a null logger, no
/// backend-connect pipeline, and a coalescing-options provider that always returns
/// <paramref name="coalescing"/>.
/// </summary>
/// <param name="plc">PLC options passed through to the multiplexer.</param>
/// <param name="connOpts">Connection options passed through to the multiplexer.</param>
/// <param name="ctx">Per-PLC context passed through to the multiplexer.</param>
/// <param name="coalescing">Read-coalescing settings handed out by the options provider.</param>
private static PlcMultiplexer BuildMux(
    PlcOptions plc,
    ConnectionOptions connOpts,
    PerPlcContext ctx,
    ReadCoalescingOptions coalescing)
    => new PlcMultiplexer(
        plc,
        connOpts,
        new BcdPduPipeline(),
        ctx,
        NullLogger.Instance,
        backendConnectPipeline: null,
        coalescingOptions: () => coalescing);
/// <summary>
/// Creates one upstream client connection for <paramref name="mux"/>: opens a loopback
/// listener, connects a client socket to it, wraps the accepted socket in an
/// <c>UpstreamPipe</c>, and starts the pipe on the multiplexer in the background.
/// </summary>
/// <param name="mux">Multiplexer that will service the new pipe.</param>
/// <param name="plcName">PLC name passed to the <c>UpstreamPipe</c>.</param>
/// <returns>
/// The client-side socket, the pipe wrapping the accepted socket, and the listener. The
/// caller disposes the socket and pipe and stops the listener.
/// </returns>
private static async Task<(Socket client, UpstreamPipe pipe, TcpListener proxyListener)>
    ConnectClientAsync(PlcMultiplexer mux, string plcName)
{
    // Bind straight to port 0 and read the assigned port from the live listener. Probing a
    // free port first and re-binding it later leaves a window for another process to take it.
    var proxyListener = new TcpListener(IPAddress.Loopback, 0);
    proxyListener.Start();
    var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
    { NoDelay = true };
    try
    {
        int proxyPort = ((IPEndPoint)proxyListener.LocalEndpoint).Port;
        await client.ConnectAsync(IPAddress.Loopback, proxyPort);
        var upstream = await proxyListener.AcceptSocketAsync();
        var pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance);
        _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None));
        return (client, pipe, proxyListener);
    }
    catch
    {
        // Nothing was handed to the caller, so release what was opened here.
        client.Dispose();
        proxyListener.Stop();
        throw;
    }
}
// ── Tests ────────────────────────────────────────────────────────────────
/// <summary>
/// Two clients send the same FC03 read 80 ms apart while the stub backend holds its reply
/// for 300 ms. Each client must get its own TxId back, the backend must have seen exactly
/// one request, and the counters must show one hit and one miss.
/// </summary>
[Fact]
public async Task TwoClients_SameRequest_OnlyOneBackendRoundTrip()
{
    static ushort TxIdOf(byte[] frame) => (ushort)((frame[0] << 8) | frame[1]);

    var ct = TestContext.Current.CancellationToken;
    int backendPort = PickFreePort();
    await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 300 };
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    var coalescing = new ReadCoalescingOptions { Enabled = true, MaxParties = 32 };
    await using var mux = BuildMux(plc, new ConnectionOptions(), ctx, coalescing);

    var leader = await ConnectClientAsync(mux, plc.Name);
    var follower = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // The leader opens the in-flight entry; the short gap lets the multiplexer enqueue
        // it before the follower's identical request arrives inside the 300 ms window.
        await leader.client.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
        await Task.Delay(80, ct);
        await follower.client.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);

        var leaderReply = await ReadOneFrameAsync(leader.client, ct);
        var followerReply = await ReadOneFrameAsync(follower.client, ct);

        TxIdOf(leaderReply).ShouldBe((ushort)0x0001);
        TxIdOf(followerReply).ShouldBe((ushort)0x0002);
        backend.RequestCount.ShouldBe(1, "exactly one backend round-trip must service both clients");
        ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(1);
        ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(1);
    }
    finally
    {
        leader.client.Dispose();
        follower.client.Dispose();
        await leader.pipe.DisposeAsync();
        await follower.pipe.DisposeAsync();
        leader.proxyListener.Stop();
        follower.proxyListener.Stop();
    }
}
/// <summary>
/// Two clients read different start addresses (100 and 200). The requests must not be
/// coalesced: the backend sees two requests and the counters show zero hits, two misses.
/// </summary>
[Fact]
public async Task TwoClients_DifferentRequests_BothHitBackend()
{
    var ct = TestContext.Current.CancellationToken;
    int backendPort = PickFreePort();
    await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 50 };
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    var coalescing = new ReadCoalescingOptions { Enabled = true, MaxParties = 32 };
    await using var mux = BuildMux(plc, new ConnectionOptions(), ctx, coalescing);

    var first = await ConnectClientAsync(mux, plc.Name);
    var second = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Distinct start addresses give distinct coalescing keys.
        await first.client.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
        await second.client.SendAsync(BuildFc03(0x0002, 200, 1), SocketFlags.None);

        _ = await ReadOneFrameAsync(first.client, ct);
        _ = await ReadOneFrameAsync(second.client, ct);

        backend.RequestCount.ShouldBe(2, "two distinct keys must produce two backend round-trips");
        ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0);
        ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(2);
    }
    finally
    {
        first.client.Dispose();
        second.client.Dispose();
        await first.pipe.DisposeAsync();
        await second.pipe.DisposeAsync();
        first.proxyListener.Stop();
        second.proxyListener.Stop();
    }
}
/// <summary>
/// Five clients send the same FC03 read while the stub backend holds its reply for 400 ms.
/// Every client must receive a response carrying its own TxId. The backend may see at most
/// two requests and at least three of the five requests must be counted as coalescing hits.
/// </summary>
[Fact]
public async Task FiveClients_SameRequest_OneBackendRoundTrip_FiveResponses()
{
    int backendPort = PickFreePort();
    await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 400 };
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = BuildMux(plc, new ConnectionOptions(),
        ctx, new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });

    // Element types follow the tuple returned by ConnectClientAsync.
    var sockets = new List<Socket>();
    var pipes = new List<UpstreamPipe>();
    var lists = new List<TcpListener>();
    try
    {
        for (int i = 0; i < 5; i++)
        {
            var (c, p, l) = await ConnectClientAsync(mux, plc.Name);
            sockets.Add(c); pipes.Add(p); lists.Add(l);
        }

        // First client opens; the rest race in during the 400 ms window.
        await sockets[0].SendAsync(BuildFc03((ushort)1, 100, 1), SocketFlags.None);
        await Task.Delay(60, TestContext.Current.CancellationToken);
        for (int i = 1; i < sockets.Count; i++)
            await sockets[i].SendAsync(BuildFc03((ushort)(i + 1), 100, 1), SocketFlags.None);

        // Read back every client's response; client i sent TxId i + 1.
        for (int i = 0; i < sockets.Count; i++)
        {
            var rsp = await ReadOneFrameAsync(sockets[i], TestContext.Current.CancellationToken);
            ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)(i + 1),
                $"client #{i} must see its own original TxId restored");
        }

        backend.RequestCount.ShouldBeLessThanOrEqualTo(2,
            "at most 2 backend round-trips (one for the leader, one for any racy first-arrival)");
        ctx.Counters.Snapshot().CoalescedHitCount.ShouldBeGreaterThanOrEqualTo(3,
            "at least 3 of the 5 clients should have coalesced");
    }
    finally
    {
        foreach (var s in sockets) s.Dispose();
        foreach (var p in pipes) await p.DisposeAsync();
        foreach (var l in lists) l.Stop();
    }
}
/// <summary>
/// One client sends FC03 and another FC04 for the same address and quantity. The two must
/// not share a backend round-trip: the backend sees two requests and the hit counter stays 0.
/// </summary>
[Fact]
public async Task FC03_And_FC04_SameAddress_NOT_Coalesced()
{
    var ct = TestContext.Current.CancellationToken;
    int backendPort = PickFreePort();
    await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 200 };
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    var coalescing = new ReadCoalescingOptions { Enabled = true, MaxParties = 32 };
    await using var mux = BuildMux(plc, new ConnectionOptions(), ctx, coalescing);

    var fc03Client = await ConnectClientAsync(mux, plc.Name);
    var fc04Client = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Same address and quantity, different function code.
        await fc03Client.client.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
        await fc04Client.client.SendAsync(BuildFc04(0x0002, 100, 1), SocketFlags.None);

        _ = await ReadOneFrameAsync(fc03Client.client, ct);
        _ = await ReadOneFrameAsync(fc04Client.client, ct);

        backend.RequestCount.ShouldBe(2, "FC03 and FC04 must never share a backend round-trip");
        ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0);
    }
    finally
    {
        fc03Client.client.Dispose();
        fc04Client.client.Dispose();
        await fc03Client.pipe.DisposeAsync();
        await fc04Client.pipe.DisposeAsync();
        fc03Client.proxyListener.Stop();
        fc04Client.proxyListener.Stop();
    }
}
/// <summary>
/// Two clients send byte-identical FC06 writes (apart from TxId). Both must reach the
/// backend, and neither the hit nor the miss counter may move.
/// </summary>
[Fact]
public async Task FC06_Write_NeverCoalesced()
{
    var ct = TestContext.Current.CancellationToken;
    int backendPort = PickFreePort();
    await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 100 };
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    var coalescing = new ReadCoalescingOptions { Enabled = true, MaxParties = 32 };
    await using var mux = BuildMux(plc, new ConnectionOptions(), ctx, coalescing);

    var writerA = await ConnectClientAsync(mux, plc.Name);
    var writerB = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Identical writes to register 200 with value 1234 from both clients.
        await writerA.client.SendAsync(BuildFc06(0x0001, 200, 1234), SocketFlags.None);
        await writerB.client.SendAsync(BuildFc06(0x0002, 200, 1234), SocketFlags.None);

        _ = await ReadOneFrameAsync(writerA.client, ct);
        _ = await ReadOneFrameAsync(writerB.client, ct);

        backend.RequestCount.ShouldBe(2, "FC06 writes must always hit the backend separately");
        ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0,
            "writes are never counted as coalescing hits");
        ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(0,
            "writes are not part of the coalescing accounting");
    }
    finally
    {
        writerA.client.Dispose();
        writerB.client.Dispose();
        await writerA.pipe.DisposeAsync();
        await writerB.pipe.DisposeAsync();
        writerA.proxyListener.Stop();
        writerB.proxyListener.Stop();
    }
}
/// <summary>
/// Two clients send the same FC03 read; the first is dropped before the stub backend's
/// 400 ms reply arrives. The second client must still get a response with its own TxId,
/// and the dead-upstream counter must reach at least 1.
/// </summary>
[Fact]
public async Task OneClient_DisconnectsMidFlight_OthersStillGetResponse_AndDeadUpstreamCounterIncrements()
{
    var ct = TestContext.Current.CancellationToken;
    int backendPort = PickFreePort();
    await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 400 };
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = BuildMux(plc, new ConnectionOptions(),
        ctx, new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });
    var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
    var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);

    // Tracks whether the deliberate mid-flight drop happened, so the finally block can
    // release client 1 when the test fails before reaching it.
    bool firstClientDropped = false;
    try
    {
        await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
        await Task.Delay(60, ct);
        await c2.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);
        await Task.Delay(60, ct);

        // Drop client 1 mid-flight (before the backend response arrives).
        c1.Dispose();
        await p1.DisposeAsync();
        firstClientDropped = true;

        // Client 2 must still get its response.
        var r2 = await ReadOneFrameAsync(c2, ct);
        ((ushort)((r2[0] << 8) | r2[1])).ShouldBe((ushort)0x0002);

        // Fan-out records the dead-upstream skip asynchronously. Poll with a deadline
        // instead of sleeping a fixed interval, so a slow machine cannot fail the
        // assertion and a fast one does not wait longer than needed.
        long deadline = Environment.TickCount64 + 2000;
        while (ctx.Counters.Snapshot().CoalescedResponseToDeadUpstream < 1
               && Environment.TickCount64 < deadline)
        {
            await Task.Delay(20, ct);
        }

        ctx.Counters.Snapshot().CoalescedResponseToDeadUpstream.ShouldBeGreaterThanOrEqualTo(1,
            "the disconnected client's fan-out slot must increment the dead-upstream counter");
    }
    finally
    {
        if (!firstClientDropped)
        {
            c1.Dispose();
            await p1.DisposeAsync();
        }
        c2.Dispose();
        await p2.DisposeAsync();
        l1.Stop(); l2.Stop();
    }
}
/// <summary>
/// With <c>MaxParties = 2</c>, three clients send the same FC03 read 50 ms apart while the
/// stub backend holds replies for 400 ms. The backend must see two requests, with one hit
/// and two misses recorded.
/// </summary>
[Fact]
public async Task AtMaxParties_NextRequest_StartsFreshBackendRoundTrip()
{
    var ct = TestContext.Current.CancellationToken;
    int backendPort = PickFreePort();
    await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 400 };
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    // A cap of two parties means the third identical request cannot join the first entry.
    var coalescing = new ReadCoalescingOptions { Enabled = true, MaxParties = 2 };
    await using var mux = BuildMux(plc, new ConnectionOptions(), ctx, coalescing);

    var leader = await ConnectClientAsync(mux, plc.Name);
    var joiner = await ConnectClientAsync(mux, plc.Name);
    var overflow = await ConnectClientAsync(mux, plc.Name);
    try
    {
        await leader.client.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
        await Task.Delay(50, ct);
        await joiner.client.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);
        await Task.Delay(50, ct);
        await overflow.client.SendAsync(BuildFc03(0x0003, 100, 1), SocketFlags.None);

        _ = await ReadOneFrameAsync(leader.client, ct);
        _ = await ReadOneFrameAsync(joiner.client, ct);
        _ = await ReadOneFrameAsync(overflow.client, ct);

        backend.RequestCount.ShouldBe(2,
            "MaxParties=2 caps the first entry at 2; the third request opens its own round-trip");
        ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(1, "exactly one party joined the leader");
        ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(2,
            "the leader and the overflow are both misses");
    }
    finally
    {
        leader.client.Dispose();
        joiner.client.Dispose();
        overflow.client.Dispose();
        await leader.pipe.DisposeAsync();
        await joiner.pipe.DisposeAsync();
        await overflow.pipe.DisposeAsync();
        leader.proxyListener.Stop();
        joiner.proxyListener.Stop();
        overflow.proxyListener.Stop();
    }
}
/// <summary>
/// Sanity check with <c>Enabled = false</c>: two identical FC03 reads must each reach the
/// backend, with zero hits and two misses recorded.
/// </summary>
[Fact]
public async Task CoalescingDisabled_TwoIdenticalReads_BothHitBackend()
{
    // With coalescing off the multiplexer takes the Phase-9 path for every FC03/FC04
    // request, and every request still counts as a Miss (Hit + Miss = total FC03/FC04).
    var ct = TestContext.Current.CancellationToken;
    int backendPort = PickFreePort();
    await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 50 };
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    var coalescing = new ReadCoalescingOptions { Enabled = false, MaxParties = 32 };
    await using var mux = BuildMux(plc, new ConnectionOptions(), ctx, coalescing);

    var first = await ConnectClientAsync(mux, plc.Name);
    var second = await ConnectClientAsync(mux, plc.Name);
    try
    {
        await first.client.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
        await second.client.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);

        _ = await ReadOneFrameAsync(first.client, ct);
        _ = await ReadOneFrameAsync(second.client, ct);

        backend.RequestCount.ShouldBe(2, "coalescing disabled: each identical read must hit the backend");
        ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0);
        ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(2, "every FC03 request still counts as a Miss");
    }
    finally
    {
        first.client.Dispose();
        second.client.Dispose();
        await first.pipe.DisposeAsync();
        await second.pipe.DisposeAsync();
        first.proxyListener.Stop();
        second.proxyListener.Stop();
    }
}
}