"""Command line interface for the MXAccess Gateway Python client.""" from __future__ import annotations import asyncio import json import os import sys from collections.abc import Awaitable, Callable from datetime import datetime, timezone from typing import Any import click from click.testing import CliRunner from google.protobuf.json_format import MessageToDict from zb_mom_ww_mxgateway import __version__ from zb_mom_ww_mxgateway.auth import redact_secret from zb_mom_ww_mxgateway.client import GatewayClient from zb_mom_ww_mxgateway.errors import MxGatewayError from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb from zb_mom_ww_mxgateway.options import ClientOptions from zb_mom_ww_mxgateway.values import MxValueInput, to_mx_value MAX_AGGREGATE_EVENTS = 10_000 _BATCH_EOR = "__MXGW_BATCH_EOR__" @click.group() def main() -> None: """MXAccess Gateway Python test CLI.""" @main.command() @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def version(output_json: bool) -> None: """Print client package version information.""" payload = { "client": "mxgw-py", "package": "mxaccess-gateway-client", "version": __version__, } _emit(payload, output_json=output_json, text=f"mxgw-py {__version__}") @main.command() def batch() -> None: """Read commands from stdin and execute each, writing output + __MXGW_BATCH_EOR__ after each. Each non-empty line of stdin is a complete argument string (no quoting support — the harness never passes whitespace-containing arguments). Lines are split on runs of ASCII whitespace and dispatched through the normal CLI parser. On EOF or an empty line, exit 0. Errors do NOT terminate the loop. Each command's output (including any error JSON) is written to stdout followed by a line containing exactly ``__MXGW_BATCH_EOR__``, then stdout is flushed. Error output is formatted as ``{"error": "...", "type": "..."}``. 
""" runner = CliRunner() for raw_line in sys.stdin: line = raw_line.rstrip("\n").rstrip("\r") if not line: # Empty line signals clean exit (matches the spec and .NET behaviour). break args = line.split() try: result = runner.invoke(main, args, catch_exceptions=True) except Exception as exc: # noqa: BLE001 — be safe; never let batch loop die _batch_write_error(exc.__class__.__name__, str(exc)) _batch_flush_eor() continue if result.exit_code == 0: # Normal success — write captured output as-is. sys.stdout.write(result.output) else: # Something went wrong. If the command already emitted a JSON object # (e.g. the output starts with '{'), trust that and relay it verbatim. # Otherwise synthesise the standard {"error": ..., "type": ...} shape. output = result.output or "" exc = result.exception if output.lstrip().startswith("{"): # Already JSON — relay verbatim (may or may not end with newline). sys.stdout.write(output) if not output.endswith("\n"): sys.stdout.write("\n") elif exc is not None and not isinstance(exc, SystemExit): _batch_write_error(type(exc).__name__, str(exc)) else: # Click's default error format is "Error: \n"; extract the # message so the harness gets clean JSON. 
msg = output.strip() if msg.startswith("Error: "): msg = msg[len("Error: "):] exc_type = ( type(exc).__name__ if exc is not None and not isinstance(exc, SystemExit) else "CliError" ) _batch_write_error(exc_type, msg) _batch_flush_eor() def _batch_write_error(exc_type: str, message: str) -> None: """Write a JSON error record to stdout in the standard batch error shape.""" sys.stdout.write(json.dumps({"error": message, "type": exc_type}) + "\n") def _batch_flush_eor() -> None: """Write the end-of-record sentinel and flush stdout.""" sys.stdout.write(_BATCH_EOR + "\n") sys.stdout.flush() def gateway_options(command: Callable[..., Any]) -> Callable[..., Any]: """Apply the shared gateway connection options to a Click command.""" command = click.option("--endpoint", default="localhost:5000", show_default=True)(command) command = click.option("--api-key", default=None, help="Gateway API key.")(command) command = click.option( "--api-key-env", default=None, help="Environment variable containing the gateway API key.", )(command) command = click.option("--plaintext", is_flag=True, help="Use plaintext gRPC.")(command) command = click.option("--tls", "use_tls", is_flag=True, help="Use TLS gRPC.")(command) command = click.option("--ca-file", default=None, help="Custom root certificate file.")(command) command = click.option( "--server-name-override", default=None, help="TLS server name override for test environments.", )(command) command = click.option("--call-timeout", default=30.0, type=float, show_default=True)(command) command = click.option("--stream-timeout", default=None, type=float)(command) return command @main.command("open-session") @gateway_options @click.option("--client-name", default="", help="Client session name.") @click.option("--requested-backend", default="", help="Requested backend name.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def 
open_session(**kwargs: Any) -> None: """Open a gateway session.""" _run( _open_session(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs), ) @main.command("close-session") @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def close_session(**kwargs: Any) -> None: """Close a gateway session.""" _run( _close_session(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs), ) @main.command() @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--message", default="ping", show_default=True, help="Ping payload.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def ping(**kwargs: Any) -> None: """Send a diagnostic ping command.""" _run(_ping(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) @main.command() @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--client-name", required=True, help="MXAccess client name.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def register(**kwargs: Any) -> None: """Invoke MXAccess Register.""" _run( _register(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs), ) @main.command("add-item") @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") @click.option("--item", required=True, help="MXAccess item definition.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def add_item(**kwargs: Any) -> None: 
"""Invoke MXAccess AddItem.""" _run( _add_item(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs), ) @main.command() @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") @click.option("--item-handle", required=True, type=int, help="MXAccess item handle.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def advise(**kwargs: Any) -> None: """Invoke MXAccess Advise.""" _run(_advise(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) @main.command("subscribe-bulk") @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") @click.option("--items", required=True, help="Comma-separated MXAccess item definitions.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def subscribe_bulk(**kwargs: Any) -> None: """Invoke MXAccess SubscribeBulk.""" _run( _subscribe_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs), ) @main.command("unsubscribe-bulk") @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") @click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") @click.option("--correlation-id", default="", help="Client correlation id.") @click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") def unsubscribe_bulk(**kwargs: Any) -> None: """Invoke MXAccess UnsubscribeBulk.""" _run( _unsubscribe_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs), ) 
# NOTE: every command below is a thin Click shim. It builds the coroutine that
# does the gateway work, then hands it to _run together with the --json flag
# and the secrets to redact from error text. The docstrings double as Click
# help text, so their wording is part of the CLI's visible output.


@main.command("read-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--items", required=True, help="Comma-separated MXAccess tag addresses.")
@click.option(
    "--timeout-ms",
    default=0,
    type=int,
    show_default=True,
    help="Per-tag snapshot timeout in milliseconds. 0 = worker default.",
)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def read_bulk(**kwargs: Any) -> None:
    """Invoke MXAccess ReadBulk — cached value when advised, snapshot otherwise."""
    pending = _read_bulk(**kwargs)
    _run(pending, output_json=kwargs["output_json"], secrets=_secrets(kwargs))


@main.command("write-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write_bulk(**kwargs: Any) -> None:
    """Invoke MXAccess WriteBulk — sequential Write per entry."""
    pending = _write_bulk(**kwargs)
    _run(pending, output_json=kwargs["output_json"], secrets=_secrets(kwargs))


@main.command("write2-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write2_bulk(**kwargs: Any) -> None:
    """Invoke MXAccess Write2Bulk — timestamped sequential Write2 per entry."""
    pending = _write2_bulk(**kwargs)
    _run(pending, output_json=kwargs["output_json"], secrets=_secrets(kwargs))


@main.command("write-secured-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--current-user-id", default=0, type=int, show_default=True)
@click.option("--verifier-user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write_secured_bulk(**kwargs: Any) -> None:
    """Invoke MXAccess WriteSecuredBulk — credential-sensitive."""
    pending = _write_secured_bulk(**kwargs)
    _run(pending, output_json=kwargs["output_json"], secrets=_secrets(kwargs))


@main.command("write-secured2-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.")
@click.option("--current-user-id", default=0, type=int, show_default=True)
@click.option("--verifier-user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write_secured2_bulk(**kwargs: Any) -> None:
    """Invoke MXAccess WriteSecured2Bulk — timestamped + credential-sensitive."""
    pending = _write_secured2_bulk(**kwargs)
    _run(pending, output_json=kwargs["output_json"], secrets=_secrets(kwargs))


@main.command("bench-read-bulk")
@gateway_options
@click.option("--client-name", default="mxgw-python-bench", show_default=True)
@click.option("--duration-seconds", default=30, type=int, show_default=True)
@click.option("--warmup-seconds", default=3, type=int, show_default=True)
@click.option("--bulk-size", default=6, type=int, show_default=True)
@click.option("--tag-start", default=1, type=int, show_default=True)
@click.option("--tag-prefix", default="TestMachine_", show_default=True)
@click.option("--tag-attribute", default="TestChangingInt", show_default=True)
@click.option("--timeout-ms", default=1500, type=int, show_default=True)
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def bench_read_bulk(**kwargs: Any) -> None:
    """Cross-language ReadBulk stress benchmark.

    Opens its own session, subscribes to bulk-size tags so the worker value
    cache populates from real OnDataChange events, runs ReadBulk in a tight
    loop for duration-seconds, and emits the shared JSON stats schema the
    scripts/bench-read-bulk.ps1 driver collates across all five clients.
    """
    pending = _bench_read_bulk(**kwargs)
    _run(pending, output_json=kwargs["output_json"], secrets=_secrets(kwargs))


@main.command("stream-events")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--after-worker-sequence", default=0, type=int, show_default=True)
@click.option("--max-events", default=1, type=int, show_default=True)
@click.option("--timeout", default=5.0, type=float, show_default=True)
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def stream_events(**kwargs: Any) -> None:
    """Stream a bounded number of events."""
    pending = _stream_events(**kwargs)
    _run(pending, output_json=kwargs["output_json"], secrets=_secrets(kwargs))


@main.command("stream-alarms")
@gateway_options
@click.option("--filter-prefix", default="", help="Alarm-reference prefix filter.")
@click.option("--max-messages", default=1, type=int, show_default=True)
@click.option("--timeout", default=5.0, type=float, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def stream_alarms(**kwargs: Any) -> None:
    """Stream a bounded number of messages from the gateway's central alarm feed."""
    pending = _stream_alarms(**kwargs)
    _run(pending, output_json=kwargs["output_json"], secrets=_secrets(kwargs))


@main.command("acknowledge-alarm")
@gateway_options
@click.option("--reference", required=True, help="Alarm full reference to acknowledge.")
@click.option("--comment", default="", help="Acknowledgement comment.")
@click.option("--operator", default="", help="Operator user name.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def acknowledge_alarm(**kwargs: Any) -> None:
    """Acknowledge an active MXAccess alarm condition (session-less)."""
    pending = _acknowledge_alarm(**kwargs)
    _run(pending, output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handle", required=True, type=int, help="MXAccess item handle.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--value", required=True, help="Value to write.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write(**kwargs: Any) -> None:
    """Invoke MXAccess Write."""
    _run(_write(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))


@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handle", required=True, type=int, help="MXAccess item handle.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--value", required=True, help="Value to write.")
@click.option("--timestamp", required=True, help="ISO-8601 timestamp value.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write2(**kwargs: Any) -> None:
    """Invoke MXAccess Write2."""
    _run(_write2(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))


@main.command()
@gateway_options
@click.option("--client-name", default="mxgw-py-smoke", show_default=True)
@click.option("--item", required=True, help="MXAccess item definition.")
@click.option("--max-events", default=1, type=int, show_default=True)
@click.option("--timeout", default=5.0, type=float, show_default=True)
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def smoke(**kwargs: Any) -> None:
    """Run a bounded open/register/add/advise/stream/close smoke flow."""
    _run(_smoke(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))


async def _open_session(**kwargs: Any) -> dict[str, Any]:
    """Send a raw OpenSessionRequest; return the session id and the raw reply."""
    async with await _connect(kwargs) as client:
        reply = await client.open_session_raw(
            pb.OpenSessionRequest(
                requested_backend=kwargs["requested_backend"],
                client_session_name=kwargs["client_name"],
                client_correlation_id=kwargs["correlation_id"],
            ),
        )
    return {"sessionId": reply.session_id, "rawReply": _message_dict(reply)}


async def _close_session(**kwargs: Any) -> dict[str, Any]:
    """Send a raw CloseSessionRequest; return the session id and the raw reply."""
    async with await _connect(kwargs) as client:
        reply = await client.close_session_raw(
            pb.CloseSessionRequest(
                session_id=kwargs["session_id"],
                client_correlation_id=kwargs["correlation_id"],
            ),
        )
    return {"sessionId": reply.session_id, "rawReply": _message_dict(reply)}


async def _ping(**kwargs: Any) -> dict[str, Any]:
    """Invoke the PING command on an existing session and return the raw reply."""
    async with await _connect(kwargs) as client:
        reply = await client.invoke_raw(
            pb.MxCommandRequest(
                session_id=kwargs["session_id"],
                command=pb.MxCommand(
                    kind=pb.MX_COMMAND_KIND_PING,
                    ping=pb.PingCommand(message=kwargs["message"]),
                ),
            ),
        )
    return {"kind": "ping", "rawReply": _message_dict(reply)}


async def _register(**kwargs: Any) -> dict[str, Any]:
    """Call ``session.register`` and return the resulting server handle."""
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        server_handle = await session.register(
            kwargs["client_name"],
            correlation_id=kwargs["correlation_id"],
        )
    return {"serverHandle": server_handle}


async def _add_item(**kwargs: Any) -> dict[str, Any]:
    """Call ``session.add_item`` and return the resulting item handle."""
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        item_handle = await session.add_item(
            kwargs["server_handle"],
            kwargs["item"],
            correlation_id=kwargs["correlation_id"],
        )
    return {"itemHandle": item_handle}


async def _advise(**kwargs: Any) -> dict[str, Any]:
    """Call ``session.advise`` for one item handle; return ``{"ok": True}``."""
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        await session.advise(
            kwargs["server_handle"],
            kwargs["item_handle"],
            correlation_id=kwargs["correlation_id"],
        )
    return {"ok": True}


async def _subscribe_bulk(**kwargs: Any) -> dict[str, Any]:
    """Call ``session.subscribe_bulk`` with the comma-separated ``--items``."""
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        results = await session.subscribe_bulk(
            kwargs["server_handle"],
            _parse_string_list(kwargs["items"]),
            correlation_id=kwargs["correlation_id"],
        )
    return {"results": [_message_dict(result) for result in results]}


async def _unsubscribe_bulk(**kwargs: Any) -> dict[str, Any]:
    """Call ``session.unsubscribe_bulk`` with the comma-separated ``--item-handles``."""
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        results = await session.unsubscribe_bulk(
            kwargs["server_handle"],
            _parse_int_list(kwargs["item_handles"]),
            correlation_id=kwargs["correlation_id"],
        )
    return {"results": [_message_dict(result) for result in results]}


async def _read_bulk(**kwargs: Any) -> dict[str, Any]:
    """Call ``session.read_bulk`` with the comma-separated ``--items``."""
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        results = await session.read_bulk(
            kwargs["server_handle"],
            _parse_string_list(kwargs["items"]),
            timeout_ms=kwargs["timeout_ms"],
            correlation_id=kwargs["correlation_id"],
        )
    return {"results": [_message_dict(result) for result in results]}


def _build_write_bulk_entries(kwargs: dict[str, Any]) -> tuple[list[int], list[Any]]:
    """Build (item_handle, MxValue) pairs for the bulk-write families.

    The CLI accepts a single ``--type`` plus ``--values`` (comma-separated
    string-encoded values, one per ``--item-handles`` entry). Returns the
    parsed item-handle list and the per-entry values converted with
    ``to_mx_value`` — callers wrap these into the appropriate per-entry
    message type.

    Raises:
        click.UsageError: the handle and value counts differ.
        click.BadParameter: either list is empty, or ``--type`` is unsupported.
    """
    handles = _parse_int_list(kwargs["item_handles"])
    # Report an empty list against the option the user actually typed.
    value_texts = _parse_string_list(kwargs["values"], param_hint="--values")
    if len(handles) != len(value_texts):
        raise click.UsageError(
            f"item-handles count ({len(handles)}) does not match values count ({len(value_texts)})",
        )
    parsed = [_parse_value(text, kwargs["value_type"]) for text in value_texts]
    values = [to_mx_value(v) for v in parsed]
    return handles, values


async def _write_bulk(**kwargs: Any) -> dict[str, Any]:
    """Build one WriteBulkEntry per handle/value pair and send them."""
    handles, values = _build_write_bulk_entries(kwargs)
    entries = [
        pb.WriteBulkEntry(item_handle=handle, user_id=kwargs["user_id"], value=value)
        for handle, value in zip(handles, values)
    ]
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        results = await session.write_bulk(
            kwargs["server_handle"],
            entries,
            correlation_id=kwargs["correlation_id"],
        )
    return {"results": [_message_dict(result) for result in results]}


async def _write2_bulk(**kwargs: Any) -> dict[str, Any]:
    """Build one Write2BulkEntry per pair, all sharing ``--timestamp``, and send them."""
    handles, values = _build_write_bulk_entries(kwargs)
    timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"]))
    entries = [
        pb.Write2BulkEntry(
            item_handle=handle,
            user_id=kwargs["user_id"],
            value=value,
            timestamp_value=timestamp_value,
        )
        for handle, value in zip(handles, values)
    ]
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        results = await session.write2_bulk(
            kwargs["server_handle"],
            entries,
            correlation_id=kwargs["correlation_id"],
        )
    return {"results": [_message_dict(result) for result in results]}


async def _write_secured_bulk(**kwargs: Any) -> dict[str, Any]:
    """Build one WriteSecuredBulkEntry per handle/value pair and send them."""
    handles, values = _build_write_bulk_entries(kwargs)
    entries = [
        pb.WriteSecuredBulkEntry(
            item_handle=handle,
            current_user_id=kwargs["current_user_id"],
            verifier_user_id=kwargs["verifier_user_id"],
            value=value,
        )
        for handle, value in zip(handles, values)
    ]
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        results = await session.write_secured_bulk(
            kwargs["server_handle"],
            entries,
            correlation_id=kwargs["correlation_id"],
        )
    return {"results": [_message_dict(result) for result in results]}


async def _write_secured2_bulk(**kwargs: Any) -> dict[str, Any]:
    """Build one WriteSecured2BulkEntry per pair, all sharing ``--timestamp``, and send them."""
    handles, values = _build_write_bulk_entries(kwargs)
    timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"]))
    entries = [
        pb.WriteSecured2BulkEntry(
            item_handle=handle,
            current_user_id=kwargs["current_user_id"],
            verifier_user_id=kwargs["verifier_user_id"],
            value=value,
            timestamp_value=timestamp_value,
        )
        for handle, value in zip(handles, values)
    ]
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        results = await session.write_secured2_bulk(
            kwargs["server_handle"],
            entries,
            correlation_id=kwargs["correlation_id"],
        )
    return {"results": [_message_dict(result) for result in results]}


async def _bench_read_bulk(**kwargs: Any) -> dict[str, Any]:
    """ReadBulk stress benchmark — matches the .NET / Go / Rust / Java schema.

    Opens a session, registers, bulk-subscribes to the generated tag list,
    issues ReadBulk calls for ``warmup_seconds`` (unmeasured) and then for
    ``duration_seconds`` (measured), and returns the statistics dict. The
    subscription and the session are released in a ``finally`` block.

    Raises:
        click.UsageError: ``bulk_size`` is less than 1.
    """
    import time

    bulk_size = int(kwargs["bulk_size"])
    if bulk_size < 1:
        raise click.UsageError("bulk-size must be positive")
    duration_seconds = int(kwargs["duration_seconds"])
    warmup_seconds = int(kwargs["warmup_seconds"])
    tag_start = int(kwargs["tag_start"])
    tag_prefix = kwargs["tag_prefix"]
    tag_attribute = kwargs["tag_attribute"]
    timeout_ms = int(kwargs["timeout_ms"])
    client_name = kwargs["client_name"]

    # e.g. "TestMachine_001.TestChangingInt" — the index is zero-padded to three digits.
    tags = [f"{tag_prefix}{i:03d}.{tag_attribute}" for i in range(tag_start, tag_start + bulk_size)]

    async with await _connect(kwargs) as client:
        session = await client.open_session(client_session_name=client_name)
        server_handle = 0
        item_handles: list[int] = []
        try:
            server_handle = await session.register(client_name)
            subscribe_results = await session.subscribe_bulk(server_handle, tags)
            item_handles = [r.item_handle for r in subscribe_results if r.was_successful]

            # Warm-up window so JIT / connection pool / first-call costs are
            # amortised before the measurement window opens.
            warmup_deadline = time.perf_counter() + warmup_seconds
            while time.perf_counter() < warmup_deadline:
                await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms)

            latencies_ms: list[float] = []
            total_results = 0
            cached_results = 0
            successful = 0
            failed = 0
            steady_start = time.perf_counter()
            steady_deadline = steady_start + duration_seconds
            while time.perf_counter() < steady_deadline:
                call_start = time.perf_counter()
                try:
                    results = await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms)
                except Exception:
                    # A failed call still counts towards latency and call totals.
                    failed += 1
                    latencies_ms.append((time.perf_counter() - call_start) * 1000.0)
                    continue
                latencies_ms.append((time.perf_counter() - call_start) * 1000.0)
                successful += 1
                for r in results:
                    total_results += 1
                    if r.was_cached:
                        cached_results += 1
            steady_elapsed = time.perf_counter() - steady_start
            total_calls = successful + failed
            calls_per_second = total_calls / steady_elapsed if steady_elapsed > 0 else 0.0
        finally:
            # Best-effort cleanup: a failure here must not replace the benchmark
            # result (or the original error) with a teardown error.
            if item_handles:
                try:
                    await session.unsubscribe_bulk(server_handle, item_handles)
                except Exception:
                    pass
            try:
                await session.close()
            except Exception:
                pass

    return {
        "language": "python",
        "command": "bench-read-bulk",
        "endpoint": kwargs.get("endpoint"),
        "clientName": client_name,
        "bulkSize": bulk_size,
        "durationSeconds": duration_seconds,
        "warmupSeconds": warmup_seconds,
        "durationMs": int(steady_elapsed * 1000),
        "tags": tags,
        "totalCalls": total_calls,
        "successfulCalls": successful,
        "failedCalls": failed,
        "totalReadResults": total_results,
        "cachedReadResults": cached_results,
        "callsPerSecond": round(calls_per_second, 2),
        "latencyMs": _percentile_summary(latencies_ms),
    }


def _percentile_summary(sample: list[float]) -> dict[str, float]:
    """Return p50/p95/p99/max/mean of ``sample`` rounded to 3 places (all 0.0 when empty)."""
    if not sample:
        return {"p50": 0.0, "p95": 0.0, "p99": 0.0, "max": 0.0, "mean": 0.0}
    sorted_sample = sorted(sample)
    return {
        "p50": round(_percentile(sorted_sample, 0.50), 3),
        "p95": round(_percentile(sorted_sample, 0.95), 3),
        "p99": round(_percentile(sorted_sample, 0.99), 3),
        "max": round(sorted_sample[-1], 3),
        "mean": round(sum(sample) / len(sample), 3),
    }


def _percentile(sorted_sample: list[float], quantile: float) -> float:
    """Nearest-rank with linear interpolation; matches every other client.

    ``sorted_sample`` must already be sorted ascending. The rank is
    ``quantile * (n - 1)``; the result interpolates between the two
    neighbouring elements. Returns 0.0 for an empty sample.
    """
    n = len(sorted_sample)
    if n == 0:
        return 0.0
    if n == 1:
        return sorted_sample[0]
    rank = quantile * (n - 1)
    lower = int(rank)
    upper = min(lower + 1, n - 1)
    fraction = rank - lower
    return sorted_sample[lower] + (sorted_sample[upper] - sorted_sample[lower]) * fraction


async def _stream_events(**kwargs: Any) -> dict[str, Any]:
    """Collect up to ``--max-events`` events from an existing session's stream."""
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        events = await _collect_events(
            session.stream_events(after_worker_sequence=kwargs["after_worker_sequence"]),
            max_events=kwargs["max_events"],
            timeout=kwargs["timeout"],
        )
    return {"events": [_message_dict(event) for event in events]}


async def _stream_alarms(**kwargs: Any) -> dict[str, Any]:
    """Collect up to ``--max-messages`` messages from ``client.stream_alarms``."""
    async with await _connect(kwargs) as client:
        messages = await _collect_alarm_messages(
            client.stream_alarms(
                pb.StreamAlarmsRequest(
                    client_correlation_id=kwargs["correlation_id"],
                    alarm_filter_prefix=kwargs["filter_prefix"],
                ),
            ),
            max_messages=kwargs["max_messages"],
            timeout=kwargs["timeout"],
        )
    return {"messages": [_message_dict(message) for message in messages]}


async def _acknowledge_alarm(**kwargs: Any) -> dict[str, Any]:
    """Send an AcknowledgeAlarmRequest and return the reply as a dict."""
    async with await _connect(kwargs) as client:
        reply = await client.acknowledge_alarm(
            pb.AcknowledgeAlarmRequest(
                client_correlation_id=kwargs["correlation_id"],
                alarm_full_reference=kwargs["reference"],
                comment=kwargs["comment"],
                operator_user=kwargs["operator"],
            ),
        )
    return _message_dict(reply)


async def _write(**kwargs: Any) -> dict[str, Any]:
    """Parse ``--value`` per ``--type`` and call ``session.write``."""
    value = _parse_value(kwargs["value"], kwargs["value_type"])
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        await session.write(
            kwargs["server_handle"],
            kwargs["item_handle"],
            value,
            user_id=kwargs["user_id"],
            correlation_id=kwargs["correlation_id"],
        )
    return {"ok": True}


async def _write2(**kwargs: Any) -> dict[str, Any]:
    """Parse ``--value`` and ``--timestamp`` and call ``session.write2``."""
    value = _parse_value(kwargs["value"], kwargs["value_type"])
    timestamp = _parse_datetime(kwargs["timestamp"])
    async with await _connect(kwargs) as client:
        session = _session(client, kwargs["session_id"])
        await session.write2(
            kwargs["server_handle"],
            kwargs["item_handle"],
            value,
            timestamp,
            user_id=kwargs["user_id"],
            correlation_id=kwargs["correlation_id"],
        )
    return {"ok": True}


async def _smoke(**kwargs: Any) -> dict[str, Any]:
    """Open a session, register, add and advise one item, collect events, close.

    The session is closed in a ``finally`` block, so it is released whether
    the flow succeeds or raises part-way through.
    """
    async with await _connect(kwargs) as client:
        session = await client.open_session(client_session_name=kwargs["client_name"])
        try:
            server_handle = await session.register(kwargs["client_name"])
            item_handle = await session.add_item(server_handle, kwargs["item"])
            await session.advise(server_handle, item_handle)
            events = await _collect_events(
                session.stream_events(),
                max_events=kwargs["max_events"],
                timeout=kwargs["timeout"],
            )
            return {
                "sessionId": session.session_id,
                "serverHandle": server_handle,
                "itemHandle": item_handle,
                "events": [_message_dict(event) for event in events],
            }
        finally:
            await session.close()


async def _connect(kwargs: dict[str, Any]) -> GatewayClient:
    """Connect a GatewayClient from the shared gateway options.

    An explicit ``--api-key`` wins; otherwise the key is read from the
    environment variable named by ``--api-key-env`` (if any).
    """
    api_key = kwargs.get("api_key") or _api_key_from_env(kwargs.get("api_key_env"))
    return await GatewayClient.connect(
        ClientOptions(
            endpoint=kwargs["endpoint"],
            api_key=api_key,
            plaintext=_use_plaintext(kwargs),
            ca_file=kwargs.get("ca_file"),
            server_name_override=kwargs.get("server_name_override"),
            call_timeout=kwargs.get("call_timeout"),
            stream_timeout=kwargs.get("stream_timeout"),
        ),
    )


def _session(client: GatewayClient, session_id: str) -> Any:
    """Wrap an existing gateway session id in a ``Session`` bound to ``client``."""
    # NOTE(review): imported lazily, presumably to avoid an import cycle — confirm.
    from zb_mom_ww_mxgateway.session import Session

    return Session(client=client, session_id=session_id)


def _use_plaintext(kwargs: dict[str, Any]) -> bool:
    """Decide whether to use plaintext gRPC.

    ``--tls`` takes precedence over ``--plaintext``; with neither flag,
    plaintext is used only for ``localhost:`` and ``127.0.0.1:`` endpoints.
    """
    if kwargs.get("use_tls"):
        return False
    if kwargs.get("plaintext"):
        return True
    return kwargs["endpoint"].startswith(("localhost:", "127.0.0.1:"))


def _api_key_from_env(name: str | None) -> str | None:
    """Return the value of environment variable ``name``, or None if unset or unnamed."""
    if not name:
        return None
    return os.environ.get(name)


def _secrets(kwargs: dict[str, Any]) -> list[str | None]:
    """List the API key candidates (explicit and from the environment) for redaction."""
    return [
        kwargs.get("api_key"),
        _api_key_from_env(kwargs.get("api_key_env")),
    ]


def _run(
    awaitable: Awaitable[dict[str, Any]],
    *,
    output_json: bool,
    secrets: list[str | None],
) -> None:
    """Run ``awaitable`` to completion and print its payload.

    Raises:
        click.ClickException: the awaitable raised ``MxGatewayError``; the
            message is passed through ``redact_secret`` with ``secrets`` first.
    """
    try:
        payload = asyncio.run(awaitable)
    except MxGatewayError as error:
        raise click.ClickException(redact_secret(str(error), secrets)) from error
    _emit(payload, output_json=output_json)


def _emit(
    payload: dict[str, Any],
    *,
    output_json: bool,
    text: str | None = None,
) -> None:
    """Print ``payload`` as key-sorted JSON, or ``text`` when given and JSON was not requested."""
    if output_json:
        click.echo(json.dumps(payload, sort_keys=True))
        return
    click.echo(text or json.dumps(payload, sort_keys=True))


async def _collect_bounded(
    stream: Any,
    *,
    limit: int,
    timeout: float,
    param_hint: str,
) -> list[Any]:
    """Collect at most ``limit`` items from the async iterable ``stream``.

    Collection stops early when the stream ends or when waiting for the next
    item exceeds ``timeout`` seconds. The iterator's ``aclose`` (if it has
    one) is always awaited afterwards.

    Raises:
        click.BadParameter: ``limit`` exceeds ``MAX_AGGREGATE_EVENTS``;
            reported against ``param_hint``.
    """
    if limit > MAX_AGGREGATE_EVENTS:
        raise click.BadParameter(
            f"must be less than or equal to {MAX_AGGREGATE_EVENTS}",
            param_hint=param_hint,
        )

    collected: list[Any] = []
    iterator = stream.__aiter__()
    try:
        while len(collected) < limit:
            collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout))
    except (StopAsyncIteration, asyncio.TimeoutError):
        # Stream exhausted or quiet for too long: return what we have.
        pass
    finally:
        close = getattr(iterator, "aclose", None)
        if close is not None:
            await close()
    return collected


async def _collect_events(
    events: Any,
    *,
    max_events: int,
    timeout: float,
) -> list[pb.MxEvent]:
    """Collect up to ``max_events`` items from ``events`` (see ``_collect_bounded``)."""
    return await _collect_bounded(
        events,
        limit=max_events,
        timeout=timeout,
        param_hint="--max-events",
    )


async def _collect_alarm_messages(
    messages: Any,
    *,
    max_messages: int,
    timeout: float,
) -> list[pb.AlarmFeedMessage]:
    """Collect up to ``max_messages`` items from ``messages`` (see ``_collect_bounded``)."""
    return await _collect_bounded(
        messages,
        limit=max_messages,
        timeout=timeout,
        param_hint="--max-messages",
    )


def _parse_value(raw_value: str, value_type: str) -> MxValueInput:
    """Convert the CLI string ``raw_value`` according to ``value_type`` (case-insensitive).

    * ``bool`` — True for ``1``/``true``/``yes``/``on``; any other text is False.
    * ``int``/``int32``/``int64`` — ``int(raw_value)``.
    * ``float``/``double`` — ``float(raw_value)``.
    * ``time``/``timestamp`` — parsed with ``_parse_datetime``.
    * ``raw`` — the UTF-8 bytes of the text.
    * ``string`` — the text unchanged.

    Raises:
        click.BadParameter: ``value_type`` is none of the above.
        ValueError: the text is not valid for a numeric or time type.
    """
    normalized = value_type.lower()
    if normalized == "bool":
        return raw_value.lower() in ("1", "true", "yes", "on")
    if normalized in ("int", "int32", "int64"):
        return int(raw_value)
    if normalized in ("float", "double"):
        return float(raw_value)
    if normalized in ("time", "timestamp"):
        return _parse_datetime(raw_value)
    if normalized == "raw":
        return raw_value.encode("utf-8")
    if normalized == "string":
        return raw_value
    raise click.BadParameter(f"unsupported value type: {value_type}", param_hint="--type")


def _parse_datetime(raw_value: str) -> datetime:
    """Parse an ISO-8601 string into a timezone-aware datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` before calling
    ``datetime.fromisoformat``; a result without a timezone is tagged as UTC.
    """
    if raw_value.endswith("Z"):
        raw_value = raw_value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(raw_value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _parse_string_list(raw_value: str, *, param_hint: str = "--items") -> list[str]:
    """Split a comma-separated string into stripped, non-empty entries.

    Args:
        raw_value: The raw option text.
        param_hint: Option name to blame when the list turns out empty.

    Raises:
        click.BadParameter: no non-empty entry remains.
    """
    values = [item.strip() for item in raw_value.split(",") if item.strip()]
    if not values:
        raise click.BadParameter("at least one item is required", param_hint=param_hint)
    return values


def _parse_int_list(raw_value: str) -> list[int]:
    """Split a comma-separated string into ints, ignoring blank entries.

    Raises:
        click.BadParameter: no non-empty entry remains.
        ValueError: an entry is not a valid integer.
    """
    values = [item.strip() for item in raw_value.split(",") if item.strip()]
    if not values:
        raise click.BadParameter("at least one item handle is required", param_hint="--item-handles")
    return [int(item) for item in values]


def _message_dict(message: Any) -> dict[str, Any]:
    """Convert a protobuf message to a dict via ``MessageToDict``."""
    return MessageToDict(
        message,
        preserving_proto_field_name=False,
        use_integers_for_enums=False,
    )