Files
Joseph Doherty 56eee3c563 mbproxy: initial commit through Phase 9 (TxId multiplexing)
Adds the mbproxy service end-to-end. Phases 00-08 implement the
production-ready single-listener / 1:1-backend transparent Modbus TCP
proxy with bidirectional BCD rewriting for the ~54-PLC DL205/DL260
fleet. Phase 9 replaces the connection layer with a single backend
socket per PLC plus MBAP TxId rewriting, so the H2-ECOM100's
4-concurrent-client cap no longer acts as an operational ceiling.

Phase 9 additions of note:
- PlcMultiplexer + UpstreamPipe + TxIdAllocator + CorrelationMap
- InFlightRequest with IReadOnlyList<InterestedParty> (load-bearing
  for Phase 10 read coalescing — do not collapse to a single field)
- Per-request watchdog: surfaces Modbus exception 0x0B to upstream
  on BackendRequestTimeoutMs, defending against lost responses,
  dead-PLC paths, and pymodbus 3.13.0's concurrent-multiplexed-
  request bug (its ServerRequestHandler.last_pdu state race)
- Status DTO + HTML gain inFlight / maxInFlight / txIdWraps /
  disconnectCascades / queueDepth (Tier 1.6 in docs/kpi.md)

Tests: 263 unit + 38 E2E. Multiplexer correctness under truly
concurrent backend traffic is proved against a stub backend in
PlcMultiplexerTests; MultiplexerE2ETests paces requests so pymodbus
3.13's single-PDU framer stays in known-good mode.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 01:49:35 -04:00

212 lines
8.0 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Net;
using System.Net.Sockets;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Supervision;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Polly;
using Xunit;
namespace Mbproxy.Tests.Proxy.Supervision;
/// <summary>
/// End-to-end supervisor tests that run the proxy against the DL205 simulator.
/// These tests verify supervisor-level behaviour (recovery, counters) with a real
/// Modbus backend rather than a bare socket.
/// </summary>
[Collection(nameof(Mbproxy.Tests.Sim.DL205SimulatorCollection))]
[Trait("Category", "E2E")]
public sealed class SupervisorE2ETests
{
    // Interval between Snapshot() polls while waiting for a supervisor state change.
    private const int PollIntervalMs = 50;

    // Smallest possible Modbus TCP response: 7-byte MBAP header + function code +
    // exception code. A normal FC03 reply for one register is 11 bytes
    // (header + FC + byte count + 2 data bytes), so 9 covers both shapes.
    private const int MinModbusTcpResponseBytes = 9;

    private readonly Mbproxy.Tests.Sim.DL205SimulatorFixture _sim;

    /// <summary>Receives the shared DL205 simulator fixture from the xUnit collection.</summary>
    public SupervisorE2ETests(Mbproxy.Tests.Sim.DL205SimulatorFixture sim)
    {
        _sim = sim;
    }

    // ── Helpers ───────────────────────────────────────────────────────────────────────────

    /// <summary>
    /// Returns a loopback port that was free at the moment of the call. The probe
    /// listener is released before returning, so another process could still claim
    /// the port before the test binds it (an inherent race, acceptable for tests).
    /// </summary>
    private static int PickFreePort()
    {
        var probe = new TcpListener(IPAddress.Loopback, 0);
        probe.Start();
        try
        {
            return ((IPEndPoint)probe.LocalEndpoint).Port;
        }
        finally
        {
            probe.Stop();
        }
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every <see cref="PollIntervalMs"/> ms until it
    /// returns <c>true</c> or <paramref name="timeout"/> elapses. Callers re-check the
    /// observed state with an assertion afterwards so failures carry a useful message.
    /// </summary>
    /// <returns><c>true</c> if the condition was met before the timeout elapsed.</returns>
    private static async Task<bool> WaitUntilAsync(Func<bool> condition, TimeSpan timeout)
    {
        using var deadline = new CancellationTokenSource(timeout);
        while (!condition())
        {
            if (deadline.IsCancellationRequested)
                return false;
            await Task.Delay(PollIntervalMs, TestContext.Current.CancellationToken);
        }
        return true;
    }

    /// <summary>
    /// Builds a supervisor for a single PLC named "SimPLC" that listens on
    /// <paramref name="listenPort"/> and forwards to the simulator's host/port, using
    /// null loggers and a no-op PDU pipeline.
    /// </summary>
    /// <param name="listenPort">Port the proxy listener should bind.</param>
    /// <param name="recoveryProfile">
    /// Listener-recovery backoff; defaults to 2 × 200 ms initial then 200 ms steady state.
    /// </param>
    private PlcListenerSupervisor BuildSimSupervisor(
        int listenPort,
        RecoveryProfile? recoveryProfile = null)
    {
        var profile = recoveryProfile ?? new RecoveryProfile
        {
            InitialBackoffMs = [200, 200],
            SteadyStateMs = 200,
        };
        ILoggerFactory loggerFactory = NullLoggerFactory.Instance;
        var plcOpts = new PlcOptions
        {
            Name = "SimPLC",
            ListenPort = listenPort,
            Host = _sim.Host,
            Port = _sim.Port,
        };
        var connOpts = new ConnectionOptions
        {
            BackendConnectTimeoutMs = 3000,
            BackendRequestTimeoutMs = 3000,
        };
        var recoveryPipeline = PolicyFactory.BuildListenerRecovery(profile, NullLogger.Instance);
        var backendPipeline = PolicyFactory.BuildBackendConnect(
            new RetryProfile { MaxAttempts = 2, BackoffMs = [100, 500] },
            NullLogger.Instance);
        return new PlcListenerSupervisor(
            plc: plcOpts,
            connectionOptions: connOpts,
            pipeline: new NoopPduPipeline(),
            listenerLogger: loggerFactory.CreateLogger<PlcListener>(),
            multiplexerLogger: loggerFactory.CreateLogger<Mbproxy.Proxy.Multiplexing.PlcMultiplexer>(),
            pipeLogger: loggerFactory.CreateLogger("Mbproxy.Proxy.UpstreamPipe.Test"),
            perPlcContext: null,
            recoveryPipeline: recoveryPipeline,
            logger: loggerFactory.CreateLogger<PlcListenerSupervisor>(),
            backendConnectPipeline: backendPipeline);
    }

    // ── E2E 1: Recovery when blocking listener releases port ──────────────────────────────

    [Fact(Timeout = 5_000)]
    public async Task E2E_Recovery_When_BlockingListenerReleasesPort()
    {
        if (_sim.SkipReason is not null)
            Assert.Skip(_sim.SkipReason);

        int listenPort = PickFreePort();

        // Block the port before starting the supervisor.
        var blocker = new TcpListener(IPAddress.Any, listenPort);
        blocker.Start();
        try
        {
            await using var supervisor = BuildSimSupervisor(listenPort);
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
            await supervisor.StartAsync(cts.Token);

            // Wait for first bind attempt to fail.
            await supervisor.WaitForInitialBindAttemptAsync(cts.Token);
            Assert.Equal(SupervisorState.Recovering, supervisor.Snapshot().State);

            // Release the port.
            blocker.Stop();

            // Poll for up to 3 s for the supervisor to bind.
            await WaitUntilAsync(
                () => supervisor.Snapshot().State == SupervisorState.Bound,
                TimeSpan.FromSeconds(3));
            Assert.Equal(SupervisorState.Bound, supervisor.Snapshot().State);

            // Verify the proxy actually serves traffic by connecting to it.
            using var client = new TcpClient();
            await client.ConnectAsync("127.0.0.1", listenPort, cts.Token);
            var stream = client.GetStream();

            // Send a minimal FC03 request (read 1 register at address 0).
            var req = new byte[]
            {
                0x00, 0x01, // TxId
                0x00, 0x00, // ProtocolId
                0x00, 0x06, // Length (6)
                0x01,       // UnitId
                0x03,       // FC03
                0x00, 0x00, // Start address 0
                0x00, 0x01, // Qty 1
            };
            await stream.WriteAsync(req, cts.Token);

            // Read until we hold at least the smallest valid response.
            var rsp = new byte[260];
            int read = 0;
            using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            while (read < MinModbusTcpResponseBytes && !readCts.IsCancellationRequested)
            {
                int n = await stream.ReadAsync(rsp.AsMemory(read), readCts.Token);
                if (n == 0)
                    break; // Peer closed the connection; stop instead of spinning on EOF.
                read += n;
            }

            // Verify we got a response with matching TxId.
            Assert.True(read >= MinModbusTcpResponseBytes, $"Expected ≥ 9 bytes, got {read}");
            Assert.Equal(0x00, rsp[0]); // TxId high
            Assert.Equal(0x01, rsp[1]); // TxId low
            // Function code is echoed: 0x03 on success, 0x83 on a Modbus exception reply.
            Assert.Equal(0x03, rsp[7] & 0x7F);

            await supervisor.StopAsync(cts.Token);
        }
        finally
        {
            // No-op if already stopped above; guarantees the port is released even when
            // an earlier assertion fails.
            blocker.Stop();
        }
    }

    // ── E2E 2: RecoveryAttempts counter increments and is visible on Snapshot ─────────────

    [Fact(Timeout = 5_000)]
    public async Task E2E_RecoveryAttempts_CounterIncrements_Visible_OnSnapshot()
    {
        if (_sim.SkipReason is not null)
            Assert.Skip(_sim.SkipReason);

        int listenPort = PickFreePort();

        // Block the port so the supervisor enters recovery.
        var blocker = new TcpListener(IPAddress.Any, listenPort);
        blocker.Start();
        try
        {
            // Use short delays to get multiple recovery attempts quickly.
            var profile = new RecoveryProfile
            {
                InitialBackoffMs = [100, 100, 100],
                SteadyStateMs = 100,
            };
            await using var supervisor = BuildSimSupervisor(listenPort, profile);
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
            await supervisor.StartAsync(cts.Token);
            await supervisor.WaitForInitialBindAttemptAsync(cts.Token);

            // Poll (rather than sleeping a fixed interval) until several recovery
            // attempts have accumulated; a fixed delay is flaky on a loaded CI host.
            await WaitUntilAsync(
                () => supervisor.Snapshot().RecoveryAttempts >= 2,
                TimeSpan.FromSeconds(3));

            var snap = supervisor.Snapshot();
            Assert.Equal(SupervisorState.Recovering, snap.State);
            Assert.True(snap.RecoveryAttempts >= 2,
                $"Expected ≥ 2 recovery attempts within 3 s with 100ms backoff; got {snap.RecoveryAttempts}");
            Assert.NotNull(snap.LastBindError);

            // Release the port and verify recovery.
            blocker.Stop();
            await WaitUntilAsync(
                () => supervisor.Snapshot().State == SupervisorState.Bound,
                TimeSpan.FromSeconds(3));
            Assert.Equal(SupervisorState.Bound, supervisor.Snapshot().State);

            // RecoveryAttempts must still be the accumulated value (not reset to 0).
            var afterSnap = supervisor.Snapshot();
            Assert.True(afterSnap.RecoveryAttempts >= snap.RecoveryAttempts,
                $"RecoveryAttempts should accumulate; was {snap.RecoveryAttempts}, now {afterSnap.RecoveryAttempts}");

            // LastBindError should be cleared after a successful bind.
            Assert.Null(afterSnap.LastBindError);

            await supervisor.StopAsync(cts.Token);
        }
        finally
        {
            // No-op if already stopped above; guarantees the port is released even when
            // an earlier assertion fails.
            blocker.Stop();
        }
    }
}