"""python-snap7 S7 server for integration tests. Reads a JSON profile from argv[1], allocates bytearrays for each declared area (DB / MB / EB / AB), poke-seeds values at declared offsets, then starts the snap7 Server on the configured port + blocks until Ctrl+C. Shape intentionally mirrors the pymodbus `serve.ps1 + *.json` pattern one directory over so someone familiar with the Modbus fixture can read this without re-learning. The snap7.server.Server class is the MIT-licensed S7 PLC emulator wrapped by python-snap7 (https://github.com/gijzelaerr/python-snap7). Its own docstring admits "protocol compliance testing rather than full industrial-grade functionality" — good enough for ISO-on-TCP wire-level round-trip but NOT for S7-1500 Optimized-DB symbolic access, SCL variant-specific behaviour, or PG/OP/S7-Basic session differentiation. """ from __future__ import annotations import argparse import ctypes import json import signal import sys import time from pathlib import Path # python-snap7 installs as `snap7` package; Server class lives under `snap7.server`. import snap7 from snap7.type import SrvArea # Map JSON area names → SrvArea enum values. PE = inputs (I/E), PA = outputs # (Q/A), MK = memory (M), DB = data blocks, TM = timers, CT = counters. AREA_MAP: dict[str, int] = { "PE": SrvArea.PE, "PA": SrvArea.PA, "MK": SrvArea.MK, "DB": SrvArea.DB, "TM": SrvArea.TM, "CT": SrvArea.CT, } def seed_buffer(buf: bytearray, seeds: list[dict]) -> None: """Poke seed values into the area buffer at declared byte offsets. Each seed is {"offset": int, "type": str, "value": int|float|bool|str} where type ∈ {u8, i8, u16, i16, u32, i32, f32, bool, ascii}. Endianness is big-endian (Siemens wire format). """ for seed in seeds: off = int(seed["offset"]) t = seed["type"] v = seed["value"] if t == "u8": buf[off] = int(v) & 0xFF elif t == "i8": buf[off] = int(v) & 0xFF elif t == "u16": buf[off:off + 2] = int(v).to_bytes(2, "big", signed=False) elif t == "i16": buf[off:off + 2] = int(v).to_bytes(2, "big", signed=True) elif t == "u32": buf[off:off + 4] = int(v).to_bytes(4, "big", signed=False) elif t == "i32": buf[off:off + 4] = int(v).to_bytes(4, "big", signed=True) elif t == "f32": import struct buf[off:off + 4] = struct.pack(">f", float(v)) elif t == "bool": bit = int(seed.get("bit", 0)) if bool(v): buf[off] |= (1 << bit) else: buf[off] &= ~(1 << bit) & 0xFF elif t == "ascii": # Siemens STRING type: byte 0 = max length, byte 1 = current length, # bytes 2+ = payload. Seeds supply the payload text; we fill max/cur. payload = str(v).encode("ascii") max_len = int(seed.get("max_len", 254)) buf[off] = max_len buf[off + 1] = len(payload) buf[off + 2:off + 2 + len(payload)] = payload else: raise ValueError(f"Unknown seed type '{t}'") def main() -> int: parser = argparse.ArgumentParser(description="python-snap7 S7 server for integration tests") parser.add_argument("profile", help="Path to profile JSON") parser.add_argument("--port", type=int, default=1102, help="TCP port (default 1102 non-privileged)") args = parser.parse_args() profile_path = Path(args.profile) if not profile_path.is_file(): print(f"profile not found: {profile_path}", file=sys.stderr) return 1 with profile_path.open() as f: profile = json.load(f) server = snap7.server.Server() # Keep bytearray refs alive for the server's lifetime — snap7 doesn't copy # the buffer, it takes a pointer. Letting GC collect would corrupt reads. buffers: list[bytearray] = [] for area_decl in profile.get("areas", []): area_name = area_decl["area"] if area_name not in AREA_MAP: print(f"unknown area '{area_name}' (expected one of {list(AREA_MAP)})", file=sys.stderr) return 1 index = int(area_decl.get("index", 0)) # DB number for DB area, 0 for MK/PE/PA size = int(area_decl["size"]) buf = bytearray(size) seed_buffer(buf, area_decl.get("seeds", [])) buffers.append(buf) # register_area takes (area, index, c-array); we wrap the bytearray # into a ctypes char array so the native lib can take &buf[0]. arr_type = ctypes.c_char * size arr = arr_type.from_buffer(buf) server.register_area(AREA_MAP[area_name], index, arr) print(f" seeded {area_name}{index} size={size} seeds={len(area_decl.get('seeds', []))}") port = int(args.port) print(f"Starting python-snap7 server on TCP {port} (Ctrl+C to stop)") server.start(tcp_port=port) stop = {"sig": False} def _handle(*_a): stop["sig"] = True signal.signal(signal.SIGINT, _handle) try: signal.signal(signal.SIGTERM, _handle) except Exception: pass # SIGTERM not on all platforms try: while not stop["sig"]: time.sleep(0.25) finally: print("stopping python-snap7 server") try: server.stop() except Exception: pass return 0 if __name__ == "__main__": sys.exit(main())