Compare commits

...

6 Commits

Author SHA1 Message Date
Joseph Doherty
18b3e24710 Phase 3 PR 22 — Modbus ISubscribable via polling overlay. Modbus has no push model at the wire protocol (unlike MXAccess's OnDataChange callback or OPC UA's own Subscription service), so the driver layers a per-subscription polling loop on top of the existing IReadable path: SubscribeAsync returns an opaque ModbusSubscriptionHandle, starts a background Task.Run that sleeps for the requested publishing interval (floored to 100ms so a misconfigured sub-10ms request doesn't hammer the PLC), reads every subscribed tag through the same FC01/03/04 path the one-shot ReadAsync uses, diffs the returned DataValueSnapshot against the last known value per tag, and raises OnDataChange exactly when (a) it's the first poll (initial-data push per OPC UA Part 4 convention) or (b) boxed value changed or (c) StatusCode changed — stable values don't generate event traffic after the initial push, matching the v1 MXAccess OnDataChange shape. SubscriptionState record holds the handle + tag list + interval + per-subscription CancellationTokenSource + ConcurrentDictionary<string,DataValueSnapshot> LastValues; UnsubscribeAsync removes the state from _subscriptions and cancels the CTS, stopping the polling loop cleanly. Multiple overlapping subscriptions each get their own polling Task so a slow PLC on one subscription can't stall the others. ShutdownAsync cancels every active subscription CTS before tearing down the transport so the driver doesn't leave orphaned polling tasks pumping requests against a disposed socket. Transient poll errors are swallowed inside the loop (the loop continues to the next tick) — the driver's health surface reflects the last-known Degraded state from the underlying ReadAsync path. OperationCanceledException is caught separately to exit the loop silently on unsubscribe/shutdown. Tests (6 new ModbusSubscriptionTests): Initial_poll_raises_OnDataChange_for_every_subscribed_tag asserts the initial-data push fires once per tag in the subscribe call (2 tags → 2 events with FullReference='Level' and FullReference='Temp'); Unchanged_values_do_not_raise_after_initial_poll lets the loop run ~5 cycles at 100ms with a stable register value, asserts only the initial push fires (no event spam on stable tags); Value_change_between_polls_raises_OnDataChange mutates the fake register bank between poll ticks and asserts a second event fires with the new value (verified via e.Snapshot.Value.ShouldBe((short)200)); Unsubscribe_stops_the_polling_loop captures the event count right after UnsubscribeAsync, mutates a register that would have triggered a change if polling continued, asserts the count stays the same after 400ms; SubscribeAsync_floors_intervals_below_100ms passes a 10ms interval + asserts only 1 event fires across 300ms (if the floor weren't enforced we'd see 30 — the test asserts the floor semantically by counting events on stable data); Multiple_subscriptions_fire_independently creates two subs on different tags, unsubscribes only one, mutates the other's tag, asserts only the surviving sub emits while the unsubscribed one stays at its pre-unsubscribe count. FakeTransport in this test file is scoped to FC03 only since that's all the subscription path exercises — keeps the test doubles minimal and the failure modes obvious. WaitForCountAsync helper polls a ConcurrentQueue up to a deadline, makes the tests tolerant of scheduler jitter on slow CI runners. Full solution: 0 errors, 195 tests pass (6 new subscription + 9 existing Modbus + 180 pre-existing). ModbusDriver now implements IDriver + ITagDiscovery + IReadable + IWritable + ISubscribable — five of the eight capability interfaces. IAlarmSource + IHistoryProvider remain unimplemented because Modbus has no wire-level alarm or history semantics; IHostConnectivityProbe is a plausible future addition (treat transport disconnect as a Stopped signal) but needs the socket-level connection-state tracking plumbed through IModbusTransport which is its own PR.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:03:39 -04:00
f6a12dafe9 Merge pull request 'Phase 3 PR 21 — Modbus TCP driver (first native-protocol greenfield)' (#20) from phase-3-pr21-modbus-driver into v2 2026-04-18 11:58:20 -04:00
Joseph Doherty
058c3dddd3 Phase 3 PR 21 — Modbus TCP driver: first native-protocol greenfield for v2. New src/Driver.Modbus project with ModbusDriver implementing IDriver + ITagDiscovery + IReadable + IWritable. Validates the driver-agnostic abstractions (IAddressSpaceBuilder, DriverAttributeInfo, DataValueSnapshot, WriteRequest/WriteResult) generalize beyond Galaxy — nothing Galaxy-specific is used here. ModbusDriverOptions carries Host/Port/UnitId/Timeout + a pre-declared tag list (Modbus has no discovery protocol — tags are configuration). IModbusTransport abstracts the socket layer so tests swap in-memory fakes; concrete ModbusTcpTransport speaks the MBAP ADU (TxId + Protocol=0 + Length + UnitId + PDU) over TcpClient, serializes requests through a semaphore for single-flight in-order responses, validates the response TxId matches, surfaces server exception PDUs as ModbusException with function code + exception code. DiscoverAsync streams one folder per driver with a BaseDataVariable per tag + DriverAttributeInfo that flags writable tags as SecurityClassification.Operate vs ViewOnly for read-only regions. ReadAsync routes per-tag by ModbusRegion: FC01 for Coils, FC02 for DiscreteInputs, FC03 for HoldingRegisters, FC04 for InputRegisters; register values decoded through System.Buffers.Binary.BinaryPrimitives (BigEndian for single-register Int16/UInt16 + two-register Int32/UInt32/Float32 per standard modbus word-swap conventions). WriteAsync uses FC05 (Write Single Coil with 0xFF00/0x0000 encoding) for booleans, FC06 (Write Single Register) for 16-bit types, FC16 (Write Multiple Registers) for 32-bit types. Unknown tag → BadNodeIdUnknown; write to InputRegister or DiscreteInput or Writable=false tag → BadNotWritable; exception during transport → BadInternalError + driver health Degraded. Subscriptions + Historian + Alarms deliberately out of scope — Modbus has no push model (subscribe would be a polling overlay, additive PR) and no history/alarm semantics at the protocol level. Tests (9 new ModbusDriverTests): InitializeAsync connects + populates the tag map + sets health=Healthy; Read Int16 from HoldingRegister returns BigEndian value; Read Float32 spans two registers BigEndian (IEEE 754 single for 25.5f round-trips exactly); Read Coil returns boolean from the bit-packed response; unknown tag name returns BadNodeIdUnknown without an exception; Write UInt16 round-trips via FC06; Write Float32 uses FC16 (two-register write verified by decoding back through the fake register bank); Write to InputRegister returns BadNotWritable; Discover streams one folder + one variable per tag with correct DriverDataType mapping (Int16/Int32→Int32, UInt16/UInt32→Int32, Float32→Float32, Bool→Boolean). FakeTransport simulates a 256-register/256-coil bank + implements the 7 function codes the driver uses. slnx updated with the new src + tests entries. Full solution post-add: 0 errors, 189 tests pass (9 Modbus + 180 pre-existing). IDriver abstraction validated against a fundamentally different protocol — Modbus TCP has no AlarmExtension, no ScanState, no IPC boundary, no historian, no LDAP — and the same builder/reader/writer contract plugged straight in. Future PRs on this driver: ISubscribable via a polling loop, IHostConnectivityProbe for dead-device detection, PLC-specific data-type extensions (Int64/BCD/string-in-registers).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:55:21 -04:00
52791952dd Merge pull request 'Phase 3 PR 20 — lmx-followups.md' (#19) from phase-3-pr20-lmx-followups into v2 2026-04-18 11:50:38 -04:00
Joseph Doherty
860deb8e0d Phase 3 PR 20 — lmx-followups.md: track remaining Galaxy-bridge tasks after PR 19 (HistoryReadAtTime/Events Proxy wiring, write-gating by role, Admin UI cert trust, live-LDAP integration test, full-stack Galaxy smoke, multi-driver test, per-host dashboard). Documents what each item depends on, the shipped surface it builds on, and the minimal to-do so a future session can pick any one off in isolation.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:43:15 -04:00
f5e7173de3 Merge pull request 'Phase 3 PR 19 — LDAP user identity + Basic256Sha256 security profile' (#18) from phase-3-pr19-ldap-security into v2 2026-04-18 11:36:18 -04:00
10 changed files with 1168 additions and 0 deletions

View File

@@ -8,6 +8,7 @@
<Project Path="src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ZB.MOM.WW.OtOpcUa.Driver.Modbus.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Client.Shared/ZB.MOM.WW.OtOpcUa.Client.Shared.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Client.CLI/ZB.MOM.WW.OtOpcUa.Client.CLI.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Client.UI/ZB.MOM.WW.OtOpcUa.Client.UI.csproj"/>
@@ -22,6 +23,7 @@
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.E2E/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.E2E.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Client.CLI.Tests/ZB.MOM.WW.OtOpcUa.Client.CLI.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Client.UI.Tests/ZB.MOM.WW.OtOpcUa.Client.UI.Tests.csproj"/>

106
docs/v2/lmx-followups.md Normal file
View File

@@ -0,0 +1,106 @@
# LMX Galaxy bridge — remaining follow-ups
State after PR 19: the Galaxy driver is functionally at v1 parity through the
`IDriver` abstraction; the OPC UA server runs with LDAP-authenticated
Basic256Sha256 endpoints and alarms are observable through
`AlarmConditionState.ReportEvent`. The items below are what remains LMX-
specific before the stack can fully replace the v1 deployment, in
rough priority order.
## 1. Proxy-side `IHistoryProvider` for `ReadAtTime` / `ReadEvents`
**Status**: Host-side IPC shipped (PR 10 + PR 11). Proxy consumer not written.
PR 10 added `HistoryReadAtTimeRequest/Response` on the IPC wire and
`MxAccessGalaxyBackend.HistoryReadAtTimeAsync` delegates to
`HistorianDataSource.ReadAtTimeAsync`. PR 11 did the same for events
(`HistoryReadEventsRequest/Response` + `GalaxyHistoricalEvent`). The Proxy
side (`GalaxyProxyDriver`) doesn't call those yet — `Core.Abstractions.IHistoryProvider`
only exposes `ReadRawAsync` + `ReadProcessedAsync`.
**To do**:
- Extend `IHistoryProvider` with `ReadAtTimeAsync(string, DateTime[], …)` and
`ReadEventsAsync(string?, DateTime, DateTime, int, …)`.
- `GalaxyProxyDriver` calls the new IPC message kinds.
- `DriverNodeManager` wires the new capability methods onto `HistoryRead`
`AtTime` + `Events` service handlers.
- Integration test: OPC UA client calls `HistoryReadAtTime` / `HistoryReadEvents`,
value flows through IPC to the Host's `HistorianDataSource`, back to the client.
## 2. Write-gating by role
**Status**: `RoleBasedIdentity.Roles` populated on the session (PR 19) but
`DriverNodeManager.OnWriteValue` doesn't consult it.
CLAUDE.md defines the role set: `ReadOnly` / `WriteOperate` / `WriteTune` /
`WriteConfigure` / `AlarmAck`. Each `DriverAttributeInfo.SecurityClassification`
maps to a required role for writes.
**To do**:
- Add a `RoleRequirements` table: `SecurityClassification` → required role.
- `OnWriteValue` reads `context.UserIdentity` → cast to `RoleBasedIdentity`
→ check role membership before calling `IWritable.WriteAsync`. Return
`BadUserAccessDenied` on miss.
- Unit test against a fake `ISystemContext` with varying role sets.
## 3. Admin UI client-cert trust management
**Status**: Server side auto-accepts untrusted client certs when the
`AutoAcceptUntrustedClientCertificates` option is true (dev default).
Production deployments want operator-controlled trust via the Admin UI.
**To do**:
- Surface the server's rejected-certificate store in the Admin UI.
- Page to move certs between `rejected` / `trusted`.
- Flip `AutoAcceptUntrustedClientCertificates` to false once Admin UI is the
trust gate.
## 4. Live-LDAP integration test
**Status**: PR 19 unit-tested the auth-flow shape; the live bind path is
exercised only by the pre-existing `Admin.Tests/LdapLiveBindTests.cs` which
uses the same Novell library against a running GLAuth at `localhost:3893`.
**To do**:
- Add `OpcUaServerIntegrationTests.Valid_username_authenticates_against_live_ldap`
with the same skip-when-unreachable guard.
- Assert `session.Identity` on the server side carries the expected role
after bind — requires exposing a test hook or reading identity from a
new `IHostConnectivityProbe`-style "whoami" variable in the address space.
## 5. Full Galaxy live-service smoke test against the merged v2 stack
**Status**: Individual pieces have live smoke tests (PR 5 MXAccess, PR 13
probe manager, PR 14 alarm tracker), but the full loop — OPC UA client →
`OtOpcUaServer``GalaxyProxyDriver` (in-process) → named-pipe to
Galaxy.Host subprocess → live MXAccess runtime → real Galaxy objects — has
no single end-to-end smoke test.
**To do**:
- Test that spawns the full topology, discovers a deployed Galaxy object,
subscribes to one of its attributes, writes a value back, and asserts the
write round-tripped through MXAccess. Skip when ArchestrA isn't running.
## 6. Second driver instance on the same server
**Status**: `DriverHost.RegisterAsync` supports multiple drivers; the OPC UA
server creates one `DriverNodeManager` per driver and isolates their
subtrees under distinct namespace URIs. Not proven with two active
`GalaxyProxyDriver` instances pointing at different Galaxies.
**To do**:
- Integration test that registers two driver instances, each with a distinct
`DriverInstanceId` + endpoint in its own session, asserts nodes from both
appear under the correct subtrees, alarm events land on the correct
instance's condition nodes.
## 7. Host-status per-AppEngine granularity → Admin UI dashboard
**Status**: PR 13 ships per-platform/per-AppEngine `ScanState` probing; PR 17
surfaces the resulting `OnHostStatusChanged` events through OPC UA. Admin
UI doesn't render a per-host dashboard yet.
**To do**:
- SignalR hub push of `HostStatusChangedEventArgs` to the Admin UI.
- Dashboard page showing each tracked host, current state, last transition
time, failure count.

View File

@@ -0,0 +1,25 @@
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
/// <summary>
/// Abstraction over the Modbus TCP socket. Takes a <c>PDU</c> (function code + data, excluding
/// the 7-byte MBAP header) and returns the response PDU — the transport owns transaction-id
/// pairing, framing, and socket I/O. Tests supply in-memory fakes.
/// </summary>
public interface IModbusTransport : IAsyncDisposable
{
Task ConnectAsync(CancellationToken ct);
/// <summary>
/// Send a Modbus PDU (function code + function-specific data) and read the response PDU.
/// Throws <see cref="ModbusException"/> when the server returns an exception PDU
/// (function code + 0x80 + exception code).
/// </summary>
Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct);
}
public sealed class ModbusException(byte functionCode, byte exceptionCode, string message)
: Exception(message)
{
public byte FunctionCode { get; } = functionCode;
public byte ExceptionCode { get; } = exceptionCode;
}

View File

@@ -0,0 +1,401 @@
using System.Buffers.Binary;
using System.Text.Json;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
/// <summary>
/// Modbus TCP implementation of <see cref="IDriver"/> + <see cref="ITagDiscovery"/> +
/// <see cref="IReadable"/> + <see cref="IWritable"/>. First native-protocol greenfield
/// driver for the v2 stack — validates the driver-agnostic <c>IAddressSpaceBuilder</c> +
/// <c>IReadable</c>/<c>IWritable</c> abstractions generalize beyond Galaxy.
/// </summary>
/// <remarks>
/// Scope limits: synchronous Read/Write only, no subscriptions (Modbus has no push model;
/// subscriptions would need a polling loop over the declared tags — additive PR). Historian
/// + alarm capabilities are out of scope (the protocol doesn't express them).
/// </remarks>
public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IDisposable, IAsyncDisposable
{
// Active polling subscriptions. Each subscription owns a background Task that polls the
// tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
// per changed tag. UnsubscribeAsync cancels the task via the CTS stored on the handle.
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
private long _nextSubscriptionId;
public event EventHandler<DataChangeEventArgs>? OnDataChange;
private readonly ModbusDriverOptions _options = options;
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory =
transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout));
private IModbusTransport? _transport;
private DriverHealth _health = new(DriverState.Unknown, null, null);
private readonly Dictionary<string, ModbusTagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
public string DriverInstanceId => driverInstanceId;
public string DriverType => "Modbus";
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
_health = new DriverHealth(DriverState.Initializing, null, null);
try
{
_transport = _transportFactory(_options);
await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false);
foreach (var t in _options.Tags) _tagsByName[t.Name] = t;
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
}
catch (Exception ex)
{
_health = new DriverHealth(DriverState.Faulted, null, ex.Message);
throw;
}
}
public async Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
await ShutdownAsync(cancellationToken);
await InitializeAsync(driverConfigJson, cancellationToken);
}
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
foreach (var state in _subscriptions.Values)
{
try { state.Cts.Cancel(); } catch { }
state.Cts.Dispose();
}
_subscriptions.Clear();
if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false);
_transport = null;
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
}
public DriverHealth GetHealth() => _health;
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
// ---- ITagDiscovery ----
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(builder);
var folder = builder.Folder("Modbus", "Modbus");
foreach (var t in _options.Tags)
{
folder.Variable(t.Name, t.Name, new DriverAttributeInfo(
FullName: t.Name,
DriverDataType: MapDataType(t.DataType),
IsArray: false,
ArrayDim: null,
SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly,
IsHistorized: false,
IsAlarm: false));
}
return Task.CompletedTask;
}
// ---- IReadable ----
public async Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
var transport = RequireTransport();
var now = DateTime.UtcNow;
var results = new DataValueSnapshot[fullReferences.Count];
for (var i = 0; i < fullReferences.Count; i++)
{
if (!_tagsByName.TryGetValue(fullReferences[i], out var tag))
{
results[i] = new DataValueSnapshot(null, StatusBadNodeIdUnknown, null, now);
continue;
}
try
{
var value = await ReadOneAsync(transport, tag, cancellationToken).ConfigureAwait(false);
results[i] = new DataValueSnapshot(value, 0u, now, now);
_health = new DriverHealth(DriverState.Healthy, now, null);
}
catch (Exception ex)
{
results[i] = new DataValueSnapshot(null, StatusBadInternalError, null, now);
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
}
}
return results;
}
private async Task<object> ReadOneAsync(IModbusTransport transport, ModbusTagDefinition tag, CancellationToken ct)
{
switch (tag.Region)
{
case ModbusRegion.Coils:
{
var pdu = new byte[] { 0x01, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF), 0x00, 0x01 };
var resp = await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
return (resp[2] & 0x01) == 1;
}
case ModbusRegion.DiscreteInputs:
{
var pdu = new byte[] { 0x02, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF), 0x00, 0x01 };
var resp = await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
return (resp[2] & 0x01) == 1;
}
case ModbusRegion.HoldingRegisters:
case ModbusRegion.InputRegisters:
{
var quantity = RegisterCount(tag.DataType);
var fc = tag.Region == ModbusRegion.HoldingRegisters ? (byte)0x03 : (byte)0x04;
var pdu = new byte[] { fc, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF),
(byte)(quantity >> 8), (byte)(quantity & 0xFF) };
var resp = await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
// resp = [fc][byte-count][data...]
var data = new ReadOnlySpan<byte>(resp, 2, resp[1]);
return DecodeRegister(data, tag.DataType);
}
default:
throw new InvalidOperationException($"Unknown region {tag.Region}");
}
}
// ---- IWritable ----
public async Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
var transport = RequireTransport();
var results = new WriteResult[writes.Count];
for (var i = 0; i < writes.Count; i++)
{
var w = writes[i];
if (!_tagsByName.TryGetValue(w.FullReference, out var tag))
{
results[i] = new WriteResult(StatusBadNodeIdUnknown);
continue;
}
if (!tag.Writable || tag.Region is ModbusRegion.DiscreteInputs or ModbusRegion.InputRegisters)
{
results[i] = new WriteResult(StatusBadNotWritable);
continue;
}
try
{
await WriteOneAsync(transport, tag, w.Value, cancellationToken).ConfigureAwait(false);
results[i] = new WriteResult(0u);
}
catch (Exception)
{
results[i] = new WriteResult(StatusBadInternalError);
}
}
return results;
}
private async Task WriteOneAsync(IModbusTransport transport, ModbusTagDefinition tag, object? value, CancellationToken ct)
{
switch (tag.Region)
{
case ModbusRegion.Coils:
{
var on = Convert.ToBoolean(value);
var pdu = new byte[] { 0x05, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF),
on ? (byte)0xFF : (byte)0x00, 0x00 };
await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
return;
}
case ModbusRegion.HoldingRegisters:
{
var bytes = EncodeRegister(value, tag.DataType);
if (bytes.Length == 2)
{
var pdu = new byte[] { 0x06, (byte)(tag.Address >> 8), (byte)(tag.Address & 0xFF),
bytes[0], bytes[1] };
await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
}
else
{
// FC 16 (Write Multiple Registers) for 32-bit types
var qty = (ushort)(bytes.Length / 2);
var pdu = new byte[6 + 1 + bytes.Length];
pdu[0] = 0x10;
pdu[1] = (byte)(tag.Address >> 8); pdu[2] = (byte)(tag.Address & 0xFF);
pdu[3] = (byte)(qty >> 8); pdu[4] = (byte)(qty & 0xFF);
pdu[5] = (byte)bytes.Length;
Buffer.BlockCopy(bytes, 0, pdu, 6, bytes.Length);
await transport.SendAsync(_options.UnitId, pdu, ct).ConfigureAwait(false);
}
return;
}
default:
throw new InvalidOperationException($"Writes not supported for region {tag.Region}");
}
}
// ---- ISubscribable (polling overlay) ----
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
var id = Interlocked.Increment(ref _nextSubscriptionId);
var cts = new CancellationTokenSource();
var interval = publishingInterval < TimeSpan.FromMilliseconds(100)
? TimeSpan.FromMilliseconds(100) // floor — Modbus can't sustain < 100ms polling reliably
: publishingInterval;
var handle = new ModbusSubscriptionHandle(id);
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
_subscriptions[id] = state;
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
return Task.FromResult<ISubscriptionHandle>(handle);
}
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
if (handle is ModbusSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
{
state.Cts.Cancel();
state.Cts.Dispose();
}
return Task.CompletedTask;
}
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
{
// Initial-data push: read every tag once at subscribe time so OPC UA clients see the
// current value per Part 4 convention, even if the value never changes thereafter.
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
catch { /* first-read error — polling continues */ }
while (!ct.IsCancellationRequested)
{
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
catch { /* transient polling error — loop continues, health surface reflects it */ }
}
}
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
{
var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false);
for (var i = 0; i < state.TagReferences.Count; i++)
{
var tagRef = state.TagReferences[i];
var current = snapshots[i];
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
// Raise on first read (forceRaise) OR when the boxed value differs from last-known.
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
{
state.LastValues[tagRef] = current;
OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current));
}
}
}
private sealed record SubscriptionState(
ModbusSubscriptionHandle Handle,
IReadOnlyList<string> TagReferences,
TimeSpan Interval,
CancellationTokenSource Cts)
{
public System.Collections.Concurrent.ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
= new(StringComparer.OrdinalIgnoreCase);
}
private sealed record ModbusSubscriptionHandle(long Id) : ISubscriptionHandle
{
public string DiagnosticId => $"modbus-sub-{Id}";
}
// ---- codec ----
internal static ushort RegisterCount(ModbusDataType t) => t switch
{
ModbusDataType.Int16 or ModbusDataType.UInt16 => 1,
ModbusDataType.Int32 or ModbusDataType.UInt32 or ModbusDataType.Float32 => 2,
_ => throw new InvalidOperationException($"Non-register data type {t}"),
};
internal static object DecodeRegister(ReadOnlySpan<byte> data, ModbusDataType t) => t switch
{
ModbusDataType.Int16 => BinaryPrimitives.ReadInt16BigEndian(data),
ModbusDataType.UInt16 => BinaryPrimitives.ReadUInt16BigEndian(data),
ModbusDataType.Int32 => BinaryPrimitives.ReadInt32BigEndian(data),
ModbusDataType.UInt32 => BinaryPrimitives.ReadUInt32BigEndian(data),
ModbusDataType.Float32 => BinaryPrimitives.ReadSingleBigEndian(data),
_ => throw new InvalidOperationException($"Non-register data type {t}"),
};
internal static byte[] EncodeRegister(object? value, ModbusDataType t)
{
switch (t)
{
case ModbusDataType.Int16:
{
var v = Convert.ToInt16(value);
var b = new byte[2];
BinaryPrimitives.WriteInt16BigEndian(b, v);
return b;
}
case ModbusDataType.UInt16:
{
var v = Convert.ToUInt16(value);
var b = new byte[2];
BinaryPrimitives.WriteUInt16BigEndian(b, v);
return b;
}
case ModbusDataType.Int32:
{
var v = Convert.ToInt32(value);
var b = new byte[4];
BinaryPrimitives.WriteInt32BigEndian(b, v);
return b;
}
case ModbusDataType.UInt32:
{
var v = Convert.ToUInt32(value);
var b = new byte[4];
BinaryPrimitives.WriteUInt32BigEndian(b, v);
return b;
}
case ModbusDataType.Float32:
{
var v = Convert.ToSingle(value);
var b = new byte[4];
BinaryPrimitives.WriteSingleBigEndian(b, v);
return b;
}
default:
throw new InvalidOperationException($"Non-register data type {t}");
}
}
private static DriverDataType MapDataType(ModbusDataType t) => t switch
{
ModbusDataType.Bool => DriverDataType.Boolean,
ModbusDataType.Int16 or ModbusDataType.Int32 => DriverDataType.Int32,
ModbusDataType.UInt16 or ModbusDataType.UInt32 => DriverDataType.Int32,
ModbusDataType.Float32 => DriverDataType.Float32,
_ => DriverDataType.Int32,
};
private IModbusTransport RequireTransport() =>
_transport ?? throw new InvalidOperationException("ModbusDriver not initialized");
private const uint StatusBadInternalError = 0x80020000u;
private const uint StatusBadNodeIdUnknown = 0x80340000u;
private const uint StatusBadNotWritable = 0x803B0000u;
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
public async ValueTask DisposeAsync()
{
if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false);
_transport = null;
}
}

View File

@@ -0,0 +1,39 @@
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
/// <summary>
/// Modbus TCP driver configuration. Bound from the driver's <c>DriverConfig</c> JSON at
/// <c>DriverHost.RegisterAsync</c>. Every register the driver exposes appears in
/// <see cref="Tags"/>; names become the OPC UA browse name + full reference.
/// </summary>
public sealed class ModbusDriverOptions
{
public string Host { get; init; } = "127.0.0.1";
public int Port { get; init; } = 502;
public byte UnitId { get; init; } = 1;
public TimeSpan Timeout { get; init; } = TimeSpan.FromSeconds(2);
/// <summary>Pre-declared tag map. Modbus has no discovery protocol — the driver returns exactly these.</summary>
public IReadOnlyList<ModbusTagDefinition> Tags { get; init; } = [];
}
/// <summary>
/// One Modbus-backed OPC UA variable. Address is zero-based (Modbus spec numbering, not
/// the documentation's 1-based coil/register conventions).
/// </summary>
/// <param name="Name">
/// Tag name, used for both the OPC UA browse name and the driver's full reference. Must be
/// unique within the driver.
/// </param>
/// <param name="Region">Coils / DiscreteInputs / InputRegisters / HoldingRegisters.</param>
/// <param name="Address">Zero-based address within the region.</param>
/// <param name="DataType">Logical data type. Int16/UInt16 = single register; Int32/UInt32/Float32 = two registers big-endian.</param>
/// <param name="Writable">When true and Region supports writes (Coils / HoldingRegisters), IWritable routes writes here.</param>
public sealed record ModbusTagDefinition(
string Name,
ModbusRegion Region,
ushort Address,
ModbusDataType DataType,
bool Writable = true);
public enum ModbusRegion { Coils, DiscreteInputs, InputRegisters, HoldingRegisters }
public enum ModbusDataType { Bool, Int16, UInt16, Int32, UInt32, Float32 }

View File

@@ -0,0 +1,113 @@
using System.Net.Sockets;
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
/// <summary>
/// Concrete Modbus TCP transport. Wraps a single <see cref="TcpClient"/> and serializes
/// requests so at most one transaction is in-flight at a time — Modbus servers typically
/// support concurrent transactions, but the single-flight model keeps the wire trace
/// easy to diagnose and avoids interleaved-response correlation bugs.
/// </summary>
public sealed class ModbusTcpTransport : IModbusTransport
{
private readonly string _host;
private readonly int _port;
private readonly TimeSpan _timeout;
private readonly SemaphoreSlim _gate = new(1, 1);
private TcpClient? _client;
private NetworkStream? _stream;
private ushort _nextTx;
private bool _disposed;
public ModbusTcpTransport(string host, int port, TimeSpan timeout)
{
_host = host;
_port = port;
_timeout = timeout;
}
public async Task ConnectAsync(CancellationToken ct)
{
_client = new TcpClient();
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(_timeout);
await _client.ConnectAsync(_host, _port, cts.Token).ConfigureAwait(false);
_stream = _client.GetStream();
}
public async Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct)
{
if (_disposed) throw new ObjectDisposedException(nameof(ModbusTcpTransport));
if (_stream is null) throw new InvalidOperationException("Transport not connected");
await _gate.WaitAsync(ct).ConfigureAwait(false);
try
{
var txId = ++_nextTx;
// MBAP: [TxId(2)][Proto=0(2)][Length(2)][UnitId(1)] + PDU
var adu = new byte[7 + pdu.Length];
adu[0] = (byte)(txId >> 8);
adu[1] = (byte)(txId & 0xFF);
// protocol id already zero
var len = (ushort)(1 + pdu.Length); // unit id + pdu
adu[4] = (byte)(len >> 8);
adu[5] = (byte)(len & 0xFF);
adu[6] = unitId;
Buffer.BlockCopy(pdu, 0, adu, 7, pdu.Length);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(_timeout);
await _stream.WriteAsync(adu.AsMemory(), cts.Token).ConfigureAwait(false);
await _stream.FlushAsync(cts.Token).ConfigureAwait(false);
var header = new byte[7];
await ReadExactlyAsync(_stream, header, cts.Token).ConfigureAwait(false);
var respTxId = (ushort)((header[0] << 8) | header[1]);
if (respTxId != txId)
throw new InvalidDataException($"Modbus TxId mismatch: expected {txId} got {respTxId}");
var respLen = (ushort)((header[4] << 8) | header[5]);
if (respLen < 1) throw new InvalidDataException($"Modbus response length too small: {respLen}");
var respPdu = new byte[respLen - 1];
await ReadExactlyAsync(_stream, respPdu, cts.Token).ConfigureAwait(false);
// Exception PDU: function code has high bit set.
if ((respPdu[0] & 0x80) != 0)
{
var fc = (byte)(respPdu[0] & 0x7F);
var ex = respPdu[1];
throw new ModbusException(fc, ex, $"Modbus exception fc={fc} code={ex}");
}
return respPdu;
}
finally
{
_gate.Release();
}
}
private static async Task ReadExactlyAsync(Stream s, byte[] buf, CancellationToken ct)
{
var read = 0;
while (read < buf.Length)
{
var n = await s.ReadAsync(buf.AsMemory(read), ct).ConfigureAwait(false);
if (n == 0) throw new EndOfStreamException("Modbus socket closed mid-response");
read += n;
}
}
public async ValueTask DisposeAsync()
{
if (_disposed) return;
_disposed = true;
try
{
if (_stream is not null) await _stream.DisposeAsync().ConfigureAwait(false);
}
catch { /* best-effort */ }
_client?.Dispose();
_gate.Dispose();
}
}

View File

@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<LangVersion>latest</LangVersion>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<NoWarn>$(NoWarn);CS1591</NoWarn>
<RootNamespace>ZB.MOM.WW.OtOpcUa.Driver.Modbus</RootNamespace>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests"/>
</ItemGroup>
<ItemGroup>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
</ItemGroup>
</Project>

View File

@@ -0,0 +1,244 @@
using System.Buffers.Binary;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Modbus;
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests;
[Trait("Category", "Unit")]
public sealed class ModbusDriverTests
{
/// <summary>
/// In-memory Modbus TCP server impl that speaks the function codes the driver uses.
/// Maintains a register/coil bank so Read/Write round-trips work.
/// </summary>
private sealed class FakeTransport : IModbusTransport
{
public readonly ushort[] HoldingRegisters = new ushort[256];
public readonly ushort[] InputRegisters = new ushort[256];
public readonly bool[] Coils = new bool[256];
public readonly bool[] DiscreteInputs = new bool[256];
public bool ForceConnectFail { get; set; }
public Task ConnectAsync(CancellationToken ct)
=> ForceConnectFail ? Task.FromException(new InvalidOperationException("connect refused")) : Task.CompletedTask;
public Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct)
{
var fc = pdu[0];
return fc switch
{
0x01 => Task.FromResult(ReadBits(pdu, Coils)),
0x02 => Task.FromResult(ReadBits(pdu, DiscreteInputs)),
0x03 => Task.FromResult(ReadRegs(pdu, HoldingRegisters)),
0x04 => Task.FromResult(ReadRegs(pdu, InputRegisters)),
0x05 => Task.FromResult(WriteCoil(pdu)),
0x06 => Task.FromResult(WriteSingleReg(pdu)),
0x10 => Task.FromResult(WriteMultipleRegs(pdu)),
_ => Task.FromException<byte[]>(new ModbusException(fc, 0x01, $"fc={fc} not supported by fake")),
};
}
private byte[] ReadBits(byte[] pdu, bool[] bank)
{
var addr = (ushort)((pdu[1] << 8) | pdu[2]);
var qty = (ushort)((pdu[3] << 8) | pdu[4]);
var byteCount = (byte)((qty + 7) / 8);
var resp = new byte[2 + byteCount];
resp[0] = pdu[0];
resp[1] = byteCount;
for (var i = 0; i < qty; i++)
if (bank[addr + i]) resp[2 + (i / 8)] |= (byte)(1 << (i % 8));
return resp;
}
private byte[] ReadRegs(byte[] pdu, ushort[] bank)
{
var addr = (ushort)((pdu[1] << 8) | pdu[2]);
var qty = (ushort)((pdu[3] << 8) | pdu[4]);
var byteCount = (byte)(qty * 2);
var resp = new byte[2 + byteCount];
resp[0] = pdu[0];
resp[1] = byteCount;
for (var i = 0; i < qty; i++)
{
resp[2 + i * 2] = (byte)(bank[addr + i] >> 8);
resp[3 + i * 2] = (byte)(bank[addr + i] & 0xFF);
}
return resp;
}
private byte[] WriteCoil(byte[] pdu)
{
var addr = (ushort)((pdu[1] << 8) | pdu[2]);
Coils[addr] = pdu[3] == 0xFF;
return pdu; // Modbus echoes the request on write success
}
private byte[] WriteSingleReg(byte[] pdu)
{
var addr = (ushort)((pdu[1] << 8) | pdu[2]);
HoldingRegisters[addr] = (ushort)((pdu[3] << 8) | pdu[4]);
return pdu;
}
private byte[] WriteMultipleRegs(byte[] pdu)
{
var addr = (ushort)((pdu[1] << 8) | pdu[2]);
var qty = (ushort)((pdu[3] << 8) | pdu[4]);
for (var i = 0; i < qty; i++)
HoldingRegisters[addr + i] = (ushort)((pdu[6 + i * 2] << 8) | pdu[7 + i * 2]);
return new byte[] { 0x10, pdu[1], pdu[2], pdu[3], pdu[4] };
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
private static (ModbusDriver driver, FakeTransport fake) NewDriver(params ModbusTagDefinition[] tags)
{
var fake = new FakeTransport();
var opts = new ModbusDriverOptions { Host = "fake", Tags = tags };
var drv = new ModbusDriver(opts, "modbus-1", _ => fake);
return (drv, fake);
}
[Fact]
public async Task Initialize_connects_and_populates_tag_map()
{
var (drv, _) = NewDriver(
new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16),
new ModbusTagDefinition("Run", ModbusRegion.Coils, 0, ModbusDataType.Bool));
await drv.InitializeAsync("{}", CancellationToken.None);
drv.GetHealth().State.ShouldBe(DriverState.Healthy);
}
[Fact]
public async Task Read_Int16_holding_register_returns_BigEndian_value()
{
var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 10, ModbusDataType.Int16));
await drv.InitializeAsync("{}", CancellationToken.None);
fake.HoldingRegisters[10] = 12345;
var r = await drv.ReadAsync(["Level"], CancellationToken.None);
r[0].Value.ShouldBe((short)12345);
r[0].StatusCode.ShouldBe(0u);
}
[Fact]
public async Task Read_Float32_spans_two_registers_BigEndian()
{
var (drv, fake) = NewDriver(new ModbusTagDefinition("Temp", ModbusRegion.HoldingRegisters, 4, ModbusDataType.Float32));
await drv.InitializeAsync("{}", CancellationToken.None);
// IEEE 754 single for 25.5f is 0x41CC0000 — [41 CC][00 00] big-endian across two regs.
var bytes = new byte[4];
BinaryPrimitives.WriteSingleBigEndian(bytes, 25.5f);
fake.HoldingRegisters[4] = (ushort)((bytes[0] << 8) | bytes[1]);
fake.HoldingRegisters[5] = (ushort)((bytes[2] << 8) | bytes[3]);
var r = await drv.ReadAsync(["Temp"], CancellationToken.None);
r[0].Value.ShouldBe(25.5f);
}
[Fact]
public async Task Read_Coil_returns_boolean()
{
var (drv, fake) = NewDriver(new ModbusTagDefinition("Run", ModbusRegion.Coils, 3, ModbusDataType.Bool));
await drv.InitializeAsync("{}", CancellationToken.None);
fake.Coils[3] = true;
var r = await drv.ReadAsync(["Run"], CancellationToken.None);
r[0].Value.ShouldBe(true);
}
[Fact]
public async Task Unknown_tag_returns_BadNodeIdUnknown_not_an_exception()
{
var (drv, _) = NewDriver();
await drv.InitializeAsync("{}", CancellationToken.None);
var r = await drv.ReadAsync(["DoesNotExist"], CancellationToken.None);
r[0].StatusCode.ShouldBe(0x80340000u);
}
[Fact]
public async Task Write_UInt16_holding_register_roundtrips()
{
var (drv, fake) = NewDriver(new ModbusTagDefinition("Setpoint", ModbusRegion.HoldingRegisters, 20, ModbusDataType.UInt16));
await drv.InitializeAsync("{}", CancellationToken.None);
var results = await drv.WriteAsync([new WriteRequest("Setpoint", (ushort)42000)], CancellationToken.None);
results[0].StatusCode.ShouldBe(0u);
fake.HoldingRegisters[20].ShouldBe((ushort)42000);
}
[Fact]
public async Task Write_Float32_uses_FC16_WriteMultipleRegisters()
{
var (drv, fake) = NewDriver(new ModbusTagDefinition("Temp", ModbusRegion.HoldingRegisters, 4, ModbusDataType.Float32));
await drv.InitializeAsync("{}", CancellationToken.None);
await drv.WriteAsync([new WriteRequest("Temp", 25.5f)], CancellationToken.None);
// Decode back through the fake bank to check the two-register shape.
var raw = new byte[4];
raw[0] = (byte)(fake.HoldingRegisters[4] >> 8);
raw[1] = (byte)(fake.HoldingRegisters[4] & 0xFF);
raw[2] = (byte)(fake.HoldingRegisters[5] >> 8);
raw[3] = (byte)(fake.HoldingRegisters[5] & 0xFF);
BinaryPrimitives.ReadSingleBigEndian(raw).ShouldBe(25.5f);
}
[Fact]
public async Task Write_to_InputRegister_returns_BadNotWritable()
{
var (drv, _) = NewDriver(new ModbusTagDefinition("Ro", ModbusRegion.InputRegisters, 0, ModbusDataType.UInt16, Writable: false));
await drv.InitializeAsync("{}", CancellationToken.None);
var r = await drv.WriteAsync([new WriteRequest("Ro", (ushort)7)], CancellationToken.None);
r[0].StatusCode.ShouldBe(0x803B0000u);
}
[Fact]
public async Task Discover_streams_one_folder_per_driver_with_a_variable_per_tag()
{
var (drv, _) = NewDriver(
new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16),
new ModbusTagDefinition("Temp", ModbusRegion.HoldingRegisters, 4, ModbusDataType.Float32),
new ModbusTagDefinition("Run", ModbusRegion.Coils, 0, ModbusDataType.Bool));
await drv.InitializeAsync("{}", CancellationToken.None);
var builder = new RecordingBuilder();
await drv.DiscoverAsync(builder, CancellationToken.None);
builder.Folders.Count.ShouldBe(1);
builder.Folders[0].BrowseName.ShouldBe("Modbus");
builder.Variables.Count.ShouldBe(3);
builder.Variables.ShouldContain(v => v.BrowseName == "Level" && v.Info.DriverDataType == DriverDataType.Int32);
builder.Variables.ShouldContain(v => v.BrowseName == "Temp" && v.Info.DriverDataType == DriverDataType.Float32);
builder.Variables.ShouldContain(v => v.BrowseName == "Run" && v.Info.DriverDataType == DriverDataType.Boolean);
}
// --- helpers ---
private sealed class RecordingBuilder : IAddressSpaceBuilder
{
public List<(string BrowseName, string DisplayName)> Folders { get; } = new();
public List<(string BrowseName, DriverAttributeInfo Info)> Variables { get; } = new();
public IAddressSpaceBuilder Folder(string browseName, string displayName)
{ Folders.Add((browseName, displayName)); return this; }
public IVariableHandle Variable(string browseName, string displayName, DriverAttributeInfo info)
{ Variables.Add((browseName, info)); return new Handle(info.FullName); }
public void AddProperty(string _, DriverDataType __, object? ___) { }
private sealed class Handle(string fullRef) : IVariableHandle
{
public string FullReference => fullRef;
public IAlarmConditionSink MarkAsAlarmCondition(AlarmConditionInfo info) => new NullSink();
}
private sealed class NullSink : IAlarmConditionSink
{
public void OnTransition(AlarmEventArgs args) { }
}
}
}

View File

@@ -0,0 +1,180 @@
using System.Collections.Concurrent;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Modbus;
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests;
[Trait("Category", "Unit")]
public sealed class ModbusSubscriptionTests
{
/// <summary>
/// Lightweight fake transport the subscription tests drive through — only the FC03
/// (Read Holding Registers) path is used. Mutating <see cref="HoldingRegisters"/>
/// between polls is how each test simulates a PLC value change.
/// </summary>
private sealed class FakeTransport : IModbusTransport
{
public readonly ushort[] HoldingRegisters = new ushort[256];
public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask;
public Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct)
{
if (pdu[0] != 0x03) return Task.FromException<byte[]>(new NotSupportedException("FC not supported"));
var addr = (ushort)((pdu[1] << 8) | pdu[2]);
var qty = (ushort)((pdu[3] << 8) | pdu[4]);
var resp = new byte[2 + qty * 2];
resp[0] = 0x03;
resp[1] = (byte)(qty * 2);
for (var i = 0; i < qty; i++)
{
resp[2 + i * 2] = (byte)(HoldingRegisters[addr + i] >> 8);
resp[3 + i * 2] = (byte)(HoldingRegisters[addr + i] & 0xFF);
}
return Task.FromResult(resp);
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
private static (ModbusDriver drv, FakeTransport fake) NewDriver(params ModbusTagDefinition[] tags)
{
var fake = new FakeTransport();
var opts = new ModbusDriverOptions { Host = "fake", Tags = tags };
return (new ModbusDriver(opts, "modbus-1", _ => fake), fake);
}
[Fact]
public async Task Initial_poll_raises_OnDataChange_for_every_subscribed_tag()
{
var (drv, fake) = NewDriver(
new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16),
new ModbusTagDefinition("Temp", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16));
await drv.InitializeAsync("{}", CancellationToken.None);
fake.HoldingRegisters[0] = 100;
fake.HoldingRegisters[1] = 200;
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["Level", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None);
await WaitForCountAsync(events, 2, TimeSpan.FromSeconds(2));
events.Select(e => e.FullReference).ShouldContain("Level");
events.Select(e => e.FullReference).ShouldContain("Temp");
await drv.UnsubscribeAsync(handle, CancellationToken.None);
}
[Fact]
public async Task Unchanged_values_do_not_raise_after_initial_poll()
{
var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
await drv.InitializeAsync("{}", CancellationToken.None);
fake.HoldingRegisters[0] = 100;
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
await Task.Delay(500); // ~5 poll cycles at 100ms, value stable the whole time
await drv.UnsubscribeAsync(handle, CancellationToken.None);
events.Count.ShouldBe(1); // only the initial-data push, no change events after
}
[Fact]
public async Task Value_change_between_polls_raises_OnDataChange()
{
var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
await drv.InitializeAsync("{}", CancellationToken.None);
fake.HoldingRegisters[0] = 100;
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
await WaitForCountAsync(events, 1, TimeSpan.FromSeconds(1));
fake.HoldingRegisters[0] = 200; // simulate PLC update
await WaitForCountAsync(events, 2, TimeSpan.FromSeconds(2));
await drv.UnsubscribeAsync(handle, CancellationToken.None);
events.Count.ShouldBeGreaterThanOrEqualTo(2);
events.Last().Snapshot.Value.ShouldBe((short)200);
}
[Fact]
public async Task Unsubscribe_stops_the_polling_loop()
{
var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
await drv.InitializeAsync("{}", CancellationToken.None);
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
await WaitForCountAsync(events, 1, TimeSpan.FromSeconds(1));
await drv.UnsubscribeAsync(handle, CancellationToken.None);
var countAfterUnsub = events.Count;
fake.HoldingRegisters[0] = 999; // would trigger a change if still polling
await Task.Delay(400);
events.Count.ShouldBe(countAfterUnsub);
}
[Fact]
public async Task SubscribeAsync_floors_intervals_below_100ms()
{
var (drv, _) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
await drv.InitializeAsync("{}", CancellationToken.None);
// 10ms requested — implementation floors to 100ms. We verify indirectly: over 300ms, a
// 10ms interval would produce many more events than a 100ms interval would on a stable
// value. Since the value is unchanged, we only expect the initial-data push (1 event).
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(10), CancellationToken.None);
await Task.Delay(300);
await drv.UnsubscribeAsync(handle, CancellationToken.None);
events.Count.ShouldBe(1);
}
[Fact]
public async Task Multiple_subscriptions_fire_independently()
{
var (drv, fake) = NewDriver(
new ModbusTagDefinition("A", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16),
new ModbusTagDefinition("B", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16));
await drv.InitializeAsync("{}", CancellationToken.None);
var eventsA = new ConcurrentQueue<DataChangeEventArgs>();
var eventsB = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) =>
{
if (e.FullReference == "A") eventsA.Enqueue(e);
else if (e.FullReference == "B") eventsB.Enqueue(e);
};
var ha = await drv.SubscribeAsync(["A"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
var hb = await drv.SubscribeAsync(["B"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
await WaitForCountAsync(eventsA, 1, TimeSpan.FromSeconds(1));
await WaitForCountAsync(eventsB, 1, TimeSpan.FromSeconds(1));
await drv.UnsubscribeAsync(ha, CancellationToken.None);
var aCount = eventsA.Count;
fake.HoldingRegisters[1] = 77; // only B should pick this up
await WaitForCountAsync(eventsB, 2, TimeSpan.FromSeconds(2));
eventsA.Count.ShouldBe(aCount); // unchanged since unsubscribe
eventsB.Count.ShouldBeGreaterThanOrEqualTo(2);
await drv.UnsubscribeAsync(hb, CancellationToken.None);
}
private static async Task WaitForCountAsync<T>(ConcurrentQueue<T> q, int target, TimeSpan timeout)
{
var deadline = DateTime.UtcNow + timeout;
while (q.Count < target && DateTime.UtcNow < deadline)
await Task.Delay(25);
}
}

View File

@@ -0,0 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
<RootNamespace>ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="xunit.v3" Version="1.1.0"/>
<PackageReference Include="Shouldly" Version="4.3.0"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0"/>
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\ZB.MOM.WW.OtOpcUa.Driver.Modbus\ZB.MOM.WW.OtOpcUa.Driver.Modbus.csproj"/>
</ItemGroup>
<ItemGroup>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
</ItemGroup>
</Project>