Files
Joseph Doherty 1a2856526a mbproxy: strip historical phase/wave/plan references from source comments
Comments described the *history* of how the code arrived (phase numbers,
wave IDs, review IDs, dated TODOs) instead of what it does today. That
scaffolding rotted as the codebase evolved. Cleaned 60 source files +
.gitignore; behaviour unchanged (387/387 tests still pass).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 13:04:30 -04:00

324 lines
14 KiB
C#

using System.Net;
using System.Net.Sockets;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Supervision;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Polly;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Proxy.Supervision;
/// <summary>
/// Integration tests for <see cref="PlcListenerSupervisor"/> using real sockets.
/// No simulator required — these tests drive bind/recover cycles directly by
/// occupying and releasing local TCP ports.
/// </summary>
[Trait("Category", "Unit")]
public sealed class SupervisorTests
{
    // ── Helpers ───────────────────────────────────────────────────────────────────────────

    /// <summary>
    /// Returns a port that was free at the moment of the call, by binding an ephemeral
    /// loopback listener and immediately releasing it. Best-effort only: another process
    /// could claim the port between this call and the test's own bind.
    /// </summary>
    private static int PickFreePort()
    {
        var probe = new TcpListener(IPAddress.Loopback, 0);
        probe.Start();
        try
        {
            return ((IPEndPoint)probe.LocalEndpoint).Port;
        }
        finally
        {
            probe.Stop();
        }
    }

    /// <summary>
    /// PLC options whose listener binds <paramref name="listenPort"/> and whose backend
    /// points at 127.0.0.1:502 (the backend is never contacted by these tests).
    /// </summary>
    private static PlcOptions MakePlcOptions(int listenPort) => new()
    {
        Name = "TestPLC",
        ListenPort = listenPort,
        Host = "127.0.0.1",
        Port = 502,
    };

    /// <summary>Connection options with short backend timeouts.</summary>
    private static ConnectionOptions MakeConnectionOptions() => new()
    {
        BackendConnectTimeoutMs = 500,
        BackendRequestTimeoutMs = 3000,
    };

    /// <summary>
    /// Builds a recovery pipeline with very short delays (suitable for tests).
    /// </summary>
    /// <param name="initialMs">Delay used for both entries of the initial backoff schedule.</param>
    /// <param name="steadyMs">Delay used once the initial schedule is exhausted.</param>
    private static ResiliencePipeline FastRecoveryPipeline(int initialMs = 100, int steadyMs = 100)
    {
        var profile = new RecoveryProfile
        {
            InitialBackoffMs = [initialMs, initialMs],
            SteadyStateMs = steadyMs,
        };
        return PolicyFactory.BuildListenerRecovery(profile, NullLogger.Instance);
    }

    /// <summary>
    /// Creates a supervisor for <paramref name="port"/> with null loggers, a no-op PDU
    /// pipeline, and (unless supplied) <see cref="FastRecoveryPipeline"/> for recovery.
    /// </summary>
    private static PlcListenerSupervisor BuildSupervisor(
        int port,
        ResiliencePipeline? pipeline = null)
    {
        ILoggerFactory loggerFactory = NullLoggerFactory.Instance;
        return new PlcListenerSupervisor(
            plc: MakePlcOptions(port),
            connectionOptions: MakeConnectionOptions(),
            pipeline: new NoopPduPipeline(),
            listenerLogger: loggerFactory.CreateLogger<PlcListener>(),
            multiplexerLogger: loggerFactory.CreateLogger<Mbproxy.Proxy.Multiplexing.PlcMultiplexer>(),
            pipeLogger: loggerFactory.CreateLogger("Mbproxy.Proxy.UpstreamPipe.Test"),
            perPlcContext: null,
            recoveryPipeline: pipeline ?? FastRecoveryPipeline(),
            logger: loggerFactory.CreateLogger<PlcListenerSupervisor>(),
            backendConnectPipeline: null);
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every 50 ms until it returns <c>true</c> or
    /// <paramref name="timeout"/> elapses. The condition is evaluated once more after the
    /// timeout expires, so a transition landing right at the deadline is still observed.
    /// </summary>
    /// <returns><c>true</c> if the condition was satisfied before giving up.</returns>
    private static async Task<bool> PollUntilAsync(
        Func<bool> condition,
        TimeSpan timeout,
        CancellationToken cancellationToken)
    {
        using var deadline = new CancellationTokenSource(timeout);
        while (true)
        {
            if (condition())
            {
                return true;
            }

            if (deadline.IsCancellationRequested)
            {
                return false;
            }

            await Task.Delay(50, cancellationToken);
        }
    }

    // ── Test 1: starts listener and transitions to Bound ─────────────────────────────────

    [Fact]
    public async Task Supervisor_StartsListener_AndTransitionsToBound()
    {
        int port = PickFreePort();
        await using var supervisor = BuildSupervisor(port);
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

        await supervisor.StartAsync(cts.Token);
        // Wait for initial bind attempt to complete.
        await supervisor.WaitForInitialBindAttemptAsync(cts.Token);

        var snapshot = supervisor.Snapshot();
        Assert.Equal(SupervisorState.Bound, snapshot.State);
        Assert.Null(snapshot.LastBindError);
        Assert.Equal(0, snapshot.RecoveryAttempts);

        await supervisor.StopAsync(cts.Token);
        Assert.Equal(SupervisorState.Stopped, supervisor.Snapshot().State);
    }

    // ── Test 2: port in use → transitions to Recovering ──────────────────────────────────

    [Fact]
    public async Task Supervisor_StartFails_WhenPortInUse_TransitionsToRecovering()
    {
        int port = PickFreePort();

        // Occupy the port BEFORE the supervisor tries to bind.
        var blocker = new TcpListener(IPAddress.Any, port);
        blocker.Start();
        try
        {
            await using var supervisor = BuildSupervisor(port);
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            await supervisor.StartAsync(cts.Token);

            // Wait up to 2 s for the supervisor to attempt and fail the bind.
            using var waitCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
            await supervisor.WaitForInitialBindAttemptAsync(waitCts.Token);

            var snapshot = supervisor.Snapshot();
            Assert.Equal(SupervisorState.Recovering, snapshot.State);
            Assert.NotNull(snapshot.LastBindError);
            Assert.True(snapshot.RecoveryAttempts >= 1,
                $"Expected RecoveryAttempts >= 1, got {snapshot.RecoveryAttempts}");

            await supervisor.StopAsync(cts.Token);
        }
        finally
        {
            blocker.Stop();
        }
    }

    // ── Test 3: recovers when port frees ─────────────────────────────────────────────────

    [Fact]
    public async Task Supervisor_Recovers_WhenPortFrees()
    {
        int port = PickFreePort();

        // Occupy the port.
        var blocker = new TcpListener(IPAddress.Any, port);
        blocker.Start();
        try
        {
            // Use a fast initial backoff of 200 ms so recovery is quick.
            var pipeline = FastRecoveryPipeline(initialMs: 200, steadyMs: 200);
            await using var supervisor = BuildSupervisor(port, pipeline);
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
            await supervisor.StartAsync(cts.Token);

            // Wait for the supervisor to enter Recovering.
            using var waitCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
            await supervisor.WaitForInitialBindAttemptAsync(waitCts.Token);
            Assert.Equal(SupervisorState.Recovering, supervisor.Snapshot().State);

            // Release the port — the supervisor should bind on its next retry (≤ 200 ms + slack).
            blocker.Stop();

            // Poll for up to 3 s for the supervisor to reach Bound.
            await PollUntilAsync(
                () => supervisor.Snapshot().State == SupervisorState.Bound,
                TimeSpan.FromSeconds(3),
                cts.Token);

            var snapshot = supervisor.Snapshot();
            Assert.Equal(SupervisorState.Bound, snapshot.State);
            Assert.True(snapshot.RecoveryAttempts >= 1,
                "RecoveryAttempts should be ≥ 1 after at least one failed bind");

            await supervisor.StopAsync(cts.Token);
        }
        finally
        {
            // TcpListener.Stop is safe to call twice; this guarantees the port is freed
            // even when an assertion above fails before the explicit release.
            blocker.Stop();
        }
    }

    // ── Test 4: runtime fault triggers recovery ──────────────────────────────────────────

    /// <summary>
    /// Genuinely faults the running listener mid-life by stopping its underlying
    /// <see cref="TcpListener"/> via reflection (the only externally-observable hook
    /// to force the accept loop's <see cref="Socket.AcceptAsync"/> to throw
    /// <see cref="ObjectDisposedException"/>). The supervisor must observe the fault,
    /// transition to <see cref="SupervisorState.Recovering"/>, and re-bind on the next
    /// Polly attempt — returning to <see cref="SupervisorState.Bound"/> with
    /// <c>RecoveryAttempts</c> ≥ 1.
    /// </summary>
    [Fact]
    public async Task Supervisor_RuntimeFault_OnRunningListener_RecoversAndRebinds()
    {
        int port = PickFreePort();

        // Fast retry so the test completes in a few hundred ms.
        var pipeline = FastRecoveryPipeline(initialMs: 100, steadyMs: 100);
        await using var supervisor = BuildSupervisor(port, pipeline);
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));

        await supervisor.StartAsync(cts.Token);
        await supervisor.WaitForInitialBindAttemptAsync(cts.Token);
        Assert.Equal(SupervisorState.Bound, supervisor.Snapshot().State);

        // Force the accept loop to fault by reaching into PlcListener._listener via
        // reflection and calling Stop(). The supervisor is expected to notice that the
        // listener's run loop ended without cancellation, move to Recovering, and hand
        // the failure to Polly for a re-bind.
        const System.Reflection.BindingFlags privateInstance =
            System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance;

        var currentListenerField = typeof(PlcListenerSupervisor).GetField("_currentListener", privateInstance);
        currentListenerField.ShouldNotBeNull("PlcListenerSupervisor._currentListener field not found (renamed?)");
        var listener = currentListenerField.GetValue(supervisor);
        listener.ShouldNotBeNull("_currentListener must be set after Bound");

        var innerListenerField = listener.GetType().GetField("_listener", privateInstance);
        innerListenerField.ShouldNotBeNull("PlcListener._listener field not found (renamed?)");
        var inner = (TcpListener?)innerListenerField.GetValue(listener);
        inner.ShouldNotBeNull("PlcListener._listener must be set after StartAsync");
        inner!.Stop();

        // Wait for re-bind: state must return to Bound and recoveryAttempts must be ≥ 1.
        bool recovered = await PollUntilAsync(
            () =>
            {
                var snap = supervisor.Snapshot();
                return snap.State == SupervisorState.Bound && snap.RecoveryAttempts >= 1;
            },
            TimeSpan.FromMilliseconds(2500),
            cts.Token);
        recovered.ShouldBeTrue("supervisor must rebind after the underlying listener is faulted");

        var final = supervisor.Snapshot();
        final.State.ShouldBe(SupervisorState.Bound);
        final.RecoveryAttempts.ShouldBeGreaterThanOrEqualTo(1);

        await supervisor.StopAsync(cts.Token);
        Assert.Equal(SupervisorState.Stopped, supervisor.Snapshot().State);
    }

    // ── Test 5: StopAsync while in Recovering does not hang ──────────────────────────────

    [Fact]
    public async Task Supervisor_Stop_CleanlyTransitionsTo_Stopped_AndCancelsRetry()
    {
        int port = PickFreePort();

        // Occupy the port so the supervisor stays in Recovering.
        var blocker = new TcpListener(IPAddress.Any, port);
        blocker.Start();
        try
        {
            // Use a very long steady-state delay to prove StopAsync cuts through it.
            var profile = new RecoveryProfile
            {
                InitialBackoffMs = [100],   // short initial
                SteadyStateMs = 30_000,     // 30 s — if StopAsync doesn't cancel, test times out
            };
            var pipeline = PolicyFactory.BuildListenerRecovery(profile, NullLogger.Instance);
            await using var supervisor = BuildSupervisor(port, pipeline);
            using var runCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
            await supervisor.StartAsync(runCts.Token);

            // Wait for the supervisor to enter Recovering (failed first bind).
            using var waitCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
            await supervisor.WaitForInitialBindAttemptAsync(waitCts.Token);
            Assert.Equal(SupervisorState.Recovering, supervisor.Snapshot().State);

            // Wait a tiny bit to ensure Polly has started the steady-state delay.
            await Task.Delay(250, TestContext.Current.CancellationToken);

            // StopAsync must return within ~2 s, NOT wait out the 30 s backoff.
            using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
            await supervisor.StopAsync(stopCts.Token);
            Assert.Equal(SupervisorState.Stopped, supervisor.Snapshot().State);
        }
        finally
        {
            blocker.Stop();
        }
    }

    // ── Test 6: RecoveryAttempts accumulates over lifetime ───────────────────────────────

    [Fact]
    public async Task Supervisor_RecoveryAttempts_AccumulateOverLifetime()
    {
        int port = PickFreePort();

        // Occupy the port initially.
        var blocker = new TcpListener(IPAddress.Any, port);
        blocker.Start();
        try
        {
            var pipeline = FastRecoveryPipeline(initialMs: 100, steadyMs: 100);
            await using var supervisor = BuildSupervisor(port, pipeline);
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
            await supervisor.StartAsync(cts.Token);

            // Wait for first recovery attempt.
            await supervisor.WaitForInitialBindAttemptAsync(cts.Token);
            Assert.Equal(SupervisorState.Recovering, supervisor.Snapshot().State);

            // Let a couple more retry cycles run (each ~100 ms) while the port stays blocked.
            await Task.Delay(400, TestContext.Current.CancellationToken);
            int midCount = supervisor.Snapshot().RecoveryAttempts;
            Assert.True(midCount >= 1, $"Expected ≥ 1 recovery attempt, got {midCount}");

            // Now release the port so the supervisor can recover, and confirm it actually
            // re-binds — otherwise the "did not reset" check below would be vacuous.
            blocker.Stop();
            bool rebound = await PollUntilAsync(
                () => supervisor.Snapshot().State == SupervisorState.Bound,
                TimeSpan.FromSeconds(3),
                cts.Token);
            Assert.True(rebound, "supervisor should re-bind once the port is released");

            // Verify RecoveryAttempts did NOT reset to 0 after recovery.
            // It should still show the same value or higher (if another retry happened).
            int afterCount = supervisor.Snapshot().RecoveryAttempts;
            Assert.True(afterCount >= midCount,
                $"RecoveryAttempts should accumulate (was {midCount}, now {afterCount})");

            await supervisor.StopAsync(cts.Token);
        }
        finally
        {
            // Idempotent when the port was already released above; guarantees cleanup
            // if an earlier assertion fails.
            blocker.Stop();
        }
    }
}