a25593a9c6
Group all 69 projects into category subfolders under src/ and tests/ so the Rider Solution Explorer mirrors the module structure. Folders: Core, Server, Drivers (with a nested Driver CLIs subfolder), Client, Tooling. - Move every project folder on disk with git mv (history preserved as renames). - Recompute relative paths in 57 .csproj files: cross-category ProjectReferences, the lib/ HintPath+None refs in Driver.Historian.Wonderware, and the external mxaccessgw refs in Driver.Galaxy and its test project. - Rebuild ZB.MOM.WW.OtOpcUa.slnx with nested solution folders. - Re-prefix project paths in functional scripts (e2e, compliance, smoke SQL, integration, install). Build green (0 errors); unit tests pass. Docs left for a separate pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1019 lines
45 KiB
Python
1019 lines
45 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
from itertools import count
|
|
from typing import Any, Awaitable, Callable
|
|
|
|
from .constants import (
|
|
ADMIN_METHODS,
|
|
EW_FUNC,
|
|
EW_HANDLE,
|
|
EW_NUMBER,
|
|
EW_OK,
|
|
EW_PARAM,
|
|
EW_PATH,
|
|
EW_PROTOCOL,
|
|
EW_VERSION,
|
|
RC_LABELS,
|
|
)
|
|
from .data_store import MockDataStore
|
|
from .profiles import load_profile
|
|
|
|
|
|
Handler = Callable[[dict[str, Any]], Awaitable[tuple[int, dict[str, Any] | None, str | None]]]
|
|
|
|
|
|
class FocasMockServer:
|
|
def __init__(
    self,
    host: str,
    port: int,
    profile: dict[str, Any],
    store: MockDataStore | None = None,
) -> None:
    """Set up server state and the method-name -> handler routing table.

    Args:
        host: Interface passed to ``asyncio.start_server`` by ``start``.
        port: Port passed to ``asyncio.start_server`` by ``start``.
        profile: Profile mapping; also used to build a store when none is given.
        store: Data store to use; a ``MockDataStore(profile)`` is created
            when this is falsy.
    """
    # Connection / lifecycle state.
    self._server: asyncio.AbstractServer | None = None
    self._alarm_schedule_task: asyncio.Task[None] | None = None
    self._wire_connection_count = count(1)

    # Library-handle bookkeeping for the JSON protocol.
    self._handles: dict[int, dict[str, Any]] = {}
    self._next_handle = count(1)

    self.host = host
    self.port = port
    self.profile = profile
    self.store = store if store else MockDataStore(profile)

    routes: dict[str, Handler] = {
        # Handle management.
        "cnc_allclibhndl": self._connect,
        "cnc_allclibhndl2": self._connect,
        "cnc_allclibhndl3": self._connect,
        "cnc_freelibhndl": self._disconnect,
        # CNC state, axes, parameters, macros, alarms.
        "cnc_sysinfo": self._sysinfo,
        "cnc_statinfo": self._statinfo,
        "cnc_rddynamic2": self._rddynamic2,
        "cnc_actf": self._actf,
        "cnc_acts": self._acts,
        "cnc_acts2": self._acts2,
        "cnc_getpath": self._getpath,
        "cnc_setpath": self._setpath,
        "cnc_rdaxisname": self._rdaxisname,
        "cnc_rdspdlname": self._rdspdlname,
        "cnc_rdparam": self._rdparam,
        "cnc_wrparam": self._wrparam,
        "cnc_rdmacro": self._rdmacro,
        "cnc_wrmacro": self._wrmacro,
        "cnc_rdalmmsg2": self._rdalmmsg2,
        # PMC ranges.
        "pmc_rdpmcrng": self._pmc_rdpmcrng,
        "pmc_wrpmcrng": self._pmc_wrpmcrng,
        # Operator messages, program and timer data.
        "cnc_rdopmsg": self._rdopmsg,
        "cnc_rdopmode": self._rdopmode,
        "cnc_rdprgnum": self._rdprgnum,
        "cnc_exeprgname2": self._exeprgname2,
        "cnc_rdexecprog": self._rdexecprog,
        "cnc_rdseqnum": self._rdseqnum,
        "cnc_rdblkcount": self._rdblkcount,
        "cnc_rdproginfo": self._rdproginfo,
        "cnc_rdprogdir3": self._rdprogdir3,
        "cnc_rdtimer": self._rdtimer,
        # Spindle / servo metrics and diagnostics.
        "cnc_rdspmeter": self._rdspmeter,
        "cnc_rdsvmeter": self._rdsvmeter,
        "cnc_rdspload": self._rdspload,
        "cnc_rdspgear": self._rdspgear,
        "cnc_rdspmaxrpm": self._rdspmaxrpm,
        "cnc_rddiagnum": self._rddiagnum,
        "cnc_rddiaginfo": self._rddiaginfo,
        "cnc_diagnoss": self._diagnoss,
        # Mock administration.
        "mock_get_state": self._mock_get_state,
        "mock_patch": self._mock_patch,
        "mock_reset": self._mock_reset,
        "mock_load_profile": self._mock_load_profile,
        "mock_list_methods": self._mock_list_methods,
        "mock_schedule_alarms": self._mock_schedule_alarms,
    }
    self._handlers: dict[str, Handler] = routes
|
|
|
|
async def start(self) -> "FocasMockServer":
    """Start listening and store the port reported by the first bound socket.

    Returns:
        This server instance, so the call can be chained.
    """
    listener = await asyncio.start_server(self._handle_client, self.host, self.port)
    self._server = listener
    bound_address = listener.sockets[0].getsockname()
    self.port = int(bound_address[1])
    return self
|
|
|
|
async def close(self) -> None:
    """Cancel the alarm schedule and, if a listener exists, close and drop it."""
    self._cancel_alarm_schedule()
    listener = self._server
    if listener is not None:
        listener.close()
        await listener.wait_closed()
        self._server = None
|
|
|
|
async def serve_forever(self) -> None:
    """Serve until cancelled, starting the listener first when needed."""
    if self._server is None:
        await self.start()
    listener = self._server
    assert listener is not None  # narrowing only: start() has just set it
    async with listener:
        await listener.serve_forever()
|
|
|
|
async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Serve one TCP connection, auto-detecting the protocol from the first byte.

    A leading ``0xA0`` byte switches the connection to the binary wire
    protocol: the remaining 9 header bytes are read and control passes to
    ``_handle_wire_client``. Anything else is treated as newline-delimited
    JSON: each non-blank line is dispatched and answered with one JSON line.

    A peer that disconnects in the middle of a frame, or resets the
    connection while a reply is being flushed, ends the session quietly
    instead of surfacing as an unhandled exception in the connection task.
    """
    try:
        while not reader.at_eof():
            first = await reader.read(1)
            if not first:
                break
            if first == b"\xa0":
                # Binary wire protocol: complete the 10-byte header, then
                # the wire handler owns the rest of the connection.
                header = first + await reader.readexactly(9)
                await self._handle_wire_client(reader, writer, header)
                break
            raw = (first + await reader.readline()).strip()
            if not raw:
                continue
            response = await self._dispatch_raw(raw)
            writer.write(json.dumps(response).encode("utf-8") + b"\n")
            await writer.drain()
    except (asyncio.IncompleteReadError, ConnectionError):
        # The peer went away mid-frame; nothing is left to answer.
        pass
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError:
            # The transport is already gone; closing is best-effort.
            pass
|
|
|
|
async def _handle_wire_client(
    self,
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    first_header: bytes,
) -> None:
    """Run the binary PDU loop for one connection.

    Each PDU is a 10-byte header (four ``0xA0`` bytes, then six more bytes
    whose last two are the big-endian body length) followed by the body.
    The loop ends on a malformed header or when the peer stops sending.

    Args:
        reader: Stream to read further PDUs from.
        writer: Stream to send responses on.
        first_header: The header already consumed by the caller.
    """
    connection_index = next(self._wire_connection_count)
    header = first_header
    try:
        while len(header) == 10 and header[:4] == b"\xa0\xa0\xa0\xa0":
            body_len = int.from_bytes(header[8:10], "big")
            body = await reader.readexactly(body_len) if body_len else b""
            response = self._dispatch_wire_pdu(connection_index, header, body)
            if response:
                writer.write(response)
                await writer.drain()
            header = await reader.readexactly(10)
    except asyncio.IncompleteReadError:
        # The peer closed mid-header or mid-body; both end the session the
        # same way.
        return
|
|
|
|
def _dispatch_wire_pdu(self, connection_index: int, header: bytes, body: bytes) -> bytes:
|
|
pdu_type = header[6]
|
|
if pdu_type == 0x01:
|
|
return self._wire_initiate_response(connection_index)
|
|
if pdu_type == 0x02:
|
|
return self._wire_pdu(0x02, 0x02, b"")
|
|
if pdu_type == 0x21:
|
|
return self._wire_request_response(body)
|
|
return b""
|
|
|
|
def _wire_pdu(self, pdu_type: int, direction: int, body: bytes, version: int = 1) -> bytes:
|
|
return (
|
|
b"\xa0\xa0\xa0\xa0"
|
|
+ version.to_bytes(2, "big")
|
|
+ bytes([pdu_type & 0xFF, direction & 0xFF])
|
|
+ len(body).to_bytes(2, "big")
|
|
+ body
|
|
)
|
|
|
|
def _wire_initiate_response(self, connection_index: int) -> bytes:
|
|
del connection_index
|
|
max_axis = int(self.store.snapshot()["sysinfo"].get("max_axis", 1))
|
|
body = bytearray(24)
|
|
body[8:10] = (1).to_bytes(2, "big")
|
|
body[16:18] = b"M "
|
|
body[20:22] = max_axis.to_bytes(2, "big")
|
|
body[22:24] = (len(self.store.snapshot()["spindle_names"])).to_bytes(2, "big")
|
|
return self._wire_pdu(0x01, 0x02, bytes(body))
|
|
|
|
def _wire_request_response(self, request_body: bytes) -> bytes:
|
|
if len(request_body) < 2:
|
|
return self._wire_pdu(0x21, 0x02, self._wire_response_body([self._make_wire_response_block(0, b"\x00\x00\x00\x00")]))
|
|
|
|
count_value = int.from_bytes(request_body[:2], "big")
|
|
blocks: list[bytes] = []
|
|
offset = 2
|
|
for _index in range(count_value):
|
|
if offset + 2 > len(request_body):
|
|
break
|
|
block_len = int.from_bytes(request_body[offset : offset + 2], "big")
|
|
block = request_body[offset : offset + block_len]
|
|
if len(block) < block_len:
|
|
break
|
|
blocks.append(self._wire_response_block(block))
|
|
offset += block_len
|
|
|
|
if not blocks:
|
|
blocks.append(self._make_wire_response_block(0, b"\x00\x00\x00\x00"))
|
|
return self._wire_pdu(0x21, 0x02, self._wire_response_body(blocks))
|
|
|
|
def _wire_response_body(self, blocks: list[bytes]) -> bytes:
|
|
return len(blocks).to_bytes(2, "big") + b"".join(blocks)
|
|
|
|
def _wire_response_block(self, request_block: bytes) -> bytes:
|
|
command = int.from_bytes(request_block[6:8], "big") if len(request_block) >= 8 else 0
|
|
payload = self._wire_payload(command, request_block)
|
|
return self._make_wire_response_block(command, payload, request_block)
|
|
|
|
def _make_wire_response_block(self, command: int, payload: bytes, request_block: bytes | None = None) -> bytes:
|
|
block_len = 0x10 + len(payload)
|
|
block = bytearray(block_len)
|
|
block[0:2] = block_len.to_bytes(2, "big")
|
|
if request_block and len(request_block) >= 8:
|
|
block[2:8] = request_block[2:8]
|
|
else:
|
|
block[6:8] = command.to_bytes(2, "big")
|
|
block[14:16] = len(payload).to_bytes(2, "big")
|
|
block[16:] = payload
|
|
return bytes(block)
|
|
|
|
def _wire_payload(self, command: int, request_block: bytes) -> bytes:
|
|
if command == 0x0E and self._block_u32(request_block, 8) == 0x26F0:
|
|
return bytes(8) + self._u32(0xFF)
|
|
if command == 0x18:
|
|
return self._wire_sysinfo()
|
|
if command == 0x19:
|
|
return self._wire_statinfo_primary()
|
|
if command == 0xE1:
|
|
return self._u16(0)
|
|
if command == 0x98:
|
|
statinfo = self.store.snapshot()["statinfo"]
|
|
return self._u16(int(statinfo.get("tmmode", statinfo.get("manual", 0))))
|
|
if command == 0x0E:
|
|
return self._wire_rdparam(request_block)
|
|
if command == 0x1A:
|
|
return self._wire_dynamic_scalar("alarm")
|
|
if command == 0x1C:
|
|
dynamic = self.store.snapshot()["dynamic"]
|
|
return self._u32(int(dynamic.get("prgnum", 0))) + self._u32(int(dynamic.get("prgmnum", 0)))
|
|
if command == 0x1D:
|
|
return self._wire_dynamic_scalar("seqnum")
|
|
if command == 0x24:
|
|
return self._wire_dynamic_scalar("actf")
|
|
if command == 0x25:
|
|
return self._wire_dynamic_scalar("acts")
|
|
if command == 0x26:
|
|
return self._wire_dynamic_axis(request_block)
|
|
if command == 0x35:
|
|
return self._u32(int(self.store.snapshot()["program"].get("block_count", 0)))
|
|
if command == 0x40:
|
|
return self._wire_spindle_metric(request_block)
|
|
if command == 0x56:
|
|
return self._wire_servo_meter()
|
|
if command == 0x57:
|
|
mode = self.store.snapshot()["operation_mode"]
|
|
return self._u16(int(mode.get("mode", 0) if isinstance(mode, dict) else mode))
|
|
if command == 0x89:
|
|
return self._wire_axis_names()
|
|
if command == 0x8A:
|
|
return self._wire_spindle_names()
|
|
if command == 0xFC:
|
|
return self._wire_program_name()
|
|
if command == 0x120:
|
|
return self._wire_timer(request_block)
|
|
if command == 0x10:
|
|
self._wire_wrparam(request_block)
|
|
return b""
|
|
if command == 0x15:
|
|
return self._wire_rdmacro(request_block)
|
|
if command == 0x16:
|
|
self._wire_wrmacro(request_block)
|
|
return b""
|
|
if command == 0x23:
|
|
return self._wire_alarms(request_block)
|
|
if command == 0x8001:
|
|
return self._wire_pmc_read(request_block)
|
|
if command == 0x8002:
|
|
self._wire_pmc_write(request_block)
|
|
return b""
|
|
return b"\x00\x00\x00\x00"
|
|
|
|
def _wire_sysinfo(self) -> bytes:
|
|
info = self.store.snapshot()["sysinfo"]
|
|
payload = bytearray()
|
|
payload += self._u16(int(info.get("addinfo", 0)))
|
|
payload += self._u16(int(info.get("max_axis", 0)))
|
|
payload += self._ascii_fixed(str(info.get("cnc_type", "")), 2)
|
|
payload += self._ascii_fixed(str(info.get("mt_type", "")), 2)
|
|
payload += self._ascii_fixed(str(info.get("series", "")), 4)
|
|
payload += self._ascii_fixed(str(info.get("version", "")), 4)
|
|
payload += self._ascii_fixed(str(info.get("axes", "")), 2)
|
|
return bytes(payload)
|
|
|
|
def _wire_statinfo_primary(self) -> bytes:
|
|
statinfo = self.store.snapshot()["statinfo"]
|
|
values = (
|
|
int(statinfo.get("aut", 0)),
|
|
int(statinfo.get("run", 0)),
|
|
int(statinfo.get("motion", 0)),
|
|
int(statinfo.get("mstb", 0)),
|
|
int(statinfo.get("emergency", 0)),
|
|
int(statinfo.get("alarm", 0)),
|
|
int(statinfo.get("edit", 0)),
|
|
)
|
|
return b"".join(self._u16(value) for value in values)
|
|
|
|
def _wire_rdparam(self, request_block: bytes) -> bytes:
|
|
datano = self._block_u32(request_block, 8)
|
|
parameter = self.store.snapshot()["parameters"].get(str(datano), {})
|
|
value = int(parameter.get("value", 0)) if isinstance(parameter, dict) else 0
|
|
return self._u32(value)
|
|
|
|
def _wire_dynamic_scalar(self, key: str) -> bytes:
|
|
value = int(self.store.snapshot()["dynamic"].get(key, 0))
|
|
return self._u32(value)
|
|
|
|
def _wire_dynamic_axis(self, request_block: bytes) -> bytes:
|
|
selector = self._block_u32(request_block, 8)
|
|
axis = self._block_u32(request_block, 12)
|
|
field = {4: "absolute", 1: "machine", 6: "relative", 7: "distance"}.get(selector, "absolute")
|
|
state = self.store.snapshot()
|
|
names = list(state["axis_names"])
|
|
dynamic_axes = state["dynamic"]["axes"]
|
|
payload = bytearray()
|
|
for name in names:
|
|
position = dynamic_axes.get(name, {})
|
|
payload += self._u32(int(position.get(field, 0)))
|
|
payload += self._u32(0)
|
|
if axis > 0 and axis <= len(names):
|
|
name = names[axis - 1]
|
|
position = dynamic_axes.get(name, {})
|
|
return self._u32(int(position.get(field, 0))) + self._u32(0)
|
|
return bytes(payload)
|
|
|
|
def _wire_wrparam(self, request_block: bytes) -> None:
|
|
datano = self._block_u32(request_block, 8)
|
|
extra = request_block[28:]
|
|
value = int.from_bytes(extra[:4], "big", signed=True) if len(extra) >= 4 else self._block_u32(request_block, 12)
|
|
self.store.merge_patch({"parameters": {str(datano): {"type": "long", "value": value, "decimal": 0}}})
|
|
|
|
def _wire_rdmacro(self, request_block: bytes) -> bytes:
|
|
number = self._block_u32(request_block, 8)
|
|
macro = self.store.snapshot()["macros"].get(str(number), {})
|
|
value = int(macro.get("value", 0)) if isinstance(macro, dict) else 0
|
|
decimal = int(macro.get("decimal", 0)) if isinstance(macro, dict) else 0
|
|
return self._u32(value) + self._u16(decimal) + self._u16(0)
|
|
|
|
def _wire_wrmacro(self, request_block: bytes) -> None:
|
|
number = self._block_u32(request_block, 8)
|
|
extra = request_block[28:]
|
|
value = int.from_bytes(extra[:4], "big", signed=True) if len(extra) >= 4 else 0
|
|
decimal = int.from_bytes(extra[4:6], "big", signed=True) if len(extra) >= 6 else 0
|
|
self.store.merge_patch({"macros": {str(number): {"value": value, "decimal": decimal}}})
|
|
|
|
def _wire_axis_names(self) -> bytes:
|
|
payload = bytearray()
|
|
for name in self.store.snapshot()["axis_names"]:
|
|
payload += self._name_record(name, 4)
|
|
return bytes(payload)
|
|
|
|
def _wire_spindle_names(self) -> bytes:
|
|
payload = bytearray()
|
|
for name in self.store.snapshot()["spindle_names"]:
|
|
payload += self._name_record(name, 4)
|
|
return bytes(payload)
|
|
|
|
def _wire_servo_meter(self) -> bytes:
|
|
state = self.store.snapshot()
|
|
configured = {str(entry.get("name", "")): entry for entry in state["spindle"]["servo_meter"]}
|
|
payload = bytearray()
|
|
for name in state["axis_names"]:
|
|
entry = configured.get(name, {"name": name, "value": 0})
|
|
payload += self._u32(int(entry.get("value", 0)))
|
|
payload += self._u16(0)
|
|
payload += self._u16(1)
|
|
payload += self._name_record(name, 4)
|
|
return bytes(payload)
|
|
|
|
def _wire_spindle_metric(self, request_block: bytes) -> bytes:
|
|
metric = self._block_u32(request_block, 8)
|
|
spindle = self.store.snapshot()["spindle"]
|
|
if metric == 1:
|
|
values = [int(value) for value in spindle.get("max_rpm", [])]
|
|
else:
|
|
values = [int(entry.get("load", 0)) for entry in spindle.get("load", [])]
|
|
payload = bytearray()
|
|
for value in values[:4]:
|
|
payload += self._u32(value)
|
|
payload += self._u32(0)
|
|
return bytes(payload)
|
|
|
|
def _wire_program_name(self) -> bytes:
|
|
program = self.store.snapshot()["program"]
|
|
name = str(program.get("executing_path", ""))
|
|
leaf = name.replace("\\", "/").rstrip("/").split("/")[-1] or name
|
|
number = int(program.get("current", program.get("main", 0)))
|
|
return self._ascii_fixed(leaf, 36) + self._u32(number)
|
|
|
|
def _wire_timer(self, request_block: bytes) -> bytes:
|
|
timer_type = self._block_u32(request_block, 8)
|
|
key_map = {0: "power_on", 1: "operating", 2: "cutting", 3: "cycle"}
|
|
seconds = int(self.store.snapshot()["timers"].get(key_map.get(timer_type, "power_on"), 0))
|
|
return self._u32(seconds // 60) + self._u32((seconds % 60) * 1000)
|
|
|
|
def _wire_alarms(self, request_block: bytes) -> bytes:
|
|
requested = max(self._block_u32(request_block, 12), 1)
|
|
records = bytearray()
|
|
for alarm in self.store.snapshot()["alarms"][:requested]:
|
|
alm_no = int(alarm.get("alm_no", alarm.get("number", 0)))
|
|
alarm_type = int(alarm.get("type", 0))
|
|
axis = int(alarm.get("axis", 0))
|
|
message = str(alarm.get("msg", alarm.get("message", ""))).encode("ascii", errors="replace")[:64]
|
|
records += self._u32(alm_no)
|
|
records += self._u32(alarm_type)
|
|
records += self._u32(axis)
|
|
records += self._u32(len(message))
|
|
records += message.ljust(64, b"\x00")
|
|
return bytes(records)
|
|
|
|
def _wire_pmc_read(self, request_block: bytes) -> bytes:
    """Encode the PMC values for an inclusive address range.

    The block carries 32-bit fields at offsets 8 (start), 12 (end),
    16 (area code) and 20 (data type code). Each value is encoded with a
    width chosen by the type code: 1 -> 16 bits, 2 or 4 -> 32 bits,
    5 -> signed 64 bits, anything else -> the low byte only. Addresses
    with no stored entry read as 0.
    """
    first_address = self._block_u32(request_block, 8)
    last_address = self._block_u32(request_block, 12)
    area = self._pmc_area_name(self._block_u32(request_block, 16))
    type_code = self._block_u32(request_block, 20)
    # The result is not used here; the call is kept because the helper is
    # defined outside this view and may validate the code.
    self._pmc_data_type(type_code)
    stored = self.store.snapshot().get("pmc", {}).get(area, {})

    def encode(number):
        if type_code == 1:
            return self._u16(number)
        if type_code in (2, 4):
            return self._u32(number)
        if type_code == 5:
            return int(number).to_bytes(8, "big", signed=True)
        return bytes([number & 0xFF])

    out = bytearray()
    for address in range(first_address, last_address + 1):
        entry = stored.get(str(address), {"value": 0})
        number = int(entry.get("value", 0)) if isinstance(entry, dict) else int(entry)
        out += encode(number)
    return bytes(out)
|
|
|
|
def _wire_pmc_write(self, request_block: bytes) -> None:
    """Store PMC values written over the wire protocol.

    The block carries 32-bit fields at offsets 8 (start), 12 (end),
    16 (area code) and 20 (data type code); the values follow from
    offset 28. Value width depends on the type code (1 -> 2 bytes,
    2 or 4 -> 4 bytes, 5 -> 8 bytes, otherwise 1 byte). Values are read as
    signed unless the type code is 0. Writing stops at the first address
    whose value bytes are missing.
    """
    first_address = self._block_u32(request_block, 8)
    last_address = self._block_u32(request_block, 12)
    area = self._pmc_area_name(self._block_u32(request_block, 16))
    type_code = self._block_u32(request_block, 20)
    type_name = self._pmc_data_type(type_code)
    data = request_block[28:]
    width = {1: 2, 2: 4, 4: 4, 5: 8}.get(type_code, 1)
    is_signed = type_code != 0

    entries = {}
    position = 0
    for address in range(first_address, last_address + 1):
        chunk = data[position : position + width]
        if len(chunk) < width:
            break
        entries[str(address)] = {
            "type": type_name,
            "value": int.from_bytes(chunk, "big", signed=is_signed),
        }
        position += width
    self.store.merge_patch({"pmc": {area: entries}})
|
|
|
|
def _block_u32(self, block: bytes, offset: int) -> int:
|
|
if offset + 4 > len(block):
|
|
return 0
|
|
return int.from_bytes(block[offset : offset + 4], "big")
|
|
|
|
def _u16(self, value: int) -> bytes:
|
|
return int(value).to_bytes(2, "big", signed=True)
|
|
|
|
def _u32(self, value: int) -> bytes:
|
|
return int(value).to_bytes(4, "big", signed=True)
|
|
|
|
def _ascii_fixed(self, value: str, length: int) -> bytes:
|
|
return value.encode("ascii", errors="replace")[:length].ljust(length, b" ")
|
|
|
|
def _name_record(self, value: str, length: int) -> bytes:
|
|
encoded = value.encode("ascii", errors="replace")
|
|
record = bytearray(length)
|
|
if encoded:
|
|
record[0] = encoded[0]
|
|
if len(encoded) > 1:
|
|
record[1 : min(len(encoded), length)] = encoded[1:length]
|
|
return bytes(record)
|
|
|
|
async def _dispatch_raw(self, raw: bytes) -> dict[str, Any]:
    """Decode one JSON request line, dispatch it and build the reply envelope.

    The reply always has the keys ``id``, ``method``, ``rc``, ``message``
    and ``result``. Malformed requests are answered with ``EW_PROTOCOL``
    rather than raising. That covers bytes that are not UTF-8, text that
    is not JSON, JSON that is not an object, a non-string ``method`` and
    non-object ``params``. One bad line therefore cannot take down the
    connection.
    """

    def protocol_error(request_id: Any, method: Any, message: str) -> dict[str, Any]:
        return {"id": request_id, "method": method, "rc": EW_PROTOCOL, "message": message, "result": None}

    try:
        request = json.loads(raw.decode("utf-8"))
    except UnicodeDecodeError:
        # Not a JSONDecodeError, so it needs its own clause.
        return protocol_error(None, None, "invalid JSON: request is not valid UTF-8")
    except json.JSONDecodeError as exc:
        return protocol_error(None, None, f"invalid JSON: {exc.msg}")

    if not isinstance(request, dict):
        # Valid JSON such as `[]` or `5` has no .get(); reject it up front.
        return protocol_error(None, None, "request must be an object")

    request_id = request.get("id")
    method = request.get("method")
    params = request.get("params", {})
    if not isinstance(method, str):
        return protocol_error(request_id, method, "method must be a string")
    if not isinstance(params, dict):
        return protocol_error(request_id, method, "params must be an object")

    rc, result, message = await self.dispatch(method, params)
    return {
        "id": request_id,
        "method": method,
        "rc": rc,
        # Fall back to the label for the return code, then to "RC_<n>".
        "message": message or RC_LABELS.get(rc, f"RC_{rc}"),
        "result": result,
    }
|
|
|
|
async def dispatch(self, method: str, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Route *method* to its handler and return ``(rc, result, message)``.

    For methods outside ``ADMIN_METHODS`` two checks run first, in this
    order: a forced error queued in the store is consumed and returned,
    then the method must appear in the profile's ``exports`` list
    (otherwise ``EW_VERSION``). A method with no registered handler
    yields ``EW_FUNC``.
    """
    is_admin = method in ADMIN_METHODS
    if not is_admin:
        forced = self.store.consume_forced_error(method)
        if forced:
            forced_rc, forced_message = forced
            return forced_rc, None, forced_message
        exported = set(self.profile.get("exports", []))
        if method not in exported:
            return EW_VERSION, None, f"{method} is not exported by profile {self.profile['profile_name']}"

    handler = self._handlers.get(method)
    if handler is not None:
        return await handler(params)
    return EW_FUNC, None, f"{method} is exported but not implemented by the mock server"
|
|
|
|
def _require_handle(self, params: dict[str, Any]) -> tuple[int | None, dict[str, Any] | None]:
|
|
handle_value = params.get("FlibHndl", params.get("handle"))
|
|
if handle_value is None:
|
|
return None, None
|
|
try:
|
|
handle = int(handle_value)
|
|
except (TypeError, ValueError):
|
|
return None, None
|
|
return handle, self._handles.get(handle)
|
|
|
|
async def _connect(self, params: dict[str, Any]) -> tuple[int, dict[str, Any], str | None]:
    """Allocate a new library handle and record its session.

    The session stores ``ipaddr`` (default ``127.0.0.1``), ``port``
    (default: this server's port), ``timeout`` (default 10) and the
    initial path 1.
    """
    new_handle = next(self._next_handle)
    session = {
        "ipaddr": params.get("ipaddr", "127.0.0.1"),
        "port": int(params.get("port", self.port)),
        "timeout": int(params.get("timeout", 10)),
        "path": 1,
    }
    self._handles[new_handle] = session
    result = {"FlibHndl": new_handle, "profile": self.profile["profile_name"]}
    return EW_OK, result, None
|
|
|
|
async def _disconnect(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Release a library handle; ``EW_HANDLE`` if it is missing or not open."""
    handle, _session = self._require_handle(params)
    if handle is not None:
        try:
            del self._handles[handle]
        except KeyError:
            pass
        else:
            return EW_OK, {"FlibHndl": handle}, None
    return EW_HANDLE, None, "invalid library handle"
|
|
|
|
async def _sysinfo(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the store's ``sysinfo`` section for a valid handle."""
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"
    state = self.store.snapshot()
    return EW_OK, {"sysinfo": state["sysinfo"]}, None
|
|
|
|
async def _statinfo(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the store's ``statinfo`` section for a valid handle."""
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"
    state = self.store.snapshot()
    return EW_OK, {"statinfo": state["statinfo"]}, None
|
|
|
|
async def _rddynamic2(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the dynamic data block for one axis.

    ``params["axis"]`` (default 1) is a 1-based index into ``axis_names``;
    anything that is not an in-range integer selects the first axis. When
    the chosen axis has no entry in ``dynamic.axes``, the first stored
    axis entry is used instead.
    """
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"

    state = self.store.snapshot()
    selector = params.get("axis", 1)
    dynamic = state["dynamic"]
    names = state["axis_names"]

    chosen = names[0]
    if isinstance(selector, int) and 0 < selector <= len(names):
        chosen = names[selector - 1]

    axes = dynamic["axes"]
    # Computed up front, exactly as a default argument to .get() would be.
    first_entry = next(iter(axes.values()))
    position = axes.get(chosen, first_entry)

    report = {"axis": selector}
    for key in ("alarm", "prgnum", "prgmnum", "seqnum", "actf", "acts"):
        report[key] = dynamic[key]
    report["pos"] = position
    return EW_OK, {"rddynamic2": report}, None
|
|
|
|
async def _actf(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Delegate to ``_simple_handle_value`` with the ``actf`` key and label."""
    outcome = await self._simple_handle_value(params, "actf", "actf")
    return outcome
|
|
|
|
async def _acts(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Delegate to ``_simple_handle_value`` with the ``acts`` key and label."""
    outcome = await self._simple_handle_value(params, "acts", "acts")
    return outcome
|
|
|
|
async def _acts2(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the store's ``acts2`` section for a valid handle."""
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"
    state = self.store.snapshot()
    return EW_OK, {"acts2": state["acts2"]}, None
|
|
|
|
async def _getpath(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the session's current path and the store's ``paths.max``."""
    handle, session = self._require_handle(params)
    if not (handle is not None and session is not None):
        return EW_HANDLE, None, "invalid library handle"
    paths = self.store.snapshot()["paths"]
    result = {"path_no": session["path"], "max_path": int(paths["max"])}
    return EW_OK, result, None
|
|
|
|
async def _setpath(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Select the active path for a session.

    The path is read from ``path``, falling back to ``path_no``, then 1.

    Returns:
        ``EW_HANDLE`` for a missing or closed handle, ``EW_PARAM`` when
        the path is not convertible to an integer, ``EW_PATH`` when it is
        outside ``1..paths.max``, otherwise ``EW_OK`` with the new
        ``path_no``.
    """
    handle, session = self._require_handle(params)
    if handle is None or session is None:
        return EW_HANDLE, None, "invalid library handle"

    requested = params.get("path", params.get("path_no", 1))
    try:
        path_no = int(requested)
    except (TypeError, ValueError):
        # Same tolerance as _require_handle: a bad client value is a
        # protocol-level error reply, not an exception in the connection
        # task.
        return EW_PARAM, None, f"path {requested!r} is not an integer"

    max_path = int(self.store.snapshot()["paths"]["max"])
    if path_no < 1 or path_no > max_path:
        return EW_PATH, None, f"path {path_no} is outside 1..{max_path}"
    session["path"] = path_no
    return EW_OK, {"path_no": path_no}, None
|
|
|
|
async def _rdaxisname(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the store's ``axis_names`` for a valid handle."""
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"
    state = self.store.snapshot()
    return EW_OK, {"axis_names": state["axis_names"]}, None
|
|
|
|
async def _rdspdlname(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the store's ``spindle_names`` for a valid handle."""
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"
    state = self.store.snapshot()
    return EW_OK, {"spindle_names": state["spindle_names"]}, None
|
|
|
|
async def _rdparam(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the stored parameter named by ``datano``.

    ``EW_NUMBER`` is returned when the parameter is not in the store.
    """
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"
    key = str(params.get("datano"))
    stored = self.store.snapshot()["parameters"].get(key)
    if stored is not None:
        return EW_OK, {"param": {"datano": int(key), **stored}}, None
    return EW_NUMBER, None, f"parameter {key} is not mocked"
|
|
|
|
async def _wrparam(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Create or update a parameter in the store.

    Fields are read from ``params["param"]`` when present, otherwise from
    *params* itself. The value comes from the first present key among
    ``value``, ``ldata``, ``idata`` and ``cdata``; ``type`` and
    ``decimal`` keep their stored values unless supplied (defaults
    ``"long"`` and 0).
    """
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"

    fields = params.get("param", params)
    if not isinstance(fields, dict):
        return EW_PARAM, None, "cnc_wrparam requires parameter fields"

    datano = fields.get("datano", params.get("datano"))
    if datano is None:
        return EW_PARAM, None, "cnc_wrparam requires datano"

    key = str(datano)
    parameter = dict(self.store.snapshot()["parameters"].get(key, {}))

    # The first key that is present wins, even when its value is None.
    value_keys = ("value", "ldata", "idata", "cdata")
    value = next((fields[name] for name in value_keys if name in fields), None)
    if value is not None:
        parameter["value"] = value

    parameter["type"] = fields.get("type", parameter.get("type", "long"))
    if "decimal" in fields:
        parameter["decimal"] = fields["decimal"]
    else:
        parameter["decimal"] = fields.get("dec_val", parameter.get("decimal", 0))
    if "description" in fields:
        parameter["description"] = fields["description"]

    self.store.merge_patch({"parameters": {key: parameter}})
    return EW_OK, {"param": {"datano": int(key), **parameter}}, None
|
|
|
|
async def _rdmacro(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the stored macro named by ``number`` (or ``datano``).

    ``EW_NUMBER`` is returned when the macro is not in the store.
    """
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"
    key = str(params.get("number", params.get("datano")))
    stored = self.store.snapshot()["macros"].get(key)
    if stored is not None:
        return EW_OK, {"macro": {"number": int(key), **stored}}, None
    return EW_NUMBER, None, f"macro {key} is not mocked"
|
|
|
|
async def _wrmacro(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Create or update a macro in the store.

    Fields are read from ``params["macro"]`` when present, otherwise from
    *params* itself. The macro number is the first present key among the
    fields' ``number`` and ``datano``, then the top-level ``number`` and
    ``datano``. The value comes from ``value`` or ``mcr_val``; ``decimal``
    keeps its stored value unless supplied (default 0).
    """
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"

    fields = params.get("macro", params)
    if not isinstance(fields, dict):
        return EW_PARAM, None, "cnc_wrmacro requires macro fields"

    # The first source that has the key wins, even when its value is None.
    sources = ((fields, "number"), (fields, "datano"), (params, "number"), (params, "datano"))
    number = next((mapping[name] for mapping, name in sources if name in mapping), None)
    if number is None:
        return EW_PARAM, None, "cnc_wrmacro requires number"

    key = str(number)
    macro = dict(self.store.snapshot()["macros"].get(key, {}))

    value = fields.get("value", fields.get("mcr_val"))
    if value is not None:
        macro["value"] = value
    if "decimal" in fields:
        macro["decimal"] = fields["decimal"]
    else:
        macro["decimal"] = fields.get("dec_val", macro.get("decimal", 0))

    self.store.merge_patch({"macros": {key: macro}})
    return EW_OK, {"macro": {"number": int(key), **macro}}, None
|
|
|
|
async def _rdalmmsg2(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the store's ``alarms`` list for a valid handle."""
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"
    state = self.store.snapshot()
    return EW_OK, {"alarms": state["alarms"]}, None
|
|
|
|
async def _pmc_rdpmcrng(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Read an inclusive PMC address range.

    Addresses with no stored entry read as value 0 of the requested data
    type; stored entries that are not dicts are wrapped as
    ``{"type": ..., "value": entry}``. The result carries both the bare
    ``values`` list and per-address ``addresses`` records.
    """
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"

    area, data_type, start, end = self._pmc_range(params)
    if start is None:
        return EW_PARAM, None, "pmc_rdpmcrng requires start/datano_s"

    stored = self.store.snapshot().get("pmc", {}).get(area, {})
    values = []
    addresses = []
    for address in range(start, end + 1):
        entry = stored.get(str(address), {"type": data_type, "value": 0})
        if not isinstance(entry, dict):
            entry = {"type": data_type, "value": entry}
        values.append(entry.get("value", 0))
        addresses.append({"address": address, **entry})

    report = {
        "area": area,
        "data_type": data_type,
        "start": start,
        "end": end,
        "values": values,
        "addresses": addresses,
    }
    return EW_OK, {"pmc": report}, None
|
|
|
|
async def _pmc_wrpmcrng(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Write values to consecutive PMC addresses starting at ``start``.

    Values come from the first present key among ``values``, ``data``,
    ``cdata``, ``idata`` and ``ldata``, or from a single ``value``; a
    non-list is treated as one value. When the request names no end or
    length key, the range ends at the last written value; otherwise
    values beyond ``end`` are ignored.
    """
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"

    area, data_type, start, end = self._pmc_range(params)
    if start is None:
        return EW_PARAM, None, "pmc_wrpmcrng requires start/datano_s"

    # The first key that is present wins, even when its value is None.
    value_keys = ("values", "data", "cdata", "idata", "ldata")
    values = next((params[name] for name in value_keys if name in params), None)
    if values is None:
        single = params.get("value")
        values = [] if single is None else [single]
    if not isinstance(values, list):
        values = [values]
    if not values:
        return EW_PARAM, None, "pmc_wrpmcrng requires values"

    range_keys = ("end", "datano_e", "d", "length", "count", "e")
    if all(name not in params for name in range_keys):
        end = start + len(values) - 1

    entries = {}
    for address, value in enumerate(values, start):
        if address > end:
            break
        entries[str(address)] = {"type": data_type, "value": value}

    state = self.store.merge_patch({"pmc": {area: entries}})
    stored = state["pmc"].get(area, {})
    addresses = []
    for address, entry in sorted(stored.items(), key=lambda item: int(item[0])):
        if start <= int(address) <= end:
            addresses.append({"address": int(address), **entry})

    report = {"area": area, "data_type": data_type, "start": start, "end": end, "addresses": addresses}
    return EW_OK, {"pmc": report}, None
|
|
|
|
async def _rdopmsg(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the store's ``operator_messages`` for a valid handle."""
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"
    state = self.store.snapshot()
    return EW_OK, {"operator_messages": state["operator_messages"]}, None
|
|
|
|
async def _rdopmode(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
    """Return the store's ``operation_mode`` for a valid handle."""
    if failure := self._validate_handle(params):
        return failure, None, "invalid library handle"
    state = self.store.snapshot()
    return EW_OK, {"operation_mode": state["operation_mode"]}, None
|
|
|
|
async def _rdprgnum(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
program = self.store.snapshot()["program"]
|
|
return EW_OK, {"program_numbers": {"current": program["current"], "main": program["main"]}}, None
|
|
|
|
async def _exeprgname2(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
program = self.store.snapshot()["program"]
|
|
return EW_OK, {"program_name": program["executing_path"]}, None
|
|
|
|
async def _rdexecprog(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
program = self.store.snapshot()["program"]
|
|
return EW_OK, {"program": program["executing"]}, None
|
|
|
|
async def _rdseqnum(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
return await self._program_value(params, "sequence", "sequence")
|
|
|
|
async def _rdblkcount(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
return await self._program_value(params, "block_count", "block_count")
|
|
|
|
async def _rdproginfo(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
program = self.store.snapshot()["program"]
|
|
current = next((item for item in program["directory"] if item["number"] == program["current"]), None)
|
|
return EW_OK, {"program_info": current or {}}, None
|
|
|
|
async def _rdprogdir3(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
return EW_OK, {"directory": self.store.snapshot()["program"]["directory"]}, None
|
|
|
|
async def _rdtimer(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
timers = self.store.snapshot()["timers"]
|
|
timer_kind = params.get("type")
|
|
if timer_kind is None:
|
|
return EW_OK, {"timers": timers}, None
|
|
key_map = {0: "power_on", 1: "operating", 2: "cutting", 3: "cycle"}
|
|
key = key_map.get(int(timer_kind))
|
|
if key is None:
|
|
return EW_PARAM, None, f"timer type {timer_kind} is not mapped"
|
|
return EW_OK, {"timer": {"type": int(timer_kind), "name": key, "value": timers[key]}}, None
|
|
|
|
async def _rdspmeter(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
return await self._spindle_group(params, "meter", "spindle_meter")
|
|
|
|
async def _rdsvmeter(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
return await self._spindle_group(params, "servo_meter", "servo_meter")
|
|
|
|
async def _rdspload(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
return await self._spindle_group(params, "load", "spindle_load")
|
|
|
|
async def _rdspgear(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
return await self._spindle_group(params, "gear", "spindle_gear")
|
|
|
|
async def _rdspmaxrpm(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
return await self._spindle_group(params, "max_rpm", "spindle_max_rpm")
|
|
|
|
async def _rddiagnum(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
numbers = sorted(int(key) for key in self.store.snapshot()["diagnostics"].keys())
|
|
return EW_OK, {"diagnostic_numbers": {"count": len(numbers), "min": numbers[0], "max": numbers[-1]}}, None
|
|
|
|
async def _rddiaginfo(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
diagnostics = self.store.snapshot()["diagnostics"]
|
|
return EW_OK, {"diagnostic_info": [{"number": int(key), **value} for key, value in diagnostics.items()]}, None
|
|
|
|
async def _diagnoss(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
key = str(params.get("datano"))
|
|
diag = self.store.snapshot()["diagnostics"].get(key)
|
|
if diag is None:
|
|
return EW_NUMBER, None, f"diagnostic {key} is not mocked"
|
|
return EW_OK, {"diagnostic": {"datano": int(key), **diag}}, None
|
|
|
|
async def _mock_get_state(self, params: dict[str, Any]) -> tuple[int, dict[str, Any], str | None]:
|
|
return EW_OK, {"profile": self.profile["profile_name"], "state": self.store.snapshot()}, None
|
|
|
|
async def _mock_patch(self, params: dict[str, Any]) -> tuple[int, dict[str, Any], str | None]:
|
|
state_patch = params.get("state", params)
|
|
if not isinstance(state_patch, dict):
|
|
return EW_PARAM, None, "mock_patch requires an object under 'state' or as params"
|
|
state = self.store.merge_patch(state_patch)
|
|
return EW_OK, {"profile": self.profile["profile_name"], "state": state}, None
|
|
|
|
async def _mock_reset(self, params: dict[str, Any]) -> tuple[int, dict[str, Any], str | None]:
|
|
self._cancel_alarm_schedule()
|
|
return EW_OK, {"profile": self.profile["profile_name"], "state": self.store.reset()}, None
|
|
|
|
async def _mock_load_profile(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
profile_name = params.get("profile")
|
|
if not isinstance(profile_name, str):
|
|
return EW_PARAM, None, "mock_load_profile requires a string 'profile'"
|
|
self._cancel_alarm_schedule()
|
|
self.profile = load_profile(profile_name)
|
|
self.store = MockDataStore(self.profile)
|
|
self._handles.clear()
|
|
self._next_handle = count(1)
|
|
return EW_OK, {"profile": self.profile["profile_name"]}, None
|
|
|
|
async def _mock_list_methods(self, params: dict[str, Any]) -> tuple[int, dict[str, Any], str | None]:
|
|
return EW_OK, {
|
|
"profile": self.profile["profile_name"],
|
|
"connection_methods": self.profile.get("connection_methods", []),
|
|
"mock_methods": self.profile.get("mock_methods", []),
|
|
}, None
|
|
|
|
async def _mock_schedule_alarms(self, params: dict[str, Any]) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
sequence = params.get("sequence")
|
|
if not isinstance(sequence, list):
|
|
return EW_PARAM, None, "mock_schedule_alarms requires a sequence list"
|
|
normalized = []
|
|
for item in sequence:
|
|
if not isinstance(item, dict):
|
|
return EW_PARAM, None, "each alarm schedule item must be an object"
|
|
at_ms = int(item.get("at_ms", 0))
|
|
alarms = item.get("alarms", [])
|
|
if not isinstance(alarms, list):
|
|
return EW_PARAM, None, "alarm schedule item alarms must be a list"
|
|
normalized.append({"at_ms": max(at_ms, 0), "alarms": alarms})
|
|
normalized.sort(key=lambda item: item["at_ms"])
|
|
self._cancel_alarm_schedule()
|
|
self._alarm_schedule_task = asyncio.create_task(self._run_alarm_schedule(normalized))
|
|
return EW_OK, {"scheduled": len(normalized)}, None
|
|
|
|
async def _run_alarm_schedule(self, sequence: list[dict[str, Any]]) -> None:
|
|
previous_ms = 0
|
|
try:
|
|
for item in sequence:
|
|
at_ms = int(item["at_ms"])
|
|
await asyncio.sleep(max(at_ms - previous_ms, 0) / 1000)
|
|
self.store.merge_patch({"alarms": item["alarms"]})
|
|
previous_ms = at_ms
|
|
except asyncio.CancelledError:
|
|
return
|
|
|
|
def _cancel_alarm_schedule(self) -> None:
|
|
if self._alarm_schedule_task is not None:
|
|
self._alarm_schedule_task.cancel()
|
|
self._alarm_schedule_task = None
|
|
|
|
def _validate_handle(self, params: dict[str, Any]) -> int:
|
|
handle, session = self._require_handle(params)
|
|
if handle is None or session is None:
|
|
return EW_HANDLE
|
|
return EW_OK
|
|
|
|
def _pmc_range(self, params: dict[str, Any]) -> tuple[str, str, int | None, int]:
|
|
area = params.get("area", params.get("adr_type", params.get("type_a", params.get("a", "R"))))
|
|
data_type = params.get("data_type", params.get("type_d", params.get("b", "byte")))
|
|
area = self._pmc_area_name(area)
|
|
data_type = self._pmc_data_type(data_type)
|
|
start_value = params.get("start", params.get("datano_s", params.get("c")))
|
|
if start_value is None:
|
|
return str(area), str(data_type), None, 0
|
|
start = int(start_value)
|
|
if "end" in params:
|
|
end = int(params["end"])
|
|
elif "datano_e" in params:
|
|
end = int(params["datano_e"])
|
|
elif "d" in params:
|
|
end = int(params["d"])
|
|
else:
|
|
length = int(params.get("length", params.get("count", params.get("e", 1))))
|
|
end = start + max(length, 1) - 1
|
|
return str(area), str(data_type), start, end
|
|
|
|
def _pmc_area_name(self, area: Any) -> str:
|
|
if isinstance(area, int):
|
|
return {
|
|
0: "G",
|
|
1: "F",
|
|
2: "Y",
|
|
3: "X",
|
|
4: "A",
|
|
5: "R",
|
|
6: "T",
|
|
7: "K",
|
|
8: "C",
|
|
9: "D",
|
|
10: "E",
|
|
}.get(area, "R")
|
|
text = str(area).upper()
|
|
return text if text else "R"
|
|
|
|
def _pmc_data_type(self, data_type: Any) -> str:
|
|
if isinstance(data_type, int):
|
|
return {
|
|
0: "byte",
|
|
1: "word",
|
|
2: "long",
|
|
4: "real",
|
|
5: "double",
|
|
}.get(data_type, "byte")
|
|
text = str(data_type).lower()
|
|
return text if text else "byte"
|
|
|
|
async def _simple_handle_value(
|
|
self,
|
|
params: dict[str, Any],
|
|
state_key: str,
|
|
result_key: str,
|
|
) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
return EW_OK, {result_key: self.store.snapshot()[state_key]}, None
|
|
|
|
async def _program_value(
|
|
self,
|
|
params: dict[str, Any],
|
|
state_key: str,
|
|
result_key: str,
|
|
) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
return EW_OK, {result_key: self.store.snapshot()["program"][state_key]}, None
|
|
|
|
async def _spindle_group(
|
|
self,
|
|
params: dict[str, Any],
|
|
state_key: str,
|
|
result_key: str,
|
|
) -> tuple[int, dict[str, Any] | None, str | None]:
|
|
rc = self._validate_handle(params)
|
|
if rc:
|
|
return rc, None, "invalid library handle"
|
|
return EW_OK, {result_key: self.store.snapshot()["spindle"][state_key]}, None
|