diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs index 51c7421..c90ba80 100644 --- a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs @@ -756,4 +756,473 @@ public sealed class PlcMultiplexerTests cache1.Dispose(); } } + + // ── Phase 12 (W3 final-tier race tests) ────────────────────────────────── + + /// + /// Reflection helper — drains the multiplexer's TxIdAllocator by calling + /// TryAllocate in a loop until it refuses. Returns the number of slots taken + /// so callers can later release them if needed. Used only by the saturation tests + /// below; production code never touches the allocator from outside the multiplexer. + /// + private static int DrainAllocator(PlcMultiplexer mux) + { + var allocField = typeof(PlcMultiplexer).GetField("_allocator", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!; + object alloc = allocField.GetValue(mux)!; + var tryAllocate = alloc.GetType().GetMethod("TryAllocate", + System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance)!; + + int taken = 0; + object?[] args = [(ushort)0]; + while ((bool)tryAllocate.Invoke(alloc, args)!) + taken++; + return taken; + } + + /// + /// W3 #5 — TxId allocator saturation propagates as a Modbus exception 04 to the + /// upstream client (no hang, no crash). The 16-bit TxId space (65,536 slots) is + /// pre-saturated via reflection so the next request hits the + /// !_allocator.TryAllocate branch in OnUpstreamFrameAsync immediately. + /// + [Fact] + public async Task TxIdAllocator_Saturated_NextRequest_GetsException04_WithOriginalTxId() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx); + + var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name); + try + { + // Force the multiplexer to bring up its backend so the saturation check at + // OnUpstreamFrameAsync's allocator path is the only thing left to fail. + // (We send a normal request first, drain the response, then saturate.) + await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken); + + int taken = DrainAllocator(mux); + // The TxId space is 65536; the previous request released its slot when the + // response landed, so we should be able to take ≥ 65535 fresh slots. + taken.ShouldBeGreaterThanOrEqualTo(65_535); + + // Send a request that MUST hit saturation. Use a non-coalescing FC (06) to + // exercise the simpler non-coalescing saturation branch directly. + const ushort txId = 0xBEEF; + await client.SendAsync(BuildFc06WriteFrame(txId, addr: 100, value: 0x1234), SocketFlags.None); + + var rsp = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken); + + // Validate the saturation exception: original TxId echoed, exception bit set, + // exception code 04 (SLAVE_DEVICE_FAILURE). + ushort echoTxId = (ushort)((rsp[0] << 8) | rsp[1]); + echoTxId.ShouldBe(txId, "saturation exception must echo the original client TxId"); + byte fc = rsp[7]; + (fc & 0x80).ShouldBe(0x80, "FC must have the exception bit set"); + (fc & 0x7F).ShouldBe(0x06, "original FC must be FC06"); + rsp[8].ShouldBe((byte)0x04, "exception code must be 0x04 (Slave Device Failure)"); + } + finally + { + client.Dispose(); + await pipe.DisposeAsync(); + listener.Stop(); + } + } + + /// + /// W3 #6 — under TxId saturation, two concurrent identical FC03 reads must BOTH + /// receive exception 04 (one as the leader directly, the other either via a + /// coalesced fan-out from the W1.2 cleanup OR via its own independent saturation + /// path — either timing produces the same observable contract). Validates that no + /// pipe hangs forever waiting for a backend response that would never arrive. + /// + [Fact] + public async Task TxIdAllocator_Saturated_TwoConcurrentIdenticalReads_BothPipesGetException04() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx); + + // Bring backend up via a primer request from a throwaway pipe. + var (primer, primerPipe, primerListener, _) = await ConnectClientAsync(mux, plc.Name); + await primer.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(primer, TestContext.Current.CancellationToken); + primer.Dispose(); + await primerPipe.DisposeAsync(); + primerListener.Stop(); + + // Saturate after primer has released its slot. + DrainAllocator(mux); + + // Two pipes, each sends the same FC03 (unitId=1, fc=03, start=200, qty=1). + var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name); + var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name); + try + { + const ushort txA = 0xAAA1; + const ushort txB = 0xBBB1; + // Fire concurrently; both targets the same coalescing key. + await Task.WhenAll( + cA.SendAsync(BuildFc03ReadFrame(txA, 200, 1), SocketFlags.None), + cB.SendAsync(BuildFc03ReadFrame(txB, 200, 1), SocketFlags.None)); + + var rspA = await ReadOneFrameAsync(cA, TestContext.Current.CancellationToken); + var rspB = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken); + + // Both must be exception 04 with the original TxId echoed — the W1.2 contract + // is "no late attacher hangs." + foreach (var (rsp, expectedTxId, label) in new[] { + (rspA, txA, "A"), (rspB, txB, "B") }) + { + ushort echo = (ushort)((rsp[0] << 8) | rsp[1]); + echo.ShouldBe(expectedTxId, $"{label}: original TxId must be echoed"); + byte fc = rsp[7]; + (fc & 0x80).ShouldBe(0x80, $"{label}: exception bit must be set"); + rsp[8].ShouldBe((byte)0x04, $"{label}: exception code must be 0x04"); + } + } + finally + { + cA.Dispose(); cB.Dispose(); + await pA.DisposeAsync(); await pB.DisposeAsync(); + lA.Stop(); lB.Stop(); + } + } + + /// + /// W3 #7 — backend-reader head-of-line block. One upstream pipe is wedged by the + /// test holding its socket-receive side without reading. The W1.3 fix routes the + /// fan-out through TrySendResponse so the per-PLC backend reader cannot be + /// stalled by a wedged pipe; responses to a healthy peer must keep flowing and the + /// wedged pipe's responseDropForFullUpstream counter must increment. + /// + /// Pre-W1.3 the synchronous await SendResponseAsync inside the reader + /// would block on the wedged pipe's full bounded channel and starve every peer. + /// + [Fact] + public async Task SlowUpstream_DoesNotStallPeerResponses_DropCounterIncrements() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx); + + // Wedged client A — connects but NEVER reads from its socket. The mux's per-pipe + // response channel (cap 16) plus the kernel send buffer eventually fill, then + // TrySendResponse returns false and the drop counter increments. + var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name); + // Healthy client B — will read responses normally. + var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name); + try + { + // Pump A with many requests but DON'T read responses from A's socket. The + // exact count to fill the channel + kernel buffer is environment-dependent; + // a few hundred 12-byte responses is enough on Windows loopback. + const int wedgePumpCount = 500; + for (ushort i = 1; i <= wedgePumpCount; i++) + { + await cA.SendAsync(BuildFc03ReadFrame(i, 0, 1), SocketFlags.None); + } + + // Give time for backend round-trips to fan back, channels to fill, and drops + // to start happening on A. + // We don't synchronously assert the drop counter yet — we want to first prove + // B is still being served before doing the drop assertion, otherwise a slow + // CI machine might fail the timing on B before the drops register. + const ushort txB = 0xB001; + await cB.SendAsync(BuildFc03ReadFrame(txB, 0, 1), SocketFlags.None); + + // B's response must arrive within a few hundred ms even with A wedged. If + // the W1.3 fix were missing, the reader would be blocked on A's channel and + // B would time out. + var rspB = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken) + .WaitAsync(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken); + ushort echoB = (ushort)((rspB[0] << 8) | rspB[1]); + echoB.ShouldBe(txB, "B's TxId must be echoed; reader must not be stalled by A"); + + // Now poll for A's drop counter to register. The drops happen as the reader + // tries to fan responses to A and the channel/socket is full; this needs + // pumpCount drops minus channel-capacity (16) minus a few in transit. + long drops = 0; + for (int i = 0; i < 50; i++) + { + drops = ctx.Counters.Snapshot().ResponseDropForFullUpstream; + if (drops > 0) break; + await Task.Delay(50, TestContext.Current.CancellationToken); + } + drops.ShouldBeGreaterThan(0, + "ResponseDropForFullUpstream must increment when A's bounded response channel fills"); + } + finally + { + cA.Dispose(); cB.Dispose(); + await pA.DisposeAsync(); await pB.DisposeAsync(); + lA.Stop(); lB.Stop(); + } + } + + /// + /// W3 #8 — watchdog↔response race. The W1 design uses claim-then-dispatch: + /// CorrelationMap.TryRemove is the single source of truth, so exactly ONE + /// of (response delivered, watchdog timeout) wins for any given proxy TxId. This + /// test exercises the race window directly: a stub backend that responds at almost + /// exactly the request-timeout deadline. Across many iterations the test drives + /// both branches and verifies the contract: + /// + /// Exactly one response per request (no double-delivery). + /// Every response carries the original client TxId. + /// No request hangs. + /// + /// + [Fact] + public async Task WatchdogVsResponse_Race_AlwaysExactlyOneOutcome_PerRequest() + { + // Backend that replies after a delay we can wedge to fall close to the watchdog + // deadline. Watchdog tick is max(100, RequestTimeoutMs/4); with RequestTimeoutMs + // = 400 the tick is 100 ms. Configure the backend to delay 350-450 ms for each + // request so some land before, some after the timeout. + int backendPort = PickFreePort(); + var rng = new Random(12345); + var slowBackend = new SlowResponseBackend(backendPort, () => rng.Next(350, 450)); + await using var _ = slowBackend; + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + var connOpts = new ConnectionOptions { BackendRequestTimeoutMs = 400 }; + await using var mux = await BuildMuxAsync(plc, connOpts, ctx); + + var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name); + try + { + const int iterations = 30; + int normalResponses = 0; + int timeoutResponses = 0; + + for (ushort i = 1; i <= iterations; i++) + { + await client.SendAsync(BuildFc03ReadFrame(i, 0, 1), SocketFlags.None); + var rsp = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken) + .WaitAsync(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken); + + // Contract 1: original TxId echoed regardless of which branch won. + ushort echo = (ushort)((rsp[0] << 8) | rsp[1]); + echo.ShouldBe(i, $"iteration {i}: TxId must be echoed"); + + // Branch detection. + byte fc = rsp[7]; + if ((fc & 0x80) != 0) + { + // Watchdog branch: exception 0x0B (Gateway Target Failed To Respond). + rsp[8].ShouldBe((byte)0x0B, $"iteration {i}: watchdog must use exception code 0x0B"); + timeoutResponses++; + } + else + { + // Response branch: normal FC03 response. + normalResponses++; + } + } + + // Contract 2: no request hung — every iteration produced exactly one response. + (normalResponses + timeoutResponses).ShouldBe(iterations); + // The race should produce a mix; 30 iterations spanning 350-450 ms with a + // 400 ms deadline should yield both branches in any healthy run. We assert + // both outcomes were hit at least once to prove the race window is real and + // both branches are exercised. + normalResponses.ShouldBeGreaterThan(0, "at least one request must beat the watchdog"); + timeoutResponses.ShouldBeGreaterThan(0, "at least one request must lose to the watchdog"); + } + finally + { + client.Dispose(); + await pipe.DisposeAsync(); + listener.Stop(); + } + } + + /// + /// W3 #9 — cascade racing with new accepts. Stress-test: while the backend is repeatedly + /// killed and resurrected (forcing repeated cascade cycles), new upstream clients + /// connect and disconnect concurrently. The contract verified is the + /// no-crash-under-churn property: the multiplexer must survive arbitrary interleavings + /// of teardown and new-pipe-attach without throwing into the host or leaking sockets. + /// + /// The originally-flagged race window — a new pipe added between + /// _pipes.Values.ToArray() and _pipes.Clear() in TearDownBackendAsync + /// — leaves the new pipe alive but orphaned from _pipes. Its read loop will + /// receive normal traffic until the next cascade or its socket closes. This test + /// doesn't try to reproduce that exact ordering (which is microseconds wide); it + /// instead proves that arbitrary interleavings of cascade + accept under load do not + /// cause an observable failure (exception, hang, or counter inconsistency). + /// + [Fact(Timeout = 15_000)] + public async Task CascadeVsNewAccept_StressChurn_NoCrash_NoHang() + { + int backendPort = PickFreePort(); + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx); + + // Backend that's brought up and torn down repeatedly to force cascade cycles. + StubBackend? backend = new(backendPort); + + const int cascadeCycles = 3; + const int connectsPerCycle = 8; + + try + { + for (int cycle = 0; cycle < cascadeCycles; cycle++) + { + // Spawn many concurrent connect+request tasks. + var tasks = Enumerable.Range(0, connectsPerCycle).Select(async _ => + { + Socket? client = null; + UpstreamPipe? pipe = null; + TcpListener? listener = null; + try + { + (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name); + await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None, + TestContext.Current.CancellationToken); + // Read with a short deadline; cascades will close us mid-flight. + try + { + await ReadOneFrameAsync(client, TestContext.Current.CancellationToken) + .WaitAsync(TimeSpan.FromMilliseconds(500), TestContext.Current.CancellationToken); + } + catch + { + // Expected when our pipe is cascaded mid-request — no test failure. + } + } + finally + { + try { client?.Dispose(); } catch { } + try { if (pipe is not null) await pipe.DisposeAsync(); } catch { } + try { listener?.Stop(); } catch { } + } + }).ToArray(); + + // Wait for the connect storm to be in flight, then kill the backend mid-storm. + await Task.Delay(40, TestContext.Current.CancellationToken); + await backend.DisposeAsync(); + + // Drain the connect tasks. + await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(5), + TestContext.Current.CancellationToken); + + // Bring backend back up for the next cycle. + backend = new StubBackend(backendPort); + await Task.Delay(50, TestContext.Current.CancellationToken); + } + + // Survival check — the multiplexer must still service a normal request after + // all the cascade churn. + var (cFinal, pFinal, lFinal, _) = await ConnectClientAsync(mux, plc.Name); + try + { + await cFinal.SendAsync(BuildFc03ReadFrame(0xF1F1, 0, 1), SocketFlags.None, + TestContext.Current.CancellationToken); + var rsp = await ReadOneFrameAsync(cFinal, TestContext.Current.CancellationToken) + .WaitAsync(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken); + ushort echo = (ushort)((rsp[0] << 8) | rsp[1]); + echo.ShouldBe((ushort)0xF1F1, "multiplexer must still serve traffic after cascade churn"); + } + finally + { + cFinal.Dispose(); + await pFinal.DisposeAsync(); + lFinal.Stop(); + } + } + finally + { + try { await backend.DisposeAsync(); } catch { } + } + } + + /// + /// Backend stub that delays each response by a caller-supplied amount. Used by the + /// W3 #8 watchdog race test. + /// + private sealed class SlowResponseBackend : IAsyncDisposable + { + private readonly TcpListener _listener; + private readonly CancellationTokenSource _cts = new(); + private readonly Func _delayMsFactory; + private readonly List _clientTasks = new(); + + public SlowResponseBackend(int port, Func delayMsFactory) + { + _delayMsFactory = delayMsFactory; + _listener = new TcpListener(IPAddress.Loopback, port); + _listener.Start(); + _ = AcceptLoop(); + } + + private async Task AcceptLoop() + { + try + { + while (!_cts.IsCancellationRequested) + { + var s = await _listener.AcceptSocketAsync(_cts.Token); + var task = Task.Run(() => HandleAsync(s)); + lock (_clientTasks) _clientTasks.Add(task); + } + } + catch { /* shutdown */ } + } + + private async Task HandleAsync(Socket s) + { + try + { + while (!_cts.IsCancellationRequested) + { + var req = await ReadOneFrameAsync(s, _cts.Token); + if (req.Length < 8) break; + + ushort txId = (ushort)((req[0] << 8) | req[1]); + byte unitId = req[6]; + int delayMs = _delayMsFactory(); + // Schedule the response after delay; don't block this iteration so + // pipelined requests can race within one client connection. + _ = Task.Run(async () => + { + try + { + await Task.Delay(delayMs, _cts.Token); + byte[] rsp = BuildFc03Response(txId, unitId, 0x0001); + await s.SendAsync(rsp, SocketFlags.None, _cts.Token); + } + catch { /* shutdown / pipe down */ } + }); + } + } + catch { /* normal */ } + finally { try { s.Dispose(); } catch { } } + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + try { _listener.Stop(); } catch { } + Task[] snap; + lock (_clientTasks) snap = _clientTasks.ToArray(); + try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { } + _cts.Dispose(); + } + } }