Files
lmxopcua/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.IntegrationTests/Docker/server.py
2026-04-26 06:50:26 -04:00

180 lines
6.6 KiB
Python

"""python-snap7 S7 server for integration tests.
Reads a JSON profile from argv[1], allocates bytearrays for each declared area
(DB / MK / PE / PA / TM / CT), poke-seeds values at declared offsets, then starts the
snap7 Server on the configured port + blocks until Ctrl+C or SIGTERM. Shape intentionally
mirrors the pymodbus `serve.ps1 + *.json` pattern one directory over so
someone familiar with the Modbus fixture can read this without re-learning.
The snap7.server.Server class is the MIT-licensed S7 PLC emulator wrapped by
python-snap7 (https://github.com/gijzelaerr/python-snap7). Its own docstring
admits "protocol compliance testing rather than full industrial-grade
functionality" — good enough for ISO-on-TCP wire-level round-trip but NOT
for S7-1500 Optimized-DB symbolic access, SCL variant-specific behaviour, or
PG/OP/S7-Basic session differentiation.
"""
from __future__ import annotations
import argparse
import ctypes
import json
import signal
import sys
import time
from pathlib import Path
# python-snap7 installs as `snap7` package; Server class lives under `snap7.server`.
import snap7
from snap7.type import SrvArea
# Lookup table from the area name used in a JSON profile to the matching
# SrvArea member. PE = inputs (I/E), PA = outputs (Q/A), MK = memory (M),
# DB = data blocks, TM = timers, CT = counters.
AREA_MAP: dict[str, int] = {
    area_name: getattr(SrvArea, area_name)
    for area_name in ("PE", "PA", "MK", "DB", "TM", "CT")
}
# Byte width and signedness of the multi-byte integer seed types.
_WIDE_INT_SEEDS = {
    "u16": (2, False),
    "i16": (2, True),
    "u32": (4, False),
    "i32": (4, True),
}


def _require_span(buf: bytearray, off: int, length: int, kind: str) -> None:
    """Raise IndexError unless ``length`` bytes starting at ``off`` lie inside *buf*."""
    if off < 0 or off + length > len(buf):
        raise IndexError(
            f"seed type '{kind}' at offset {off} needs {length} byte(s) "
            f"but the area buffer holds {len(buf)}"
        )


def seed_buffer(buf: bytearray, seeds: list[dict]) -> None:
    """Poke seed values into the area buffer at declared byte offsets.

    Each seed is {"offset": int, "type": str, "value": int|float|bool|str}
    where type ∈ {u8, i8, u16, i16, u32, i32, f32, bool, ascii, udt_layout}.
    Endianness is big-endian (Siemens wire format). ``bool`` seeds take an
    optional ``"bit"`` (0-7, default 0); ``ascii`` seeds take an optional
    ``"max_len"`` (default 254).

    PR-S7-D2: ``udt_layout`` is a meta-seed-type that flattens an ordered list
    of UDT members into per-member primitive seeds at member-byte offsets
    relative to the parent's ``offset``. Shape:

        {
          "offset": 400, "type": "udt_layout",
          "members": [
            {"name": "Pressure", "offset": 0, "type": "f32", "value": 12.5},
            {"name": "Status", "offset": 4, "type": "i16", "value": 7},
            {"name": "Enabled", "offset": 6, "type": "bool", "value": true, "bit": 0}
          ]
        }

    Members reuse the same primitive seed types so the simulator stays
    one-pass — ``udt_layout`` is sugar that lets the JSON profile read like
    the UDT layout the .NET driver fans out into.

    The buffer is modified in place and never changes length: every write is
    bounds-checked first, because a bytearray slice assignment that runs past
    the end would otherwise silently grow the buffer instead of failing.

    Raises:
        IndexError: a seed would write outside ``buf`` (including a negative
            offset).
        ValueError: unknown seed type, ``bit`` outside 0-7, ``max_len``
            outside 0-255, or an ``ascii`` payload longer than ``max_len``.
        KeyError: a seed lacks ``offset``, ``type`` or ``value``.
    """
    import struct  # function-local so the block does not rely on a file-level import

    for seed in seeds:
        # PR-S7-D2: expand udt_layout meta-seeds inline before the per-type
        # dispatch so members hit the same primitive paths as a flat seed list.
        if seed.get("type") == "udt_layout":
            base = int(seed["offset"])
            expanded = [
                {**member, "offset": base + int(member.get("offset", 0))}
                for member in seed.get("members", [])
            ]
            seed_buffer(buf, expanded)
            continue

        off = int(seed["offset"])
        t = seed["type"]
        v = seed["value"]

        if t in ("u8", "i8"):
            _require_span(buf, off, 1, t)
            # Masking stores negative i8 values as their two's-complement byte.
            buf[off] = int(v) & 0xFF
        elif t in _WIDE_INT_SEEDS:
            width, signed = _WIDE_INT_SEEDS[t]
            _require_span(buf, off, width, t)
            buf[off:off + width] = int(v).to_bytes(width, "big", signed=signed)
        elif t == "f32":
            _require_span(buf, off, 4, t)
            buf[off:off + 4] = struct.pack(">f", float(v))
        elif t == "bool":
            bit = int(seed.get("bit", 0))
            if not 0 <= bit <= 7:
                raise ValueError(f"bool seed at offset {off}: bit {bit} is outside 0-7")
            _require_span(buf, off, 1, t)
            mask = 1 << bit
            if bool(v):
                buf[off] |= mask
            else:
                buf[off] &= ~mask & 0xFF
        elif t == "ascii":
            # Siemens STRING type: byte 0 = max length, byte 1 = current length,
            # bytes 2+ = payload. Seeds supply the payload text; we fill max/cur.
            payload = str(v).encode("ascii")
            max_len = int(seed.get("max_len", 254))
            if not 0 <= max_len <= 255:
                raise ValueError(
                    f"ascii seed at offset {off}: max_len {max_len} does not fit in one byte"
                )
            if len(payload) > max_len:
                raise ValueError(
                    f"ascii seed at offset {off}: payload is {len(payload)} byte(s), "
                    f"longer than max_len {max_len}"
                )
            # Only the bytes actually written are bounds-checked (header + payload).
            _require_span(buf, off, 2 + len(payload), t)
            buf[off] = max_len
            buf[off + 1] = len(payload)
            buf[off + 2:off + 2 + len(payload)] = payload
        else:
            raise ValueError(f"Unknown seed type '{t}'")
def main() -> int:
    """Load a JSON profile, register its seeded areas, and serve until signalled.

    Command line: a positional ``profile`` path and an optional ``--port``
    (default 1102). For every entry under the profile's ``"areas"`` key a
    zero-filled bytearray of the declared ``size`` is created, seeded via
    ``seed_buffer`` and registered with the snap7 server under the declared
    ``area`` / ``index``. The process then polls a stop flag every 250 ms
    until SIGINT (or SIGTERM, where it can be installed) sets it.

    Returns:
        0 after the serve loop ends; 1 when the profile file does not exist
        or declares an area name that is not a key of ``AREA_MAP``.
    """
    parser = argparse.ArgumentParser(description="python-snap7 S7 server for integration tests")
    parser.add_argument("profile", help="Path to profile JSON")
    parser.add_argument("--port", type=int, default=1102, help="TCP port (default 1102 non-privileged)")
    args = parser.parse_args()

    profile_path = Path(args.profile)
    if not profile_path.is_file():
        print(f"profile not found: {profile_path}", file=sys.stderr)
        return 1
    # Decode explicitly as UTF-8 so parsing does not depend on the platform's
    # default locale encoding.
    with profile_path.open(encoding="utf-8") as f:
        profile = json.load(f)

    server = snap7.server.Server()
    # Keep bytearray refs alive for the server's lifetime — snap7 doesn't copy
    # the buffer, it takes a pointer. Letting GC collect would corrupt reads.
    buffers: list[bytearray] = []
    for area_decl in profile.get("areas", []):
        area_name = area_decl["area"]
        if area_name not in AREA_MAP:
            print(f"unknown area '{area_name}' (expected one of {list(AREA_MAP)})", file=sys.stderr)
            return 1
        index = int(area_decl.get("index", 0))  # DB number for DB area, 0 for MK/PE/PA
        size = int(area_decl["size"])
        seeds = area_decl.get("seeds", [])
        buf = bytearray(size)
        seed_buffer(buf, seeds)
        buffers.append(buf)
        # register_area takes (area, index, c-array); we wrap the bytearray
        # into a ctypes char array so the native lib can take &buf[0].
        arr = (ctypes.c_char * size).from_buffer(buf)
        server.register_area(AREA_MAP[area_name], index, arr)
        print(f" seeded {area_name}{index} size={size} seeds={len(seeds)}")

    stop_requested = False

    def _request_stop(*_args) -> None:
        nonlocal stop_requested
        stop_requested = True

    # Install the handlers before starting the server: a signal delivered
    # during startup is then recorded in the flag instead of terminating the
    # process before the finally-guarded server.stop() below can run.
    signal.signal(signal.SIGINT, _request_stop)
    try:
        signal.signal(signal.SIGTERM, _request_stop)
    except (AttributeError, ValueError, OSError):
        pass  # SIGTERM not on all platforms

    port = int(args.port)
    print(f"Starting python-snap7 server on TCP {port} (Ctrl+C to stop)")
    server.start(tcp_port=port)
    try:
        while not stop_requested:
            time.sleep(0.25)
    finally:
        print("stopping python-snap7 server")
        try:
            server.stop()
        except Exception as exc:
            # Shutdown stays best-effort, but report the failure instead of hiding it.
            print(f"server.stop() failed: {exc}", file=sys.stderr)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())