Files
Joseph Doherty 56eee3c563 mbproxy: initial commit through Phase 9 (TxId multiplexing)
Adds the mbproxy service end-to-end. Phases 00-08 implement the
production-ready single-listener / 1:1-backend transparent Modbus TCP
proxy with bidirectional BCD rewriting for the ~54-PLC DL205/DL260
fleet. Phase 9 replaces the connection layer with a single backend
socket per PLC plus MBAP TxId rewriting, lifting the H2-ECOM100's
4-concurrent-client cap as an operational ceiling.

Phase 9 additions of note:
- PlcMultiplexer + UpstreamPipe + TxIdAllocator + CorrelationMap
- InFlightRequest with IReadOnlyList<InterestedParty> (load-bearing
  for Phase 10 read coalescing — do not collapse to a single field)
- Per-request watchdog: surfaces Modbus exception 0x0B to upstream
  on BackendRequestTimeoutMs, defending against lost responses,
  dead-PLC paths, and pymodbus 3.13.0's concurrent-multiplexed-
  request bug (its ServerRequestHandler.last_pdu state race)
- Status DTO + HTML gain inFlight / maxInFlight / txIdWraps /
  disconnectCascades / queueDepth (Tier 1.6 in docs/kpi.md)

Tests: 263 unit + 38 E2E. Multiplexer correctness under truly
concurrent backend traffic is proved against a stub backend in
PlcMultiplexerTests; MultiplexerE2ETests paces requests so pymodbus
3.13's single-PDU framer stays in known-good mode.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 01:49:35 -04:00

478 lines
21 KiB
C#

using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using Mbproxy;
using Mbproxy.Proxy;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NModbus;
using Serilog;
using Serilog.Core;
using Serilog.Events;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Proxy;
/// <summary>
/// End-to-end tests for the BCD rewriter pipeline against the pymodbus DL205 simulator.
///
/// Each test starts an in-process proxy host configured to point at the simulator,
/// connects an NModbus client to the proxy's listen port, and asserts bidirectional
/// BCD rewriting behaviour.
///
/// All tests skip gracefully when the simulator is unavailable (Python / pymodbus missing).
/// </summary>
[Collection(nameof(Mbproxy.Tests.Sim.DL205SimulatorCollection))]
[Trait("Category", "E2E")]
public sealed class RewriterE2ETests
{
private readonly Mbproxy.Tests.Sim.DL205SimulatorFixture _sim;
/// <summary>
/// Stores the simulator fixture handed in by the test collection so each test can
/// consult its skip reason and its host/port.
/// </summary>
/// <param name="sim">Simulator fixture supplied by xUnit for this collection.</param>
public RewriterE2ETests(Mbproxy.Tests.Sim.DL205SimulatorFixture sim) => _sim = sim;
// ── 1. FC03 HR1072 with BCD configured → decoded 1234 ────────────────────
/// <summary>
/// Configures a 16-bit BCD tag at address 1072 (seeded 0x1234 in the simulator) and reads
/// that register through the proxy. The proxy should decode the BCD nibbles and hand the
/// client binary 1234.
/// </summary>
[Fact(Timeout = 5_000)]
public async Task Read_HR1072_AsBcd_ReturnsDecoded_1234()
{
    if (_sim.SkipReason is not null)
    {
        Assert.Skip(_sim.SkipReason);
    }

    var (proxyPort, host, cts) = await StartBcdProxyAsync(bcd16Addresses: [1072]);
    await using var hostScope = new AsyncHostDispose(host, cts);

    using var client = new TcpClient();
    await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);

    // The master is IDisposable; release it together with the client.
    using var master = new ModbusFactory().CreateMaster(client);

    // Awaited overload: a stalled proxy must not block the test thread on synchronous socket I/O.
    ushort[] regs = await master.ReadHoldingRegistersAsync(slaveAddress: 1, startAddress: 1072, numberOfPoints: 1);

    // Simulator stores 0x1234 = raw BCD. Proxy should decode → 1234 decimal.
    regs[0].ShouldBe((ushort)1234);
}
// ── 2. FC03 HR1072 without BCD configured → raw 0x1234 ───────────────────
/// <summary>
/// Reads address 1072 through a proxy that has no BCD tags configured. The raw BCD
/// nibbles must pass through unchanged, which shows the rewriter is opt-in per tag.
/// </summary>
[Fact(Timeout = 5_000)]
public async Task Read_HR1072_AsRaw_WhenNotConfigured_Returns_0x1234()
{
    if (_sim.SkipReason is not null)
    {
        Assert.Skip(_sim.SkipReason);
    }

    // Empty BCD tag list — no rewriting.
    var (proxyPort, host, cts) = await StartBcdProxyAsync(bcd16Addresses: []);
    await using var hostScope = new AsyncHostDispose(host, cts);

    using var client = new TcpClient();
    await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);

    // The master is IDisposable; release it together with the client.
    using var master = new ModbusFactory().CreateMaster(client);

    // Awaited overload keeps the async test from blocking on synchronous socket I/O.
    ushort[] regs = await master.ReadHoldingRegistersAsync(slaveAddress: 1, startAddress: 1072, numberOfPoints: 1);

    // Raw BCD nibbles pass through unchanged.
    regs[0].ShouldBe((ushort)0x1234);
}
// ── 3. FC06 write BCD → simulator stores encoded nibbles ────────────────
/// <summary>
/// Configures a 16-bit BCD tag at address 200 (in the simulator's writable scratch range),
/// writes decimal 9876 through the proxy with FC06, then reads the register straight from
/// the simulator and expects the BCD-encoded value 0x9876.
/// </summary>
[Fact(Timeout = 5_000)]
public async Task Write_HR200_AsBcd_StoresEncoded_0x9876()
{
    if (_sim.SkipReason is not null)
    {
        Assert.Skip(_sim.SkipReason);
    }

    var (proxyPort, host, cts) = await StartBcdProxyAsync(bcd16Addresses: [200]);
    await using var hostScope = new AsyncHostDispose(host, cts);

    // Write through the proxy (client side: binary 9876).
    using var proxyClient = new TcpClient();
    await proxyClient.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);
    using var proxyMaster = new ModbusFactory().CreateMaster(proxyClient);

    // Awaited overloads keep the async test from blocking on synchronous socket I/O.
    await proxyMaster.WriteSingleRegisterAsync(slaveAddress: 1, registerAddress: 200, value: 9876);

    // Read raw from the simulator directly (bypassing the proxy).
    using var simClient = new TcpClient();
    await simClient.ConnectAsync(_sim.Host, _sim.Port, TestContext.Current.CancellationToken);
    using var simMaster = new ModbusFactory().CreateMaster(simClient);
    ushort[] raw = await simMaster.ReadHoldingRegistersAsync(slaveAddress: 1, startAddress: 200, numberOfPoints: 1);

    // Simulator should store BCD-encoded 9876 = 0x9876.
    raw[0].ShouldBe((ushort)0x9876);
}
// ── 4. FC03 read 32-bit BCD pair at HR1072/HR1073 (CDAB) ────────────────
/// <summary>
/// Reads a 32-bit BCD pair at address 1072/1073 (CDAB layout) through the proxy.
/// Simulator seeds: 1072=0x1234 (low word), 1073=0x0000 (high word).
/// Decoded = 0*10000 + 1234 = 1234.
/// This verifies the CDAB word order is handled end-to-end.
/// </summary>
[Fact(Timeout = 5_000)]
public async Task Read_HR1072_HR1073_AsBcd32_ReturnsDecoded_From_CDAB()
{
    if (_sim.SkipReason is not null)
    {
        Assert.Skip(_sim.SkipReason);
    }

    var (proxyPort, host, cts) = await StartBcdProxyAsync(bcd32Addresses: [1072]);
    await using var hostScope = new AsyncHostDispose(host, cts);

    using var client = new TcpClient();
    await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);

    // The master is IDisposable; release it together with the client.
    using var master = new ModbusFactory().CreateMaster(client);

    // Read both registers of the 32-bit pair (awaited so the test thread never blocks on socket I/O).
    ushort[] regs = await master.ReadHoldingRegistersAsync(slaveAddress: 1, startAddress: 1072, numberOfPoints: 2);

    // The proxy returns decoded binary values in CDAB order:
    //   regs[0] = low 4 decoded digits  = 1234
    //   regs[1] = high 4 decoded digits = 0
    regs[0].ShouldBe((ushort)1234); // decoded low 4 digits
    regs[1].ShouldBe((ushort)0);    // decoded high 4 digits
}
// ── 5. Partial FC03 on high register of 32-bit pair → raw + warning ──────
/// <summary>
/// Reads only the high register (1073) of a 32-bit BCD pair configured at 1072/1073.
/// The proxy cannot decode a partial pair — it should pass the value through raw and log
/// mbproxy.rewrite.partial_bcd.
/// </summary>
[Fact(Timeout = 5_000)]
public async Task Partial_FC03_OnHighRegisterOf_32BitPair_PassesThroughRaw_AndLogsWarning()
{
    if (_sim.SkipReason is not null)
    {
        Assert.Skip(_sim.SkipReason);
    }

    var sink = new CapturingSink();

    // The host is built with dispose: false for this logger, so the test owns it.
    // Declared before the host scope so it is disposed only after the host has stopped.
    using var serilog = new LoggerConfiguration()
        .MinimumLevel.Warning()
        .WriteTo.Sink(sink)
        .CreateLogger();

    var (proxyPort, host, cts) = await StartBcdProxyAsync(
        bcd32Addresses: [1072],
        serilogOverride: serilog);
    await using var hostScope = new AsyncHostDispose(host, cts);

    using var client = new TcpClient();
    await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);
    using var master = new ModbusFactory().CreateMaster(client);

    // Read only the high register (1073) — partial overlap for the 32-bit pair.
    // Awaited overload keeps the async test from blocking on synchronous socket I/O.
    ushort[] regs = await master.ReadHoldingRegistersAsync(slaveAddress: 1, startAddress: 1073, numberOfPoints: 1);

    // The raw simulator value for HR1073 is 0x0000 (high word of the 32-bit pair).
    regs[0].ShouldBe((ushort)0x0000); // raw passthrough

    // The partial_bcd warning should have been logged; match either the event name
    // or the human-readable phrase in the message template.
    var partialEvents = sink.Events
        .Where(e => e.MessageTemplate.Text.Contains("mbproxy.rewrite.partial_bcd", StringComparison.Ordinal)
                 || e.MessageTemplate.Text.Contains("Partial BCD overlap", StringComparison.Ordinal))
        .ToList();
    partialEvents.ShouldNotBeEmpty("Expected mbproxy.rewrite.partial_bcd warning to be logged");
}
// ── 6. MBAP TxId preserved after rewriting (20 consecutive) ─────────────
/// <summary>
/// Issues 20 consecutive FC03 reads with manually-incremented TxIds through a proxy
/// that has BCD rewriting active (tag at 1072), over a raw socket. Each response's
/// MBAP transaction id must echo the request's, i.e. the rewriter must not disturb
/// the MBAP header.
/// </summary>
[Fact(Timeout = 5_000)]
public async Task MbapTxId_StillPreserved_AfterRewriting_20Consecutive()
{
    if (_sim.SkipReason is not null)
    {
        Assert.Skip(_sim.SkipReason);
    }

    var (proxyPort, host, cts) = await StartBcdProxyAsync(bcd16Addresses: [1072]);
    await using var hostScope = new AsyncHostDispose(host, cts);

    using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    socket.NoDelay = true;
    await socket.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);

    const int count = 20;
    const int headerLength = 7; // TxId(2) + ProtocolId(2) + Length(2) + UnitId(1)
    byte[] reqBuf = new byte[12]; // FC03 request frame
    byte[] rspBuf = new byte[260];

    for (ushort txId = 1; txId <= count; txId++)
    {
        // Build FC03 request: read 1 register at address 1072.
        reqBuf[0] = (byte)(txId >> 8);
        reqBuf[1] = (byte)(txId & 0xFF);
        reqBuf[2] = 0x00;
        reqBuf[3] = 0x00;
        reqBuf[4] = 0x00;
        reqBuf[5] = 0x06; // Length
        reqBuf[6] = 0x01; // UnitId
        reqBuf[7] = 0x03; // FC03
        reqBuf[8] = 0x04; // Start addr high (1072 = 0x0430)
        reqBuf[9] = 0x30; // Start addr low
        reqBuf[10] = 0x00;
        reqBuf[11] = 0x01; // Qty = 1
        await socket.SendAsync(reqBuf.AsMemory(), SocketFlags.None, TestContext.Current.CancellationToken);

        // Read the 7-byte response header.
        await ReceiveExactlyAsync(socket, rspBuf.AsMemory(0, headerLength), TestContext.Current.CancellationToken);
        ushort rspTxId = (ushort)((rspBuf[0] << 8) | rspBuf[1]);
        ushort rspLength = (ushort)((rspBuf[4] << 8) | rspBuf[5]);
        rspTxId.ShouldBe(txId, $"TxId mismatch on iteration {txId}");

        // Drain the body. The MBAP length field counts the UnitId byte, which was
        // already consumed with the header, hence the -1.
        int bodyLen = rspLength - 1;
        bodyLen.ShouldBeLessThanOrEqualTo(
            rspBuf.Length - headerLength,
            $"Response length field {rspLength} exceeds the receive buffer on iteration {txId}");
        if (bodyLen > 0)
        {
            await ReceiveExactlyAsync(socket, rspBuf.AsMemory(headerLength, bodyLen), TestContext.Current.CancellationToken);
        }
    }

    // Fills <paramref name="destination"/> completely, failing the test if the peer closes first.
    // A 0-byte receive on a stream socket means the remote end shut down; without this
    // check the loop would spin forever instead of reporting a useful failure.
    static async Task ReceiveExactlyAsync(Socket source, Memory<byte> destination, CancellationToken cancellationToken)
    {
        int filled = 0;
        while (filled < destination.Length)
        {
            int received = await source.ReceiveAsync(destination.Slice(filled), SocketFlags.None, cancellationToken);
            received.ShouldBeGreaterThan(0, "Proxy closed the connection before a complete response was received");
            filled += received;
        }
    }
}
// ── 7. FC16 with 16-bit BCD in middle of write range ────────────────────
/// <summary>
/// FC16 (Write Multiple Registers) covering a 3-register span where only the middle
/// register is a configured BCD tag. The proxy must encode the middle slot and leave
/// the flanks untouched. Verifies per-register selectivity within a multi-register write.
/// </summary>
[Fact(Timeout = 5_000)]
public async Task Write_FC16_With_Bcd16_InRange_StoresEncoded_AtOnlyTheBcdSlot()
{
    if (_sim.SkipReason is not null)
    {
        Assert.Skip(_sim.SkipReason);
    }

    // Configure a 16-bit BCD tag at the middle register of a 3-register write.
    var (proxyPort, host, cts) = await StartBcdProxyAsync(bcd16Addresses: [205]);
    await using var hostScope = new AsyncHostDispose(host, cts);

    // FC16 write to HR204..HR206 with binary values [10, 9876, 20].
    using var proxyClient = new TcpClient();
    await proxyClient.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);
    using var proxyMaster = new ModbusFactory().CreateMaster(proxyClient);

    // Awaited overloads keep the async test from blocking on synchronous socket I/O.
    await proxyMaster.WriteMultipleRegistersAsync(slaveAddress: 1, startAddress: 204,
        data: new ushort[] { 10, 9876, 20 });

    // Read raw from the simulator directly.
    using var simClient = new TcpClient();
    await simClient.ConnectAsync(_sim.Host, _sim.Port, TestContext.Current.CancellationToken);
    using var simMaster = new ModbusFactory().CreateMaster(simClient);
    ushort[] raw = await simMaster.ReadHoldingRegistersAsync(slaveAddress: 1, startAddress: 204, numberOfPoints: 3);

    raw[0].ShouldBe((ushort)10, "HR204 is not a BCD tag — must pass through unchanged");
    raw[1].ShouldBe((ushort)0x9876, "HR205 is a 16-bit BCD tag — must be re-encoded to nibbles");
    raw[2].ShouldBe((ushort)20, "HR206 is not a BCD tag — must pass through unchanged");
}
// ── 8. FC16 with 32-bit BCD pair → both halves CDAB-encoded ─────────────
/// <summary>
/// FC16 covering both halves of a configured 32-bit BCD pair. The pipeline reconstructs
/// the binary integer from the CDAB-ordered registers (binaryValue = high * 10000 + low),
/// encodes it as a BCD pair, and writes back in CDAB order.
///
/// Example: client writes [low=5678, high=1234] → binaryValue = 12345678
/// → Encode32(12345678) = (bcdLow=0x5678, bcdHigh=0x1234)
/// </summary>
[Fact(Timeout = 5_000)]
public async Task Write_FC16_With_Bcd32Pair_StoresCdabEncoded()
{
    if (_sim.SkipReason is not null)
    {
        Assert.Skip(_sim.SkipReason);
    }

    // Configure a 32-bit BCD tag spanning HR207 + HR208 (both in [200, 209] scratch range).
    var (proxyPort, host, cts) = await StartBcdProxyAsync(bcd32Addresses: [207]);
    await using var hostScope = new AsyncHostDispose(host, cts);

    // FC16 write of [low=5678, high=1234] → decimal 12345678.
    using var proxyClient = new TcpClient();
    await proxyClient.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);
    using var proxyMaster = new ModbusFactory().CreateMaster(proxyClient);

    // Awaited overloads keep the async test from blocking on synchronous socket I/O.
    await proxyMaster.WriteMultipleRegistersAsync(slaveAddress: 1, startAddress: 207,
        data: new ushort[] { 5678, 1234 });

    // Read the stored pair straight from the simulator, bypassing the proxy.
    using var simClient = new TcpClient();
    await simClient.ConnectAsync(_sim.Host, _sim.Port, TestContext.Current.CancellationToken);
    using var simMaster = new ModbusFactory().CreateMaster(simClient);
    ushort[] raw = await simMaster.ReadHoldingRegistersAsync(slaveAddress: 1, startAddress: 207, numberOfPoints: 2);

    raw[0].ShouldBe((ushort)0x5678, "HR207 (low word of CDAB pair) must hold low 4 BCD digits");
    raw[1].ShouldBe((ushort)0x1234, "HR208 (high word of CDAB pair) must hold high 4 BCD digits");
}
// ── 9. FC16 partial overlap on 32-bit pair → raw + warning ──────────────
/// <summary>
/// FC16 writes only the LOW register of a configured 32-bit BCD pair (qty=1 at the low
/// address). The pipeline cannot safely encode half of a 32-bit value, so it passes the
/// register through raw and logs mbproxy.rewrite.partial_bcd.
/// </summary>
[Fact(Timeout = 5_000)]
public async Task Write_FC16_PartialBcd32_OnLowAddressOnly_PassesThroughRaw_AndLogsWarning()
{
    if (_sim.SkipReason is not null)
    {
        Assert.Skip(_sim.SkipReason);
    }

    var sink = new CapturingSink();

    // The host is built with dispose: false for this logger, so the test owns it.
    // Declared before the host scope so it is disposed only after the host has stopped.
    using var serilog = new LoggerConfiguration()
        .MinimumLevel.Warning()
        .WriteTo.Sink(sink)
        .CreateLogger();

    // Configure a 32-bit BCD tag at HR207 + HR208 (pair).
    var (proxyPort, host, cts) = await StartBcdProxyAsync(
        bcd32Addresses: [207],
        serilogOverride: serilog);
    await using var hostScope = new AsyncHostDispose(host, cts);

    // FC16 write of [42] to HR207 only — partial overlap on the 32-bit pair.
    using var proxyClient = new TcpClient();
    await proxyClient.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken);
    using var proxyMaster = new ModbusFactory().CreateMaster(proxyClient);

    // Awaited overloads keep the async test from blocking on synchronous socket I/O.
    await proxyMaster.WriteMultipleRegistersAsync(slaveAddress: 1, startAddress: 207,
        data: new ushort[] { 42 });

    // Simulator should hold the raw value 42 (no rewriting on partial overlap).
    using var simClient = new TcpClient();
    await simClient.ConnectAsync(_sim.Host, _sim.Port, TestContext.Current.CancellationToken);
    using var simMaster = new ModbusFactory().CreateMaster(simClient);
    ushort[] raw = await simMaster.ReadHoldingRegistersAsync(slaveAddress: 1, startAddress: 207, numberOfPoints: 1);
    raw[0].ShouldBe((ushort)42, "Partial-overlap write must pass through raw (not BCD-encoded)");

    // The partial_bcd warning must have been logged; match either the event name
    // or the human-readable phrase in the message template.
    var partialEvents = sink.Events
        .Where(e => e.MessageTemplate.Text.Contains("mbproxy.rewrite.partial_bcd", StringComparison.Ordinal)
                 || e.MessageTemplate.Text.Contains("Partial BCD overlap", StringComparison.Ordinal))
        .ToList();
    partialEvents.ShouldNotBeEmpty("Expected mbproxy.rewrite.partial_bcd warning on partial FC16 write");
}
// ── Helpers ──────────────────────────────────────────────────────────────
/// <summary>
/// Builds and starts an in-process proxy host that listens on a freshly picked local
/// port and forwards to the simulator, with the requested BCD tags registered under
/// <c>Mbproxy:BcdTags:Global</c>.
/// </summary>
/// <param name="bcd16Addresses">Addresses to register as 16-bit BCD tags; <c>null</c> means none.</param>
/// <param name="bcd32Addresses">Addresses to register as 32-bit BCD tags; <c>null</c> means none.</param>
/// <param name="serilogOverride">Logger to install in the host; when <c>null</c> the host builder's fallback is used.</param>
/// <returns>
/// The proxy listen port, the started host, and a 30-second <see cref="CancellationTokenSource"/>.
/// The caller owns the host and the token source and must dispose both.
/// </returns>
private async Task<(int proxyPort, IHost host, CancellationTokenSource cts)> StartBcdProxyAsync(
    ushort[]? bcd16Addresses = null,
    ushort[]? bcd32Addresses = null,
    Serilog.ILogger? serilogOverride = null)
{
    // Configuration values are machine-readable, so format numbers culture-invariantly.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    int proxyPort = PickFreePort();
    var config = new Dictionary<string, string?>
    {
        // NOTE(review): fixed admin port — confirm it cannot collide when hosts run in parallel.
        ["Mbproxy:AdminPort"] = "8080",
        ["Mbproxy:Plcs:0:Name"] = "TestPLC",
        ["Mbproxy:Plcs:0:ListenPort"] = proxyPort.ToString(invariant),
        ["Mbproxy:Plcs:0:Host"] = _sim.Host,
        ["Mbproxy:Plcs:0:Port"] = _sim.Port.ToString(invariant),
        ["Mbproxy:Connection:BackendConnectTimeoutMs"] = "3000",
        ["Mbproxy:Connection:BackendRequestTimeoutMs"] = "3000",
    };

    // Add BCD tag entries to the in-memory config: 16-bit tags first, then 32-bit,
    // sharing one running index so the entries form a single contiguous list.
    int tagIndex = 0;
    AddTags(bcd16Addresses, "16");
    AddTags(bcd32Addresses, "32");

    using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
    var host = BuildBcdProxyHost(config, serilogOverride);
    try
    {
        await host.StartAsync(startCts.Token);

        // Short settle delay after start (presumably lets the listener come up — confirm).
        await Task.Delay(150, TestContext.Current.CancellationToken);
    }
    catch
    {
        // The caller never receives the host on failure, so it must be released here;
        // otherwise a half-started (or fully running) host would leak.
        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        try { await host.StopAsync(stopCts.Token); } catch { /* best effort */ }
        host.Dispose();
        throw;
    }

    var runCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
    return (proxyPort, host, runCts);

    // Appends one Address/Width entry per address at the next free tag index.
    void AddTags(ushort[]? addresses, string width)
    {
        foreach (ushort addr in addresses ?? [])
        {
            config[$"Mbproxy:BcdTags:Global:{tagIndex}:Address"] = addr.ToString(invariant);
            config[$"Mbproxy:BcdTags:Global:{tagIndex}:Width"] = width;
            tagIndex++;
        }
    }
}
/// <summary>
/// Builds (without starting) a host that runs <c>ProxyWorker</c> with the real
/// <c>BcdPduPipeline</c>, configured from the supplied in-memory settings.
/// </summary>
/// <param name="config">Configuration key/value pairs added as an in-memory collection.</param>
/// <param name="serilogOverride">
/// Logger to install. When <c>null</c>, a logger with minimum level Fatal is created here.
/// </param>
/// <returns>The built, not yet started, host.</returns>
private static IHost BuildBcdProxyHost(
    Dictionary<string, string?> config,
    Serilog.ILogger? serilogOverride = null)
{
    var builder = Host.CreateApplicationBuilder();
    builder.Configuration.AddInMemoryCollection(config);

    // A caller-supplied logger stays owned by the caller. The fallback logger is created
    // here and nobody else holds a reference to it, so let the host dispose it.
    bool hostOwnsLogger = serilogOverride is null;
    var logger = serilogOverride
        ?? new LoggerConfiguration().MinimumLevel.Fatal().CreateLogger();
    builder.Services.AddSerilog(logger, dispose: hostOwnsLogger);

    builder.AddMbproxyOptions();

    // Use the real BcdPduPipeline (not NoopPduPipeline) for E2E rewriter tests.
    builder.Services.AddSingleton<IPduPipeline, BcdPduPipeline>();
    builder.Services.AddHostedService<ProxyWorker>();
    return builder.Build();
}
/// <summary>
/// Asks the OS for an unused loopback TCP port by binding a listener to port 0,
/// reading back the assigned port, and releasing the listener again.
/// </summary>
/// <returns>The port number the OS assigned.</returns>
/// <remarks>
/// The port is free at the moment of return only; another process could claim it
/// before the caller binds to it.
/// </remarks>
private static int PickFreePort()
{
    var listener = new TcpListener(IPAddress.Loopback, 0);
    try
    {
        listener.Start();
        return ((IPEndPoint)listener.LocalEndpoint).Port;
    }
    finally
    {
        // Always release the socket, even if reading the endpoint throws.
        listener.Stop();
    }
}
/// <summary>
/// Async-disposable scope that stops and disposes a test host and its companion
/// <see cref="CancellationTokenSource"/> when the test leaves scope.
/// </summary>
private sealed class AsyncHostDispose : IAsyncDisposable
{
    private readonly IHost _host;
    private readonly CancellationTokenSource _cts;

    /// <param name="host">Host to stop and dispose.</param>
    /// <param name="cts">Token source to dispose together with the host.</param>
    public AsyncHostDispose(IHost host, CancellationTokenSource cts)
    {
        _host = host;
        _cts = cts;
    }

    /// <summary>
    /// Stops the host (best effort, 5-second limit), disposes it, and disposes the token source.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        try
        {
            using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            try { await _host.StopAsync(stopCts.Token); } catch { /* best effort */ }

            // Prefer asynchronous disposal when the host supports it, so this async
            // path does not block on a synchronous Dispose().
            if (_host is IAsyncDisposable asyncHost)
            {
                await asyncHost.DisposeAsync();
            }
            else
            {
                _host.Dispose();
            }
        }
        finally
        {
            // Release the token source even if host disposal throws.
            _cts.Dispose();
        }
    }
}
// ── Capturing log sink (shared with HostSmokeTests) ─────────────────────
/// <summary>
/// Serilog sink that keeps every emitted event in memory, in arrival order, so a
/// test can inspect what was logged.
/// </summary>
private sealed class CapturingSink : ILogEventSink
{
    // Concurrent queue: the sink may be written to while a test enumerates it.
    private readonly ConcurrentQueue<LogEvent> _captured = new();

    /// <summary>All events emitted to this sink so far.</summary>
    public IEnumerable<LogEvent> Events
    {
        get { return _captured; }
    }

    /// <summary>Appends <paramref name="logEvent"/> to the captured events.</summary>
    public void Emit(LogEvent logEvent)
    {
        _captured.Enqueue(logEvent);
    }
}
}