using System.Buffers.Binary; using System.Text.Json; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus; /// /// Modbus TCP implementation of + + /// + . First native-protocol greenfield /// driver for the v2 stack — validates the driver-agnostic IAddressSpaceBuilder + /// IReadable/IWritable abstractions generalize beyond Galaxy. /// /// /// Scope limits: synchronous Read/Write only, no subscriptions (Modbus has no push model; /// subscriptions would need a polling loop over the declared tags — additive PR). Historian /// + alarm capabilities are out of scope (the protocol doesn't express them). /// public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId, Func? transportFactory = null) : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable { // Active polling subscriptions. Each subscription owns a background Task that polls the // tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange // per changed tag. UnsubscribeAsync cancels the task via the CTS stored on the handle. private readonly System.Collections.Concurrent.ConcurrentDictionary _subscriptions = new(); private long _nextSubscriptionId; public event EventHandler? OnDataChange; public event EventHandler? OnHostStatusChanged; // Single-host probe state — Modbus driver talks to exactly one endpoint so the "hosts" // collection has at most one entry. HostName is the Host:Port string so the Admin UI can // display the PLC endpoint uniformly with Galaxy platforms/engines. private readonly object _probeLock = new(); private HostState _hostState = HostState.Unknown; private DateTime _hostStateChangedUtc = DateTime.UtcNow; private CancellationTokenSource? _probeCts; private readonly ModbusDriverOptions _options = options; private readonly Func _transportFactory = transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout)); private IModbusTransport? _transport; private DriverHealth _health = new(DriverState.Unknown, null, null); private readonly Dictionary _tagsByName = new(StringComparer.OrdinalIgnoreCase); public string DriverInstanceId => driverInstanceId; public string DriverType => "Modbus"; public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) { _health = new DriverHealth(DriverState.Initializing, null, null); try { _transport = _transportFactory(_options); await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false); foreach (var t in _options.Tags) _tagsByName[t.Name] = t; _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null); // PR 23: kick off the probe loop once the transport is up. Initial state stays // Unknown until the first probe tick succeeds — avoids broadcasting a premature // Running transition before any register round-trip has happened. if (_options.Probe.Enabled) { _probeCts = new CancellationTokenSource(); _ = Task.Run(() => ProbeLoopAsync(_probeCts.Token), _probeCts.Token); } } catch (Exception ex) { _health = new DriverHealth(DriverState.Faulted, null, ex.Message); throw; } } public async Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) { await ShutdownAsync(cancellationToken); await InitializeAsync(driverConfigJson, cancellationToken); } public async Task ShutdownAsync(CancellationToken cancellationToken) { try { _probeCts?.Cancel(); } catch { } _probeCts?.Dispose(); _probeCts = null; foreach (var state in _subscriptions.Values) { try { state.Cts.Cancel(); } catch { } state.Cts.Dispose(); } _subscriptions.Clear(); if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false); _transport = null; _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null); } public DriverHealth GetHealth() => _health; public long GetMemoryFootprint() => 0; public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask; // ---- ITagDiscovery ---- public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(builder); var folder = builder.Folder("Modbus", "Modbus"); foreach (var t in _options.Tags) { folder.Variable(t.Name, t.Name, new DriverAttributeInfo( FullName: t.Name, DriverDataType: MapDataType(t.DataType), IsArray: false, ArrayDim: null, SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly, IsHistorized: false, IsAlarm: false)); } return Task.CompletedTask; } // ---- IReadable ---- public async Task> ReadAsync( IReadOnlyList fullReferences, CancellationToken cancellationToken) { var transport = RequireTransport(); var now = DateTime.UtcNow; var results = new DataValueSnapshot[fullReferences.Count]; for (var i = 0; i < fullReferences.Count; i++) { if (!_tagsByName.TryGetValue(fullReferences[i], out var tag)) { results[i] = new DataValueSnapshot(null, StatusBadNodeIdUnknown, null, now); continue; } try { var value = await ReadOneAsync(transport, tag, cancellationToken).ConfigureAwait(false); results[i] = new DataValueSnapshot(value, 0u, now, now); _health = new DriverHealth(DriverState.Healthy, now, null); } catch (Exception ex) { results[i] = new DataValueSnapshot(null, StatusBadInternalError, null, now); _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message); } } return results; } private async Task ReadOneAsync(IModbusTransport transport, ModbusTagDefinition tag, CancellationToken ct) { switch (tag.Region) { case ModbusRegion.Coils: { var pdu = new byte[] { 0x01, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF), 0x00, 0x01 }; var resp = await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false); return (resp[2] & 0x01) == 1; } case ModbusRegion.DiscreteInputs: { var pdu = new byte[] { 0x02, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF), 0x00, 0x01 }; var resp = await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false); return (resp[2] & 0x01) == 1; } case ModbusRegion.HoldingRegisters: case ModbusRegion.InputRegisters: { var quantity = RegisterCount(tag); var fc = tag.Region == ModbusRegion.HoldingRegisters ? (byte)0x03 : (byte)0x04; var pdu = new byte[] { fc, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF), (byte)(quantity >> 8), (byte)(quantity & 0xFF) }; var resp = await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false); // resp = [fc][byte-count][data...] var data = new ReadOnlySpan(resp, 2, resp[1]); return DecodeRegister(data, tag); } default: throw new InvalidOperationException($"Unknown region {tag.Region}"); } } // ---- IWritable ---- public async Task> WriteAsync( IReadOnlyList writes, CancellationToken cancellationToken) { var transport = RequireTransport(); var results = new WriteResult[writes.Count]; for (var i = 0; i < writes.Count; i++) { var w = writes[i]; if (!_tagsByName.TryGetValue(w.FullReference, out var tag)) { results[i] = new WriteResult(StatusBadNodeIdUnknown); continue; } if (!tag.Writable || tag.Region is ModbusRegion.DiscreteInputs or ModbusRegion.InputRegisters) { results[i] = new WriteResult(StatusBadNotWritable); continue; } try { await WriteOneAsync(transport, tag, w.Value, cancellationToken).ConfigureAwait(false); results[i] = new WriteResult(0u); } catch (Exception) { results[i] = new WriteResult(StatusBadInternalError); } } return results; } private async Task WriteOneAsync(IModbusTransport transport, ModbusTagDefinition tag, object? value, CancellationToken ct) { switch (tag.Region) { case ModbusRegion.Coils: { var on = Convert.ToBoolean(value); var pdu = new byte[] { 0x05, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF), on ? (byte)0xFF : (byte)0x00, 0x00 }; await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false); return; } case ModbusRegion.HoldingRegisters: { var bytes = EncodeRegister(value, tag); if (bytes.Length == 2) { var pdu = new byte[] { 0x06, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF), bytes[0], bytes[1] }; await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false); } else { // FC 16 (Write Multiple Registers) for 32-bit types var qty = (ushort)(bytes.Length / 2); var pdu = new byte[6 + 1 + bytes.Length]; pdu[0] = 0x10; pdu[1] = (byte)(tag.Address >> 8); pdu[2] = (byte)(tag.Address & 0xFF); pdu[3] = (byte)(qty >> 8); pdu[4] = (byte)(qty & 0xFF); pdu[5] = (byte)bytes.Length; Buffer.BlockCopy(bytes, 0, pdu, 6, bytes.Length); await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false); } return; } default: throw new InvalidOperationException($"Writes not supported for region {tag.Region}"); } } // ---- ISubscribable (polling overlay) ---- public Task SubscribeAsync( IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) { var id = Interlocked.Increment(ref _nextSubscriptionId); var cts = new CancellationTokenSource(); var interval = publishingInterval < TimeSpan.FromMilliseconds(100) ? TimeSpan.FromMilliseconds(100) // floor — Modbus can't sustain < 100ms polling reliably : publishingInterval; var handle = new ModbusSubscriptionHandle(id); var state = new SubscriptionState(handle, [.. fullReferences], interval, cts); _subscriptions[id] = state; _ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token); return Task.FromResult(handle); } public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) { if (handle is ModbusSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state)) { state.Cts.Cancel(); state.Cts.Dispose(); } return Task.CompletedTask; } private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct) { // Initial-data push: read every tag once at subscribe time so OPC UA clients see the // current value per Part 4 convention, even if the value never changes thereafter. try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); } catch (OperationCanceledException) { return; } catch { /* first-read error — polling continues */ } while (!ct.IsCancellationRequested) { try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); } catch (OperationCanceledException) { return; } try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); } catch (OperationCanceledException) { return; } catch { /* transient polling error — loop continues, health surface reflects it */ } } } private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct) { var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false); for (var i = 0; i < state.TagReferences.Count; i++) { var tagRef = state.TagReferences[i]; var current = snapshots[i]; var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default; // Raise on first read (forceRaise) OR when the boxed value differs from last-known. if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode) { state.LastValues[tagRef] = current; OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current)); } } } private sealed record SubscriptionState( ModbusSubscriptionHandle Handle, IReadOnlyList TagReferences, TimeSpan Interval, CancellationTokenSource Cts) { public System.Collections.Concurrent.ConcurrentDictionary LastValues { get; } = new(StringComparer.OrdinalIgnoreCase); } private sealed record ModbusSubscriptionHandle(long Id) : ISubscriptionHandle { public string DiagnosticId => $"modbus-sub-{Id}"; } // ---- IHostConnectivityProbe ---- public IReadOnlyList GetHostStatuses() { lock (_probeLock) return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)]; } /// /// Host identifier surfaced to IHostConnectivityProbe.GetHostStatuses and the Admin UI. /// Formatted as host:port so multiple Modbus drivers in the same server disambiguate /// by endpoint without needing the driver-instance-id in the Admin dashboard. /// public string HostName => $"{_options.Host}:{_options.Port}"; private async Task ProbeLoopAsync(CancellationToken ct) { var transport = _transport; // captured reference; disposal tears the loop down via ct while (!ct.IsCancellationRequested) { var success = false; try { using var probeCts = CancellationTokenSource.CreateLinkedTokenSource(ct); probeCts.CancelAfter(_options.Probe.Timeout); var pdu = new byte[] { 0x03, (byte)(_options.Probe.ProbeAddress >> 8), (byte)(_options.Probe.ProbeAddress & 0xFF), 0x00, 0x01 }; _ = await transport!.SendAsync(_options.UnitId, pdu, probeCts.Token).ConfigureAwait(false); success = true; } catch (OperationCanceledException) when (ct.IsCancellationRequested) { return; } catch { // transport / timeout / exception PDU — treated as Stopped below } TransitionTo(success ? HostState.Running : HostState.Stopped); try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); } catch (OperationCanceledException) { return; } } } private void TransitionTo(HostState newState) { HostState old; lock (_probeLock) { old = _hostState; if (old == newState) return; _hostState = newState; _hostStateChangedUtc = DateTime.UtcNow; } OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState)); } // ---- codec ---- /// /// How many 16-bit registers a given tag occupies. Accounts for multi-register logical /// types (Int32/Float32 = 2 regs, Int64/Float64 = 4 regs) and for strings (rounded up /// from 2 chars per register). /// internal static ushort RegisterCount(ModbusTagDefinition tag) => tag.DataType switch { ModbusDataType.Int16 or ModbusDataType.UInt16 or ModbusDataType.BitInRegister => 1, ModbusDataType.Int32 or ModbusDataType.UInt32 or ModbusDataType.Float32 => 2, ModbusDataType.Int64 or ModbusDataType.UInt64 or ModbusDataType.Float64 => 4, ModbusDataType.String => (ushort)((tag.StringLength + 1) / 2), // 2 chars per register _ => throw new InvalidOperationException($"Non-register data type {tag.DataType}"), }; /// /// Word-swap the input into the big-endian layout the decoders expect. For 2-register /// types this reverses the two words; for 4-register types it reverses the four words /// (PLC stored [hi-mid, low-mid, hi-high, low-high] → memory [hi-high, low-high, hi-mid, low-mid]). /// private static byte[] NormalizeWordOrder(ReadOnlySpan data, ModbusByteOrder order) { if (order == ModbusByteOrder.BigEndian) return data.ToArray(); var result = new byte[data.Length]; for (var word = 0; word < data.Length / 2; word++) { var srcWord = data.Length / 2 - 1 - word; result[word * 2] = data[srcWord * 2]; result[word * 2 + 1] = data[srcWord * 2 + 1]; } return result; } internal static object DecodeRegister(ReadOnlySpan data, ModbusTagDefinition tag) { switch (tag.DataType) { case ModbusDataType.Int16: return BinaryPrimitives.ReadInt16BigEndian(data); case ModbusDataType.UInt16: return BinaryPrimitives.ReadUInt16BigEndian(data); case ModbusDataType.BitInRegister: { var raw = BinaryPrimitives.ReadUInt16BigEndian(data); return (raw & (1 << tag.BitIndex)) != 0; } case ModbusDataType.Int32: { var b = NormalizeWordOrder(data, tag.ByteOrder); return BinaryPrimitives.ReadInt32BigEndian(b); } case ModbusDataType.UInt32: { var b = NormalizeWordOrder(data, tag.ByteOrder); return BinaryPrimitives.ReadUInt32BigEndian(b); } case ModbusDataType.Float32: { var b = NormalizeWordOrder(data, tag.ByteOrder); return BinaryPrimitives.ReadSingleBigEndian(b); } case ModbusDataType.Int64: { var b = NormalizeWordOrder(data, tag.ByteOrder); return BinaryPrimitives.ReadInt64BigEndian(b); } case ModbusDataType.UInt64: { var b = NormalizeWordOrder(data, tag.ByteOrder); return BinaryPrimitives.ReadUInt64BigEndian(b); } case ModbusDataType.Float64: { var b = NormalizeWordOrder(data, tag.ByteOrder); return BinaryPrimitives.ReadDoubleBigEndian(b); } case ModbusDataType.String: { // ASCII, 2 chars per register, packed high byte = first char. // Respect the caller's StringLength (truncate nul-padded regions). var chars = new char[tag.StringLength]; for (var i = 0; i < tag.StringLength; i++) { var b = data[i]; if (b == 0) { return new string(chars, 0, i); } chars[i] = (char)b; } return new string(chars); } default: throw new InvalidOperationException($"Non-register data type {tag.DataType}"); } } internal static byte[] EncodeRegister(object? value, ModbusTagDefinition tag) { switch (tag.DataType) { case ModbusDataType.Int16: { var v = Convert.ToInt16(value); var b = new byte[2]; BinaryPrimitives.WriteInt16BigEndian(b, v); return b; } case ModbusDataType.UInt16: { var v = Convert.ToUInt16(value); var b = new byte[2]; BinaryPrimitives.WriteUInt16BigEndian(b, v); return b; } case ModbusDataType.Int32: { var v = Convert.ToInt32(value); var b = new byte[4]; BinaryPrimitives.WriteInt32BigEndian(b, v); return NormalizeWordOrder(b, tag.ByteOrder); } case ModbusDataType.UInt32: { var v = Convert.ToUInt32(value); var b = new byte[4]; BinaryPrimitives.WriteUInt32BigEndian(b, v); return NormalizeWordOrder(b, tag.ByteOrder); } case ModbusDataType.Float32: { var v = Convert.ToSingle(value); var b = new byte[4]; BinaryPrimitives.WriteSingleBigEndian(b, v); return NormalizeWordOrder(b, tag.ByteOrder); } case ModbusDataType.Int64: { var v = Convert.ToInt64(value); var b = new byte[8]; BinaryPrimitives.WriteInt64BigEndian(b, v); return NormalizeWordOrder(b, tag.ByteOrder); } case ModbusDataType.UInt64: { var v = Convert.ToUInt64(value); var b = new byte[8]; BinaryPrimitives.WriteUInt64BigEndian(b, v); return NormalizeWordOrder(b, tag.ByteOrder); } case ModbusDataType.Float64: { var v = Convert.ToDouble(value); var b = new byte[8]; BinaryPrimitives.WriteDoubleBigEndian(b, v); return NormalizeWordOrder(b, tag.ByteOrder); } case ModbusDataType.String: { var s = Convert.ToString(value) ?? string.Empty; var regs = (tag.StringLength + 1) / 2; var b = new byte[regs * 2]; for (var i = 0; i < tag.StringLength && i < s.Length; i++) b[i] = (byte)s[i]; // remaining bytes stay 0 — nul-padded per PLC convention return b; } case ModbusDataType.BitInRegister: throw new InvalidOperationException( "BitInRegister writes require a read-modify-write; not supported in PR 24 (separate follow-up)."); default: throw new InvalidOperationException($"Non-register data type {tag.DataType}"); } } private static DriverDataType MapDataType(ModbusDataType t) => t switch { ModbusDataType.Bool or ModbusDataType.BitInRegister => DriverDataType.Boolean, ModbusDataType.Int16 or ModbusDataType.Int32 => DriverDataType.Int32, ModbusDataType.UInt16 or ModbusDataType.UInt32 => DriverDataType.Int32, ModbusDataType.Int64 or ModbusDataType.UInt64 => DriverDataType.Int32, // widening to Int32 loses precision; PR 25 adds Int64 to DriverDataType ModbusDataType.Float32 => DriverDataType.Float32, ModbusDataType.Float64 => DriverDataType.Float64, ModbusDataType.String => DriverDataType.String, _ => DriverDataType.Int32, }; private IModbusTransport RequireTransport() => _transport ?? throw new InvalidOperationException("ModbusDriver not initialized"); private const uint StatusBadInternalError = 0x80020000u; private const uint StatusBadNodeIdUnknown = 0x80340000u; private const uint StatusBadNotWritable = 0x803B0000u; public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); public async ValueTask DisposeAsync() { if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false); _transport = null; } }