using System.Net;
using System.Net.Sockets;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Multiplexing;
using Mbproxy.Proxy.Supervision;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

namespace Mbproxy.Tests.Proxy.Supervision;

/// <summary>
/// Integration tests for the backend-connect Polly retry path. Backend connect
/// ownership lives in <see cref="PlcMultiplexer"/>. These tests exercise the Polly
/// pipeline by driving upstream-to-multiplexer frames against a bad/intermittent
/// backend and observing what the upstream client sees (and, where asserted, the
/// connect-failed counter).
/// </summary>
[Trait("Category", "Unit")]
public sealed class BackendConnectRetryTests
{
    /// <summary>
    /// Asks the OS for an ephemeral loopback port by binding a listener to port 0,
    /// reading the assigned port and stopping the listener again.
    /// </summary>
    /// <returns>The port number that was assigned to the temporary listener.</returns>
    private static int PickFreePort()
    {
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        try
        {
            return ((IPEndPoint)listener.LocalEndpoint).Port;
        }
        finally
        {
            // The listener is stopped before the port is handed back, so the port is
            // unbound again by the time the caller uses it.
            listener.Stop();
        }
    }

    /// <summary>
    /// Builds a <see cref="PlcMultiplexer"/> together with the <see cref="PerPlcContext"/>
    /// it was constructed with, so tests can inspect the context's counters.
    /// </summary>
    /// <param name="plc">PLC options (name, listen port, backend host/port).</param>
    /// <param name="connOpts">Connection options passed to the multiplexer.</param>
    /// <param name="pipeline">Resilience pipeline passed to the multiplexer.</param>
    /// <returns>The multiplexer and the context whose counters it was given.</returns>
    private static (PlcMultiplexer mux, PerPlcContext ctx) BuildMux(
        PlcOptions plc,
        ConnectionOptions connOpts,
        Polly.ResiliencePipeline pipeline)
    {
        var ctx = new PerPlcContext
        {
            PlcName = plc.Name,
            TagMap = Mbproxy.Bcd.BcdTagMap.Empty,
            Counters = new ProxyCounters(),
            Logger = NullLogger.Instance,
        };

        // NOTE(review): the logger category type is assumed to be PlcMultiplexer —
        // confirm against the PlcMultiplexer constructor signature.
        var mux = new PlcMultiplexer(
            plc,
            connOpts,
            new BcdPduPipeline(),
            ctx,
            NullLoggerFactory.Instance.CreateLogger<PlcMultiplexer>(),
            pipeline);

        return (mux, ctx);
    }

    /// <summary>
    /// Connects a fresh TCP client to the proxy port, accepts the matching upstream
    /// socket from <paramref name="proxyListener"/>, wraps it in an
    /// <see cref="UpstreamPipe"/> and starts the pipe on the multiplexer in the
    /// background. The caller then drives a request through the returned client and
    /// observes how the multiplexer handles it.
    /// </summary>
    /// <param name="mux">Multiplexer the new pipe is started on.</param>
    /// <param name="proxyPort">Loopback port the client connects to.</param>
    /// <param name="proxyListener">Already-started listener bound to <paramref name="proxyPort"/>.</param>
    /// <param name="plcName">PLC name passed to the pipe.</param>
    /// <returns>The connected client socket and the upstream pipe; the caller disposes both.</returns>
    private static async Task<(Socket client, UpstreamPipe pipe)> AttachClientPipeAsync(
        PlcMultiplexer mux,
        int proxyPort,
        TcpListener proxyListener,
        string plcName)
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
        {
            NoDelay = true,
        };

        Socket? upstreamSock = null;
        try
        {
            await client.ConnectAsync(IPAddress.Loopback, proxyPort);
            upstreamSock = await proxyListener.AcceptSocketAsync();

            var pipe = new UpstreamPipe(upstreamSock, plcName, NullLogger.Instance);

            // Fire-and-forget: the pipe runs until the test disposes it.
            _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None));

            return (client, pipe);
        }
        catch
        {
            // Nothing has been handed to the caller yet, so release the sockets here
            // instead of leaking them when connect/accept/pipe construction fails.
            upstreamSock?.Dispose();
            client.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Builds a 12-byte Modbus/TCP FC03 request: a 7-byte MBAP header
    /// (transaction id, protocol id 0, length 6, unit id) followed by the function
    /// code, start address and quantity, all multi-byte fields big-endian.
    /// </summary>
    /// <param name="txId">Transaction identifier.</param>
    /// <param name="start">Starting register address.</param>
    /// <param name="qty">Number of registers requested.</param>
    /// <param name="unitId">Unit identifier; defaults to 1.</param>
    /// <returns>The encoded request frame.</returns>
    private static byte[] BuildFc03ReadFrame(ushort txId, ushort start, ushort qty, byte unitId = 1) =>
    [
        (byte)(txId >> 8), (byte)(txId & 0xFF),
        0x00, 0x00, // ProtocolId
        0x00, 0x06, // Length = 6
        unitId,
        0x03,       // FC03
        (byte)(start >> 8), (byte)(start & 0xFF),
        (byte)(qty >> 8), (byte)(qty & 0xFF),
    ];

    /// <summary>
    /// Performs a single one-byte receive on <paramref name="client"/> with a deadline.
    /// A <see cref="SocketException"/> is reported as 0 bytes, i.e. treated the same as
    /// the peer closing the connection. If the deadline elapses first, the resulting
    /// <see cref="OperationCanceledException"/> propagates and fails the test.
    /// </summary>
    /// <param name="client">Connected client socket to read from.</param>
    /// <param name="timeout">Maximum time to wait for data or EOF.</param>
    /// <returns>The number of bytes received; 0 on EOF or socket error.</returns>
    private static async Task<int> ReceiveOrEofAsync(Socket client, TimeSpan timeout)
    {
        var buf = new byte[1];
        using var deadline = new CancellationTokenSource(timeout);
        try
        {
            return await client.ReceiveAsync(buf, SocketFlags.None, deadline.Token);
        }
        catch (SocketException)
        {
            return 0;
        }
    }

    // ── Test 1: retries per pipeline on ConnectionRefused ─────────────────────────────────

    /// <summary>
    /// Points the multiplexer at a port nothing listens on and checks that the upstream
    /// client sees EOF only after the retry backoff delays have had time to elapse.
    /// </summary>
    [Fact]
    public async Task BackendConnect_RetriesPerPipeline_OnConnectionRefused()
    {
        int badPort = PickFreePort();
        int proxyPort = PickFreePort();

        var profile = new RetryProfile { MaxAttempts = 3, BackoffMs = [50, 100, 200] };
        var pipeline = PolicyFactory.BuildBackendConnect(profile, NullLogger.Instance);
        var connOpts = new ConnectionOptions { BackendConnectTimeoutMs = 1000, BackendRequestTimeoutMs = 3000 };
        var plcOpts = new PlcOptions { Name = "Retry3PLC", ListenPort = proxyPort, Host = "127.0.0.1", Port = badPort };

        await using var mux = BuildMux(plcOpts, connOpts, pipeline).mux;

        var proxyListener = new TcpListener(IPAddress.Loopback, proxyPort);
        proxyListener.Start();
        try
        {
            var sw = System.Diagnostics.Stopwatch.StartNew();
            var (client, pipe) = await AttachClientPipeAsync(mux, proxyPort, proxyListener, plcOpts.Name);
            try
            {
                await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);

                // The multiplexer will Polly-retry then fail; client socket should be closed.
                int n = await ReceiveOrEofAsync(client, TimeSpan.FromSeconds(5));
                sw.Stop();

                n.ShouldBe(0, "upstream client should observe a clean EOF after all backend attempts fail");
                sw.ElapsedMilliseconds.ShouldBeGreaterThanOrEqualTo(80,
                    "Polly retries with [50,100] delays should make connect take > 80ms total");

                // Smoke check only: reading the attached-pipe state after the failed
                // connect must not throw. The value itself is not asserted.
                _ = mux.AttachedPipes.Count;
            }
            finally
            {
                client.Dispose();
                await pipe.DisposeAsync();
            }
        }
        finally
        {
            proxyListener.Stop();
        }
    }

    // ── Test 2: succeeds on second attempt when backend becomes reachable ─────────────────

    /// <summary>
    /// Starts with no backend listening, brings one up shortly after the first request
    /// is sent, and checks that the backend eventually accepts a connection while the
    /// upstream pipe stays attached.
    /// </summary>
    [Fact]
    public async Task BackendConnect_Succeeds_OnSecondAttempt_WhenBackendBecomesReachable()
    {
        int backendPort = PickFreePort();
        int proxyPort = PickFreePort();

        var profile = new RetryProfile { MaxAttempts = 3, BackoffMs = [200, 1000, 2000] };
        var pipeline = PolicyFactory.BuildBackendConnect(profile, NullLogger.Instance);
        var connOpts = new ConnectionOptions { BackendConnectTimeoutMs = 1000, BackendRequestTimeoutMs = 3000 };
        var plcOpts = new PlcOptions { Name = "RetryOkPLC", ListenPort = proxyPort, Host = "127.0.0.1", Port = backendPort };

        await using var muxBundle = new MuxBundle(BuildMux(plcOpts, connOpts, pipeline).mux);
        var mux = muxBundle.Mux;

        var proxyListener = new TcpListener(IPAddress.Loopback, proxyPort);
        proxyListener.Start();

        TcpListener? backendListener = null;
        Socket? acceptedBackend = null;
        Task<Socket>? acceptTask = null;
        Task? startBackendTask = null;
        try
        {
            // Start the backend listener 250 ms after the test begins.
            // NOTE(review): 250 ms is longer than the 200 ms first backoff entry, so on
            // hosts where a refused connect returns immediately the successful attempt
            // may be the third rather than the second — confirm against PolicyFactory.
            startBackendTask = Task.Run(async () =>
            {
                await Task.Delay(250, CancellationToken.None);
                backendListener = new TcpListener(IPAddress.Loopback, backendPort);
                backendListener.Start();
                acceptTask = backendListener.AcceptSocketAsync(CancellationToken.None).AsTask();
            }, CancellationToken.None);

            var (client, pipe) = await AttachClientPipeAsync(mux, proxyPort, proxyListener, plcOpts.Name);
            try
            {
                // Drive a request — this triggers backend connect.
                await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);

                await startBackendTask;
                acceptedBackend = await acceptTask!.WaitAsync(
                    TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);

                // Wait (bounded) for the upstream pipe to show up as attached.
                using var pollCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
                try
                {
                    while (mux.AttachedPipes.Count == 0)
                    {
                        await Task.Delay(20, pollCts.Token);
                    }
                }
                catch (OperationCanceledException)
                {
                    // Deadline reached: fall through so the assertion below reports the
                    // failure with its descriptive message instead of a cancellation.
                }

                mux.AttachedPipes.Count.ShouldBeGreaterThanOrEqualTo(1,
                    "the upstream pipe should remain attached after a successful backend connect");
            }
            finally
            {
                client.Dispose();
                await pipe.DisposeAsync();
            }
        }
        finally
        {
            proxyListener.Stop();

            if (startBackendTask is not null)
            {
                // Let the delayed start finish before cleaning up; otherwise a failure
                // earlier in the test could leave a listener that is created after this
                // finally block has already run.
                try
                {
                    await startBackendTask;
                }
                catch (Exception)
                {
                    // Cleanup path only: a fault here was either already rethrown by the
                    // await in the test body or is secondary to the exception in flight.
                }
            }

            acceptedBackend?.Dispose();
            backendListener?.Stop();
        }
    }

    // ── Test 3: all attempts fail → upstream socket is closed ─────────────────────────────

    /// <summary>
    /// Points the multiplexer at a port nothing listens on and checks that the upstream
    /// client sees EOF and that at least one failed connect was counted.
    /// </summary>
    [Fact]
    public async Task BackendConnect_AllAttemptsFail_ClosesUpstream()
    {
        int badPort = PickFreePort();
        int proxyPort = PickFreePort();

        var profile = new RetryProfile { MaxAttempts = 2, BackoffMs = [50, 100] };
        var pipeline = PolicyFactory.BuildBackendConnect(profile, NullLogger.Instance);
        var connOpts = new ConnectionOptions { BackendConnectTimeoutMs = 500, BackendRequestTimeoutMs = 3000 };
        var plcOpts = new PlcOptions { Name = "FailPLC", ListenPort = proxyPort, Host = "127.0.0.1", Port = badPort };

        var muxResult = BuildMux(plcOpts, connOpts, pipeline);
        await using var mux = muxResult.mux;

        var proxyListener = new TcpListener(IPAddress.Loopback, proxyPort);
        proxyListener.Start();
        try
        {
            var (client, pipe) = await AttachClientPipeAsync(mux, proxyPort, proxyListener, plcOpts.Name);
            try
            {
                await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);

                int n = await ReceiveOrEofAsync(client, TimeSpan.FromSeconds(5));

                n.ShouldBe(0, "upstream socket should observe a clean EOF after all attempts fail");
                muxResult.ctx.Counters.Snapshot().ConnectsFailed.ShouldBeGreaterThanOrEqualTo(1);
            }
            finally
            {
                client.Dispose();
                await pipe.DisposeAsync();
            }
        }
        finally
        {
            proxyListener.Stop();
        }
    }

    /// <summary>
    /// Helper that lets the test scope-await both <see cref="PlcMultiplexer"/> disposal
    /// and capture of the public surface in a single using block.
    /// </summary>
    private sealed class MuxBundle : IAsyncDisposable
    {
        public PlcMultiplexer Mux { get; }

        public MuxBundle(PlcMultiplexer mux) => Mux = mux;

        public ValueTask DisposeAsync() => Mux.DisposeAsync();
    }
}