diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs index 12e2421..2d2eec9 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs @@ -37,7 +37,7 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta private CancellationTokenSource? _probeCts; private readonly ModbusDriverOptions _options = options; private readonly Func _transportFactory = - transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout)); + transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout, o.AutoReconnect)); private IModbusTransport? _transport; private DriverHealth _health = new(DriverState.Unknown, null, null); diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs index 4bc1db4..e05c44d 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs @@ -45,6 +45,17 @@ public sealed class ModbusDriverOptions /// by shortening the tag's StringLength or splitting it into multiple tags). /// public ushort MaxRegistersPerWrite { get; init; } = 123; + + /// + /// When true (default) the built-in detects + /// mid-transaction socket failures (, + /// ) and transparently reconnects + + /// retries the PDU exactly once. Required for DL205/DL260 because the H2-ECOM100 + /// does not send TCP keepalives — intermediate NAT / firewall devices silently close + /// idle sockets and the first send after the drop would otherwise surface as a + /// connection error to the caller even though the PLC is up. + /// + public bool AutoReconnect { get; init; } = true; } public sealed class ModbusProbeOptions diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusTcpTransport.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusTcpTransport.cs index 381e011..662e232 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusTcpTransport.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusTcpTransport.cs @@ -8,22 +8,40 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus; /// support concurrent transactions, but the single-flight model keeps the wire trace /// easy to diagnose and avoids interleaved-response correlation bugs. /// +/// +/// +/// Survives mid-transaction socket drops: when a send/read fails with a socket-level +/// error (, , ) +/// the transport disposes the dead socket, reconnects, and retries the PDU exactly +/// once. Deliberately limited to a single retry — further failures bubble up so the +/// driver's health surface reflects the real state instead of masking a dead PLC. +/// +/// +/// Why this matters for DL205/DL260: the AutomationDirect H2-ECOM100 does NOT send +/// TCP keepalives per docs/v2/dl205.md §behavioral-oddities, so any NAT/firewall +/// between the gateway and PLC can silently close an idle socket after 2-5 minutes. +/// Also enables OS-level SO_KEEPALIVE so the driver's own side detects a stuck +/// socket in reasonable time even when the application is mostly idle. +/// +/// public sealed class ModbusTcpTransport : IModbusTransport { private readonly string _host; private readonly int _port; private readonly TimeSpan _timeout; + private readonly bool _autoReconnect; private readonly SemaphoreSlim _gate = new(1, 1); private TcpClient? _client; private NetworkStream? _stream; private ushort _nextTx; private bool _disposed; - public ModbusTcpTransport(string host, int port, TimeSpan timeout) + public ModbusTcpTransport(string host, int port, TimeSpan timeout, bool autoReconnect = true) { _host = host; _port = port; _timeout = timeout; + _autoReconnect = autoReconnect; } public async Task ConnectAsync(CancellationToken ct) @@ -39,12 +57,34 @@ public sealed class ModbusTcpTransport : IModbusTransport var target = ipv4 ?? (addresses.Length > 0 ? addresses[0] : System.Net.IPAddress.Loopback); _client = new TcpClient(target.AddressFamily); + EnableKeepAlive(_client); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); cts.CancelAfter(_timeout); await _client.ConnectAsync(target, _port, cts.Token).ConfigureAwait(false); _stream = _client.GetStream(); } + /// + /// Enable SO_KEEPALIVE with aggressive probe timing. DL205/DL260 doesn't send keepalives + /// itself; having the OS probe the socket every ~30s lets the driver notice a dead PLC + /// or broken NAT path long before the default 2-hour Windows idle timeout fires. + /// Non-fatal if the underlying OS rejects the option (some older Linux / container + /// sandboxes don't expose the fine-grained timing levers — the driver still works, + /// application-level probe still detects problems). + /// + private static void EnableKeepAlive(TcpClient client) + { + try + { + client.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, 30); + client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, 10); + client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, 3); + } + catch { /* best-effort; older OSes may not expose the granular knobs */ } + } + public async Task SendAsync(byte unitId, byte[] pdu, CancellationToken ct) { if (_disposed) throw new ObjectDisposedException(nameof(ModbusTcpTransport)); @@ -53,43 +93,18 @@ public sealed class ModbusTcpTransport : IModbusTransport await _gate.WaitAsync(ct).ConfigureAwait(false); try { - var txId = ++_nextTx; - - // MBAP: [TxId(2)][Proto=0(2)][Length(2)][UnitId(1)] + PDU - var adu = new byte[7 + pdu.Length]; - adu[0] = (byte)(txId >> 8); - adu[1] = (byte)(txId & 0xFF); - // protocol id already zero - var len = (ushort)(1 + pdu.Length); // unit id + pdu - adu[4] = (byte)(len >> 8); - adu[5] = (byte)(len & 0xFF); - adu[6] = unitId; - Buffer.BlockCopy(pdu, 0, adu, 7, pdu.Length); - - using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); - cts.CancelAfter(_timeout); - await _stream.WriteAsync(adu.AsMemory(), cts.Token).ConfigureAwait(false); - await _stream.FlushAsync(cts.Token).ConfigureAwait(false); - - var header = new byte[7]; - await ReadExactlyAsync(_stream, header, cts.Token).ConfigureAwait(false); - var respTxId = (ushort)((header[0] << 8) | header[1]); - if (respTxId != txId) - throw new InvalidDataException($"Modbus TxId mismatch: expected {txId} got {respTxId}"); - var respLen = (ushort)((header[4] << 8) | header[5]); - if (respLen < 1) throw new InvalidDataException($"Modbus response length too small: {respLen}"); - var respPdu = new byte[respLen - 1]; - await ReadExactlyAsync(_stream, respPdu, cts.Token).ConfigureAwait(false); - - // Exception PDU: function code has high bit set. - if ((respPdu[0] & 0x80) != 0) + try { - var fc = (byte)(respPdu[0] & 0x7F); - var ex = respPdu[1]; - throw new ModbusException(fc, ex, $"Modbus exception fc={fc} code={ex}"); + return await SendOnceAsync(unitId, pdu, ct).ConfigureAwait(false); + } + catch (Exception ex) when (_autoReconnect && IsSocketLevelFailure(ex)) + { + // Mid-transaction drop: tear down the dead socket, reconnect, resend. Single + // retry — if it fails again, let it propagate so health/status reflect reality. + await TearDownAsync().ConfigureAwait(false); + await ConnectAsync(ct).ConfigureAwait(false); + return await SendOnceAsync(unitId, pdu, ct).ConfigureAwait(false); } - - return respPdu; } finally { @@ -97,6 +112,68 @@ public sealed class ModbusTcpTransport : IModbusTransport } } + private async Task SendOnceAsync(byte unitId, byte[] pdu, CancellationToken ct) + { + if (_stream is null) throw new InvalidOperationException("Transport not connected"); + var txId = ++_nextTx; + + // MBAP: [TxId(2)][Proto=0(2)][Length(2)][UnitId(1)] + PDU + var adu = new byte[7 + pdu.Length]; + adu[0] = (byte)(txId >> 8); + adu[1] = (byte)(txId & 0xFF); + // protocol id already zero + var len = (ushort)(1 + pdu.Length); // unit id + pdu + adu[4] = (byte)(len >> 8); + adu[5] = (byte)(len & 0xFF); + adu[6] = unitId; + Buffer.BlockCopy(pdu, 0, adu, 7, pdu.Length); + + using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + cts.CancelAfter(_timeout); + await _stream.WriteAsync(adu.AsMemory(), cts.Token).ConfigureAwait(false); + await _stream.FlushAsync(cts.Token).ConfigureAwait(false); + + var header = new byte[7]; + await ReadExactlyAsync(_stream, header, cts.Token).ConfigureAwait(false); + var respTxId = (ushort)((header[0] << 8) | header[1]); + if (respTxId != txId) + throw new InvalidDataException($"Modbus TxId mismatch: expected {txId} got {respTxId}"); + var respLen = (ushort)((header[4] << 8) | header[5]); + if (respLen < 1) throw new InvalidDataException($"Modbus response length too small: {respLen}"); + var respPdu = new byte[respLen - 1]; + await ReadExactlyAsync(_stream, respPdu, cts.Token).ConfigureAwait(false); + + // Exception PDU: function code has high bit set. + if ((respPdu[0] & 0x80) != 0) + { + var fc = (byte)(respPdu[0] & 0x7F); + var ex = respPdu[1]; + throw new ModbusException(fc, ex, $"Modbus exception fc={fc} code={ex}"); + } + + return respPdu; + } + + /// + /// Distinguish socket-layer failures (eligible for reconnect-and-retry) from + /// protocol-layer failures (must propagate — retrying the same PDU won't help if the + /// PLC just returned exception 02 Illegal Data Address). + /// + private static bool IsSocketLevelFailure(Exception ex) => + ex is EndOfStreamException + || ex is IOException + || ex is SocketException + || ex is ObjectDisposedException; + + private async Task TearDownAsync() + { + try { if (_stream is not null) await _stream.DisposeAsync().ConfigureAwait(false); } + catch { /* best-effort */ } + _stream = null; + try { _client?.Dispose(); } catch { } + _client = null; + } + private static async Task ReadExactlyAsync(Stream s, byte[] buf, CancellationToken ct) { var read = 0; diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusTcpReconnectTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusTcpReconnectTests.cs new file mode 100644 index 0000000..1f29c29 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusTcpReconnectTests.cs @@ -0,0 +1,146 @@ +using System.Net; +using System.Net.Sockets; +using Shouldly; +using Xunit; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests; + +/// +/// Exercises against a real TCP listener that can close +/// its socket mid-session on demand. Verifies the PR 53 reconnect-on-drop behavior: after +/// the "first" socket is forcibly torn down, the next SendAsync must re-establish the +/// connection and complete the PDU without bubbling an error to the caller. +/// +[Trait("Category", "Unit")] +public sealed class ModbusTcpReconnectTests +{ + /// + /// Minimal in-process Modbus-TCP stub. Accepts one TCP connection at a time, reads an + /// MBAP + PDU, replies with a canned FC03 response echoing the request quantity of + /// zeroed bytes, then optionally closes the socket to simulate a NAT/firewall drop. + /// + private sealed class FlakeyModbusServer : IAsyncDisposable + { + private readonly TcpListener _listener; + public int Port => ((IPEndPoint)_listener.LocalEndpoint).Port; + public int DropAfterNTransactions { get; set; } = int.MaxValue; + private readonly CancellationTokenSource _stop = new(); + private int _txCount; + + public FlakeyModbusServer() + { + _listener = new TcpListener(IPAddress.Loopback, 0); + _listener.Start(); + _ = Task.Run(AcceptLoopAsync); + } + + private async Task AcceptLoopAsync() + { + while (!_stop.IsCancellationRequested) + { + TcpClient? client = null; + try { client = await _listener.AcceptTcpClientAsync(_stop.Token); } + catch { return; } + + _ = Task.Run(() => ServeAsync(client!)); + } + } + + private async Task ServeAsync(TcpClient client) + { + try + { + using var _ = client; + var stream = client.GetStream(); + while (!_stop.IsCancellationRequested && client.Connected) + { + var header = new byte[7]; + if (!await ReadExactly(stream, header)) return; + var len = (ushort)((header[4] << 8) | header[5]); + var pdu = new byte[len - 1]; + if (!await ReadExactly(stream, pdu)) return; + + var fc = pdu[0]; + var qty = (ushort)((pdu[3] << 8) | pdu[4]); + var respPdu = new byte[2 + qty * 2]; + respPdu[0] = fc; + respPdu[1] = (byte)(qty * 2); + // data bytes stay 0 + + var respLen = (ushort)(1 + respPdu.Length); + var adu = new byte[7 + respPdu.Length]; + adu[0] = header[0]; adu[1] = header[1]; + adu[4] = (byte)(respLen >> 8); adu[5] = (byte)(respLen & 0xFF); + adu[6] = header[6]; + Buffer.BlockCopy(respPdu, 0, adu, 7, respPdu.Length); + await stream.WriteAsync(adu); + await stream.FlushAsync(); + + _txCount++; + if (_txCount >= DropAfterNTransactions) + { + // Simulate NAT/firewall silent close: slam the socket without a + // protocol-level goodbye, which is what DL260 + an intermediate + // middlebox would look like from the client's perspective. + client.Client.Shutdown(SocketShutdown.Both); + client.Close(); + return; + } + } + } + catch { /* best-effort */ } + } + + private static async Task ReadExactly(NetworkStream s, byte[] buf) + { + var read = 0; + while (read < buf.Length) + { + var n = await s.ReadAsync(buf.AsMemory(read)); + if (n == 0) return false; + read += n; + } + return true; + } + + public async ValueTask DisposeAsync() + { + _stop.Cancel(); + _listener.Stop(); + await Task.CompletedTask; + } + } + + [Fact] + public async Task Transport_recovers_from_mid_session_drop_and_retries_successfully() + { + await using var server = new FlakeyModbusServer { DropAfterNTransactions = 1 }; + await using var transport = new ModbusTcpTransport("127.0.0.1", server.Port, TimeSpan.FromSeconds(2), autoReconnect: true); + await transport.ConnectAsync(TestContext.Current.CancellationToken); + + // First transaction succeeds; server then closes the socket. + var pdu = new byte[] { 0x03, 0x00, 0x00, 0x00, 0x01 }; + var first = await transport.SendAsync(unitId: 1, pdu, TestContext.Current.CancellationToken); + first[0].ShouldBe((byte)0x03); + + // Second transaction: the connection is dead, but auto-reconnect must transparently + // spin up a new socket, resend, and produce a valid response. Before PR 53 this would + // surface as EndOfStreamException / IOException to the caller. + var second = await transport.SendAsync(unitId: 1, pdu, TestContext.Current.CancellationToken); + second[0].ShouldBe((byte)0x03); + } + + [Fact] + public async Task Transport_without_AutoReconnect_propagates_drop_to_caller() + { + await using var server = new FlakeyModbusServer { DropAfterNTransactions = 1 }; + await using var transport = new ModbusTcpTransport("127.0.0.1", server.Port, TimeSpan.FromSeconds(2), autoReconnect: false); + await transport.ConnectAsync(TestContext.Current.CancellationToken); + + var pdu = new byte[] { 0x03, 0x00, 0x00, 0x00, 0x01 }; + _ = await transport.SendAsync(unitId: 1, pdu, TestContext.Current.CancellationToken); + + await Should.ThrowAsync(async () => + await transport.SendAsync(unitId: 1, pdu, TestContext.Current.CancellationToken)); + } +}