"""Command line interface for the MXAccess Gateway Python client.""" from __future__ import annotations import asyncio import json import os from collections.abc import Awaitable, Callable from datetime import datetime, timezone from typing import Any import click from google.protobuf.json_format import MessageToDict from mxgateway import __version__ from mxgateway.auth import redact_secret from mxgateway.client import GatewayClient from mxgateway.errors import MxGatewayError from mxgateway.generated import mxaccess_gateway_pb2 as pb from mxgateway.options import ClientOptions from mxgateway.values import MxValueInput @click.group() def main() -> None: """MXAccess Gateway Python test CLI.""" @main.command() @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def version(output_json: bool) -> None: """Print client package version information.""" payload = { "client": "mxgw-py", "package": "mxaccess-gateway-client", "version": __version__, } _emit(payload, output_json=output_json, text=f"mxgw-py {__version__}") def gateway_options(command: Callable[..., Any]) -> Callable[..., Any]: command = click.option("--endpoint", default="localhost:5000", show_default=True)(command) command = click.option("--api-key", default=None, help="Gateway API key.")(command) command = click.option( "--api-key-env", default=None, help="Environment variable containing the gateway API key.", )(command) command = click.option("--plaintext", is_flag=True, help="Use plaintext gRPC.")(command) command = click.option("--tls", "use_tls", is_flag=True, help="Use TLS gRPC.")(command) command = click.option("--ca-file", default=None, help="Custom root certificate file.")(command) command = click.option( "--server-name-override", default=None, help="TLS server name override for test environments.", )(command) return command @main.command("open-session") @gateway_options @click.option("--client-name", default="", help="Client session name.") @click.option("--requested-backend", default="", help="Requested backend name.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def open_session(**kwargs: Any) -> None: """Open a gateway session.""" _run( _open_session(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs), ) @main.command("close-session") @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def close_session(**kwargs: Any) -> None: """Close a gateway session.""" _run( _close_session(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs), ) @main.command() @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--message", default="ping", show_default=True, help="Ping payload.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def ping(**kwargs: Any) -> None: """Send a diagnostic ping command.""" _run(_ping(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) @main.command() @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--client-name", required=True, help="MXAccess client name.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def register(**kwargs: Any) -> None: """Invoke MXAccess Register.""" _run( _register(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs), ) @main.command("add-item") @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") @click.option("--item", required=True, help="MXAccess item definition.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def add_item(**kwargs: Any) -> None: """Invoke MXAccess AddItem.""" _run( _add_item(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs), ) @main.command() @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") @click.option("--item-handle", required=True, type=int, help="MXAccess item handle.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def advise(**kwargs: Any) -> None: """Invoke MXAccess Advise.""" _run(_advise(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) @main.command("stream-events") @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--after-worker-sequence", default=0, type=int, show_default=True) @click.option("--max-events", default=1, type=int, show_default=True) @click.option("--timeout", default=5.0, type=float, show_default=True) @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def stream_events(**kwargs: Any) -> None: """Stream a bounded number of events.""" _run( _stream_events(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs), ) @main.command() @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") @click.option("--item-handle", required=True, type=int, help="MXAccess item handle.") @click.option("--type", "value_type", default="string", show_default=True) @click.option("--value", required=True, help="Value to write.") @click.option("--user-id", default=0, type=int, show_default=True) @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def write(**kwargs: Any) -> None: """Invoke MXAccess Write.""" _run(_write(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) @main.command() @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") @click.option("--item-handle", required=True, type=int, help="MXAccess item handle.") @click.option("--type", "value_type", default="string", show_default=True) @click.option("--value", required=True, help="Value to write.") @click.option("--timestamp", required=True, help="ISO-8601 timestamp value.") @click.option("--user-id", default=0, type=int, show_default=True) @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def write2(**kwargs: Any) -> None: """Invoke MXAccess Write2.""" _run(_write2(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) @main.command() @gateway_options @click.option("--client-name", default="mxgw-py-smoke", show_default=True) @click.option("--item", required=True, help="MXAccess item definition.") @click.option("--max-events", default=1, type=int, show_default=True) @click.option("--timeout", default=5.0, type=float, show_default=True) @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def smoke(**kwargs: Any) -> None: """Run a bounded open/register/add/advise/stream/close smoke flow.""" _run(_smoke(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) async def _open_session(**kwargs: Any) -> dict[str, Any]: async with await _connect(kwargs) as client: reply = await client.open_session_raw( pb.OpenSessionRequest( requested_backend=kwargs["requested_backend"], client_session_name=kwargs["client_name"], client_correlation_id=kwargs["correlation_id"], ), ) return {"sessionId": reply.session_id, "rawReply": _message_dict(reply)} async def _close_session(**kwargs: Any) -> dict[str, Any]: async with await _connect(kwargs) as client: reply = await client.close_session_raw( pb.CloseSessionRequest( session_id=kwargs["session_id"], client_correlation_id=kwargs["correlation_id"], ), ) return {"sessionId": reply.session_id, "rawReply": _message_dict(reply)} async def _ping(**kwargs: Any) -> dict[str, Any]: async with await _connect(kwargs) as client: reply = await client.invoke_raw( pb.MxCommandRequest( session_id=kwargs["session_id"], command=pb.MxCommand( kind=pb.MX_COMMAND_KIND_PING, ping=pb.PingCommand(message=kwargs["message"]), ), ), ) return {"kind": "ping", "rawReply": _message_dict(reply)} async def _register(**kwargs: Any) -> dict[str, Any]: async with await _connect(kwargs) as client: session = _session(client, kwargs["session_id"]) server_handle = await session.register( kwargs["client_name"], correlation_id=kwargs["correlation_id"], ) return {"serverHandle": server_handle} async def _add_item(**kwargs: Any) -> dict[str, Any]: async with await _connect(kwargs) as client: session = _session(client, kwargs["session_id"]) item_handle = await session.add_item( kwargs["server_handle"], kwargs["item"], correlation_id=kwargs["correlation_id"], ) return {"itemHandle": item_handle} async def _advise(**kwargs: Any) -> dict[str, Any]: async with await _connect(kwargs) as client: session = _session(client, kwargs["session_id"]) await session.advise( kwargs["server_handle"], kwargs["item_handle"], correlation_id=kwargs["correlation_id"], ) return {"ok": True} async def _stream_events(**kwargs: Any) -> dict[str, Any]: async with await _connect(kwargs) as client: session = _session(client, kwargs["session_id"]) events = await _collect_events( session.stream_events(after_worker_sequence=kwargs["after_worker_sequence"]), max_events=kwargs["max_events"], timeout=kwargs["timeout"], ) return {"events": [_message_dict(event) for event in events]} async def _write(**kwargs: Any) -> dict[str, Any]: value = _parse_value(kwargs["value"], kwargs["value_type"]) async with await _connect(kwargs) as client: session = _session(client, kwargs["session_id"]) await session.write( kwargs["server_handle"], kwargs["item_handle"], value, user_id=kwargs["user_id"], correlation_id=kwargs["correlation_id"], ) return {"ok": True} async def _write2(**kwargs: Any) -> dict[str, Any]: value = _parse_value(kwargs["value"], kwargs["value_type"]) timestamp = _parse_datetime(kwargs["timestamp"]) async with await _connect(kwargs) as client: session = _session(client, kwargs["session_id"]) await session.write2( kwargs["server_handle"], kwargs["item_handle"], value, timestamp, user_id=kwargs["user_id"], correlation_id=kwargs["correlation_id"], ) return {"ok": True} async def _smoke(**kwargs: Any) -> dict[str, Any]: async with await _connect(kwargs) as client: session = await client.open_session(client_session_name=kwargs["client_name"]) closed = False try: server_handle = await session.register(kwargs["client_name"]) item_handle = await session.add_item(server_handle, kwargs["item"]) await session.advise(server_handle, item_handle) events = await _collect_events( session.stream_events(), max_events=kwargs["max_events"], timeout=kwargs["timeout"], ) return { "sessionId": session.session_id, "serverHandle": server_handle, "itemHandle": item_handle, "events": [_message_dict(event) for event in events], } finally: if not closed: await session.close() async def _connect(kwargs: dict[str, Any]) -> GatewayClient: api_key = kwargs.get("api_key") or _api_key_from_env(kwargs.get("api_key_env")) return await GatewayClient.connect( ClientOptions( endpoint=kwargs["endpoint"], api_key=api_key, plaintext=_use_plaintext(kwargs), ca_file=kwargs.get("ca_file"), server_name_override=kwargs.get("server_name_override"), ), ) def _session(client: GatewayClient, session_id: str): from mxgateway.session import Session return Session(client=client, session_id=session_id) def _use_plaintext(kwargs: dict[str, Any]) -> bool: if kwargs.get("use_tls"): return False if kwargs.get("plaintext"): return True return kwargs["endpoint"].startswith("localhost:") or kwargs["endpoint"].startswith("127.0.0.1:") def _api_key_from_env(name: str | None) -> str | None: if not name: return None return os.environ.get(name) def _secrets(kwargs: dict[str, Any]) -> list[str | None]: return [ kwargs.get("api_key"), _api_key_from_env(kwargs.get("api_key_env")), ] def _run( awaitable: Awaitable[dict[str, Any]], *, output_json: bool, secrets: list[str | None], ) -> None: try: payload = asyncio.run(awaitable) except MxGatewayError as error: raise click.ClickException(redact_secret(str(error), secrets)) from error _emit(payload, output_json=output_json) def _emit( payload: dict[str, Any], *, output_json: bool, text: str | None = None, ) -> None: if output_json: click.echo(json.dumps(payload, sort_keys=True)) return click.echo(text or json.dumps(payload, sort_keys=True)) async def _collect_events( events: Any, *, max_events: int, timeout: float, ) -> list[pb.MxEvent]: collected: list[pb.MxEvent] = [] iterator = events.__aiter__() try: while len(collected) < max_events: collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout)) except StopAsyncIteration: pass finally: close = getattr(iterator, "aclose", None) if close is not None: await close() return collected def _parse_value(raw_value: str, value_type: str) -> MxValueInput: normalized = value_type.lower() if normalized == "bool": return raw_value.lower() in ("1", "true", "yes", "on") if normalized in ("int", "int32", "int64"): return int(raw_value) if normalized in ("float", "double"): return float(raw_value) if normalized in ("time", "timestamp"): return _parse_datetime(raw_value) if normalized == "raw": return raw_value.encode("utf-8") if normalized == "string": return raw_value raise click.BadParameter(f"unsupported value type: {value_type}", param_hint="--type") def _parse_datetime(raw_value: str) -> datetime: if raw_value.endswith("Z"): raw_value = raw_value[:-1] + "+00:00" parsed = datetime.fromisoformat(raw_value) if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=timezone.utc) return parsed def _message_dict(message: Any) -> dict[str, Any]: return MessageToDict( message, preserving_proto_field_name=False, use_integers_for_enums=False, )