56eee3c563
Adds the mbproxy service end-to-end. Phases 00-08 implement the production-ready single-listener / 1:1-backend transparent Modbus TCP proxy with bidirectional BCD rewriting for the ~54-PLC DL205/DL260 fleet. Phase 9 replaces the connection layer with a single backend socket per PLC plus MBAP TxId rewriting, lifting the H2-ECOM100's 4-concurrent-client cap as an operational ceiling.

Phase 9 additions of note:
- PlcMultiplexer + UpstreamPipe + TxIdAllocator + CorrelationMap
- InFlightRequest with IReadOnlyList<InterestedParty> (load-bearing for Phase 10 read coalescing — do not collapse to a single field)
- Per-request watchdog: surfaces Modbus exception 0x0B to upstream on BackendRequestTimeoutMs, defending against lost responses, dead-PLC paths, and pymodbus 3.13.0's concurrent-multiplexed-request bug (its ServerRequestHandler.last_pdu state race)
- Status DTO + HTML gain inFlight / maxInFlight / txIdWraps / disconnectCascades / queueDepth (Tier 1.6 in docs/kpi.md)

Tests: 263 unit + 38 E2E. Multiplexer correctness under truly concurrent backend traffic is proved against a stub backend in PlcMultiplexerTests; MultiplexerE2ETests paces requests so pymodbus 3.13's single-PDU framer stays in known-good mode.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
212 lines
8.0 KiB
C#
212 lines
8.0 KiB
C#
using System.Net;
|
||
using System.Net.Sockets;
|
||
using Mbproxy.Options;
|
||
using Mbproxy.Proxy;
|
||
using Mbproxy.Proxy.Supervision;
|
||
using Microsoft.Extensions.Logging;
|
||
using Microsoft.Extensions.Logging.Abstractions;
|
||
using Polly;
|
||
using Xunit;
|
||
|
||
namespace Mbproxy.Tests.Proxy.Supervision;
|
||
|
||
/// <summary>
/// End-to-end supervisor tests that run the proxy against the DL205 simulator.
/// These tests verify supervisor-level behaviour (recovery, counters) with a real
/// Modbus backend rather than a bare socket.
/// </summary>
/// <remarks>
/// Each test occupies its listen port with a blocking <see cref="TcpListener"/> so the
/// supervisor's first bind fails and it reports <see cref="SupervisorState.Recovering"/>;
/// releasing the blocker must then let the supervisor reach <see cref="SupervisorState.Bound"/>.
/// The blocker is always released in a <c>finally</c> so a failed assertion cannot leave the
/// port bound for the rest of the test run.
/// </remarks>
[Collection(nameof(Mbproxy.Tests.Sim.DL205SimulatorCollection))]
[Trait("Category", "E2E")]
public sealed class SupervisorE2ETests
{
    /// <summary>Interval between checks while waiting for a supervisor state change.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(50);

    private readonly Mbproxy.Tests.Sim.DL205SimulatorFixture _sim;

    public SupervisorE2ETests(Mbproxy.Tests.Sim.DL205SimulatorFixture sim)
    {
        _sim = sim;
    }

    // ── Helpers ───────────────────────────────────────────────────────────────────────────

    /// <summary>
    /// Returns a loopback TCP port that was free at the moment of the call.
    /// </summary>
    /// <remarks>
    /// The probe listener is stopped before returning, so the port is only "probably" free:
    /// another process could take it before the caller re-binds it. Acceptable for tests.
    /// </remarks>
    private static int PickFreePort()
    {
        var probe = new TcpListener(IPAddress.Loopback, 0);
        probe.Start();
        try
        {
            return ((IPEndPoint)probe.LocalEndpoint).Port;
        }
        finally
        {
            probe.Stop();
        }
    }

    /// <summary>
    /// Creates a token source that cancels after <paramref name="limit"/> or as soon as the
    /// running test's own cancellation token fires, whichever comes first.
    /// </summary>
    private static CancellationTokenSource CreateTestCts(TimeSpan limit)
    {
        var cts = CancellationTokenSource.CreateLinkedTokenSource(TestContext.Current.CancellationToken);
        cts.CancelAfter(limit);
        return cts;
    }

    /// <summary>
    /// Re-evaluates <paramref name="condition"/> every <see cref="PollInterval"/> until it holds
    /// or <paramref name="budget"/> elapses. Does not assert; callers assert the final state
    /// themselves so the failure message shows the actual value.
    /// </summary>
    private static async Task WaitUntilAsync(Func<bool> condition, TimeSpan budget)
    {
        using var budgetCts = new CancellationTokenSource(budget);
        while (!condition() && !budgetCts.IsCancellationRequested)
            await Task.Delay(PollInterval, TestContext.Current.CancellationToken);
    }

    /// <summary>
    /// Builds a <see cref="PlcListenerSupervisor"/> that listens on <paramref name="listenPort"/>
    /// and forwards to the DL205 simulator, using null loggers and a no-op PDU pipeline.
    /// </summary>
    /// <param name="listenPort">Upstream port the supervisor tries to bind.</param>
    /// <param name="recoveryProfile">
    /// Listener-recovery backoff; defaults to two 200 ms initial delays and a 200 ms steady state.
    /// </param>
    private PlcListenerSupervisor BuildSimSupervisor(
        int listenPort,
        RecoveryProfile? recoveryProfile = null)
    {
        var profile = recoveryProfile ?? new RecoveryProfile
        {
            InitialBackoffMs = [200, 200],
            SteadyStateMs = 200,
        };

        ILoggerFactory loggerFactory = NullLoggerFactory.Instance;

        var plcOpts = new PlcOptions
        {
            Name = "SimPLC",
            ListenPort = listenPort,
            Host = _sim.Host,
            Port = _sim.Port,
        };
        var connOpts = new ConnectionOptions
        {
            BackendConnectTimeoutMs = 3000,
            BackendRequestTimeoutMs = 3000,
        };

        var recoveryPipeline = PolicyFactory.BuildListenerRecovery(profile, NullLogger.Instance);
        var backendPipeline = PolicyFactory.BuildBackendConnect(
            new RetryProfile { MaxAttempts = 2, BackoffMs = [100, 500] },
            NullLogger.Instance);

        return new PlcListenerSupervisor(
            plc: plcOpts,
            connectionOptions: connOpts,
            pipeline: new NoopPduPipeline(),
            listenerLogger: loggerFactory.CreateLogger<PlcListener>(),
            multiplexerLogger: loggerFactory.CreateLogger<Mbproxy.Proxy.Multiplexing.PlcMultiplexer>(),
            pipeLogger: loggerFactory.CreateLogger("Mbproxy.Proxy.UpstreamPipe.Test"),
            perPlcContext: null,
            recoveryPipeline: recoveryPipeline,
            logger: loggerFactory.CreateLogger<PlcListenerSupervisor>(),
            backendConnectPipeline: backendPipeline);
    }

    // ── E2E 1: Recovery when blocking listener releases port ──────────────────────────────

    [Fact(Timeout = 5_000)]
    public async Task E2E_Recovery_When_BlockingListenerReleasesPort()
    {
        if (_sim.SkipReason is not null)
            Assert.Skip(_sim.SkipReason);

        int listenPort = PickFreePort();

        // Block the port before starting the supervisor.
        var blocker = new TcpListener(IPAddress.Any, listenPort);
        blocker.Start();
        try
        {
            await using var supervisor = BuildSimSupervisor(listenPort);
            using var cts = CreateTestCts(TimeSpan.FromSeconds(30));

            await supervisor.StartAsync(cts.Token);

            // Wait for first bind attempt to fail.
            await supervisor.WaitForInitialBindAttemptAsync(cts.Token);
            Assert.Equal(SupervisorState.Recovering, supervisor.Snapshot().State);

            // Release the port, then give the supervisor up to 3 s to bind.
            blocker.Stop();
            await WaitUntilAsync(
                () => supervisor.Snapshot().State == SupervisorState.Bound,
                TimeSpan.FromSeconds(3));
            Assert.Equal(SupervisorState.Bound, supervisor.Snapshot().State);

            // Verify the proxy actually serves traffic by connecting to it.
            using var client = new TcpClient();
            await client.ConnectAsync("127.0.0.1", listenPort, cts.Token);
            var stream = client.GetStream();

            // Send a minimal FC03 request (read 1 register at address 0).
            byte[] req =
            [
                0x00, 0x01, // TxId
                0x00, 0x00, // ProtocolId
                0x00, 0x06, // Length (6)
                0x01,       // UnitId
                0x03,       // FC03
                0x00, 0x00, // Start address 0
                0x00, 0x01, // Qty 1
            ];
            await stream.WriteAsync(req, cts.Token);

            // 9 bytes = 7-byte MBAP header + function code + 1 byte, which is the size of the
            // smallest valid reply (an exception response). A normal FC03 reply for one
            // register is 11 bytes. ReadAtLeastAsync returns early on EOF instead of
            // re-reading a closed socket until the timeout.
            var rsp = new byte[260];
            using var readCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
            readCts.CancelAfter(TimeSpan.FromSeconds(5));
            int read = await stream.ReadAtLeastAsync(rsp, 9, throwOnEndOfStream: false, readCts.Token);

            // Verify we got a well-formed response to *our* request.
            Assert.True(read >= 9, $"Expected ≥ 9 bytes, got {read}");
            Assert.Equal(0x00, rsp[0]); // TxId high
            Assert.Equal(0x01, rsp[1]); // TxId low
            Assert.Equal(0x00, rsp[2]); // ProtocolId high
            Assert.Equal(0x00, rsp[3]); // ProtocolId low
            // Function code echoes FC03; the high bit is set if it is an exception response.
            Assert.Equal(0x03, rsp[7] & 0x7F);

            await supervisor.StopAsync(cts.Token);
        }
        finally
        {
            // TcpListener.Stop is idempotent; this releases the port even if an assertion
            // above failed before the explicit Stop.
            blocker.Stop();
        }
    }

    // ── E2E 2: RecoveryAttempts counter increments and is visible on Snapshot ─────────────

    [Fact(Timeout = 5_000)]
    public async Task E2E_RecoveryAttempts_CounterIncrements_Visible_OnSnapshot()
    {
        if (_sim.SkipReason is not null)
            Assert.Skip(_sim.SkipReason);

        int listenPort = PickFreePort();

        // Block the port so the supervisor enters recovery.
        var blocker = new TcpListener(IPAddress.Any, listenPort);
        blocker.Start();
        try
        {
            // Use short delays to get multiple recovery attempts quickly.
            var profile = new RecoveryProfile
            {
                InitialBackoffMs = [100, 100, 100],
                SteadyStateMs = 100,
            };

            await using var supervisor = BuildSimSupervisor(listenPort, profile);
            using var cts = CreateTestCts(TimeSpan.FromSeconds(20));

            await supervisor.StartAsync(cts.Token);
            await supervisor.WaitForInitialBindAttemptAsync(cts.Token);

            // Poll for the counter instead of sleeping a fixed interval. This is faster on a
            // healthy run and tolerant of a slow scheduler on a loaded CI agent.
            await WaitUntilAsync(
                () => supervisor.Snapshot().RecoveryAttempts >= 2,
                TimeSpan.FromSeconds(2));

            var snap = supervisor.Snapshot();
            Assert.Equal(SupervisorState.Recovering, snap.State);
            Assert.True(snap.RecoveryAttempts >= 2,
                $"Expected ≥ 2 recovery attempts within 2 s with 100ms backoff; got {snap.RecoveryAttempts}");
            Assert.NotNull(snap.LastBindError);

            // Release the port and verify recovery.
            blocker.Stop();
            await WaitUntilAsync(
                () => supervisor.Snapshot().State == SupervisorState.Bound,
                TimeSpan.FromSeconds(3));
            Assert.Equal(SupervisorState.Bound, supervisor.Snapshot().State);

            // RecoveryAttempts must still be the accumulated value (not reset to 0).
            var afterSnap = supervisor.Snapshot();
            Assert.True(afterSnap.RecoveryAttempts >= snap.RecoveryAttempts,
                $"RecoveryAttempts should accumulate; was {snap.RecoveryAttempts}, now {afterSnap.RecoveryAttempts}");

            // LastBindError should be cleared after a successful bind.
            Assert.Null(afterSnap.LastBindError);

            await supervisor.StopAsync(cts.Token);
        }
        finally
        {
            // Idempotent; guarantees the port is released on every exit path.
            blocker.Stop();
        }
    }
}
|