chore: organize solution into module folders (Core/Server/Drivers/Client/Tooling)
Group all 69 projects into category subfolders under src/ and tests/ so the Rider Solution Explorer mirrors the module structure. Folders: Core, Server, Drivers (with a nested Driver CLIs subfolder), Client, Tooling. - Move every project folder on disk with git mv (history preserved as renames). - Recompute relative paths in 57 .csproj files: cross-category ProjectReferences, the lib/ HintPath+None refs in Driver.Historian.Wonderware, and the external mxaccessgw refs in Driver.Galaxy and its test project. - Rebuild ZB.MOM.WW.OtOpcUa.slnx with nested solution folders. - Re-prefix project paths in functional scripts (e2e, compliance, smoke SQL, integration, install). Build green (0 errors); unit tests pass. Docs left for a separate pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,150 @@
|
||||
"""python-snap7 S7 server for integration tests.
|
||||
|
||||
Reads a JSON profile from argv[1], allocates bytearrays for each declared area
|
||||
(DB / MB / EB / AB), poke-seeds values at declared offsets, then starts the
|
||||
snap7 Server on the configured port + blocks until Ctrl+C. Shape intentionally
|
||||
mirrors the pymodbus `serve.ps1 + *.json` pattern one directory over so
|
||||
someone familiar with the Modbus fixture can read this without re-learning.
|
||||
|
||||
The snap7.server.Server class is the MIT-licensed S7 PLC emulator wrapped by
|
||||
python-snap7 (https://github.com/gijzelaerr/python-snap7). Its own docstring
|
||||
admits "protocol compliance testing rather than full industrial-grade
|
||||
functionality" — good enough for ISO-on-TCP wire-level round-trip but NOT
|
||||
for S7-1500 Optimized-DB symbolic access, SCL variant-specific behaviour, or
|
||||
PG/OP/S7-Basic session differentiation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import ctypes
|
||||
import json
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
# python-snap7 installs as `snap7` package; Server class lives under `snap7.server`.
|
||||
import snap7
|
||||
from snap7.type import SrvArea
|
||||
|
||||
|
||||
# Map JSON area names → SrvArea enum values. PE = inputs (I/E), PA = outputs
|
||||
# (Q/A), MK = memory (M), DB = data blocks, TM = timers, CT = counters.
|
||||
AREA_MAP: dict[str, int] = {
|
||||
"PE": SrvArea.PE,
|
||||
"PA": SrvArea.PA,
|
||||
"MK": SrvArea.MK,
|
||||
"DB": SrvArea.DB,
|
||||
"TM": SrvArea.TM,
|
||||
"CT": SrvArea.CT,
|
||||
}
|
||||
|
||||
|
||||
def seed_buffer(buf: bytearray, seeds: list[dict]) -> None:
|
||||
"""Poke seed values into the area buffer at declared byte offsets.
|
||||
|
||||
Each seed is {"offset": int, "type": str, "value": int|float|bool|str}
|
||||
where type ∈ {u8, i8, u16, i16, u32, i32, f32, bool, ascii}. Endianness is
|
||||
big-endian (Siemens wire format).
|
||||
"""
|
||||
for seed in seeds:
|
||||
off = int(seed["offset"])
|
||||
t = seed["type"]
|
||||
v = seed["value"]
|
||||
if t == "u8":
|
||||
buf[off] = int(v) & 0xFF
|
||||
elif t == "i8":
|
||||
buf[off] = int(v) & 0xFF
|
||||
elif t == "u16":
|
||||
buf[off:off + 2] = int(v).to_bytes(2, "big", signed=False)
|
||||
elif t == "i16":
|
||||
buf[off:off + 2] = int(v).to_bytes(2, "big", signed=True)
|
||||
elif t == "u32":
|
||||
buf[off:off + 4] = int(v).to_bytes(4, "big", signed=False)
|
||||
elif t == "i32":
|
||||
buf[off:off + 4] = int(v).to_bytes(4, "big", signed=True)
|
||||
elif t == "f32":
|
||||
import struct
|
||||
buf[off:off + 4] = struct.pack(">f", float(v))
|
||||
elif t == "bool":
|
||||
bit = int(seed.get("bit", 0))
|
||||
if bool(v):
|
||||
buf[off] |= (1 << bit)
|
||||
else:
|
||||
buf[off] &= ~(1 << bit) & 0xFF
|
||||
elif t == "ascii":
|
||||
# Siemens STRING type: byte 0 = max length, byte 1 = current length,
|
||||
# bytes 2+ = payload. Seeds supply the payload text; we fill max/cur.
|
||||
payload = str(v).encode("ascii")
|
||||
max_len = int(seed.get("max_len", 254))
|
||||
buf[off] = max_len
|
||||
buf[off + 1] = len(payload)
|
||||
buf[off + 2:off + 2 + len(payload)] = payload
|
||||
else:
|
||||
raise ValueError(f"Unknown seed type '{t}'")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="python-snap7 S7 server for integration tests")
|
||||
parser.add_argument("profile", help="Path to profile JSON")
|
||||
parser.add_argument("--port", type=int, default=1102, help="TCP port (default 1102 non-privileged)")
|
||||
args = parser.parse_args()
|
||||
|
||||
profile_path = Path(args.profile)
|
||||
if not profile_path.is_file():
|
||||
print(f"profile not found: {profile_path}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
with profile_path.open() as f:
|
||||
profile = json.load(f)
|
||||
|
||||
server = snap7.server.Server()
|
||||
# Keep bytearray refs alive for the server's lifetime — snap7 doesn't copy
|
||||
# the buffer, it takes a pointer. Letting GC collect would corrupt reads.
|
||||
buffers: list[bytearray] = []
|
||||
|
||||
for area_decl in profile.get("areas", []):
|
||||
area_name = area_decl["area"]
|
||||
if area_name not in AREA_MAP:
|
||||
print(f"unknown area '{area_name}' (expected one of {list(AREA_MAP)})", file=sys.stderr)
|
||||
return 1
|
||||
index = int(area_decl.get("index", 0)) # DB number for DB area, 0 for MK/PE/PA
|
||||
size = int(area_decl["size"])
|
||||
buf = bytearray(size)
|
||||
seed_buffer(buf, area_decl.get("seeds", []))
|
||||
buffers.append(buf)
|
||||
# register_area takes (area, index, c-array); we wrap the bytearray
|
||||
# into a ctypes char array so the native lib can take &buf[0].
|
||||
arr_type = ctypes.c_char * size
|
||||
arr = arr_type.from_buffer(buf)
|
||||
server.register_area(AREA_MAP[area_name], index, arr)
|
||||
print(f" seeded {area_name}{index} size={size} seeds={len(area_decl.get('seeds', []))}")
|
||||
|
||||
port = int(args.port)
|
||||
print(f"Starting python-snap7 server on TCP {port} (Ctrl+C to stop)")
|
||||
server.start(tcp_port=port)
|
||||
|
||||
stop = {"sig": False}
|
||||
def _handle(*_a):
|
||||
stop["sig"] = True
|
||||
signal.signal(signal.SIGINT, _handle)
|
||||
try:
|
||||
signal.signal(signal.SIGTERM, _handle)
|
||||
except Exception:
|
||||
pass # SIGTERM not on all platforms
|
||||
|
||||
try:
|
||||
while not stop["sig"]:
|
||||
time.sleep(0.25)
|
||||
finally:
|
||||
print("stopping python-snap7 server")
|
||||
try:
|
||||
server.stop()
|
||||
except Exception:
|
||||
pass
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user