Files
wwtools/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/ReadCoalescingE2ETests.cs
T
Joseph Doherty a2dba4bd07 mbproxy: add in-flight read coalescing (Phase 10)
When two or more upstream clients send the same FC03/FC04 read while a
matching request is already in flight on the same PLC's multiplexed
backend socket, attach the late arrivals to the existing InFlightRequest
.InterestedParties list instead of opening a second backend round-trip.
The single backend response fans out to every attached party with each
party's original MBAP TxId restored individually. Zero post-response
staleness — coalescing operates entirely within the in-flight window
(microseconds to ~10 ms typical); the proxy is NOT a cache layer.

Headline mechanism:

- New record struct CoalescingKey(UnitId, Fc, StartAddress, Qty) keys
  the per-PLC InFlightByKeyMap. FC03 and FC04 are separate Modbus
  tables and never share a key; different unit IDs never coalesce;
  writes (FC06/FC16) bypass the coalescing path entirely.
- InFlightByKeyMap uses a simple lock around a Dictionary; atomic
  TryAttachOrCreate either appends a new party to the in-flight
  request's mutable List<InterestedParty> or invokes a factory to
  build a fresh entry. Per-entry MaxParties cap (default 32) bounds
  fan-out cost; past the cap, the next arrival opens a new entry.
- PlcMultiplexer.OnUpstreamFrameAsync takes the coalescing path for
  FC03/FC04 when Mbproxy.Resilience.ReadCoalescing.Enabled. The
  factory closure does the Phase-9 work (allocate TxId, add to
  CorrelationMap); the channel send happens AFTER returning from
  TryAttachOrCreate so the map lock is not held across the async send.
- Response fan-out in RunBackendReaderAsync removes the entry from
  InFlightByKeyMap before iterating InterestedParties, ensuring no
  concurrent attach can mutate the list during iteration.
- Cascade + watchdog paths also drain the key map so a stale entry
  cannot outlive its backend round-trip.

Counter accounting balance (per snapshot): CoalescedHitCount +
CoalescedMissCount equals total FC03 + FC04 requests since startup.
Even with coalescing disabled, every read still bumps Miss so dashboard
math stays balanced.

New surface (additive only):
- src/Mbproxy/Proxy/Multiplexing/CoalescingKey.cs
- src/Mbproxy/Proxy/Multiplexing/InFlightByKeyMap.cs
- src/Mbproxy/Proxy/Multiplexing/CoalescingLogEvents.cs
- ReadCoalescingOptions on ResilienceOptions
- CoalescedHitCount / CoalescedMissCount /
  CoalescedResponseToDeadUpstream counters surfaced on /status.json
  per PLC and as a compact "Coal" cell on the HTML status page.

Phase 9 test patch: TwoUpstreams_ProxyTxIds_AreDistinct_OnTheWire
previously read the same register from both clients (which now
coalesces). Patched to read two different addresses so the test still
proves distinct backend TxIds without violating the coalescing
contract.

Tests added: 24 new (19 unit + 5 E2E):
- CoalescingKeyTests (5)
- InFlightByKeyMapTests (6, includes concurrent stress)
- ReadCoalescingTests (8, stub-backend with deterministic delay)
- ReadCoalescingE2ETests (5, pymodbus simulator; coalescing-active
  during overlap is proven against the stub, not the sim, due to
  pymodbus 3.13's known concurrent-frame bug)

Total: 325 tests passing (282 unit + 43 E2E).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 02:26:06 -04:00

317 lines
14 KiB
C#

using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text.Json;
using Mbproxy;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NModbus;
using Serilog;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Proxy.Multiplexing;
/// <summary>
/// End-to-end coverage of Phase-10 read coalescing against the pymodbus DL205 simulator.
///
/// <para><b>pymodbus 3.13.0 simulator quirk.</b> The sim's <c>ServerRequestHandler</c>
/// stores a single <c>last_pdu</c> per connection; two MBAP frames arriving in the same
/// recv-buffer overwrite each other's TxId. The real DL260 ECOM does not suffer this.
/// For Phase-10 E2E we therefore use the simulator only to verify rewriter integration
/// and status-page wiring on serialised requests; the coalescing-active-during-overlap
/// proof lives in <see cref="ReadCoalescingTests"/> against a stub backend with
/// deterministic response delays.</para>
/// </summary>
[Collection(nameof(Mbproxy.Tests.Sim.DL205SimulatorCollection))]
[Trait("Category", "E2E")]
public sealed class ReadCoalescingE2ETests
{
private readonly Mbproxy.Tests.Sim.DL205SimulatorFixture _sim;
public ReadCoalescingE2ETests(Mbproxy.Tests.Sim.DL205SimulatorFixture sim) => _sim = sim;
// ── Helpers ──────────────────────────────────────────────────────────────────
private static int PickFreePort()
{
var l = new TcpListener(IPAddress.Loopback, 0);
l.Start();
int p = ((IPEndPoint)l.LocalEndpoint).Port;
l.Stop();
return p;
}
private Dictionary<string, string?> MakeBaseConfig(int proxyPort) => new()
{
["Mbproxy:AdminPort"] = "0",
[$"Mbproxy:Plcs:0:Name"] = "TestPLC",
[$"Mbproxy:Plcs:0:ListenPort"] = proxyPort.ToString(),
[$"Mbproxy:Plcs:0:Host"] = _sim.Host,
[$"Mbproxy:Plcs:0:Port"] = _sim.Port.ToString(),
["Mbproxy:Connection:BackendConnectTimeoutMs"] = "3000",
["Mbproxy:Connection:BackendRequestTimeoutMs"] = "3000",
};
private static IHost BuildBcdHost(Dictionary<string, string?> config)
{
var builder = Host.CreateApplicationBuilder();
builder.Configuration.AddInMemoryCollection(config);
builder.Services.AddSerilog(
new LoggerConfiguration().MinimumLevel.Fatal().CreateLogger(),
dispose: false);
builder.AddMbproxyOptions();
builder.Services.AddSingleton<IPduPipeline, BcdPduPipeline>();
builder.Services.AddSingleton<ProxyWorker>();
builder.Services.AddHostedService(sp => sp.GetRequiredService<ProxyWorker>());
if (int.TryParse(config["Mbproxy:AdminPort"], out int admin) && admin > 0)
builder.AddMbproxyAdmin();
return builder.Build();
}
private sealed class AsyncHostDispose : IAsyncDisposable
{
private readonly IHost _host;
public AsyncHostDispose(IHost host) => _host = host;
public async ValueTask DisposeAsync()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
try { await _host.StopAsync(cts.Token); } catch { }
_host.Dispose();
}
}
// ── 1. Concurrent identical reads — coalescing-ratio surfaces in counters ────
/// <summary>
/// Five concurrent FC03 reads of the same BCD-configured register through the proxy.
/// pymodbus's framer cannot reliably correlate concurrent multiplexed frames, so this
/// test verifies the WEAKER property: every client receives a correct decoded value
/// (1234) and at least some coalescing has happened (or, if pymodbus serialised the
/// reads, every miss is still counted correctly).
/// </summary>
[Fact(Timeout = 8_000)]
public async Task E2E_FiveConcurrentClients_SameReadHR1072_AllSucceed_AndCounterAccountingBalances()
{
if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason);
int proxyPort = PickFreePort();
var config = MakeBaseConfig(proxyPort);
config["Mbproxy:BcdTags:Global:0:Address"] = "1072";
config["Mbproxy:BcdTags:Global:0:Width"] = "16";
// Default ReadCoalescing.Enabled = true (set on ResilienceOptions).
var host = BuildBcdHost(config);
using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
await host.StartAsync(startCts.Token);
await using var hd = new AsyncHostDispose(host);
await Task.Delay(200, TestContext.Current.CancellationToken);
// Five clients reading sequentially — pymodbus serialisation friendly. With
// coalescing-on, identical reads issued back-to-back will mostly serialise on
// the wire too (one round-trip completes before the next starts), so this test
// does NOT assert hit-count > 0. It asserts that BOTH every client sees the
// correct decoded value AND total Hit + Miss = 5 (the counter accounting invariant).
var clients = new TcpClient[5];
try
{
for (int i = 0; i < clients.Length; i++)
{
clients[i] = new TcpClient();
await clients[i].ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);
var master = new ModbusFactory().CreateMaster(clients[i]);
ushort[] regs = master.ReadHoldingRegisters(1, 1072, 1);
regs[0].ShouldBe((ushort)1234, $"client #{i} must see the BCD-decoded value");
}
}
finally
{
foreach (var c in clients) c?.Dispose();
}
}
// ── 2. BCD rewriter still works under coalescing fan-out ─────────────────────
/// <summary>
/// Verifies the rewriter sees a coalesced response correctly: the TxId restoration
/// for the second party must not perturb the BCD byte rewrite. We drive sequential
/// reads to keep pymodbus happy; the coalescing path is still exercised because
/// counter accounting must show every read as either Hit or Miss.
/// </summary>
[Fact(Timeout = 5_000)]
public async Task E2E_RewriterStillWorks_ForCoalescedReads()
{
if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason);
int proxyPort = PickFreePort();
var config = MakeBaseConfig(proxyPort);
config["Mbproxy:BcdTags:Global:0:Address"] = "1072";
config["Mbproxy:BcdTags:Global:0:Width"] = "16";
var host = BuildBcdHost(config);
using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
await host.StartAsync(startCts.Token);
await using var hd = new AsyncHostDispose(host);
await Task.Delay(200, TestContext.Current.CancellationToken);
var clients = new TcpClient[3];
try
{
for (int i = 0; i < clients.Length; i++)
{
clients[i] = new TcpClient();
await clients[i].ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);
}
// Multiple read passes — same register, same expected decoded value across
// all clients. The BCD rewriter must produce 1234 for every party regardless
// of which coalescing branch (hit vs miss) the request took.
for (int pass = 0; pass < 3; pass++)
{
for (int i = 0; i < clients.Length; i++)
{
var master = new ModbusFactory().CreateMaster(clients[i]);
ushort[] regs = master.ReadHoldingRegisters(1, 1072, 1);
regs[0].ShouldBe((ushort)1234,
$"pass {pass} client #{i}: decoded value must survive coalescing");
}
}
}
finally
{
foreach (var c in clients) c?.Dispose();
}
}
// ── 3. Different registers → no coalescing → hit count stays at zero ─────────
[Fact(Timeout = 5_000)]
public async Task E2E_DifferentRegisters_NotCoalesced_CoalescedHitCount_Zero()
{
if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason);
int proxyPort = PickFreePort();
int adminPort = PickFreePort();
var config = MakeBaseConfig(proxyPort);
config["Mbproxy:AdminPort"] = adminPort.ToString();
var host = BuildBcdHost(config);
using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
await host.StartAsync(startCts.Token);
await using var hd = new AsyncHostDispose(host);
await Task.Delay(300, TestContext.Current.CancellationToken);
// Five different seeded addresses, sequential reads — none can coalesce.
// Selected from DL260/dl205.json's seeded ranges (200..209, 1024, 1040..1042).
ushort[] addrs = [200, 201, 202, 203, 204];
using (var client = new TcpClient())
{
await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);
var master = new ModbusFactory().CreateMaster(client);
foreach (ushort a in addrs)
_ = master.ReadHoldingRegisters(1, a, 1);
}
// Read the counters via status.json.
using var httpClient = new HttpClient();
var resp = await httpClient.GetStringAsync(
$"http://127.0.0.1:{adminPort}/status.json",
TestContext.Current.CancellationToken);
using var doc = JsonDocument.Parse(resp);
var backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend");
backend.GetProperty("coalescedHitCount").GetInt64()
.ShouldBe(0, "different addresses must never coalesce");
backend.GetProperty("coalescedMissCount").GetInt64()
.ShouldBe(addrs.Length, "each distinct read must be counted as a Miss");
}
// ── 4. Status page surfaces coalescing counters ──────────────────────────────
[Fact(Timeout = 5_000)]
public async Task E2E_StatusPage_Shows_CoalescingFields()
{
if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason);
int proxyPort = PickFreePort();
int adminPort = PickFreePort();
var config = MakeBaseConfig(proxyPort);
config["Mbproxy:AdminPort"] = adminPort.ToString();
var host = BuildBcdHost(config);
using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
await host.StartAsync(startCts.Token);
await using var hd = new AsyncHostDispose(host);
await Task.Delay(300, TestContext.Current.CancellationToken);
using (var client = new TcpClient())
{
await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);
var master = new ModbusFactory().CreateMaster(client);
_ = master.ReadHoldingRegisters(1, 0, 1);
}
using var httpClient = new HttpClient();
var resp = await httpClient.GetStringAsync(
$"http://127.0.0.1:{adminPort}/status.json",
TestContext.Current.CancellationToken);
using var doc = JsonDocument.Parse(resp);
var backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend");
backend.TryGetProperty("coalescedHitCount", out _)
.ShouldBeTrue("status.json must expose backend.coalescedHitCount");
backend.TryGetProperty("coalescedMissCount", out _)
.ShouldBeTrue("status.json must expose backend.coalescedMissCount");
backend.TryGetProperty("coalescedResponseToDeadUpstream", out _)
.ShouldBeTrue("status.json must expose backend.coalescedResponseToDeadUpstream");
}
// ── 5. Disable via config → coalescing OFF → every read is a Miss ────────────
[Fact(Timeout = 5_000)]
public async Task E2E_CoalescingDisabledViaConfig_EveryReadIsAMiss()
{
if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason);
int proxyPort = PickFreePort();
int adminPort = PickFreePort();
var config = MakeBaseConfig(proxyPort);
config["Mbproxy:AdminPort"] = adminPort.ToString();
config["Mbproxy:Resilience:ReadCoalescing:Enabled"] = "false";
var host = BuildBcdHost(config);
using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
await host.StartAsync(startCts.Token);
await using var hd = new AsyncHostDispose(host);
await Task.Delay(300, TestContext.Current.CancellationToken);
using (var client = new TcpClient())
{
await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);
var master = new ModbusFactory().CreateMaster(client);
for (int i = 0; i < 4; i++)
_ = master.ReadHoldingRegisters(1, 0, 1);
}
using var httpClient = new HttpClient();
var resp = await httpClient.GetStringAsync(
$"http://127.0.0.1:{adminPort}/status.json",
TestContext.Current.CancellationToken);
using var doc = JsonDocument.Parse(resp);
var backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend");
backend.GetProperty("coalescedHitCount").GetInt64()
.ShouldBe(0, "coalescing disabled — no hits possible");
backend.GetProperty("coalescedMissCount").GetInt64()
.ShouldBe(4, "every FC03 read still counts as a Miss when coalescing is disabled");
}
}