151 lines
5.3 KiB
Python
151 lines
5.3 KiB
Python
"""python-snap7 S7 server for integration tests.
|
|
|
|
Reads a JSON profile from argv[1], allocates bytearrays for each declared area
|
|
(DB / MB / EB / AB), poke-seeds values at declared offsets, then starts the
|
|
snap7 Server on the configured port + blocks until Ctrl+C. Shape intentionally
|
|
mirrors the pymodbus `serve.ps1 + *.json` pattern one directory over so
|
|
someone familiar with the Modbus fixture can read this without re-learning.
|
|
|
|
The snap7.server.Server class is the MIT-licensed S7 PLC emulator wrapped by
|
|
python-snap7 (https://github.com/gijzelaerr/python-snap7). Its own docstring
|
|
admits "protocol compliance testing rather than full industrial-grade
|
|
functionality" — good enough for ISO-on-TCP wire-level round-trip but NOT
|
|
for S7-1500 Optimized-DB symbolic access, SCL variant-specific behaviour, or
|
|
PG/OP/S7-Basic session differentiation.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import ctypes
|
|
import json
|
|
import signal
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# python-snap7 installs as `snap7` package; Server class lives under `snap7.server`.
|
|
import snap7
|
|
from snap7.type import SrvArea
|
|
|
|
|
|
# Map JSON area names → SrvArea enum values. PE = inputs (I/E), PA = outputs
|
|
# (Q/A), MK = memory (M), DB = data blocks, TM = timers, CT = counters.
|
|
AREA_MAP: dict[str, int] = {
|
|
"PE": SrvArea.PE,
|
|
"PA": SrvArea.PA,
|
|
"MK": SrvArea.MK,
|
|
"DB": SrvArea.DB,
|
|
"TM": SrvArea.TM,
|
|
"CT": SrvArea.CT,
|
|
}
|
|
|
|
|
|
def seed_buffer(buf: bytearray, seeds: list[dict]) -> None:
|
|
"""Poke seed values into the area buffer at declared byte offsets.
|
|
|
|
Each seed is {"offset": int, "type": str, "value": int|float|bool|str}
|
|
where type ∈ {u8, i8, u16, i16, u32, i32, f32, bool, ascii}. Endianness is
|
|
big-endian (Siemens wire format).
|
|
"""
|
|
for seed in seeds:
|
|
off = int(seed["offset"])
|
|
t = seed["type"]
|
|
v = seed["value"]
|
|
if t == "u8":
|
|
buf[off] = int(v) & 0xFF
|
|
elif t == "i8":
|
|
buf[off] = int(v) & 0xFF
|
|
elif t == "u16":
|
|
buf[off:off + 2] = int(v).to_bytes(2, "big", signed=False)
|
|
elif t == "i16":
|
|
buf[off:off + 2] = int(v).to_bytes(2, "big", signed=True)
|
|
elif t == "u32":
|
|
buf[off:off + 4] = int(v).to_bytes(4, "big", signed=False)
|
|
elif t == "i32":
|
|
buf[off:off + 4] = int(v).to_bytes(4, "big", signed=True)
|
|
elif t == "f32":
|
|
import struct
|
|
buf[off:off + 4] = struct.pack(">f", float(v))
|
|
elif t == "bool":
|
|
bit = int(seed.get("bit", 0))
|
|
if bool(v):
|
|
buf[off] |= (1 << bit)
|
|
else:
|
|
buf[off] &= ~(1 << bit) & 0xFF
|
|
elif t == "ascii":
|
|
# Siemens STRING type: byte 0 = max length, byte 1 = current length,
|
|
# bytes 2+ = payload. Seeds supply the payload text; we fill max/cur.
|
|
payload = str(v).encode("ascii")
|
|
max_len = int(seed.get("max_len", 254))
|
|
buf[off] = max_len
|
|
buf[off + 1] = len(payload)
|
|
buf[off + 2:off + 2 + len(payload)] = payload
|
|
else:
|
|
raise ValueError(f"Unknown seed type '{t}'")
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="python-snap7 S7 server for integration tests")
|
|
parser.add_argument("profile", help="Path to profile JSON")
|
|
parser.add_argument("--port", type=int, default=1102, help="TCP port (default 1102 non-privileged)")
|
|
args = parser.parse_args()
|
|
|
|
profile_path = Path(args.profile)
|
|
if not profile_path.is_file():
|
|
print(f"profile not found: {profile_path}", file=sys.stderr)
|
|
return 1
|
|
|
|
with profile_path.open() as f:
|
|
profile = json.load(f)
|
|
|
|
server = snap7.server.Server()
|
|
# Keep bytearray refs alive for the server's lifetime — snap7 doesn't copy
|
|
# the buffer, it takes a pointer. Letting GC collect would corrupt reads.
|
|
buffers: list[bytearray] = []
|
|
|
|
for area_decl in profile.get("areas", []):
|
|
area_name = area_decl["area"]
|
|
if area_name not in AREA_MAP:
|
|
print(f"unknown area '{area_name}' (expected one of {list(AREA_MAP)})", file=sys.stderr)
|
|
return 1
|
|
index = int(area_decl.get("index", 0)) # DB number for DB area, 0 for MK/PE/PA
|
|
size = int(area_decl["size"])
|
|
buf = bytearray(size)
|
|
seed_buffer(buf, area_decl.get("seeds", []))
|
|
buffers.append(buf)
|
|
# register_area takes (area, index, c-array); we wrap the bytearray
|
|
# into a ctypes char array so the native lib can take &buf[0].
|
|
arr_type = ctypes.c_char * size
|
|
arr = arr_type.from_buffer(buf)
|
|
server.register_area(AREA_MAP[area_name], index, arr)
|
|
print(f" seeded {area_name}{index} size={size} seeds={len(area_decl.get('seeds', []))}")
|
|
|
|
port = int(args.port)
|
|
print(f"Starting python-snap7 server on TCP {port} (Ctrl+C to stop)")
|
|
server.start(tcp_port=port)
|
|
|
|
stop = {"sig": False}
|
|
def _handle(*_a):
|
|
stop["sig"] = True
|
|
signal.signal(signal.SIGINT, _handle)
|
|
try:
|
|
signal.signal(signal.SIGTERM, _handle)
|
|
except Exception:
|
|
pass # SIGTERM not on all platforms
|
|
|
|
try:
|
|
while not stop["sig"]:
|
|
time.sleep(0.25)
|
|
finally:
|
|
print("stopping python-snap7 server")
|
|
try:
|
|
server.stop()
|
|
except Exception:
|
|
pass
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|