using System.Net;
using System.Net.Sockets;
using System.Text.Json;
using Mbproxy;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NModbus;
using Serilog;
using Shouldly;
using Xunit;

namespace Mbproxy.Tests.Proxy.Multiplexing;

/// <summary>
/// End-to-end tests for the Phase-9 TxId multiplexer against the pymodbus DL205 simulator.
/// </summary>
/// <remarks>
/// <para>
/// <b>pymodbus 3.13.0 simulator quirk.</b> The simulator's <c>ServerRequestHandler</c>
/// stores a single <c>last_pdu</c> field per TCP connection and schedules
/// <c>handle_later</c> via <c>asyncio.call_soon</c>. If two MBAP frames arrive in the same
/// recv-buffer (which the multiplexer can cause on a shared backend connection), the later
/// frame overwrites <c>last_pdu</c> before the first scheduled handler runs, and both
/// responses then carry the same TxId. The real DL260 ECOM does not suffer this quirk (it
/// properly echoes per-request MBAP TxIds), so this is purely a simulator limitation. The
/// multiplexer's TxId rewriting is verified end-to-end against a stub backend in the
/// stub-backend multiplexer tests.
/// </para>
/// <para>
/// <b>Test strategy here.</b> These tests exercise two things against a real PLC-shaped
/// backend: the connection-cap lift (more than four simultaneous upstream clients) and the
/// BCD-rewriter integration. Each client issues its request only after the previous client's
/// response has returned. This keeps the proxy's shared backend connection from pumping
/// concurrent frames into pymodbus's broken framer. Mux correctness under truly concurrent
/// backend traffic is proven against the stub backend instead.
/// </para>
/// <para>
/// The multiplexer's per-request watchdog (<c>BackendRequestTimeoutMs</c>) defends against
/// pymodbus's bug in production. After the configured timeout it surfaces a Modbus exception
/// 0x0B back to upstream clients; it has its own unit coverage.
/// </para>
/// <para>
/// NOTE(review): the <c>see cref</c> targets in this summary were lost in this copy of the
/// file (angle-bracket content was stripped). Restore them from version control.
/// </para>
/// </remarks>
[Collection(nameof(Mbproxy.Tests.Sim.DL205SimulatorCollection))]
[Trait("Category", "E2E")]
public sealed class MultiplexerE2ETests
{
    /// <summary>Length in bytes of the MBAP + FC03 request frame built by <see cref="BuildRawFc03"/>.</summary>
    private const int RawFc03FrameLength = 12;

    private readonly Mbproxy.Tests.Sim.DL205SimulatorFixture _sim;

    public MultiplexerE2ETests(Mbproxy.Tests.Sim.DL205SimulatorFixture sim) => _sim = sim;

    // ── E2E 1: Five simultaneous upstream clients (connection-cap lift) ──────────────

    /// <summary>
    /// Headline test: prove that the multiplexer accepts a 5th upstream client on the same
    /// proxy port. A 1:1 model would have failed at backend connect (H2-ECOM100 cap = 4).
    /// </summary>
    /// <remarks>
    /// Each client's request is serialised behind the previous client's response, so the
    /// pymodbus 3.13 simulator's concurrent-frame bug never triggers. This test proves the
    /// multiplexer's connection ceiling, not its under-concurrency behaviour.
    /// </remarks>
    [Fact(Timeout = 5_000)]
    public async Task E2E_FiveSimultaneousClients_AllReadHR1072_AllGetDecoded_1234()
    {
        if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason);
        CancellationToken ct = TestContext.Current.CancellationToken;

        int proxyPort = PickFreePort();
        Dictionary<string, string?> config = MakeBaseConfig(proxyPort);
        config["Mbproxy:BcdTags:Global:0:Address"] = "1072";
        config["Mbproxy:BcdTags:Global:0:Width"] = "16";

        await using AsyncHostDispose hd = await StartProxyAsync(config);

        // Open all five TCP connections to the proxy first. Under a 1:1 model each would
        // have needed a dedicated backend socket, blowing through the 4-client cap.
        TcpClient[] clients = await ConnectClientsAsync(5, proxyPort, ct);
        try
        {
            // Now issue one read on each client, serialised. The serialisation keeps
            // pymodbus 3.13's framer in known-good single-PDU mode.
            for (int i = 0; i < clients.Length; i++)
            {
                IModbusMaster master = new ModbusFactory().CreateMaster(clients[i]);
                ushort[] regs = await master.ReadHoldingRegistersAsync(1, 1072, 1);
                regs[0].ShouldBe((ushort)1234, $"client #{i} must see the BCD-decoded value");
            }
        }
        finally
        {
            DisposeAll(clients);
        }
    }

    // ── E2E 2: Many sequential requests through 3 clients ────────────────────────────

    /// <summary>
    /// Issues 21 sequential FC03 requests round-robined across three clients.
    /// </summary>
    /// <remarks>
    /// Validates per-pipe forwarding, allocator re-use and counter increments under a
    /// sustained (if not parallel) load through the multiplexed backend connection. Each
    /// response is checked individually, so a short or malformed reply fails the test at
    /// the request that produced it.
    /// </remarks>
    [Fact(Timeout = 5_000)]
    public async Task E2E_TwentyOneSequential_FC03_Requests_AcrossThreeClients_AllSucceed()
    {
        if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason);
        CancellationToken ct = TestContext.Current.CancellationToken;

        int proxyPort = PickFreePort();
        await using AsyncHostDispose hd = await StartProxyAsync(MakeBaseConfig(proxyPort));

        TcpClient[] clients = await ConnectClientsAsync(3, proxyPort, ct);
        try
        {
            var masters = new IModbusMaster[clients.Length];
            for (int i = 0; i < clients.Length; i++)
            {
                masters[i] = new ModbusFactory().CreateMaster(clients[i]);
            }

            // 21 requests round-robin across 3 clients. They are serialised, so no two
            // requests are ever in flight together on the multiplexer's shared backend
            // connection.
            for (int i = 0; i < 21; i++)
            {
                int clientIndex = i % masters.Length;
                ushort[] regs = await masters[clientIndex].ReadHoldingRegistersAsync(1, 0, 1);
                regs.Length.ShouldBe(1, $"request #{i} via client #{clientIndex} must return exactly one register");
            }
        }
        finally
        {
            DisposeAll(clients);
        }
    }

    // ── E2E 3: BCD rewriter still works through the multiplexed model ────────────────

    /// <summary>
    /// Three clients each write a different decimal value to a different BCD-configured
    /// address via FC06, then read it back.
    /// </summary>
    /// <remarks>
    /// Proves that the rewriter and the multiplexer's per-request context threading preserve
    /// BCD encoding round-trips across multiple multiplexed clients.
    /// </remarks>
    [Fact(Timeout = 5_000)]
    public async Task E2E_RewriterStillWorks_UnderMultiplexedThreeClients()
    {
        if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason);
        CancellationToken ct = TestContext.Current.CancellationToken;

        int proxyPort = PickFreePort();

        // Configure three BCD addresses, each width 16, for FC06 writes. The sim profile's
        // writable HR range is [200..209] (see tests/sim/dl205.json's "write" list). Reads
        // outside that range succeed, but writes return exception 02. We use 200/202/204.
        Dictionary<string, string?> config = MakeBaseConfig(proxyPort);
        config["Mbproxy:BcdTags:Global:0:Address"] = "200";
        config["Mbproxy:BcdTags:Global:0:Width"] = "16";
        config["Mbproxy:BcdTags:Global:1:Address"] = "202";
        config["Mbproxy:BcdTags:Global:1:Width"] = "16";
        config["Mbproxy:BcdTags:Global:2:Address"] = "204";
        config["Mbproxy:BcdTags:Global:2:Width"] = "16";

        await using AsyncHostDispose hd = await StartProxyAsync(config);

        (ushort addr, ushort val)[] cases =
        [
            (200, 1234),
            (202, 5678),
            (204, 9999),
        ];

        TcpClient[] clients = await ConnectClientsAsync(cases.Length, proxyPort, ct);
        try
        {
            // Serialised across clients so pymodbus only sees one frame at a time.
            for (int i = 0; i < cases.Length; i++)
            {
                IModbusMaster master = new ModbusFactory().CreateMaster(clients[i]);
                await master.WriteSingleRegisterAsync(1, cases[i].addr, cases[i].val);
                ushort[] regs = await master.ReadHoldingRegistersAsync(1, cases[i].addr, 1);
                regs[0].ShouldBe(cases[i].val,
                    $"BCD round-trip for addr {cases[i].addr} via client #{i} must preserve the client's binary value");
            }
        }
        finally
        {
            DisposeAll(clients);
        }
    }

    // ── E2E 4: Status page reflects multiplexer state ────────────────────────────────

    /// <summary>
    /// Verifies that the status JSON surfaces the Phase-9 mux fields: <c>inFlight</c>,
    /// <c>maxInFlight</c>, <c>txIdWraps</c>, <c>disconnectCascades</c> and <c>queueDepth</c>.
    /// </summary>
    [Fact(Timeout = 5_000)]
    public async Task E2E_StatusPage_Shows_InFlightAndMaxInFlight()
    {
        if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason);
        CancellationToken ct = TestContext.Current.CancellationToken;

        int proxyPort = PickFreePort();
        int adminPort = PickFreePort();
        Dictionary<string, string?> config = MakeBaseConfig(proxyPort);
        config["Mbproxy:AdminPort"] = adminPort.ToString();

        // Longer settle than the other tests so the admin endpoint is listening too.
        await using AsyncHostDispose hd = await StartProxyAsync(config, settleMs: 400);

        // Drive a handful of sequential reads to bump maxInFlight to at least 1.
        using (var client = new TcpClient())
        {
            await client.ConnectAsync("127.0.0.1", proxyPort, ct);
            IModbusMaster master = new ModbusFactory().CreateMaster(client);
            for (int i = 0; i < 5; i++)
            {
                _ = await master.ReadHoldingRegistersAsync(1, 0, 1);
            }
        }

        // Now read /status.json and assert the new fields exist and maxInFlight >= 1.
        using var httpClient = new HttpClient();
        string resp = await httpClient.GetStringAsync($"http://127.0.0.1:{adminPort}/status.json", ct);

        using JsonDocument doc = JsonDocument.Parse(resp);
        JsonElement backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend");

        backend.TryGetProperty("inFlight", out _).ShouldBeTrue("status.json must expose backend.inFlight");
        backend.TryGetProperty("maxInFlight", out _).ShouldBeTrue("status.json must expose backend.maxInFlight");
        backend.TryGetProperty("txIdWraps", out _).ShouldBeTrue("status.json must expose backend.txIdWraps");
        backend.TryGetProperty("disconnectCascades", out _).ShouldBeTrue("status.json must expose backend.disconnectCascades");
        backend.TryGetProperty("queueDepth", out _).ShouldBeTrue("status.json must expose backend.queueDepth");

        backend.GetProperty("maxInFlight").GetInt64()
            .ShouldBeGreaterThanOrEqualTo(1, "at least one request must have been in flight during the burst");
    }

    // ── E2E 5: Backend disconnect cascade + recovery (uses stub backend, not pymodbus) ─

    /// <summary>
    /// Backend disconnect cascade behaviour, followed by recovery after the backend returns.
    /// </summary>
    /// <remarks>
    /// <para>
    /// Uses a drain-only stub backend rather than the pymodbus simulator, for two reasons.
    /// It lets the test kill the backend mid-flight without disturbing the shared simulator
    /// fixture. It also keeps the multi-client-in-flight scenario clear of pymodbus 3.13's
    /// concurrent-frame quirk.
    /// </para>
    /// <para>
    /// Cascade is asserted by each upstream socket being closed by the proxy within 2 s. A
    /// read that merely times out counts as "still open" and fails the test. Recovery is
    /// asserted in two steps. First, the relaunched stub must receive the forwarded request,
    /// which proves the backend reconnect. Second, the new upstream socket must not be closed
    /// by the proxy.
    /// </para>
    /// <para>
    /// The timeout is 8 s (above the 5 s default). The test runs several sequential
    /// upstream-client connects plus a Polly-paced backend reconnect, which intentionally
    /// includes 50/100/200/500/1000 ms backoffs.
    /// </para>
    /// </remarks>
    [Fact(Timeout = 8_000)]
    public async Task E2E_BackendDisconnect_DuringInflight_CascadesUpstream_AndRecovers()
    {
        CancellationToken ct = TestContext.Current.CancellationToken;

        int backendPort = PickFreePort();
        var listener = new TcpListener(IPAddress.Loopback, backendPort);
        listener.Start();
        using var serverCts = new CancellationTokenSource();

        // Counts bytes the proxy forwards to the first stub. The counter is shared across
        // every backend connection that stub accepts.
        int forwardedBytes = 0;
        var allForwarded = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        _ = RunDrainOnlyStubAsync(
            listener,
            n =>
            {
                if (Interlocked.Add(ref forwardedBytes, n) >= 3 * RawFc03FrameLength)
                {
                    allForwarded.TrySetResult();
                }
            },
            serverCts.Token);

        try
        {
            int proxyPort = PickFreePort();
            var config = new Dictionary<string, string?>
            {
                ["Mbproxy:AdminPort"] = "0",
                ["Mbproxy:Plcs:0:Name"] = "Stub",
                ["Mbproxy:Plcs:0:ListenPort"] = proxyPort.ToString(),
                ["Mbproxy:Plcs:0:Host"] = "127.0.0.1",
                ["Mbproxy:Plcs:0:Port"] = backendPort.ToString(),
                ["Mbproxy:Connection:BackendConnectTimeoutMs"] = "3000",
                // Long request timeout so the watchdog doesn't fire during the test's wait window.
                ["Mbproxy:Connection:BackendRequestTimeoutMs"] = "30000",
                // Aggressive backend retry so the second connect happens fast.
                ["Mbproxy:Resilience:BackendConnect:MaxAttempts"] = "5",
                ["Mbproxy:Resilience:BackendConnect:BackoffMs:0"] = "50",
                ["Mbproxy:Resilience:BackendConnect:BackoffMs:1"] = "100",
                ["Mbproxy:Resilience:BackendConnect:BackoffMs:2"] = "200",
                ["Mbproxy:Resilience:BackendConnect:BackoffMs:3"] = "500",
                ["Mbproxy:Resilience:BackendConnect:BackoffMs:4"] = "1000",
            };

            await using AsyncHostDispose hd = await StartProxyAsync(config);

            // Connect three clients and start a request from each.
            TcpClient[] clients = await ConnectClientsAsync(3, proxyPort, ct);
            try
            {
                for (int i = 0; i < clients.Length; i++)
                {
                    await clients[i].GetStream().WriteAsync(BuildRawFc03((ushort)(0x1000 + i), 0, 1), ct);
                }

                // Best effort: give the proxy up to 2 s to forward all three requests, so
                // they are genuinely in flight when the backend dies. This is not asserted;
                // it only narrows the race between the writes above and the kill below.
                await Task.WhenAny(allForwarded.Task, Task.Delay(TimeSpan.FromSeconds(2), ct));

                // Kill the backend: cancelling drops every accepted stub socket.
                await serverCts.CancelAsync();
                listener.Stop();

                // Every upstream client must be disconnected. A timeout here means the
                // proxy never cascaded, which is a failure, not an EOF.
                foreach (TcpClient c in clients)
                {
                    bool closed = await PeerClosedWithinAsync(c.GetStream(), TimeSpan.FromSeconds(2), ct);
                    closed.ShouldBeTrue("upstream must observe EOF after backend cascade");
                }
            }
            finally
            {
                DisposeAll(clients);
            }

            // Relaunch the stub backend on the same port.
            var newListener = new TcpListener(IPAddress.Loopback, backendPort);
            newListener.Start();
            using var newServerCts = new CancellationTokenSource();
            var reconnected = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
            _ = RunDrainOnlyStubAsync(newListener, _ => reconnected.TrySetResult(), newServerCts.Token);

            try
            {
                // A new upstream client should successfully connect through the multiplexer.
                // The multiplexer's backend connect logic retries through Polly.
                using var clientD = new TcpClient();
                await clientD.ConnectAsync("127.0.0.1", proxyPort, ct);

                // The write triggers the backend reconnect.
                await clientD.GetStream().WriteAsync(BuildRawFc03(0x2000, 0, 1), ct);

                // The relaunched stub seeing request bytes proves the multiplexer
                // reconnected and forwarded the request.
                Task first = await Task.WhenAny(reconnected.Task, Task.Delay(TimeSpan.FromSeconds(3), ct));
                first.ShouldBeSameAs(reconnected.Task,
                    "the multiplexer must reconnect to the relaunched backend and forward the request");

                // The drain-only stub never answers, so the upstream socket should simply
                // stay open. TcpClient.Connected cannot detect a remote close, so probe
                // with a read instead.
                bool closed = await PeerClosedWithinAsync(clientD.GetStream(), TimeSpan.FromMilliseconds(300), ct);
                closed.ShouldBeFalse("upstream socket should remain open after backend reconnect");
            }
            finally
            {
                await newServerCts.CancelAsync();
                newListener.Stop();
            }
        }
        finally
        {
            // Idempotent; covers failure paths that never reached the deliberate kill above.
            await serverCts.CancelAsync();
            listener.Stop();
        }
    }

    // ── E2E 6: Backend keepalive heartbeat keeps an idle connection warm ─────────────

    /// <summary>
    /// With keepalive enabled, an idle backend connection receives periodic FC03 heartbeat
    /// probes.
    /// </summary>
    /// <remarks>
    /// Idles a simulator-backed connection past <c>BackendHeartbeatIdleMs</c> and verifies
    /// that <c>backendHeartbeatsSent</c> climbs on the status page. It then confirms that a
    /// later real read still round-trips on the same (un-cascaded) connection.
    /// </remarks>
    [Fact(Timeout = 8_000)]
    public async Task E2E_Keepalive_IdleBackend_ReceivesHeartbeats_AndStaysUsable()
    {
        if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason);
        CancellationToken ct = TestContext.Current.CancellationToken;

        int proxyPort = PickFreePort();
        int adminPort = PickFreePort();
        Dictionary<string, string?> config = MakeBaseConfig(proxyPort);
        config["Mbproxy:AdminPort"] = adminPort.ToString();
        // Short idle window so the heartbeat fires several times within the test budget.
        config["Mbproxy:Connection:Keepalive:Enabled"] = "true";
        config["Mbproxy:Connection:Keepalive:BackendHeartbeatIdleMs"] = "700";

        await using AsyncHostDispose hd = await StartProxyAsync(config);

        using (var client = new TcpClient())
        {
            await client.ConnectAsync("127.0.0.1", proxyPort, ct);
            IModbusMaster master = new ModbusFactory().CreateMaster(client);

            // One read brings the backend up and starts the heartbeat loop.
            _ = await master.ReadHoldingRegistersAsync(1, 0, 1);

            // Idle the connection so the heartbeat loop fires repeatedly.
            await Task.Delay(2500, ct);

            // A later read still succeeds, because the connection was never cascaded.
            ushort[] regs = await master.ReadHoldingRegistersAsync(1, 0, 1);
            regs.Length.ShouldBe(1, "the idle-then-active connection must still serve reads");
        }

        using var httpClient = new HttpClient();
        string resp = await httpClient.GetStringAsync($"http://127.0.0.1:{adminPort}/status.json", ct);

        using JsonDocument doc = JsonDocument.Parse(resp);
        JsonElement backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend");

        backend.TryGetProperty("backendHeartbeatsSent", out _)
            .ShouldBeTrue("status.json must expose backend.backendHeartbeatsSent");
        backend.GetProperty("backendHeartbeatsSent").GetInt64()
            .ShouldBeGreaterThanOrEqualTo(1, "an idle backend must have received at least one heartbeat");
        backend.GetProperty("backendHeartbeatsFailed").GetInt64()
            .ShouldBe(0, "every heartbeat against the live simulator must be answered");
        backend.GetProperty("backendIdleDisconnects").GetInt64()
            .ShouldBe(0, "an answered heartbeat must never tear the backend down");
    }

    // ── Helpers ──────────────────────────────────────────────────────────────────────

    /// <summary>
    /// Builds the minimal single-PLC configuration pointing the proxy at the simulator.
    /// Admin endpoint is disabled; connect and request timeouts are 3 s.
    /// </summary>
    /// <param name="proxyPort">Upstream listen port for the proxied PLC.</param>
    /// <returns>A fresh, mutable dictionary that callers may extend with extra keys.</returns>
    private Dictionary<string, string?> MakeBaseConfig(int proxyPort) => new()
    {
        ["Mbproxy:AdminPort"] = "0",
        ["Mbproxy:Plcs:0:Name"] = "TestPLC",
        ["Mbproxy:Plcs:0:ListenPort"] = proxyPort.ToString(),
        ["Mbproxy:Plcs:0:Host"] = _sim.Host,
        ["Mbproxy:Plcs:0:Port"] = _sim.Port.ToString(),
        ["Mbproxy:Connection:BackendConnectTimeoutMs"] = "3000",
        ["Mbproxy:Connection:BackendRequestTimeoutMs"] = "3000",
    };

    /// <summary>
    /// Builds (but does not start) a host wired with Mbproxy options, the proxy services and,
    /// when <c>Mbproxy:AdminPort</c> parses to a positive port, the admin endpoint.
    /// Logging goes to a Serilog logger that only emits Fatal events.
    /// </summary>
    private static IHost BuildBcdHost(Dictionary<string, string?> config)
    {
        var builder = Host.CreateApplicationBuilder();
        builder.Configuration.AddInMemoryCollection(config);
        builder.Services.AddSerilog(
            new LoggerConfiguration().MinimumLevel.Fatal().CreateLogger(),
            dispose: false);
        builder.AddMbproxyOptions();

        // NOTE(review): the generic type arguments on the three registrations below were
        // lost in this copy of the file (angle-bracket content was stripped). Restore them
        // from version control; they are kept verbatim here rather than guessed.
        builder.Services.AddSingleton();
        builder.Services.AddSingleton();
        builder.Services.AddHostedService(sp => sp.GetRequiredService());

        if (config.TryGetValue("Mbproxy:AdminPort", out string? adminRaw)
            && int.TryParse(adminRaw, out int admin)
            && admin > 0)
        {
            builder.AddMbproxyAdmin();
        }

        return builder.Build();
    }

    /// <summary>
    /// Builds and starts a proxy host (3 s start budget), then waits <paramref name="settleMs"/>
    /// for listeners to come up. If startup or the settle delay fails, the host is stopped and
    /// disposed before the exception propagates.
    /// </summary>
    /// <returns>A handle whose disposal stops and disposes the host.</returns>
    private static async Task<AsyncHostDispose> StartProxyAsync(Dictionary<string, string?> config, int settleMs = 200)
    {
        IHost host = BuildBcdHost(config);
        var handle = new AsyncHostDispose(host);
        try
        {
            using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
            await host.StartAsync(startCts.Token);
            await Task.Delay(settleMs, TestContext.Current.CancellationToken);
            return handle;
        }
        catch
        {
            await handle.DisposeAsync();
            throw;
        }
    }

    /// <summary>
    /// Opens <paramref name="count"/> TCP connections to the proxy on loopback. If any
    /// connect fails, the connections already opened are disposed before rethrowing.
    /// </summary>
    private static async Task<TcpClient[]> ConnectClientsAsync(int count, int proxyPort, CancellationToken ct)
    {
        var clients = new TcpClient[count];
        try
        {
            for (int i = 0; i < count; i++)
            {
                clients[i] = new TcpClient();
                await clients[i].ConnectAsync("127.0.0.1", proxyPort, ct);
            }

            return clients;
        }
        catch
        {
            DisposeAll(clients);
            throw;
        }
    }

    /// <summary>Disposes every non-null client in <paramref name="clients"/>.</summary>
    private static void DisposeAll(IEnumerable<TcpClient?> clients)
    {
        foreach (TcpClient? c in clients)
        {
            c?.Dispose();
        }
    }

    /// <summary>
    /// Waits up to <paramref name="window"/> for the peer to close the connection.
    /// </summary>
    /// <returns>
    /// <c>true</c> if the read hit EOF or the connection was reset. <c>false</c> if the window
    /// elapsed with the socket still open, or if data arrived instead.
    /// </returns>
    /// <exception cref="OperationCanceledException">The test's own token was cancelled.</exception>
    private static async Task<bool> PeerClosedWithinAsync(NetworkStream stream, TimeSpan window, CancellationToken ct)
    {
        using var windowCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        windowCts.CancelAfter(window);
        var buf = new byte[1];
        try
        {
            int n = await stream.ReadAsync(buf.AsMemory(), windowCts.Token);
            return n == 0;
        }
        catch (OperationCanceledException) when (!ct.IsCancellationRequested)
        {
            // Window elapsed with nothing read: the peer kept the socket open.
            return false;
        }
        catch (IOException)
        {
            // Reset by peer: treated as a disconnect, like EOF.
            return true;
        }
    }

    /// <summary>
    /// Accepts connections on <paramref name="listener"/> until <paramref name="token"/> is
    /// cancelled or the listener is stopped. Each accepted socket is read and discarded and
    /// never answered. <paramref name="onData"/> is invoked with the byte count of every
    /// receive that returns data.
    /// </summary>
    private static async Task RunDrainOnlyStubAsync(TcpListener listener, Action<int>? onData, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                Socket accepted = await listener.AcceptSocketAsync(token);
                _ = DrainAsync(accepted, onData, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the test cancelled the stub.
        }
        catch (SocketException)
        {
            // Expected: listener.Stop() aborted the pending accept.
        }
        catch (ObjectDisposedException)
        {
            // Expected: listener.Stop() disposed the underlying socket.
        }
    }

    /// <summary>
    /// Reads and discards data from <paramref name="socket"/> until EOF or cancellation, then
    /// disposes it. Disposal is what makes the proxy observe the backend disappearing.
    /// </summary>
    private static async Task DrainAsync(Socket socket, Action<int>? onData, CancellationToken token)
    {
        using (socket)
        {
            var buf = new byte[256];
            try
            {
                while (!token.IsCancellationRequested)
                {
                    int n = await socket.ReceiveAsync(buf, SocketFlags.None, token);
                    if (n == 0)
                    {
                        break;
                    }

                    onData?.Invoke(n);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected: the test killed the stub backend.
            }
            catch (SocketException)
            {
                // Expected: the proxy reset its backend connection.
            }
            catch (ObjectDisposedException)
            {
                // Expected during teardown.
            }
        }
    }

    /// <summary>
    /// Asks the OS for a currently free loopback port. Another process can still grab it
    /// before the caller binds, which is an accepted race in these tests.
    /// </summary>
    private static int PickFreePort()
    {
        var probe = new TcpListener(IPAddress.Loopback, 0);
        probe.Start();
        try
        {
            return ((IPEndPoint)probe.LocalEndpoint).Port;
        }
        finally
        {
            probe.Stop();
        }
    }

    /// <summary>
    /// Builds a raw Modbus/TCP FC03 (read holding registers) request.
    /// </summary>
    /// <remarks>
    /// The frame is the MBAP header (TxId, protocol id 0, length 6, unit id) followed by the
    /// PDU (function 0x03, start address, quantity), all big-endian.
    /// </remarks>
    private static byte[] BuildRawFc03(ushort txId, ushort start, ushort qty, byte unit = 1) =>
    [
        (byte)(txId >> 8), (byte)(txId & 0xFF),
        0x00, 0x00,
        0x00, 0x06,
        unit,
        0x03,
        (byte)(start >> 8), (byte)(start & 0xFF),
        (byte)(qty >> 8), (byte)(qty & 0xFF),
    ];

    /// <summary>
    /// <see cref="IAsyncDisposable"/> wrapper that stops a host (2 s budget) and then disposes it.
    /// </summary>
    private sealed class AsyncHostDispose : IAsyncDisposable
    {
        private readonly IHost _host;

        public AsyncHostDispose(IHost host) => _host = host;

        public async ValueTask DisposeAsync()
        {
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
            try
            {
                await _host.StopAsync(cts.Token);
            }
            catch
            {
                // Deliberate best-effort teardown: a failing or slow shutdown must not mask
                // the test's own outcome.
            }

            _host.Dispose();
        }
    }
}