Modbus exception-injection profile — wire-level coverage for codes 0x01/0x03/0x04/0x05/0x06/0x0A/0x0B #174

Merged
dohertj2 merged 1 commits from modbus-exception-injection-profile into v2 2026-04-20 15:14:02 -04:00
7 changed files with 490 additions and 5 deletions

View File

@@ -34,7 +34,8 @@ shaped (neither is a Modbus-side concept).
- `DL205SmokeTests` — FC16 write → FC03 read round-trip on holding register
- `DL205CoilMappingTests` — Y-output / C-relay / X-input address mapping
(octal → Modbus offset)
- `DL205ExceptionCodeTests` — Modbus exception → OPC UA StatusCode mapping
- `DL205ExceptionCodeTests` — Modbus exception 0x02 → OPC UA `BadOutOfRange` against the dl205 profile (natural out-of-range path)
- `ExceptionInjectionTests` — every other exception code in the mapping table (0x01 / 0x03 / 0x04 / 0x05 / 0x06 / 0x0A / 0x0B) against the `exception_injection` profile on both read + write paths
- `DL205FloatCdabQuirkTests` — CDAB word-swap float encoding
- `DL205StringQuirkTests` — packed-string V-memory layout
- `DL205VMemoryQuirkTests` — V-memory octal addressing
@@ -103,8 +104,13 @@ Not a Modbus concept. Driver doesn't implement `IAlarmSource` or
1. Add `MODBUS_SIM_ENDPOINT` override documentation to
`docs/v2/test-data-sources.md` so operators can point the suite at a lab rig.
2. Extend `pymodbus` profiles to inject exception responses — a JSON flag per
register saying "next read returns exception 0x04."
2. ~~Extend `pymodbus` profiles to inject exception responses~~**shipped**
via the `exception_injection` compose profile + standalone
`exception_injector.py` server. Rules in
`Docker/profiles/exception_injection.json` map `(fc, address)` to an
exception code; `ExceptionInjectionTests` exercises every code in
`MapModbusExceptionToStatus` (0x01 / 0x02 / 0x03 / 0x04 / 0x05 / 0x06 /
0x0A / 0x0B) end-to-end on both read (FC03) and write (FC06) paths.
3. Add an FX5U profile once a lab rig is available; the scaffolding is in place.
## Key fixture / config files

View File

@@ -15,6 +15,12 @@ RUN pip install --no-cache-dir "pymodbus[simulator]==3.13.0"
WORKDIR /fixtures
COPY profiles/ /fixtures/
# Standalone exception-injection server (pure Python stdlib — no pymodbus
# dependency). Speaks raw Modbus/TCP and emits arbitrary exception codes
# per rules in exception_injection.json. Drives the `exception_injection`
# compose profile. See Docker/README.md §exception injection.
COPY exception_injector.py /fixtures/
EXPOSE 5020
# Default to the standard profile; docker-compose.yml overrides per service.

View File

@@ -9,9 +9,10 @@ nothing else.
| File | Purpose |
|---|---|
| [`Dockerfile`](Dockerfile) | `python:3.12-slim-bookworm` + `pymodbus[simulator]==3.13.0` + the four profile JSONs |
| [`docker-compose.yml`](docker-compose.yml) | One service per profile (`standard` / `dl205` / `mitsubishi` / `s7_1500`); all bind `:5020` so only one runs at a time |
| [`Dockerfile`](Dockerfile) | `python:3.12-slim-bookworm` + `pymodbus[simulator]==3.13.0` + every profile JSON + `exception_injector.py` |
| [`docker-compose.yml`](docker-compose.yml) | One service per profile (`standard` / `dl205` / `mitsubishi` / `s7_1500` / `exception_injection`); all bind `:5020` so only one runs at a time |
| [`profiles/*.json`](profiles/) | Same seed-register definitions the native launcher uses — canonical source |
| [`exception_injector.py`](exception_injector.py) | Pure-stdlib Modbus/TCP server that emits arbitrary exception codes per rule — used by the `exception_injection` profile |
## Run
@@ -29,6 +30,10 @@ docker compose -f tests\ZB.MOM.WW.OtOpcUa.Driver.Modbus.IntegrationTests\Docker\
# Siemens S7-1500 MB_SERVER quirks
docker compose -f tests\ZB.MOM.WW.OtOpcUa.Driver.Modbus.IntegrationTests\Docker\docker-compose.yml --profile s7_1500 up
# Exception-injection — end-to-end coverage of every Modbus exception code
# (01/02/03/04/05/06/0A/0B), not just the 02 + 03 pymodbus emits naturally
docker compose -f tests\ZB.MOM.WW.OtOpcUa.Driver.Modbus.IntegrationTests\Docker\docker-compose.yml --profile exception_injection up
```
Detached + stop:
@@ -61,6 +66,36 @@ dotnet test tests\ZB.MOM.WW.OtOpcUa.Driver.Modbus.IntegrationTests
records a `SkipReason` when unreachable, so tests stay green on a fresh
clone without Docker running.
## Exception injection
pymodbus's simulator naturally emits only Modbus exception codes `0x02`
(Illegal Data Address, on reads outside its configured ranges) and
`0x03` (Illegal Data Value, on over-length requests). The driver's
`MapModbusExceptionToStatus` table translates eight codes: `0x01`,
`0x02`, `0x03`, `0x04`, `0x05`, `0x06`, `0x0A`, `0x0B`. Unit tests
lock the translation function; the integration side previously only
proved the wire-to-status path for `0x02`.
The `exception_injection` profile runs
[`exception_injector.py`](exception_injector.py) — a tiny standalone
Modbus/TCP server written against the Python stdlib (zero
dependencies outside what's in the base image). It speaks the wire
protocol directly (FC 01/02/03/04/05/06/15/16) and looks up each
incoming `(fc, address)` against the rules in
[`profiles/exception_injection.json`](profiles/exception_injection.json);
a matching rule makes the server reply with
`[fc | 0x80, exception_code]` instead of the normal response.
Current rules (see the JSON file for the canonical list):
- `FC03 @1000..1007` — one per exception code (`0x01`/`0x02`/`0x03`/`0x04`/`0x05`/`0x06`/`0x0A`/`0x0B`)
- `FC06 @2000..2001``0x04` Server Failure, `0x06` Server Busy (write-path coverage)
- `FC16 @3000``0x04` Server Failure (multi-register write path)
Adding more coverage is append-only: drop a new `{fc, address,
exception, description}` entry into the JSON, restart the service,
add an `[InlineData]` row in `ExceptionInjectionTests`.
## References
- [`docs/drivers/Modbus-Test-Fixture.md`](../../../docs/drivers/Modbus-Test-Fixture.md) — coverage map + gap inventory

View File

@@ -77,3 +77,24 @@ services:
"--modbus_device", "dev",
"--json_file", "/fixtures/s7_1500.json"
]
# Exception-injection profile. Runs the standalone pure-stdlib Modbus/TCP
# server shipped as exception_injector.py instead of the pymodbus
# simulator — pymodbus naturally emits only exception codes 02 + 03, and
# this profile extends integration coverage to the other codes the
# driver's MapModbusExceptionToStatus table handles (01, 04, 05, 06,
# 0A, 0B). Rules are driven by exception_injection.json.
exception_injection:
profiles: ["exception_injection"]
image: otopcua-pymodbus:3.13.0
build:
context: .
dockerfile: Dockerfile
container_name: otopcua-modbus-exception-injector
restart: "no"
ports:
- "5020:5020"
command: [
"python", "/fixtures/exception_injector.py",
"--config", "/fixtures/exception_injection.json"
]

View File

@@ -0,0 +1,261 @@
#!/usr/bin/env python3
"""
Minimal Modbus/TCP server that supports per-address + per-function-code
exception injection — the missing piece of the pymodbus simulator, which
only naturally emits exception code 02 (Illegal Data Address) via its
"invalid" list and 03 (Illegal Data Value) via spec-enforced length caps.
Integration tests against this fixture drive the driver's
`MapModbusExceptionToStatus` end-to-end over the wire for codes 01, 04,
05, 06, 0A, 0B — the ones the pymodbus simulator can't be configured to
return.
Wire protocol — straight Modbus/TCP (spec chapter 7.1):
MBAP header (7 bytes): [tx_id:u16 BE][proto=0:u16][length:u16][unit_id:u8]
then length-1 bytes of PDU. Length covers unit_id + PDU.
Supported function codes (enough for the driver's RMW + read paths):
01 Read Coils, 02 Read Discrete Inputs,
03 Read Holding Registers, 04 Read Input Registers,
05 Write Single Coil, 06 Write Single Register,
15 Write Multiple Coils, 16 Write Multiple Registers.
Config JSON schema (see exception_injection.json):
{
"listen": { "host": "0.0.0.0", "port": 5020 },
"seeds": { "hr": { "<addr>": <uint16>, ... },
"ir": { "<addr>": <uint16>, ... },
"co": { "<addr>": <0|1>, ... },
"di": { "<addr>": <0|1>, ... } },
"rules": [ { "fc": <int>, "address": <int>, "exception": <int>,
"description": "..." }, ... ]
}
Rules match on (fc, starting address). A matching rule wins and the server
responds with the PDU `[fc | 0x80, exception_code]`.
Zero runtime dependencies outside the Python stdlib so the Docker image
stays tiny.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import struct
import sys
from dataclasses import dataclass
log = logging.getLogger("exception_injector")
@dataclass(frozen=True)
class Rule:
fc: int
address: int
exception: int
description: str = ""
class Store:
"""In-memory data store backing non-injected reads + writes."""
def __init__(self, seeds: dict[str, dict[str, int]]) -> None:
self.hr: dict[int, int] = {int(k): int(v) for k, v in seeds.get("hr", {}).items()}
self.ir: dict[int, int] = {int(k): int(v) for k, v in seeds.get("ir", {}).items()}
self.co: dict[int, int] = {int(k): int(v) for k, v in seeds.get("co", {}).items()}
self.di: dict[int, int] = {int(k): int(v) for k, v in seeds.get("di", {}).items()}
def read_bits(self, table: dict[int, int], addr: int, count: int) -> bytes:
"""Pack `count` bits LSB-first into the Modbus bit response body."""
bits = [table.get(addr + i, 0) & 1 for i in range(count)]
out = bytearray((count + 7) // 8)
for i, b in enumerate(bits):
if b:
out[i // 8] |= 1 << (i % 8)
return bytes(out)
def read_regs(self, table: dict[int, int], addr: int, count: int) -> bytes:
"""Pack `count` uint16 BE into the Modbus register response body."""
return b"".join(struct.pack(">H", table.get(addr + i, 0) & 0xFFFF) for i in range(count))
class Server:
EXC_ILLEGAL_FUNCTION = 0x01
EXC_ILLEGAL_DATA_ADDRESS = 0x02
EXC_ILLEGAL_DATA_VALUE = 0x03
def __init__(self, store: Store, rules: list[Rule]) -> None:
self._store = store
# Index rules by (fc, address) for O(1) lookup.
self._rules: dict[tuple[int, int], Rule] = {(r.fc, r.address): r for r in rules}
def lookup_rule(self, fc: int, address: int) -> Rule | None:
return self._rules.get((fc, address))
def exception_pdu(self, fc: int, code: int) -> bytes:
return bytes([fc | 0x80, code & 0xFF])
def handle_pdu(self, pdu: bytes) -> bytes:
if not pdu:
return self.exception_pdu(0, self.EXC_ILLEGAL_FUNCTION)
fc = pdu[0]
# Reads: FC 01/02/03/04 — [fc u8][addr u16][quantity u16]
if fc in (0x01, 0x02, 0x03, 0x04):
if len(pdu) != 5:
return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
addr, count = struct.unpack(">HH", pdu[1:5])
rule = self.lookup_rule(fc, addr)
if rule is not None:
log.info("inject fc=%d addr=%d -> exception 0x%02X (%s)",
fc, addr, rule.exception, rule.description)
return self.exception_pdu(fc, rule.exception)
# Spec caps — FC01/02 allow 1..2000 bits; FC03/04 allow 1..125 regs.
if fc in (0x01, 0x02):
if not 1 <= count <= 2000:
return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
body = self._store.read_bits(
self._store.co if fc == 0x01 else self._store.di, addr, count)
return bytes([fc, len(body)]) + body
if not 1 <= count <= 125:
return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
body = self._store.read_regs(
self._store.hr if fc == 0x03 else self._store.ir, addr, count)
return bytes([fc, len(body)]) + body
# FC05 — [fc u8][addr u16][value u16] where value is 0xFF00=ON or 0x0000=OFF.
if fc == 0x05:
if len(pdu) != 5:
return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
addr, value = struct.unpack(">HH", pdu[1:5])
rule = self.lookup_rule(fc, addr)
if rule is not None:
return self.exception_pdu(fc, rule.exception)
if value == 0xFF00:
self._store.co[addr] = 1
elif value == 0x0000:
self._store.co[addr] = 0
else:
return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
return pdu # FC05 echoes the request on success.
# FC06 — [fc u8][addr u16][value u16].
if fc == 0x06:
if len(pdu) != 5:
return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
addr, value = struct.unpack(">HH", pdu[1:5])
rule = self.lookup_rule(fc, addr)
if rule is not None:
return self.exception_pdu(fc, rule.exception)
self._store.hr[addr] = value
return pdu # FC06 echoes on success.
# FC15 — [fc u8][addr u16][count u16][byte_count u8][values...]
if fc == 0x0F:
if len(pdu) < 6:
return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
addr, count = struct.unpack(">HH", pdu[1:5])
rule = self.lookup_rule(fc, addr)
if rule is not None:
return self.exception_pdu(fc, rule.exception)
# Happy-path ignore-the-data, ack with standard response.
return struct.pack(">BHH", fc, addr, count)
# FC16 — [fc u8][addr u16][count u16][byte_count u8][u16 values...]
if fc == 0x10:
if len(pdu) < 6:
return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
addr, count = struct.unpack(">HH", pdu[1:5])
rule = self.lookup_rule(fc, addr)
if rule is not None:
return self.exception_pdu(fc, rule.exception)
byte_count = pdu[5]
data = pdu[6:6 + byte_count]
for i in range(count):
self._store.hr[addr + i] = struct.unpack(">H", data[i * 2:i * 2 + 2])[0]
return struct.pack(">BHH", fc, addr, count)
return self.exception_pdu(fc, self.EXC_ILLEGAL_FUNCTION)
async def handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
peer = writer.get_extra_info("peername")
log.info("client connected from %s", peer)
try:
while True:
hdr = await reader.readexactly(7)
tx_id, proto, length, unit_id = struct.unpack(">HHHB", hdr)
if length < 1:
return
pdu = await reader.readexactly(length - 1)
resp = self.handle_pdu(pdu)
out = struct.pack(">HHHB", tx_id, proto, len(resp) + 1, unit_id) + resp
writer.write(out)
await writer.drain()
except asyncio.IncompleteReadError:
log.info("client %s disconnected", peer)
except Exception: # pylint: disable=broad-except
log.exception("unexpected error serving %s", peer)
finally:
try:
writer.close()
await writer.wait_closed()
except Exception: # pylint: disable=broad-except
pass
def load_config(path: str) -> tuple[Store, list[Rule], str, int]:
with open(path, "r", encoding="utf-8") as fh:
raw = json.load(fh)
listen = raw.get("listen", {})
host = listen.get("host", "0.0.0.0")
port = int(listen.get("port", 5020))
store = Store(raw.get("seeds", {}))
rules = [
Rule(
fc=int(r["fc"]),
address=int(r["address"]),
exception=int(r["exception"]),
description=str(r.get("description", "")),
)
for r in raw.get("rules", [])
]
return store, rules, host, port
async def main(argv: list[str]) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--config", required=True, help="Path to exception-injection JSON config.")
args = parser.parse_args(argv)
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s - %(message)s")
store, rules, host, port = load_config(args.config)
server = Server(store, rules)
listener = await asyncio.start_server(server.handle_connection, host, port)
log.info("exception-injector listening on %s:%d with %d rule(s)", host, port, len(rules))
for r in rules:
log.info(" rule: fc=%d addr=%d -> exception 0x%02X (%s)",
r.fc, r.address, r.exception, r.description)
async with listener:
await listener.serve_forever()
return 0
if __name__ == "__main__":
try:
sys.exit(asyncio.run(main(sys.argv[1:])))
except KeyboardInterrupt:
sys.exit(0)

View File

@@ -0,0 +1,34 @@
{
"_comment": "Modbus exception-injection profile — feeds exception_injector.py (not pymodbus). Rules match by (fc, address). HR[0-31] are address-as-value for the happy-path reads; HR[1000..1010] + coils[2000..2010] carry per-exception-code rules. Every code in the driver's MapModbusExceptionToStatus table that pymodbus can't naturally emit has a dedicated slot. See Docker/README.md §exception injection.",
"listen": { "host": "0.0.0.0", "port": 5020 },
"seeds": {
"hr": {
"0": 0, "1": 1, "2": 2, "3": 3,
"4": 4, "5": 5, "6": 6, "7": 7,
"8": 8, "9": 9, "10": 10, "11": 11,
"12": 12, "13": 13, "14": 14, "15": 15,
"16": 16, "17": 17, "18": 18, "19": 19,
"20": 20, "21": 21, "22": 22, "23": 23,
"24": 24, "25": 25, "26": 26, "27": 27,
"28": 28, "29": 29, "30": 30, "31": 31
}
},
"rules": [
{ "fc": 3, "address": 1000, "exception": 1, "description": "FC03 @1000 -> Illegal Function (0x01)" },
{ "fc": 3, "address": 1001, "exception": 2, "description": "FC03 @1001 -> Illegal Data Address (0x02)" },
{ "fc": 3, "address": 1002, "exception": 3, "description": "FC03 @1002 -> Illegal Data Value (0x03)" },
{ "fc": 3, "address": 1003, "exception": 4, "description": "FC03 @1003 -> Server Failure (0x04)" },
{ "fc": 3, "address": 1004, "exception": 5, "description": "FC03 @1004 -> Acknowledge (0x05)" },
{ "fc": 3, "address": 1005, "exception": 6, "description": "FC03 @1005 -> Server Busy (0x06)" },
{ "fc": 3, "address": 1006, "exception": 10, "description": "FC03 @1006 -> Gateway Path Unavailable (0x0A)" },
{ "fc": 3, "address": 1007, "exception": 11, "description": "FC03 @1007 -> Gateway Target No Response (0x0B)" },
{ "fc": 6, "address": 2000, "exception": 4, "description": "FC06 @2000 -> Server Failure (0x04, e.g. CPU in PROGRAM mode)" },
{ "fc": 6, "address": 2001, "exception": 6, "description": "FC06 @2001 -> Server Busy (0x06)" },
{ "fc": 16, "address": 3000, "exception": 4, "description": "FC16 @3000 -> Server Failure (0x04)" }
]
}

View File

@@ -0,0 +1,122 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.IntegrationTests;
/// <summary>
/// End-to-end verification that the driver's <c>MapModbusExceptionToStatus</c>
/// translation is wire-correct for every exception code in the mapping table —
/// not just 0x02, which is the only code the pymodbus simulator naturally emits.
/// Drives the standalone <c>exception_injector.py</c> server (<c>exception_injection</c>
/// compose profile) at each of the rule addresses in
/// <c>Docker/profiles/exception_injection.json</c> and asserts the driver surfaces
/// the expected OPC UA StatusCode.
/// </summary>
/// <remarks>
/// Why integration coverage on top of the unit tests: the unit tests prove the
/// translation function is correct; these prove the driver wires it through on
/// the read + write paths unchanged, after the MBAP header + PDU round-trip
/// (where a subtle framing bug could swallow or misclassify the exception).
/// </remarks>
[Collection(ModbusSimulatorCollection.Name)]
[Trait("Category", "Integration")]
[Trait("Device", "ExceptionInjection")]
public sealed class ExceptionInjectionTests(ModbusSimulatorFixture sim)
{
private const uint StatusGood = 0u;
private const uint StatusBadOutOfRange = 0x803C0000u;
private const uint StatusBadNotSupported = 0x803D0000u;
private const uint StatusBadDeviceFailure = 0x80550000u;
private const uint StatusBadCommunicationError = 0x80050000u;
private void SkipUnlessInjectorLive()
{
if (sim.SkipReason is not null) Assert.Skip(sim.SkipReason);
var profile = Environment.GetEnvironmentVariable("MODBUS_SIM_PROFILE");
if (!string.Equals(profile, "exception_injection", StringComparison.OrdinalIgnoreCase))
Assert.Skip("MODBUS_SIM_PROFILE != exception_injection — skipping. " +
"Start the fixture with --profile exception_injection.");
}
private async Task<IReadOnlyList<DataValueSnapshot>> ReadSingleAsync(int address, string tagName)
{
var opts = new ModbusDriverOptions
{
Host = sim.Host,
Port = sim.Port,
UnitId = 1,
Timeout = TimeSpan.FromSeconds(2),
Tags =
[
new ModbusTagDefinition(tagName,
ModbusRegion.HoldingRegisters, Address: (ushort)address,
DataType: ModbusDataType.UInt16, Writable: false),
],
Probe = new ModbusProbeOptions { Enabled = false },
};
await using var driver = new ModbusDriver(opts, driverInstanceId: "modbus-exc");
await driver.InitializeAsync("{}", TestContext.Current.CancellationToken);
return await driver.ReadAsync([tagName], TestContext.Current.CancellationToken);
}
[Theory]
[InlineData(1000, StatusBadNotSupported, "exc 0x01 (Illegal Function) -> BadNotSupported")]
[InlineData(1001, StatusBadOutOfRange, "exc 0x02 (Illegal Data Address) -> BadOutOfRange")]
[InlineData(1002, StatusBadOutOfRange, "exc 0x03 (Illegal Data Value) -> BadOutOfRange")]
[InlineData(1003, StatusBadDeviceFailure, "exc 0x04 (Server Failure) -> BadDeviceFailure")]
[InlineData(1004, StatusBadDeviceFailure, "exc 0x05 (Acknowledge / long op) -> BadDeviceFailure")]
[InlineData(1005, StatusBadDeviceFailure, "exc 0x06 (Server Busy) -> BadDeviceFailure")]
[InlineData(1006, StatusBadCommunicationError, "exc 0x0A (Gateway Path Unavailable) -> BadCommunicationError")]
[InlineData(1007, StatusBadCommunicationError, "exc 0x0B (Gateway Target No Response) -> BadCommunicationError")]
public async Task FC03_read_at_injection_address_surfaces_expected_status(
int address, uint expectedStatus, string scenario)
{
SkipUnlessInjectorLive();
var results = await ReadSingleAsync(address, $"Injected_{address}");
results[0].StatusCode.ShouldBe(expectedStatus, scenario);
}
[Fact]
public async Task FC03_read_at_non_injected_address_returns_Good()
{
// Sanity: HR[0..31] are seeded with address-as-value in the profile. A read at
// one of those addresses must come back Good (0) — otherwise the injector is
// misbehaving and every other assertion in this class is uninformative.
SkipUnlessInjectorLive();
var results = await ReadSingleAsync(address: 5, tagName: "Healthy_5");
results[0].StatusCode.ShouldBe(StatusGood);
results[0].Value.ShouldBe((ushort)5);
}
[Theory]
[InlineData(2000, StatusBadDeviceFailure, "exc 0x04 on FC06 -> BadDeviceFailure (CPU in PROGRAM mode)")]
[InlineData(2001, StatusBadDeviceFailure, "exc 0x06 on FC06 -> BadDeviceFailure (Server Busy)")]
public async Task FC06_write_at_injection_address_surfaces_expected_status(
int address, uint expectedStatus, string scenario)
{
SkipUnlessInjectorLive();
var tag = $"InjectedWrite_{address}";
var opts = new ModbusDriverOptions
{
Host = sim.Host,
Port = sim.Port,
UnitId = 1,
Timeout = TimeSpan.FromSeconds(2),
Tags =
[
new ModbusTagDefinition(tag,
ModbusRegion.HoldingRegisters, Address: (ushort)address,
DataType: ModbusDataType.UInt16, Writable: true),
],
Probe = new ModbusProbeOptions { Enabled = false },
};
await using var driver = new ModbusDriver(opts, driverInstanceId: "modbus-exc-write");
await driver.InitializeAsync("{}", TestContext.Current.CancellationToken);
var writes = await driver.WriteAsync(
[new WriteRequest(tag, (ushort)42)],
TestContext.Current.CancellationToken);
writes[0].StatusCode.ShouldBe(expectedStatus, scenario);
}
}