From 6add4b4acc0acbb4a479e006bc96e854b9c9d784 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 24 May 2026 04:50:10 -0400 Subject: [PATCH] Python client: port bulk read/write SDK methods + CLI subcommands MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the .NET / Go ports of divergent branch commit f220908. HEAD's Session class had only the subscribe-style bulks; this commit adds the value-bulk SDK surface plus matching CLI subcommands and a bench-read-bulk harness. SDK (zb_mom_ww_mxgateway/session.py): - async def write_bulk(server_handle, entries, *, correlation_id="") → list[pb.BulkWriteResult] - async def write2_bulk(server_handle, entries, *, correlation_id="") → list[pb.BulkWriteResult] - async def write_secured_bulk(server_handle, entries, *, correlation_id="") → list[pb.BulkWriteResult] - async def write_secured2_bulk(server_handle, entries, *, correlation_id="") → list[pb.BulkWriteResult] - async def read_bulk(server_handle, tag_addresses, *, timeout_ms=0, correlation_id="") → list[pb.BulkReadResult] All five reuse the existing _ensure_bulk_size validator and route through the existing invoke() pipeline. read_bulk additionally enforces timeout_ms >= 0. CLI (zb_mom_ww_mxgateway_cli/commands.py): - read-bulk / write-bulk / write2-bulk / write-secured-bulk / write-secured2-bulk registered as click @main.command(...). The write families share a _build_write_bulk_entries() helper that parses --item-handles and --values with a single --type, validates count match, converts via to_mx_value, and assembles the correct per-entry proto message. - bench-read-bulk: opens its own session, subscribes to --bulk-size TestMachine_NNN.TestChangingInt tags, runs warmup then steady-state ReadBulk for --duration-seconds with time.perf_counter() latency capture, and emits the shared JSON schema (language, durationMs, totalCalls, successfulCalls, failedCalls, totalReadResults, cachedReadResults, callsPerSecond, latencyMs:{p50,p95,p99,max,mean}) so scripts/bench-read-bulk.ps1 collates Python alongside the four other clients. _percentile_summary + linear-interpolation _percentile helper match the Go / .NET implementations. to_mx_value is added to the existing values-module import line in commands.py since the bulk-write commands need it. Verification: python -m pip install -e . --quiet --no-deps; pytest 42/42 passing. Manual smoke against live gateway on localhost:5120: open-session → register → subscribe-bulk on two TestMachine_NNN.TestChangingInt tags (both wasSuccessful=true) → read-bulk (both wasSuccessful=true / wasCached=true / int32 values present) → close-session SESSION_STATE_CLOSED. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../python/src/zb_mom_ww_mxgateway/session.py | 132 +++++++ .../src/zb_mom_ww_mxgateway_cli/commands.py | 335 +++++++++++++++++- 2 files changed, 466 insertions(+), 1 deletion(-) diff --git a/clients/python/src/zb_mom_ww_mxgateway/session.py b/clients/python/src/zb_mom_ww_mxgateway/session.py index 75f647b..afde922 100644 --- a/clients/python/src/zb_mom_ww_mxgateway/session.py +++ b/clients/python/src/zb_mom_ww_mxgateway/session.py @@ -334,6 +334,138 @@ class Session: ) return list(reply.unsubscribe_bulk.results) + async def write_bulk( + self, + server_handle: int, + entries: Sequence[pb.WriteBulkEntry], + *, + correlation_id: str = "", + ) -> list[pb.BulkWriteResult]: + """Invoke MXAccess `WriteBulk` and return one BulkWriteResult per entry. + + Per-entry MXAccess failures appear as results with ``was_successful = False`` + and a populated ``error_message`` / ``hresult``; this method does not raise + on per-entry failure, mirroring the existing add/advise bulk surface. + """ + if entries is None: + raise TypeError("entries is required") + _ensure_bulk_size("entries", len(entries)) + reply = await self.invoke( + pb.MxCommand( + kind=pb.MX_COMMAND_KIND_WRITE_BULK, + write_bulk=pb.WriteBulkCommand( + server_handle=server_handle, + entries=entries, + ), + ), + correlation_id=correlation_id, + ) + return list(reply.write_bulk.results) + + async def write2_bulk( + self, + server_handle: int, + entries: Sequence[pb.Write2BulkEntry], + *, + correlation_id: str = "", + ) -> list[pb.BulkWriteResult]: + """Invoke MXAccess `Write2Bulk` (timestamped) and return per-entry results.""" + if entries is None: + raise TypeError("entries is required") + _ensure_bulk_size("entries", len(entries)) + reply = await self.invoke( + pb.MxCommand( + kind=pb.MX_COMMAND_KIND_WRITE2_BULK, + write2_bulk=pb.Write2BulkCommand( + server_handle=server_handle, + entries=entries, + ), + ), + correlation_id=correlation_id, + ) + return list(reply.write2_bulk.results) + + async def write_secured_bulk( + self, + server_handle: int, + entries: Sequence[pb.WriteSecuredBulkEntry], + *, + correlation_id: str = "", + ) -> list[pb.BulkWriteResult]: + """Invoke MXAccess `WriteSecuredBulk` — credential-sensitive values must not be logged.""" + if entries is None: + raise TypeError("entries is required") + _ensure_bulk_size("entries", len(entries)) + reply = await self.invoke( + pb.MxCommand( + kind=pb.MX_COMMAND_KIND_WRITE_SECURED_BULK, + write_secured_bulk=pb.WriteSecuredBulkCommand( + server_handle=server_handle, + entries=entries, + ), + ), + correlation_id=correlation_id, + ) + return list(reply.write_secured_bulk.results) + + async def write_secured2_bulk( + self, + server_handle: int, + entries: Sequence[pb.WriteSecured2BulkEntry], + *, + correlation_id: str = "", + ) -> list[pb.BulkWriteResult]: + """Invoke MXAccess `WriteSecured2Bulk` (timestamped + verified).""" + if entries is None: + raise TypeError("entries is required") + _ensure_bulk_size("entries", len(entries)) + reply = await self.invoke( + pb.MxCommand( + kind=pb.MX_COMMAND_KIND_WRITE_SECURED2_BULK, + write_secured2_bulk=pb.WriteSecured2BulkCommand( + server_handle=server_handle, + entries=entries, + ), + ), + correlation_id=correlation_id, + ) + return list(reply.write_secured2_bulk.results) + + async def read_bulk( + self, + server_handle: int, + tag_addresses: Sequence[str], + *, + timeout_ms: int = 0, + correlation_id: str = "", + ) -> list[pb.BulkReadResult]: + """Invoke `ReadBulk` — snapshot the current value of each requested tag. + + MXAccess COM has no synchronous read; the worker returns the cached + ``OnDataChange`` value for any tag that is already advised (``was_cached = + True``) without modifying the existing subscription, and falls back to + a full AddItem + Advise + wait + UnAdvise + RemoveItem snapshot lifecycle + otherwise. ``timeout_ms`` bounds the per-tag wait in the snapshot case; + pass ``0`` to use the worker default (1000 ms). + """ + if tag_addresses is None: + raise TypeError("tag_addresses is required") + _ensure_bulk_size("tag_addresses", len(tag_addresses)) + if timeout_ms < 0: + raise ValueError("timeout_ms must be non-negative") + reply = await self.invoke( + pb.MxCommand( + kind=pb.MX_COMMAND_KIND_READ_BULK, + read_bulk=pb.ReadBulkCommand( + server_handle=server_handle, + tag_addresses=tag_addresses, + timeout_ms=timeout_ms, + ), + ), + correlation_id=correlation_id, + ) + return list(reply.read_bulk.results) + async def write( self, server_handle: int, diff --git a/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py b/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py index f655987..fe05355 100644 --- a/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py +++ b/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py @@ -20,7 +20,7 @@ from zb_mom_ww_mxgateway.client import GatewayClient from zb_mom_ww_mxgateway.errors import MxGatewayError from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb from zb_mom_ww_mxgateway.options import ClientOptions -from zb_mom_ww_mxgateway.values import MxValueInput +from zb_mom_ww_mxgateway.values import MxValueInput, to_mx_value MAX_AGGREGATE_EVENTS = 10_000 @@ -263,6 +263,112 @@ def unsubscribe_bulk(**kwargs: Any) -> None: ) +@main.command("read-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--items", required=True, help="Comma-separated MXAccess tag addresses.") +@click.option("--timeout-ms", default=0, type=int, show_default=True, + help="Per-tag snapshot timeout in milliseconds. 0 = worker default.") +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def read_bulk(**kwargs: Any) -> None: + """Invoke MXAccess ReadBulk — cached value when advised, snapshot otherwise.""" + + _run(_read_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write_bulk(**kwargs: Any) -> None: + """Invoke MXAccess WriteBulk — sequential Write per entry.""" + + _run(_write_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write2-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.") +@click.option("--user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write2_bulk(**kwargs: Any) -> None: + """Invoke MXAccess Write2Bulk — timestamped sequential Write2 per entry.""" + + _run(_write2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write-secured-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--current-user-id", default=0, type=int, show_default=True) +@click.option("--verifier-user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write_secured_bulk(**kwargs: Any) -> None: + """Invoke MXAccess WriteSecuredBulk — credential-sensitive.""" + + _run(_write_secured_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write-secured2-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.") +@click.option("--current-user-id", default=0, type=int, show_default=True) +@click.option("--verifier-user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write_secured2_bulk(**kwargs: Any) -> None: + """Invoke MXAccess WriteSecured2Bulk — timestamped + credential-sensitive.""" + + _run(_write_secured2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("bench-read-bulk") +@gateway_options +@click.option("--client-name", default="mxgw-python-bench", show_default=True) +@click.option("--duration-seconds", default=30, type=int, show_default=True) +@click.option("--warmup-seconds", default=3, type=int, show_default=True) +@click.option("--bulk-size", default=6, type=int, show_default=True) +@click.option("--tag-start", default=1, type=int, show_default=True) +@click.option("--tag-prefix", default="TestMachine_", show_default=True) +@click.option("--tag-attribute", default="TestChangingInt", show_default=True) +@click.option("--timeout-ms", default=1500, type=int, show_default=True) +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def bench_read_bulk(**kwargs: Any) -> None: + """Cross-language ReadBulk stress benchmark. + + Opens its own session, subscribes to bulk-size tags so the worker value + cache populates from real OnDataChange events, runs ReadBulk in a tight + loop for duration-seconds, and emits the shared JSON stats schema the + scripts/bench-read-bulk.ps1 driver collates across all five clients. + """ + + _run(_bench_read_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + @main.command("stream-events") @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @@ -417,6 +523,233 @@ async def _unsubscribe_bulk(**kwargs: Any) -> dict[str, Any]: return {"results": [_message_dict(result) for result in results]} +async def _read_bulk(**kwargs: Any) -> dict[str, Any]: + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.read_bulk( + kwargs["server_handle"], + _parse_string_list(kwargs["items"]), + timeout_ms=kwargs["timeout_ms"], + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +def _build_write_bulk_entries(kwargs: dict[str, Any]): + """Build (item_handle, MxValue) pairs for the bulk-write families. + + The CLI accepts a single ``--type`` plus ``--values`` (comma-separated + string-encoded values, one per ``--item-handles`` entry). Returns the + parsed item-handle list and the per-entry MxValue protobuf instances — + callers wrap these into the appropriate per-entry message type. + """ + + handles = _parse_int_list(kwargs["item_handles"]) + value_texts = _parse_string_list(kwargs["values"]) + if len(handles) != len(value_texts): + raise click.UsageError( + f"item-handles count ({len(handles)}) does not match values count ({len(value_texts)})", + ) + parsed = [_parse_value(text, kwargs["value_type"]) for text in value_texts] + values = [to_mx_value(v) for v in parsed] + return handles, values + + +async def _write_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + entries = [ + pb.WriteBulkEntry(item_handle=handle, user_id=kwargs["user_id"], value=value) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +async def _write2_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"])) + entries = [ + pb.Write2BulkEntry( + item_handle=handle, + user_id=kwargs["user_id"], + value=value, + timestamp_value=timestamp_value, + ) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write2_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +async def _write_secured_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + entries = [ + pb.WriteSecuredBulkEntry( + item_handle=handle, + current_user_id=kwargs["current_user_id"], + verifier_user_id=kwargs["verifier_user_id"], + value=value, + ) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write_secured_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +async def _write_secured2_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"])) + entries = [ + pb.WriteSecured2BulkEntry( + item_handle=handle, + current_user_id=kwargs["current_user_id"], + verifier_user_id=kwargs["verifier_user_id"], + value=value, + timestamp_value=timestamp_value, + ) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write_secured2_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +async def _bench_read_bulk(**kwargs: Any) -> dict[str, Any]: + """ReadBulk stress benchmark — matches the .NET / Go / Rust / Java schema.""" + import time + + bulk_size = int(kwargs["bulk_size"]) + if bulk_size < 1: + raise click.UsageError("bulk-size must be positive") + duration_seconds = int(kwargs["duration_seconds"]) + warmup_seconds = int(kwargs["warmup_seconds"]) + tag_start = int(kwargs["tag_start"]) + tag_prefix = kwargs["tag_prefix"] + tag_attribute = kwargs["tag_attribute"] + timeout_ms = int(kwargs["timeout_ms"]) + client_name = kwargs["client_name"] + tags = [f"{tag_prefix}{i:03d}.{tag_attribute}" for i in range(tag_start, tag_start + bulk_size)] + + async with await _connect(kwargs) as client: + session = await client.open_session(client_session_name=client_name) + server_handle = 0 + item_handles: list[int] = [] + try: + server_handle = await session.register(client_name) + subscribe_results = await session.subscribe_bulk(server_handle, tags) + item_handles = [r.item_handle for r in subscribe_results if r.was_successful] + + # Warm-up window so JIT / connection pool / first-call costs are + # amortised before the measurement window opens. + warmup_deadline = time.perf_counter() + warmup_seconds + while time.perf_counter() < warmup_deadline: + await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms) + + latencies_ms: list[float] = [] + total_results = 0 + cached_results = 0 + successful = 0 + failed = 0 + steady_start = time.perf_counter() + steady_deadline = steady_start + duration_seconds + while time.perf_counter() < steady_deadline: + call_start = time.perf_counter() + try: + results = await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms) + except Exception: + failed += 1 + latencies_ms.append((time.perf_counter() - call_start) * 1000.0) + continue + latencies_ms.append((time.perf_counter() - call_start) * 1000.0) + successful += 1 + for r in results: + total_results += 1 + if r.was_cached: + cached_results += 1 + steady_elapsed = time.perf_counter() - steady_start + total_calls = successful + failed + calls_per_second = total_calls / steady_elapsed if steady_elapsed > 0 else 0.0 + finally: + if item_handles: + try: + await session.unsubscribe_bulk(server_handle, item_handles) + except Exception: + pass + try: + await session.close() + except Exception: + pass + + return { + "language": "python", + "command": "bench-read-bulk", + "endpoint": kwargs.get("endpoint"), + "clientName": client_name, + "bulkSize": bulk_size, + "durationSeconds": duration_seconds, + "warmupSeconds": warmup_seconds, + "durationMs": int(steady_elapsed * 1000), + "tags": tags, + "totalCalls": total_calls, + "successfulCalls": successful, + "failedCalls": failed, + "totalReadResults": total_results, + "cachedReadResults": cached_results, + "callsPerSecond": round(calls_per_second, 2), + "latencyMs": _percentile_summary(latencies_ms), + } + + +def _percentile_summary(sample: list[float]) -> dict[str, float]: + if not sample: + return {"p50": 0.0, "p95": 0.0, "p99": 0.0, "max": 0.0, "mean": 0.0} + sorted_sample = sorted(sample) + return { + "p50": round(_percentile(sorted_sample, 0.50), 3), + "p95": round(_percentile(sorted_sample, 0.95), 3), + "p99": round(_percentile(sorted_sample, 0.99), 3), + "max": round(sorted_sample[-1], 3), + "mean": round(sum(sample) / len(sample), 3), + } + + +def _percentile(sorted_sample: list[float], quantile: float) -> float: + """Nearest-rank with linear interpolation; matches every other client.""" + n = len(sorted_sample) + if n == 0: + return 0.0 + if n == 1: + return sorted_sample[0] + rank = quantile * (n - 1) + lower = int(rank) + upper = min(lower + 1, n - 1) + fraction = rank - lower + return sorted_sample[lower] + (sorted_sample[upper] - sorted_sample[lower]) * fraction + + async def _stream_events(**kwargs: Any) -> dict[str, Any]: async with await _connect(kwargs) as client: session = _session(client, kwargs["session_id"])