using System.Net.Sockets;

namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;

/// <summary>
/// Concrete Modbus TCP transport. Wraps a single <see cref="TcpClient"/> and serializes
/// requests so at most one transaction is in-flight at a time — Modbus servers typically
/// support concurrent transactions, but the single-flight model keeps the wire trace
/// easy to diagnose and avoids interleaved-response correlation bugs.
/// </summary>
public sealed class ModbusTcpTransport : IModbusTransport
{
    /// <summary>Size of the MBAP header: TxId(2) + ProtocolId(2) + Length(2) + UnitId(1).</summary>
    private const int MbapHeaderLength = 7;

    /// <summary>
    /// Largest PDU whose size still fits the 16-bit MBAP length field, which counts the
    /// unit id byte plus the PDU.
    /// </summary>
    private const int MaxPduLength = ushort.MaxValue - 1;

    private readonly string _host;
    private readonly int _port;
    private readonly TimeSpan _timeout;

    // Binary semaphore used as an async mutex around one request/response exchange.
    private readonly SemaphoreSlim _gate = new(1, 1);

    private TcpClient? _client;
    private NetworkStream? _stream;
    private ushort _nextTx;
    private bool _disposed;

    /// <summary>Creates a transport for the given endpoint. No connection is made until <see cref="ConnectAsync"/>.</summary>
    /// <param name="host">Host name or address passed to <see cref="TcpClient"/> when connecting.</param>
    /// <param name="port">TCP port passed to <see cref="TcpClient"/> when connecting.</param>
    /// <param name="timeout">Time budget applied to the connect and to each full request/response exchange.</param>
    public ModbusTcpTransport(string host, int port, TimeSpan timeout)
    {
        _host = host;
        _port = port;
        _timeout = timeout;
    }

    /// <summary>
    /// Opens a TCP connection to the configured endpoint. Calling it again replaces the
    /// current connection and disposes the previous one.
    /// </summary>
    /// <param name="ct">Caller cancellation; linked with the configured timeout.</param>
    /// <exception cref="ObjectDisposedException">The transport has been disposed.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> fired or the timeout elapsed.</exception>
    public async Task ConnectAsync(CancellationToken ct)
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(ModbusTcpTransport));
        }

        var client = new TcpClient();
        NetworkStream stream;
        try
        {
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(_timeout);
            await client.ConnectAsync(_host, _port, cts.Token).ConfigureAwait(false);
            stream = client.GetStream();
        }
        catch
        {
            // Do not leave a half-open socket behind when the connect fails or times out.
            client.Dispose();
            throw;
        }

        // Publish the new connection first, then release whatever it replaced.
        var previousClient = _client;
        _client = client;
        _stream = stream;
        previousClient?.Dispose();
    }

    /// <summary>
    /// Sends one Modbus PDU wrapped in an MBAP header and returns the response PDU
    /// (function code followed by data). Calls are serialized through an internal gate.
    /// </summary>
    /// <param name="unitId">Unit identifier written into the MBAP header.</param>
    /// <param name="pdu">Request PDU; must be non-empty and fit the 16-bit MBAP length field.</param>
    /// <param name="ct">Caller cancellation; linked with the configured timeout for the whole exchange.</param>
    /// <returns>The response PDU without the MBAP header.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="pdu"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="pdu"/> is empty or too large to frame.</exception>
    /// <exception cref="ObjectDisposedException">The transport has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The transport is not connected.</exception>
    /// <exception cref="InvalidDataException">The response frame is malformed or its transaction id does not match.</exception>
    /// <exception cref="EndOfStreamException">The socket closed before a full response arrived.</exception>
    /// <exception cref="ModbusException">The server answered with an exception response.</exception>
    /// <remarks>
    /// NOTE(review): after a timeout or cancellation mid-exchange the late response may still
    /// be sitting in the stream, so the next call can fail the TxId check. Callers probably
    /// want to reconnect after a failed transaction — confirm against the driver's retry policy.
    /// </remarks>
    public async Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct)
    {
        if (pdu is null)
        {
            throw new ArgumentNullException(nameof(pdu));
        }

        if (pdu.Length == 0)
        {
            throw new ArgumentException("Modbus PDU must contain at least a function code", nameof(pdu));
        }

        if (pdu.Length > MaxPduLength)
        {
            // Without this check the length field below would silently wrap and corrupt the frame.
            throw new ArgumentException($"Modbus PDU too large for MBAP length field: {pdu.Length}", nameof(pdu));
        }

        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(ModbusTcpTransport));
        }

        if (_stream is null)
        {
            throw new InvalidOperationException("Transport not connected");
        }

        await _gate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            // Capture once so the whole transaction talks to a single stream instance.
            var stream = _stream ?? throw new InvalidOperationException("Transport not connected");

            // Wrap-around at 65535 is intended; unchecked keeps it safe under checked builds.
            var txId = unchecked(++_nextTx);

            // MBAP: [TxId(2)][Proto=0(2)][Length(2)][UnitId(1)] + PDU
            var adu = new byte[MbapHeaderLength + pdu.Length];
            adu[0] = (byte)(txId >> 8);
            adu[1] = (byte)(txId & 0xFF);
            // protocol id already zero
            var len = (ushort)(1 + pdu.Length); // unit id + pdu
            adu[4] = (byte)(len >> 8);
            adu[5] = (byte)(len & 0xFF);
            adu[6] = unitId;
            Buffer.BlockCopy(pdu, 0, adu, MbapHeaderLength, pdu.Length);

            // One timeout budget covers the write and both reads.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(_timeout);

            await stream.WriteAsync(adu.AsMemory(), cts.Token).ConfigureAwait(false);
            await stream.FlushAsync(cts.Token).ConfigureAwait(false);

            var header = new byte[MbapHeaderLength];
            await ReadExactlyAsync(stream, header, cts.Token).ConfigureAwait(false);

            var respTxId = (ushort)((header[0] << 8) | header[1]);
            if (respTxId != txId)
            {
                throw new InvalidDataException($"Modbus TxId mismatch: expected {txId} got {respTxId}");
            }

            // Length counts the unit id (already read with the header) plus the PDU, and a
            // PDU needs at least its function code byte — so anything below 2 is malformed.
            var respLen = (ushort)((header[4] << 8) | header[5]);
            if (respLen < 2)
            {
                throw new InvalidDataException($"Modbus response length too small: {respLen}");
            }

            var respPdu = new byte[respLen - 1];
            await ReadExactlyAsync(stream, respPdu, cts.Token).ConfigureAwait(false);

            // Exception PDU: function code has high bit set.
            if ((respPdu[0] & 0x80) != 0)
            {
                if (respPdu.Length < 2)
                {
                    throw new InvalidDataException("Modbus exception response is missing its exception code");
                }

                var fc = (byte)(respPdu[0] & 0x7F);
                var ex = respPdu[1];
                throw new ModbusException(fc, ex, $"Modbus exception fc={fc} code={ex}");
            }

            return respPdu;
        }
        finally
        {
            _gate.Release();
        }
    }

    /// <summary>
    /// Reads from <paramref name="s"/> until <paramref name="buf"/> is completely filled.
    /// </summary>
    /// <exception cref="EndOfStreamException">The stream reported end-of-data before the buffer was full.</exception>
    private static async Task ReadExactlyAsync(Stream s, byte[] buf, CancellationToken ct)
    {
        var read = 0;
        while (read < buf.Length)
        {
            var n = await s.ReadAsync(buf.AsMemory(read), ct).ConfigureAwait(false);
            if (n == 0)
            {
                throw new EndOfStreamException("Modbus socket closed mid-response");
            }

            read += n;
        }
    }

    /// <summary>
    /// Disposes the stream, the client and the gate. Subsequent calls are no-ops.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        try
        {
            if (_stream is not null)
            {
                await _stream.DisposeAsync().ConfigureAwait(false);
            }
        }
        catch
        {
            /* best-effort */
        }

        _client?.Dispose();
        _gate.Dispose();
    }
}