Files
lmxopcua/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs
Joseph Doherty 18b3e24710 Phase 3 PR 22 — Modbus ISubscribable via polling overlay. Modbus has no push model at the wire protocol (unlike MXAccess's OnDataChange callback or OPC UA's own Subscription service), so the driver layers a per-subscription polling loop on top of the existing IReadable path: SubscribeAsync returns an opaque ModbusSubscriptionHandle, starts a background Task.Run that sleeps for the requested publishing interval (floored to 100ms so a misconfigured sub-10ms request doesn't hammer the PLC), reads every subscribed tag through the same FC01/03/04 path the one-shot ReadAsync uses, diffs the returned DataValueSnapshot against the last known value per tag, and raises OnDataChange exactly when (a) it's the first poll (initial-data push per OPC UA Part 4 convention) or (b) boxed value changed or (c) StatusCode changed — stable values don't generate event traffic after the initial push, matching the v1 MXAccess OnDataChange shape. SubscriptionState record holds the handle + tag list + interval + per-subscription CancellationTokenSource + ConcurrentDictionary<string,DataValueSnapshot> LastValues; UnsubscribeAsync removes the state from _subscriptions and cancels the CTS, stopping the polling loop cleanly. Multiple overlapping subscriptions each get their own polling Task so a slow PLC on one subscription can't stall the others. ShutdownAsync cancels every active subscription CTS before tearing down the transport so the driver doesn't leave orphaned polling tasks pumping requests against a disposed socket. Transient poll errors are swallowed inside the loop (the loop continues to the next tick) — the driver's health surface reflects the last-known Degraded state from the underlying ReadAsync path. OperationCanceledException is caught separately to exit the loop silently on unsubscribe/shutdown. Tests (6 new ModbusSubscriptionTests): Initial_poll_raises_OnDataChange_for_every_subscribed_tag asserts the initial-data push fires once per tag in the subscribe call (2 tags → 2 events with FullReference='Level' and FullReference='Temp'); Unchanged_values_do_not_raise_after_initial_poll lets the loop run ~5 cycles at 100ms with a stable register value, asserts only the initial push fires (no event spam on stable tags); Value_change_between_polls_raises_OnDataChange mutates the fake register bank between poll ticks and asserts a second event fires with the new value (verified via e.Snapshot.Value.ShouldBe((short)200)); Unsubscribe_stops_the_polling_loop captures the event count right after UnsubscribeAsync, mutates a register that would have triggered a change if polling continued, asserts the count stays the same after 400ms; SubscribeAsync_floors_intervals_below_100ms passes a 10ms interval + asserts only 1 event fires across 300ms (if the floor weren't enforced we'd see 30 — the test asserts the floor semantically by counting events on stable data); Multiple_subscriptions_fire_independently creates two subs on different tags, unsubscribes only one, mutates the other's tag, asserts only the surviving sub emits while the unsubscribed one stays at its pre-unsubscribe count. FakeTransport in this test file is scoped to FC03 only since that's all the subscription path exercises — keeps the test doubles minimal and the failure modes obvious. WaitForCountAsync helper polls a ConcurrentQueue up to a deadline, makes the tests tolerant of scheduler jitter on slow CI runners. Full solution: 0 errors, 195 tests pass (6 new subscription + 9 existing Modbus + 180 pre-existing). ModbusDriver now implements IDriver + ITagDiscovery + IReadable + IWritable + ISubscribable — five of the eight capability interfaces. IAlarmSource + IHistoryProvider remain unimplemented because Modbus has no wire-level alarm or history semantics; IHostConnectivityProbe is a plausible future addition (treat transport disconnect as a Stopped signal) but needs the socket-level connection-state tracking plumbed through IModbusTransport which is its own PR.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:03:39 -04:00

402 lines
17 KiB
C#

using System.Buffers.Binary;
using System.Text.Json;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
/// <summary>
/// Modbus TCP implementation of <see cref="IDriver"/> + <see cref="ITagDiscovery"/> +
/// <see cref="IReadable"/> + <see cref="IWritable"/>. First native-protocol greenfield
/// driver for the v2 stack — validates the driver-agnostic <c>IAddressSpaceBuilder</c> +
/// <c>IReadable</c>/<c>IWritable</c> abstractions generalize beyond Galaxy.
/// </summary>
/// <remarks>
/// Scope limits: synchronous Read/Write only, no subscriptions (Modbus has no push model;
/// subscriptions would need a polling loop over the declared tags — additive PR). Historian
/// + alarm capabilities are out of scope (the protocol doesn't express them).
/// </remarks>
public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IDisposable, IAsyncDisposable
{
// Active polling subscriptions. Each subscription owns a background Task that polls the
// tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
// per changed tag. UnsubscribeAsync cancels the task via the CTS stored on the handle.
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
private long _nextSubscriptionId;
public event EventHandler<DataChangeEventArgs>? OnDataChange;
private readonly ModbusDriverOptions _options = options;
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory =
transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout));
private IModbusTransport? _transport;
private DriverHealth _health = new(DriverState.Unknown, null, null);
private readonly Dictionary<string, ModbusTagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
public string DriverInstanceId => driverInstanceId;
public string DriverType => "Modbus";
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
_health = new DriverHealth(DriverState.Initializing, null, null);
try
{
_transport = _transportFactory(_options);
await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false);
foreach (var t in _options.Tags) _tagsByName[t.Name] = t;
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
}
catch (Exception ex)
{
_health = new DriverHealth(DriverState.Faulted, null, ex.Message);
throw;
}
}
public async Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
await ShutdownAsync(cancellationToken);
await InitializeAsync(driverConfigJson, cancellationToken);
}
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
foreach (var state in _subscriptions.Values)
{
try { state.Cts.Cancel(); } catch { }
state.Cts.Dispose();
}
_subscriptions.Clear();
if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false);
_transport = null;
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
}
public DriverHealth GetHealth() => _health;
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
// ---- ITagDiscovery ----
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(builder);
var folder = builder.Folder("Modbus", "Modbus");
foreach (var t in _options.Tags)
{
folder.Variable(t.Name, t.Name, new DriverAttributeInfo(
FullName: t.Name,
DriverDataType: MapDataType(t.DataType),
IsArray: false,
ArrayDim: null,
SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly,
IsHistorized: false,
IsAlarm: false));
}
return Task.CompletedTask;
}
// ---- IReadable ----
public async Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
var transport = RequireTransport();
var now = DateTime.UtcNow;
var results = new DataValueSnapshot[fullReferences.Count];
for (var i = 0; i < fullReferences.Count; i++)
{
if (!_tagsByName.TryGetValue(fullReferences[i], out var tag))
{
results[i] = new DataValueSnapshot(null, StatusBadNodeIdUnknown, null, now);
continue;
}
try
{
var value = await ReadOneAsync(transport, tag, cancellationToken).ConfigureAwait(false);
results[i] = new DataValueSnapshot(value, 0u, now, now);
_health = new DriverHealth(DriverState.Healthy, now, null);
}
catch (Exception ex)
{
results[i] = new DataValueSnapshot(null, StatusBadInternalError, null, now);
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
}
}
return results;
}
private async Task<object> ReadOneAsync(IModbusTransport transport, ModbusTagDefinition tag, CancellationToken ct)
{
switch (tag.Region)
{
case ModbusRegion.Coils:
{
var pdu = new byte[] { 0x01, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF), 0x00, 0x01 };
var resp = await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
return (resp[2] & 0x01) == 1;
}
case ModbusRegion.DiscreteInputs:
{
var pdu = new byte[] { 0x02, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF), 0x00, 0x01 };
var resp = await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
return (resp[2] & 0x01) == 1;
}
case ModbusRegion.HoldingRegisters:
case ModbusRegion.InputRegisters:
{
var quantity = RegisterCount(tag.DataType);
var fc = tag.Region == ModbusRegion.HoldingRegisters ? (byte)0x03 : (byte)0x04;
var pdu = new byte[] { fc, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF),
(byte)(quantity >> 8), (byte)(quantity & 0xFF) };
var resp = await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
// resp = [fc][byte-count][data...]
var data = new ReadOnlySpan<byte>(resp, 2, resp[1]);
return DecodeRegister(data, tag.DataType);
}
default:
throw new InvalidOperationException($"Unknown region {tag.Region}");
}
}
// ---- IWritable ----
public async Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
var transport = RequireTransport();
var results = new WriteResult[writes.Count];
for (var i = 0; i < writes.Count; i++)
{
var w = writes[i];
if (!_tagsByName.TryGetValue(w.FullReference, out var tag))
{
results[i] = new WriteResult(StatusBadNodeIdUnknown);
continue;
}
if (!tag.Writable || tag.Region is ModbusRegion.DiscreteInputs or ModbusRegion.InputRegisters)
{
results[i] = new WriteResult(StatusBadNotWritable);
continue;
}
try
{
await WriteOneAsync(transport, tag, w.Value, cancellationToken).ConfigureAwait(false);
results[i] = new WriteResult(0u);
}
catch (Exception)
{
results[i] = new WriteResult(StatusBadInternalError);
}
}
return results;
}
private async Task WriteOneAsync(IModbusTransport transport, ModbusTagDefinition tag, object? value, CancellationToken ct)
{
switch (tag.Region)
{
case ModbusRegion.Coils:
{
var on = Convert.ToBoolean(value);
var pdu = new byte[] { 0x05, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF),
on ? (byte)0xFF : (byte)0x00, 0x00 };
await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
return;
}
case ModbusRegion.HoldingRegisters:
{
var bytes = EncodeRegister(value, tag.DataType);
if (bytes.Length == 2)
{
var pdu = new byte[] { 0x06, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF),
bytes[0], bytes[1] };
await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
}
else
{
// FC 16 (Write Multiple Registers) for 32-bit types
var qty = (ushort)(bytes.Length / 2);
var pdu = new byte[6 + 1 + bytes.Length];
pdu[0] = 0x10;
pdu[1] = (byte)(tag.Address >> 8); pdu[2] = (byte)(tag.Address & 0xFF);
pdu[3] = (byte)(qty >> 8); pdu[4] = (byte)(qty & 0xFF);
pdu[5] = (byte)bytes.Length;
Buffer.BlockCopy(bytes, 0, pdu, 6, bytes.Length);
await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
}
return;
}
default:
throw new InvalidOperationException($"Writes not supported for region {tag.Region}");
}
}
// ---- ISubscribable (polling overlay) ----
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
var id = Interlocked.Increment(ref _nextSubscriptionId);
var cts = new CancellationTokenSource();
var interval = publishingInterval < TimeSpan.FromMilliseconds(100)
? TimeSpan.FromMilliseconds(100) // floor — Modbus can't sustain < 100ms polling reliably
: publishingInterval;
var handle = new ModbusSubscriptionHandle(id);
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
_subscriptions[id] = state;
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
return Task.FromResult<ISubscriptionHandle>(handle);
}
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
if (handle is ModbusSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
{
state.Cts.Cancel();
state.Cts.Dispose();
}
return Task.CompletedTask;
}
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
{
// Initial-data push: read every tag once at subscribe time so OPC UA clients see the
// current value per Part 4 convention, even if the value never changes thereafter.
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
catch { /* first-read error — polling continues */ }
while (!ct.IsCancellationRequested)
{
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
catch { /* transient polling error — loop continues, health surface reflects it */ }
}
}
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
{
var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false);
for (var i = 0; i < state.TagReferences.Count; i++)
{
var tagRef = state.TagReferences[i];
var current = snapshots[i];
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
// Raise on first read (forceRaise) OR when the boxed value differs from last-known.
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
{
state.LastValues[tagRef] = current;
OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current));
}
}
}
private sealed record SubscriptionState(
ModbusSubscriptionHandle Handle,
IReadOnlyList<string> TagReferences,
TimeSpan Interval,
CancellationTokenSource Cts)
{
public System.Collections.Concurrent.ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
= new(StringComparer.OrdinalIgnoreCase);
}
private sealed record ModbusSubscriptionHandle(long Id) : ISubscriptionHandle
{
public string DiagnosticId => $"modbus-sub-{Id}";
}
// ---- codec ----
internal static ushort RegisterCount(ModbusDataType t) => t switch
{
ModbusDataType.Int16 or ModbusDataType.UInt16 => 1,
ModbusDataType.Int32 or ModbusDataType.UInt32 or ModbusDataType.Float32 => 2,
_ => throw new InvalidOperationException($"Non-register data type {t}"),
};
internal static object DecodeRegister(ReadOnlySpan<byte> data, ModbusDataType t) => t switch
{
ModbusDataType.Int16 => BinaryPrimitives.ReadInt16BigEndian(data),
ModbusDataType.UInt16 => BinaryPrimitives.ReadUInt16BigEndian(data),
ModbusDataType.Int32 => BinaryPrimitives.ReadInt32BigEndian(data),
ModbusDataType.UInt32 => BinaryPrimitives.ReadUInt32BigEndian(data),
ModbusDataType.Float32 => BinaryPrimitives.ReadSingleBigEndian(data),
_ => throw new InvalidOperationException($"Non-register data type {t}"),
};
internal static byte[] EncodeRegister(object? value, ModbusDataType t)
{
switch (t)
{
case ModbusDataType.Int16:
{
var v = Convert.ToInt16(value);
var b = new byte[2];
BinaryPrimitives.WriteInt16BigEndian(b, v);
return b;
}
case ModbusDataType.UInt16:
{
var v = Convert.ToUInt16(value);
var b = new byte[2];
BinaryPrimitives.WriteUInt16BigEndian(b, v);
return b;
}
case ModbusDataType.Int32:
{
var v = Convert.ToInt32(value);
var b = new byte[4];
BinaryPrimitives.WriteInt32BigEndian(b, v);
return b;
}
case ModbusDataType.UInt32:
{
var v = Convert.ToUInt32(value);
var b = new byte[4];
BinaryPrimitives.WriteUInt32BigEndian(b, v);
return b;
}
case ModbusDataType.Float32:
{
var v = Convert.ToSingle(value);
var b = new byte[4];
BinaryPrimitives.WriteSingleBigEndian(b, v);
return b;
}
default:
throw new InvalidOperationException($"Non-register data type {t}");
}
}
private static DriverDataType MapDataType(ModbusDataType t) => t switch
{
ModbusDataType.Bool => DriverDataType.Boolean,
ModbusDataType.Int16 or ModbusDataType.Int32 => DriverDataType.Int32,
ModbusDataType.UInt16 or ModbusDataType.UInt32 => DriverDataType.Int32,
ModbusDataType.Float32 => DriverDataType.Float32,
_ => DriverDataType.Int32,
};
private IModbusTransport RequireTransport() =>
_transport ?? throw new InvalidOperationException("ModbusDriver not initialized");
private const uint StatusBadInternalError = 0x80020000u;
private const uint StatusBadNodeIdUnknown = 0x80340000u;
private const uint StatusBadNotWritable = 0x803B0000u;
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
public async ValueTask DisposeAsync()
{
if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false);
_transport = null;
}
}