#!/usr/bin/env python3
"""
Minimal Modbus/TCP server that supports per-address + per-function-code
exception injection — the missing piece of the pymodbus simulator, which only
naturally emits exception code 02 (Illegal Data Address) via its "invalid"
list and 03 (Illegal Data Value) via spec-enforced length caps.

Integration tests against this fixture drive the driver's
`MapModbusExceptionToStatus` end-to-end over the wire for codes 01, 04, 05,
06, 0A, 0B — the ones the pymodbus simulator can't be configured to return.

Wire protocol — straight Modbus/TCP (spec chapter 7.1):

    MBAP header (7 bytes): [tx_id:u16 BE][proto=0:u16][length:u16][unit_id:u8]
    then length-1 bytes of PDU. Length covers unit_id + PDU.

Supported function codes (enough for the driver's RMW + read paths):

    01 Read Coils, 02 Read Discrete Inputs,
    03 Read Holding Registers, 04 Read Input Registers,
    05 Write Single Coil, 06 Write Single Register,
    15 Write Multiple Coils, 16 Write Multiple Registers.

Config JSON schema (see exception_injection.json). Upper-case words are
placeholders: ADDR is a decimal address (a string key under "seeds"),
UINT16 a register value (masked to 16 bits when read back), FC a function
code and CODE a Modbus exception code. Numeric fields are parsed with int().

    {
      "listen": { "host": "0.0.0.0", "port": 5020 },
      "seeds": {
        "hr": { "ADDR": UINT16, ... },
        "ir": { "ADDR": UINT16, ... },
        "co": { "ADDR": <0|1>, ... },
        "di": { "ADDR": <0|1>, ... }
      },
      "rules": [
        { "fc": FC, "address": ADDR, "exception": CODE, "description": "..." },
        ...
      ]
    }

Rules match on (fc, starting address). A matching rule wins and the server
responds with the PDU `[fc | 0x80, exception_code]`.

Zero runtime dependencies outside the Python stdlib so the Docker image stays
tiny.
"""

from __future__ import annotations

import argparse
import asyncio
import json
import logging
import struct
import sys
from dataclasses import dataclass

log = logging.getLogger("exception_injector")


@dataclass(frozen=True)
class Rule:
    """One injection rule: requests for (fc, address) get `exception` back."""

    fc: int
    address: int
    exception: int
    description: str = ""


class Store:
    """In-memory data store backing non-injected reads + writes."""

    def __init__(self, seeds: dict[str, dict[str, int]]) -> None:
        # hr = holding registers, ir = input registers, co = coils,
        # di = discrete inputs. Tables absent from `seeds` start empty.
        self.hr: dict[int, int] = self._seed_table(seeds, "hr")
        self.ir: dict[int, int] = self._seed_table(seeds, "ir")
        self.co: dict[int, int] = self._seed_table(seeds, "co")
        self.di: dict[int, int] = self._seed_table(seeds, "di")

    @staticmethod
    def _seed_table(seeds: dict[str, dict[str, int]], name: str) -> dict[int, int]:
        """Convert one JSON seed table (string keys) into an int -> int map."""
        return {int(k): int(v) for k, v in seeds.get(name, {}).items()}

    def read_bits(self, table: dict[int, int], addr: int, count: int) -> bytes:
        """Pack `count` bits LSB-first into the Modbus bit response body.

        Addresses missing from `table` read as 0.
        """
        out = bytearray((count + 7) // 8)
        for i in range(count):
            if table.get(addr + i, 0) & 1:
                out[i // 8] |= 1 << (i % 8)
        return bytes(out)

    def read_regs(self, table: dict[int, int], addr: int, count: int) -> bytes:
        """Pack `count` uint16 BE into the Modbus register response body.

        Addresses missing from `table` read as 0; values are masked to 16 bits.
        """
        return b"".join(
            struct.pack(">H", table.get(addr + i, 0) & 0xFFFF) for i in range(count)
        )


class Server:
    """Modbus/TCP request handler with rule-driven exception injection."""

    EXC_ILLEGAL_FUNCTION = 0x01
    EXC_ILLEGAL_DATA_ADDRESS = 0x02
    EXC_ILLEGAL_DATA_VALUE = 0x03

    # Quantity caps from the Modbus application protocol specification.
    MAX_READ_BITS = 2000   # FC01 / FC02
    MAX_READ_REGS = 125    # FC03 / FC04
    MAX_WRITE_REGS = 123   # FC16

    def __init__(self, store: Store, rules: list[Rule]) -> None:
        self._store = store
        # Index rules by (fc, address) for O(1) lookup. If two rules share a
        # key, the one listed later replaces the earlier one.
        self._rules: dict[tuple[int, int], Rule] = {
            (r.fc, r.address): r for r in rules
        }

    def lookup_rule(self, fc: int, address: int) -> Rule | None:
        """Return the rule registered for (fc, starting address), if any."""
        return self._rules.get((fc, address))

    def exception_pdu(self, fc: int, code: int) -> bytes:
        """Build the two-byte exception response `[fc | 0x80, code]`."""
        return bytes([fc | 0x80, code & 0xFF])

    def _injected_response(self, fc: int, addr: int) -> bytes | None:
        """Return the injected exception PDU for (fc, addr), or None.

        Every hit is logged so a test run shows which rule fired.
        """
        rule = self.lookup_rule(fc, addr)
        if rule is None:
            return None
        log.info("inject fc=%d addr=%d -> exception 0x%02X (%s)",
                 fc, addr, rule.exception, rule.description)
        return self.exception_pdu(fc, rule.exception)

    def handle_pdu(self, pdu: bytes) -> bytes:
        """Process one request PDU and return the response PDU.

        Malformed requests are answered with an exception PDU rather than
        raising, so one bad frame does not tear down the connection. For a
        well-framed request a matching injection rule takes precedence over
        quantity/value validation.
        """
        if not pdu:
            return self.exception_pdu(0, self.EXC_ILLEGAL_FUNCTION)

        fc = pdu[0]
        bad_value = self.exception_pdu(fc, self.EXC_ILLEGAL_DATA_VALUE)

        # Reads: FC 01/02/03/04 — [fc u8][addr u16][quantity u16]
        if fc in (0x01, 0x02, 0x03, 0x04):
            if len(pdu) != 5:
                return bad_value
            addr, count = struct.unpack(">HH", pdu[1:5])
            injected = self._injected_response(fc, addr)
            if injected is not None:
                return injected

            # Spec caps — FC01/02 allow 1..2000 bits; FC03/04 allow 1..125 regs.
            if fc in (0x01, 0x02):
                if not 1 <= count <= self.MAX_READ_BITS:
                    return bad_value
                table = self._store.co if fc == 0x01 else self._store.di
                body = self._store.read_bits(table, addr, count)
            else:
                if not 1 <= count <= self.MAX_READ_REGS:
                    return bad_value
                table = self._store.hr if fc == 0x03 else self._store.ir
                body = self._store.read_regs(table, addr, count)
            return bytes([fc, len(body)]) + body

        # FC05 — [fc u8][addr u16][value u16] where value is 0xFF00=ON or 0x0000=OFF.
        if fc == 0x05:
            if len(pdu) != 5:
                return bad_value
            addr, value = struct.unpack(">HH", pdu[1:5])
            injected = self._injected_response(fc, addr)
            if injected is not None:
                return injected
            if value == 0xFF00:
                self._store.co[addr] = 1
            elif value == 0x0000:
                self._store.co[addr] = 0
            else:
                return bad_value
            return pdu  # FC05 echoes the request on success.

        # FC06 — [fc u8][addr u16][value u16].
        if fc == 0x06:
            if len(pdu) != 5:
                return bad_value
            addr, value = struct.unpack(">HH", pdu[1:5])
            injected = self._injected_response(fc, addr)
            if injected is not None:
                return injected
            self._store.hr[addr] = value
            return pdu  # FC06 echoes on success.

        # FC15 — [fc u8][addr u16][count u16][byte_count u8][values...]
        if fc == 0x0F:
            if len(pdu) < 6:
                return bad_value
            addr, count = struct.unpack(">HH", pdu[1:5])
            injected = self._injected_response(fc, addr)
            if injected is not None:
                return injected
            # Happy-path ignore-the-data, ack with standard response.
            return struct.pack(">BHH", fc, addr, count)

        # FC16 — [fc u8][addr u16][count u16][byte_count u8][u16 values...]
        if fc == 0x10:
            if len(pdu) < 6:
                return bad_value
            addr, count = struct.unpack(">HH", pdu[1:5])
            injected = self._injected_response(fc, addr)
            if injected is not None:
                return injected
            byte_count = pdu[5]
            # Reject frames whose declared quantity, byte count and actual
            # payload disagree; otherwise unpacking a short payload would
            # raise and drop the connection instead of answering 0x03.
            if (not 1 <= count <= self.MAX_WRITE_REGS
                    or byte_count != count * 2
                    or len(pdu) != 6 + byte_count):
                return bad_value
            values = struct.unpack(f">{count}H", pdu[6:])
            for offset, value in enumerate(values):
                self._store.hr[addr + offset] = value
            return struct.pack(">BHH", fc, addr, count)

        return self.exception_pdu(fc, self.EXC_ILLEGAL_FUNCTION)

    async def handle_connection(self, reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> None:
        """Serve one client: read MBAP-framed requests until it disconnects.

        The transaction id, protocol id and unit id of each request are echoed
        back unchanged in the response header.
        """
        peer = writer.get_extra_info("peername")
        log.info("client connected from %s", peer)
        try:
            while True:
                hdr = await reader.readexactly(7)
                tx_id, proto, length, unit_id = struct.unpack(">HHHB", hdr)
                if length < 1:
                    # Length must at least cover unit_id; the frame is
                    # unusable, so drop the connection.
                    return
                pdu = await reader.readexactly(length - 1)
                resp = self.handle_pdu(pdu)
                out = struct.pack(">HHHB", tx_id, proto, len(resp) + 1, unit_id) + resp
                writer.write(out)
                await writer.drain()
        except (asyncio.IncompleteReadError, ConnectionError):
            # EOF mid-frame or a reset/broken pipe: the peer simply went away.
            log.info("client %s disconnected", peer)
        except Exception:  # pylint: disable=broad-except
            log.exception("unexpected error serving %s", peer)
        finally:
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:  # pylint: disable=broad-except
                # Best-effort close; the transport may already be gone.
                log.debug("error while closing connection to %s", peer,
                          exc_info=True)


def load_config(path: str) -> tuple[Store, list[Rule], str, int]:
    """Load the JSON config at `path`.

    Returns (store, rules, host, port). Missing sections fall back to an
    empty store, no rules, and a listen address of 0.0.0.0:5020.
    """
    with open(path, "r", encoding="utf-8") as fh:
        raw = json.load(fh)
    listen = raw.get("listen", {})
    host = listen.get("host", "0.0.0.0")
    port = int(listen.get("port", 5020))
    store = Store(raw.get("seeds", {}))
    rules = [
        Rule(
            fc=int(r["fc"]),
            address=int(r["address"]),
            exception=int(r["exception"]),
            description=str(r.get("description", "")),
        )
        for r in raw.get("rules", [])
    ]
    return store, rules, host, port


async def main(argv: list[str]) -> int:
    """Parse `argv`, load the config and serve until cancelled."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--config", required=True,
                        help="Path to exception-injection JSON config.")
    args = parser.parse_args(argv)

    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(name)s - %(message)s")

    store, rules, host, port = load_config(args.config)
    server = Server(store, rules)
    listener = await asyncio.start_server(server.handle_connection, host, port)
    log.info("exception-injector listening on %s:%d with %d rule(s)",
             host, port, len(rules))
    for r in rules:
        log.info("  rule: fc=%d addr=%d -> exception 0x%02X (%s)",
                 r.fc, r.address, r.exception, r.description)
    async with listener:
        await listener.serve_forever()
    return 0


if __name__ == "__main__":
    try:
        sys.exit(asyncio.run(main(sys.argv[1:])))
    except KeyboardInterrupt:
        sys.exit(0)