diff --git a/clients/python/src/zb_mom_ww_mxgateway/session.py b/clients/python/src/zb_mom_ww_mxgateway/session.py index 75f647b..afde922 100644 --- a/clients/python/src/zb_mom_ww_mxgateway/session.py +++ b/clients/python/src/zb_mom_ww_mxgateway/session.py @@ -334,6 +334,138 @@ class Session: ) return list(reply.unsubscribe_bulk.results) + async def write_bulk( + self, + server_handle: int, + entries: Sequence[pb.WriteBulkEntry], + *, + correlation_id: str = "", + ) -> list[pb.BulkWriteResult]: + """Invoke MXAccess `WriteBulk` and return one BulkWriteResult per entry. + + Per-entry MXAccess failures appear as results with ``was_successful = False`` + and a populated ``error_message`` / ``hresult``; this method does not raise + on per-entry failure, mirroring the existing add/advise bulk surface. + """ + if entries is None: + raise TypeError("entries is required") + _ensure_bulk_size("entries", len(entries)) + reply = await self.invoke( + pb.MxCommand( + kind=pb.MX_COMMAND_KIND_WRITE_BULK, + write_bulk=pb.WriteBulkCommand( + server_handle=server_handle, + entries=entries, + ), + ), + correlation_id=correlation_id, + ) + return list(reply.write_bulk.results) + + async def write2_bulk( + self, + server_handle: int, + entries: Sequence[pb.Write2BulkEntry], + *, + correlation_id: str = "", + ) -> list[pb.BulkWriteResult]: + """Invoke MXAccess `Write2Bulk` (timestamped) and return per-entry results.""" + if entries is None: + raise TypeError("entries is required") + _ensure_bulk_size("entries", len(entries)) + reply = await self.invoke( + pb.MxCommand( + kind=pb.MX_COMMAND_KIND_WRITE2_BULK, + write2_bulk=pb.Write2BulkCommand( + server_handle=server_handle, + entries=entries, + ), + ), + correlation_id=correlation_id, + ) + return list(reply.write2_bulk.results) + + async def write_secured_bulk( + self, + server_handle: int, + entries: Sequence[pb.WriteSecuredBulkEntry], + *, + correlation_id: str = "", + ) -> list[pb.BulkWriteResult]: + """Invoke MXAccess `WriteSecuredBulk` — credential-sensitive values must not be logged.""" + if entries is None: + raise TypeError("entries is required") + _ensure_bulk_size("entries", len(entries)) + reply = await self.invoke( + pb.MxCommand( + kind=pb.MX_COMMAND_KIND_WRITE_SECURED_BULK, + write_secured_bulk=pb.WriteSecuredBulkCommand( + server_handle=server_handle, + entries=entries, + ), + ), + correlation_id=correlation_id, + ) + return list(reply.write_secured_bulk.results) + + async def write_secured2_bulk( + self, + server_handle: int, + entries: Sequence[pb.WriteSecured2BulkEntry], + *, + correlation_id: str = "", + ) -> list[pb.BulkWriteResult]: + """Invoke MXAccess `WriteSecured2Bulk` (timestamped + verified).""" + if entries is None: + raise TypeError("entries is required") + _ensure_bulk_size("entries", len(entries)) + reply = await self.invoke( + pb.MxCommand( + kind=pb.MX_COMMAND_KIND_WRITE_SECURED2_BULK, + write_secured2_bulk=pb.WriteSecured2BulkCommand( + server_handle=server_handle, + entries=entries, + ), + ), + correlation_id=correlation_id, + ) + return list(reply.write_secured2_bulk.results) + + async def read_bulk( + self, + server_handle: int, + tag_addresses: Sequence[str], + *, + timeout_ms: int = 0, + correlation_id: str = "", + ) -> list[pb.BulkReadResult]: + """Invoke `ReadBulk` — snapshot the current value of each requested tag. + + MXAccess COM has no synchronous read; the worker returns the cached + ``OnDataChange`` value for any tag that is already advised (``was_cached = + True``) without modifying the existing subscription, and falls back to + a full AddItem + Advise + wait + UnAdvise + RemoveItem snapshot lifecycle + otherwise. ``timeout_ms`` bounds the per-tag wait in the snapshot case; + pass ``0`` to use the worker default (1000 ms). + """ + if tag_addresses is None: + raise TypeError("tag_addresses is required") + _ensure_bulk_size("tag_addresses", len(tag_addresses)) + if timeout_ms < 0: + raise ValueError("timeout_ms must be non-negative") + reply = await self.invoke( + pb.MxCommand( + kind=pb.MX_COMMAND_KIND_READ_BULK, + read_bulk=pb.ReadBulkCommand( + server_handle=server_handle, + tag_addresses=tag_addresses, + timeout_ms=timeout_ms, + ), + ), + correlation_id=correlation_id, + ) + return list(reply.read_bulk.results) + async def write( self, server_handle: int, diff --git a/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py b/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py index f655987..fe05355 100644 --- a/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py +++ b/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py @@ -20,7 +20,7 @@ from zb_mom_ww_mxgateway.client import GatewayClient from zb_mom_ww_mxgateway.errors import MxGatewayError from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb from zb_mom_ww_mxgateway.options import ClientOptions -from zb_mom_ww_mxgateway.values import MxValueInput +from zb_mom_ww_mxgateway.values import MxValueInput, to_mx_value MAX_AGGREGATE_EVENTS = 10_000 @@ -263,6 +263,112 @@ def unsubscribe_bulk(**kwargs: Any) -> None: ) +@main.command("read-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--items", required=True, help="Comma-separated MXAccess tag addresses.") +@click.option("--timeout-ms", default=0, type=int, show_default=True, + help="Per-tag snapshot timeout in milliseconds. 0 = worker default.") +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def read_bulk(**kwargs: Any) -> None: + """Invoke MXAccess ReadBulk — cached value when advised, snapshot otherwise.""" + + _run(_read_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write_bulk(**kwargs: Any) -> None: + """Invoke MXAccess WriteBulk — sequential Write per entry.""" + + _run(_write_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write2-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.") +@click.option("--user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write2_bulk(**kwargs: Any) -> None: + """Invoke MXAccess Write2Bulk — timestamped sequential Write2 per entry.""" + + _run(_write2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write-secured-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--current-user-id", default=0, type=int, show_default=True) +@click.option("--verifier-user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write_secured_bulk(**kwargs: Any) -> None: + """Invoke MXAccess WriteSecuredBulk — credential-sensitive.""" + + _run(_write_secured_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write-secured2-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.") +@click.option("--current-user-id", default=0, type=int, show_default=True) +@click.option("--verifier-user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write_secured2_bulk(**kwargs: Any) -> None: + """Invoke MXAccess WriteSecured2Bulk — timestamped + credential-sensitive.""" + + _run(_write_secured2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("bench-read-bulk") +@gateway_options +@click.option("--client-name", default="mxgw-python-bench", show_default=True) +@click.option("--duration-seconds", default=30, type=int, show_default=True) +@click.option("--warmup-seconds", default=3, type=int, show_default=True) +@click.option("--bulk-size", default=6, type=int, show_default=True) +@click.option("--tag-start", default=1, type=int, show_default=True) +@click.option("--tag-prefix", default="TestMachine_", show_default=True) +@click.option("--tag-attribute", default="TestChangingInt", show_default=True) +@click.option("--timeout-ms", default=1500, type=int, show_default=True) +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def bench_read_bulk(**kwargs: Any) -> None: + """Cross-language ReadBulk stress benchmark. + + Opens its own session, subscribes to bulk-size tags so the worker value + cache populates from real OnDataChange events, runs ReadBulk in a tight + loop for duration-seconds, and emits the shared JSON stats schema the + scripts/bench-read-bulk.ps1 driver collates across all five clients. + """ + + _run(_bench_read_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + @main.command("stream-events") @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @@ -417,6 +523,233 @@ async def _unsubscribe_bulk(**kwargs: Any) -> dict[str, Any]: return {"results": [_message_dict(result) for result in results]} +async def _read_bulk(**kwargs: Any) -> dict[str, Any]: + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.read_bulk( + kwargs["server_handle"], + _parse_string_list(kwargs["items"]), + timeout_ms=kwargs["timeout_ms"], + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +def _build_write_bulk_entries(kwargs: dict[str, Any]): + """Build (item_handle, MxValue) pairs for the bulk-write families. + + The CLI accepts a single ``--type`` plus ``--values`` (comma-separated + string-encoded values, one per ``--item-handles`` entry). Returns the + parsed item-handle list and the per-entry MxValue protobuf instances — + callers wrap these into the appropriate per-entry message type. + """ + + handles = _parse_int_list(kwargs["item_handles"]) + value_texts = _parse_string_list(kwargs["values"]) + if len(handles) != len(value_texts): + raise click.UsageError( + f"item-handles count ({len(handles)}) does not match values count ({len(value_texts)})", + ) + parsed = [_parse_value(text, kwargs["value_type"]) for text in value_texts] + values = [to_mx_value(v) for v in parsed] + return handles, values + + +async def _write_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + entries = [ + pb.WriteBulkEntry(item_handle=handle, user_id=kwargs["user_id"], value=value) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +async def _write2_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"])) + entries = [ + pb.Write2BulkEntry( + item_handle=handle, + user_id=kwargs["user_id"], + value=value, + timestamp_value=timestamp_value, + ) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write2_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +async def _write_secured_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + entries = [ + pb.WriteSecuredBulkEntry( + item_handle=handle, + current_user_id=kwargs["current_user_id"], + verifier_user_id=kwargs["verifier_user_id"], + value=value, + ) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write_secured_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +async def _write_secured2_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"])) + entries = [ + pb.WriteSecured2BulkEntry( + item_handle=handle, + current_user_id=kwargs["current_user_id"], + verifier_user_id=kwargs["verifier_user_id"], + value=value, + timestamp_value=timestamp_value, + ) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write_secured2_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +async def _bench_read_bulk(**kwargs: Any) -> dict[str, Any]: + """ReadBulk stress benchmark — matches the .NET / Go / Rust / Java schema.""" + import time + + bulk_size = int(kwargs["bulk_size"]) + if bulk_size < 1: + raise click.UsageError("bulk-size must be positive") + duration_seconds = int(kwargs["duration_seconds"]) + warmup_seconds = int(kwargs["warmup_seconds"]) + tag_start = int(kwargs["tag_start"]) + tag_prefix = kwargs["tag_prefix"] + tag_attribute = kwargs["tag_attribute"] + timeout_ms = int(kwargs["timeout_ms"]) + client_name = kwargs["client_name"] + tags = [f"{tag_prefix}{i:03d}.{tag_attribute}" for i in range(tag_start, tag_start + bulk_size)] + + async with await _connect(kwargs) as client: + session = await client.open_session(client_session_name=client_name) + server_handle = 0 + item_handles: list[int] = [] + try: + server_handle = await session.register(client_name) + subscribe_results = await session.subscribe_bulk(server_handle, tags) + item_handles = [r.item_handle for r in subscribe_results if r.was_successful] + + # Warm-up window so JIT / connection pool / first-call costs are + # amortised before the measurement window opens. + warmup_deadline = time.perf_counter() + warmup_seconds + while time.perf_counter() < warmup_deadline: + await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms) + + latencies_ms: list[float] = [] + total_results = 0 + cached_results = 0 + successful = 0 + failed = 0 + steady_start = time.perf_counter() + steady_deadline = steady_start + duration_seconds + while time.perf_counter() < steady_deadline: + call_start = time.perf_counter() + try: + results = await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms) + except Exception: + failed += 1 + latencies_ms.append((time.perf_counter() - call_start) * 1000.0) + continue + latencies_ms.append((time.perf_counter() - call_start) * 1000.0) + successful += 1 + for r in results: + total_results += 1 + if r.was_cached: + cached_results += 1 + steady_elapsed = time.perf_counter() - steady_start + total_calls = successful + failed + calls_per_second = total_calls / steady_elapsed if steady_elapsed > 0 else 0.0 + finally: + if item_handles: + try: + await session.unsubscribe_bulk(server_handle, item_handles) + except Exception: + pass + try: + await session.close() + except Exception: + pass + + return { + "language": "python", + "command": "bench-read-bulk", + "endpoint": kwargs.get("endpoint"), + "clientName": client_name, + "bulkSize": bulk_size, + "durationSeconds": duration_seconds, + "warmupSeconds": warmup_seconds, + "durationMs": int(steady_elapsed * 1000), + "tags": tags, + "totalCalls": total_calls, + "successfulCalls": successful, + "failedCalls": failed, + "totalReadResults": total_results, + "cachedReadResults": cached_results, + "callsPerSecond": round(calls_per_second, 2), + "latencyMs": _percentile_summary(latencies_ms), + } + + +def _percentile_summary(sample: list[float]) -> dict[str, float]: + if not sample: + return {"p50": 0.0, "p95": 0.0, "p99": 0.0, "max": 0.0, "mean": 0.0} + sorted_sample = sorted(sample) + return { + "p50": round(_percentile(sorted_sample, 0.50), 3), + "p95": round(_percentile(sorted_sample, 0.95), 3), + "p99": round(_percentile(sorted_sample, 0.99), 3), + "max": round(sorted_sample[-1], 3), + "mean": round(sum(sample) / len(sample), 3), + } + + +def _percentile(sorted_sample: list[float], quantile: float) -> float: + """Nearest-rank with linear interpolation; matches every other client.""" + n = len(sorted_sample) + if n == 0: + return 0.0 + if n == 1: + return sorted_sample[0] + rank = quantile * (n - 1) + lower = int(rank) + upper = min(lower + 1, n - 1) + fraction = rank - lower + return sorted_sample[lower] + (sorted_sample[upper] - sorted_sample[lower]) * fraction + + async def _stream_events(**kwargs: Any) -> dict[str, Any]: async with await _connect(kwargs) as client: session = _session(client, kwargs["session_id"])