using System.Collections.Concurrent;
using System.Collections.Frozen;
using System.Net;
using System.Net.Sockets;
using Mbproxy.Bcd;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Cache;
using Mbproxy.Proxy.Multiplexing;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

namespace Mbproxy.Tests.Proxy.Multiplexing;

/// <summary>
/// Integration tests for PlcMultiplexer against a stub backend
/// (a TcpListener that canned-responds). Uses real sockets but no simulator.
/// </summary>
[Trait("Category", "Unit")]
public sealed class PlcMultiplexerTests
{
    // ── Helpers ────────────────────────────────────────────────────────────────

    /// <summary>
    /// Asks the OS for an ephemeral loopback port by binding to port 0, then releases it.
    /// The port is free at the time of the call only; another process may grab it before
    /// the caller re-binds.
    /// </summary>
    private static int PickFreePort()
    {
        var l = new TcpListener(IPAddress.Loopback, 0);
        l.Start();
        try
        {
            return ((IPEndPoint)l.LocalEndpoint).Port;
        }
        finally
        {
            // Release the probe listener even if reading the endpoint throws.
            l.Stop();
        }
    }

    /// <summary>
    /// Reads exactly <paramref name="count"/> bytes from <paramref name="socket"/>.
    /// </summary>
    /// <exception cref="IOException">The peer closed the connection before all bytes arrived.</exception>
    private static async Task<byte[]> ReadExactAsync(Socket socket, int count, CancellationToken ct)
    {
        var buf = new byte[count];
        int read = 0;
        while (read < count)
        {
            int n = await socket.ReceiveAsync(buf.AsMemory(read, count - read), SocketFlags.None, ct);
            if (n == 0) throw new IOException("EOF");
            read += n;
        }
        return buf;
    }

    /// <summary>
    /// Reads one Modbus/TCP frame: the 7-byte MBAP header followed by the PDU. The header's
    /// length field (bytes 4-5, big-endian) counts the unit id plus the PDU, so the PDU is
    /// <c>length - 1</c> bytes.
    /// </summary>
    /// <returns>The header and PDU concatenated into one array.</returns>
    /// <exception cref="IOException">
    /// The peer closed the connection mid-frame, or the length field is 0 (it must at least
    /// cover the unit id byte already read as part of the header).
    /// </exception>
    private static async Task<byte[]> ReadOneFrameAsync(Socket socket, CancellationToken ct)
    {
        var header = await ReadExactAsync(socket, 7, ct);
        ushort length = (ushort)((header[4] << 8) | header[5]);

        // A zero length would make bodyLen negative and surface as an unrelated
        // ArgumentException from Buffer.BlockCopy on an undersized frame buffer.
        if (length < 1) throw new IOException("Malformed MBAP header: length field is 0");

        int bodyLen = length - 1;
        var body = bodyLen > 0
            ? await ReadExactAsync(socket, bodyLen, ct)
            : Array.Empty<byte>();

        var frame = new byte[7 + bodyLen];
        Buffer.BlockCopy(header, 0, frame, 0, 7);
        if (bodyLen > 0) Buffer.BlockCopy(body, 0, frame, 7, bodyLen);
        return frame;
    }

    /// <summary>Builds a 12-byte FC03 (read holding registers) request frame.</summary>
    private static byte[] BuildFc03ReadFrame(ushort txId, ushort start, ushort qty, byte unitId = 1) =>
    [
        (byte)(txId >> 8), (byte)(txId & 0xFF),
        0x00, 0x00,
        0x00, 0x06,
        unitId,
        0x03,
        (byte)(start >> 8), (byte)(start & 0xFF),
        (byte)(qty >> 8), (byte)(qty & 0xFF),
    ];

    /// <summary>Builds a 12-byte FC06 (write single register) request frame.</summary>
    private static byte[] BuildFc06WriteFrame(ushort txId, ushort addr, ushort value, byte unitId = 1) =>
    [
        (byte)(txId >> 8), (byte)(txId & 0xFF),
        0x00, 0x00,
        0x00, 0x06,
        unitId,
        0x06,
        (byte)(addr >> 8), (byte)(addr & 0xFF),
        (byte)(value >> 8), (byte)(value & 0xFF),
    ];

    /// <summary>
    /// Builds an FC03 response frame carrying <paramref name="registers"/> as big-endian words.
    /// </summary>
    private static byte[] BuildFc03Response(ushort txId, byte unitId, params ushort[] registers)
    {
        int bodyLen = 2 + registers.Length * 2; // FC + byteCount + register data
        var frame = new byte[7 + bodyLen];
        frame[0] = (byte)(txId >> 8);
        frame[1] = (byte)(txId & 0xFF);
        frame[2] = 0;
        frame[3] = 0;
        ushort length = (ushort)(1 + bodyLen); // UnitId + PDU
        frame[4] = (byte)(length >> 8);
        frame[5] = (byte)(length & 0xFF);
        frame[6] = unitId;
        frame[7] = 0x03;
        frame[8] = (byte)(registers.Length * 2);
        for (int i = 0; i < registers.Length; i++)
        {
            frame[9 + i * 2] = (byte)(registers[i] >> 8);
            frame[9 + i * 2 + 1] = (byte)(registers[i] & 0xFF);
        }
        return frame;
    }

    ///
    /// FC06 response echo with txId / addr / value.
///
    private static byte[] BuildFc06Response(ushort txId, byte unitId, ushort addr, ushort value)
    {
        var frame = new byte[7 + 5];
        frame[0] = (byte)(txId >> 8);
        frame[1] = (byte)(txId & 0xFF);
        frame[2] = 0;
        frame[3] = 0;
        frame[4] = 0;
        frame[5] = 6; // length: UnitId(1) + FC(1) + Addr(2) + Value(2)
        frame[6] = unitId;
        frame[7] = 0x06;
        frame[8] = (byte)(addr >> 8);
        frame[9] = (byte)(addr & 0xFF);
        frame[10] = (byte)(value >> 8);
        frame[11] = (byte)(value & 0xFF);
        return frame;
    }

    /// <summary>
    /// Builds a per-PLC context with fresh counters and a tag map keyed by tag address
    /// (the shared empty map when no tags are given).
    /// </summary>
    private static PerPlcContext MakeContext(string name, params BcdTag[] tags)
    {
        var frozen = tags.ToDictionary(t => t.Address).ToFrozenDictionary();
        var map = frozen.Count > 0 ? new BcdTagMap(frozen) : BcdTagMap.Empty;
        return new PerPlcContext
        {
            PlcName = name,
            TagMap = map,
            Counters = new ProxyCounters(),
            Logger = NullLogger.Instance,
        };
    }

    /// <summary>
    /// A stub backend that echoes FC03 responses for every request, recording the proxy
    /// TxIds it sees on the wire so tests can verify the multiplexer rewrites them.
    /// </summary>
    private sealed class StubBackend : IAsyncDisposable
    {
        /// <summary>The loopback port the stub is actually listening on.</summary>
        public int Port { get; }

        private readonly TcpListener _listener;
        private readonly CancellationTokenSource _cts = new();
        private readonly List<Task> _clientTasks = new();
        private int _disposed; // 0 = live, 1 = disposed; makes DisposeAsync idempotent

        /// <summary>Every TxId observed on the backend wire, in arrival order.</summary>
        public ConcurrentQueue<ushort> SeenProxyTxIds { get; } = new();

        /// <summary>
        /// Optional override invoked as (functionCode, start, quantity, txId) to produce the
        /// raw response frame. When null, FC03 and FC06 get the built-in canned responses.
        /// </summary>
        public Func<byte, ushort, ushort, ushort, byte[]>? FcResponseFactory { get; set; }

        /// <param name="port">Loopback port to listen on; 0 lets the OS choose (see <see cref="Port"/>).</param>
        public StubBackend(int port)
        {
            _listener = new TcpListener(IPAddress.Loopback, port);
            _listener.Start();
            // Read the bound port back so a caller passing 0 still learns the real port.
            Port = ((IPEndPoint)_listener.LocalEndpoint).Port;
            _ = AcceptLoop();
        }

        private async Task AcceptLoop()
        {
            try
            {
                while (!_cts.IsCancellationRequested)
                {
                    Socket s = await _listener.AcceptSocketAsync(_cts.Token);
                    var task = Task.Run(() => HandleAsync(s));
                    lock (_clientTasks) _clientTasks.Add(task);
                }
            }
            catch { /* shutdown */ }
        }

        private async Task HandleAsync(Socket s)
        {
            try
            {
                while (!_cts.IsCancellationRequested)
                {
                    var req = await ReadOneFrameAsync(s, _cts.Token);
                    if (req.Length < 8) break;

                    ushort txId = (ushort)((req[0] << 8) | req[1]);
                    SeenProxyTxIds.Enqueue(txId);
                    byte unitId = req[6];
                    byte fc = req[7];

                    byte[] response;
                    if (FcResponseFactory is not null)
                    {
                        ushort start = req.Length >= 10 ? (ushort)((req[8] << 8) | req[9]) : (ushort)0;
                        ushort qty = req.Length >= 12 ? (ushort)((req[10] << 8) | req[11]) : (ushort)0;
                        response = FcResponseFactory(fc, start, qty, txId);
                    }
                    else if (fc == 0x03)
                    {
                        // Default: FC03 echo a single register containing 0x1234.
                        response = BuildFc03Response(txId, unitId, 0x1234);
                    }
                    else if (fc == 0x06)
                    {
                        // A truncated FC06 request ends the session explicitly instead of
                        // relying on an IndexOutOfRangeException hitting the catch below.
                        if (req.Length < 12) break;
                        ushort addr = (ushort)((req[8] << 8) | req[9]);
                        ushort value = (ushort)((req[10] << 8) | req[11]);
                        response = BuildFc06Response(txId, unitId, addr, value);
                    }
                    else
                    {
                        break;
                    }

                    await s.SendAsync(response, SocketFlags.None, _cts.Token);
                }
            }
            catch { /* normal */ }
            finally
            {
                try { s.Dispose(); } catch { }
            }
        }

        /// <summary>
        /// Stops accepting, cancels the client handlers and waits up to 2 s for them to finish.
        /// Safe to call more than once; only the first call does any work.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

            await _cts.CancelAsync();
            try { _listener.Stop(); } catch { }

            Task[] snap;
            lock (_clientTasks) snap = _clientTasks.ToArray();
            try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { }

            _cts.Dispose();
        }
    }

    /// <summary>Constructs a multiplexer for <paramref name="plc"/> with no backend-connect pipeline.</summary>
    private static async Task<PlcMultiplexer> BuildMuxAsync(
        PlcOptions plc,
        ConnectionOptions connOpts,
        PerPlcContext ctx)
    {
        var mux = new PlcMultiplexer(
            plc,
            connOpts,
            new BcdPduPipeline(),
            ctx,
            NullLogger.Instance,
            backendConnectPipeline: null);
        await Task.Yield();
        return mux;
    }

    /// <summary>
    /// Creates a connected loopback socket pair, wraps the accepted side in an UpstreamPipe
    /// and starts it on <paramref name="mux"/>. The caller owns and must dispose the returned
    /// client socket, pipe and listener.
    /// </summary>
    private static async Task<(Socket client, UpstreamPipe pipe, TcpListener proxyListener, int proxyPort)>
        ConnectClientAsync(PlcMultiplexer mux, string plcName)
    {
        // Bind to port 0 and read the assigned port back. Probing a free port first and
        // re-binding it afterwards leaves a window in which another socket can take it.
        var proxyListener = new TcpListener(IPAddress.Loopback, 0);
        proxyListener.Start();
        int proxyPort = ((IPEndPoint)proxyListener.LocalEndpoint).Port;

        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
        {
            NoDelay = true
        };
        try
        {
            await client.ConnectAsync(IPAddress.Loopback, proxyPort);
            var upstream = await proxyListener.AcceptSocketAsync();
            var pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance);
            _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None));
            return (client, pipe, proxyListener, proxyPort);
        }
        catch
        {
            // Nothing was handed to the caller, so release what was opened here.
            client.Dispose();
            proxyListener.Stop();
            throw;
        }
    }

    // ── Tests ─────────────────────────────────────────────────────────────────

    [Fact]
    public async Task SingleUpstream_RoundTripsFC03_Through_Multiplexer()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1", BcdTag.Create(100, 16));
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await client.SendAsync(BuildFc03ReadFrame(0x1234, 100, 1), SocketFlags.None);
            var rsp = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);

            ushort rspTxId = (ushort)((rsp[0] << 8) | rsp[1]);
            rspTxId.ShouldBe((ushort)0x1234, "the original TxId must be restored on the way back to the client");

            // BCD decode of the stub's 0x1234 response = 1234.
            ushort decoded = (ushort)((rsp[9] << 8) | rsp[10]);
            decoded.ShouldBe((ushort)1234);
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
            listener.Stop();
        }
    }

    [Fact]
    public async Task SingleUpstream_RoundTripsFC06_Through_Multiplexer()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1", BcdTag.Create(200, 16));
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Client writes binary 1234; proxy encodes to BCD 0x1234 on the way out.
            await client.SendAsync(BuildFc06WriteFrame(0xABCD, 200, 1234), SocketFlags.None);
            var rsp = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);

            ushort rspTxId = (ushort)((rsp[0] << 8) | rsp[1]);
            rspTxId.ShouldBe((ushort)0xABCD);

            // Echo bytes decoded back to client binary.
            ushort echoed = (ushort)((rsp[10] << 8) | rsp[11]);
            echoed.ShouldBe((ushort)1234);
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
            listener.Stop();
        }
    }

    [Fact]
    public async Task TwoUpstreams_ConcurrentFC03_BothGetCorrectResponses()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort)
        {
            // Both clients read address 100; both should see their own TxId echoed.
            FcResponseFactory = (fc, start, qty, txId) =>
            {
                byte unitId = 1;
                return fc == 0x03
                    ? BuildFc03Response(txId, unitId, 0x1234)
                    : throw new InvalidOperationException("unexpected fc");
            },
        };

        var ctx = MakeContext("PLC1", BcdTag.Create(100, 16));
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (c1, p1, l1, _) = await ConnectClientAsync(mux, plc.Name);
        var (c2, p2, l2, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Both clients use the same upstream TxId (0x0001). That would clash on a
            // shared backend wire if the mux didn't rewrite the TxId.
            await c1.SendAsync(BuildFc03ReadFrame(0x0001, 100, 1), SocketFlags.None);
            await c2.SendAsync(BuildFc03ReadFrame(0x0001, 100, 1), SocketFlags.None);

            var r1 = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
            var r2 = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);

            // Both responses must carry the original (colliding) TxId.
            ((ushort)((r1[0] << 8) | r1[1])).ShouldBe((ushort)0x0001);
            ((ushort)((r2[0] << 8) | r2[1])).ShouldBe((ushort)0x0001);
        }
        finally
        {
            c1.Dispose();
            c2.Dispose();
            await p1.DisposeAsync();
            await p2.DisposeAsync();
            l1.Stop();
            l2.Stop();
        }
    }

    [Fact]
    public async Task TwoUpstreams_ProxyTxIds_AreDistinct_OnTheWire()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (c1, p1, l1, _) = await ConnectClientAsync(mux, plc.Name);
        var (c2, p2, l2, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Both clients use the same upstream TxId 0x0007 — the proxy must hand out
            // distinct proxy TxIds on the backend wire. Reads target DIFFERENT addresses
            // so coalescing does not fuse them into a single backend request.
            await c1.SendAsync(BuildFc03ReadFrame(0x0007, 0, 1), SocketFlags.None);
            await c2.SendAsync(BuildFc03ReadFrame(0x0007, 10, 1), SocketFlags.None);

            _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);

            // Collect what the backend saw.
            var seen = new HashSet<ushort>(backend.SeenProxyTxIds);
            seen.Count.ShouldBeGreaterThanOrEqualTo(2,
                "the multiplexer must allocate distinct proxy TxIds even when upstreams collide");
        }
        finally
        {
            c1.Dispose();
            c2.Dispose();
            await p1.DisposeAsync();
            await p2.DisposeAsync();
            l1.Stop();
            l2.Stop();
        }
    }

    [Fact]
    public async Task UpstreamDisconnect_DoesNotAffectOtherUpstreams()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
        var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Drop client A entirely.
            cA.Dispose();
            await Task.Delay(50, TestContext.Current.CancellationToken);

            // Client B should still be able to round-trip.
            await cB.SendAsync(BuildFc03ReadFrame(0x0042, 0, 1), SocketFlags.None);
            var rsp = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken);
            ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)0x0042);
        }
        finally
        {
            cB.Dispose();
            await pA.DisposeAsync();
            await pB.DisposeAsync();
            lA.Stop();
            lB.Stop();
        }
    }

    [Fact]
    public async Task BackendDisconnect_CascadesToAllUpstreams()
    {
        int backendPort = PickFreePort();
        // await using guarantees the stub is released even if the test fails before the
        // explicit kill below; StubBackend.DisposeAsync tolerates the second call.
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
        var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
        var (cC, pC, lC, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Force a round-trip on each so backend connect occurs first.
            await cA.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
            await cB.SendAsync(BuildFc03ReadFrame(2, 0, 1), SocketFlags.None);
            await cC.SendAsync(BuildFc03ReadFrame(3, 0, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(cA, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(cC, TestContext.Current.CancellationToken);

            // Kill the backend.
            await backend.DisposeAsync();

            // All three upstream sockets should observe a close promptly; the combined
            // wait is asserted below to stay under 2 s.
            var sw = System.Diagnostics.Stopwatch.StartNew();
            await WaitForCloseAsync(cA, TestContext.Current.CancellationToken);
            await WaitForCloseAsync(cB, TestContext.Current.CancellationToken);
            await WaitForCloseAsync(cC, TestContext.Current.CancellationToken);
            sw.Stop();
            sw.ElapsedMilliseconds.ShouldBeLessThan(2000, "cascade should propagate quickly");

            // Poll briefly for the cascade counter — there is an inherent scheduling gap
            // between "upstream socket EOF observed" (WaitForCloseAsync returns) and "the
            // multiplexer's TearDownBackendAsync increments the counter after awaiting
            // every pipe.DisposeAsync". This poll absorbs that scheduling jitter without
            // weakening the assertion's semantics — the counter MUST reach 3 (or more)
            // because all three upstream pipes were attached when the cascade fired.
            long cascades = 0;
            for (int i = 0; i < 50; i++)
            {
                cascades = ctx.Counters.Snapshot().BackendDisconnectCascades;
                if (cascades >= 3) break;
                await Task.Delay(20, TestContext.Current.CancellationToken);
            }
            cascades.ShouldBeGreaterThanOrEqualTo(3);
        }
        finally
        {
            cA.Dispose();
            cB.Dispose();
            cC.Dispose();
            await pA.DisposeAsync();
            await pB.DisposeAsync();
            await pC.DisposeAsync();
            lA.Stop();
            lB.Stop();
            lC.Stop();
        }
    }

    [Fact]
    public async Task RequestTimeoutWatchdog_DeliversException0B_ToUpstream_WhenBackendNeverResponds()
    {
        // A drain-only stub that consumes requests but never responds. The multiplexer's
        // per-request watchdog must surface a Modbus exception 0x0B to the upstream client
        // once BackendRequestTimeoutMs elapses, freeing the proxy TxId + correlation entry.
        int backendPort = PickFreePort();
        var drainListener = new TcpListener(IPAddress.Loopback, backendPort);
        drainListener.Start();
        var drainCts = new CancellationTokenSource();
        var drainToken = drainCts.Token;
        _ = Task.Run(async () =>
        {
            try
            {
                while (!drainToken.IsCancellationRequested)
                {
                    var s = await drainListener.AcceptSocketAsync(drainToken);
                    _ = Task.Run(async () =>
                    {
                        var buf = new byte[256];
                        try
                        {
                            while (!drainToken.IsCancellationRequested)
                            {
                                int n = await s.ReceiveAsync(buf, SocketFlags.None, drainToken);
                                if (n == 0) break;
                            }
                        }
                        catch { }
                        finally { try { s.Dispose(); } catch { } }
                    }, drainToken);
                }
            }
            catch { }
        }, drainToken);

        try
        {
            var ctx = MakeContext("PLC1");
            var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
            // Short request timeout so the test does not have to wait long.
            var connOpts = new ConnectionOptions { BackendRequestTimeoutMs = 400 };
            await using var mux = await BuildMuxAsync(plc, connOpts, ctx);

            var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
            try
            {
                await client.SendAsync(BuildFc03ReadFrame(0xABCD, 0, 1), SocketFlags.None);

                // The watchdog should deliver an exception within ~watchdog-tick * 2.
                var rsp = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);

                ushort rspTxId = (ushort)((rsp[0] << 8) | rsp[1]);
                rspTxId.ShouldBe((ushort)0xABCD, "watchdog must echo the original client TxId");

                byte fcByte = rsp[7];
                (fcByte & 0x80).ShouldBe(0x80, "FC must have the exception bit set");
                (fcByte & 0x7F).ShouldBe(0x03, "original FC must be FC03 (read holding registers)");
                rsp[8].ShouldBe((byte)0x0B, "exception code must be 0x0B (Gateway Target Device Failed To Respond)");
            }
            finally
            {
                client.Dispose();
                await pipe.DisposeAsync();
                listener.Stop();
            }
        }
        finally
        {
            await drainCts.CancelAsync();
            try { drainListener.Stop(); } catch { }
            drainCts.Dispose();
        }
    }

    [Fact]
    public async Task BackendReconnect_AfterCascade_NextUpstreamRequest_Succeeds()
    {
        int backendPort = PickFreePort();
        // Disposed explicitly below to trigger the cascade; await using covers early failure
        // and the repeated dispose at scope exit is a no-op.
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await cA.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(cA, TestContext.Current.CancellationToken);
            await backend.DisposeAsync();
            await WaitForCloseAsync(cA, TestContext.Current.CancellationToken);
        }
        catch { /* tolerate any teardown noise */ }
        finally
        {
            // Cleanup runs even when the first phase threw, so nothing from phase one
            // lingers into the reconnect phase.
            cA.Dispose();
            lA.Stop();
            try { await pA.DisposeAsync(); } catch { /* tolerate any teardown noise */ }
        }

        // Start a new backend on the same port.
        await using var backend2 = new StubBackend(backendPort);

        // A fresh client should round-trip cleanly through the same multiplexer.
        var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await cB.SendAsync(BuildFc03ReadFrame(0x7777, 0, 1), SocketFlags.None);
            var rsp = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken);
            ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)0x7777);
        }
        finally
        {
            cB.Dispose();
            await pB.DisposeAsync();
            lB.Stop();
        }
    }

    /// <summary>
    /// Drains <paramref name="s"/> until a receive returns 0 bytes or throws, or until a 2 s
    /// deadline (linked to <paramref name="ct"/>) expires. Returns normally in every case,
    /// so callers that need to distinguish a timely close must time the call themselves.
    /// </summary>
    private static async Task WaitForCloseAsync(Socket s, CancellationToken ct)
    {
        var buf = new byte[1];
        using var deadline = CancellationTokenSource.CreateLinkedTokenSource(ct);
        deadline.CancelAfter(TimeSpan.FromSeconds(2));
        while (!deadline.IsCancellationRequested)
        {
            try
            {
                int n = await s.ReceiveAsync(buf, SocketFlags.None, deadline.Token);
                if (n == 0) return;
            }
            catch { return; }
        }
    }

    // ── ReplaceContext live-swap regression tests ────────────────────────────────

    /// <summary>
    /// Verifies that ReplaceContext swaps the live per-PLC
    /// context on the running multiplexer, so the very next PDU's BCD rewriter uses the
    /// new tag map (not the captured-at-construction map). Without the live swap this
    /// scenario would silently keep using the old map until the listener faulted and the
    /// supervisor's Polly loop reconstructed everything.
    /// </summary>
    [Fact]
    public async Task ReplaceContext_NewTagMap_VisibleOnNextPdu()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);
        backend.FcResponseFactory = (fc, _, _, txId) =>
            fc == 0x03 ? BuildFc03Response(txId, 1, 0x1234) : Array.Empty<byte>();

        // Context 1 — tag at addr 100, BCD16. Wire 0x1234 decodes to decimal 1234.
        var ctx1 = MakeContext("PLC1", BcdTag.Create(100, 16));
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx1);

        var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Read 1 with original ctx — wire 0x1234 should be decoded to 1234 (= 0x04D2).
            await client.SendAsync(BuildFc03ReadFrame(1, 100, 1), SocketFlags.None);
            var rsp1 = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
            ushort decoded1 = (ushort)((rsp1[9] << 8) | rsp1[10]);
            decoded1.ShouldBe((ushort)1234, "with tag at 100, BCD wire 0x1234 must decode to decimal 1234");

            // Swap to an empty tag map (counters preserved per the design's reseat contract).
            var ctx2 = new PerPlcContext
            {
                PlcName = "PLC1",
                TagMap = BcdTagMap.Empty,
                Counters = ctx1.Counters,
                Logger = NullLogger.Instance,
            };
            mux.ReplaceContext(ctx2);

            // Read 2 with swapped ctx — no tag, raw 0x1234 must pass through unchanged.
            await client.SendAsync(BuildFc03ReadFrame(2, 100, 1), SocketFlags.None);
            var rsp2 = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
            ushort raw2 = (ushort)((rsp2[9] << 8) | rsp2[10]);
            raw2.ShouldBe((ushort)0x1234,
                "after ReplaceContext to empty tag map, the next PDU must use the new map and pass 0x1234 through unchanged");
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
            listener.Stop();
        }
    }

    /// <summary>
    /// Verifies that swapping in a fresh response cache via ReplaceContext
    /// makes the running multiplexer consult
    /// the NEW cache for subsequent reads, not the old cache that was disposed by the
    /// supervisor. Without the live swap the running mux would keep its constructor-
    /// captured cache reference and either return stale entries or hit a disposed cache.
    /// </summary>
    [Fact]
    public async Task ReplaceContext_NewCache_NextReadGoesToBackend_NotOldCache()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);
        backend.FcResponseFactory = (fc, _, _, txId) =>
            fc == 0x03 ? BuildFc03Response(txId, 1, (ushort)0x1111) : Array.Empty<byte>();

        // Context 1 — cacheable tag at addr 200 with TTL 60_000 ms.
        var tag = new BcdTag(200, 16, CacheTtlMs: 60_000);
        var dict = new[] { tag }.ToDictionary(t => t.Address).ToFrozenDictionary();
        var map = new BcdTagMap(dict);

        // Both caches are created up front and released in the outer finally, i.e. after
        // the multiplexer (scoped to the try block) has been disposed. Previously cache2
        // was never disposed.
        var cache1 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000);
        var cache2 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000);
        try
        {
            var ctx1 = new PerPlcContext
            {
                PlcName = "PLC1",
                TagMap = map,
                Counters = new ProxyCounters(),
                Logger = NullLogger.Instance,
                Cache = cache1,
            };

            var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
            await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx1);

            var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
            try
            {
                // Read 1 — populates cache1 from backend.
                await client.SendAsync(BuildFc03ReadFrame(1, 200, 1), SocketFlags.None);
                _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
                await Task.Delay(50, TestContext.Current.CancellationToken);
                int afterFirst = backend.SeenProxyTxIds.Count;
                cache1.Count.ShouldBe(1, "cache1 must contain the first read");

                // Read 2 — must hit cache1 (no backend traffic).
                await client.SendAsync(BuildFc03ReadFrame(2, 200, 1), SocketFlags.None);
                _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
                backend.SeenProxyTxIds.Count.ShouldBe(afterFirst, "cache hit must not produce backend traffic");

                // Swap in a brand-new (empty) cache via ReplaceContext.
                var ctx2 = new PerPlcContext
                {
                    PlcName = "PLC1",
                    TagMap = map,
                    Counters = ctx1.Counters,
                    Logger = NullLogger.Instance,
                    Cache = cache2,
                };
                mux.ReplaceContext(ctx2);

                // Read 3 — old cache had the entry, but mux now uses cache2 which is empty,
                // so the next read MUST go to the backend.
                await client.SendAsync(BuildFc03ReadFrame(3, 200, 1), SocketFlags.None);
                _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
                await Task.Delay(50, TestContext.Current.CancellationToken);
                backend.SeenProxyTxIds.Count.ShouldBe(afterFirst + 1,
                    "after ReplaceContext, the running multiplexer must consult the NEW cache (empty) — not the old one (still warm)");
                cache2.Count.ShouldBe(1, "the new cache should be populated by the post-swap read");
            }
            finally
            {
                client.Dispose();
                await pipe.DisposeAsync();
                listener.Stop();
            }
        }
        finally
        {
            cache1.Dispose();
            cache2.Dispose();
        }
    }

    // ── Final-tier race tests ─────────────────────────────────────────────────

    /// <summary>
    /// Reflection helper — drains the multiplexer's TxIdAllocator by calling
    /// TryAllocate in a loop until it refuses. Returns the number of slots taken
    /// so callers can later release them if needed. Used only by the saturation tests
    /// below; production code never touches the allocator from outside the multiplexer.
    /// </summary>
    private static int DrainAllocator(PlcMultiplexer mux)
    {
        var allocField = typeof(PlcMultiplexer).GetField("_allocator",
            System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!;
        object alloc = allocField.GetValue(mux)!;
        var tryAllocate = alloc.GetType().GetMethod("TryAllocate",
            System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance)!;

        int taken = 0;
        // Single reusable argument slot for TryAllocate's one parameter.
        object?[] args = [(ushort)0];
        while ((bool)tryAllocate.Invoke(alloc, args)!) taken++;
        return taken;
    }

    ///
    /// TxId allocator saturation propagates as a Modbus exception 04 to the upstream
    /// client (no hang, no crash). The 16-bit TxId space (65,536 slots) is pre-saturated
    /// via reflection so the next request hits the !_allocator.TryAllocate branch
    /// in OnUpstreamFrameAsync immediately.
///
    [Fact]
    public async Task TxIdAllocator_Saturated_NextRequest_GetsException04_WithOriginalTxId()
    {
        var ct = TestContext.Current.CancellationToken;

        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Force the multiplexer to bring up its backend so the saturation check at
            // OnUpstreamFrameAsync's allocator path is the only thing left to fail.
            // (We send a normal request first, drain the response, then saturate.)
            await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(client, ct);

            int taken = DrainAllocator(mux);
            // The TxId space is 65536; the previous request released its slot when the
            // response landed, so we should be able to take ≥ 65535 fresh slots.
            taken.ShouldBeGreaterThanOrEqualTo(65_535);

            // Send a request that MUST hit saturation. Use a non-coalescing FC (06) to
            // exercise the simpler non-coalescing saturation branch directly.
            const ushort txId = 0xBEEF;
            await client.SendAsync(BuildFc06WriteFrame(txId, addr: 100, value: 0x1234), SocketFlags.None);

            // Bounded wait: the contract under test is "no hang", so a missing response
            // must fail this test promptly rather than stall until the runner gives up.
            var rsp = await ReadOneFrameAsync(client, ct)
                .WaitAsync(TimeSpan.FromSeconds(5), ct);

            // Validate the saturation exception: original TxId echoed, exception bit set,
            // exception code 04 (SLAVE_DEVICE_FAILURE).
            ushort echoTxId = (ushort)((rsp[0] << 8) | rsp[1]);
            echoTxId.ShouldBe(txId, "saturation exception must echo the original client TxId");
            byte fc = rsp[7];
            (fc & 0x80).ShouldBe(0x80, "FC must have the exception bit set");
            (fc & 0x7F).ShouldBe(0x06, "original FC must be FC06");
            rsp[8].ShouldBe((byte)0x04, "exception code must be 0x04 (Slave Device Failure)");
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
            listener.Stop();
        }
    }

    /// <summary>
    /// Under TxId saturation, two concurrent identical FC03 reads must BOTH receive
    /// exception 04 (one as the leader directly, the other either via a coalesced
    /// fan-out from the saturation cleanup OR via its own independent saturation path —
    /// either timing produces the same observable contract). Validates that no pipe
    /// hangs forever waiting for a backend response that would never arrive.
    /// </summary>
    [Fact]
    public async Task TxIdAllocator_Saturated_TwoConcurrentIdenticalReads_BothPipesGetException04()
    {
        var ct = TestContext.Current.CancellationToken;

        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        // Bring backend up via a primer request from a throwaway pipe.
        var (primer, primerPipe, primerListener, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await primer.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(primer, ct);
        }
        finally
        {
            // Release the primer even when its round-trip fails.
            primer.Dispose();
            await primerPipe.DisposeAsync();
            primerListener.Stop();
        }

        // Saturate after primer has released its slot.
        DrainAllocator(mux);

        // Two pipes, each sends the same FC03 (unitId=1, fc=03, start=200, qty=1).
        var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
        var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            const ushort txA = 0xAAA1;
            const ushort txB = 0xBBB1;

            // Fire concurrently; both targets the same coalescing key.
            await Task.WhenAll(
                cA.SendAsync(BuildFc03ReadFrame(txA, 200, 1), SocketFlags.None),
                cB.SendAsync(BuildFc03ReadFrame(txB, 200, 1), SocketFlags.None));

            // Bounded waits: a pipe that never gets its exception must fail the test
            // quickly instead of hanging it.
            var rspA = await ReadOneFrameAsync(cA, ct).WaitAsync(TimeSpan.FromSeconds(5), ct);
            var rspB = await ReadOneFrameAsync(cB, ct).WaitAsync(TimeSpan.FromSeconds(5), ct);

            // Both must be exception 04 with the original TxId echoed — the contract
            // is "no late attacher hangs."
            foreach (var (rsp, expectedTxId, label) in new[] { (rspA, txA, "A"), (rspB, txB, "B") })
            {
                ushort echo = (ushort)((rsp[0] << 8) | rsp[1]);
                echo.ShouldBe(expectedTxId, $"{label}: original TxId must be echoed");
                byte fc = rsp[7];
                (fc & 0x80).ShouldBe(0x80, $"{label}: exception bit must be set");
                rsp[8].ShouldBe((byte)0x04, $"{label}: exception code must be 0x04");
            }
        }
        finally
        {
            cA.Dispose();
            cB.Dispose();
            await pA.DisposeAsync();
            await pB.DisposeAsync();
            lA.Stop();
            lB.Stop();
        }
    }

    ///
    /// Backend-reader head-of-line block guard. One upstream pipe is wedged by the test
    /// holding its socket-receive side without reading. The fan-out is routed through
    /// TrySendResponse so the per-PLC backend reader cannot be stalled by a
    /// wedged pipe; responses to a healthy peer must keep flowing and the wedged pipe's
    /// responseDropForFullUpstream counter must increment.
    ///
    /// A synchronous await SendResponseAsync inside the reader would block
    /// on the wedged pipe's full bounded channel and starve every peer.
///
    [Fact]
    public async Task SlowUpstream_DoesNotStallPeerResponses_DropCounterIncrements()
    {
        // One token for the whole test so every socket operation (sends included) is
        // aborted when the test run is cancelled.
        CancellationToken ct = TestContext.Current.CancellationToken;

        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        // Wedged client A — connects but NEVER reads from its socket. The mux's per-pipe
        // response channel (cap 16) plus the kernel send buffer eventually fill, then
        // TrySendResponse returns false and the drop counter increments.
        var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);

        // Healthy client B — will read responses normally.
        var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);

        try
        {
            // Pump A with many requests but DON'T read responses from A's socket. The
            // exact count to fill the channel + kernel buffer is environment-dependent;
            // a few hundred 12-byte responses is enough on Windows loopback.
            const int wedgePumpCount = 500;
            for (ushort i = 1; i <= wedgePumpCount; i++)
            {
                await cA.SendAsync(BuildFc03ReadFrame(i, 0, 1), SocketFlags.None, ct);
            }

            // We don't assert the drop counter yet — we first prove B is still being
            // served, otherwise a slow CI machine might fail the timing on B before the
            // drops register.
            const ushort txB = 0xB001;
            await cB.SendAsync(BuildFc03ReadFrame(txB, 0, 1), SocketFlags.None, ct);

            // B's response must arrive within the 3 s deadline even with A wedged. If
            // the non-blocking enqueue path were missing, the reader would be blocked on
            // A's channel and B would time out.
            var rspB = await ReadOneFrameAsync(cB, ct)
                .WaitAsync(TimeSpan.FromSeconds(3), ct);

            ushort echoB = (ushort)((rspB[0] << 8) | rspB[1]);
            echoB.ShouldBe(txB, "B's TxId must be echoed; reader must not be stalled by A");

            // Now poll (up to 50 x 50 ms) for A's drop counter to register. Drops happen
            // as the reader tries to fan responses to A while its channel/socket is full;
            // the expected number is roughly wedgePumpCount minus the channel capacity
            // (16) minus a few responses still in transit.
            long drops = 0;
            for (int i = 0; i < 50; i++)
            {
                drops = ctx.Counters.Snapshot().ResponseDropForFullUpstream;
                if (drops > 0)
                {
                    break;
                }

                await Task.Delay(50, ct);
            }

            drops.ShouldBeGreaterThan(0,
                "ResponseDropForFullUpstream must increment when A's bounded response channel fills");
        }
        finally
        {
            cA.Dispose();
            cB.Dispose();
            await pA.DisposeAsync();
            await pB.DisposeAsync();
            lA.Stop();
            lB.Stop();
        }
    }

    /// <summary>
    /// Watchdog↔response race. The design uses claim-then-dispatch:
    /// <c>CorrelationMap.TryRemove</c> is the single source of truth, so exactly ONE
    /// of (response delivered, watchdog timeout) wins for any given proxy TxId. This
    /// test exercises the race window directly: a stub backend that responds at almost
    /// exactly the request-timeout deadline. Across many iterations the test drives
    /// both branches and verifies the contract:
    /// <list type="bullet">
    ///   <item><description>Exactly one response per request (no double-delivery).</description></item>
    ///   <item><description>Every response carries the original client TxId.</description></item>
    ///   <item><description>No request hangs.</description></item>
    /// </list>
    /// </summary>
    [Fact]
    public async Task WatchdogVsResponse_Race_AlwaysExactlyOneOutcome_PerRequest()
    {
        CancellationToken ct = TestContext.Current.CancellationToken;

        // Backend that replies after a delay chosen to fall close to the watchdog
        // deadline. Watchdog tick is max(100, RequestTimeoutMs/4); with RequestTimeoutMs
        // = 400 the tick is 100 ms. The backend delays 350 or 450 ms per request so
        // some responses land before the timeout and some after.
        int backendPort = PickFreePort();

        // Deterministic alternation rather than seeded Random. Random with a fixed seed is
        // not stable across .NET major versions (Microsoft has changed the implementation,
        // e.g. legacy → Xoshiro128 in .NET 6), so a runtime upgrade could land all samples
        // on one side of the watchdog deadline and break the "both branches must fire"
        // assertion below. Counter-based alternation guarantees 15 fast (350 ms, beats
        // watchdog) and 15 slow (450 ms, loses to watchdog) responses across 30 iterations,
        // regardless of runtime.
        int reqCount = 0;
        await using var slowBackend = new SlowResponseBackend(backendPort, () =>
        {
            int n = Interlocked.Increment(ref reqCount);
            return (n & 1) == 1 ? 350 : 450;
        });

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        var connOpts = new ConnectionOptions { BackendRequestTimeoutMs = 400 };
        await using var mux = await BuildMuxAsync(plc, connOpts, ctx);

        var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            const int iterations = 30;
            int normalResponses = 0;
            int timeoutResponses = 0;

            for (ushort i = 1; i <= iterations; i++)
            {
                await client.SendAsync(BuildFc03ReadFrame(i, 0, 1), SocketFlags.None, ct);

                var rsp = await ReadOneFrameAsync(client, ct)
                    .WaitAsync(TimeSpan.FromSeconds(3), ct);

                // Contract 1: original TxId echoed regardless of which branch won.
                ushort echo = (ushort)((rsp[0] << 8) | rsp[1]);
                echo.ShouldBe(i, $"iteration {i}: TxId must be echoed");

                // Branch detection: the high bit of the function code marks an exception.
                byte fc = rsp[7];
                if ((fc & 0x80) != 0)
                {
                    // Watchdog branch: exception 0x0B (Gateway Target Failed To Respond).
                    rsp[8].ShouldBe((byte)0x0B, $"iteration {i}: watchdog must use exception code 0x0B");
                    timeoutResponses++;
                }
                else
                {
                    // Response branch: normal FC03 response.
                    normalResponses++;
                }
            }

            // Contract 2: no request hung — every iteration produced exactly one response.
            (normalResponses + timeoutResponses).ShouldBe(iterations);

            // The race should produce a mix; 30 iterations spanning 350-450 ms with a
            // 400 ms deadline should yield both branches in any healthy run. We assert
            // both outcomes were hit at least once to prove the race window is real and
            // both branches are exercised.
            normalResponses.ShouldBeGreaterThan(0, "at least one request must beat the watchdog");
            timeoutResponses.ShouldBeGreaterThan(0, "at least one request must lose to the watchdog");
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
            listener.Stop();
        }
    }

    /// <summary>
    /// Cascade racing with new accepts. Stress-test: while the backend is repeatedly
    /// killed and resurrected (forcing repeated cascade cycles), new upstream clients
    /// connect and disconnect concurrently. The contract verified is the
    /// no-crash-under-churn property: the multiplexer must survive arbitrary interleavings
    /// of teardown and new-pipe-attach without throwing into the host or leaking sockets.
    /// <para>
    /// The race window — a new pipe added between
    /// <c>_pipes.Values.ToArray()</c> and <c>_pipes.Clear()</c> in <c>TearDownBackendAsync</c>
    /// — leaves the new pipe alive but orphaned from <c>_pipes</c>. Its read loop will
    /// receive normal traffic until the next cascade or its socket closes. This test
    /// doesn't try to reproduce that exact ordering (which is microseconds wide); it
    /// instead proves that arbitrary interleavings of cascade + accept under load do not
    /// cause an observable failure (exception, hang, or counter inconsistency).
    /// </para>
    /// </summary>
    [Fact(Timeout = 15_000)]
    public async Task CascadeVsNewAccept_StressChurn_NoCrash_NoHang()
    {
        CancellationToken ct = TestContext.Current.CancellationToken;

        int backendPort = PickFreePort();
        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        // Backend that's brought up and torn down repeatedly to force cascade cycles.
        StubBackend? backend = new(backendPort);

        const int cascadeCycles = 3;
        const int connectsPerCycle = 8;

        try
        {
            for (int cycle = 0; cycle < cascadeCycles; cycle++)
            {
                // Spawn many concurrent connect+request tasks.
                var tasks = Enumerable.Range(0, connectsPerCycle).Select(async _ =>
                {
                    Socket? client = null;
                    UpstreamPipe? pipe = null;
                    TcpListener? listener = null;
                    try
                    {
                        (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
                        await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None, ct);

                        // Read with a short deadline; cascades will close us mid-flight.
                        try
                        {
                            await ReadOneFrameAsync(client, ct)
                                .WaitAsync(TimeSpan.FromMilliseconds(500), ct);
                        }
                        catch
                        {
                            // Expected when our pipe is cascaded mid-request — no test failure.
                        }
                    }
                    finally
                    {
                        // Best-effort cleanup: each resource may already be torn down by a cascade.
                        try { client?.Dispose(); } catch { }
                        try { if (pipe is not null) await pipe.DisposeAsync(); } catch { }
                        try { listener?.Stop(); } catch { }
                    }
                }).ToArray();

                // Wait for the connect storm to be in flight, then kill the backend mid-storm.
                await Task.Delay(40, ct);
                await backend.DisposeAsync();

                // Drain the connect tasks.
                await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(5), ct);

                // Bring backend back up for the next cycle.
                backend = new StubBackend(backendPort);
                await Task.Delay(50, ct);
            }

            // Survival check — the multiplexer must still service a normal request after
            // all the cascade churn.
            var (cFinal, pFinal, lFinal, _) = await ConnectClientAsync(mux, plc.Name);
            try
            {
                await cFinal.SendAsync(BuildFc03ReadFrame(0xF1F1, 0, 1), SocketFlags.None, ct);
                var rsp = await ReadOneFrameAsync(cFinal, ct)
                    .WaitAsync(TimeSpan.FromSeconds(3), ct);

                ushort echo = (ushort)((rsp[0] << 8) | rsp[1]);
                echo.ShouldBe((ushort)0xF1F1, "multiplexer must still serve traffic after cascade churn");
            }
            finally
            {
                cFinal.Dispose();
                await pFinal.DisposeAsync();
                lFinal.Stop();
            }
        }
        finally
        {
            // The backend may already have been disposed inside the loop; ignore a second dispose failing.
            try { await backend.DisposeAsync(); } catch { }
        }
    }

    /// <summary>
    /// Backend stub that delays each response by a caller-supplied amount. Used by the
    /// watchdog-vs-response race test. Every request is answered with a single-register
    /// FC03 response (value 0x0001) echoing the request's TxId and unit id.
    /// </summary>
    private sealed class SlowResponseBackend : IAsyncDisposable
    {
        private readonly TcpListener _listener;
        private readonly CancellationTokenSource _cts = new();

        // Captured once so background work never reads _cts.Token after _cts is disposed.
        private readonly CancellationToken _shutdown;

        private readonly Func<int> _delayMsFactory;

        // Accept loop, per-connection handlers and delayed responders; guarded by lock on itself.
        private readonly List<Task> _clientTasks = new();

        /// <summary>
        /// Starts listening on loopback <paramref name="port"/> and begins accepting connections.
        /// </summary>
        /// <param name="port">TCP port to listen on.</param>
        /// <param name="delayMsFactory">Invoked once per request; returns the response delay in milliseconds.</param>
        public SlowResponseBackend(int port, Func<int> delayMsFactory)
        {
            _delayMsFactory = delayMsFactory;
            _shutdown = _cts.Token;
            _listener = new TcpListener(IPAddress.Loopback, port);
            _listener.Start();
            Track(AcceptLoop());
        }

        /// <summary>Records a background task so <see cref="DisposeAsync"/> can drain it.</summary>
        private void Track(Task task)
        {
            lock (_clientTasks)
            {
                _clientTasks.Add(task);
            }
        }

        /// <summary>Accepts connections until shutdown, handing each socket to <see cref="HandleAsync"/>.</summary>
        private async Task AcceptLoop()
        {
            try
            {
                while (!_shutdown.IsCancellationRequested)
                {
                    Socket s = await _listener.AcceptSocketAsync(_shutdown);
                    Track(Task.Run(() => HandleAsync(s)));
                }
            }
            catch { /* shutdown */ }
        }

        /// <summary>
        /// Reads frames from one connection and schedules a delayed FC03 response for each.
        /// The socket is disposed when the read loop ends.
        /// </summary>
        private async Task HandleAsync(Socket s)
        {
            try
            {
                while (!_shutdown.IsCancellationRequested)
                {
                    var req = await ReadOneFrameAsync(s, _shutdown);
                    if (req.Length < 8)
                    {
                        break;
                    }

                    ushort txId = (ushort)((req[0] << 8) | req[1]);
                    byte unitId = req[6];
                    int delayMs = _delayMsFactory();

                    // Schedule the response after delay; don't block this iteration so
                    // pipelined requests can race within one client connection.
                    Track(Task.Run(async () =>
                    {
                        try
                        {
                            await Task.Delay(delayMs, _shutdown);
                            byte[] rsp = BuildFc03Response(txId, unitId, 0x0001);
                            await s.SendAsync(rsp, SocketFlags.None, _shutdown);
                        }
                        catch { /* shutdown / pipe down */ }
                    }));
                }
            }
            catch { /* normal */ }
            finally
            {
                try { s.Dispose(); } catch { }
            }
        }

        /// <summary>
        /// Signals shutdown, stops the listener, waits up to 2 s for tracked background
        /// tasks to finish, then disposes the cancellation source.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            await _cts.CancelAsync();
            try { _listener.Stop(); } catch { }

            Task[] snap;
            lock (_clientTasks)
            {
                snap = _clientTasks.ToArray();
            }

            try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { }

            _cts.Dispose();
        }
    }
}