using System.Net;
using System.Net.Sockets;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Multiplexing;
using Mbproxy.Proxy.Supervision;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Proxy.Supervision;
/// <summary>
/// Integration tests for the backend-connect Polly retry path. Backend connect
/// ownership lives in <see cref="PlcMultiplexer"/>. These tests exercise the Polly
/// pipeline by driving upstream-to-multiplexer frames against a bad/intermittent
/// backend and observing the outcome: the upstream socket closing, the set of
/// attached pipes, and the connect-failed counter.
/// </summary>
[Trait("Category", "Unit")]
public sealed class BackendConnectRetryTests
{
/// <summary>
/// Obtains a currently unused loopback TCP port by binding a throw-away listener
/// to port 0 and reading back the port it was given.
/// </summary>
/// <returns>
/// The port number the listener was bound to. The listener is stopped before
/// returning, so the port is no longer held when the caller receives it.
/// </returns>
private static int PickFreePort()
{
    TcpListener probe = new(IPAddress.Loopback, 0);
    probe.Start();

    // LocalEndpoint carries the concrete port once the listener has started.
    var boundEndpoint = (IPEndPoint)probe.LocalEndpoint;
    int freePort = boundEndpoint.Port;

    probe.Stop();
    return freePort;
}
/// <summary>
/// Creates a <see cref="PlcMultiplexer"/> for the given PLC together with the
/// <see cref="PerPlcContext"/> it was constructed with, so tests can inspect the
/// context's counters afterwards.
/// </summary>
/// <param name="plc">PLC options; its name is copied into the context.</param>
/// <param name="connOpts">Connection options handed to the multiplexer.</param>
/// <param name="pipeline">Resilience pipeline handed to the multiplexer.</param>
/// <returns>The multiplexer and the context instance passed to it.</returns>
private static (PlcMultiplexer mux, PerPlcContext ctx) BuildMux(
    PlcOptions plc,
    ConnectionOptions connOpts,
    Polly.ResiliencePipeline pipeline)
{
    var ctx = new PerPlcContext
    {
        PlcName = plc.Name,
        TagMap = Mbproxy.Bcd.BcdTagMap.Empty,
        Counters = new ProxyCounters(),
        Logger = NullLogger.Instance,
    };

    // CreateLogger has no parameterless overload; the category type must be
    // supplied explicitly. ILogger<T> is also assignable to plain ILogger.
    var mux = new PlcMultiplexer(
        plc,
        connOpts,
        new BcdPduPipeline(),
        ctx,
        NullLoggerFactory.Instance.CreateLogger<PlcMultiplexer>(),
        pipeline);

    return (mux, ctx);
}
/// <summary>
/// Connects a fresh TCP client to the proxy port and returns the accepted upstream
/// pipe alongside the client. The caller drives a single FC03 request and observes
/// what happens when the multiplexer attempts (and fails) to forward it.
/// </summary>
/// <param name="mux">Multiplexer the accepted pipe is started on.</param>
/// <param name="proxyPort">Loopback port the client connects to.</param>
/// <param name="proxyListener">Already-started listener that accepts the client.</param>
/// <param name="plcName">PLC name passed to the new <see cref="UpstreamPipe"/>.</param>
/// <returns>The connected client socket and the pipe wrapping the accepted socket.</returns>
private static async Task<(Socket client, UpstreamPipe pipe)> AttachClientPipeAsync(
    PlcMultiplexer mux, int proxyPort, TcpListener proxyListener, string plcName)
{
    var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
    { NoDelay = true };
    Socket? upstreamSock = null;
    try
    {
        await client.ConnectAsync(IPAddress.Loopback, proxyPort);
        upstreamSock = await proxyListener.AcceptSocketAsync();
        var pipe = new UpstreamPipe(upstreamSock, plcName, NullLogger.Instance);

        // Fire-and-forget: the pipe runs in the background for the rest of the test.
        _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None));
        return (client, pipe);
    }
    catch
    {
        // Nothing is handed back to the caller on failure, so release both
        // sockets here instead of leaking them.
        upstreamSock?.Dispose();
        client.Dispose();
        throw;
    }
}
/// <summary>
/// Builds a 12-byte FC03 read request frame. The 16-bit fields are written
/// high byte first.
/// </summary>
/// <param name="txId">Transaction id, bytes 0-1.</param>
/// <param name="start">Start address, bytes 8-9.</param>
/// <param name="qty">Quantity, bytes 10-11.</param>
/// <param name="unitId">Unit id, byte 6 (defaults to 1).</param>
/// <returns>The encoded frame.</returns>
private static byte[] BuildFc03ReadFrame(ushort txId, ushort start, ushort qty, byte unitId = 1)
{
    var frame = new byte[12];

    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);

    // ProtocolId
    frame[2] = 0x00;
    frame[3] = 0x00;

    // Length = 6
    frame[4] = 0x00;
    frame[5] = 0x06;

    frame[6] = unitId;

    // FC03
    frame[7] = 0x03;

    frame[8] = (byte)(start >> 8);
    frame[9] = (byte)(start & 0xFF);
    frame[10] = (byte)(qty >> 8);
    frame[11] = (byte)(qty & 0xFF);

    return frame;
}
// ── Test 1: retries per pipeline on ConnectionRefused ─────────────────────────────────
/// <summary>
/// Points the multiplexer at a port nothing listens on, sends one FC03 request,
/// and expects the client to see EOF no sooner than 80 ms after the attempt began.
/// </summary>
[Fact]
public async Task BackendConnect_RetriesPerPipeline_OnConnectionRefused()
{
    int badPort = PickFreePort();
    int proxyPort = PickFreePort();
    var profile = new RetryProfile { MaxAttempts = 3, BackoffMs = [50, 100, 200] };
    var pipeline = PolicyFactory.BuildBackendConnect(profile, NullLogger.Instance);
    var connOpts = new ConnectionOptions { BackendConnectTimeoutMs = 1000, BackendRequestTimeoutMs = 3000 };
    var plcOpts = new PlcOptions { Name = "Retry3PLC", ListenPort = proxyPort, Host = "127.0.0.1", Port = badPort };
    await using var mux = BuildMux(plcOpts, connOpts, pipeline).mux;
    var proxyListener = new TcpListener(IPAddress.Loopback, proxyPort);
    proxyListener.Start();
    try
    {
        var sw = System.Diagnostics.Stopwatch.StartNew();
        var (client, pipe) = await AttachClientPipeAsync(mux, proxyPort, proxyListener, plcOpts.Name);
        try
        {
            await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);

            // The multiplexer will Polly-retry then fail; client socket should be closed.
            var buf = new byte[1];
            using var ctsDeadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            int n;
            try
            {
                n = await client.ReceiveAsync(buf, SocketFlags.None, ctsDeadline.Token);
            }
            catch (SocketException)
            {
                // A socket error on receive is treated the same as EOF.
                n = 0;
            }
            sw.Stop();

            n.ShouldBe(0, "upstream client should observe a clean EOF after all backend attempts fail");
            sw.ElapsedMilliseconds.ShouldBeGreaterThanOrEqualTo(80,
                "Polly retries with [50,100] delays should make connect take > 80ms total");

            // Reading the attached-pipe set after the failure must not throw.
            _ = mux.AttachedPipes.Count;
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
        }
    }
    finally
    {
        proxyListener.Stop();
    }
}
// ── Test 2: succeeds on second attempt when backend becomes reachable ─────────────────
/// <summary>
/// Starts the backend listener 250 ms after the test begins, sends one FC03
/// request, and expects the backend to accept a connection and the upstream
/// pipe to be attached to the multiplexer.
/// </summary>
[Fact]
public async Task BackendConnect_Succeeds_OnSecondAttempt_WhenBackendBecomesReachable()
{
    int backendPort = PickFreePort();
    int proxyPort = PickFreePort();
    var profile = new RetryProfile { MaxAttempts = 3, BackoffMs = [200, 1000, 2000] };
    var pipeline = PolicyFactory.BuildBackendConnect(profile, NullLogger.Instance);
    var connOpts = new ConnectionOptions { BackendConnectTimeoutMs = 1000, BackendRequestTimeoutMs = 3000 };
    var plcOpts = new PlcOptions { Name = "RetryOkPLC", ListenPort = proxyPort, Host = "127.0.0.1", Port = backendPort };
    await using var muxBundle = new MuxBundle(BuildMux(plcOpts, connOpts, pipeline).mux);
    var mux = muxBundle.Mux;
    var proxyListener = new TcpListener(IPAddress.Loopback, proxyPort);
    proxyListener.Start();
    TcpListener? backendListener = null;
    Socket? acceptedBackend = null;
    Task<Socket>? acceptTask = null;
    Task? startBackendTask = null;
    try
    {
        // Start the backend listener after 250 ms — within the first backoff window.
        startBackendTask = Task.Run(async () =>
        {
            await Task.Delay(250, CancellationToken.None);
            backendListener = new TcpListener(IPAddress.Loopback, backendPort);
            backendListener.Start();
            acceptTask = backendListener.AcceptSocketAsync(CancellationToken.None).AsTask();
        }, CancellationToken.None);
        var (client, pipe) = await AttachClientPipeAsync(mux, proxyPort, proxyListener, plcOpts.Name);
        try
        {
            // Drive a request — this triggers backend connect.
            await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
            await startBackendTask;
            acceptedBackend = await acceptTask!.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);

            // Wait up to 5 s for the upstream pipe to show up as attached. The deadline
            // is tracked with a stopwatch rather than a cancelling delay so that a
            // timeout falls through to the assertion below and reports its message.
            var pollClock = System.Diagnostics.Stopwatch.StartNew();
            while (mux.AttachedPipes.Count == 0
                && pollClock.Elapsed < TimeSpan.FromSeconds(5))
            {
                await Task.Delay(20, TestContext.Current.CancellationToken);
            }
            mux.AttachedPipes.Count.ShouldBeGreaterThanOrEqualTo(1,
                "the upstream pipe should remain attached after a successful backend connect");
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
        }
    }
    finally
    {
        proxyListener.Stop();
        if (startBackendTask is not null)
        {
            // Let the delayed start finish first, otherwise it could create the
            // backend listener after this cleanup has run and leave it open.
            try
            {
                await startBackendTask;
            }
            catch (Exception)
            {
                // Either already surfaced by the await in the test body, or the
                // test is failing for another reason; cleanup must still proceed.
            }
        }
        acceptedBackend?.Dispose();
        backendListener?.Stop();
    }
}
// ── Test 3: all attempts fail → upstream socket is closed ─────────────────────────────
/// <summary>
/// Points the multiplexer at a port nothing listens on, sends one FC03 request,
/// and expects the client to read EOF and the connect-failed counter to be at
/// least 1.
/// </summary>
[Fact]
public async Task BackendConnect_AllAttemptsFail_ClosesUpstream()
{
    int badPort = PickFreePort();
    int proxyPort = PickFreePort();

    var retryProfile = new RetryProfile { MaxAttempts = 2, BackoffMs = [50, 100] };
    var connectPipeline = PolicyFactory.BuildBackendConnect(retryProfile, NullLogger.Instance);
    var connectionOptions = new ConnectionOptions { BackendConnectTimeoutMs = 500, BackendRequestTimeoutMs = 3000 };
    var target = new PlcOptions { Name = "FailPLC", ListenPort = proxyPort, Host = "127.0.0.1", Port = badPort };

    var (builtMux, muxContext) = BuildMux(target, connectionOptions, connectPipeline);
    await using var mux = builtMux;

    var listener = new TcpListener(IPAddress.Loopback, proxyPort);
    listener.Start();
    try
    {
        var (client, pipe) = await AttachClientPipeAsync(mux, proxyPort, listener, target.Name);
        try
        {
            await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);

            var oneByte = new byte[1];
            using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

            // A socket error on receive counts as EOF (zero bytes read).
            int received = 0;
            try
            {
                received = await client.ReceiveAsync(oneByte, SocketFlags.None, deadline.Token);
            }
            catch (SocketException)
            {
                received = 0;
            }

            received.ShouldBe(0, "upstream socket should observe a clean EOF after all attempts fail");

            var snapshot = muxContext.Counters.Snapshot();
            snapshot.ConnectsFailed.ShouldBeGreaterThanOrEqualTo(1);
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
        }
    }
    finally
    {
        listener.Stop();
    }
}
/// <summary>
/// Thin async-disposable wrapper around a <see cref="PlcMultiplexer"/>, so a test
/// can hold the multiplexer in a single <c>await using</c> declaration and reach it
/// through <see cref="Mux"/>. Disposing the bundle disposes the wrapped multiplexer.
/// </summary>
private sealed class MuxBundle(PlcMultiplexer mux) : IAsyncDisposable
{
    /// <summary>The wrapped multiplexer.</summary>
    public PlcMultiplexer Mux { get; } = mux;

    /// <summary>Forwards disposal to <see cref="Mux"/>.</summary>
    public ValueTask DisposeAsync()
    {
        return Mux.DisposeAsync();
    }
}
}