Files
lmxopcua/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Modbus.IntegrationTests/Docker/exception_injector.py
Joseph Doherty a25593a9c6 chore: organize solution into module folders (Core/Server/Drivers/Client/Tooling)
Group all 69 projects into category subfolders under src/ and tests/ so the
Rider Solution Explorer mirrors the module structure. Folders: Core, Server,
Drivers (with a nested Driver CLIs subfolder), Client, Tooling.

- Move every project folder on disk with git mv (history preserved as renames).
- Recompute relative paths in 57 .csproj files: cross-category ProjectReferences,
  the lib/ HintPath+None refs in Driver.Historian.Wonderware, and the external
  mxaccessgw refs in Driver.Galaxy and its test project.
- Rebuild ZB.MOM.WW.OtOpcUa.slnx with nested solution folders.
- Re-prefix project paths in functional scripts (e2e, compliance, smoke SQL,
  integration, install).

Build green (0 errors); unit tests pass. Docs left for a separate pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 01:55:28 -04:00

262 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Minimal Modbus/TCP server that supports per-address + per-function-code
exception injection — the missing piece of the pymodbus simulator, which
only naturally emits exception code 02 (Illegal Data Address) via its
"invalid" list and 03 (Illegal Data Value) via spec-enforced length caps.
Integration tests against this fixture drive the driver's
`MapModbusExceptionToStatus` end-to-end over the wire for codes 01, 04,
05, 06, 0A, 0B — the ones the pymodbus simulator can't be configured to
return.
Wire protocol — straight Modbus/TCP (spec chapter 7.1):
MBAP header (7 bytes): [tx_id:u16 BE][proto=0:u16][length:u16][unit_id:u8]
then length-1 bytes of PDU. Length covers unit_id + PDU.
Supported function codes (enough for the driver's RMW + read paths):
01 Read Coils, 02 Read Discrete Inputs,
03 Read Holding Registers, 04 Read Input Registers,
05 Write Single Coil, 06 Write Single Register,
15 Write Multiple Coils, 16 Write Multiple Registers.
Config JSON schema (see exception_injection.json):
{
"listen": { "host": "0.0.0.0", "port": 5020 },
"seeds": { "hr": { "<addr>": <uint16>, ... },
"ir": { "<addr>": <uint16>, ... },
"co": { "<addr>": <0|1>, ... },
"di": { "<addr>": <0|1>, ... } },
"rules": [ { "fc": <int>, "address": <int>, "exception": <int>,
"description": "..." }, ... ]
}
Rules match on (fc, starting address). A matching rule wins and the server
responds with the PDU `[fc | 0x80, exception_code]`.
Zero runtime dependencies outside the Python stdlib so the Docker image
stays tiny.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import struct
import sys
from dataclasses import dataclass
# Module-wide logger used by the request handlers and the startup code below.
log = logging.getLogger("exception_injector")
@dataclass(frozen=True)
class Rule:
    """One exception-injection rule taken from the config's ``rules`` list.

    The server indexes rules by ``(fc, address)`` and answers a matching
    request with the exception PDU ``[fc | 0x80, exception]``.
    """

    # Modbus function code the rule applies to.
    fc: int
    # Starting address a request must carry for the rule to match.
    address: int
    # Modbus exception code sent back when the rule matches.
    exception: int
    # Free-form note; it only shows up in log output.
    description: str = ""
class Store:
    """In-memory tables that serve every request not intercepted by a rule.

    ``hr``/``ir`` hold 16-bit registers (served by FC03/FC04) and ``co``/``di``
    hold single bits (served by FC01/FC02). Addresses that were never seeded
    or written read back as 0.
    """

    def __init__(self, seeds: dict[str, dict[str, int]]) -> None:
        """Build the four tables from the config's ``seeds`` section.

        JSON object keys are strings, so both addresses and values are
        coerced to ``int``. A table missing from ``seeds`` starts out empty.
        """

        def table_for(name: str) -> dict[int, int]:
            # One coercion routine shared by all four tables.
            return {int(address): int(value) for address, value in seeds.get(name, {}).items()}

        self.hr: dict[int, int] = table_for("hr")
        self.ir: dict[int, int] = table_for("ir")
        self.co: dict[int, int] = table_for("co")
        self.di: dict[int, int] = table_for("di")

    def read_bits(self, table: dict[int, int], addr: int, count: int) -> bytes:
        """Return ``count`` bits starting at ``addr`` as a Modbus bit payload.

        Bits are packed LSB-first: the bit at ``addr`` lands in bit 0 of the
        first byte, and unused high bits of the last byte stay 0.
        """
        packed = 0
        for offset in range(count):
            if table.get(addr + offset, 0) & 1:
                packed |= 1 << offset
        # A little-endian integer has exactly the LSB-first, first-byte-first
        # layout the wire format wants.
        return packed.to_bytes((count + 7) // 8, "little")

    def read_regs(self, table: dict[int, int], addr: int, count: int) -> bytes:
        """Return ``count`` registers starting at ``addr`` as big-endian uint16s."""
        payload = bytearray()
        for offset in range(count):
            # Mask so an out-of-range seed value still fits in 16 bits.
            word = table.get(addr + offset, 0) & 0xFFFF
            payload += word.to_bytes(2, "big")
        return bytes(payload)
class Server:
    """Modbus/TCP server core: PDU dispatch plus per-connection framing.

    A request whose ``(function code, starting address)`` matches a configured
    rule is answered with that rule's exception code. Every other request is
    served from the in-memory store. FC15 is acknowledged without storing the
    written coil values.
    """

    EXC_ILLEGAL_FUNCTION = 0x01
    EXC_ILLEGAL_DATA_ADDRESS = 0x02
    EXC_ILLEGAL_DATA_VALUE = 0x03

    # Largest register quantity a single FC16 request may carry.
    _MAX_WRITE_REGISTERS = 123

    def __init__(self, store: Store, rules: list[Rule]) -> None:
        """Bind the server to its data store and index the injection rules.

        If several rules share the same ``(fc, address)``, the last one wins.
        """
        self._store = store
        # Index rules by (fc, address) for O(1) lookup.
        self._rules: dict[tuple[int, int], Rule] = {(r.fc, r.address): r for r in rules}

    def lookup_rule(self, fc: int, address: int) -> Rule | None:
        """Return the rule registered for ``(fc, address)``, or ``None``."""
        return self._rules.get((fc, address))

    def exception_pdu(self, fc: int, code: int) -> bytes:
        """Build the two-byte exception response ``[fc | 0x80, code]``."""
        return bytes([fc | 0x80, code & 0xFF])

    def _injected_exception(self, fc: int, addr: int) -> bytes | None:
        """Return the exception PDU for a matching rule, or ``None`` if no rule matches."""
        rule = self.lookup_rule(fc, addr)
        if rule is None:
            return None
        log.info("inject fc=%d addr=%d -> exception 0x%02X (%s)",
                 fc, addr, rule.exception, rule.description)
        return self.exception_pdu(fc, rule.exception)

    def handle_pdu(self, pdu: bytes) -> bytes:
        """Process one request PDU and return the response PDU.

        Malformed requests produce an exception response rather than raising,
        so a bad frame never tears down the client connection.
        """
        if not pdu:
            return self.exception_pdu(0, self.EXC_ILLEGAL_FUNCTION)
        fc = pdu[0]
        if fc in (0x01, 0x02, 0x03, 0x04):
            return self._handle_read(fc, pdu)
        if fc == 0x05:
            return self._handle_write_single_coil(fc, pdu)
        if fc == 0x06:
            return self._handle_write_single_register(fc, pdu)
        if fc == 0x0F:
            return self._handle_write_multiple_coils(fc, pdu)
        if fc == 0x10:
            return self._handle_write_multiple_registers(fc, pdu)
        return self.exception_pdu(fc, self.EXC_ILLEGAL_FUNCTION)

    def _handle_read(self, fc: int, pdu: bytes) -> bytes:
        """FC 01/02/03/04 - request is [fc u8][addr u16][quantity u16]."""
        if len(pdu) != 5:
            return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
        addr, count = struct.unpack(">HH", pdu[1:5])
        # Rules are checked before the quantity caps, so an injected exception
        # wins even when the quantity is out of range.
        injected = self._injected_exception(fc, addr)
        if injected is not None:
            return injected
        # Spec caps - FC01/02 allow 1..2000 bits; FC03/04 allow 1..125 regs.
        if fc in (0x01, 0x02):
            if not 1 <= count <= 2000:
                return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
            bit_table = self._store.co if fc == 0x01 else self._store.di
            body = self._store.read_bits(bit_table, addr, count)
        else:
            if not 1 <= count <= 125:
                return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
            reg_table = self._store.hr if fc == 0x03 else self._store.ir
            body = self._store.read_regs(reg_table, addr, count)
        return bytes([fc, len(body)]) + body

    def _handle_write_single_coil(self, fc: int, pdu: bytes) -> bytes:
        """FC05 - [fc u8][addr u16][value u16]; value is 0xFF00=ON or 0x0000=OFF."""
        if len(pdu) != 5:
            return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
        addr, value = struct.unpack(">HH", pdu[1:5])
        injected = self._injected_exception(fc, addr)
        if injected is not None:
            return injected
        if value == 0xFF00:
            self._store.co[addr] = 1
        elif value == 0x0000:
            self._store.co[addr] = 0
        else:
            return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
        return pdu  # FC05 echoes the request on success.

    def _handle_write_single_register(self, fc: int, pdu: bytes) -> bytes:
        """FC06 - [fc u8][addr u16][value u16]."""
        if len(pdu) != 5:
            return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
        addr, value = struct.unpack(">HH", pdu[1:5])
        injected = self._injected_exception(fc, addr)
        if injected is not None:
            return injected
        self._store.hr[addr] = value
        return pdu  # FC06 echoes on success.

    def _handle_write_multiple_coils(self, fc: int, pdu: bytes) -> bytes:
        """FC15 - [fc u8][addr u16][count u16][byte_count u8][values...]."""
        if len(pdu) < 6:
            return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
        addr, count = struct.unpack(">HH", pdu[1:5])
        injected = self._injected_exception(fc, addr)
        if injected is not None:
            return injected
        # Happy path deliberately ignores the data and acks with the standard
        # response; the coil table is not updated.
        return struct.pack(">BHH", fc, addr, count)

    def _handle_write_multiple_registers(self, fc: int, pdu: bytes) -> bytes:
        """FC16 - [fc u8][addr u16][count u16][byte_count u8][u16 values...]."""
        if len(pdu) < 6:
            return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
        addr, count = struct.unpack(">HH", pdu[1:5])
        injected = self._injected_exception(fc, addr)
        if injected is not None:
            return injected
        byte_count = pdu[5]
        payload = pdu[6:]
        # Reject inconsistent framing up front. Without this check a short
        # payload would raise struct.error halfway through the write loop,
        # leaving a partial write behind and killing the connection.
        if (not 1 <= count <= self._MAX_WRITE_REGISTERS
                or byte_count != count * 2
                or len(payload) != byte_count):
            return self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)
        # Decode everything first so the store is only touched by valid requests.
        values = struct.unpack(f">{count}H", payload)
        for offset, value in enumerate(values):
            self._store.hr[addr + offset] = value
        return struct.pack(">BHH", fc, addr, count)

    async def handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """Serve one TCP client until it disconnects or sends an unusable frame.

        Each frame is a 7-byte MBAP header followed by ``length - 1`` bytes of
        PDU. The response reuses the request's transaction id, protocol id
        and unit id.
        """
        peer = writer.get_extra_info("peername")
        log.info("client connected from %s", peer)
        try:
            while True:
                hdr = await reader.readexactly(7)
                tx_id, proto, length, unit_id = struct.unpack(">HHHB", hdr)
                if length < 1:
                    # Length must at least cover the unit id; the stream can
                    # no longer be framed reliably, so drop the connection.
                    log.warning("client %s sent MBAP length %d; closing", peer, length)
                    return
                pdu = await reader.readexactly(length - 1)
                resp = self.handle_pdu(pdu)
                out = struct.pack(">HHHB", tx_id, proto, len(resp) + 1, unit_id) + resp
                writer.write(out)
                await writer.drain()
        except (asyncio.IncompleteReadError, ConnectionError):
            # EOF mid-frame or a reset/broken pipe: an ordinary disconnect.
            log.info("client %s disconnected", peer)
        except Exception:  # pylint: disable=broad-except
            log.exception("unexpected error serving %s", peer)
        finally:
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:  # pylint: disable=broad-except
                # Closing is best-effort; the peer may already be gone.
                log.debug("error while closing connection to %s", peer, exc_info=True)
def load_config(path: str) -> tuple[Store, list[Rule], str, int]:
    """Read the JSON config at ``path`` and return ``(store, rules, host, port)``.

    Missing sections fall back to defaults: listen on ``0.0.0.0:5020``, empty
    seed tables, and no rules. Each rule entry must provide ``fc``,
    ``address`` and ``exception``; ``description`` is optional.
    """
    with open(path, "r", encoding="utf-8") as handle:
        document = json.load(handle)

    endpoint = document.get("listen", {})
    bind_host = endpoint.get("host", "0.0.0.0")
    bind_port = int(endpoint.get("port", 5020))

    backing_store = Store(document.get("seeds", {}))

    parsed_rules: list[Rule] = []
    for entry in document.get("rules", []):
        parsed_rules.append(
            Rule(
                fc=int(entry["fc"]),
                address=int(entry["address"]),
                exception=int(entry["exception"]),
                description=str(entry.get("description", "")),
            )
        )
    return backing_store, parsed_rules, bind_host, bind_port
async def main(argv: list[str]) -> int:
    """Parse the command line, start the TCP listener and serve until stopped.

    ``argv`` is the argument list without the program name. Returns 0 if
    ``serve_forever`` ever returns.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--config", required=True, help="Path to exception-injection JSON config.")
    options = cli.parse_args(argv)

    # Configure logging before loading the config so startup messages are visible.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s - %(message)s",
    )

    store, rules, host, port = load_config(options.config)
    modbus = Server(store, rules)
    tcp_listener = await asyncio.start_server(modbus.handle_connection, host, port)

    log.info("exception-injector listening on %s:%d with %d rule(s)", host, port, len(rules))
    for rule in rules:
        log.info(" rule: fc=%d addr=%d -> exception 0x%02X (%s)",
                 rule.fc, rule.address, rule.exception, rule.description)

    async with tcp_listener:
        await tcp_listener.serve_forever()
    return 0
if __name__ == "__main__":
    # Ctrl-C is the normal way to stop the fixture, so it exits with status 0
    # instead of printing a traceback.
    try:
        exit_status = asyncio.run(main(sys.argv[1:]))
    except KeyboardInterrupt:
        exit_status = 0
    sys.exit(exit_status)