using System.Collections.Concurrent;
using System.Collections.Frozen;
using System.Net;
using System.Net.Sockets;
using Mbproxy.Bcd;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Multiplexing;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

namespace Mbproxy.Tests.Proxy.Multiplexing;

/// <summary>
/// Phase-10 unit tests for read coalescing against a stub backend (real sockets, no
/// simulator). The stub gives us deterministic control over backend response timing so
/// the "overlapping in-flight" window is large enough for late requests to actually
/// coalesce. The pymodbus simulator cannot be used here — its known concurrent-MBAP-frame
/// bug would invalidate the proxy-TxId echo path that coalescing relies on.
/// </summary>
[Trait("Category", "Unit")]
public sealed class ReadCoalescingTests
{
    // ── Frame builders / readers ─────────────────────────────────────────────────

    /// <summary>Size of the MBAP header (TxId, protocol id, length, unit id) in bytes.</summary>
    private const int MbapHeaderLength = 7;

    /// <summary>
    /// Reads exactly <paramref name="count"/> bytes from <paramref name="s"/>.
    /// </summary>
    /// <exception cref="IOException">The peer closed the connection before all bytes arrived.</exception>
    private static async Task<byte[]> ReadExactAsync(Socket s, int count, CancellationToken ct)
    {
        var buf = new byte[count];
        int read = 0;
        while (read < count)
        {
            int n = await s.ReceiveAsync(buf.AsMemory(read, count - read), SocketFlags.None, ct);
            if (n == 0)
            {
                throw new IOException("EOF");
            }

            read += n;
        }

        return buf;
    }

    /// <summary>
    /// Reads one complete Modbus/TCP frame: the 7-byte MBAP header followed by
    /// (length - 1) body bytes, where "length" is the big-endian field at header
    /// offsets 4..5 (it counts the unit id, which is already part of the header).
    /// </summary>
    /// <exception cref="InvalidDataException">The MBAP length field is 0.</exception>
    /// <exception cref="IOException">The peer closed the connection mid-frame.</exception>
    private static async Task<byte[]> ReadOneFrameAsync(Socket s, CancellationToken ct)
    {
        byte[] header = await ReadExactAsync(s, MbapHeaderLength, ct);
        ushort length = (ushort)((header[4] << 8) | header[5]);

        // A length of 0 would give a negative body size and an undersized frame buffer;
        // reject it explicitly instead of failing inside Buffer.BlockCopy.
        if (length < 1)
        {
            throw new InvalidDataException("MBAP length field must be at least 1");
        }

        int bodyLen = length - 1;
        var frame = new byte[MbapHeaderLength + bodyLen];
        Buffer.BlockCopy(header, 0, frame, 0, MbapHeaderLength);

        if (bodyLen > 0)
        {
            byte[] body = await ReadExactAsync(s, bodyLen, ct);
            Buffer.BlockCopy(body, 0, frame, MbapHeaderLength, bodyLen);
        }

        return frame;
    }

    /// <summary>
    /// Builds a 12-byte request frame: MBAP header with length 6, then the function
    /// code and two big-endian 16-bit fields.
    /// </summary>
    private static byte[] BuildRequest(byte fc, ushort txId, ushort first, ushort second, byte unit) =>
    [
        (byte)(txId >> 8), (byte)(txId & 0xFF),
        0x00, 0x00,
        0x00, 0x06,
        unit, fc,
        (byte)(first >> 8), (byte)(first & 0xFF),
        (byte)(second >> 8), (byte)(second & 0xFF),
    ];

    /// <summary>Builds an FC03 request for <paramref name="qty"/> registers at <paramref name="start"/>.</summary>
    private static byte[] BuildFc03(ushort txId, ushort start, ushort qty, byte unit = 1) =>
        BuildRequest(0x03, txId, start, qty, unit);

    /// <summary>Builds an FC04 request for <paramref name="qty"/> registers at <paramref name="start"/>.</summary>
    private static byte[] BuildFc04(ushort txId, ushort start, ushort qty, byte unit = 1) =>
        BuildRequest(0x04, txId, start, qty, unit);

    /// <summary>Builds an FC06 request writing <paramref name="value"/> to <paramref name="addr"/>.</summary>
    private static byte[] BuildFc06(ushort txId, ushort addr, ushort value, byte unit = 1) =>
        BuildRequest(0x06, txId, addr, value, unit);

    /// <summary>
    /// Builds an FC03 response frame carrying <paramref name="regs"/> as big-endian
    /// 16-bit values, preceded by the function code and a byte count.
    /// </summary>
    private static byte[] BuildFc03Response(ushort txId, byte unit, params ushort[] regs)
    {
        int bodyLen = 2 + regs.Length * 2;          // function code + byte count + data
        var frame = new byte[MbapHeaderLength + bodyLen];
        frame[0] = (byte)(txId >> 8);
        frame[1] = (byte)(txId & 0xFF);
        frame[2] = 0;
        frame[3] = 0;
        ushort len = (ushort)(1 + bodyLen);          // MBAP length also counts the unit id
        frame[4] = (byte)(len >> 8);
        frame[5] = (byte)(len & 0xFF);
        frame[6] = unit;
        frame[7] = 0x03;
        frame[8] = (byte)(regs.Length * 2);
        for (int i = 0; i < regs.Length; i++)
        {
            frame[9 + i * 2] = (byte)(regs[i] >> 8);
            frame[9 + i * 2 + 1] = (byte)(regs[i] & 0xFF);
        }

        return frame;
    }

    /// <summary>Builds an FC06 response frame echoing the written address and value.</summary>
    private static byte[] BuildFc06Response(ushort txId, byte unit, ushort addr, ushort value)
    {
        var frame = new byte[12];
        frame[0] = (byte)(txId >> 8);
        frame[1] = (byte)(txId & 0xFF);
        frame[2] = 0;
        frame[3] = 0;
        frame[4] = 0;
        frame[5] = 6;
        frame[6] = unit;
        frame[7] = 0x06;
        frame[8] = (byte)(addr >> 8);
        frame[9] = (byte)(addr & 0xFF);
        frame[10] = (byte)(value >> 8);
        frame[11] = (byte)(value & 0xFF);
        return frame;
    }

    // ── Holding-the-response stub backend ─────────────────────────────────────

    /// <summary>
    /// Stub backend that delays its response by <see cref="ResponseDelayMs"/>. The delay
    /// gives the test a deterministic in-flight window so a second client's identical
    /// request actually overlaps the first request's wire-time. Records every proxy TxId
    /// it sees so the test can count distinct backend round-trips.
    /// </summary>
    private sealed class DelayedStubBackend : IAsyncDisposable
    {
        /// <summary>The TCP port the stub is actually listening on.</summary>
        public int Port { get; }

        /// <summary>Delay applied before each response is sent, in milliseconds.</summary>
        public int ResponseDelayMs { get; set; } = 200;

        /// <summary>TxIds of every request frame received, in arrival order.</summary>
        public ConcurrentQueue<ushort> SeenProxyTxIds { get; } = new();

        /// <summary>Number of request frames received so far.</summary>
        public int RequestCount => SeenProxyTxIds.Count;

        private readonly TcpListener _listener;
        private readonly CancellationTokenSource _cts = new();

        // Captured once: response tasks are fire-and-forget and may outlive DisposeAsync,
        // and reading _cts.Token after _cts.Dispose() throws ObjectDisposedException.
        private readonly CancellationToken _shutdown;

        private readonly List<Task> _clientTasks = new();

        /// <summary>
        /// Starts listening on loopback. Pass 0 for <paramref name="port"/> to let the OS
        /// pick a free port; the bound port is then available through <see cref="Port"/>.
        /// </summary>
        public DelayedStubBackend(int port)
        {
            _shutdown = _cts.Token;
            _listener = new TcpListener(IPAddress.Loopback, port);
            _listener.Start();

            // Read the port back from the socket so that port 0 resolves to the real one.
            Port = ((IPEndPoint)_listener.LocalEndpoint).Port;
            _ = AcceptLoop();
        }

        /// <summary>Accepts connections until shutdown, spawning one handler per socket.</summary>
        private async Task AcceptLoop()
        {
            try
            {
                while (!_shutdown.IsCancellationRequested)
                {
                    Socket s = await _listener.AcceptSocketAsync(_shutdown);
                    var t = Task.Run(() => HandleAsync(s));
                    lock (_clientTasks)
                    {
                        _clientTasks.Add(t);
                    }
                }
            }
            catch
            {
                // Best-effort test stub: cancellation or listener shutdown ends the loop.
            }
        }

        /// <summary>
        /// Reads request frames from one connection, records each TxId and schedules a
        /// delayed response. Frames with an unsupported function code get no response.
        /// </summary>
        private async Task HandleAsync(Socket s)
        {
            try
            {
                while (!_shutdown.IsCancellationRequested)
                {
                    var req = await ReadOneFrameAsync(s, _shutdown);
                    if (req.Length < 8)
                    {
                        break;
                    }

                    ushort txId = (ushort)((req[0] << 8) | req[1]);
                    byte unit = req[6];
                    byte fc = req[7];
                    SeenProxyTxIds.Enqueue(txId);

                    // Schedule the response asynchronously so the next request (from a
                    // second client) can race onto the multiplexer while this one is
                    // still in flight.
                    _ = Task.Run(async () =>
                    {
                        try
                        {
                            await Task.Delay(ResponseDelayMs, _shutdown);
                            byte[] response;
                            if (fc == 0x03 || fc == 0x04)
                            {
                                // Default register value 0x1234 (BCD 1234).
                                response = BuildFc03Response(txId, unit, 0x1234);
                                response[7] = fc; // restore actual FC byte
                            }
                            else if (fc == 0x06)
                            {
                                ushort addr = (ushort)((req[8] << 8) | req[9]);
                                ushort val = (ushort)((req[10] << 8) | req[11]);
                                response = BuildFc06Response(txId, unit, addr, val);
                            }
                            else
                            {
                                return;
                            }

                            await s.SendAsync(response, SocketFlags.None, _shutdown);
                        }
                        catch
                        {
                            // Best-effort: the socket may already be closed or shutdown requested.
                        }
                    });
                }
            }
            catch
            {
                // Best-effort: EOF, a malformed frame or cancellation ends this connection.
            }
            finally
            {
                try
                {
                    s.Dispose();
                }
                catch
                {
                    // Nothing useful to do if disposing the socket fails.
                }
            }
        }

        /// <summary>
        /// Cancels all loops, stops the listener and waits up to two seconds for the
        /// per-connection handlers to finish.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            await _cts.CancelAsync();
            try
            {
                _listener.Stop();
            }
            catch
            {
                // Listener may already be stopped.
            }

            Task[] snap;
            lock (_clientTasks)
            {
                snap = _clientTasks.ToArray();
            }

            try
            {
                await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2));
            }
            catch
            {
                // Handler failures and the wait timeout are both ignored during teardown.
            }

            _cts.Dispose();
        }
    }

    // ── Mux construction / client helpers ────────────────────────────────────

    /// <summary>
    /// Creates a per-PLC context with fresh counters and a tag map built from
    /// <paramref name="tags"/> (the empty map when no tags are given).
    /// </summary>
    private static PerPlcContext MakeContext(string name, params BcdTag[] tags)
    {
        var frozen = tags.ToDictionary(t => t.Address).ToFrozenDictionary();
        var map = frozen.Count > 0 ? new BcdTagMap(frozen) : BcdTagMap.Empty;
        return new PerPlcContext
        {
            PlcName = name,
            TagMap = map,
            Counters = new ProxyCounters(),
            Logger = NullLogger.Instance,
        };
    }

    /// <summary>Creates a multiplexer wired to the supplied coalescing options.</summary>
    private static PlcMultiplexer BuildMux(
        PlcOptions plc,
        ConnectionOptions connOpts,
        PerPlcContext ctx,
        ReadCoalescingOptions coalescing)
    {
        return new PlcMultiplexer(
            plc,
            connOpts,
            new BcdPduPipeline(),
            ctx,
            NullLogger.Instance,
            backendConnectPipeline: null,
            coalescingOptions: () => coalescing);
    }

    /// <summary>
    /// Creates a connected loopback socket pair, wraps the accepted side in an
    /// <see cref="UpstreamPipe"/> and starts it on <paramref name="mux"/>.
    /// </summary>
    /// <returns>The client socket, the pipe, and the listener used to accept it.</returns>
    private static async Task<(Socket client, UpstreamPipe pipe, TcpListener proxyListener)>
        ConnectClientAsync(PlcMultiplexer mux, string plcName)
    {
        // Bind to port 0 and read the assigned port back. Probing for a free port and
        // re-binding it afterwards leaves a window in which something else can take it.
        var proxyListener = new TcpListener(IPAddress.Loopback, 0);
        proxyListener.Start();
        int proxyPort = ((IPEndPoint)proxyListener.LocalEndpoint).Port;

        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
        {
            NoDelay = true
        };
        await client.ConnectAsync(IPAddress.Loopback, proxyPort);
        var upstream = await proxyListener.AcceptSocketAsync();

        var pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance);
        _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None));
        return (client, pipe, proxyListener);
    }

    // ── Tests ────────────────────────────────────────────────────────────────

    [Fact]
    public async Task TwoClients_SameRequest_OnlyOneBackendRoundTrip()
    {
        await using var backend = new DelayedStubBackend(0) { ResponseDelayMs = 300 };

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backend.Port };
        await using var mux = BuildMux(plc, new ConnectionOptions(), ctx,
            new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });

        var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
        var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);

        try
        {
            // First client opens the in-flight entry; small gap lets the multiplexer enqueue
            // before the second arrives. The 300 ms delay then gives the second client
            // ample window to coalesce onto the first.
            await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
            await Task.Delay(80, TestContext.Current.CancellationToken);
            await c2.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);

            var r1 = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
            var r2 = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);

            ((ushort)((r1[0] << 8) | r1[1])).ShouldBe((ushort)0x0001);
            ((ushort)((r2[0] << 8) | r2[1])).ShouldBe((ushort)0x0002);

            backend.RequestCount.ShouldBe(1,
                "exactly one backend round-trip must service both clients");
            ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(1);
            ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(1);
        }
        finally
        {
            c1.Dispose();
            c2.Dispose();
            await p1.DisposeAsync();
            await p2.DisposeAsync();
            l1.Stop();
            l2.Stop();
        }
    }

    [Fact]
    public async Task TwoClients_DifferentRequests_BothHitBackend()
    {
        await using var backend = new DelayedStubBackend(0) { ResponseDelayMs = 50 };

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backend.Port };
        await using var mux = BuildMux(plc, new ConnectionOptions(), ctx,
            new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });

        var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
        var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);

        try
        {
            // Different start addresses → different keys → no coalescing.
            await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
            await c2.SendAsync(BuildFc03(0x0002, 200, 1), SocketFlags.None);

            _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);

            backend.RequestCount.ShouldBe(2,
                "two distinct keys must produce two backend round-trips");
            ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0);
            ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(2);
        }
        finally
        {
            c1.Dispose();
            c2.Dispose();
            await p1.DisposeAsync();
            await p2.DisposeAsync();
            l1.Stop();
            l2.Stop();
        }
    }

    [Fact]
    public async Task FiveClients_SameRequest_OneBackendRoundTrip_FiveResponses()
    {
        await using var backend = new DelayedStubBackend(0) { ResponseDelayMs = 400 };

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backend.Port };
        await using var mux = BuildMux(plc, new ConnectionOptions(), ctx,
            new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });

        var sockets = new List<Socket>();
        var pipes = new List<UpstreamPipe>();
        var lists = new List<TcpListener>();
        try
        {
            for (int i = 0; i < 5; i++)
            {
                var (c, p, l) = await ConnectClientAsync(mux, plc.Name);
                sockets.Add(c);
                pipes.Add(p);
                lists.Add(l);
            }

            // First client opens; the rest race in during the 400 ms window.
            await sockets[0].SendAsync(BuildFc03((ushort)1, 100, 1), SocketFlags.None);
            await Task.Delay(60, TestContext.Current.CancellationToken);
            for (int i = 1; i < sockets.Count; i++)
            {
                await sockets[i].SendAsync(BuildFc03((ushort)(i + 1), 100, 1), SocketFlags.None);
            }

            // Read back every client's response.
            for (int i = 0; i < sockets.Count; i++)
            {
                var rsp = await ReadOneFrameAsync(sockets[i], TestContext.Current.CancellationToken);
                ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)(i + 1),
                    $"client #{i} must see its own original TxId restored");
            }

            backend.RequestCount.ShouldBeLessThanOrEqualTo(2,
                "at most 2 backend round-trips (one for the leader, one for any racy first-arrival)");
            ctx.Counters.Snapshot().CoalescedHitCount.ShouldBeGreaterThanOrEqualTo(3,
                "at least 3 of the 5 clients should have coalesced");
        }
        finally
        {
            foreach (var s in sockets)
            {
                s.Dispose();
            }

            foreach (var p in pipes)
            {
                await p.DisposeAsync();
            }

            foreach (var l in lists)
            {
                l.Stop();
            }
        }
    }

    [Fact]
    public async Task FC03_And_FC04_SameAddress_NOT_Coalesced()
    {
        await using var backend = new DelayedStubBackend(0) { ResponseDelayMs = 200 };

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backend.Port };
        await using var mux = BuildMux(plc, new ConnectionOptions(), ctx,
            new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });

        var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
        var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);

        try
        {
            // FC03 vs FC04 — different Modbus tables, never coalesce.
            await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
            await c2.SendAsync(BuildFc04(0x0002, 100, 1), SocketFlags.None);

            _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);

            backend.RequestCount.ShouldBe(2,
                "FC03 and FC04 must never share a backend round-trip");
            ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0);
        }
        finally
        {
            c1.Dispose();
            c2.Dispose();
            await p1.DisposeAsync();
            await p2.DisposeAsync();
            l1.Stop();
            l2.Stop();
        }
    }

    [Fact]
    public async Task FC06_Write_NeverCoalesced()
    {
        await using var backend = new DelayedStubBackend(0) { ResponseDelayMs = 100 };

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backend.Port };
        await using var mux = BuildMux(plc, new ConnectionOptions(), ctx,
            new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });

        var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
        var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);

        try
        {
            // Two identical FC06 writes — writes must never coalesce (non-idempotent).
            await c1.SendAsync(BuildFc06(0x0001, 200, 1234), SocketFlags.None);
            await c2.SendAsync(BuildFc06(0x0002, 200, 1234), SocketFlags.None);

            _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);

            backend.RequestCount.ShouldBe(2,
                "FC06 writes must always hit the backend separately");
            ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0,
                "writes are never counted as coalescing hits");
            ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(0,
                "writes are not part of the coalescing accounting");
        }
        finally
        {
            c1.Dispose();
            c2.Dispose();
            await p1.DisposeAsync();
            await p2.DisposeAsync();
            l1.Stop();
            l2.Stop();
        }
    }

    [Fact]
    public async Task OneClient_DisconnectsMidFlight_OthersStillGetResponse_AndDeadUpstreamCounterIncrements()
    {
        await using var backend = new DelayedStubBackend(0) { ResponseDelayMs = 400 };

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backend.Port };
        await using var mux = BuildMux(plc, new ConnectionOptions(), ctx,
            new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });

        var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
        var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);

        // Tracks whether the deliberate mid-flight drop was reached, so teardown can
        // still release client 1 when the test fails before that point.
        bool firstClientDropped = false;

        try
        {
            await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
            await Task.Delay(60, TestContext.Current.CancellationToken);
            await c2.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);
            await Task.Delay(60, TestContext.Current.CancellationToken);

            // Drop client 1 mid-flight (before the backend response arrives).
            firstClientDropped = true;
            c1.Dispose();
            await p1.DisposeAsync();

            // Client 2 must still get its response.
            var r2 = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);
            ((ushort)((r2[0] << 8) | r2[1])).ShouldBe((ushort)0x0002);

            // Give fan-out a beat to record the dead-upstream skip on the c1 side.
            await Task.Delay(100, TestContext.Current.CancellationToken);
            ctx.Counters.Snapshot().CoalescedResponseToDeadUpstream.ShouldBeGreaterThanOrEqualTo(1,
                "the disconnected client's fan-out slot must increment the dead-upstream counter");
        }
        finally
        {
            if (!firstClientDropped)
            {
                c1.Dispose();
                await p1.DisposeAsync();
            }

            c2.Dispose();
            await p2.DisposeAsync();
            l1.Stop();
            l2.Stop();
        }
    }

    [Fact]
    public async Task AtMaxParties_NextRequest_StartsFreshBackendRoundTrip()
    {
        await using var backend = new DelayedStubBackend(0) { ResponseDelayMs = 400 };

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backend.Port };

        // MaxParties = 2 forces the third identical request to open a fresh entry.
        await using var mux = BuildMux(plc, new ConnectionOptions(), ctx,
            new ReadCoalescingOptions { Enabled = true, MaxParties = 2 });

        var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
        var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);
        var (c3, p3, l3) = await ConnectClientAsync(mux, plc.Name);

        try
        {
            await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
            await Task.Delay(50, TestContext.Current.CancellationToken);
            await c2.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);
            await Task.Delay(50, TestContext.Current.CancellationToken);
            await c3.SendAsync(BuildFc03(0x0003, 100, 1), SocketFlags.None);

            _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(c3, TestContext.Current.CancellationToken);

            backend.RequestCount.ShouldBe(2,
                "MaxParties=2 caps the first entry at 2; the third request opens its own round-trip");
            ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(1,
                "exactly one party joined the leader");
            ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(2,
                "the leader and the overflow are both misses");
        }
        finally
        {
            c1.Dispose();
            c2.Dispose();
            c3.Dispose();
            await p1.DisposeAsync();
            await p2.DisposeAsync();
            await p3.DisposeAsync();
            l1.Stop();
            l2.Stop();
            l3.Stop();
        }
    }

    [Fact]
    public async Task CoalescingDisabled_TwoIdenticalReads_BothHitBackend()
    {
        // Sanity: with Enabled=false the multiplexer takes the Phase-9 path for every
        // FC03/FC04 request. Both identical reads must produce a backend round-trip and
        // every request counts as a Miss (Hit + Miss = total FC03/FC04 invariant).
        await using var backend = new DelayedStubBackend(0) { ResponseDelayMs = 50 };

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backend.Port };
        await using var mux = BuildMux(plc, new ConnectionOptions(), ctx,
            new ReadCoalescingOptions { Enabled = false, MaxParties = 32 });

        var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
        var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);

        try
        {
            await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
            await c2.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);

            _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);

            backend.RequestCount.ShouldBe(2,
                "coalescing disabled: each identical read must hit the backend");
            ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0);
            ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(2,
                "every FC03 request still counts as a Miss");
        }
        finally
        {
            c1.Dispose();
            c2.Dispose();
            await p1.DisposeAsync();
            await p2.DisposeAsync();
            l1.Stop();
            l2.Stop();
        }
    }
}