lmxopcua/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.IntegrationTests/Docker/exception_injector.py
Joseph Doherty 96940aeb24 Modbus exception-injection profile — closes the end-to-end test gap for exception codes 0x01/0x03/0x04/0x05/0x06/0x0A/0x0B. pymodbus simulator naturally emits only 0x02 (Illegal Data Address on reads outside configured ranges) + 0x03 (Illegal Data Value on over-length); the driver's MapModbusExceptionToStatus table translates eight codes, but only 0x02 had integration-level coverage (via DL205's unmapped-register test). Unit tests lock the translation function in isolation but an integration test was missing for everything else. This PR lands wire-level coverage for the remaining seven codes without depending on device-specific quirks to naturally produce them.
New exception_injector.py: a standalone, pure-Python-stdlib Modbus/TCP server shipped alongside the pymodbus image. It speaks the wire protocol directly (MBAP header parse, FC 01/02/03/04/05/06/15/16 dispatch, store-backed happy-path reads/writes, spec-enforced length caps) and looks up each (fc, starting-address) pair against a rules list loaded from JSON; a matching rule makes the server respond [fc|0x80, exception_code] instead of the normal response. Zero runtime dependencies outside the stdlib: the Dockerfile just copies the script into /fixtures/ alongside the pymodbus profile JSONs, so no new pip install is needed. ~200 lines. New exception_injection.json profile carries rules for every exception code on FC03 (addresses 1000-1007, one per code), FC06 (2000-2001 for CPU-PROGRAM-mode and busy), and FC16 (3000 for server failure). New exception_injection compose profile binds :5020 like every other service and runs python /fixtures/exception_injector.py --config /fixtures/exception_injection.json.
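As a rough illustration of the framing described above, the sketch below builds an FC03 request frame and the reply a rule match would produce. It is not the fixture's code: `RULES`, `build_read_request`, and `respond` are hypothetical names, and the pairing of address 1000 with code 0x01 is an assumption (the commit only says addresses 1000-1007 carry one code each).

```python
import struct

# Hypothetical rule table: (fc, starting address) -> exception code.
# The 1000 -> 0x01 pairing is assumed for illustration only.
RULES = {(0x03, 1000): 0x01}


def build_read_request(tx_id: int, unit_id: int, fc: int, addr: int, count: int) -> bytes:
    """Frame a read request: 7-byte MBAP header followed by the 5-byte PDU."""
    pdu = struct.pack(">BHH", fc, addr, count)
    # The MBAP length field counts the unit id plus the PDU.
    mbap = struct.pack(">HHHB", tx_id, 0, len(pdu) + 1, unit_id)
    return mbap + pdu


def respond(frame: bytes) -> bytes:
    """Answer one read frame, injecting an exception when a rule matches."""
    tx_id, proto, length, unit_id = struct.unpack(">HHHB", frame[:7])
    pdu = frame[7:7 + length - 1]
    fc, addr, count = struct.unpack(">BHH", pdu[:5])
    code = RULES.get((fc, addr))
    if code is not None:
        # Exception response: function code with the high bit set, then the code.
        resp = bytes([fc | 0x80, code])
    else:
        # Simplification: treat every non-injected request as a register read of zeros.
        body = b"\x00\x00" * count
        resp = bytes([fc, len(body)]) + body
    return struct.pack(">HHHB", tx_id, proto, len(resp) + 1, unit_id) + resp


request = build_read_request(tx_id=1, unit_id=1, fc=0x03, addr=1000, count=1)
reply = respond(request)
print(request.hex(" "))  # 00 01 00 00 00 06 01 03 03 e8 00 01
print(reply.hex(" "))    # 00 01 00 00 00 03 01 83 01
```

The reply echoes the transaction id and unit id, so a client can correlate the exception with the request that triggered it.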

New ExceptionInjectionTests.cs in Modbus.IntegrationTests — 11 tests. Eight FC03-read theories exercise every exception code 0x01/0x02/0x03/0x04/0x05/0x06/0x0A/0x0B asserting the driver's expected OPC UA StatusCode mapping (BadNotSupported/BadOutOfRange/BadOutOfRange/BadDeviceFailure/BadDeviceFailure/BadDeviceFailure/BadCommunicationError/BadCommunicationError). Two FC06-write theories cover the write path for 0x04 (Server Failure, CPU in PROGRAM mode) + 0x06 (Server Busy). One sanity-check read at address 5 confirms the injector isn't globally broken + non-injected reads round-trip cleanly with Value=5/StatusCode=Good. All tests follow the MODBUS_SIM_PROFILE=exception_injection skip guard so they no-op on a fresh clone without Docker running.
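The expected mapping listed above can be restated as a lookup table. This is a Python restatement of the commit text only, not the driver's `MapModbusExceptionToStatus` source; `EXPECTED_STATUS` and `expected_status` are hypothetical names, and the status values are plain strings rather than real OPC UA StatusCode constants.

```python
# Exception code -> expected OPC UA StatusCode name, as listed in the commit message.
EXPECTED_STATUS = {
    0x01: "BadNotSupported",
    0x02: "BadOutOfRange",
    0x03: "BadOutOfRange",
    0x04: "BadDeviceFailure",
    0x05: "BadDeviceFailure",
    0x06: "BadDeviceFailure",
    0x0A: "BadCommunicationError",
    0x0B: "BadCommunicationError",
}


def expected_status(exception_pdu: bytes) -> str:
    """Return the expected status name for a 2-byte Modbus exception PDU."""
    if len(exception_pdu) != 2 or not exception_pdu[0] & 0x80:
        raise ValueError("not a Modbus exception PDU")
    return EXPECTED_STATUS[exception_pdu[1]]


# FC03 (0x03 | 0x80 = 0x83) answered with exception 0x0B.
print(expected_status(bytes([0x83, 0x0B])))  # BadCommunicationError
```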

Docker/README.md gains an §Exception injection section explaining what pymodbus can and cannot emit, what the injector does, where the rules live, and how to append new ones. docs/drivers/Modbus-Test-Fixture.md follow-up item #2 (extend pymodbus profiles to inject exceptions) gets a shipped strikethrough with the new coverage inventory; the unit-level section adds ExceptionInjectionTests next to DL205ExceptionCodeTests so the split-of-responsibilities is explicit (DL205 test = natural out-of-range via dl205 profile, ExceptionInjectionTests = every other code via the injector).

Test baselines: Modbus unit 182/182 green (unchanged); Modbus integration with exception_injection profile live 11/11 new tests green. Existing DL205/S7/Mitsubishi integration tests unaffected since they skip on MODBUS_SIM_PROFILE mismatch.

Found + fixed during validation: a stale native pymodbus simulator from April 18 was still listening on port 5020 on IPv6 localhost. Windows was load-balancing between it and the Docker IPv4 forward, so injected exceptions intermittently came back as pymodbus's default 0x02. Killed the leftover. Documented the debugging path in the commit as a note for anyone who hits the same "my tests see exception 0x02 but the injector log has no request" symptom.
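One way to spot a duplicate listener like this is to probe both loopback families separately. The sketch below is a generic check, not part of the fixture or the documented debugging path; `is_listening` and `find_listeners` are hypothetical helper names.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 0.5) -> bool:
    """Return True if a TCP connect to (host, port) succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and an unavailable address family.
        return False


def find_listeners(port: int, hosts: tuple[str, ...] = ("127.0.0.1", "::1")) -> list[str]:
    """List the loopback addresses that accept a connection on the given port."""
    return [h for h in hosts if is_listening(h, port)]


if __name__ == "__main__":
    # Both addresses answering suggests two processes may be sharing the port.
    print(find_listeners(5020))
```

If both `127.0.0.1` and `::1` answer, it is worth checking which process owns each socket before trusting the test results.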

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 15:11:32 -04:00


#!/usr/bin/env python3
"""
Minimal Modbus/TCP server that supports per-address + per-function-code
exception injection: the missing piece of the pymodbus simulator, which
only naturally emits exception code 02 (Illegal Data Address) via its
"invalid" list and 03 (Illegal Data Value) via spec-enforced length caps.

Integration tests against this fixture drive the driver's
`MapModbusExceptionToStatus` end-to-end over the wire for codes 01, 04,
05, 06, 0A, 0B, the ones the pymodbus simulator can't be configured to
return.

Wire protocol, straight Modbus/TCP (spec chapter 7.1):
    MBAP header (7 bytes): [tx_id:u16 BE][proto=0:u16][length:u16][unit_id:u8]
    then length-1 bytes of PDU. Length covers unit_id + PDU.

Supported function codes (enough for the driver's RMW + read paths):
    01 Read Coils, 02 Read Discrete Inputs,
    03 Read Holding Registers, 04 Read Input Registers,
    05 Write Single Coil, 06 Write Single Register,
    15 Write Multiple Coils, 16 Write Multiple Registers.

Config JSON schema (see exception_injection.json):
    {
      "listen": { "host": "0.0.0.0", "port": 5020 },
      "seeds": { "hr": { "<addr>": <uint16>, ... },
                 "ir": { "<addr>": <uint16>, ... },
                 "co": { "<addr>": <0|1>, ... },
                 "di": { "<addr>": <0|1>, ... } },
      "rules": [ { "fc": <int>, "address": <int>, "exception": <int>,
                   "description": "..." }, ... ]
    }

Rules match on (fc, starting address). A matching rule wins and the server
responds with the PDU `[fc | 0x80, exception_code]`.

Zero runtime dependencies outside the Python stdlib so the Docker image
stays tiny.
"""
from __future__ import annotations

import argparse
import asyncio
import json
import logging
import struct
import sys
from dataclasses import dataclass

log = logging.getLogger("exception_injector")


@dataclass(frozen=True)
class Rule:
    fc: int
    address: int
    exception: int
    description: str = ""


class Store:
    """In-memory data store backing non-injected reads + writes."""

    def __init__(self, seeds: dict[str, dict[str, int]]) -> None:
        self.hr: dict[int, int] = {int(k): int(v) for k, v in seeds.get("hr", {}).items()}
        self.ir: dict[int, int] = {int(k): int(v) for k, v in seeds.get("ir", {}).items()}
        self.co: dict[int, int] = {int(k): int(v) for k, v in seeds.get("co", {}).items()}
        self.di: dict[int, int] = {int(k): int(v) for k, v in seeds.get("di", {}).items()}

    def read_bits(self, table: dict[int, int], addr: int, count: int) -> bytes:
        """Pack `count` bits LSB-first into the Modbus bit response body."""
        bits = [table.get(addr + i, 0) & 1 for i in range(count)]
        out = bytearray((count + 7) // 8)
        for i, b in enumerate(bits):
            if b:
                out[i // 8] |= 1 << (i % 8)
        return bytes(out)

    def read_regs(self, table: dict[int, int], addr: int, count: int) -> bytes:
        """Pack `count` uint16 BE into the Modbus register response body."""
        return b"".join(struct.pack(">H", table.get(addr + i, 0) & 0xFFFF) for i in range(count))


class Server:
    EXC_ILLEGAL_FUNCTION = 0x01
    EXC_ILLEGAL_DATA_ADDRESS = 0x02
    EXC_ILLEGAL_DATA_VALUE = 0x03

    def __init__(self, store: Store, rules: list[Rule]) -> None:
        self._store = store
        # Index rules by (fc, address) for O(1) lookup.
        self._rules: dict[tuple[int, int], Rule] = {(r.fc, r.address): r for r in rules}

    def lookup_rule(self, fc: int, address: int) -> Rule | None:
        return self._rules.get((fc, address))

    def exception_pdu(self, fc: int, code: int) -> bytes:
        return bytes([fc | 0x80, code & 0xFF])

    def handle_pdu(self, pdu: bytes) -> bytes:
        if not pdu:
            return self.exception_pdu(0, self.EXC_ILLEGAL_FUNCTION)
        fc = pdu[0]

        # Reads: FC 01/02/03/04, PDU is [fc u8][addr u16][quantity u16].
        if fc in (0x01, 0x02, 0x03, 0x04):
            if len(pdu) != 5:
                return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
            addr, count = struct.unpack(">HH", pdu[1:5])
            rule = self.lookup_rule(fc, addr)
            if rule is not None:
                log.info("inject fc=%d addr=%d -> exception 0x%02X (%s)",
                         fc, addr, rule.exception, rule.description)
                return self.exception_pdu(fc, rule.exception)
            # Spec caps: FC01/02 allow 1..2000 bits; FC03/04 allow 1..125 regs.
            if fc in (0x01, 0x02):
                if not 1 <= count <= 2000:
                    return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
                body = self._store.read_bits(
                    self._store.co if fc == 0x01 else self._store.di, addr, count)
                return bytes([fc, len(body)]) + body
            if not 1 <= count <= 125:
                return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
            body = self._store.read_regs(
                self._store.hr if fc == 0x03 else self._store.ir, addr, count)
            return bytes([fc, len(body)]) + body

        # FC05: [fc u8][addr u16][value u16] where value is 0xFF00=ON or 0x0000=OFF.
        if fc == 0x05:
            if len(pdu) != 5:
                return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
            addr, value = struct.unpack(">HH", pdu[1:5])
            rule = self.lookup_rule(fc, addr)
            if rule is not None:
                return self.exception_pdu(fc, rule.exception)
            if value == 0xFF00:
                self._store.co[addr] = 1
            elif value == 0x0000:
                self._store.co[addr] = 0
            else:
                return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
            return pdu  # FC05 echoes the request on success.

        # FC06: [fc u8][addr u16][value u16].
        if fc == 0x06:
            if len(pdu) != 5:
                return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
            addr, value = struct.unpack(">HH", pdu[1:5])
            rule = self.lookup_rule(fc, addr)
            if rule is not None:
                return self.exception_pdu(fc, rule.exception)
            self._store.hr[addr] = value
            return pdu  # FC06 echoes on success.

        # FC15: [fc u8][addr u16][count u16][byte_count u8][values...]
        if fc == 0x0F:
            if len(pdu) < 6:
                return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
            addr, count = struct.unpack(">HH", pdu[1:5])
            rule = self.lookup_rule(fc, addr)
            if rule is not None:
                return self.exception_pdu(fc, rule.exception)
            # Happy path ignores the coil data and acks with the standard response.
            return struct.pack(">BHH", fc, addr, count)

        # FC16: [fc u8][addr u16][count u16][byte_count u8][u16 values...]
        if fc == 0x10:
            if len(pdu) < 6:
                return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
            addr, count = struct.unpack(">HH", pdu[1:5])
            rule = self.lookup_rule(fc, addr)
            if rule is not None:
                return self.exception_pdu(fc, rule.exception)
            byte_count = pdu[5]
            data = pdu[6:6 + byte_count]
            # Spec caps FC16 at 1..123 registers. Reject a byte count or payload
            # that disagrees with the quantity; otherwise the unpack below would
            # raise struct.error on short data and drop the connection.
            consistent = byte_count == count * 2 and len(data) == byte_count
            if not 1 <= count <= 123 or not consistent:
                return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
            for i in range(count):
                self._store.hr[addr + i] = struct.unpack(">H", data[i * 2:i * 2 + 2])[0]
            return struct.pack(">BHH", fc, addr, count)

        return self.exception_pdu(fc, self.EXC_ILLEGAL_FUNCTION)

    async def handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        peer = writer.get_extra_info("peername")
        log.info("client connected from %s", peer)
        try:
            while True:
                hdr = await reader.readexactly(7)
                tx_id, proto, length, unit_id = struct.unpack(">HHHB", hdr)
                if length < 1:
                    return
                pdu = await reader.readexactly(length - 1)
                resp = self.handle_pdu(pdu)
                out = struct.pack(">HHHB", tx_id, proto, len(resp) + 1, unit_id) + resp
                writer.write(out)
                await writer.drain()
        except asyncio.IncompleteReadError:
            log.info("client %s disconnected", peer)
        except Exception:  # pylint: disable=broad-except
            log.exception("unexpected error serving %s", peer)
        finally:
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:  # pylint: disable=broad-except
                pass


def load_config(path: str) -> tuple[Store, list[Rule], str, int]:
    with open(path, "r", encoding="utf-8") as fh:
        raw = json.load(fh)
    listen = raw.get("listen", {})
    host = listen.get("host", "0.0.0.0")
    port = int(listen.get("port", 5020))
    store = Store(raw.get("seeds", {}))
    rules = [
        Rule(
            fc=int(r["fc"]),
            address=int(r["address"]),
            exception=int(r["exception"]),
            description=str(r.get("description", "")),
        )
        for r in raw.get("rules", [])
    ]
    return store, rules, host, port


async def main(argv: list[str]) -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--config", required=True, help="Path to exception-injection JSON config.")
    args = parser.parse_args(argv)
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(name)s - %(message)s")
    store, rules, host, port = load_config(args.config)
    server = Server(store, rules)
    listener = await asyncio.start_server(server.handle_connection, host, port)
    log.info("exception-injector listening on %s:%d with %d rule(s)", host, port, len(rules))
    for r in rules:
        log.info("  rule: fc=%d addr=%d -> exception 0x%02X (%s)",
                 r.fc, r.address, r.exception, r.description)
    async with listener:
        await listener.serve_forever()
    return 0


if __name__ == "__main__":
    try:
        sys.exit(asyncio.run(main(sys.argv[1:])))
    except KeyboardInterrupt:
        sys.exit(0)