Resolve Client.Python-022..026: TLS-by-default, batch CLI, README

Client.Python-022  README CLI examples for stream-alarms and
                   acknowledge-alarm now use the correct flags;
                   regression test parses every documented line through
                   Click.
Client.Python-023  Re-applied Client.Python-013 — _use_plaintext drops
                   the silent localhost / 127.0.0.1 auto-downgrade
                   branch; --plaintext and --tls are mutually exclusive
                   and TLS is the default.
Client.Python-024  batch dispatch routes through main.main(...,
                   standalone_mode=False) under a redirected stdout
                   instead of click.testing.CliRunner; recursive batch
                   lines are rejected outright.
Client.Python-025  Added behavioural tests for the five bulk SDK methods,
                   stream_alarms, and the new CLI subcommands.
Client.Python-026  _bench_read_bulk hoists 'import time' to module scope
                   and logs cleanup failures instead of swallowing them.

All resolved at 2026-05-24; python -m pytest is 65/65 green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-24 08:50:27 -04:00
parent 4a0f88b17d
commit f5b50c4484
4 changed files with 1002 additions and 54 deletions
+2 -2
View File
@@ -198,8 +198,8 @@ mxgw-py register --session-id <id> --client-name python-client --json
mxgw-py add-item --session-id <id> --server-handle 1 --item Object.Attribute --json
mxgw-py advise --session-id <id> --server-handle 1 --item-handle 2 --json
mxgw-py stream-events --session-id <id> --max-events 1 --json
mxgw-py stream-alarms --session-id <id> --max-messages 1 --json
mxgw-py acknowledge-alarm --session-id <id> --alarm-reference "\\Galaxy\Area001.Pump001.PumpFault" --json
mxgw-py stream-alarms --max-messages 1 --json
mxgw-py acknowledge-alarm --reference "\\Galaxy\Area001.Pump001.PumpFault" --json
mxgw-py write --session-id <id> --server-handle 1 --item-handle 2 --type int32 --value 123 --json
```
@@ -3,15 +3,18 @@
from __future__ import annotations
import asyncio
import contextlib
import io
import json
import logging
import os
import sys
import time
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from typing import Any
import click
from click.testing import CliRunner
from google.protobuf.json_format import MessageToDict
from zb_mom_ww_mxgateway import __version__
@@ -22,6 +25,8 @@ from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb
from zb_mom_ww_mxgateway.options import ClientOptions
from zb_mom_ww_mxgateway.values import MxValueInput, to_mx_value
logger = logging.getLogger(__name__)
MAX_AGGREGATE_EVENTS = 10_000
_BATCH_EOR = "__MXGW_BATCH_EOR__"
@@ -56,9 +61,10 @@ def batch() -> None:
Errors do NOT terminate the loop. Each command's output (including any error JSON) is
written to stdout followed by a line containing exactly ``__MXGW_BATCH_EOR__``, then
stdout is flushed. Error output is formatted as ``{"error": "...", "type": "..."}``.
"""
runner = CliRunner()
Recursive ``batch`` lines are rejected (Client.Python-024) — re-entering the batch
dispatcher would silently spawn a nested loop reading from the same exhausted stdin.
"""
for raw_line in sys.stdin:
line = raw_line.rstrip("\n").rstrip("\r")
@@ -68,44 +74,77 @@ def batch() -> None:
args = line.split()
try:
result = runner.invoke(main, args, catch_exceptions=True)
except Exception as exc: # noqa: BLE001 — be safe; never let batch loop die
_batch_write_error(exc.__class__.__name__, str(exc))
# Reject a recursive `batch` line outright: the nested invocation would
# read from the already-exhausted stdin (or, depending on harness, the
# same stream the outer batch is consuming line-by-line) and silently
# exit. Surface it as an explicit error block so callers can audit the
# mis-routed line.
if args and args[0] == "batch":
_batch_write_error(
"RecursiveBatchError",
"nested 'batch' invocation is not allowed inside batch mode",
)
_batch_flush_eor()
continue
if result.exit_code == 0:
# Normal success — write captured output as-is.
sys.stdout.write(result.output)
_dispatch_batch_line(args)
def _dispatch_batch_line(args: list[str]) -> None:
"""Run a single batch line through the Click parser directly (no CliRunner).
Captures the subcommand's stdout via :func:`contextlib.redirect_stdout` and
synthesises the standard ``{"error": ..., "type": ...}`` shape on failure.
Click exceptions (`ClickException`, `UsageError`) are caught and rendered;
`SystemExit(0)` from a Click command is treated as a clean exit, while a
non-zero `SystemExit` is rendered as a CLI error. All other exceptions are
captured and rendered as `{"error": str(exc), "type": exc.__class__.__name__}`
so the loop never dies.
"""
buffer = io.StringIO()
exit_code = 0
exc: BaseException | None = None
try:
with contextlib.redirect_stdout(buffer):
try:
# `standalone_mode=False` makes Click raise instead of calling
# `sys.exit`; we still need to handle SystemExit because some
# commands explicitly raise it (or `click.UsageError` converts
# to a SystemExit under some entry-point paths).
main.main(args=args, standalone_mode=False, prog_name="mxgw-py")
except click.exceptions.Exit as click_exit:
exit_code = click_exit.exit_code
except click.ClickException as click_exc:
exit_code = click_exc.exit_code
exc = click_exc
click.echo(f"Error: {click_exc.format_message()}", err=False)
except SystemExit as sys_exit:
code = sys_exit.code
exit_code = int(code) if isinstance(code, int) else (0 if code is None else 1)
except Exception as captured: # noqa: BLE001 — never let batch loop die
exc = captured
exit_code = 1
output = buffer.getvalue()
if exit_code == 0 and exc is None:
sys.stdout.write(output)
else:
if output.lstrip().startswith("{"):
# Inner command already emitted JSON (e.g. a structured error) —
# relay verbatim.
sys.stdout.write(output)
if output and not output.endswith("\n"):
sys.stdout.write("\n")
elif exc is not None:
_batch_write_error(type(exc).__name__, str(exc))
else:
# Something went wrong. If the command already emitted a JSON object
# (e.g. the output starts with '{'), trust that and relay it verbatim.
# Otherwise synthesise the standard {"error": ..., "type": ...} shape.
output = result.output or ""
exc = result.exception
msg = output.strip()
if msg.startswith("Error: "):
msg = msg[len("Error: "):]
_batch_write_error("CliError", msg)
if output.lstrip().startswith("{"):
# Already JSON — relay verbatim (may or may not end with newline).
sys.stdout.write(output)
if not output.endswith("\n"):
sys.stdout.write("\n")
elif exc is not None and not isinstance(exc, SystemExit):
_batch_write_error(type(exc).__name__, str(exc))
else:
# Click's default error format is "Error: <message>\n"; extract the
# message so the harness gets clean JSON.
msg = output.strip()
if msg.startswith("Error: "):
msg = msg[len("Error: "):]
exc_type = (
type(exc).__name__
if exc is not None and not isinstance(exc, SystemExit)
else "CliError"
)
_batch_write_error(exc_type, msg)
_batch_flush_eor()
_batch_flush_eor()
def _batch_write_error(exc_type: str, message: str) -> None:
@@ -673,7 +712,6 @@ async def _write_secured2_bulk(**kwargs: Any) -> dict[str, Any]:
async def _bench_read_bulk(**kwargs: Any) -> dict[str, Any]:
"""ReadBulk stress benchmark — matches the .NET / Go / Rust / Java schema."""
import time
bulk_size = int(kwargs["bulk_size"])
if bulk_size < 1:
@@ -730,12 +768,12 @@ async def _bench_read_bulk(**kwargs: Any) -> dict[str, Any]:
if item_handles:
try:
await session.unsubscribe_bulk(server_handle, item_handles)
except Exception:
pass
except Exception as exc: # noqa: BLE001 — bench is best-effort
logger.warning("bench-read-bulk: unsubscribe_bulk cleanup failed: %s", exc)
try:
await session.close()
except Exception:
pass
except Exception as exc: # noqa: BLE001 — bench is best-effort
logger.warning("bench-read-bulk: session.close cleanup failed: %s", exc)
return {
"language": "python",
@@ -899,11 +937,21 @@ def _session(client: GatewayClient, session_id: str):
def _use_plaintext(kwargs: dict[str, Any]) -> bool:
if kwargs.get("use_tls"):
return False
if kwargs.get("plaintext"):
return True
return kwargs["endpoint"].startswith("localhost:") or kwargs["endpoint"].startswith("127.0.0.1:")
"""Resolve the plaintext / TLS contract from the CLI flags.
TLS is the default. ``--plaintext`` is the only way to opt in to an
unencrypted channel; ``--tls`` is accepted as a redundant explicit
affirmation. Combining the two is a usage error (regression-guarded by
Client.Python-023 — the previous silent ``localhost:`` /
``127.0.0.1:`` auto-plaintext branch leaked the API-key bearer over a
plaintext channel when a user ran the gateway behind TLS on loopback).
"""
plaintext = bool(kwargs.get("plaintext"))
use_tls = bool(kwargs.get("use_tls"))
if plaintext and use_tls:
raise click.UsageError("--plaintext and --tls are mutually exclusive")
return plaintext
def _api_key_from_env(name: str | None) -> str | None:
@@ -0,0 +1,789 @@
"""Regression tests for Client.Python-022..026.
Each test corresponds to a finding from the latest re-review. Tests are
TDD-first — they failed against the pre-fix source and pass against the
fixed source.
"""
from __future__ import annotations
import json
import re
import time as _time_module_ref
from pathlib import Path
from typing import Any
import pytest
from click.testing import CliRunner
from zb_mom_ww_mxgateway import ClientOptions, GatewayClient
from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb
from zb_mom_ww_mxgateway_cli import commands as cli_commands
from zb_mom_ww_mxgateway_cli.commands import _use_plaintext, main
_BATCH_EOR = "__MXGW_BATCH_EOR__"
# ---------------------------------------------------------------------------
# Client.Python-022 — README CLI examples must parse against the implementation.
# ---------------------------------------------------------------------------
def _readme_path() -> Path:
return Path(__file__).resolve().parent.parent / "README.md"
def _extract_mxgw_py_examples() -> list[list[str]]:
"""Return the README's ``mxgw-py ...`` example lines as click arg lists.
Replaces angle-bracket placeholders (``<id>``) with safe stub values and
leaves real flag names untouched. The returned arg lists drop the
``mxgw-py`` prefix.
"""
text = _readme_path().read_text(encoding="utf-8")
args: list[list[str]] = []
for raw_line in text.splitlines():
line = raw_line.strip()
if not line.startswith("mxgw-py "):
continue
# Strip the leading "mxgw-py " token.
body = line[len("mxgw-py ") :]
# Replace common placeholders so click does not error on the placeholder.
body = body.replace("<id>", "session-1")
# Backtick-quoted hostnames in the TLS example are not represented
# in CLI; safe to leave as-is.
tokens = _split_cli_tokens(body)
# Keep only examples that exercise a real subcommand. Skip TLS
# multi-flag example (we only need the README CLI examples added in
# commits 8738735 — stream-alarms / acknowledge-alarm).
args.append(tokens)
return args
def _split_cli_tokens(body: str) -> list[str]:
"""Split a CLI body into argv tokens, honouring double-quoted strings."""
tokens: list[str] = []
pattern = re.compile(r'"([^"]*)"|(\S+)')
for match in pattern.finditer(body):
quoted, plain = match.group(1), match.group(2)
tokens.append(quoted if quoted is not None else plain)
return tokens
def test_readme_alarm_examples_parse_against_cli() -> None:
"""README `stream-alarms` / `acknowledge-alarm` examples must parse without
triggering Click's ``no such option`` error.
Drives every README ``mxgw-py`` example through Click's ``--help`` style
parser by re-invoking the documented argv with a trailing ``--help`` flag so
only the parser runs (no RPC is attempted). If a documented flag does not
exist on the subcommand, Click prints ``no such option: --<flag>`` and
exits 2 — that is the regression we want to catch.
"""
runner = CliRunner()
examples = _extract_mxgw_py_examples()
assert any(
"stream-alarms" in args for args in examples
), "README must include a stream-alarms example."
assert any(
"acknowledge-alarm" in args for args in examples
), "README must include an acknowledge-alarm example."
for argv in examples:
# Strip "--json" (already a real flag) and any value-bearing flag that
# requires a host/file/value, then append --help so we exercise the
# parser only.
# We just append --help — Click parses all options up to --help and
# then prints help; an unknown option still errors out first.
result = runner.invoke(main, [*argv, "--help"])
# Either help text printed (exit 0) or some other parser issue (exit 2);
# we only want to assert NO "no such option" error.
assert "no such option" not in result.output.lower(), (
f"README example failed Click parsing: argv={argv!r}\n"
f"output={result.output!r}"
)
# ---------------------------------------------------------------------------
# Client.Python-023 — REGRESSION of Client.Python-013. _use_plaintext must
# not silently auto-downgrade on localhost / 127.0.0.1.
# ---------------------------------------------------------------------------
def test_use_plaintext_does_not_auto_downgrade_for_localhost_endpoint() -> None:
"""A bare ``localhost:...`` endpoint with no flags must default to TLS."""
assert _use_plaintext({
"endpoint": "localhost:5001",
"plaintext": False,
"use_tls": False,
}) is False
def test_use_plaintext_does_not_auto_downgrade_for_loopback_ipv4_endpoint() -> None:
"""A bare ``127.0.0.1:...`` endpoint with no flags must default to TLS."""
assert _use_plaintext({
"endpoint": "127.0.0.1:5001",
"plaintext": False,
"use_tls": False,
}) is False
def test_use_plaintext_requires_explicit_plaintext_flag() -> None:
"""``--plaintext`` is the only way to opt in."""
assert _use_plaintext({
"endpoint": "localhost:5001",
"plaintext": True,
"use_tls": False,
}) is True
def test_use_plaintext_tls_flag_explicitly_disables_plaintext() -> None:
"""``--tls`` is accepted as an explicit affirmation of the default."""
assert _use_plaintext({
"endpoint": "localhost:5001",
"plaintext": False,
"use_tls": True,
}) is False
def test_use_plaintext_rejects_plaintext_and_tls_combined() -> None:
"""``--plaintext`` and ``--tls`` together must be rejected as ambiguous."""
import click as _click
with pytest.raises(_click.UsageError):
_use_plaintext({
"endpoint": "localhost:5001",
"plaintext": True,
"use_tls": True,
})
def test_cli_localhost_endpoint_with_no_flags_uses_tls_channel(monkeypatch) -> None:
"""End-to-end CLI: against ``localhost:...`` with no flags, the resolved
``ClientOptions.plaintext`` flowing into ``GatewayClient.connect`` must be
``False`` (TLS), so the API key bearer cannot leak over plaintext.
"""
captured: dict[str, Any] = {}
class _FakeStub:
def __init__(self) -> None:
pass
async def OpenSession(self, request: Any, *, metadata: tuple[Any, ...]) -> Any:
captured["metadata"] = metadata
return pb.OpenSessionReply(
session_id="session-1",
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
)
real_connect = GatewayClient.connect
@classmethod
async def _spy_connect(cls, options: ClientOptions, **kwargs: Any) -> GatewayClient:
captured["options"] = options
return await real_connect(options, stub=_FakeStub())
monkeypatch.setattr(GatewayClient, "connect", _spy_connect)
runner = CliRunner()
result = runner.invoke(
main,
[
"open-session",
"--endpoint",
"localhost:5000",
"--api-key",
"mxgw_test_secret",
"--json",
],
)
assert result.exit_code == 0, result.output
assert "options" in captured
assert captured["options"].plaintext is False, (
"localhost endpoint without --plaintext must NOT auto-downgrade to plaintext"
)
# ---------------------------------------------------------------------------
# Client.Python-024 — `batch` must not use CliRunner from production code,
# and a recursive `batch` line must not silently re-enter.
# ---------------------------------------------------------------------------
def test_batch_command_does_not_use_clirunner_in_production() -> None:
"""`commands.py` must not import or instantiate the test-only CliRunner helper.
Docstring references explaining what the module deliberately avoids are
permitted; what is forbidden is an actual ``import`` of ``click.testing``
or an actual ``CliRunner()`` instantiation in executable code.
"""
source = Path(cli_commands.__file__).read_text(encoding="utf-8")
assert "from click.testing" not in source, (
"click.testing is a test-only helper and must not be used by production code"
)
assert "import click.testing" not in source, (
"click.testing is a test-only helper and must not be used by production code"
)
# `CliRunner()` (instantiation) must not appear in production code.
assert "CliRunner(" not in source, (
"CliRunner() must not be instantiated in production code"
)
def test_batch_recursive_batch_line_is_bounded() -> None:
"""A `batch` line nested inside `batch` stdin must not be silently spawned.
The pre-fix implementation re-invoked the test runner with empty stdin,
so `batch` inside `batch` exited cleanly with no error. The fix either
rejects the nested invocation or surfaces it as an error block so the
behaviour is auditable.
"""
runner = CliRunner()
result = runner.invoke(
main,
["batch"],
input="batch\nversion --json\n",
)
# Outer batch must still exit 0 and process both lines.
assert result.exit_code == 0
assert result.output.count(_BATCH_EOR) == 2
blocks = [block for block in result.output.split(_BATCH_EOR + "\n") if block]
# The first block — the recursive `batch` line — must surface an error
# JSON. (Either an explicit rejection, or some non-empty error block —
# NOT a silently empty block.)
first_block = blocks[0].strip()
assert first_block, "recursive batch line must not be silently swallowed"
payload = json.loads(first_block.splitlines()[-1])
assert "error" in payload, (
f"recursive batch line should surface an error: got {payload!r}"
)
# ---------------------------------------------------------------------------
# Client.Python-025 — Behavioural tests for new bulk SDK methods,
# stream_alarms, and the new CLI subcommands.
# ---------------------------------------------------------------------------
class _AlarmFakeStream:
def __init__(self, messages: list[pb.AlarmFeedMessage]) -> None:
self._messages = list(messages)
self.cancelled = False
def __aiter__(self) -> "_AlarmFakeStream":
return self
async def __anext__(self) -> pb.AlarmFeedMessage:
if not self._messages:
raise StopAsyncIteration
return self._messages.pop(0)
def cancel(self) -> None:
self.cancelled = True
class _BulkFakeUnary:
def __init__(self, replies: list[Any]) -> None:
self.replies = replies
self.requests: list[Any] = []
self.metadata: tuple[tuple[str, str], ...] | None = None
async def __call__(self, request: Any, *, metadata: tuple[tuple[str, str], ...]) -> Any:
self.requests.append(request)
self.metadata = metadata
return self.replies.pop(0)
class _BulkFakeStub:
def __init__(self) -> None:
self.open_session = _BulkFakeUnary(
[
pb.OpenSessionReply(
session_id="session-1",
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
),
],
)
self.invoke = _BulkFakeUnary([])
self.OpenSession = self.open_session
self.Invoke = self.invoke
self.stream_alarms_metadata: tuple[tuple[str, str], ...] | None = None
self._alarm_stream = _AlarmFakeStream([])
def set_invoke_replies(self, replies: list[Any]) -> None:
self.invoke.replies = replies
def set_alarm_stream(self, stream: _AlarmFakeStream) -> None:
self._alarm_stream = stream
def StreamAlarms(self, request: Any, *, metadata: tuple[tuple[str, str], ...]) -> Any:
self.stream_alarms_request = request
self.stream_alarms_metadata = metadata
return self._alarm_stream
@pytest.mark.asyncio
async def test_session_read_bulk_sends_expected_request_shape() -> None:
stub = _BulkFakeStub()
stub.set_invoke_replies(
[
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_READ_BULK,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
read_bulk=pb.BulkReadReply(
results=[
pb.BulkReadResult(
tag_address="Tank01.Level",
was_successful=True,
),
],
),
),
],
)
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session()
results = await session.read_bulk(12, ["Tank01.Level"], timeout_ms=1500)
assert len(results) == 1
assert results[0].tag_address == "Tank01.Level"
request = stub.invoke.requests[0]
assert request.command.kind == pb.MX_COMMAND_KIND_READ_BULK
assert request.command.read_bulk.server_handle == 12
assert list(request.command.read_bulk.tag_addresses) == ["Tank01.Level"]
assert request.command.read_bulk.timeout_ms == 1500
@pytest.mark.asyncio
async def test_session_write_bulk_sends_expected_request_shape() -> None:
stub = _BulkFakeStub()
stub.set_invoke_replies(
[
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_WRITE_BULK,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
write_bulk=pb.BulkWriteReply(
results=[pb.BulkWriteResult(was_successful=True)],
),
),
],
)
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session()
from zb_mom_ww_mxgateway.values import to_mx_value
entries = [
pb.WriteBulkEntry(item_handle=34, user_id=99, value=to_mx_value(123)),
]
results = await session.write_bulk(12, entries)
assert results[0].was_successful is True
cmd = stub.invoke.requests[0].command
assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_BULK
assert cmd.write_bulk.server_handle == 12
assert cmd.write_bulk.entries[0].item_handle == 34
assert cmd.write_bulk.entries[0].user_id == 99
@pytest.mark.asyncio
async def test_session_write2_bulk_sends_expected_request_shape() -> None:
stub = _BulkFakeStub()
stub.set_invoke_replies(
[
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_WRITE2_BULK,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
write2_bulk=pb.BulkWriteReply(
results=[pb.BulkWriteResult(was_successful=True)],
),
),
],
)
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session()
from zb_mom_ww_mxgateway.values import to_mx_value
entries = [
pb.Write2BulkEntry(
item_handle=34,
user_id=99,
value=to_mx_value(123),
timestamp_value=to_mx_value(1.5),
),
]
results = await session.write2_bulk(12, entries)
assert results[0].was_successful is True
cmd = stub.invoke.requests[0].command
assert cmd.kind == pb.MX_COMMAND_KIND_WRITE2_BULK
assert cmd.write2_bulk.server_handle == 12
assert cmd.write2_bulk.entries[0].item_handle == 34
@pytest.mark.asyncio
async def test_session_write_secured_bulk_sends_expected_request_shape() -> None:
stub = _BulkFakeStub()
stub.set_invoke_replies(
[
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_WRITE_SECURED_BULK,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
write_secured_bulk=pb.BulkWriteReply(
results=[pb.BulkWriteResult(was_successful=True)],
),
),
],
)
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session()
from zb_mom_ww_mxgateway.values import to_mx_value
entries = [
pb.WriteSecuredBulkEntry(
item_handle=34,
current_user_id=42,
verifier_user_id=43,
value=to_mx_value("secret"),
),
]
results = await session.write_secured_bulk(12, entries)
assert results[0].was_successful is True
cmd = stub.invoke.requests[0].command
assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_SECURED_BULK
assert cmd.write_secured_bulk.server_handle == 12
assert cmd.write_secured_bulk.entries[0].current_user_id == 42
assert cmd.write_secured_bulk.entries[0].verifier_user_id == 43
@pytest.mark.asyncio
async def test_session_write_secured2_bulk_sends_expected_request_shape() -> None:
stub = _BulkFakeStub()
stub.set_invoke_replies(
[
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_WRITE_SECURED2_BULK,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
write_secured2_bulk=pb.BulkWriteReply(
results=[pb.BulkWriteResult(was_successful=True)],
),
),
],
)
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session()
from zb_mom_ww_mxgateway.values import to_mx_value
entries = [
pb.WriteSecured2BulkEntry(
item_handle=34,
current_user_id=42,
verifier_user_id=43,
value=to_mx_value("secret"),
timestamp_value=to_mx_value(1.5),
),
]
results = await session.write_secured2_bulk(12, entries)
assert results[0].was_successful is True
cmd = stub.invoke.requests[0].command
assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_SECURED2_BULK
assert cmd.write_secured2_bulk.entries[0].current_user_id == 42
@pytest.mark.asyncio
async def test_stream_alarms_yields_feed_messages_and_cancels_on_close() -> None:
transitions = [
pb.AlarmFeedMessage(
transition=pb.OnAlarmTransitionEvent(
alarm_full_reference="Tank01.Level.HiHi",
transition_kind=pb.ALARM_TRANSITION_KIND_RAISE,
),
),
]
stream = _AlarmFakeStream(transitions)
stub = _BulkFakeStub()
stub.set_alarm_stream(stream)
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
iterator = client.stream_alarms(pb.StreamAlarmsRequest(alarm_filter_prefix="Tank01."))
first = await anext(iterator)
await iterator.aclose()
assert first.transition.alarm_full_reference == "Tank01.Level.HiHi"
assert stream.cancelled
assert stub.stream_alarms_metadata == (("authorization", "Bearer mxgw_test_secret"),)
assert stub.stream_alarms_request.alarm_filter_prefix == "Tank01."
# ---- CLI happy-path coverage for the new subcommands ----
def _install_fake_connect(monkeypatch, stub: Any) -> dict[str, Any]:
"""Patch `GatewayClient.connect` so the CLI uses the supplied fake stub."""
captured: dict[str, Any] = {}
real_connect = GatewayClient.connect
@classmethod
async def _spy_connect(cls, options: ClientOptions, **kwargs: Any) -> GatewayClient:
captured["options"] = options
return await real_connect(options, stub=stub)
monkeypatch.setattr(GatewayClient, "connect", _spy_connect)
return captured
def test_cli_read_bulk_happy_path(monkeypatch) -> None:
stub = _BulkFakeStub()
stub.set_invoke_replies(
[
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_READ_BULK,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
read_bulk=pb.BulkReadReply(
results=[
pb.BulkReadResult(
tag_address="Tank01.Level",
was_successful=True,
),
],
),
),
],
)
_install_fake_connect(monkeypatch, stub)
runner = CliRunner()
result = runner.invoke(
main,
[
"read-bulk",
"--endpoint",
"localhost:5000",
"--plaintext",
"--session-id",
"session-1",
"--server-handle",
"12",
"--items",
"Tank01.Level",
"--timeout-ms",
"1500",
"--json",
],
)
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload["results"][0]["tagAddress"] == "Tank01.Level"
def test_cli_write_bulk_happy_path(monkeypatch) -> None:
stub = _BulkFakeStub()
stub.set_invoke_replies(
[
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_WRITE_BULK,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
write_bulk=pb.BulkWriteReply(
results=[pb.BulkWriteResult(was_successful=True)],
),
),
],
)
_install_fake_connect(monkeypatch, stub)
runner = CliRunner()
result = runner.invoke(
main,
[
"write-bulk",
"--endpoint",
"localhost:5000",
"--plaintext",
"--session-id",
"session-1",
"--server-handle",
"12",
"--item-handles",
"34",
"--values",
"123",
"--type",
"int32",
"--json",
],
)
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload["results"][0]["wasSuccessful"] is True
cmd = stub.invoke.requests[0].command
assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_BULK
def test_cli_stream_alarms_happy_path(monkeypatch) -> None:
transitions = [
pb.AlarmFeedMessage(
transition=pb.OnAlarmTransitionEvent(
alarm_full_reference="Tank01.Level.HiHi",
transition_kind=pb.ALARM_TRANSITION_KIND_RAISE,
),
),
]
stream = _AlarmFakeStream(transitions)
stub = _BulkFakeStub()
stub.set_alarm_stream(stream)
_install_fake_connect(monkeypatch, stub)
runner = CliRunner()
result = runner.invoke(
main,
[
"stream-alarms",
"--endpoint",
"localhost:5000",
"--plaintext",
"--max-messages",
"1",
"--timeout",
"5.0",
"--filter-prefix",
"Tank01.",
"--json",
],
)
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload["messages"][0]["transition"]["alarmFullReference"] == "Tank01.Level.HiHi"
def test_cli_acknowledge_alarm_happy_path(monkeypatch) -> None:
stub = _BulkFakeStub()
stub.acknowledge_alarm = _BulkFakeUnary(
[
pb.AcknowledgeAlarmReply(
correlation_id="corr-1",
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
status=pb.MxStatusProxy(success=1, category=pb.MX_STATUS_CATEGORY_OK),
),
],
)
stub.AcknowledgeAlarm = stub.acknowledge_alarm
_install_fake_connect(monkeypatch, stub)
runner = CliRunner()
result = runner.invoke(
main,
[
"acknowledge-alarm",
"--endpoint",
"localhost:5000",
"--plaintext",
"--reference",
"Tank01.Level.HiHi",
"--comment",
"investigating",
"--operator",
"alice",
"--json",
],
)
assert result.exit_code == 0, result.output
captured_request = stub.acknowledge_alarm.requests[0]
assert captured_request.alarm_full_reference == "Tank01.Level.HiHi"
assert captured_request.comment == "investigating"
assert captured_request.operator_user == "alice"
# ---------------------------------------------------------------------------
# Client.Python-026 — `import time` at module scope; tighter cleanup excepts.
# ---------------------------------------------------------------------------
def test_commands_module_imports_time_at_module_scope() -> None:
"""`time` must be imported at module scope, not inside `_bench_read_bulk`.
`inspect.getsource(_bench_read_bulk)` must not contain a function-local
``import time`` statement.
"""
import inspect
source = inspect.getsource(cli_commands._bench_read_bulk)
# The function body must NOT contain a function-local `import time` line.
for line in source.splitlines():
stripped = line.strip()
assert stripped != "import time", (
f"_bench_read_bulk must not have function-local `import time`: {line!r}"
)
# And the module-level `time` attribute must be present.
assert hasattr(cli_commands, "time"), (
"`time` must be imported at module scope on commands.py"
)
assert cli_commands.time is _time_module_ref
def test_commands_module_bench_read_bulk_does_not_use_bare_except_pass() -> None:
"""The two `except Exception: pass` cleanup blocks in `_bench_read_bulk`
must be removed in favour of either logging or a narrower exception class.
"""
import inspect
source = inspect.getsource(cli_commands._bench_read_bulk)
# Reject the bare `except Exception:` followed by `pass` pattern in
# `_bench_read_bulk`. We tolerate `except Exception as <name>:` because the
# fix logs the exception.
pattern = re.compile(r"except\s+Exception\s*:\s*\n\s*pass\b")
assert not pattern.search(source), (
"_bench_read_bulk cleanup blocks must log or narrow the except clause"
)
+117 -6
View File
@@ -7,7 +7,7 @@
| Review date | 2026-05-24 |
| Commit reviewed | `42b0037` |
| Status | Re-reviewed |
| Open findings | 5 |
| Open findings | 0 |
## Checklist coverage
@@ -835,7 +835,7 @@ parity fix.
| Severity | High |
| Category | Documentation & comments |
| Location | `clients/python/README.md:201-202`, `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py:389-420` |
| Status | Open |
| Status | Resolved |
**Description:** The README CLI examples added by commit `8738735` for the
new alarm subcommands cite flags the CLI does not accept:
@@ -868,6 +868,19 @@ rename the CLI option to `--alarm-reference` and add a test that copy-pastes
the README examples through `CliRunner` to assert they parse. Option (1) is
the smaller change.
**Resolution:** 2026-05-24 — Fixed the README examples to match the
implementation (option 1, smaller change). `clients/python/README.md:201-202`
now reads `mxgw-py stream-alarms --max-messages 1 --json` and
`mxgw-py acknowledge-alarm --reference "\\Galaxy\Area001.Pump001.PumpFault" --json`
`--session-id` is dropped from both lines (the alarm feed is gateway-served,
session-less) and `--alarm-reference` is renamed to the real `--reference` flag.
Regression test
`tests/test_review_findings_022_to_026.py::test_readme_alarm_examples_parse_against_cli`
extracts every `mxgw-py …` line from the README, appends `--help` so only the
parser runs, and asserts that no example produces a `no such option` Click error.
Failed before the fix (the original `stream-alarms --session-id <id> …` line
emitted `Error: No such option: --session-id`), passes after.
### Client.Python-023
| Field | Value |
@@ -875,7 +888,7 @@ the smaller change.
| Severity | Medium |
| Category | Security |
| Location | `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py:901-906` |
| Status | Open |
| Status | Resolved |
**Description:** Client.Python-013 (severity Medium, Security) was marked
**Resolved** on 2026-05-20 with the explicit claim that the silent
@@ -919,6 +932,31 @@ is marked Resolved with a 2026-05-20 commit reference, do **not** silently
re-resolve this finding — keep it Open with a fresh ID so the regression
audit trail is preserved.
**Resolution:** 2026-05-24 — Re-applied the Client.Python-013 fix on the
renamed CLI module. Dropped the `endpoint.startswith("localhost:") or
endpoint.startswith("127.0.0.1:")` auto-plaintext branch from
`_use_plaintext` in `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py`.
TLS is now the default and `--plaintext` is the only way to opt in to
plaintext; `--tls` is accepted as a redundant affirmation and the two
flags combined raise `click.UsageError`. Regression tests live in
`tests/test_review_findings_022_to_026.py`:
`test_use_plaintext_does_not_auto_downgrade_for_localhost_endpoint` and
`test_use_plaintext_does_not_auto_downgrade_for_loopback_ipv4_endpoint`
exercise the bare-endpoint path,
`test_use_plaintext_requires_explicit_plaintext_flag` and
`test_use_plaintext_tls_flag_explicitly_disables_plaintext` pin the explicit
opt-in / opt-out contract,
`test_use_plaintext_rejects_plaintext_and_tls_combined` asserts mutual
exclusivity, and
`test_cli_localhost_endpoint_with_no_flags_uses_tls_channel` is an
end-to-end CliRunner test that intercepts `GatewayClient.connect` and
asserts the resolved `ClientOptions.plaintext` is `False` for a
`localhost:5000` endpoint without `--plaintext`. All five tests failed
against the pre-fix source and pass against the fix. **Behaviour change for
callers:** scripts that previously relied on
`mxgw-py … --endpoint localhost:5000 …` selecting plaintext silently must
now add an explicit `--plaintext` flag (or set up TLS on the gateway).
### Client.Python-024
| Field | Value |
@@ -926,7 +964,7 @@ audit trail is preserved.
| Severity | Medium |
| Category | Code organization & conventions |
| Location | `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py:13,48-119` |
| Status | Open |
| Status | Resolved |
**Description:** The new `batch` subcommand (commit `71d2c39`) implements
the cross-language batch protocol by importing `click.testing.CliRunner`
@@ -965,6 +1003,33 @@ batch loop can interleave inner-command output with the
a regression test that drives `batch` with `batch\n` on stdin and asserts
recursive invocation is either rejected or correctly bounded.
**Resolution:** 2026-05-24 — Removed the `from click.testing import CliRunner`
import and the `CliRunner()` instantiation from
`clients/python/src/zb_mom_ww_mxgateway_cli/commands.py`. The `batch`
command body now dispatches each stdin line through a new
`_dispatch_batch_line` helper that calls `main.main(args=…,
standalone_mode=False, prog_name="mxgw-py")` directly and captures the
subcommand's stdout via `contextlib.redirect_stdout(io.StringIO())`. Click
exit conditions (`click.exceptions.Exit`, `click.ClickException`,
`SystemExit`) are caught and rendered as
`{"error": …, "type": …}` JSON; arbitrary exceptions are caught with a
broad `except Exception` so the batch loop never dies. A nested `batch`
line is rejected outright with a `RecursiveBatchError` JSON record before
the dispatcher runs, eliminating the silent-recursive-spawn footgun the
original `CliRunner.invoke(main, ["batch"], …)` path enabled. Regression
tests:
`tests/test_review_findings_022_to_026.py::test_batch_command_does_not_use_clirunner_in_production`
asserts the production module no longer imports `from click.testing` or
calls `CliRunner(`; and
`test_batch_recursive_batch_line_is_bounded` drives a `batch\nversion --json\n`
stdin payload and asserts the recursive `batch` line emits an error JSON
record rather than silently exiting. The pre-existing batch tests
(`test_batch_runs_version_command_and_writes_eor`,
`test_batch_terminates_on_empty_line`,
`test_batch_continues_after_error_line`) still pass against the new
implementation, confirming the wire-level contract (one EOR per line,
clean JSON error blocks) is preserved.
### Client.Python-025
| Field | Value |
@@ -972,7 +1037,7 @@ recursive invocation is either rejected or correctly bounded.
| Severity | Low |
| Category | Testing coverage |
| Location | `clients/python/tests/test_cli.py`, `clients/python/src/zb_mom_ww_mxgateway/{client.py,session.py}`, `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py` |
| Status | Open |
| Status | Resolved |
**Description:** Commits `6add4b4` and `828e3e6` added five new SDK
methods (`Session.read_bulk`, `Session.write_bulk`, `Session.write2_bulk`,
@@ -1020,6 +1085,32 @@ applied to the renamed bench). At minimum, add a request-shape test for
`write_secured_bulk` since the secured family is the highest-risk
parity surface.
**Resolution:** 2026-05-24 — Added behavioural test coverage for the five
new bulk SDK methods, `stream_alarms`, and the new CLI subcommand bodies
in `tests/test_review_findings_022_to_026.py`. Request-shape tests
(`test_session_read_bulk_sends_expected_request_shape`,
`test_session_write_bulk_sends_expected_request_shape`,
`test_session_write2_bulk_sends_expected_request_shape`,
`test_session_write_secured_bulk_sends_expected_request_shape`,
`test_session_write_secured2_bulk_sends_expected_request_shape`) drive
each `Session.*_bulk` method against a fake `Invoke` stub and assert
the captured `MxCommand`'s `kind`, sub-message, `server_handle`, and
per-entry fields (including `current_user_id` / `verifier_user_id`
on the secured family — the highest-risk parity surface the finding
calls out). `test_stream_alarms_yields_feed_messages_and_cancels_on_close`
covers the `GatewayClient.stream_alarms` happy path including the
`_canceling_alarm_feed_iterator` cancel-on-close contract and the
authorization metadata header. CLI happy-path tests
(`test_cli_read_bulk_happy_path`, `test_cli_write_bulk_happy_path`,
`test_cli_stream_alarms_happy_path`, `test_cli_acknowledge_alarm_happy_path`)
each drive their subcommand through `CliRunner` against a fake stub
injected via a monkeypatched `GatewayClient.connect` and assert the
emitted JSON shape and that the captured RPC request carries the
expected fields. The four CLI happy-path tests passed even before any
production fix (the implementations were correct; the finding is a
coverage gap), but they now exist as regression guards against future
drift. No source change — pure coverage finding.
### Client.Python-026
| Field | Value |
@@ -1027,7 +1118,7 @@ parity surface.
| Severity | Low |
| Category | Correctness & logic bugs |
| Location | `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py:674-738` |
| Status | Open |
| Status | Resolved |
**Description:** Two minor quality issues in the new `_bench_read_bulk`
body (commit `6add4b4`):
@@ -1060,3 +1151,23 @@ module-level `logger = logging.getLogger(__name__)`. No behavioural
change in the happy path; failure path becomes diagnosable. No new test
required for the import hoist; the logger change is exercised by the
existing bench smoke test once `caplog` is added to the test signature.
**Resolution:** 2026-05-24 — Hoisted `import time` to the module-level
import block in `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py`
alongside the existing standard-library imports; the function-local
`import time` line at the top of `_bench_read_bulk` is gone. Added a
module-level `logger = logging.getLogger(__name__)` and rewrote the two
`finally` cleanup blocks to bind the swallowed exception and log it at
`WARNING` level — `unsubscribe_bulk` failures now emit
`"bench-read-bulk: unsubscribe_bulk cleanup failed: %s"` and the
`session.close()` failure path emits the equivalent — so a future
regression in the cleanup path is diagnosable at the next benchmark run
rather than silently corrupting subscription bookkeeping. Regression
tests in `tests/test_review_findings_022_to_026.py`:
`test_commands_module_imports_time_at_module_scope` uses
`inspect.getsource(_bench_read_bulk)` to assert no function-local
`import time` line, and asserts the module exposes `time` at module
scope; `test_commands_module_bench_read_bulk_does_not_use_bare_except_pass`
greps the function source for the `except Exception:\n pass` pattern
and rejects it. Both tests failed against the pre-fix source and pass
against the fix.