using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text.Json;
using Mbproxy;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NModbus;
using Serilog;
using Shouldly;
using Xunit;

namespace Mbproxy.Tests.Proxy.Multiplexing;

/// <summary>
/// End-to-end coverage of Phase-10 read coalescing against the pymodbus DL205 simulator.
/// </summary>
/// <remarks>
/// <para>
/// <b>pymodbus 3.13.0 simulator quirk.</b> The sim's <c>ServerRequestHandler</c> stores a
/// single <c>last_pdu</c> per connection; two MBAP frames arriving in the same recv-buffer
/// overwrite each other's TxId. The real DL260 ECOM does not suffer this.
/// </para>
/// <para>
/// For Phase-10 E2E we therefore use the simulator only to verify rewriter integration and
/// status-page wiring on serialised requests; the coalescing-active-during-overlap proof
/// lives in a separate suite that runs against a stub backend with deterministic response
/// delays.
/// </para>
/// </remarks>
[Collection(nameof(Mbproxy.Tests.Sim.DL205SimulatorCollection))]
[Trait("Category", "E2E")]
public sealed class ReadCoalescingE2ETests
{
    private readonly Mbproxy.Tests.Sim.DL205SimulatorFixture _sim;

    public ReadCoalescingE2ETests(Mbproxy.Tests.Sim.DL205SimulatorFixture sim) => _sim = sim;

    // ── Helpers ──────────────────────────────────────────────────────────────────

    /// <summary>
    /// Asks the OS for an ephemeral loopback port by binding a listener to port 0, then
    /// releases it. The port is free at the moment of return; nothing reserves it afterwards.
    /// </summary>
    private static int PickFreePort()
    {
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        try
        {
            return ((IPEndPoint)listener.LocalEndpoint).Port;
        }
        finally
        {
            // Always release the socket, even if reading the endpoint throws.
            listener.Stop();
        }
    }

    /// <summary>
    /// Builds the minimal proxy configuration: one PLC entry that listens on
    /// <paramref name="proxyPort"/> and forwards to the simulator, admin endpoint disabled
    /// (<c>AdminPort = 0</c>), and 3-second backend connect/request timeouts.
    /// </summary>
    private Dictionary<string, string?> MakeBaseConfig(int proxyPort) => new()
    {
        ["Mbproxy:AdminPort"] = "0",
        ["Mbproxy:Plcs:0:Name"] = "TestPLC",
        ["Mbproxy:Plcs:0:ListenPort"] = proxyPort.ToString(),
        ["Mbproxy:Plcs:0:Host"] = _sim.Host,
        ["Mbproxy:Plcs:0:Port"] = _sim.Port.ToString(),
        ["Mbproxy:Connection:BackendConnectTimeoutMs"] = "3000",
        ["Mbproxy:Connection:BackendRequestTimeoutMs"] = "3000",
    };

    /// <summary>
    /// Builds (but does not start) a host from the in-memory <paramref name="config"/>.
    /// Logging is restricted to Fatal; the admin endpoint is wired only when
    /// <c>Mbproxy:AdminPort</c> parses to a positive integer.
    /// </summary>
    private static IHost BuildBcdHost(Dictionary<string, string?> config)
    {
        var builder = Host.CreateApplicationBuilder();
        builder.Configuration.AddInMemoryCollection(config);
        builder.Services.AddSerilog(
            new LoggerConfiguration().MinimumLevel.Fatal().CreateLogger(),
            dispose: false);
        builder.AddMbproxyOptions();

        // NOTE(review): the two AddSingleton() calls and the GetRequiredService() call below
        // carry no type arguments in this view of the file — presumably lost in extraction.
        // Confirm against the original source before building.
        builder.Services.AddSingleton();
        builder.Services.AddSingleton();
        builder.Services.AddHostedService(sp => sp.GetRequiredService());

        if (int.TryParse(config["Mbproxy:AdminPort"], out int admin) && admin > 0)
        {
            builder.AddMbproxyAdmin();
        }

        return builder.Build();
    }

    /// <summary>
    /// Builds and starts a proxy host, then waits <paramref name="settleDelayMs"/> ms before
    /// returning. The disposal guard is created BEFORE the host is started so that a start
    /// timeout or failure (or a cancelled settle delay) still stops and disposes the host.
    /// </summary>
    /// <returns>A guard that stops and disposes the host when disposed.</returns>
    private static async Task<AsyncHostDispose> StartProxyAsync(
        Dictionary<string, string?> config,
        int settleDelayMs)
    {
        IHost host = BuildBcdHost(config);
        var guard = new AsyncHostDispose(host);
        try
        {
            using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
            await host.StartAsync(startCts.Token);
            await Task.Delay(settleDelayMs, TestContext.Current.CancellationToken);
            return guard;
        }
        catch
        {
            await guard.DisposeAsync();
            throw;
        }
    }

    /// <summary>
    /// Opens a TCP connection to the proxy listener on loopback. The client is disposed if
    /// the connect attempt fails; on success the caller owns it.
    /// </summary>
    private static async Task<TcpClient> ConnectToProxyAsync(int proxyPort)
    {
        var client = new TcpClient();
        try
        {
            await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);
            return client;
        }
        catch
        {
            client.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Fetches <c>status.json</c> from the admin endpoint and returns the
    /// <c>plcs[0].backend</c> element. The element is cloned so it remains usable after the
    /// backing <see cref="JsonDocument"/> has been disposed.
    /// </summary>
    private static async Task<JsonElement> ReadBackendStatusAsync(int adminPort)
    {
        using var httpClient = new HttpClient();
        string body = await httpClient.GetStringAsync(
            $"http://127.0.0.1:{adminPort}/status.json",
            TestContext.Current.CancellationToken);

        using JsonDocument doc = JsonDocument.Parse(body);
        return doc.RootElement.GetProperty("plcs")[0].GetProperty("backend").Clone();
    }

    /// <summary>
    /// Stops the wrapped host (bounded to 2 seconds) and then disposes it.
    /// </summary>
    private sealed class AsyncHostDispose : IAsyncDisposable
    {
        private readonly IHost _host;

        public AsyncHostDispose(IHost host) => _host = host;

        public async ValueTask DisposeAsync()
        {
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
            try
            {
                await _host.StopAsync(cts.Token);
            }
            catch
            {
                // Deliberate best-effort: a failed or timed-out stop during test teardown
                // must not mask the test's own outcome. The host is still disposed below.
            }

            _host.Dispose();
        }
    }

    // ── 1. Five clients, identical reads — counter accounting balances ───────────

    /// <summary>
    /// Five clients each issue an FC03 read of the same BCD-configured register through the
    /// proxy. pymodbus's framer cannot reliably correlate concurrent multiplexed frames, so
    /// the reads are issued one after another and this test verifies the WEAKER property:
    /// every client receives the correct decoded value (1234) and every read is accounted
    /// for as either a coalescing Hit or a Miss.
    /// </summary>
    [Fact(Timeout = 8_000)]
    public async Task E2E_FiveConcurrentClients_SameReadHR1072_AllSucceed_AndCounterAccountingBalances()
    {
        if (_sim.SkipReason is not null)
        {
            Assert.Skip(_sim.SkipReason);
        }

        const int clientCount = 5;

        int proxyPort = PickFreePort();
        int adminPort = PickFreePort();
        var config = MakeBaseConfig(proxyPort);
        // The admin endpoint is needed to read the coalescing counters back.
        config["Mbproxy:AdminPort"] = adminPort.ToString();
        config["Mbproxy:BcdTags:Global:0:Address"] = "1072";
        config["Mbproxy:BcdTags:Global:0:Width"] = "16";
        // Default ReadCoalescing.Enabled = true (set on ResilienceOptions).

        await using var hd = await StartProxyAsync(config, settleDelayMs: 300);

        // Five clients reading sequentially — pymodbus serialisation friendly. With
        // coalescing-on, identical reads issued back-to-back will mostly serialise on
        // the wire too (one round-trip completes before the next starts), so this test
        // does NOT assert hit-count > 0. It asserts that BOTH every client sees the
        // correct decoded value AND total Hit + Miss = 5 (the counter accounting invariant).
        var clients = new List<TcpClient>(clientCount);
        try
        {
            for (int i = 0; i < clientCount; i++)
            {
                // Earlier clients stay connected while later ones read.
                TcpClient client = await ConnectToProxyAsync(proxyPort);
                clients.Add(client);

                var master = new ModbusFactory().CreateMaster(client);
                ushort[] regs = master.ReadHoldingRegisters(1, 1072, 1);
                regs[0].ShouldBe((ushort)1234, $"client #{i} must see the BCD-decoded value");
            }
        }
        finally
        {
            foreach (TcpClient client in clients)
            {
                client.Dispose();
            }
        }

        JsonElement backend = await ReadBackendStatusAsync(adminPort);
        long hits = backend.GetProperty("coalescedHitCount").GetInt64();
        long misses = backend.GetProperty("coalescedMissCount").GetInt64();
        (hits + misses).ShouldBe(clientCount, "every read must be counted as exactly one Hit or one Miss");
    }

    // ── 2. BCD rewriter still works under coalescing fan-out ─────────────────────

    /// <summary>
    /// Verifies the BCD rewriter keeps producing the decoded value when several clients
    /// repeatedly read the same register with coalescing enabled: the TxId restoration for
    /// a second party must not perturb the BCD byte rewrite. Reads are driven sequentially
    /// to keep pymodbus happy; only the decoded values are asserted here.
    /// </summary>
    [Fact(Timeout = 5_000)]
    public async Task E2E_RewriterStillWorks_ForCoalescedReads()
    {
        if (_sim.SkipReason is not null)
        {
            Assert.Skip(_sim.SkipReason);
        }

        const int clientCount = 3;
        const int passCount = 3;

        int proxyPort = PickFreePort();
        var config = MakeBaseConfig(proxyPort);
        config["Mbproxy:BcdTags:Global:0:Address"] = "1072";
        config["Mbproxy:BcdTags:Global:0:Width"] = "16";

        await using var hd = await StartProxyAsync(config, settleDelayMs: 200);

        var clients = new List<TcpClient>(clientCount);
        try
        {
            for (int i = 0; i < clientCount; i++)
            {
                clients.Add(await ConnectToProxyAsync(proxyPort));
            }

            // Multiple read passes — same register, same expected decoded value across
            // all clients. The BCD rewriter must produce 1234 for every party regardless
            // of which coalescing branch (hit vs miss) the request took.
            for (int pass = 0; pass < passCount; pass++)
            {
                for (int i = 0; i < clients.Count; i++)
                {
                    var master = new ModbusFactory().CreateMaster(clients[i]);
                    ushort[] regs = master.ReadHoldingRegisters(1, 1072, 1);
                    regs[0].ShouldBe(
                        (ushort)1234,
                        $"pass {pass} client #{i}: decoded value must survive coalescing");
                }
            }
        }
        finally
        {
            foreach (TcpClient client in clients)
            {
                client.Dispose();
            }
        }
    }

    // ── 3. Different registers → no coalescing → hit count stays at zero ─────────

    /// <summary>
    /// Sequential reads of five distinct addresses from one client must all be counted as
    /// Misses, with zero Hits, in <c>status.json</c>.
    /// </summary>
    [Fact(Timeout = 5_000)]
    public async Task E2E_DifferentRegisters_NotCoalesced_CoalescedHitCount_Zero()
    {
        if (_sim.SkipReason is not null)
        {
            Assert.Skip(_sim.SkipReason);
        }

        int proxyPort = PickFreePort();
        int adminPort = PickFreePort();
        var config = MakeBaseConfig(proxyPort);
        config["Mbproxy:AdminPort"] = adminPort.ToString();

        await using var hd = await StartProxyAsync(config, settleDelayMs: 300);

        // Five different seeded addresses, sequential reads — none can coalesce.
        // Selected from tests/sim/dl205.json's seeded ranges (200..209, 1024, 1040..1042).
        ushort[] addrs = [200, 201, 202, 203, 204];
        using (TcpClient client = await ConnectToProxyAsync(proxyPort))
        {
            var master = new ModbusFactory().CreateMaster(client);
            foreach (ushort address in addrs)
            {
                _ = master.ReadHoldingRegisters(1, address, 1);
            }
        }

        // Read the counters via status.json.
        JsonElement backend = await ReadBackendStatusAsync(adminPort);
        backend.GetProperty("coalescedHitCount").GetInt64()
            .ShouldBe(0, "different addresses must never coalesce");
        backend.GetProperty("coalescedMissCount").GetInt64()
            .ShouldBe(addrs.Length, "each distinct read must be counted as a Miss");
    }

    // ── 4. Status page surfaces coalescing counters ──────────────────────────────

    /// <summary>
    /// After a single read, <c>status.json</c> must expose the three coalescing fields on
    /// the backend element. Only their presence is checked, not their values.
    /// </summary>
    [Fact(Timeout = 5_000)]
    public async Task E2E_StatusPage_Shows_CoalescingFields()
    {
        if (_sim.SkipReason is not null)
        {
            Assert.Skip(_sim.SkipReason);
        }

        int proxyPort = PickFreePort();
        int adminPort = PickFreePort();
        var config = MakeBaseConfig(proxyPort);
        config["Mbproxy:AdminPort"] = adminPort.ToString();

        await using var hd = await StartProxyAsync(config, settleDelayMs: 300);

        using (TcpClient client = await ConnectToProxyAsync(proxyPort))
        {
            var master = new ModbusFactory().CreateMaster(client);
            _ = master.ReadHoldingRegisters(1, 0, 1);
        }

        JsonElement backend = await ReadBackendStatusAsync(adminPort);
        backend.TryGetProperty("coalescedHitCount", out _)
            .ShouldBeTrue("status.json must expose backend.coalescedHitCount");
        backend.TryGetProperty("coalescedMissCount", out _)
            .ShouldBeTrue("status.json must expose backend.coalescedMissCount");
        backend.TryGetProperty("coalescedResponseToDeadUpstream", out _)
            .ShouldBeTrue("status.json must expose backend.coalescedResponseToDeadUpstream");
    }

    // ── 5. Disable via config → coalescing OFF → every read is a Miss ────────────

    /// <summary>
    /// With <c>Mbproxy:Resilience:ReadCoalescing:Enabled = false</c>, four identical reads
    /// must yield zero Hits and four Misses in <c>status.json</c>.
    /// </summary>
    [Fact(Timeout = 5_000)]
    public async Task E2E_CoalescingDisabledViaConfig_EveryReadIsAMiss()
    {
        if (_sim.SkipReason is not null)
        {
            Assert.Skip(_sim.SkipReason);
        }

        const int readCount = 4;

        int proxyPort = PickFreePort();
        int adminPort = PickFreePort();
        var config = MakeBaseConfig(proxyPort);
        config["Mbproxy:AdminPort"] = adminPort.ToString();
        config["Mbproxy:Resilience:ReadCoalescing:Enabled"] = "false";

        await using var hd = await StartProxyAsync(config, settleDelayMs: 300);

        using (TcpClient client = await ConnectToProxyAsync(proxyPort))
        {
            var master = new ModbusFactory().CreateMaster(client);
            for (int i = 0; i < readCount; i++)
            {
                _ = master.ReadHoldingRegisters(1, 0, 1);
            }
        }

        JsonElement backend = await ReadBackendStatusAsync(adminPort);
        backend.GetProperty("coalescedHitCount").GetInt64()
            .ShouldBe(0, "coalescing disabled — no hits possible");
        backend.GetProperty("coalescedMissCount").GetInt64()
            .ShouldBe(readCount, "every FC03 read still counts as a Miss when coalescing is disabled");
    }
}