using System.Net;
using System.Net.Sockets;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Supervision;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Polly;
using Shouldly;
using Xunit;

namespace Mbproxy.Tests.Proxy.Supervision;

/// <summary>
/// Integration tests for <see cref="PlcListenerSupervisor"/> using real sockets.
/// No simulator required — these tests drive bind/recover cycles directly.
/// </summary>
[Trait("Category", "Unit")]
public sealed class SupervisorTests
{
    // ── Helpers ───────────────────────────────────────────────────────────────────────────

    /// <summary>
    /// Asks the OS for an ephemeral loopback port (bind to port 0), records it, and
    /// releases it again before returning.
    /// </summary>
    /// <returns>The port number that was assigned to the probe listener.</returns>
    private static int PickFreePort()
    {
        var probe = new TcpListener(IPAddress.Loopback, 0);
        probe.Start();
        try
        {
            return ((IPEndPoint)probe.LocalEndpoint).Port;
        }
        finally
        {
            // Always release the probe socket, even if reading the endpoint throws.
            probe.Stop();
        }
    }

    /// <summary>
    /// Binds a listener on <see cref="IPAddress.Any"/> at <paramref name="port"/> so the
    /// supervisor under test cannot bind the same port. The caller owns the returned
    /// listener and must call <see cref="TcpListener.Stop"/> on it.
    /// </summary>
    private static TcpListener OccupyPort(int port)
    {
        var blocker = new TcpListener(IPAddress.Any, port);
        blocker.Start();
        return blocker;
    }

    private static PlcOptions MakePlcOptions(int listenPort) => new()
    {
        Name = "TestPLC",
        ListenPort = listenPort,
        Host = "127.0.0.1",
        Port = 502,
    };

    private static ConnectionOptions MakeConnectionOptions() => new()
    {
        BackendConnectTimeoutMs = 500,
        BackendRequestTimeoutMs = 3000,
    };

    /// <summary>
    /// Builds a recovery pipeline with very short delays (suitable for tests).
    /// </summary>
    /// <param name="initialMs">Delay used for both entries of the initial backoff list.</param>
    /// <param name="steadyMs">Steady-state delay passed to the recovery profile.</param>
    private static ResiliencePipeline FastRecoveryPipeline(int initialMs = 100, int steadyMs = 100)
    {
        var profile = new RecoveryProfile
        {
            InitialBackoffMs = [initialMs, initialMs],
            SteadyStateMs = steadyMs,
        };

        return PolicyFactory.BuildListenerRecovery(profile, NullLogger.Instance);
    }

    /// <summary>
    /// Creates a supervisor listening on <paramref name="port"/> with null loggers and a
    /// no-op PDU pipeline. Falls back to <see cref="FastRecoveryPipeline"/> when no
    /// recovery pipeline is supplied.
    /// </summary>
    private static PlcListenerSupervisor BuildSupervisor(
        int port,
        ResiliencePipeline? pipeline = null)
    {
        ILoggerFactory loggerFactory = NullLoggerFactory.Instance;

        // NOTE(review): the parameterless CreateLogger() calls below look like they lost
        // their generic type arguments somewhere upstream — confirm against the repository.
        return new PlcListenerSupervisor(
            plc: MakePlcOptions(port),
            connectionOptions: MakeConnectionOptions(),
            pipeline: new NoopPduPipeline(),
            listenerLogger: loggerFactory.CreateLogger(),
            multiplexerLogger: loggerFactory.CreateLogger(),
            pipeLogger: loggerFactory.CreateLogger("Mbproxy.Proxy.UpstreamPipe.Test"),
            perPlcContext: null,
            recoveryPipeline: pipeline ?? FastRecoveryPipeline(),
            logger: loggerFactory.CreateLogger(),
            backendConnectPipeline: null);
    }

    /// <summary>
    /// Forces a fault on the supervisor's running listener by reaching through the private
    /// <c>_currentListener</c> and <c>_listener</c> fields via reflection and stopping the
    /// underlying <see cref="TcpListener"/>. Fails with a descriptive message if either
    /// field is missing or unset, rather than with a bare null-reference error.
    /// </summary>
    private static void StopInnerTcpListener(PlcListenerSupervisor supervisor)
    {
        const System.Reflection.BindingFlags PrivateInstance =
            System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance;

        var currentListenerField = typeof(PlcListenerSupervisor).GetField("_currentListener", PrivateInstance);
        currentListenerField.ShouldNotBeNull("PlcListenerSupervisor must declare a private _currentListener field");

        var listener = currentListenerField!.GetValue(supervisor);
        listener.ShouldNotBeNull("_currentListener must be set after Bound");

        var innerListenerField = listener!.GetType().GetField("_listener", PrivateInstance);
        innerListenerField.ShouldNotBeNull("the current listener must declare a private _listener field");

        var inner = (TcpListener?)innerListenerField!.GetValue(listener);
        inner.ShouldNotBeNull("PlcListener._listener must be set after StartAsync");
        inner!.Stop();
    }

    // ── Test 1: starts listener and transitions to Bound ─────────────────────────────────

    [Fact]
    public async Task Supervisor_StartsListener_AndTransitionsToBound()
    {
        int port = PickFreePort();
        await using var supervisor = BuildSupervisor(port);
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

        await supervisor.StartAsync(cts.Token);

        // Wait for initial bind attempt to complete.
        await supervisor.WaitForInitialBindAttemptAsync(cts.Token);

        var snapshot = supervisor.Snapshot();
        Assert.Equal(SupervisorState.Bound, snapshot.State);
        Assert.Null(snapshot.LastBindError);
        Assert.Equal(0, snapshot.RecoveryAttempts);

        await supervisor.StopAsync(cts.Token);
        Assert.Equal(SupervisorState.Stopped, supervisor.Snapshot().State);
    }

    // ── Test 2: port in use → transitions to Recovering ──────────────────────────────────

    [Fact]
    public async Task Supervisor_StartFails_WhenPortInUse_TransitionsToRecovering()
    {
        int port = PickFreePort();

        // Occupy the port BEFORE the supervisor tries to bind.
        var blocker = OccupyPort(port);
        try
        {
            await using var supervisor = BuildSupervisor(port);
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

            await supervisor.StartAsync(cts.Token);

            // Wait up to 2 s for the supervisor to attempt and fail the bind.
            using var waitCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
            await supervisor.WaitForInitialBindAttemptAsync(waitCts.Token);

            var snapshot = supervisor.Snapshot();
            Assert.Equal(SupervisorState.Recovering, snapshot.State);
            Assert.NotNull(snapshot.LastBindError);
            Assert.True(snapshot.RecoveryAttempts >= 1,
                $"Expected RecoveryAttempts >= 1, got {snapshot.RecoveryAttempts}");

            await supervisor.StopAsync(cts.Token);
        }
        finally
        {
            blocker.Stop();
        }
    }

    // ── Test 3: recovers when port frees ─────────────────────────────────────────────────

    [Fact]
    public async Task Supervisor_Recovers_WhenPortFrees()
    {
        int port = PickFreePort();

        // Occupy the port.
        var blocker = OccupyPort(port);
        try
        {
            // Use a fast initial backoff of 200 ms so recovery is quick.
            var pipeline = FastRecoveryPipeline(initialMs: 200, steadyMs: 200);
            await using var supervisor = BuildSupervisor(port, pipeline);
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));

            await supervisor.StartAsync(cts.Token);

            // Wait for the supervisor to enter Recovering.
            using var waitCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
            await supervisor.WaitForInitialBindAttemptAsync(waitCts.Token);
            Assert.Equal(SupervisorState.Recovering, supervisor.Snapshot().State);

            // Release the port — the supervisor should bind on its next retry (≤ 200 ms + slack).
            blocker.Stop();

            // Poll for up to 3 s for the supervisor to reach Bound.
            using var recoveryCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
            while (!recoveryCts.IsCancellationRequested)
            {
                if (supervisor.Snapshot().State == SupervisorState.Bound)
                {
                    break;
                }

                await Task.Delay(50, TestContext.Current.CancellationToken);
            }

            Assert.Equal(SupervisorState.Bound, supervisor.Snapshot().State);
            Assert.True(supervisor.Snapshot().RecoveryAttempts >= 1,
                "RecoveryAttempts should be ≥ 1 after at least one failed bind");

            await supervisor.StopAsync(cts.Token);
        }
        finally
        {
            // Safety net: if anything above throws before the port is released, make sure
            // the blocker socket does not outlive the test. Stopping twice is harmless.
            blocker.Stop();
        }
    }

    // ── Test 4: runtime fault triggers recovery ──────────────────────────────────────────

    /// <summary>
    /// Faults the running listener mid-life by stopping its underlying
    /// <see cref="TcpListener"/> via reflection, then verifies that the supervisor
    /// returns to <see cref="SupervisorState.Bound"/> with RecoveryAttempts ≥ 1.
    /// </summary>
    /// <remarks>
    /// Intended flow, per the original author's notes: stopping the socket makes the accept
    /// loop throw <see cref="ObjectDisposedException"/>; the supervisor observes the fault,
    /// transitions to <see cref="SupervisorState.Recovering"/>, and re-binds on the next
    /// Polly attempt — emitting one <c>mbproxy.listener.recovered</c> event and bumping
    /// RecoveryAttempts. This test asserts only the resulting state and attempt count;
    /// the event emission is not checked here.
    /// </remarks>
    [Fact]
    public async Task Supervisor_RuntimeFault_OnRunningListener_RecoversAndRebinds()
    {
        int port = PickFreePort();

        // Fast retry so the test completes in a few hundred ms.
        var pipeline = FastRecoveryPipeline(initialMs: 100, steadyMs: 100);
        await using var supervisor = BuildSupervisor(port, pipeline);
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));

        await supervisor.StartAsync(cts.Token);
        await supervisor.WaitForInitialBindAttemptAsync(cts.Token);
        Assert.Equal(SupervisorState.Bound, supervisor.Snapshot().State);

        // Force the accept loop to fault by reaching into PlcListener._listener via
        // reflection and calling Stop(). PlcListener.RunAsync's catch handler observes
        // the resulting ObjectDisposedException, returns normally without cancellation,
        // and the supervisor's "RunAsync returned without cancellation" branch fires —
        // transitioning to Recovering and re-throwing into Polly.
        StopInnerTcpListener(supervisor);

        // Wait for re-bind: state must return to Bound and recoveryAttempts must be ≥ 1.
        bool recovered = false;
        for (int i = 0; i < 50; i++)
        {
            var snap = supervisor.Snapshot();
            if (snap.State == SupervisorState.Bound && snap.RecoveryAttempts >= 1)
            {
                recovered = true;
                break;
            }

            await Task.Delay(50, cts.Token);
        }

        recovered.ShouldBeTrue("supervisor must rebind after the underlying listener is faulted");

        var final = supervisor.Snapshot();
        final.State.ShouldBe(SupervisorState.Bound);
        final.RecoveryAttempts.ShouldBeGreaterThanOrEqualTo(1);

        await supervisor.StopAsync(cts.Token);
        Assert.Equal(SupervisorState.Stopped, supervisor.Snapshot().State);
    }

    // ── Test 5: StopAsync while in Recovering does not hang ──────────────────────────────

    [Fact]
    public async Task Supervisor_Stop_CleanlyTransitionsTo_Stopped_AndCancelsRetry()
    {
        int port = PickFreePort();

        // Occupy the port so the supervisor stays in Recovering.
        var blocker = OccupyPort(port);
        try
        {
            // Use a very long steady-state delay to prove StopAsync cuts through it.
            var profile = new RecoveryProfile
            {
                InitialBackoffMs = [100], // short initial
                SteadyStateMs = 30_000,   // 30 s — if StopAsync doesn't cancel, test times out
            };
            var pipeline = PolicyFactory.BuildListenerRecovery(profile, NullLogger.Instance);

            await using var supervisor = BuildSupervisor(port, pipeline);
            using var runCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

            await supervisor.StartAsync(runCts.Token);

            // Wait for the supervisor to enter Recovering (failed first bind).
            using var waitCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
            await supervisor.WaitForInitialBindAttemptAsync(waitCts.Token);
            Assert.Equal(SupervisorState.Recovering, supervisor.Snapshot().State);

            // Wait a tiny bit to ensure Polly has started the steady-state delay.
            await Task.Delay(250, TestContext.Current.CancellationToken);

            // StopAsync must return within ~2 s, NOT wait out the 30 s backoff.
            using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
            await supervisor.StopAsync(stopCts.Token);

            Assert.Equal(SupervisorState.Stopped, supervisor.Snapshot().State);
        }
        finally
        {
            blocker.Stop();
        }
    }

    // ── Test 6: RecoveryAttempts accumulates over lifetime ───────────────────────────────

    [Fact]
    public async Task Supervisor_RecoveryAttempts_AccumulateOverLifetime()
    {
        int port = PickFreePort();

        // Occupy the port initially.
        var blocker = OccupyPort(port);
        try
        {
            var pipeline = FastRecoveryPipeline(initialMs: 100, steadyMs: 100);
            await using var supervisor = BuildSupervisor(port, pipeline);
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));

            await supervisor.StartAsync(cts.Token);

            // Wait for first recovery attempt.
            await supervisor.WaitForInitialBindAttemptAsync(cts.Token);
            Assert.Equal(SupervisorState.Recovering, supervisor.Snapshot().State);

            // Wait for a couple more retry cycles (each ~100 ms).
            await Task.Delay(400, TestContext.Current.CancellationToken);

            int midCount = supervisor.Snapshot().RecoveryAttempts;
            Assert.True(midCount >= 1, $"Expected ≥ 1 recovery attempt, got {midCount}");

            // Now release the port so the supervisor can recover.
            blocker.Stop();
            await Task.Delay(500, TestContext.Current.CancellationToken);

            // Verify RecoveryAttempts did NOT reset to 0 after recovery.
            // It should still show the same value or higher (if another retry happened).
            int afterCount = supervisor.Snapshot().RecoveryAttempts;
            Assert.True(afterCount >= midCount,
                $"RecoveryAttempts should accumulate (was {midCount}, now {afterCount})");

            await supervisor.StopAsync(cts.Token);
        }
        finally
        {
            // Safety net: release the blocker even when an await or assertion above throws
            // before the explicit Stop() is reached. Stopping twice is harmless.
            blocker.Stop();
        }
    }
}