#!/usr/bin/env python3
"""FOCAS/2 v3 data-PDU capture for offline parser work.

Replays exactly the connect sequence ``FocasWireClient.ConnectCoreAsync`` performs
(two-socket initiate, then the ``cnc_sysinfo`` 0x0018 + ``0x000e``/0x26f0 setup
requests on socket 2), then issues each target data request on socket 2 and dumps
the RAW v3 response PDU bytes. Read-only: NOT a single write/operate PDU is sent.

Every socket read is timeout-bounded, so a command that "hangs" the C# client
(``cnc_rdsvmeter`` on v3) shows up here as a recorded partial/timeout rather than
an infinite block. Captured ``.bin`` files become offline unit-test fixtures.

The request framing here mirrors ``Wire/FocasWireProtocol.cs`` /
``Wire/FocasWireClient.cs`` byte-for-byte:

    PDU       = a0 a0 a0 a0 | ver(u16) | type | dir | bodyLen(u16) | body
    data body = blockCount(u16) | block...
    block     = blockLen(u16) | reqClass(u16) | pathId(u16) | cmd(u16)
                | arg1(i32) | arg2(i32) | arg3(i32) | arg4(i32) | arg5(u16)
                | extraLen(u16) | extra

We EMIT version=1 (the CNC accepts v1 requests) and ACCEPT whatever version it
answers.

Usage:
    python3 scripts/focas/capture-v3.py <host> [port] [--out DIR]

See: docs/plans/2026-06-25-focas-pdu-v3-30i-b-support.md
     docs/plans/2026-06-25-focas-pdu-v3-implementation-plan.md (Phase 1)
"""

import os
import socket
import sys

MAGIC = bytes([0xA0, 0xA0, 0xA0, 0xA0])
EMIT_VERSION = 1  # what the WireFocasClient puts in request headers
TYPE_INITIATE, TYPE_DATA = 0x01, 0x21
DIR_REQUEST = 0x01
READ_TIMEOUT = 6.0  # seconds; a hang becomes a recorded timeout


def hexdump(b: bytes) -> str:
    """Return *b* as space-separated lowercase hex byte pairs ('' for empty input)."""
    return " ".join(f"{x:02x}" for x in b)


def build_pdu(version: int, type_: int, direction: int, body: bytes) -> bytes:
    """Frame *body* as one PDU: magic | version(u16) | type | dir | bodyLen(u16) | body.

    All multi-byte fields are big-endian. Raises ``OverflowError`` if *version* or
    ``len(body)`` does not fit in an unsigned 16-bit field.
    """
    header = bytearray(MAGIC)
    header += version.to_bytes(2, "big")
    header += bytes([type_, direction])
    header += len(body).to_bytes(2, "big")
    return bytes(header) + body


def build_block(cmd, a1=0, a2=0, a3=0, a4=0, a5=0, req_class=1, path_id=1, extra=b""):
    """Build one request block for a data PDU.

    Layout (big-endian): blockLen(u16) | reqClass(u16) | pathId(u16) | cmd(u16)
    | arg1..arg4 (signed i32 each) | arg5(u16) | extraLen(u16) | extra.

    ``blockLen`` is the fixed 0x1C-byte envelope plus ``len(extra)``. *a5* is masked
    to 16 bits; *a1*..*a4* must fit in a signed 32-bit integer or ``OverflowError``
    is raised.
    """
    blk = bytearray()
    blk += (0x1C + len(extra)).to_bytes(2, "big")
    blk += req_class.to_bytes(2, "big")
    blk += path_id.to_bytes(2, "big")
    blk += cmd.to_bytes(2, "big")
    for arg in (a1, a2, a3, a4):
        blk += int(arg).to_bytes(4, "big", signed=True)
    blk += (a5 & 0xFFFF).to_bytes(2, "big")
    blk += len(extra).to_bytes(2, "big")
    blk += extra
    return bytes(blk)


def build_data_pdu(blocks) -> bytes:
    """Wrap pre-built *blocks* in a data-request PDU: blockCount(u16) then each block."""
    body = bytearray(len(blocks).to_bytes(2, "big"))
    for blk in blocks:
        body += blk
    return build_pdu(EMIT_VERSION, TYPE_DATA, DIR_REQUEST, bytes(body))


def recv_exactly(sock, n: int):
    """Read exactly *n* bytes from *sock*.

    Returns ``(data, complete)``. Stops early on a socket timeout or when the peer
    closes the connection, in which case ``data`` holds whatever arrived so far and
    ``complete`` is False. ``n == 0`` returns ``(b"", True)`` without reading.
    """
    buf = bytearray()
    while len(buf) < n:
        try:
            chunk = sock.recv(n - len(buf))
        except socket.timeout:
            return bytes(buf), False
        if not chunk:  # empty read: peer closed the connection
            return bytes(buf), False
        buf += chunk
    return bytes(buf), True


def read_pdu(sock) -> dict:
    """Read one full response PDU.

    Returns a dict that always has ``ok``, ``raw`` and ``note``. When the 10-byte
    header was read in full it also has ``version``, ``type``, ``dir``, ``body_len``
    and ``body``. ``ok`` is True only if the whole body arrived; ``raw`` is every
    byte actually received, so a partial response can still be dumped.
    """
    header, ok = recv_exactly(sock, 10)
    if not ok or len(header) < 10:
        return {"ok": False, "raw": header,
                "note": f"short/timeout header ({len(header)} bytes)"}
    version = int.from_bytes(header[4:6], "big")
    body_len = int.from_bytes(header[8:10], "big")
    body, body_ok = recv_exactly(sock, body_len)
    return {
        "ok": body_ok,
        "version": version,
        "type": header[6],
        "dir": header[7],
        "body_len": body_len,
        "body": body,
        "raw": header + body,
        "note": "" if body_ok else f"body short/timeout: got {len(body)}/{body_len}",
    }


def loose_parse_blocks(body: bytes) -> list:
    """Best-effort v1-layout block walk so we can eyeball cmd/rc/payloadLen on v3.

    Returns a list of human-readable lines; never raises (v3 structural drift just
    yields a 'parse aborted' note while the raw bytes remain authoritative).
    """
    out = []
    try:
        if len(body) < 2:
            return ["(body < 2 bytes; no block count)"]
        count = int.from_bytes(body[0:2], "big")
        out.append(f"blockCount={count}")
        off = 2
        for i in range(count):
            # Need the 16 bytes before the payload to read the fields below.
            if off + 16 > len(body):
                out.append(f" block[{i}] truncated at off={off}")
                break
            blk_len = int.from_bytes(body[off:off + 2], "big")
            cmd = int.from_bytes(body[off + 6:off + 8], "big")
            rc = int.from_bytes(body[off + 8:off + 10], "big", signed=True)
            payload_len = int.from_bytes(body[off + 14:off + 16], "big")
            payload = body[off + 16:off + 16 + payload_len]
            out.append(
                f" block[{i}] blkLen={blk_len} cmd=0x{cmd:04x} rc={rc} "
                f"payloadLen={payload_len} payload={hexdump(payload) or '(empty)'}")
            # A block shorter than the fields just read cannot be stepped over.
            if blk_len < 0x10:
                out.append(" !! blkLen < 0x10 — aborting walk (v3 layout differs)")
                break
            off += blk_len
    except Exception as e:  # noqa: BLE001 - capture tool, never crash on parse
        out.append(f" !! loose parse aborted: {type(e).__name__}: {e}")
    return out


class Session:
    """Two-socket capture session: handshake, then data requests on socket 2."""

    def __init__(self, host, port, log):
        """Store the target address and the *log* callable; no I/O happens here."""
        self.host, self.port, self.log = host, port, log
        self.s1 = self.s2 = None

    def _connect_socket(self):
        """Open a TCP socket to the target with TCP_NODELAY and the read timeout set.

        The socket is closed again if any setup step or the connect itself fails, so
        a refused or timed-out connection does not leak a descriptor.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            s.settimeout(READ_TIMEOUT)
            s.connect((self.host, self.port))
        except BaseException:
            s.close()
            raise
        return s

    def connect(self):
        """Run the two-socket initiate handshake, then the two setup requests.

        Returns the parsed initiate responses ``(r1, r2)`` for socket 1 and socket 2.
        Connection errors propagate to the caller.
        """
        self.log("=== two-socket initiate handshake ===")
        self.s1 = self._connect_socket()
        self.s1.sendall(build_pdu(EMIT_VERSION, TYPE_INITIATE, DIR_REQUEST,
                                  (1).to_bytes(2, "big")))
        r1 = read_pdu(self.s1)
        self.log(f" socket1 initiate <- v={r1.get('version')} "
                 f"bodyLen={r1.get('body_len')} ok={r1['ok']}")
        self.s2 = self._connect_socket()
        self.s2.sendall(build_pdu(EMIT_VERSION, TYPE_INITIATE, DIR_REQUEST,
                                  (2).to_bytes(2, "big")))
        r2 = read_pdu(self.s2)
        self.log(f" socket2 initiate <- v={r2.get('version')} "
                 f"bodyLen={r2.get('body_len')} ok={r2['ok']}")
        # mirror ConnectCoreAsync: sysinfo (cached) then the 0x000e/0x26f0 setup request
        self.request("setup-sysinfo-0x0018", [build_block(0x0018, path_id=1)], save=False)
        self.request("setup-0x000e-26f0",
                     [build_block(0x000E, 0x26F0, 0x26F0, path_id=1)], save=False)
        return r1, r2

    def request(self, label, blocks, save=True):
        """Send *blocks* as one data PDU on socket 2 and log the raw response.

        Returns the ``read_pdu`` dict, or None if the send/read raised. When *save*
        is true and ``OUT_DIR`` is set, any received bytes (even a partial response)
        are written to ``<OUT_DIR>/<label>.bin``.
        """
        pdu = build_data_pdu(blocks)
        self.log(f"\n--- {label} ---")
        self.log(f" -> {len(pdu)}B req: {hexdump(pdu)}")
        try:
            self.s2.sendall(pdu)
            resp = read_pdu(self.s2)
        except Exception as e:  # noqa: BLE001
            self.log(f" !! send/read failed: {type(e).__name__}: {e}")
            return None
        self.log(f" <- header: version={resp.get('version')} type=0x{resp.get('type', 0):02x} "
                 f"dir=0x{resp.get('dir', 0):02x} bodyLen={resp.get('body_len')} complete={resp['ok']}")
        if resp.get("note"):
            self.log(f" <- NOTE: {resp['note']}")
        self.log(f" <- raw ({len(resp['raw'])}B): {hexdump(resp['raw'])}")
        if resp.get("body"):
            for line in loose_parse_blocks(resp["body"]):
                self.log(" " + line)
        if save and OUT_DIR and resp.get("raw"):
            path = os.path.join(OUT_DIR, f"{label}.bin")
            with open(path, "wb") as f:
                f.write(resp["raw"])
        return resp

    def close(self):
        """Close both sockets, socket 2 first; close errors are ignored (best effort)."""
        for s in (self.s2, self.s1):
            try:
                if s:
                    s.close()
            except OSError:
                pass


# (label, [blocks]) — references first (known-good on v3), then the failing targets.
def target_specs():
    """Return the ordered list of ``(label, [blocks])`` requests to capture."""
    return [
        # ---- known-good references (calibrate the v3 block envelope) ----
        ("ref-sysinfo-0x0018", [build_block(0x0018, path_id=1)]),
        ("ref-axisname-0x0089", [build_block(0x0089, path_id=1)]),
        ("ref-spdlname-0x008a", [build_block(0x008A, path_id=1)]),
        ("ref-macro-3901-0x0015", [build_block(0x0015, 3901, 3901, path_id=1)]),
        ("ref-macro-500-0x0015", [build_block(0x0015, 500, 500, path_id=1)]),
        ("ref-opmode-0x0057", [build_block(0x0057, path_id=1)]),
        ("ref-exeprgname-0x00fc", [build_block(0x00FC, path_id=1)]),
        ("ref-statinfo", [
            build_block(0x0019, path_id=1),
            build_block(0x00E1, path_id=1),
            build_block(0x0098, path_id=1),
        ]),
        # cnc_rddynamic2 axis 1 — the full 9-block bundle the client sends (known-good)
        ("ref-dynamic2-axis1", [
            build_block(0x001A, path_id=1),
            build_block(0x001C, path_id=1),
            build_block(0x001D, path_id=1),
            build_block(0x0024, path_id=1),
            build_block(0x0025, path_id=1),
            build_block(0x0026, 4, 1, path_id=1),
            build_block(0x0026, 1, 1, path_id=1),
            build_block(0x0026, 6, 1, path_id=1),
            build_block(0x0026, 7, 1, path_id=1),
        ]),
        # ---- failing / target commands ----
        ("timer-poweron-0x0120-t0", [build_block(0x0120, 0, path_id=1)]),
        ("timer-operating-0x0120-t1", [build_block(0x0120, 1, path_id=1)]),
        ("timer-cutting-0x0120-t2", [build_block(0x0120, 2, path_id=1)]),
        ("timer-cycle-0x0120-t3", [build_block(0x0120, 3, path_id=1)]),
        # cnc_rdsvmeter — the C# client sends 0x0056(arg1=1) + 0x0089 together (this HANGS).
        ("svmeter-pair-0x0056-0x0089", [
            build_block(0x0056, 1, path_id=1),
            build_block(0x0089, path_id=1),
        ]),
        # and 0x0056 alone, to isolate which block stalls
        ("svmeter-alone-0x0056", [build_block(0x0056, 1, path_id=1)]),
        # pmc_rdpmcrng R100 (area R=5, dataType Word=1, reqClass=2) — BadOutOfRange
        ("pmcrng-R100-0x8001", [build_block(0x8001, 100, 100, 5, 1, req_class=2, path_id=1)]),
        ("pmcrng-R0-0x8001", [build_block(0x8001, 0, 0, 5, 1, req_class=2, path_id=1)]),
        # cnc_rdparam 1320 (axis 0 -> arg2=dataNumber) — BadNotSupported
        ("param-1320-0x000e", [build_block(0x000E, 1320, 1320, path_id=1)]),
        ("param-100-0x000e", [build_block(0x000E, 100, 100, path_id=1)]),
        # cnc_rdalmmsg2 (type=-1 all, count=32, arg3=2, arg4=0x40) — untested
        ("alarms-0x0023", [build_block(0x0023, -1, 32, 2, 0x40, path_id=1)]),
    ]


OUT_DIR = None  # set by main() from --out; read by Session.request when saving fixtures


def main():
    """Parse ``<host> [port] [--out DIR]``, run the capture, and write the log.

    Exits with status 2 on a usage error (no host, ``--out`` without a directory,
    or a port that is not an integer in 1..65535). When ``--out`` is given the log
    is written even if the capture aborts part-way, so partial runs stay usable.
    """
    global OUT_DIR
    argv = [a for a in sys.argv[1:] if a]
    if not argv:
        print(__doc__)
        sys.exit(2)
    host = argv[0]
    port = 8193
    rest = argv[1:]
    while rest:
        arg = rest.pop(0)
        if arg == "--out":
            if not rest:
                print("error: --out requires a directory argument", file=sys.stderr)
                sys.exit(2)
            OUT_DIR = rest.pop(0)
        else:
            try:
                port = int(arg)
            except ValueError:
                print(f"error: invalid port {arg!r}", file=sys.stderr)
                sys.exit(2)
            if not 0 < port < 65536:
                print(f"error: port out of range: {port}", file=sys.stderr)
                sys.exit(2)
    if OUT_DIR:
        os.makedirs(OUT_DIR, exist_ok=True)

    lines = []

    def log(msg):
        print(msg)
        lines.append(msg)

    log(f"# FOCAS v3 capture {host}:{port} (emit version={EMIT_VERSION})")
    sess = Session(host, port, log)
    try:
        sess.connect()
        for label, blocks in target_specs():
            sess.request(label, blocks)
    except Exception as e:
        # Record why the run stopped before the traceback, so the saved log explains it.
        log(f"\n!! capture aborted: {type(e).__name__}: {e}")
        raise
    finally:
        sess.close()
        # Written here (not after the try) so a failed or interrupted run keeps its log.
        if OUT_DIR:
            with open(os.path.join(OUT_DIR, "capture-log.txt"), "w", encoding="utf-8") as f:
                f.write("\n".join(lines) + "\n")
    if OUT_DIR:
        log(f"\n# wrote .bin fixtures + capture-log.txt to {OUT_DIR}")


if __name__ == "__main__":
    main()