diff --git a/clients/python/README.md b/clients/python/README.md index 037196f..43a8e1e 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -198,8 +198,8 @@ mxgw-py register --session-id --client-name python-client --json mxgw-py add-item --session-id --server-handle 1 --item Object.Attribute --json mxgw-py advise --session-id --server-handle 1 --item-handle 2 --json mxgw-py stream-events --session-id --max-events 1 --json -mxgw-py stream-alarms --session-id --max-messages 1 --json -mxgw-py acknowledge-alarm --session-id --alarm-reference "\\Galaxy\Area001.Pump001.PumpFault" --json +mxgw-py stream-alarms --max-messages 1 --json +mxgw-py acknowledge-alarm --reference "\\Galaxy\Area001.Pump001.PumpFault" --json mxgw-py write --session-id --server-handle 1 --item-handle 2 --type int32 --value 123 --json ``` diff --git a/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py b/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py index c486b69..21929d7 100644 --- a/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py +++ b/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py @@ -3,15 +3,18 @@ from __future__ import annotations import asyncio +import contextlib +import io import json +import logging import os import sys +import time from collections.abc import Awaitable, Callable from datetime import datetime, timezone from typing import Any import click -from click.testing import CliRunner from google.protobuf.json_format import MessageToDict from zb_mom_ww_mxgateway import __version__ @@ -22,6 +25,8 @@ from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb from zb_mom_ww_mxgateway.options import ClientOptions from zb_mom_ww_mxgateway.values import MxValueInput, to_mx_value +logger = logging.getLogger(__name__) + MAX_AGGREGATE_EVENTS = 10_000 _BATCH_EOR = "__MXGW_BATCH_EOR__" @@ -56,9 +61,10 @@ def batch() -> None: Errors do NOT terminate the loop. Each command's output (including any error JSON) is written to stdout followed by a line containing exactly ``__MXGW_BATCH_EOR__``, then stdout is flushed. Error output is formatted as ``{"error": "...", "type": "..."}``. - """ - runner = CliRunner() + Recursive ``batch`` lines are rejected (Client.Python-024) — re-entering the batch + dispatcher would silently spawn a nested loop reading from the same exhausted stdin. + """ for raw_line in sys.stdin: line = raw_line.rstrip("\n").rstrip("\r") @@ -68,44 +74,77 @@ def batch() -> None: args = line.split() - try: - result = runner.invoke(main, args, catch_exceptions=True) - except Exception as exc: # noqa: BLE001 — be safe; never let batch loop die - _batch_write_error(exc.__class__.__name__, str(exc)) + # Reject a recursive `batch` line outright: the nested invocation would + # read from the already-exhausted stdin (or, depending on harness, the + # same stream the outer batch is consuming line-by-line) and silently + # exit. Surface it as an explicit error block so callers can audit the + # mis-routed line. + if args and args[0] == "batch": + _batch_write_error( + "RecursiveBatchError", + "nested 'batch' invocation is not allowed inside batch mode", + ) _batch_flush_eor() continue - if result.exit_code == 0: - # Normal success — write captured output as-is. - sys.stdout.write(result.output) + _dispatch_batch_line(args) + + +def _dispatch_batch_line(args: list[str]) -> None: + """Run a single batch line through the Click parser directly (no CliRunner). + + Captures the subcommand's stdout via :func:`contextlib.redirect_stdout` and + synthesises the standard ``{"error": ..., "type": ...}`` shape on failure. + Click exceptions (`ClickException`, `UsageError`) are caught and rendered; + `SystemExit(0)` from a Click command is treated as a clean exit, while a + non-zero `SystemExit` is rendered as a CLI error. All other exceptions are + captured and rendered as `{"error": str(exc), "type": exc.__class__.__name__}` + so the loop never dies. + """ + + buffer = io.StringIO() + exit_code = 0 + exc: BaseException | None = None + try: + with contextlib.redirect_stdout(buffer): + try: + # `standalone_mode=False` makes Click raise instead of calling + # `sys.exit`; we still need to handle SystemExit because some + # commands explicitly raise it (or `click.UsageError` converts + # to a SystemExit under some entry-point paths). + main.main(args=args, standalone_mode=False, prog_name="mxgw-py") + except click.exceptions.Exit as click_exit: + exit_code = click_exit.exit_code + except click.ClickException as click_exc: + exit_code = click_exc.exit_code + exc = click_exc + click.echo(f"Error: {click_exc.format_message()}", err=False) + except SystemExit as sys_exit: + code = sys_exit.code + exit_code = int(code) if isinstance(code, int) else (0 if code is None else 1) + except Exception as captured: # noqa: BLE001 — never let batch loop die + exc = captured + exit_code = 1 + + output = buffer.getvalue() + if exit_code == 0 and exc is None: + sys.stdout.write(output) + else: + if output.lstrip().startswith("{"): + # Inner command already emitted JSON (e.g. a structured error) — + # relay verbatim. + sys.stdout.write(output) + if output and not output.endswith("\n"): + sys.stdout.write("\n") + elif exc is not None: + _batch_write_error(type(exc).__name__, str(exc)) else: - # Something went wrong. If the command already emitted a JSON object - # (e.g. the output starts with '{'), trust that and relay it verbatim. - # Otherwise synthesise the standard {"error": ..., "type": ...} shape. - output = result.output or "" - exc = result.exception + msg = output.strip() + if msg.startswith("Error: "): + msg = msg[len("Error: "):] + _batch_write_error("CliError", msg) - if output.lstrip().startswith("{"): - # Already JSON — relay verbatim (may or may not end with newline). - sys.stdout.write(output) - if not output.endswith("\n"): - sys.stdout.write("\n") - elif exc is not None and not isinstance(exc, SystemExit): - _batch_write_error(type(exc).__name__, str(exc)) - else: - # Click's default error format is "Error: \n"; extract the - # message so the harness gets clean JSON. - msg = output.strip() - if msg.startswith("Error: "): - msg = msg[len("Error: "):] - exc_type = ( - type(exc).__name__ - if exc is not None and not isinstance(exc, SystemExit) - else "CliError" - ) - _batch_write_error(exc_type, msg) - - _batch_flush_eor() + _batch_flush_eor() def _batch_write_error(exc_type: str, message: str) -> None: @@ -673,7 +712,6 @@ async def _write_secured2_bulk(**kwargs: Any) -> dict[str, Any]: async def _bench_read_bulk(**kwargs: Any) -> dict[str, Any]: """ReadBulk stress benchmark — matches the .NET / Go / Rust / Java schema.""" - import time bulk_size = int(kwargs["bulk_size"]) if bulk_size < 1: @@ -730,12 +768,12 @@ async def _bench_read_bulk(**kwargs: Any) -> dict[str, Any]: if item_handles: try: await session.unsubscribe_bulk(server_handle, item_handles) - except Exception: - pass + except Exception as exc: # noqa: BLE001 — bench is best-effort + logger.warning("bench-read-bulk: unsubscribe_bulk cleanup failed: %s", exc) try: await session.close() - except Exception: - pass + except Exception as exc: # noqa: BLE001 — bench is best-effort + logger.warning("bench-read-bulk: session.close cleanup failed: %s", exc) return { "language": "python", @@ -899,11 +937,21 @@ def _session(client: GatewayClient, session_id: str): def _use_plaintext(kwargs: dict[str, Any]) -> bool: - if kwargs.get("use_tls"): - return False - if kwargs.get("plaintext"): - return True - return kwargs["endpoint"].startswith("localhost:") or kwargs["endpoint"].startswith("127.0.0.1:") + """Resolve the plaintext / TLS contract from the CLI flags. + + TLS is the default. ``--plaintext`` is the only way to opt in to an + unencrypted channel; ``--tls`` is accepted as a redundant explicit + affirmation. Combining the two is a usage error (regression-guarded by + Client.Python-023 — the previous silent ``localhost:`` / + ``127.0.0.1:`` auto-plaintext branch leaked the API-key bearer over a + plaintext channel when a user ran the gateway behind TLS on loopback). + """ + + plaintext = bool(kwargs.get("plaintext")) + use_tls = bool(kwargs.get("use_tls")) + if plaintext and use_tls: + raise click.UsageError("--plaintext and --tls are mutually exclusive") + return plaintext def _api_key_from_env(name: str | None) -> str | None: diff --git a/clients/python/tests/test_review_findings_022_to_026.py b/clients/python/tests/test_review_findings_022_to_026.py new file mode 100644 index 0000000..b9a046e --- /dev/null +++ b/clients/python/tests/test_review_findings_022_to_026.py @@ -0,0 +1,789 @@ +"""Regression tests for Client.Python-022..026. + +Each test corresponds to a finding from the latest re-review. Tests are +TDD-first — they failed against the pre-fix source and pass against the +fixed source. +""" + +from __future__ import annotations + +import json +import re +import time as _time_module_ref +from pathlib import Path +from typing import Any + +import pytest +from click.testing import CliRunner + +from zb_mom_ww_mxgateway import ClientOptions, GatewayClient +from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb +from zb_mom_ww_mxgateway_cli import commands as cli_commands +from zb_mom_ww_mxgateway_cli.commands import _use_plaintext, main + +_BATCH_EOR = "__MXGW_BATCH_EOR__" + + +# --------------------------------------------------------------------------- +# Client.Python-022 — README CLI examples must parse against the implementation. +# --------------------------------------------------------------------------- + + +def _readme_path() -> Path: + return Path(__file__).resolve().parent.parent / "README.md" + + +def _extract_mxgw_py_examples() -> list[list[str]]: + """Return the README's ``mxgw-py ...`` example lines as click arg lists. + + Replaces angle-bracket placeholders (````) with safe stub values and + leaves real flag names untouched. The returned arg lists drop the + ``mxgw-py`` prefix. + """ + + text = _readme_path().read_text(encoding="utf-8") + args: list[list[str]] = [] + for raw_line in text.splitlines(): + line = raw_line.strip() + if not line.startswith("mxgw-py "): + continue + # Strip the leading "mxgw-py " token. + body = line[len("mxgw-py ") :] + # Replace common placeholders so click does not error on the placeholder. + body = body.replace("", "session-1") + # Backtick-quoted hostnames in the TLS example are not represented + # in CLI; safe to leave as-is. + tokens = _split_cli_tokens(body) + # Keep only examples that exercise a real subcommand. Skip TLS + # multi-flag example (we only need the README CLI examples added in + # commits 8738735 — stream-alarms / acknowledge-alarm). + args.append(tokens) + return args + + +def _split_cli_tokens(body: str) -> list[str]: + """Split a CLI body into argv tokens, honouring double-quoted strings.""" + + tokens: list[str] = [] + pattern = re.compile(r'"([^"]*)"|(\S+)') + for match in pattern.finditer(body): + quoted, plain = match.group(1), match.group(2) + tokens.append(quoted if quoted is not None else plain) + return tokens + + +def test_readme_alarm_examples_parse_against_cli() -> None: + """README `stream-alarms` / `acknowledge-alarm` examples must parse without + triggering Click's ``no such option`` error. + + Drives every README ``mxgw-py`` example through Click's ``--help`` style + parser by re-invoking the documented argv with a trailing ``--help`` flag so + only the parser runs (no RPC is attempted). If a documented flag does not + exist on the subcommand, Click prints ``no such option: --`` and + exits 2 — that is the regression we want to catch. + """ + + runner = CliRunner() + examples = _extract_mxgw_py_examples() + assert any( + "stream-alarms" in args for args in examples + ), "README must include a stream-alarms example." + assert any( + "acknowledge-alarm" in args for args in examples + ), "README must include an acknowledge-alarm example." + + for argv in examples: + # Strip "--json" (already a real flag) and any value-bearing flag that + # requires a host/file/value, then append --help so we exercise the + # parser only. + # We just append --help — Click parses all options up to --help and + # then prints help; an unknown option still errors out first. + result = runner.invoke(main, [*argv, "--help"]) + # Either help text printed (exit 0) or some other parser issue (exit 2); + # we only want to assert NO "no such option" error. + assert "no such option" not in result.output.lower(), ( + f"README example failed Click parsing: argv={argv!r}\n" + f"output={result.output!r}" + ) + + +# --------------------------------------------------------------------------- +# Client.Python-023 — REGRESSION of Client.Python-013. _use_plaintext must +# not silently auto-downgrade on localhost / 127.0.0.1. +# --------------------------------------------------------------------------- + + +def test_use_plaintext_does_not_auto_downgrade_for_localhost_endpoint() -> None: + """A bare ``localhost:...`` endpoint with no flags must default to TLS.""" + + assert _use_plaintext({ + "endpoint": "localhost:5001", + "plaintext": False, + "use_tls": False, + }) is False + + +def test_use_plaintext_does_not_auto_downgrade_for_loopback_ipv4_endpoint() -> None: + """A bare ``127.0.0.1:...`` endpoint with no flags must default to TLS.""" + + assert _use_plaintext({ + "endpoint": "127.0.0.1:5001", + "plaintext": False, + "use_tls": False, + }) is False + + +def test_use_plaintext_requires_explicit_plaintext_flag() -> None: + """``--plaintext`` is the only way to opt in.""" + + assert _use_plaintext({ + "endpoint": "localhost:5001", + "plaintext": True, + "use_tls": False, + }) is True + + +def test_use_plaintext_tls_flag_explicitly_disables_plaintext() -> None: + """``--tls`` is accepted as an explicit affirmation of the default.""" + + assert _use_plaintext({ + "endpoint": "localhost:5001", + "plaintext": False, + "use_tls": True, + }) is False + + +def test_use_plaintext_rejects_plaintext_and_tls_combined() -> None: + """``--plaintext`` and ``--tls`` together must be rejected as ambiguous.""" + + import click as _click + + with pytest.raises(_click.UsageError): + _use_plaintext({ + "endpoint": "localhost:5001", + "plaintext": True, + "use_tls": True, + }) + + +def test_cli_localhost_endpoint_with_no_flags_uses_tls_channel(monkeypatch) -> None: + """End-to-end CLI: against ``localhost:...`` with no flags, the resolved + ``ClientOptions.plaintext`` flowing into ``GatewayClient.connect`` must be + ``False`` (TLS), so the API key bearer cannot leak over plaintext. + """ + + captured: dict[str, Any] = {} + + class _FakeStub: + def __init__(self) -> None: + pass + + async def OpenSession(self, request: Any, *, metadata: tuple[Any, ...]) -> Any: + captured["metadata"] = metadata + return pb.OpenSessionReply( + session_id="session-1", + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + ) + + real_connect = GatewayClient.connect + + @classmethod + async def _spy_connect(cls, options: ClientOptions, **kwargs: Any) -> GatewayClient: + captured["options"] = options + return await real_connect(options, stub=_FakeStub()) + + monkeypatch.setattr(GatewayClient, "connect", _spy_connect) + + runner = CliRunner() + result = runner.invoke( + main, + [ + "open-session", + "--endpoint", + "localhost:5000", + "--api-key", + "mxgw_test_secret", + "--json", + ], + ) + + assert result.exit_code == 0, result.output + assert "options" in captured + assert captured["options"].plaintext is False, ( + "localhost endpoint without --plaintext must NOT auto-downgrade to plaintext" + ) + + +# --------------------------------------------------------------------------- +# Client.Python-024 — `batch` must not use CliRunner from production code, +# and a recursive `batch` line must not silently re-enter. +# --------------------------------------------------------------------------- + + +def test_batch_command_does_not_use_clirunner_in_production() -> None: + """`commands.py` must not import or instantiate the test-only CliRunner helper. + + Docstring references explaining what the module deliberately avoids are + permitted; what is forbidden is an actual ``import`` of ``click.testing`` + or an actual ``CliRunner()`` instantiation in executable code. + """ + + source = Path(cli_commands.__file__).read_text(encoding="utf-8") + assert "from click.testing" not in source, ( + "click.testing is a test-only helper and must not be used by production code" + ) + assert "import click.testing" not in source, ( + "click.testing is a test-only helper and must not be used by production code" + ) + # `CliRunner()` (instantiation) must not appear in production code. + assert "CliRunner(" not in source, ( + "CliRunner() must not be instantiated in production code" + ) + + +def test_batch_recursive_batch_line_is_bounded() -> None: + """A `batch` line nested inside `batch` stdin must not be silently spawned. + + The pre-fix implementation re-invoked the test runner with empty stdin, + so `batch` inside `batch` exited cleanly with no error. The fix either + rejects the nested invocation or surfaces it as an error block so the + behaviour is auditable. + """ + + runner = CliRunner() + result = runner.invoke( + main, + ["batch"], + input="batch\nversion --json\n", + ) + + # Outer batch must still exit 0 and process both lines. + assert result.exit_code == 0 + assert result.output.count(_BATCH_EOR) == 2 + + blocks = [block for block in result.output.split(_BATCH_EOR + "\n") if block] + # The first block — the recursive `batch` line — must surface an error + # JSON. (Either an explicit rejection, or some non-empty error block — + # NOT a silently empty block.) + first_block = blocks[0].strip() + assert first_block, "recursive batch line must not be silently swallowed" + payload = json.loads(first_block.splitlines()[-1]) + assert "error" in payload, ( + f"recursive batch line should surface an error: got {payload!r}" + ) + + +# --------------------------------------------------------------------------- +# Client.Python-025 — Behavioural tests for new bulk SDK methods, +# stream_alarms, and the new CLI subcommands. +# --------------------------------------------------------------------------- + + +class _AlarmFakeStream: + def __init__(self, messages: list[pb.AlarmFeedMessage]) -> None: + self._messages = list(messages) + self.cancelled = False + + def __aiter__(self) -> "_AlarmFakeStream": + return self + + async def __anext__(self) -> pb.AlarmFeedMessage: + if not self._messages: + raise StopAsyncIteration + return self._messages.pop(0) + + def cancel(self) -> None: + self.cancelled = True + + +class _BulkFakeUnary: + def __init__(self, replies: list[Any]) -> None: + self.replies = replies + self.requests: list[Any] = [] + self.metadata: tuple[tuple[str, str], ...] | None = None + + async def __call__(self, request: Any, *, metadata: tuple[tuple[str, str], ...]) -> Any: + self.requests.append(request) + self.metadata = metadata + return self.replies.pop(0) + + +class _BulkFakeStub: + def __init__(self) -> None: + self.open_session = _BulkFakeUnary( + [ + pb.OpenSessionReply( + session_id="session-1", + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + ), + ], + ) + self.invoke = _BulkFakeUnary([]) + self.OpenSession = self.open_session + self.Invoke = self.invoke + self.stream_alarms_metadata: tuple[tuple[str, str], ...] | None = None + self._alarm_stream = _AlarmFakeStream([]) + + def set_invoke_replies(self, replies: list[Any]) -> None: + self.invoke.replies = replies + + def set_alarm_stream(self, stream: _AlarmFakeStream) -> None: + self._alarm_stream = stream + + def StreamAlarms(self, request: Any, *, metadata: tuple[tuple[str, str], ...]) -> Any: + self.stream_alarms_request = request + self.stream_alarms_metadata = metadata + return self._alarm_stream + + +@pytest.mark.asyncio +async def test_session_read_bulk_sends_expected_request_shape() -> None: + stub = _BulkFakeStub() + stub.set_invoke_replies( + [ + pb.MxCommandReply( + session_id="session-1", + kind=pb.MX_COMMAND_KIND_READ_BULK, + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + read_bulk=pb.BulkReadReply( + results=[ + pb.BulkReadResult( + tag_address="Tank01.Level", + was_successful=True, + ), + ], + ), + ), + ], + ) + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + session = await client.open_session() + + results = await session.read_bulk(12, ["Tank01.Level"], timeout_ms=1500) + + assert len(results) == 1 + assert results[0].tag_address == "Tank01.Level" + request = stub.invoke.requests[0] + assert request.command.kind == pb.MX_COMMAND_KIND_READ_BULK + assert request.command.read_bulk.server_handle == 12 + assert list(request.command.read_bulk.tag_addresses) == ["Tank01.Level"] + assert request.command.read_bulk.timeout_ms == 1500 + + +@pytest.mark.asyncio +async def test_session_write_bulk_sends_expected_request_shape() -> None: + stub = _BulkFakeStub() + stub.set_invoke_replies( + [ + pb.MxCommandReply( + session_id="session-1", + kind=pb.MX_COMMAND_KIND_WRITE_BULK, + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + write_bulk=pb.BulkWriteReply( + results=[pb.BulkWriteResult(was_successful=True)], + ), + ), + ], + ) + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + session = await client.open_session() + + from zb_mom_ww_mxgateway.values import to_mx_value + + entries = [ + pb.WriteBulkEntry(item_handle=34, user_id=99, value=to_mx_value(123)), + ] + results = await session.write_bulk(12, entries) + + assert results[0].was_successful is True + cmd = stub.invoke.requests[0].command + assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_BULK + assert cmd.write_bulk.server_handle == 12 + assert cmd.write_bulk.entries[0].item_handle == 34 + assert cmd.write_bulk.entries[0].user_id == 99 + + +@pytest.mark.asyncio +async def test_session_write2_bulk_sends_expected_request_shape() -> None: + stub = _BulkFakeStub() + stub.set_invoke_replies( + [ + pb.MxCommandReply( + session_id="session-1", + kind=pb.MX_COMMAND_KIND_WRITE2_BULK, + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + write2_bulk=pb.BulkWriteReply( + results=[pb.BulkWriteResult(was_successful=True)], + ), + ), + ], + ) + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + session = await client.open_session() + + from zb_mom_ww_mxgateway.values import to_mx_value + + entries = [ + pb.Write2BulkEntry( + item_handle=34, + user_id=99, + value=to_mx_value(123), + timestamp_value=to_mx_value(1.5), + ), + ] + results = await session.write2_bulk(12, entries) + + assert results[0].was_successful is True + cmd = stub.invoke.requests[0].command + assert cmd.kind == pb.MX_COMMAND_KIND_WRITE2_BULK + assert cmd.write2_bulk.server_handle == 12 + assert cmd.write2_bulk.entries[0].item_handle == 34 + + +@pytest.mark.asyncio +async def test_session_write_secured_bulk_sends_expected_request_shape() -> None: + stub = _BulkFakeStub() + stub.set_invoke_replies( + [ + pb.MxCommandReply( + session_id="session-1", + kind=pb.MX_COMMAND_KIND_WRITE_SECURED_BULK, + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + write_secured_bulk=pb.BulkWriteReply( + results=[pb.BulkWriteResult(was_successful=True)], + ), + ), + ], + ) + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + session = await client.open_session() + + from zb_mom_ww_mxgateway.values import to_mx_value + + entries = [ + pb.WriteSecuredBulkEntry( + item_handle=34, + current_user_id=42, + verifier_user_id=43, + value=to_mx_value("secret"), + ), + ] + results = await session.write_secured_bulk(12, entries) + + assert results[0].was_successful is True + cmd = stub.invoke.requests[0].command + assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_SECURED_BULK + assert cmd.write_secured_bulk.server_handle == 12 + assert cmd.write_secured_bulk.entries[0].current_user_id == 42 + assert cmd.write_secured_bulk.entries[0].verifier_user_id == 43 + + +@pytest.mark.asyncio +async def test_session_write_secured2_bulk_sends_expected_request_shape() -> None: + stub = _BulkFakeStub() + stub.set_invoke_replies( + [ + pb.MxCommandReply( + session_id="session-1", + kind=pb.MX_COMMAND_KIND_WRITE_SECURED2_BULK, + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + write_secured2_bulk=pb.BulkWriteReply( + results=[pb.BulkWriteResult(was_successful=True)], + ), + ), + ], + ) + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + session = await client.open_session() + + from zb_mom_ww_mxgateway.values import to_mx_value + + entries = [ + pb.WriteSecured2BulkEntry( + item_handle=34, + current_user_id=42, + verifier_user_id=43, + value=to_mx_value("secret"), + timestamp_value=to_mx_value(1.5), + ), + ] + results = await session.write_secured2_bulk(12, entries) + + assert results[0].was_successful is True + cmd = stub.invoke.requests[0].command + assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_SECURED2_BULK + assert cmd.write_secured2_bulk.entries[0].current_user_id == 42 + + +@pytest.mark.asyncio +async def test_stream_alarms_yields_feed_messages_and_cancels_on_close() -> None: + transitions = [ + pb.AlarmFeedMessage( + transition=pb.OnAlarmTransitionEvent( + alarm_full_reference="Tank01.Level.HiHi", + transition_kind=pb.ALARM_TRANSITION_KIND_RAISE, + ), + ), + ] + stream = _AlarmFakeStream(transitions) + stub = _BulkFakeStub() + stub.set_alarm_stream(stream) + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + + iterator = client.stream_alarms(pb.StreamAlarmsRequest(alarm_filter_prefix="Tank01.")) + first = await anext(iterator) + await iterator.aclose() + + assert first.transition.alarm_full_reference == "Tank01.Level.HiHi" + assert stream.cancelled + assert stub.stream_alarms_metadata == (("authorization", "Bearer mxgw_test_secret"),) + assert stub.stream_alarms_request.alarm_filter_prefix == "Tank01." + + +# ---- CLI happy-path coverage for the new subcommands ---- + + +def _install_fake_connect(monkeypatch, stub: Any) -> dict[str, Any]: + """Patch `GatewayClient.connect` so the CLI uses the supplied fake stub.""" + + captured: dict[str, Any] = {} + real_connect = GatewayClient.connect + + @classmethod + async def _spy_connect(cls, options: ClientOptions, **kwargs: Any) -> GatewayClient: + captured["options"] = options + return await real_connect(options, stub=stub) + + monkeypatch.setattr(GatewayClient, "connect", _spy_connect) + return captured + + +def test_cli_read_bulk_happy_path(monkeypatch) -> None: + stub = _BulkFakeStub() + stub.set_invoke_replies( + [ + pb.MxCommandReply( + session_id="session-1", + kind=pb.MX_COMMAND_KIND_READ_BULK, + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + read_bulk=pb.BulkReadReply( + results=[ + pb.BulkReadResult( + tag_address="Tank01.Level", + was_successful=True, + ), + ], + ), + ), + ], + ) + _install_fake_connect(monkeypatch, stub) + + runner = CliRunner() + result = runner.invoke( + main, + [ + "read-bulk", + "--endpoint", + "localhost:5000", + "--plaintext", + "--session-id", + "session-1", + "--server-handle", + "12", + "--items", + "Tank01.Level", + "--timeout-ms", + "1500", + "--json", + ], + ) + + assert result.exit_code == 0, result.output + payload = json.loads(result.output) + assert payload["results"][0]["tagAddress"] == "Tank01.Level" + + +def test_cli_write_bulk_happy_path(monkeypatch) -> None: + stub = _BulkFakeStub() + stub.set_invoke_replies( + [ + pb.MxCommandReply( + session_id="session-1", + kind=pb.MX_COMMAND_KIND_WRITE_BULK, + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + write_bulk=pb.BulkWriteReply( + results=[pb.BulkWriteResult(was_successful=True)], + ), + ), + ], + ) + _install_fake_connect(monkeypatch, stub) + + runner = CliRunner() + result = runner.invoke( + main, + [ + "write-bulk", + "--endpoint", + "localhost:5000", + "--plaintext", + "--session-id", + "session-1", + "--server-handle", + "12", + "--item-handles", + "34", + "--values", + "123", + "--type", + "int32", + "--json", + ], + ) + + assert result.exit_code == 0, result.output + payload = json.loads(result.output) + assert payload["results"][0]["wasSuccessful"] is True + cmd = stub.invoke.requests[0].command + assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_BULK + + +def test_cli_stream_alarms_happy_path(monkeypatch) -> None: + transitions = [ + pb.AlarmFeedMessage( + transition=pb.OnAlarmTransitionEvent( + alarm_full_reference="Tank01.Level.HiHi", + transition_kind=pb.ALARM_TRANSITION_KIND_RAISE, + ), + ), + ] + stream = _AlarmFakeStream(transitions) + stub = _BulkFakeStub() + stub.set_alarm_stream(stream) + _install_fake_connect(monkeypatch, stub) + + runner = CliRunner() + result = runner.invoke( + main, + [ + "stream-alarms", + "--endpoint", + "localhost:5000", + "--plaintext", + "--max-messages", + "1", + "--timeout", + "5.0", + "--filter-prefix", + "Tank01.", + "--json", + ], + ) + + assert result.exit_code == 0, result.output + payload = json.loads(result.output) + assert payload["messages"][0]["transition"]["alarmFullReference"] == "Tank01.Level.HiHi" + + +def test_cli_acknowledge_alarm_happy_path(monkeypatch) -> None: + stub = _BulkFakeStub() + stub.acknowledge_alarm = _BulkFakeUnary( + [ + pb.AcknowledgeAlarmReply( + correlation_id="corr-1", + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + status=pb.MxStatusProxy(success=1, category=pb.MX_STATUS_CATEGORY_OK), + ), + ], + ) + stub.AcknowledgeAlarm = stub.acknowledge_alarm + _install_fake_connect(monkeypatch, stub) + + runner = CliRunner() + result = runner.invoke( + main, + [ + "acknowledge-alarm", + "--endpoint", + "localhost:5000", + "--plaintext", + "--reference", + "Tank01.Level.HiHi", + "--comment", + "investigating", + "--operator", + "alice", + "--json", + ], + ) + + assert result.exit_code == 0, result.output + captured_request = stub.acknowledge_alarm.requests[0] + assert captured_request.alarm_full_reference == "Tank01.Level.HiHi" + assert captured_request.comment == "investigating" + assert captured_request.operator_user == "alice" + + +# --------------------------------------------------------------------------- +# Client.Python-026 — `import time` at module scope; tighter cleanup excepts. +# --------------------------------------------------------------------------- + + +def test_commands_module_imports_time_at_module_scope() -> None: + """`time` must be imported at module scope, not inside `_bench_read_bulk`. + + `inspect.getsource(_bench_read_bulk)` must not contain a function-local + ``import time`` statement. + """ + + import inspect + + source = inspect.getsource(cli_commands._bench_read_bulk) + # The function body must NOT contain a function-local `import time` line. + for line in source.splitlines(): + stripped = line.strip() + assert stripped != "import time", ( + f"_bench_read_bulk must not have function-local `import time`: {line!r}" + ) + + # And the module-level `time` attribute must be present. + assert hasattr(cli_commands, "time"), ( + "`time` must be imported at module scope on commands.py" + ) + assert cli_commands.time is _time_module_ref + + +def test_commands_module_bench_read_bulk_does_not_use_bare_except_pass() -> None: + """The two `except Exception: pass` cleanup blocks in `_bench_read_bulk` + must be removed in favour of either logging or a narrower exception class. + """ + + import inspect + + source = inspect.getsource(cli_commands._bench_read_bulk) + # Reject the bare `except Exception:` followed by `pass` pattern in + # `_bench_read_bulk`. We tolerate `except Exception as :` because the + # fix logs the exception. + pattern = re.compile(r"except\s+Exception\s*:\s*\n\s*pass\b") + assert not pattern.search(source), ( + "_bench_read_bulk cleanup blocks must log or narrow the except clause" + ) diff --git a/code-reviews/Client.Python/findings.md b/code-reviews/Client.Python/findings.md index 6920ce0..b4a83f5 100644 --- a/code-reviews/Client.Python/findings.md +++ b/code-reviews/Client.Python/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-24 | | Commit reviewed | `42b0037` | | Status | Re-reviewed | -| Open findings | 5 | +| Open findings | 0 | ## Checklist coverage @@ -835,7 +835,7 @@ parity fix. | Severity | High | | Category | Documentation & comments | | Location | `clients/python/README.md:201-202`, `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py:389-420` | -| Status | Open | +| Status | Resolved | **Description:** The README CLI examples added by commit `8738735` for the new alarm subcommands cite flags the CLI does not accept: @@ -868,6 +868,19 @@ rename the CLI option to `--alarm-reference` and add a test that copy-pastes the README examples through `CliRunner` to assert they parse. Option (1) is the smaller change. +**Resolution:** 2026-05-24 — Fixed the README examples to match the +implementation (option 1, smaller change). `clients/python/README.md:201-202` +now reads `mxgw-py stream-alarms --max-messages 1 --json` and +`mxgw-py acknowledge-alarm --reference "\\Galaxy\Area001.Pump001.PumpFault" --json` +— `--session-id` is dropped from both lines (the alarm feed is gateway-served, +session-less) and `--alarm-reference` is renamed to the real `--reference` flag. +Regression test +`tests/test_review_findings_022_to_026.py::test_readme_alarm_examples_parse_against_cli` +extracts every `mxgw-py …` line from the README, appends `--help` so only the +parser runs, and asserts that no example produces a `no such option` Click error. +Failed before the fix (the original `stream-alarms --session-id …` line +emitted `Error: No such option: --session-id`), passes after. + ### Client.Python-023 | Field | Value | @@ -875,7 +888,7 @@ the smaller change. | Severity | Medium | | Category | Security | | Location | `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py:901-906` | -| Status | Open | +| Status | Resolved | **Description:** Client.Python-013 (severity Medium, Security) was marked **Resolved** on 2026-05-20 with the explicit claim that the silent @@ -919,6 +932,31 @@ is marked Resolved with a 2026-05-20 commit reference, do **not** silently re-resolve this finding — keep it Open with a fresh ID so the regression audit trail is preserved. +**Resolution:** 2026-05-24 — Re-applied the Client.Python-013 fix on the +renamed CLI module. Dropped the `endpoint.startswith("localhost:") or +endpoint.startswith("127.0.0.1:")` auto-plaintext branch from +`_use_plaintext` in `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py`. +TLS is now the default and `--plaintext` is the only way to opt in to +plaintext; `--tls` is accepted as a redundant affirmation and the two +flags combined raise `click.UsageError`. Regression tests live in +`tests/test_review_findings_022_to_026.py`: +`test_use_plaintext_does_not_auto_downgrade_for_localhost_endpoint` and +`test_use_plaintext_does_not_auto_downgrade_for_loopback_ipv4_endpoint` +exercise the bare-endpoint path, +`test_use_plaintext_requires_explicit_plaintext_flag` and +`test_use_plaintext_tls_flag_explicitly_disables_plaintext` pin the explicit +opt-in / opt-out contract, +`test_use_plaintext_rejects_plaintext_and_tls_combined` asserts mutual +exclusivity, and +`test_cli_localhost_endpoint_with_no_flags_uses_tls_channel` is an +end-to-end CliRunner test that intercepts `GatewayClient.connect` and +asserts the resolved `ClientOptions.plaintext` is `False` for a +`localhost:5000` endpoint without `--plaintext`. All five tests failed +against the pre-fix source and pass against the fix. **Behaviour change for +callers:** scripts that previously relied on +`mxgw-py … --endpoint localhost:5000 …` selecting plaintext silently must +now add an explicit `--plaintext` flag (or set up TLS on the gateway). + ### Client.Python-024 | Field | Value | @@ -926,7 +964,7 @@ audit trail is preserved. | Severity | Medium | | Category | Code organization & conventions | | Location | `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py:13,48-119` | -| Status | Open | +| Status | Resolved | **Description:** The new `batch` subcommand (commit `71d2c39`) implements the cross-language batch protocol by importing `click.testing.CliRunner` @@ -965,6 +1003,33 @@ batch loop can interleave inner-command output with the a regression test that drives `batch` with `batch\n` on stdin and asserts recursive invocation is either rejected or correctly bounded. +**Resolution:** 2026-05-24 — Removed the `from click.testing import CliRunner` +import and the `CliRunner()` instantiation from +`clients/python/src/zb_mom_ww_mxgateway_cli/commands.py`. The `batch` +command body now dispatches each stdin line through a new +`_dispatch_batch_line` helper that calls `main.main(args=…, +standalone_mode=False, prog_name="mxgw-py")` directly and captures the +subcommand's stdout via `contextlib.redirect_stdout(io.StringIO())`. Click +exit conditions (`click.exceptions.Exit`, `click.ClickException`, +`SystemExit`) are caught and rendered as +`{"error": …, "type": …}` JSON; arbitrary exceptions are caught with a +broad `except Exception` so the batch loop never dies. A nested `batch` +line is rejected outright with a `RecursiveBatchError` JSON record before +the dispatcher runs, eliminating the silent-recursive-spawn footgun the +original `CliRunner.invoke(main, ["batch"], …)` path enabled. Regression +tests: +`tests/test_review_findings_022_to_026.py::test_batch_command_does_not_use_clirunner_in_production` +asserts the production module no longer imports `from click.testing` or +calls `CliRunner(`; and +`test_batch_recursive_batch_line_is_bounded` drives a `batch\nversion --json\n` +stdin payload and asserts the recursive `batch` line emits an error JSON +record rather than silently exiting. The pre-existing batch tests +(`test_batch_runs_version_command_and_writes_eor`, +`test_batch_terminates_on_empty_line`, +`test_batch_continues_after_error_line`) still pass against the new +implementation, confirming the wire-level contract (one EOR per line, +clean JSON error blocks) is preserved. + ### Client.Python-025 | Field | Value | @@ -972,7 +1037,7 @@ recursive invocation is either rejected or correctly bounded. | Severity | Low | | Category | Testing coverage | | Location | `clients/python/tests/test_cli.py`, `clients/python/src/zb_mom_ww_mxgateway/{client.py,session.py}`, `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py` | -| Status | Open | +| Status | Resolved | **Description:** Commits `6add4b4` and `828e3e6` added five new SDK methods (`Session.read_bulk`, `Session.write_bulk`, `Session.write2_bulk`, @@ -1020,6 +1085,32 @@ applied to the renamed bench). At minimum, add a request-shape test for `write_secured_bulk` since the secured family is the highest-risk parity surface. +**Resolution:** 2026-05-24 — Added behavioural test coverage for the five +new bulk SDK methods, `stream_alarms`, and the new CLI subcommand bodies +in `tests/test_review_findings_022_to_026.py`. Request-shape tests +(`test_session_read_bulk_sends_expected_request_shape`, +`test_session_write_bulk_sends_expected_request_shape`, +`test_session_write2_bulk_sends_expected_request_shape`, +`test_session_write_secured_bulk_sends_expected_request_shape`, +`test_session_write_secured2_bulk_sends_expected_request_shape`) drive +each `Session.*_bulk` method against a fake `Invoke` stub and assert +the captured `MxCommand`'s `kind`, sub-message, `server_handle`, and +per-entry fields (including `current_user_id` / `verifier_user_id` +on the secured family — the highest-risk parity surface the finding +calls out). `test_stream_alarms_yields_feed_messages_and_cancels_on_close` +covers the `GatewayClient.stream_alarms` happy path including the +`_canceling_alarm_feed_iterator` cancel-on-close contract and the +authorization metadata header. CLI happy-path tests +(`test_cli_read_bulk_happy_path`, `test_cli_write_bulk_happy_path`, +`test_cli_stream_alarms_happy_path`, `test_cli_acknowledge_alarm_happy_path`) +each drive their subcommand through `CliRunner` against a fake stub +injected via a monkeypatched `GatewayClient.connect` and assert the +emitted JSON shape and that the captured RPC request carries the +expected fields. The four CLI happy-path tests passed even before any +production fix (the implementations were correct; the finding is a +coverage gap), but they now exist as regression guards against future +drift. No source change — pure coverage finding. + ### Client.Python-026 | Field | Value | @@ -1027,7 +1118,7 @@ parity surface. | Severity | Low | | Category | Correctness & logic bugs | | Location | `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py:674-738` | -| Status | Open | +| Status | Resolved | **Description:** Two minor quality issues in the new `_bench_read_bulk` body (commit `6add4b4`): @@ -1060,3 +1151,23 @@ module-level `logger = logging.getLogger(__name__)`. No behavioural change in the happy path; failure path becomes diagnosable. No new test required for the import hoist; the logger change is exercised by the existing bench smoke test once `caplog` is added to the test signature. + +**Resolution:** 2026-05-24 — Hoisted `import time` to the module-level +import block in `clients/python/src/zb_mom_ww_mxgateway_cli/commands.py` +alongside the existing standard-library imports; the function-local +`import time` line at the top of `_bench_read_bulk` is gone. Added a +module-level `logger = logging.getLogger(__name__)` and rewrote the two +`finally` cleanup blocks to bind the swallowed exception and log it at +`WARNING` level — `unsubscribe_bulk` failures now emit +`"bench-read-bulk: unsubscribe_bulk cleanup failed: %s"` and the +`session.close()` failure path emits the equivalent — so a future +regression in the cleanup path is diagnosable at the next benchmark run +rather than silently corrupting subscription bookkeeping. Regression +tests in `tests/test_review_findings_022_to_026.py`: +`test_commands_module_imports_time_at_module_scope` uses +`inspect.getsource(_bench_read_bulk)` to assert no function-local +`import time` line, and asserts the module exposes `time` at module +scope; `test_commands_module_bench_read_bulk_does_not_use_bare_except_pass` +greps the function source for the `except Exception:\n pass` pattern +and rejects it. Both tests failed against the pre-fix source and pass +against the fix.