Files
wwtools/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/KeepaliveTests.cs
T
Joseph Doherty 0868613890 mbproxy: add keepalive / connection monitoring
The DL205/DL260 ECOM emits no TCP keepalives, so an idle backend socket
can be silently dropped by a middlebox (switch, firewall, NAT) after
2-5 minutes. Enable OS SO_KEEPALIVE on backend and accepted upstream
sockets, and drive a periodic synthetic FC03 heartbeat on each idle
backend socket so a dead path is detected before a real client request
hits it. Controlled by Connection.Keepalive (ON by default).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 09:40:54 -04:00

367 lines
14 KiB
C#

using System.Net;
using System.Net.Sockets;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Multiplexing;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Proxy.Multiplexing;
/// <summary>
/// Tests for the backend keepalive heartbeat and the <see cref="SocketKeepalive"/> helper.
/// The heartbeat tests run the real <see cref="PlcMultiplexer"/> against a stub backend
/// (real sockets, no simulator) with a deliberately short <c>BackendHeartbeatIdleMs</c>.
/// </summary>
[Trait("Category", "Unit")]
public sealed class KeepaliveTests
{
// ── Helpers ────────────────────────────────────────────────────────────────
/// <summary>
/// Obtains a currently-unused loopback TCP port by binding a throwaway listener to
/// port 0, reading back the port the OS assigned, and stopping the listener again.
/// </summary>
/// <returns>The port number that was assigned to the throwaway listener.</returns>
private static int PickFreePort()
{
    var probe = new TcpListener(IPAddress.Loopback, 0);
    probe.Start();
    var boundEndpoint = (IPEndPoint)probe.LocalEndpoint;
    int assignedPort = boundEndpoint.Port;
    probe.Stop();
    return assignedPort;
}
/// <summary>
/// Receives exactly <paramref name="count"/> bytes from <paramref name="socket"/>,
/// looping over partial receives until the buffer is full.
/// </summary>
/// <param name="socket">Socket to receive from.</param>
/// <param name="count">Number of bytes to read.</param>
/// <param name="ct">Token passed to every receive call.</param>
/// <returns>A new array holding the <paramref name="count"/> bytes received.</returns>
/// <exception cref="IOException">A receive returned 0 bytes before the buffer was full.</exception>
private static async Task<byte[]> ReadExactAsync(Socket socket, int count, CancellationToken ct)
{
    var buffer = new byte[count];
    Memory<byte> remaining = buffer;
    while (!remaining.IsEmpty)
    {
        int received = await socket.ReceiveAsync(remaining, SocketFlags.None, ct);
        if (received == 0)
        {
            throw new IOException("EOF");
        }
        // Advance past what was just filled; the next receive targets the unfilled tail.
        remaining = remaining.Slice(received);
    }
    return buffer;
}
/// <summary>
/// Reads one complete frame from <paramref name="socket"/>: a 7-byte header followed by
/// the body announced by the big-endian length field in header bytes 4-5.
/// </summary>
/// <param name="socket">Socket to read from.</param>
/// <param name="ct">Token passed to the underlying reads.</param>
/// <returns>
/// The header and body concatenated. When the length field announces no body
/// (length 0 or 1) only the 7 header bytes are returned.
/// </returns>
private static async Task<byte[]> ReadOneFrameAsync(Socket socket, CancellationToken ct)
{
    const int HeaderLength = 7;

    var header = await ReadExactAsync(socket, HeaderLength, ct);
    ushort length = (ushort)((header[4] << 8) | header[5]);

    // The length field includes the unit-id byte, which was already consumed as header
    // byte 6, so the body is one byte shorter. A malformed length of 0 would yield -1
    // here and make the frame buffer smaller than the header (ArgumentException on
    // copy); clamp it so such a frame comes back header-only instead.
    int bodyLen = Math.Max(0, length - 1);
    if (bodyLen == 0)
    {
        return header;
    }

    var body = await ReadExactAsync(socket, bodyLen, ct);
    var frame = new byte[HeaderLength + bodyLen];
    Buffer.BlockCopy(header, 0, frame, 0, HeaderLength);
    Buffer.BlockCopy(body, 0, frame, HeaderLength, bodyLen);
    return frame;
}
/// <summary>
/// Builds the 12-byte FC03 read request used by the tests: transaction id, two zero
/// bytes, length 6, unit id, function code 0x03, then start address and quantity
/// (all 16-bit fields big-endian).
/// </summary>
/// <param name="txId">Transaction id placed in bytes 0-1.</param>
/// <param name="start">Start address placed in bytes 8-9.</param>
/// <param name="qty">Register quantity placed in bytes 10-11.</param>
/// <param name="unitId">Unit id placed in byte 6; defaults to 1.</param>
/// <returns>The encoded request frame.</returns>
private static byte[] BuildFc03ReadFrame(ushort txId, ushort start, ushort qty, byte unitId = 1)
{
    var frame = new byte[12];

    // Header
    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    frame[2] = 0x00;
    frame[3] = 0x00;
    frame[4] = 0x00;
    frame[5] = 0x06; // length: unit id + function code + 4 bytes of arguments
    frame[6] = unitId;

    // Request body
    frame[7] = 0x03;
    frame[8] = (byte)(start >> 8);
    frame[9] = (byte)(start & 0xFF);
    frame[10] = (byte)(qty >> 8);
    frame[11] = (byte)(qty & 0xFF);

    return frame;
}
/// <summary>
/// Builds the 11-byte FC03 response carrying a single 16-bit register value.
/// </summary>
/// <param name="txId">Transaction id placed in bytes 0-1 (big-endian).</param>
/// <param name="unitId">Unit id placed in byte 6.</param>
/// <param name="register">Register value placed in bytes 9-10 (big-endian).</param>
/// <returns>The encoded response frame.</returns>
private static byte[] BuildFc03Response(ushort txId, byte unitId, ushort register)
    =>
    [
        (byte)(txId >> 8), (byte)(txId & 0xFF),
        0, 0,
        // Length = unit id (1) + body (4), where body = FC (1) + byte count (1) + data (2).
        0, 5,
        unitId,
        0x03,
        2, // byte count
        (byte)(register >> 8), (byte)(register & 0xFF),
    ];
/// <summary>
/// Creates a <see cref="PerPlcContext"/> for a test: the given PLC name, the empty BCD
/// tag map, a fresh <see cref="ProxyCounters"/> instance and a no-op logger.
/// </summary>
/// <param name="name">Value assigned to <c>PlcName</c>.</param>
private static PerPlcContext MakeContext(string name)
{
    var context = new PerPlcContext
    {
        PlcName = name,
        TagMap = Mbproxy.Bcd.BcdTagMap.Empty,
        Counters = new ProxyCounters(),
        Logger = NullLogger.Instance,
    };
    return context;
}
/// <summary>
/// Stub backend that echoes FC03 responses (including the synthetic heartbeat probe,
/// which is itself an FC03). When <see cref="Silent"/> is set it reads and drains
/// requests but never responds — used to drive heartbeat timeouts.
/// </summary>
private sealed class StubBackend : IAsyncDisposable
{
    private readonly TcpListener _listener;
    private readonly CancellationTokenSource _cts = new();
    private readonly List<Task> _clientTasks = new();
    private int _requestCount;

    /// <summary>When true, requests are read and counted but never answered.</summary>
    public volatile bool Silent;

    /// <summary>
    /// Starts listening on the loopback interface at <paramref name="port"/> and kicks
    /// off the background accept loop.
    /// </summary>
    public StubBackend(int port, bool silent = false)
    {
        Port = port;
        Silent = silent;
        _listener = new TcpListener(IPAddress.Loopback, port);
        _listener.Start();
        _ = AcceptLoop();
    }

    /// <summary>The port passed to the constructor.</summary>
    public int Port { get; }

    /// <summary>Number of frames of at least 8 bytes read so far, across all connections.</summary>
    public int RequestCount => Volatile.Read(ref _requestCount);

    /// <summary>
    /// Accepts connections until cancellation (or any accept failure) and hands each
    /// accepted socket to its own <see cref="HandleAsync"/> task.
    /// </summary>
    private async Task AcceptLoop()
    {
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                Socket accepted = await _listener.AcceptSocketAsync(_cts.Token);
                Task session = Task.Run(() => HandleAsync(accepted));
                lock (_clientTasks)
                {
                    _clientTasks.Add(session);
                }
            }
        }
        catch
        {
            /* shutdown */
        }
    }

    /// <summary>
    /// Serves one connection: reads frames one at a time, counts each, and (unless
    /// <see cref="Silent"/>) answers FC03 requests with a fixed register value of 0x1234.
    /// A short frame or any other function code ends the session.
    /// </summary>
    private async Task HandleAsync(Socket s)
    {
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                byte[] request = await ReadOneFrameAsync(s, _cts.Token);
                if (request.Length < 8)
                {
                    break;
                }

                Interlocked.Increment(ref _requestCount);

                if (!Silent)
                {
                    byte functionCode = request[7];
                    if (functionCode != 0x03)
                    {
                        break;
                    }

                    // Echo the request's transaction id and unit id back in the response.
                    ushort txId = (ushort)((request[0] << 8) | request[1]);
                    byte unitId = request[6];
                    byte[] response = BuildFc03Response(txId, unitId, 0x1234);
                    await s.SendAsync(response, SocketFlags.None, _cts.Token);
                }
            }
        }
        catch
        {
            /* normal */
        }
        finally
        {
            try { s.Dispose(); } catch { }
        }
    }

    /// <summary>
    /// Cancels the loops, stops the listener, waits up to two seconds for the
    /// per-connection tasks recorded so far, then disposes the cancellation source.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await _cts.CancelAsync();
        try { _listener.Stop(); } catch { }

        Task[] pending;
        lock (_clientTasks)
        {
            pending = _clientTasks.ToArray();
        }

        try
        {
            await Task.WhenAll(pending).WaitAsync(TimeSpan.FromSeconds(2));
        }
        catch
        {
        }

        _cts.Dispose();
    }
}
/// <summary>
/// Constructs a <see cref="PlcMultiplexer"/> for the given PLC and connection options,
/// wired with a new <see cref="BcdPduPipeline"/>, the supplied context, a no-op logger
/// and no backend-connect pipeline.
/// </summary>
private static PlcMultiplexer BuildMux(PlcOptions plc, ConnectionOptions connOpts, PerPlcContext ctx)
{
    return new PlcMultiplexer(
        plc,
        connOpts,
        new BcdPduPipeline(),
        ctx,
        NullLogger<PlcMultiplexer>.Instance,
        backendConnectPipeline: null);
}
/// <summary>
/// Creates a connected loopback socket pair, wraps the accepted side in an
/// <see cref="UpstreamPipe"/>, and starts that pipe on <paramref name="mux"/> in the
/// background.
/// </summary>
/// <param name="mux">Multiplexer the new pipe is started on.</param>
/// <param name="plcName">PLC name passed to the <see cref="UpstreamPipe"/> constructor.</param>
/// <returns>
/// The client-side socket, the pipe wrapping the accepted socket, and the listener that
/// accepted it. The caller is responsible for disposing/stopping all three.
/// </returns>
private static async Task<(Socket client, UpstreamPipe pipe, TcpListener listener)>
    ConnectClientAsync(PlcMultiplexer mux, string plcName)
{
    // Bind directly to port 0 and read back the assigned port. Picking a free port
    // first and re-binding it afterwards leaves a window in which another process
    // (or a parallel test) can grab the port.
    var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    int proxyPort = ((IPEndPoint)listener.LocalEndpoint).Port;

    var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
    { NoDelay = true };
    try
    {
        await client.ConnectAsync(IPAddress.Loopback, proxyPort);
        var upstream = await listener.AcceptSocketAsync();
        var pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance);
        _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None));
        return (client, pipe, listener);
    }
    catch
    {
        // The caller never receives the tuple on failure, so release what was
        // created here before propagating the error.
        client.Dispose();
        listener.Stop();
        throw;
    }
}
// ── SocketKeepalive helper ─────────────────────────────────────────────────
/// <summary>
/// Applying enabled keepalive options to a fresh TCP socket must leave the socket's
/// SO_KEEPALIVE option non-zero.
/// </summary>
[Fact]
public void SocketKeepalive_Apply_Enabled_TurnsOnKeepAlive()
{
    using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    var options = new KeepaliveOptions
    {
        Enabled = true,
        TcpIdleTimeMs = 30000,
        TcpProbeIntervalMs = 5000,
        TcpProbeCount = 4,
    };

    SocketKeepalive.Apply(socket, options);

    var rawOption = socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive);
    int keepAlive = (int)rawOption!;
    keepAlive.ShouldNotBe(0, "SO_KEEPALIVE must be enabled after Apply");
}
/// <summary>
/// Applying options with <c>Enabled = false</c> must leave the socket's SO_KEEPALIVE
/// option at 0.
/// </summary>
[Fact]
public void SocketKeepalive_Apply_Disabled_IsNoOp()
{
    using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    var options = new KeepaliveOptions { Enabled = false };

    SocketKeepalive.Apply(socket, options);

    var rawOption = socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive);
    int keepAlive = (int)rawOption!;
    keepAlive.ShouldBe(0, "Apply with Enabled=false must not touch the socket");
}
// ── Backend heartbeat ──────────────────────────────────────────────────────
/// <summary>
/// With a responsive stub backend and a 600 ms idle threshold, an idle connection must
/// record at least one heartbeat sent, none failed and no idle disconnect, and the
/// client must still be able to complete a request afterwards.
/// </summary>
[Fact]
public async Task Heartbeat_FiresOnIdleBackend_AndIsAnswered_NoCascade()
{
    var ct = TestContext.Current.CancellationToken;
    // Upper bound for every client-side response read, so a missing response fails the
    // test quickly instead of blocking until the test run itself is cancelled.
    var readTimeout = TimeSpan.FromSeconds(3);

    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    var connOpts = new ConnectionOptions
    {
        Keepalive = new KeepaliveOptions { Enabled = true, BackendHeartbeatIdleMs = 600 },
    };
    await using var mux = BuildMux(plc, connOpts, ctx);
    var (client, pipe, listener) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // One real round-trip brings the backend up and starts the heartbeat loop.
        await client.SendAsync(BuildFc03ReadFrame(0x0001, 0, 1), SocketFlags.None);
        _ = await ReadOneFrameAsync(client, ct).WaitAsync(readTimeout, ct);

        // Leave the connection idle and poll (up to ~6 s) until a heartbeat is recorded.
        long sent = 0;
        for (int i = 0; i < 60; i++)
        {
            sent = ctx.Counters.Snapshot().BackendHeartbeatsSent;
            if (sent >= 1) break;
            await Task.Delay(100, ct);
        }
        sent.ShouldBeGreaterThanOrEqualTo(1, "an idle backend must receive at least one heartbeat probe");

        var snap = ctx.Counters.Snapshot();
        snap.BackendHeartbeatsFailed.ShouldBe(0, "an answered heartbeat must not count as failed");
        snap.BackendIdleDisconnects.ShouldBe(0, "an answered heartbeat must not tear the backend down");

        // The client connection survived — a fresh request still round-trips.
        await client.SendAsync(BuildFc03ReadFrame(0x0002, 0, 1), SocketFlags.None);
        var rsp = await ReadOneFrameAsync(client, ct).WaitAsync(readTimeout, ct);
        ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)0x0002);
    }
    finally
    {
        client.Dispose();
        await pipe.DisposeAsync();
        listener.Stop();
    }
}
/// <summary>
/// With a 1500 ms idle threshold and a real request roughly every 200 ms, no heartbeat
/// may be recorded: each round-trip is expected to refresh the idle timer.
/// </summary>
[Fact]
public async Task Heartbeat_SuppressedByRealTraffic()
{
    var ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    // Idle threshold well above the request cadence below.
    var connOpts = new ConnectionOptions
    {
        Keepalive = new KeepaliveOptions { Enabled = true, BackendHeartbeatIdleMs = 1500 },
    };
    await using var mux = BuildMux(plc, connOpts, ctx);
    var (client, pipe, listener) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Twelve round-trips spaced ~200 ms apart (~2.4 s in total), so the gap between
        // consecutive requests always stays far below the 1500 ms threshold.
        ushort txId = 1;
        while (txId <= 12)
        {
            byte[] request = BuildFc03ReadFrame(txId, 0, 1);
            await client.SendAsync(request, SocketFlags.None);
            _ = await ReadOneFrameAsync(client, ct).WaitAsync(TimeSpan.FromSeconds(3), ct);
            await Task.Delay(200, ct);
            txId++;
        }

        long heartbeatsSent = ctx.Counters.Snapshot().BackendHeartbeatsSent;
        heartbeatsSent.ShouldBe(0, "real traffic must keep resetting the idle timer so no heartbeat fires");
    }
    finally
    {
        client.Dispose();
        await pipe.DisposeAsync();
        listener.Stop();
    }
}
/// <summary>
/// Against a backend that accepts and drains frames but never replies, the heartbeat
/// must be sent, counted as failed, and followed by a backend idle-disconnect.
/// </summary>
/// <remarks>
/// NOTE(review): the test name mentions a cascade, but only the counters are asserted
/// here; nothing checks what happens to the client connection — confirm whether that
/// is intended.
/// </remarks>
[Fact]
public async Task Heartbeat_Timeout_TearsDownBackend_AndCascades()
{
    var ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    // Silent from the start: the backend accepts the TCP connection and drains every
    // frame (including the heartbeat) but never replies.
    await using var backend = new StubBackend(backendPort, silent: true);
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    var connOpts = new ConnectionOptions
    {
        BackendRequestTimeoutMs = 500,
        Keepalive = new KeepaliveOptions { Enabled = true, BackendHeartbeatIdleMs = 700 },
    };
    await using var mux = BuildMux(plc, connOpts, ctx);
    var (client, pipe, listener) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // The opening request exists only to bring the backend TCP connection up and
        // start the heartbeat loop. The backend never answers it, so whatever comes
        // back (a 0x0B exception frame, a closed socket, or a timeout) is discarded.
        await client.SendAsync(BuildFc03ReadFrame(0x0001, 0, 1), SocketFlags.None);
        try
        {
            _ = await ReadOneFrameAsync(client, ct).WaitAsync(TimeSpan.FromSeconds(2), ct);
        }
        catch
        {
            /* 0x0B or socket close — not what this test asserts */
        }

        // Poll (up to ~8 s) until the unanswered heartbeat has been counted as failed
        // and the resulting backend teardown has been counted as an idle disconnect.
        long failed = 0, idleDisc = 0;
        int attempt = 0;
        while (attempt < 80)
        {
            var current = ctx.Counters.Snapshot();
            failed = current.BackendHeartbeatsFailed;
            idleDisc = current.BackendIdleDisconnects;
            if (failed >= 1 && idleDisc >= 1)
            {
                break;
            }
            await Task.Delay(100, ct);
            attempt++;
        }

        failed.ShouldBeGreaterThanOrEqualTo(1, "an unanswered heartbeat must count as failed");
        idleDisc.ShouldBeGreaterThanOrEqualTo(1, "a failed heartbeat must trigger a backend idle-disconnect");
        long heartbeatsSent = ctx.Counters.Snapshot().BackendHeartbeatsSent;
        heartbeatsSent.ShouldBeGreaterThanOrEqualTo(1, "a heartbeat must have been sent before it could fail");
    }
    finally
    {
        client.Dispose();
        await pipe.DisposeAsync();
        listener.Stop();
    }
}
}