Python client: port bulk read/write SDK methods + CLI subcommands

Mirrors the .NET / Go ports of divergent branch commit f220908. HEAD's
Session class had only the subscribe-style bulks; this commit adds the
value-bulk SDK surface plus matching CLI subcommands and a
bench-read-bulk harness.

SDK (zb_mom_ww_mxgateway/session.py):
- async def write_bulk(server_handle, entries, *, correlation_id="")
  → list[pb.BulkWriteResult]
- async def write2_bulk(server_handle, entries, *, correlation_id="")
  → list[pb.BulkWriteResult]
- async def write_secured_bulk(server_handle, entries, *, correlation_id="")
  → list[pb.BulkWriteResult]
- async def write_secured2_bulk(server_handle, entries, *, correlation_id="")
  → list[pb.BulkWriteResult]
- async def read_bulk(server_handle, tag_addresses, *, timeout_ms=0,
  correlation_id="") → list[pb.BulkReadResult]

All five reuse the existing _ensure_bulk_size validator and route
through the existing invoke() pipeline. read_bulk additionally enforces
timeout_ms >= 0.

CLI (zb_mom_ww_mxgateway_cli/commands.py):
- read-bulk / write-bulk / write2-bulk / write-secured-bulk /
  write-secured2-bulk registered as click @main.command(...). The
  write families share a _build_write_bulk_entries() helper that parses
  --item-handles and --values with a single --type, validates count
  match, converts via to_mx_value, and assembles the correct per-entry
  proto message.
- bench-read-bulk: opens its own session, subscribes to --bulk-size
  TestMachine_NNN.TestChangingInt tags, runs warmup then steady-state
  ReadBulk for --duration-seconds with time.perf_counter() latency
  capture, and emits the shared JSON schema (language, durationMs,
  totalCalls, successfulCalls, failedCalls, totalReadResults,
  cachedReadResults, callsPerSecond, latencyMs:{p50,p95,p99,max,mean})
  so scripts/bench-read-bulk.ps1 collates Python alongside the four
  other clients. _percentile_summary + linear-interpolation
  _percentile helper match the Go / .NET implementations.

to_mx_value is added to the existing values-module import line in
commands.py since the bulk-write commands need it.

Verification: python -m pip install -e . --quiet --no-deps; pytest
42/42 passing. Manual smoke against live gateway on localhost:5120:
open-session → register → subscribe-bulk on two
TestMachine_NNN.TestChangingInt tags (both wasSuccessful=true) →
read-bulk (both wasSuccessful=true / wasCached=true / int32 values
present) → close-session SESSION_STATE_CLOSED.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-24 04:50:10 -04:00
parent 325106920f
commit 6add4b4acc
2 changed files with 466 additions and 1 deletions
@@ -334,6 +334,138 @@ class Session:
)
return list(reply.unsubscribe_bulk.results)
async def write_bulk(
self,
server_handle: int,
entries: Sequence[pb.WriteBulkEntry],
*,
correlation_id: str = "",
) -> list[pb.BulkWriteResult]:
"""Invoke MXAccess `WriteBulk` and return one BulkWriteResult per entry.
Per-entry MXAccess failures appear as results with ``was_successful = False``
and a populated ``error_message`` / ``hresult``; this method does not raise
on per-entry failure, mirroring the existing add/advise bulk surface.
"""
if entries is None:
raise TypeError("entries is required")
_ensure_bulk_size("entries", len(entries))
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE_BULK,
write_bulk=pb.WriteBulkCommand(
server_handle=server_handle,
entries=entries,
),
),
correlation_id=correlation_id,
)
return list(reply.write_bulk.results)
async def write2_bulk(
self,
server_handle: int,
entries: Sequence[pb.Write2BulkEntry],
*,
correlation_id: str = "",
) -> list[pb.BulkWriteResult]:
"""Invoke MXAccess `Write2Bulk` (timestamped) and return per-entry results."""
if entries is None:
raise TypeError("entries is required")
_ensure_bulk_size("entries", len(entries))
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE2_BULK,
write2_bulk=pb.Write2BulkCommand(
server_handle=server_handle,
entries=entries,
),
),
correlation_id=correlation_id,
)
return list(reply.write2_bulk.results)
async def write_secured_bulk(
self,
server_handle: int,
entries: Sequence[pb.WriteSecuredBulkEntry],
*,
correlation_id: str = "",
) -> list[pb.BulkWriteResult]:
"""Invoke MXAccess `WriteSecuredBulk` — credential-sensitive values must not be logged."""
if entries is None:
raise TypeError("entries is required")
_ensure_bulk_size("entries", len(entries))
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE_SECURED_BULK,
write_secured_bulk=pb.WriteSecuredBulkCommand(
server_handle=server_handle,
entries=entries,
),
),
correlation_id=correlation_id,
)
return list(reply.write_secured_bulk.results)
async def write_secured2_bulk(
self,
server_handle: int,
entries: Sequence[pb.WriteSecured2BulkEntry],
*,
correlation_id: str = "",
) -> list[pb.BulkWriteResult]:
"""Invoke MXAccess `WriteSecured2Bulk` (timestamped + verified)."""
if entries is None:
raise TypeError("entries is required")
_ensure_bulk_size("entries", len(entries))
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE_SECURED2_BULK,
write_secured2_bulk=pb.WriteSecured2BulkCommand(
server_handle=server_handle,
entries=entries,
),
),
correlation_id=correlation_id,
)
return list(reply.write_secured2_bulk.results)
async def read_bulk(
self,
server_handle: int,
tag_addresses: Sequence[str],
*,
timeout_ms: int = 0,
correlation_id: str = "",
) -> list[pb.BulkReadResult]:
"""Invoke `ReadBulk` — snapshot the current value of each requested tag.
MXAccess COM has no synchronous read; the worker returns the cached
``OnDataChange`` value for any tag that is already advised (``was_cached =
True``) without modifying the existing subscription, and falls back to
a full AddItem + Advise + wait + UnAdvise + RemoveItem snapshot lifecycle
otherwise. ``timeout_ms`` bounds the per-tag wait in the snapshot case;
pass ``0`` to use the worker default (1000 ms).
"""
if tag_addresses is None:
raise TypeError("tag_addresses is required")
_ensure_bulk_size("tag_addresses", len(tag_addresses))
if timeout_ms < 0:
raise ValueError("timeout_ms must be non-negative")
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_READ_BULK,
read_bulk=pb.ReadBulkCommand(
server_handle=server_handle,
tag_addresses=tag_addresses,
timeout_ms=timeout_ms,
),
),
correlation_id=correlation_id,
)
return list(reply.read_bulk.results)
async def write(
self,
server_handle: int,
@@ -20,7 +20,7 @@ from zb_mom_ww_mxgateway.client import GatewayClient
from zb_mom_ww_mxgateway.errors import MxGatewayError
from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb
from zb_mom_ww_mxgateway.options import ClientOptions
from zb_mom_ww_mxgateway.values import MxValueInput
from zb_mom_ww_mxgateway.values import MxValueInput, to_mx_value
MAX_AGGREGATE_EVENTS = 10_000
@@ -263,6 +263,112 @@ def unsubscribe_bulk(**kwargs: Any) -> None:
)
@main.command("read-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--items", required=True, help="Comma-separated MXAccess tag addresses.")
@click.option("--timeout-ms", default=0, type=int, show_default=True,
help="Per-tag snapshot timeout in milliseconds. 0 = worker default.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def read_bulk(**kwargs: Any) -> None:
"""Invoke MXAccess ReadBulk — cached value when advised, snapshot otherwise."""
_run(_read_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("write-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write_bulk(**kwargs: Any) -> None:
"""Invoke MXAccess WriteBulk — sequential Write per entry."""
_run(_write_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("write2-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write2_bulk(**kwargs: Any) -> None:
"""Invoke MXAccess Write2Bulk — timestamped sequential Write2 per entry."""
_run(_write2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("write-secured-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--current-user-id", default=0, type=int, show_default=True)
@click.option("--verifier-user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write_secured_bulk(**kwargs: Any) -> None:
"""Invoke MXAccess WriteSecuredBulk — credential-sensitive."""
_run(_write_secured_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("write-secured2-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.")
@click.option("--current-user-id", default=0, type=int, show_default=True)
@click.option("--verifier-user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write_secured2_bulk(**kwargs: Any) -> None:
"""Invoke MXAccess WriteSecured2Bulk — timestamped + credential-sensitive."""
_run(_write_secured2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("bench-read-bulk")
@gateway_options
@click.option("--client-name", default="mxgw-python-bench", show_default=True)
@click.option("--duration-seconds", default=30, type=int, show_default=True)
@click.option("--warmup-seconds", default=3, type=int, show_default=True)
@click.option("--bulk-size", default=6, type=int, show_default=True)
@click.option("--tag-start", default=1, type=int, show_default=True)
@click.option("--tag-prefix", default="TestMachine_", show_default=True)
@click.option("--tag-attribute", default="TestChangingInt", show_default=True)
@click.option("--timeout-ms", default=1500, type=int, show_default=True)
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def bench_read_bulk(**kwargs: Any) -> None:
"""Cross-language ReadBulk stress benchmark.
Opens its own session, subscribes to bulk-size tags so the worker value
cache populates from real OnDataChange events, runs ReadBulk in a tight
loop for duration-seconds, and emits the shared JSON stats schema the
scripts/bench-read-bulk.ps1 driver collates across all five clients.
"""
_run(_bench_read_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("stream-events")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@@ -417,6 +523,233 @@ async def _unsubscribe_bulk(**kwargs: Any) -> dict[str, Any]:
return {"results": [_message_dict(result) for result in results]}
async def _read_bulk(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
results = await session.read_bulk(
kwargs["server_handle"],
_parse_string_list(kwargs["items"]),
timeout_ms=kwargs["timeout_ms"],
correlation_id=kwargs["correlation_id"],
)
return {"results": [_message_dict(result) for result in results]}
def _build_write_bulk_entries(kwargs: dict[str, Any]):
"""Build (item_handle, MxValue) pairs for the bulk-write families.
The CLI accepts a single ``--type`` plus ``--values`` (comma-separated
string-encoded values, one per ``--item-handles`` entry). Returns the
parsed item-handle list and the per-entry MxValue protobuf instances —
callers wrap these into the appropriate per-entry message type.
"""
handles = _parse_int_list(kwargs["item_handles"])
value_texts = _parse_string_list(kwargs["values"])
if len(handles) != len(value_texts):
raise click.UsageError(
f"item-handles count ({len(handles)}) does not match values count ({len(value_texts)})",
)
parsed = [_parse_value(text, kwargs["value_type"]) for text in value_texts]
values = [to_mx_value(v) for v in parsed]
return handles, values
async def _write_bulk(**kwargs: Any) -> dict[str, Any]:
handles, values = _build_write_bulk_entries(kwargs)
entries = [
pb.WriteBulkEntry(item_handle=handle, user_id=kwargs["user_id"], value=value)
for handle, value in zip(handles, values)
]
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
results = await session.write_bulk(
kwargs["server_handle"],
entries,
correlation_id=kwargs["correlation_id"],
)
return {"results": [_message_dict(result) for result in results]}
async def _write2_bulk(**kwargs: Any) -> dict[str, Any]:
handles, values = _build_write_bulk_entries(kwargs)
timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"]))
entries = [
pb.Write2BulkEntry(
item_handle=handle,
user_id=kwargs["user_id"],
value=value,
timestamp_value=timestamp_value,
)
for handle, value in zip(handles, values)
]
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
results = await session.write2_bulk(
kwargs["server_handle"],
entries,
correlation_id=kwargs["correlation_id"],
)
return {"results": [_message_dict(result) for result in results]}
async def _write_secured_bulk(**kwargs: Any) -> dict[str, Any]:
handles, values = _build_write_bulk_entries(kwargs)
entries = [
pb.WriteSecuredBulkEntry(
item_handle=handle,
current_user_id=kwargs["current_user_id"],
verifier_user_id=kwargs["verifier_user_id"],
value=value,
)
for handle, value in zip(handles, values)
]
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
results = await session.write_secured_bulk(
kwargs["server_handle"],
entries,
correlation_id=kwargs["correlation_id"],
)
return {"results": [_message_dict(result) for result in results]}
async def _write_secured2_bulk(**kwargs: Any) -> dict[str, Any]:
handles, values = _build_write_bulk_entries(kwargs)
timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"]))
entries = [
pb.WriteSecured2BulkEntry(
item_handle=handle,
current_user_id=kwargs["current_user_id"],
verifier_user_id=kwargs["verifier_user_id"],
value=value,
timestamp_value=timestamp_value,
)
for handle, value in zip(handles, values)
]
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
results = await session.write_secured2_bulk(
kwargs["server_handle"],
entries,
correlation_id=kwargs["correlation_id"],
)
return {"results": [_message_dict(result) for result in results]}
async def _bench_read_bulk(**kwargs: Any) -> dict[str, Any]:
"""ReadBulk stress benchmark — matches the .NET / Go / Rust / Java schema."""
import time
bulk_size = int(kwargs["bulk_size"])
if bulk_size < 1:
raise click.UsageError("bulk-size must be positive")
duration_seconds = int(kwargs["duration_seconds"])
warmup_seconds = int(kwargs["warmup_seconds"])
tag_start = int(kwargs["tag_start"])
tag_prefix = kwargs["tag_prefix"]
tag_attribute = kwargs["tag_attribute"]
timeout_ms = int(kwargs["timeout_ms"])
client_name = kwargs["client_name"]
tags = [f"{tag_prefix}{i:03d}.{tag_attribute}" for i in range(tag_start, tag_start + bulk_size)]
async with await _connect(kwargs) as client:
session = await client.open_session(client_session_name=client_name)
server_handle = 0
item_handles: list[int] = []
try:
server_handle = await session.register(client_name)
subscribe_results = await session.subscribe_bulk(server_handle, tags)
item_handles = [r.item_handle for r in subscribe_results if r.was_successful]
# Warm-up window so JIT / connection pool / first-call costs are
# amortised before the measurement window opens.
warmup_deadline = time.perf_counter() + warmup_seconds
while time.perf_counter() < warmup_deadline:
await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms)
latencies_ms: list[float] = []
total_results = 0
cached_results = 0
successful = 0
failed = 0
steady_start = time.perf_counter()
steady_deadline = steady_start + duration_seconds
while time.perf_counter() < steady_deadline:
call_start = time.perf_counter()
try:
results = await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms)
except Exception:
failed += 1
latencies_ms.append((time.perf_counter() - call_start) * 1000.0)
continue
latencies_ms.append((time.perf_counter() - call_start) * 1000.0)
successful += 1
for r in results:
total_results += 1
if r.was_cached:
cached_results += 1
steady_elapsed = time.perf_counter() - steady_start
total_calls = successful + failed
calls_per_second = total_calls / steady_elapsed if steady_elapsed > 0 else 0.0
finally:
if item_handles:
try:
await session.unsubscribe_bulk(server_handle, item_handles)
except Exception:
pass
try:
await session.close()
except Exception:
pass
return {
"language": "python",
"command": "bench-read-bulk",
"endpoint": kwargs.get("endpoint"),
"clientName": client_name,
"bulkSize": bulk_size,
"durationSeconds": duration_seconds,
"warmupSeconds": warmup_seconds,
"durationMs": int(steady_elapsed * 1000),
"tags": tags,
"totalCalls": total_calls,
"successfulCalls": successful,
"failedCalls": failed,
"totalReadResults": total_results,
"cachedReadResults": cached_results,
"callsPerSecond": round(calls_per_second, 2),
"latencyMs": _percentile_summary(latencies_ms),
}
def _percentile_summary(sample: list[float]) -> dict[str, float]:
if not sample:
return {"p50": 0.0, "p95": 0.0, "p99": 0.0, "max": 0.0, "mean": 0.0}
sorted_sample = sorted(sample)
return {
"p50": round(_percentile(sorted_sample, 0.50), 3),
"p95": round(_percentile(sorted_sample, 0.95), 3),
"p99": round(_percentile(sorted_sample, 0.99), 3),
"max": round(sorted_sample[-1], 3),
"mean": round(sum(sample) / len(sample), 3),
}
def _percentile(sorted_sample: list[float], quantile: float) -> float:
"""Nearest-rank with linear interpolation; matches every other client."""
n = len(sorted_sample)
if n == 0:
return 0.0
if n == 1:
return sorted_sample[0]
rank = quantile * (n - 1)
lower = int(rank)
upper = min(lower + 1, n - 1)
fraction = rank - lower
return sorted_sample[lower] + (sorted_sample[upper] - sorted_sample[lower]) * fraction
async def _stream_events(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])