FOCAS — retire Tier-C split, inline managed wire client, make read-only

Migration closes the FOCAS Tier-C architecture. OtOpcUa previously had
`Driver.FOCAS.Host` (NSSM-wrapped Windows service loading Fwlib64.dll via
P/Invoke) + `Driver.FOCAS.Shared` (MessagePack IPC contracts) + a C shim
DLL stand-in for unit tests. All of it is deleted; the driver is now a
single in-process managed assembly talking the FOCAS/2 Ethernet binary
protocol directly on TCP:8193.

Architecture

- Pure-managed `FocasWireClient` inlined at `src/.../Driver.FOCAS/Wire/`
  (owner-imported — see Wire/FocasWireClient.cs for the full surface).
  Opens two TCP sockets, runs the initiate handshake, serialises requests
  on socket 2 through a semaphore, closes cleanly with PDU + socket
  teardown. Both sync `IDisposable` and async `IAsyncDisposable`.
- `WireFocasClient` (same folder) adapts the wire client to OtOpcUa's
  `IFocasClient` surface — fixed-tree reads, PARAM/MACRO/PMC addresses,
  alarms. Writes return `BadNotWritable` by design — OtOpcUa is read-only
  against FOCAS.
- `FocasDriverFactoryExtensions` now accepts `"Backend": "wire"` (default)
  and `"Backend": "unimplemented"`. Legacy `ipc` and `fwlib` backends are
  rejected at startup with a diagnostic pointing at the migration doc.

Deletions

- `src/ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host/` — whole project + Ipc/,
  Backend/, Stability/, Program.cs.
- `src/ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared/` — Contracts/, FrameReader,
  FrameWriter, whole project.
- `tests/...Driver.FOCAS.Host.Tests/` + `.Shared.Tests/` — whole projects.
- `src/.../Driver.FOCAS/FwlibNative.cs` + `FwlibFocasClient.cs` — 21
  P/Invokes + 7 `Pack=1` marshalling structs + the Fwlib-backed
  `IFocasClient` implementation.
- `src/.../Driver.FOCAS/Ipc/` + `Supervisor/` — IPC client wrapper +
  Host-process supervisor (backoff, circuit breaker, heartbeat, post-
  mortem reader, process launcher).
- `scripts/install/Install-FocasHost.ps1` — NSSM service installer.
- `tests/.../Driver.FOCAS.Tests/{IpcFocasClientTests, IpcLoopback,
  FwlibNativeHelperTests, PostMortemReaderCompatibilityTests,
  SupervisorTests, FocasDriverFactoryExtensionsTests}.cs` — tests that
  exercised the retired surfaces.
- `tests/.../Driver.FOCAS.IntegrationTests/Shim/` — the zig-built C shim
  DLL that masqueraded as Fwlib64.dll.

Solution changes

- `ZB.MOM.WW.OtOpcUa.slnx` drops the 4 retired project refs.
- `src/.../Driver.FOCAS.csproj` drops the Shared ProjectReference, adds
  `Microsoft.Extensions.Logging.Abstractions` for the optional `ILogger`
  hook in `FocasWireClient`.
- `src/.../Driver.FOCAS.Cli.csproj` drops the six `<Content Include>`
  entries that copied `vendor/fanuc/*.dll` into the CLI bin. CLI now uses
  `WireFocasClient` directly.
- `FocasDriver` default factory flips to `Wire.WireFocasClientFactory`.

Integration tests

- New `tests/.../Driver.FOCAS.IntegrationTests/` project covering fixed-
  tree reads (identity, axes, dynamic, program, operation mode, timers,
  spindle load + max RPM, servo meters), user-authored PARAM / MACRO /
  PMC reads, `DiscoverAsync` emission, `SubscribeAsync` + `OnDataChange`,
  `IAlarmSource` raise/clear transitions, and `ProbeAsync` /
  `OnHostStatusChanged`. 9 e2e tests against the focas-mock fixture
  (Docker container with the vendored Python mock's native FOCAS/2
  Ethernet responder).
- `scripts/integration/run-focas.ps1` orchestrates compose up → tests →
  compose down. Dropped the shim-build stage + DLL-copy step + the split
  testhost workaround (the latter only existed because of native-DLL
  lifecycle bugs the shim tripped).
- Docker compose collapses from 11 per-series services to one `focas-sim`
  service. Tests seed per-series state via `mock_load_profile` at test
  start.
- Vendored focas-mock snapshot refreshed to pick up upstream's native
  FOCAS/2 Ethernet responder (was 660 lines, now 1018) — the
  pre-refresh snapshot only spoke the JSON admin protocol.

Tests

- 145/145 unit tests in `Driver.FOCAS.Tests` pass (was 208 pre-deletion;
  63 removed tests exercised the retired IPC/shim/supervisor/Fwlib
  surfaces).
- 9/9 integration tests pass against the refreshed mock.
- `FocasScaffoldingTests.Unimplemented_factory_throws_on_Create…` updated
  to assert the new diagnostic message pointing at
  `docs/drivers/FOCAS.md` rather than the now-gone `Fwlib64.dll`.

Docs

- `docs/drivers/FOCAS.md` rewritten for the managed wire topology —
  deployment collapses to one `"Backend": "wire"` config block, no
  separate service, no DLL deployment, no pipe ACL.
- `docs/drivers/FOCAS-Test-Fixture.md` updated — single TCP probe skip
  gate instead of TCP + shim probe; fewer moving parts.
- `docs/drivers/README.md` row for FOCAS reflects the Tier-A managed
  topology (previously listed Tier-C + `Fwlib64.dll` P/Invoke).
- `docs/Driver.FOCAS.Cli.md` drops the Tier-C architecture-note section.
- `docs/v2/implementation/focas-isolation-plan.md` marked historical —
  the plan it documents was executed then superseded by the wire client.
- `docs/v2/v2-release-readiness.md` re-audited 2026-04-24. Phase 5
  driver complement closed. FOCAS change-log entry added.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-24 14:10:59 -04:00
parent 404b54add0
commit 4b0664bd55
105 changed files with 19530 additions and 4873 deletions

View File

@@ -0,0 +1,185 @@
Metadata-Version: 2.4
Name: focas-mock
Version: 0.1.0
Summary: Mock FOCAS server with version-aware profiles derived from FANUC 64-bit DLL exports.
License-Expression: MIT
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pefile>=2024.8.26
Dynamic: license-file
# focas-mock
`focas-mock` is a Python TCP mock server for testing higher-level FOCAS clients without a real FANUC control.
The project is built from two inputs:
- The 64-bit FANUC-related DLLs downloaded from [Ladder99/fanuc-cnc-api](https://github.com/Ladder99/fanuc-cnc-api)
- The vendor `fwlib.cs` interop file, used as the callable surface reference
The DLLs are not reimplemented at the binary ABI level. Instead, this project extracts their export tables, builds per-version capability profiles, and exposes a JSON-over-TCP mock API whose method names match common FOCAS entry points such as `cnc_allclibhndl3`, `cnc_sysinfo`, `cnc_statinfo`, and `cnc_rddynamic2`.
## What is included
- Vendored 64-bit DLLs under `vendor/fanuc-cnc-api/64bit/`
- A profile extractor that inspects PE exports with `pefile`
- A Windows P/Invoke shim source under `shim/` for clients that load `FWLIB64.dll` directly
- Built-in profiles for:
- `FWLIB64`
- `fwlib0DN64`
- `fwlib0iD64`
- `fwlib30i64`
- `fwlibe64`
- `fwlibNCG64`
- A stateful mock server with:
- version/profile switching
- forced error injection
- runtime state patching
- built-in default mock data
## Quick start
Install in editable mode:
```powershell
python -m pip install -e .
```
List the generated profiles:
```powershell
focas-mock list-profiles
```
Start the mock server with the 30i profile:
```powershell
focas-mock serve --profile fwlib30i64 --host 127.0.0.1 --port 8193
```
Start with a JSON patch file that overrides the default data:
```powershell
focas-mock serve --profile fwlib30i64 --data examples/mock-30i.json
```
## Protocol
The server speaks newline-delimited JSON. Each request is one JSON object per line:
```json
{"id":1,"method":"cnc_allclibhndl3","params":{"ipaddr":"127.0.0.1","port":8193,"timeout":10}}
```
Example response:
```json
{"id":1,"method":"cnc_allclibhndl3","rc":0,"message":"EW_OK","result":{"FlibHndl":1,"profile":"fwlib30i64"}}
```
Supported admin methods:
- `mock_get_state`
- `mock_patch`
- `mock_reset`
- `mock_load_profile`
- `mock_list_methods`
Example patch request:
```json
{"id":2,"method":"mock_patch","params":{"state":{"parameters":{"6711":{"type":"long","value":1234,"decimal":0}}}}}
```
Example test setup over TCP:
```json
{"id":1,"method":"mock_load_profile","params":{"profile":"FWLIB64"}}
{"id":2,"method":"mock_patch","params":{"state":{"pmc":{"R":{"100":{"type":"byte","value":1}}},"parameters":{"6711":{"type":"long","value":1234,"decimal":0}},"macros":{"500":{"value":42000,"decimal":3}},"statinfo":{"run":3,"aut":1,"emergency":0},"alarms":[{"alm_no":100,"type":1,"axis":0,"msg":"TEST ALARM"}]}}}
{"id":3,"method":"cnc_allclibhndl3","params":{"ipaddr":"127.0.0.1","port":8193,"timeout":10}}
{"id":4,"method":"pmc_rdpmcrng","params":{"FlibHndl":1,"area":"R","data_type":"byte","start":100,"end":100}}
```
## Regenerating profiles
The built-in JSON profiles are generated from the vendored binaries:
```powershell
python -m focas_mock.cli extract-profiles
```
By default this reads:
- `vendor/fanuc-cnc-api/64bit/*.dll`
- `upstream/fwlib.cs`
and writes:
- `src/focas_mock/builtin_profiles/*.json`
## Testing Direct P/Invoke Clients
If a client directly P/Invokes FANUC's 64-bit DLLs, point it at the shim DLLs built from `shim/` instead of the real vendor DLLs. The shim exports the small FOCAS surface used by the client and forwards calls to this Python server over JSON/TCP.
```powershell
focas-mock serve --profile FWLIB64 --host 127.0.0.1 --port 8193
.\shim\build.ps1
$env:FOCAS_MOCK_HOST = "127.0.0.1"
$env:FOCAS_MOCK_PORT = "8193"
```
Before running the client, seed profile/state with `mock_load_profile` and `mock_patch` as shown above.
Detailed documentation for the supported FOCAS subset is in `docs/USED_FOCAS_API.md`.
OtOpcUa-specific setup notes are in `docs/OTOPCUA_DOTNET_INTEGRATION.md`.
## Implemented mock calls
The server currently implements a practical subset of the surface observed in the exported DLLs and the C# wrapper:
- `cnc_allclibhndl`
- `cnc_allclibhndl2`
- `cnc_allclibhndl3`
- `cnc_freelibhndl`
- `cnc_sysinfo`
- `cnc_statinfo`
- `cnc_rddynamic2`
- `cnc_actf`
- `cnc_acts`
- `cnc_acts2`
- `cnc_getpath`
- `cnc_setpath`
- `cnc_rdaxisname`
- `cnc_rdspdlname`
- `cnc_rdparam`
- `cnc_wrparam`
- `cnc_rdmacro`
- `cnc_wrmacro`
- `cnc_rdalmmsg2`
- `pmc_rdpmcrng`
- `pmc_wrpmcrng`
- `cnc_rdopmsg`
- `cnc_rdopmode`
- `cnc_rdprgnum`
- `cnc_exeprgname2`
- `cnc_rdexecprog`
- `cnc_rdseqnum`
- `cnc_rdblkcount`
- `cnc_rdproginfo`
- `cnc_rdprogdir3`
- `cnc_rdtimer`
- `cnc_rdspmeter`
- `cnc_rdsvmeter`
- `cnc_rdspload`
- `cnc_rdspgear`
- `cnc_rdspmaxrpm`
- `cnc_rddiagnum`
- `cnc_rddiaginfo`
- `cnc_diagnoss`
## Limitations
- This is not a binary-compatible replacement for FANUC's DLLs.
- This is not a reverse-engineered implementation of FANUC's wire protocol.
- The per-version profiles are grounded in exported symbol tables plus the published interop wrapper, while some defaults such as axis-count hints are inferred from filename families and documented as heuristics.

View File

@@ -0,0 +1,25 @@
LICENSE
README.md
pyproject.toml
src/focas_mock/__init__.py
src/focas_mock/cli.py
src/focas_mock/constants.py
src/focas_mock/data_store.py
src/focas_mock/defaults.py
src/focas_mock/export_introspection.py
src/focas_mock/profiles.py
src/focas_mock/server.py
src/focas_mock.egg-info/PKG-INFO
src/focas_mock.egg-info/SOURCES.txt
src/focas_mock.egg-info/dependency_links.txt
src/focas_mock.egg-info/entry_points.txt
src/focas_mock.egg-info/requires.txt
src/focas_mock.egg-info/top_level.txt
src/focas_mock/builtin_profiles/FWLIB64.json
src/focas_mock/builtin_profiles/fwlib0DN64.json
src/focas_mock/builtin_profiles/fwlib0iD64.json
src/focas_mock/builtin_profiles/fwlib30i64.json
src/focas_mock/builtin_profiles/fwlibNCG64.json
src/focas_mock/builtin_profiles/fwlibe64.json
tests/test_profiles.py
tests/test_server.py

View File

@@ -0,0 +1,2 @@
[console_scripts]
focas-mock = focas_mock.cli:main

View File

@@ -0,0 +1,5 @@
from .profiles import list_profiles, load_profile
from .server import FocasMockServer
__all__ = ["FocasMockServer", "list_profiles", "load_profile"]

View File

@@ -0,0 +1,80 @@
from __future__ import annotations
import argparse
import asyncio
import json
from pathlib import Path
from .data_store import MockDataStore
from .export_introspection import write_profiles
from .profiles import list_profiles, load_profile
from .server import FocasMockServer
def _default_root() -> Path:
return Path(__file__).resolve().parents[2]
def build_parser() -> argparse.ArgumentParser:
root = _default_root()
parser = argparse.ArgumentParser(prog="focas-mock")
sub = parser.add_subparsers(dest="command", required=True)
serve = sub.add_parser("serve", help="Start the mock server.")
serve.add_argument("--host", default="127.0.0.1")
serve.add_argument("--port", type=int, default=8193)
serve.add_argument("--profile", default="fwlib30i64")
serve.add_argument("--data", help="Optional JSON patch file.")
sub.add_parser("list-profiles", help="List built-in profiles.")
dump_profile = sub.add_parser("dump-profile", help="Print one built-in profile as JSON.")
dump_profile.add_argument("profile")
extract = sub.add_parser("extract-profiles", help="Regenerate JSON profiles from vendored DLLs.")
extract.add_argument("--dll-dir", default=str(root / "vendor" / "fanuc-cnc-api" / "64bit"))
extract.add_argument("--fwlib", default=str(root / "upstream" / "fwlib.cs"))
extract.add_argument("--out-dir", default=str(root / "src" / "focas_mock" / "builtin_profiles"))
return parser
async def _run_server(args: argparse.Namespace) -> None:
profile = load_profile(args.profile)
store = MockDataStore(profile)
if args.data:
store.load_patch_file(args.data)
server = FocasMockServer(args.host, args.port, profile, store)
await server.start()
print(f"focas-mock listening on {server.host}:{server.port} with profile {profile['profile_name']}")
try:
await server.serve_forever()
finally:
await server.close()
def main(argv: list[str] | None = None) -> None:
parser = build_parser()
args = parser.parse_args(argv)
if args.command == "list-profiles":
for profile in list_profiles():
print(profile)
return
if args.command == "dump-profile":
print(json.dumps(load_profile(args.profile), indent=2))
return
if args.command == "extract-profiles":
written = write_profiles(args.dll_dir, args.fwlib, args.out_dir)
for path in written:
print(path)
return
if args.command == "serve":
asyncio.run(_run_server(args))
return
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,120 @@
from __future__ import annotations
EW_PROTOCOL = -17
EW_SOCKET = -16
EW_NODLL = -15
EW_BUS = -11
EW_SYSTEM2 = -10
EW_HSSB = -9
EW_HANDLE = -8
EW_VERSION = -7
EW_UNEXP = -6
EW_SYSTEM = -5
EW_PARITY = -4
EW_MMCSYS = -3
EW_RESET = -2
EW_BUSY = -1
EW_OK = 0
EW_FUNC = 1
EW_LENGTH = 2
EW_NUMBER = 3
EW_ATTRIB = 4
EW_DATA = 5
EW_NOOPT = 6
EW_PROT = 7
EW_OVRFLOW = 8
EW_PARAM = 9
EW_BUFFER = 10
EW_PATH = 11
EW_MODE = 12
EW_REJECT = 13
EW_DTSRVR = 14
EW_ALARM = 15
EW_STOP = 16
EW_PASSWD = 17
RC_LABELS = {
EW_PROTOCOL: "EW_PROTOCOL",
EW_SOCKET: "EW_SOCKET",
EW_NODLL: "EW_NODLL",
EW_BUS: "EW_BUS",
EW_SYSTEM2: "EW_SYSTEM2",
EW_HSSB: "EW_HSSB",
EW_HANDLE: "EW_HANDLE",
EW_VERSION: "EW_VERSION",
EW_UNEXP: "EW_UNEXP",
EW_SYSTEM: "EW_SYSTEM",
EW_PARITY: "EW_PARITY",
EW_MMCSYS: "EW_MMCSYS",
EW_RESET: "EW_RESET",
EW_BUSY: "EW_BUSY",
EW_OK: "EW_OK",
EW_FUNC: "EW_FUNC",
EW_LENGTH: "EW_LENGTH",
EW_NUMBER: "EW_NUMBER",
EW_ATTRIB: "EW_ATTRIB",
EW_DATA: "EW_DATA",
EW_NOOPT: "EW_NOOPT",
EW_PROT: "EW_PROT",
EW_OVRFLOW: "EW_OVRFLOW",
EW_PARAM: "EW_PARAM",
EW_BUFFER: "EW_BUFFER",
EW_PATH: "EW_PATH",
EW_MODE: "EW_MODE",
EW_REJECT: "EW_REJECT",
EW_DTSRVR: "EW_DTSRVR",
EW_ALARM: "EW_ALARM",
EW_STOP: "EW_STOP",
EW_PASSWD: "EW_PASSWD",
}
IMPLEMENTED_FOCAS_METHODS = [
"cnc_allclibhndl",
"cnc_allclibhndl2",
"cnc_allclibhndl3",
"cnc_freelibhndl",
"cnc_sysinfo",
"cnc_statinfo",
"cnc_rddynamic2",
"cnc_actf",
"cnc_acts",
"cnc_acts2",
"cnc_getpath",
"cnc_setpath",
"cnc_rdaxisname",
"cnc_rdspdlname",
"cnc_rdparam",
"cnc_wrparam",
"cnc_rdmacro",
"cnc_wrmacro",
"cnc_rdalmmsg2",
"pmc_rdpmcrng",
"pmc_wrpmcrng",
"cnc_rdopmsg",
"cnc_rdopmode",
"cnc_rdprgnum",
"cnc_exeprgname2",
"cnc_rdexecprog",
"cnc_rdseqnum",
"cnc_rdblkcount",
"cnc_rdproginfo",
"cnc_rdprogdir3",
"cnc_rdtimer",
"cnc_rdspmeter",
"cnc_rdsvmeter",
"cnc_rdspload",
"cnc_rdspgear",
"cnc_rdspmaxrpm",
"cnc_rddiagnum",
"cnc_rddiaginfo",
"cnc_diagnoss",
]
ADMIN_METHODS = [
"mock_get_state",
"mock_patch",
"mock_reset",
"mock_load_profile",
"mock_list_methods",
"mock_schedule_alarms",
]

View File

@@ -0,0 +1,59 @@
from __future__ import annotations
import json
from copy import deepcopy
from pathlib import Path
from typing import Any, Mapping
from .defaults import make_default_state
def _deep_merge(target: dict[str, Any], patch: Mapping[str, Any]) -> dict[str, Any]:
for key, value in patch.items():
if isinstance(value, Mapping) and isinstance(target.get(key), dict):
_deep_merge(target[key], value)
else:
target[key] = deepcopy(value)
return target
class MockDataStore:
def __init__(self, profile: Mapping[str, Any]) -> None:
self.profile = dict(profile)
self._defaults = make_default_state(profile)
self._state = deepcopy(self._defaults)
@property
def state(self) -> dict[str, Any]:
return self._state
def snapshot(self) -> dict[str, Any]:
return deepcopy(self._state)
def reset(self) -> dict[str, Any]:
self._state = deepcopy(self._defaults)
return self.snapshot()
def merge_patch(self, patch: Mapping[str, Any]) -> dict[str, Any]:
_deep_merge(self._state, patch)
return self.snapshot()
def load_patch_file(self, path: str | Path) -> dict[str, Any]:
patch = json.loads(Path(path).read_text(encoding="utf-8"))
return self.merge_patch(patch)
def consume_forced_error(self, method: str) -> tuple[int, str] | None:
entry = self._state.get("forced_errors", {}).get(method)
if not entry:
return None
if isinstance(entry, int):
return entry, f"forced error for {method}"
rc = int(entry.get("rc", 0))
count = int(entry.get("count", 1))
message = str(entry.get("message", f"forced error for {method}"))
if count <= 1:
self._state["forced_errors"].pop(method, None)
else:
entry["count"] = count - 1
return rc, message

View File

@@ -0,0 +1,142 @@
from __future__ import annotations
from typing import Any, Mapping
def _family_config(profile_name: str) -> tuple[str, int, int, int]:
lowered = profile_name.lower()
if "30i" in lowered or "ncg" in lowered:
return ("30i", 32, 2, 4)
if "0id" in lowered or "0dn" in lowered:
return ("0i-D", 24, 1, 2)
if "fwlibe64" in lowered:
return ("e64", 8, 1, 2)
return ("generic", 8, 1, 2)
def make_default_state(profile: Mapping[str, Any]) -> dict[str, Any]:
profile_name = str(profile.get("profile_name", "FWLIB64"))
family, default_axes, max_path, max_spindles = _family_config(profile_name)
max_axis = int(profile.get("max_axis_hint") or default_axes)
axis_names = [
"X",
"Y",
"Z",
"A",
"B",
"C",
"U",
"V",
"W",
"P",
"Q",
"R",
]
while len(axis_names) < max_axis:
axis_names.append(f"A{len(axis_names) + 1}")
spindle_names = [f"S{i}" for i in range(1, max_spindles + 1)]
programs = [
{"number": 1, "comment": "MAIN", "length": 128},
{"number": 100, "comment": "TOOLCHANGE", "length": 84},
]
return {
"sysinfo": {
"addinfo": 0,
"max_axis": max_axis,
"cnc_type": family[:2].ljust(2),
"mt_type": "M ",
"series": family[:4].ljust(4),
"version": "A1.0",
"axes": str(max_axis).rjust(2, "0"),
},
"statinfo": {
"aut": 1,
"run": 3,
"motion": 1,
"mstb": 0,
"emergency": 0,
"alarm": 0,
"edit": 0,
},
"paths": {"current": 1, "max": max_path},
"dynamic": {
"alarm": 0,
"prgnum": 1,
"prgmnum": 1,
"seqnum": 120,
"actf": 1500,
"acts": 3200,
"axes": {
axis_names[0]: {"absolute": 123456, "machine": 123450, "relative": 6, "distance": 0},
axis_names[1]: {"absolute": -22000, "machine": -22010, "relative": 10, "distance": 0},
axis_names[2]: {"absolute": 8000, "machine": 7990, "relative": 10, "distance": 0},
},
},
"actf": 1500,
"acts": 3200,
"acts2": [3200] + [0] * (max_spindles - 1),
"axis_names": axis_names[:max_axis],
"spindle_names": spindle_names,
"parameters": {
"6711": {"type": "long", "value": 1, "decimal": 0, "description": "example parameter"},
"6712": {"type": "long", "value": 500, "decimal": 0, "description": "example parameter"},
},
"pmc": {
"R": {
"100": {"type": "byte", "value": 0},
"101": {"type": "byte", "value": 1},
"102": {"type": "byte", "value": 0},
},
},
"macros": {
"100": {"value": 12345, "decimal": 3},
"101": {"value": 98765, "decimal": 3},
},
"alarms": [
{"alm_no": 0, "type": 0, "axis": 0, "msg": ""},
],
"operator_messages": [
{"number": 200, "type": 0, "char_num": 12, "data": "READY"},
],
"program": {
"current": 1,
"main": 1,
"sequence": 120,
"block_count": 42,
"executing": "%\nO0001\nG90 G54 G00 X0 Y0\nM30\n%",
"executing_path": "//CNC_MEM/USER/PATH1/O0001",
"directory": programs,
},
"spindle": {
"meter": [
{"name": spindle_names[0], "value": 56, "unit": "%"},
{"name": spindle_names[1], "value": 0, "unit": "%"},
],
"servo_meter": [
{"name": axis_names[0], "value": 14, "unit": "%"},
{"name": axis_names[1], "value": 8, "unit": "%"},
{"name": axis_names[2], "value": 5, "unit": "%"},
],
"load": [
{"name": spindle_names[0], "load": 56, "speed": 3200},
{"name": spindle_names[1], "load": 0, "speed": 0},
],
"gear": [1] * max_spindles,
"max_rpm": [6000] * max_spindles,
},
"timers": {
"power_on": 86400,
"operating": 7200,
"cutting": 3600,
"cycle": 95,
},
"operation_mode": {"mode": 1, "name": "MEM"},
"diagnostics": {
"300": {"type": "long", "value": 14, "description": "servo load X"},
"301": {"type": "long", "value": 8, "description": "servo load Y"},
"302": {"type": "long", "value": 5, "description": "servo load Z"},
},
"forced_errors": {},
}

View File

@@ -0,0 +1,82 @@
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any
from .constants import IMPLEMENTED_FOCAS_METHODS
def parse_fwlib_imports(fwlib_cs_path: str | Path) -> list[str]:
text = Path(fwlib_cs_path).read_text(encoding="utf-8", errors="ignore")
imports = re.findall(r"extern short\s+([A-Za-z0-9_]+)\s*\(", text)
return sorted(set(imports))
def extract_dll_exports(dll_path: str | Path) -> list[str]:
import pefile
pe = pefile.PE(str(dll_path))
exports = [entry.name.decode("ascii", "ignore") for entry in pe.DIRECTORY_ENTRY_EXPORT.symbols if entry.name]
return sorted(set(exports))
def _infer_metadata(dll_name: str) -> tuple[str, int, int]:
lowered = dll_name.lower()
if lowered == "fwlib64.dll":
return ("generic-ethernet", 8, 1)
if lowered == "fwlibe64.dll":
return ("embedded-ethernet", 8, 1)
if "30i" in lowered:
return ("30i/31i/32i", 32, 2)
if "ncg" in lowered:
return ("ncguide-family", 32, 2)
if "0id" in lowered:
return ("0i-d-family", 24, 1)
if "0dn" in lowered:
return ("0-dn-family", 24, 1)
return ("unknown", 8, 1)
def build_profile(dll_path: str | Path, fwlib_cs_path: str | Path) -> dict[str, Any]:
dll_path = Path(dll_path)
exports = extract_dll_exports(dll_path)
wrapper_imports = set(parse_fwlib_imports(fwlib_cs_path))
series_hint, max_axis_hint, max_path_hint = _infer_metadata(dll_path.name)
export_set = set(exports)
connection_methods = sorted(symbol for symbol in exports if symbol.startswith("cnc_allclibhndl"))
mock_methods = sorted(set(IMPLEMENTED_FOCAS_METHODS) & export_set)
wrapper_supported = sorted(wrapper_imports & export_set)
return {
"profile_name": dll_path.stem,
"dll_name": dll_path.name,
"series_hint": series_hint,
"max_axis_hint": max_axis_hint,
"max_path_hint": max_path_hint,
"export_count": len(exports),
"connection_methods": connection_methods,
"mock_methods": mock_methods,
"wrapper_supported_count": len(wrapper_supported),
"wrapper_supported_methods": wrapper_supported,
"exports": exports,
"notes": [
"Exports extracted directly from the 64-bit DLL PE export directory.",
"Wrapper-supported methods are intersected with upstream fwlib.cs extern declarations.",
"Axis and path hints are filename-family heuristics, not protocol-level proofs.",
],
}
def write_profiles(dll_dir: str | Path, fwlib_cs_path: str | Path, out_dir: str | Path) -> list[Path]:
dll_dir = Path(dll_dir)
out_dir = Path(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
written: list[Path] = []
for dll_path in sorted(dll_dir.glob("*.dll")):
profile = build_profile(dll_path, fwlib_cs_path)
out_path = out_dir / f"{dll_path.stem}.json"
out_path.write_text(json.dumps(profile, indent=2), encoding="utf-8")
written.append(out_path)
return written

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
PROFILE_DIR = Path(__file__).resolve().parent / "builtin_profiles"
PROFILE_ALIASES = {
"ZeroI_D": "fwlib0iD64",
"ZeroI_F": "fwlib0iD64",
"ZeroI_MF": "fwlib0iD64",
"ZeroI_TF": "fwlib0iD64",
"Sixteen_i": "FWLIB64",
"Thirty_i": "fwlib30i64",
"ThirtyOne_i": "fwlib30i64",
"ThirtyTwo_i": "fwlib30i64",
"PowerMotion_i": "fwlib0DN64",
}
def list_profiles() -> list[str]:
return sorted(path.stem for path in PROFILE_DIR.glob("*.json"))
def resolve_profile_name(profile_name: str) -> str:
return PROFILE_ALIASES.get(profile_name, profile_name)
def load_profile(profile_name: str) -> dict[str, Any]:
profile_name = resolve_profile_name(profile_name)
candidates = [
PROFILE_DIR / f"{profile_name}.json",
PROFILE_DIR / f"{Path(profile_name).stem}.json",
]
for candidate in candidates:
if candidate.exists():
return json.loads(candidate.read_text(encoding="utf-8"))
available = ", ".join(list_profiles())
raise FileNotFoundError(f"Unknown profile '{profile_name}'. Available profiles: {available}")