lmxopcua/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.IntegrationTests/PythonSnap7/server.py
Joseph Doherty 1d3544f18e S7 integration fixture — python-snap7 server closes the wire-level coverage gap (#216) + per-driver fixture coverage docs for every driver in the fleet. Closes #216. Two shipments in one PR because the docs landed as I surveyed each driver's fixture + the S7 work is the first wire-level-gap closer pulled from that survey.
S7 integration — AbCip/Modbus already have real-simulator integration suites; S7 had zero wire-level coverage despite being a Tier-A driver (all unit tests mocked IS7Client). Picked python-snap7's `snap7.server.Server` over raw Snap7 C library because `pip install` beats per-OS binary-pin maintenance, the package ships a Python __main__ shim that mirrors our existing pymodbus serve.ps1 + *.json pattern structurally, and the python-snap7 project is actively maintained. New project `tests/ZB.MOM.WW.OtOpcUa.Driver.S7.IntegrationTests/` with four moving parts: (a) `Snap7ServerFixture` — collection-scoped TCP probe on `localhost:1102` that sets `SkipReason` when the simulator's not running, matching the `ModbusSimulatorFixture` shape one directory over (same S7_SIM_ENDPOINT env var override convention for pointing at a real S7 CPU on port 102); (b) `PythonSnap7/` — `serve.ps1` wrapper + `server.py` shim + `s7_1500.json` seed profile + `README.md` documenting install / run / known limitations; (c) `S7_1500/S7_1500Profile.cs` — driver-side `S7DriverOptions` whose tag addresses map 1:1 to the JSON profile's seed offsets (DB1.DBW0 u16, DB1.DBW10 i16, DB1.DBD20 i32, DB1.DBD30 f32, DB1.DBX50.3 bool, DB1.DBW100 scratch); (d) `S7_1500SmokeTests` — three tests proving typed reads + write-then-read round-trip work through real S7netplus + real ISO-on-TCP + real snap7 server. Picked port 1102 default instead of S7-standard 102 because 102 is privileged on Linux + triggers Windows Firewall prompt; S7netplus 0.20 has a 5-arg `Plc(CpuType, host, port, rack, slot)` ctor that lets the driver honour `S7DriverOptions.Port`, but the existing driver code called the 4-arg overload + silently hardcoded 102. One-line driver fix (S7Driver.cs:87) threads `_options.Port` through — the S7 unit suite (58/58) still passes unchanged because every unit test uses a fake IS7Client that never sees the real ctor. 
Server seed-type matrix in `server.py` covers u8 / i8 / u16 / i16 / u32 / i32 / f32 / bool-with-bit / ascii (S7 STRING with max_len header). register_area takes the SrvArea enum value, not the string name — a 15-minute debug after the first test run caught that; documented inline.
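
The probe-and-skip behaviour described for `Snap7ServerFixture` can be sketched in Python. The real fixture is C# and is not shown on this page, so this is only an illustration: the names `resolve_endpoint` and `probe_skip_reason`, the `host:port` format assumed for `S7_SIM_ENDPOINT`, and the 2-second timeout are all assumptions.

```python
from __future__ import annotations

import os
import socket

DEFAULT_ENDPOINT = "localhost:1102"  # default port named in the commit message


def resolve_endpoint(env: dict[str, str] | None = None) -> tuple[str, int]:
    """Return (host, port), honouring an S7_SIM_ENDPOINT override.

    Assumption: the override is formatted as "host:port".
    """
    env = os.environ if env is None else env
    raw = env.get("S7_SIM_ENDPOINT", DEFAULT_ENDPOINT)
    host, _, port = raw.rpartition(":")
    return host, int(port)


def probe_skip_reason(host: str, port: int, timeout: float = 2.0) -> str | None:
    """Return None when the endpoint accepts a TCP connection, else a skip reason."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return None
    except OSError as exc:
        return f"S7 simulator not reachable at {host}:{port} ({exc})"
```

A test collection would call `probe_skip_reason(*resolve_endpoint())` once and skip every test when the result is not `None`, rather than failing on a machine where the simulator is simply not running.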

Per-driver test-fixture coverage docs: eight new files in `docs/drivers/` laying out what each driver's harness actually exercises vs. what's trusted from field deployments. Pattern mirrors the AbServer-Test-Fixture.md doc that shipped earlier in this arc: TL;DR → What the fixture is → What it actually covers → What it does NOT cover → When-to-trust table → Follow-up candidates → Key files. Ugly truth the survey made visible: Galaxy + Modbus + (now) S7 + AB CIP have real wire-level coverage; AB Legacy / TwinCAT / FOCAS / OpcUaClient are still contract-only because their libraries ship no fake + no open-source simulator exists (AB Legacy PCCC), no public simulator exists (FOCAS), the vendor SDK has no in-process fake (TwinCAT/ADS.NET), or the test wiring just hasn't happened yet (OpcUaClient could trivially loop back against this repo's own server; flagged as #215). Each doc names the specific follow-up route: Snap7 server for S7 (done), TwinCAT 3 developer-runtime auto-restart for TwinCAT, Tier-C out-of-process Host for FOCAS, lab rigs for AB Legacy + hardware-gated bits of the others. `docs/drivers/README.md` gains a coverage-map section linking all eight. Tracking tasks #215-#222 filed for each PR-able follow-up.

Build clean (driver + integration project + docs); S7.Tests 58/58 (unchanged); S7.IntegrationTests 3/3 (new, verified end-to-end against a live python-snap7 server: `driver_reads_seeded_u16_through_real_S7comm`, `driver_reads_seeded_typed_batch`, `driver_write_then_read_round_trip_on_scratch_word`). Next fixture follow-up is #215 (OpcUaClient loopback against own server) — highest ROI of the remaining set, zero external deps.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 11:29:15 -04:00


"""python-snap7 S7 server for integration tests.

Reads a JSON profile from argv[1], allocates a bytearray for each declared area
(DB / MK / PE / PA / TM / CT, the keys of AREA_MAP below), pokes seed values in
at the declared offsets, then starts the snap7 Server on the configured port
and blocks until Ctrl+C. Shape intentionally mirrors the pymodbus
`serve.ps1 + *.json` pattern one directory over so someone familiar with the
Modbus fixture can read this without re-learning.

The snap7.server.Server class is the MIT-licensed S7 PLC emulator wrapped by
python-snap7 (https://github.com/gijzelaerr/python-snap7). Its own docstring
admits "protocol compliance testing rather than full industrial-grade
functionality": good enough for ISO-on-TCP wire-level round-trip but NOT
for S7-1500 Optimized-DB symbolic access, SCL variant-specific behaviour, or
PG/OP/S7-Basic session differentiation.
"""
from __future__ import annotations

import argparse
import ctypes
import json
import signal
import sys
import time
from pathlib import Path

# python-snap7 installs as `snap7` package; Server class lives under `snap7.server`.
import snap7
from snap7.type import SrvArea

# Map JSON area names → SrvArea enum values. PE = inputs (I/E), PA = outputs
# (Q/A), MK = memory (M), DB = data blocks, TM = timers, CT = counters.
AREA_MAP: dict[str, int] = {
    "PE": SrvArea.PE,
    "PA": SrvArea.PA,
    "MK": SrvArea.MK,
    "DB": SrvArea.DB,
    "TM": SrvArea.TM,
    "CT": SrvArea.CT,
}


def seed_buffer(buf: bytearray, seeds: list[dict]) -> None:
    """Poke seed values into the area buffer at declared byte offsets.

    Each seed is {"offset": int, "type": str, "value": int|float|bool|str}
    where type ∈ {u8, i8, u16, i16, u32, i32, f32, bool, ascii}. Endianness is
    big-endian (Siemens wire format).
    """
    import struct  # only needed for f32 seeds

    for seed in seeds:
        off = int(seed["offset"])
        t = seed["type"]
        v = seed["value"]
        if t in ("u8", "i8"):
            # Masking to one byte stores negative i8 values as two's complement.
            buf[off] = int(v) & 0xFF
        elif t == "u16":
            buf[off:off + 2] = int(v).to_bytes(2, "big", signed=False)
        elif t == "i16":
            buf[off:off + 2] = int(v).to_bytes(2, "big", signed=True)
        elif t == "u32":
            buf[off:off + 4] = int(v).to_bytes(4, "big", signed=False)
        elif t == "i32":
            buf[off:off + 4] = int(v).to_bytes(4, "big", signed=True)
        elif t == "f32":
            buf[off:off + 4] = struct.pack(">f", float(v))
        elif t == "bool":
            # Set or clear a single bit inside the byte at `off`.
            bit = int(seed.get("bit", 0))
            if bool(v):
                buf[off] |= (1 << bit)
            else:
                buf[off] &= ~(1 << bit) & 0xFF
        elif t == "ascii":
            # Siemens STRING type: byte 0 = max length, byte 1 = current length,
            # bytes 2+ = payload. Seeds supply the payload text; we fill max/cur.
            payload = str(v).encode("ascii")
            max_len = int(seed.get("max_len", 254))
            if len(payload) > max_len:
                raise ValueError(f"ascii seed at offset {off} exceeds max_len {max_len}")
            buf[off] = max_len
            buf[off + 1] = len(payload)
            buf[off + 2:off + 2 + len(payload)] = payload
        else:
            raise ValueError(f"Unknown seed type '{t}'")


def main() -> int:
    parser = argparse.ArgumentParser(description="python-snap7 S7 server for integration tests")
    parser.add_argument("profile", help="Path to profile JSON")
    parser.add_argument("--port", type=int, default=1102, help="TCP port (default 1102 non-privileged)")
    args = parser.parse_args()

    profile_path = Path(args.profile)
    if not profile_path.is_file():
        print(f"profile not found: {profile_path}", file=sys.stderr)
        return 1
    with profile_path.open() as f:
        profile = json.load(f)

    server = snap7.server.Server()
    # Keep bytearray refs alive for the server's lifetime: snap7 doesn't copy
    # the buffer, it takes a pointer. Letting GC collect would corrupt reads.
    buffers: list[bytearray] = []
    for area_decl in profile.get("areas", []):
        area_name = area_decl["area"]
        if area_name not in AREA_MAP:
            print(f"unknown area '{area_name}' (expected one of {list(AREA_MAP)})", file=sys.stderr)
            return 1
        index = int(area_decl.get("index", 0))  # DB number for DB area, 0 for MK/PE/PA
        size = int(area_decl["size"])
        buf = bytearray(size)
        seed_buffer(buf, area_decl.get("seeds", []))
        buffers.append(buf)
        # register_area takes (area, index, c-array); we wrap the bytearray
        # into a ctypes char array so the native lib can take &buf[0].
        arr_type = ctypes.c_char * size
        arr = arr_type.from_buffer(buf)
        server.register_area(AREA_MAP[area_name], index, arr)
        print(f" seeded {area_name}{index} size={size} seeds={len(area_decl.get('seeds', []))}")

    port = int(args.port)
    print(f"Starting python-snap7 server on TCP {port} (Ctrl+C to stop)")
    server.start(tcp_port=port)

    stop = {"sig": False}

    def _handle(*_a):
        stop["sig"] = True

    signal.signal(signal.SIGINT, _handle)
    try:
        signal.signal(signal.SIGTERM, _handle)
    except Exception:
        pass  # SIGTERM not on all platforms

    try:
        while not stop["sig"]:
            time.sleep(0.25)
    finally:
        print("stopping python-snap7 server")
        try:
            server.stop()
        except Exception:
            pass
    return 0


if __name__ == "__main__":
    sys.exit(main())
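
The seed profile (`s7_1500.json`) is not shown on this page. As a rough sketch of a profile that `main()` would accept, the block below uses the key names read by `server.py` and the DB1 offsets listed in the commit message. The seed values, the area size of 1024, and the helpers `seed_width` / `check_bounds` are hypothetical. The bounds check is an addition that is not part of `server.py`: slice assignment past the end of a `bytearray` appends rather than raising, so an out-of-range seed would otherwise land at the wrong offset without any error.

```python
from __future__ import annotations

import json

# Byte width of each fixed-size seed type handled by seed_buffer().
FIXED_WIDTHS = {"u8": 1, "i8": 1, "u16": 2, "i16": 2,
                "u32": 4, "i32": 4, "f32": 4, "bool": 1}

# Hypothetical profile: offsets follow the commit message, values are made up.
EXAMPLE_PROFILE = json.loads("""
{
  "areas": [
    {
      "area": "DB", "index": 1, "size": 1024,
      "seeds": [
        {"offset": 0,   "type": "u16",  "value": 4242},
        {"offset": 10,  "type": "i16",  "value": -1234},
        {"offset": 20,  "type": "i32",  "value": 123456},
        {"offset": 30,  "type": "f32",  "value": 3.5},
        {"offset": 50,  "type": "bool", "value": true, "bit": 3},
        {"offset": 100, "type": "u16",  "value": 0}
      ]
    }
  ]
}
""")


def seed_width(seed: dict) -> int:
    """Bytes a seed occupies; ascii reserves the 2-byte STRING header plus max_len."""
    if seed["type"] == "ascii":
        return 2 + int(seed.get("max_len", 254))
    return FIXED_WIDTHS[seed["type"]]


def check_bounds(area: dict) -> list[str]:
    """Return one message per seed that would not fit inside the area."""
    problems = []
    for seed in area.get("seeds", []):
        start = int(seed["offset"])
        end = start + seed_width(seed)
        if start < 0 or end > int(area["size"]):
            problems.append(
                f"{seed['type']} seed at offset {start} "
                f"needs bytes up to {end}, area size is {area['size']}"
            )
    return problems
```

Running `check_bounds` on each entry of `profile["areas"]` before seeding would turn a mistyped offset into a readable message instead of a silently misplaced value.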