using System.Net; using System.Net.Sockets; using Mbproxy.Options; using Mbproxy.Proxy; using Mbproxy.Proxy.Multiplexing; using Microsoft.Extensions.Logging.Abstractions; using Shouldly; using Xunit; namespace Mbproxy.Tests.Proxy.Multiplexing; /// /// Tests for the backend keepalive heartbeat and the helper. /// The heartbeat tests run the real against a stub backend /// (real sockets, no simulator) with a deliberately short BackendHeartbeatIdleMs. /// [Trait("Category", "Unit")] public sealed class KeepaliveTests { // ── Helpers ──────────────────────────────────────────────────────────────── private static int PickFreePort() { var l = new TcpListener(IPAddress.Loopback, 0); l.Start(); int port = ((IPEndPoint)l.LocalEndpoint).Port; l.Stop(); return port; } private static async Task ReadExactAsync(Socket socket, int count, CancellationToken ct) { var buf = new byte[count]; int read = 0; while (read < count) { int n = await socket.ReceiveAsync(buf.AsMemory(read, count - read), SocketFlags.None, ct); if (n == 0) throw new IOException("EOF"); read += n; } return buf; } private static async Task ReadOneFrameAsync(Socket socket, CancellationToken ct) { var header = await ReadExactAsync(socket, 7, ct); ushort length = (ushort)((header[4] << 8) | header[5]); int bodyLen = length - 1; var body = bodyLen > 0 ? await ReadExactAsync(socket, bodyLen, ct) : Array.Empty(); var frame = new byte[7 + bodyLen]; Buffer.BlockCopy(header, 0, frame, 0, 7); if (bodyLen > 0) Buffer.BlockCopy(body, 0, frame, 7, bodyLen); return frame; } private static byte[] BuildFc03ReadFrame(ushort txId, ushort start, ushort qty, byte unitId = 1) => [ (byte)(txId >> 8), (byte)(txId & 0xFF), 0x00, 0x00, 0x00, 0x06, unitId, 0x03, (byte)(start >> 8), (byte)(start & 0xFF), (byte)(qty >> 8), (byte)(qty & 0xFF), ]; private static byte[] BuildFc03Response(ushort txId, byte unitId, ushort register) { // Body = FC(1) + byteCount(1) + data(2) = 4. MBAP length = UnitId(1) + body(4) = 5. var frame = new byte[7 + 4]; frame[0] = (byte)(txId >> 8); frame[1] = (byte)(txId & 0xFF); frame[2] = 0; frame[3] = 0; frame[4] = 0; frame[5] = 5; // length frame[6] = unitId; frame[7] = 0x03; frame[8] = 2; // byte count frame[9] = (byte)(register >> 8); frame[10] = (byte)(register & 0xFF); return frame; } private static PerPlcContext MakeContext(string name) => new() { PlcName = name, TagMap = Mbproxy.Bcd.BcdTagMap.Empty, Counters = new ProxyCounters(), Logger = NullLogger.Instance, }; /// /// Stub backend that echoes FC03 responses (including the synthetic heartbeat probe, /// which is itself an FC03). When is set it reads and drains /// requests but never responds — used to drive heartbeat timeouts. /// private sealed class StubBackend : IAsyncDisposable { public int Port { get; } public volatile bool Silent; private int _requestCount; public int RequestCount => Volatile.Read(ref _requestCount); private readonly TcpListener _listener; private readonly CancellationTokenSource _cts = new(); private readonly List _clientTasks = new(); public StubBackend(int port, bool silent = false) { Port = port; Silent = silent; _listener = new TcpListener(IPAddress.Loopback, port); _listener.Start(); _ = AcceptLoop(); } private async Task AcceptLoop() { try { while (!_cts.IsCancellationRequested) { Socket s = await _listener.AcceptSocketAsync(_cts.Token); var task = Task.Run(() => HandleAsync(s)); lock (_clientTasks) _clientTasks.Add(task); } } catch { /* shutdown */ } } private async Task HandleAsync(Socket s) { try { while (!_cts.IsCancellationRequested) { var req = await ReadOneFrameAsync(s, _cts.Token); if (req.Length < 8) break; Interlocked.Increment(ref _requestCount); if (Silent) continue; ushort txId = (ushort)((req[0] << 8) | req[1]); byte unitId = req[6]; byte fc = req[7]; if (fc != 0x03) break; await s.SendAsync(BuildFc03Response(txId, unitId, 0x1234), SocketFlags.None, _cts.Token); } } catch { /* normal */ } finally { try { s.Dispose(); } catch { } } } public async ValueTask DisposeAsync() { await _cts.CancelAsync(); try { _listener.Stop(); } catch { } Task[] snap; lock (_clientTasks) snap = _clientTasks.ToArray(); try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { } _cts.Dispose(); } } private static PlcMultiplexer BuildMux(PlcOptions plc, ConnectionOptions connOpts, PerPlcContext ctx) => new( plc, connOpts, new BcdPduPipeline(), ctx, NullLogger.Instance, backendConnectPipeline: null); private static async Task<(Socket client, UpstreamPipe pipe, TcpListener listener)> ConnectClientAsync(PlcMultiplexer mux, string plcName) { int proxyPort = PickFreePort(); var listener = new TcpListener(IPAddress.Loopback, proxyPort); listener.Start(); var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp) { NoDelay = true }; await client.ConnectAsync(IPAddress.Loopback, proxyPort); var upstream = await listener.AcceptSocketAsync(); var pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance); _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None)); return (client, pipe, listener); } // ── SocketKeepalive helper ───────────────────────────────────────────────── [Fact] public void SocketKeepalive_Apply_Enabled_TurnsOnKeepAlive() { using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); SocketKeepalive.Apply(socket, new KeepaliveOptions { Enabled = true, TcpIdleTimeMs = 30000, TcpProbeIntervalMs = 5000, TcpProbeCount = 4, }); int keepAlive = (int)socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive)!; keepAlive.ShouldNotBe(0, "SO_KEEPALIVE must be enabled after Apply"); } [Fact] public void SocketKeepalive_Apply_Disabled_IsNoOp() { using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); SocketKeepalive.Apply(socket, new KeepaliveOptions { Enabled = false }); int keepAlive = (int)socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive)!; keepAlive.ShouldBe(0, "Apply with Enabled=false must not touch the socket"); } // ── Backend heartbeat ────────────────────────────────────────────────────── [Fact] public async Task Heartbeat_FiresOnIdleBackend_AndIsAnswered_NoCascade() { int backendPort = PickFreePort(); await using var backend = new StubBackend(backendPort); var ctx = MakeContext("PLC1"); var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; var connOpts = new ConnectionOptions { Keepalive = new KeepaliveOptions { Enabled = true, BackendHeartbeatIdleMs = 600 }, }; await using var mux = BuildMux(plc, connOpts, ctx); var (client, pipe, listener) = await ConnectClientAsync(mux, plc.Name); try { // One real round-trip brings the backend up and starts the heartbeat loop. await client.SendAsync(BuildFc03ReadFrame(0x0001, 0, 1), SocketFlags.None); _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken); // Idle the connection past the heartbeat threshold a few times over. long sent = 0; for (int i = 0; i < 60; i++) { sent = ctx.Counters.Snapshot().BackendHeartbeatsSent; if (sent >= 1) break; await Task.Delay(100, TestContext.Current.CancellationToken); } sent.ShouldBeGreaterThanOrEqualTo(1, "an idle backend must receive at least one heartbeat probe"); var snap = ctx.Counters.Snapshot(); snap.BackendHeartbeatsFailed.ShouldBe(0, "an answered heartbeat must not count as failed"); snap.BackendIdleDisconnects.ShouldBe(0, "an answered heartbeat must not tear the backend down"); // The client connection survived — a fresh request still round-trips. await client.SendAsync(BuildFc03ReadFrame(0x0002, 0, 1), SocketFlags.None); var rsp = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken) .WaitAsync(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken); ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)0x0002); } finally { client.Dispose(); await pipe.DisposeAsync(); listener.Stop(); } } [Fact] public async Task Heartbeat_SuppressedByRealTraffic() { int backendPort = PickFreePort(); await using var backend = new StubBackend(backendPort); var ctx = MakeContext("PLC1"); var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; // Idle threshold well above the request cadence below. var connOpts = new ConnectionOptions { Keepalive = new KeepaliveOptions { Enabled = true, BackendHeartbeatIdleMs = 1500 }, }; await using var mux = BuildMux(plc, connOpts, ctx); var (client, pipe, listener) = await ConnectClientAsync(mux, plc.Name); try { // Steady real traffic every ~200 ms for ~2.4 s. Each round-trip refreshes the // activity timestamp, so the 1500 ms idle threshold is never reached. for (ushort i = 1; i <= 12; i++) { await client.SendAsync(BuildFc03ReadFrame(i, 0, 1), SocketFlags.None); _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken) .WaitAsync(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken); await Task.Delay(200, TestContext.Current.CancellationToken); } ctx.Counters.Snapshot().BackendHeartbeatsSent .ShouldBe(0, "real traffic must keep resetting the idle timer so no heartbeat fires"); } finally { client.Dispose(); await pipe.DisposeAsync(); listener.Stop(); } } [Fact] public async Task Heartbeat_Timeout_TearsDownBackend_AndCascades() { int backendPort = PickFreePort(); // Silent from the start: the backend accepts the TCP connection and drains every // frame (including the heartbeat) but never replies. await using var backend = new StubBackend(backendPort, silent: true); var ctx = MakeContext("PLC1"); var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; var connOpts = new ConnectionOptions { BackendRequestTimeoutMs = 500, Keepalive = new KeepaliveOptions { Enabled = true, BackendHeartbeatIdleMs = 700 }, }; await using var mux = BuildMux(plc, connOpts, ctx); var (client, pipe, listener) = await ConnectClientAsync(mux, plc.Name); try { // First request brings the backend TCP connection up and starts the heartbeat // loop. It will itself time out with 0x0B (the backend never answers) — drain // and ignore that frame. await client.SendAsync(BuildFc03ReadFrame(0x0001, 0, 1), SocketFlags.None); try { _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken) .WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken); } catch { /* 0x0B or socket close — not what this test asserts */ } // The heartbeat fires on the idle socket, never gets answered, and the watchdog // times it out — which tears the backend down. long failed = 0, idleDisc = 0; for (int i = 0; i < 80; i++) { var snap = ctx.Counters.Snapshot(); failed = snap.BackendHeartbeatsFailed; idleDisc = snap.BackendIdleDisconnects; if (failed >= 1 && idleDisc >= 1) break; await Task.Delay(100, TestContext.Current.CancellationToken); } failed.ShouldBeGreaterThanOrEqualTo(1, "an unanswered heartbeat must count as failed"); idleDisc.ShouldBeGreaterThanOrEqualTo(1, "a failed heartbeat must trigger a backend idle-disconnect"); ctx.Counters.Snapshot().BackendHeartbeatsSent .ShouldBeGreaterThanOrEqualTo(1, "a heartbeat must have been sent before it could fail"); } finally { client.Dispose(); await pipe.DisposeAsync(); listener.Stop(); } } }