"""Regression tests for Client.Python-022..026. Each test corresponds to a finding from the latest re-review. Tests are TDD-first — they failed against the pre-fix source and pass against the fixed source. """ from __future__ import annotations import json import re import time as _time_module_ref from pathlib import Path from typing import Any import pytest from click.testing import CliRunner from zb_mom_ww_mxgateway import ClientOptions, GatewayClient from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb from zb_mom_ww_mxgateway_cli import commands as cli_commands from zb_mom_ww_mxgateway_cli.commands import _use_plaintext, main _BATCH_EOR = "__MXGW_BATCH_EOR__" # --------------------------------------------------------------------------- # Client.Python-022 — README CLI examples must parse against the implementation. # --------------------------------------------------------------------------- def _readme_path() -> Path: return Path(__file__).resolve().parent.parent / "README.md" def _extract_mxgw_py_examples() -> list[list[str]]: """Return the README's ``mxgw-py ...`` example lines as click arg lists. Replaces angle-bracket placeholders (````) with safe stub values and leaves real flag names untouched. The returned arg lists drop the ``mxgw-py`` prefix. """ text = _readme_path().read_text(encoding="utf-8") args: list[list[str]] = [] for raw_line in text.splitlines(): line = raw_line.strip() if not line.startswith("mxgw-py "): continue # Strip the leading "mxgw-py " token. body = line[len("mxgw-py ") :] # Replace common placeholders so click does not error on the placeholder. body = body.replace("", "session-1") # Backtick-quoted hostnames in the TLS example are not represented # in CLI; safe to leave as-is. tokens = _split_cli_tokens(body) # Keep only examples that exercise a real subcommand. Skip TLS # multi-flag example (we only need the README CLI examples added in # commits 8738735 — stream-alarms / acknowledge-alarm). 
args.append(tokens) return args def _split_cli_tokens(body: str) -> list[str]: """Split a CLI body into argv tokens, honouring double-quoted strings.""" tokens: list[str] = [] pattern = re.compile(r'"([^"]*)"|(\S+)') for match in pattern.finditer(body): quoted, plain = match.group(1), match.group(2) tokens.append(quoted if quoted is not None else plain) return tokens def test_readme_alarm_examples_parse_against_cli() -> None: """README `stream-alarms` / `acknowledge-alarm` examples must parse without triggering Click's ``no such option`` error. Drives every README ``mxgw-py`` example through Click's ``--help`` style parser by re-invoking the documented argv with a trailing ``--help`` flag so only the parser runs (no RPC is attempted). If a documented flag does not exist on the subcommand, Click prints ``no such option: --`` and exits 2 — that is the regression we want to catch. """ runner = CliRunner() examples = _extract_mxgw_py_examples() assert any( "stream-alarms" in args for args in examples ), "README must include a stream-alarms example." assert any( "acknowledge-alarm" in args for args in examples ), "README must include an acknowledge-alarm example." for argv in examples: # Strip "--json" (already a real flag) and any value-bearing flag that # requires a host/file/value, then append --help so we exercise the # parser only. # We just append --help — Click parses all options up to --help and # then prints help; an unknown option still errors out first. result = runner.invoke(main, [*argv, "--help"]) # Either help text printed (exit 0) or some other parser issue (exit 2); # we only want to assert NO "no such option" error. assert "no such option" not in result.output.lower(), ( f"README example failed Click parsing: argv={argv!r}\n" f"output={result.output!r}" ) # --------------------------------------------------------------------------- # Client.Python-023 — REGRESSION of Client.Python-013. 
# _use_plaintext must not silently auto-downgrade on localhost / 127.0.0.1.
# ---------------------------------------------------------------------------


def test_use_plaintext_does_not_auto_downgrade_for_localhost_endpoint() -> None:
    """A bare ``localhost:...`` endpoint with no flags must default to TLS."""
    assert _use_plaintext({
        "endpoint": "localhost:5001",
        "plaintext": False,
        "use_tls": False,
    }) is False


def test_use_plaintext_does_not_auto_downgrade_for_loopback_ipv4_endpoint() -> None:
    """A bare ``127.0.0.1:...`` endpoint with no flags must default to TLS."""
    assert _use_plaintext({
        "endpoint": "127.0.0.1:5001",
        "plaintext": False,
        "use_tls": False,
    }) is False


def test_use_plaintext_requires_explicit_plaintext_flag() -> None:
    """``--plaintext`` is the only way to opt in."""
    assert _use_plaintext({
        "endpoint": "localhost:5001",
        "plaintext": True,
        "use_tls": False,
    }) is True


def test_use_plaintext_tls_flag_explicitly_disables_plaintext() -> None:
    """``--tls`` is accepted as an explicit affirmation of the default."""
    assert _use_plaintext({
        "endpoint": "localhost:5001",
        "plaintext": False,
        "use_tls": True,
    }) is False


def test_use_plaintext_rejects_plaintext_and_tls_combined() -> None:
    """``--plaintext`` and ``--tls`` together must be rejected as ambiguous."""
    import click as _click

    with pytest.raises(_click.UsageError):
        _use_plaintext({
            "endpoint": "localhost:5001",
            "plaintext": True,
            "use_tls": True,
        })


def test_cli_localhost_endpoint_with_no_flags_uses_tls_channel(monkeypatch) -> None:
    """End-to-end CLI: against ``localhost:...`` with no flags, the resolved
    ``ClientOptions.plaintext`` flowing into ``GatewayClient.connect`` must be
    ``False`` (TLS), so the API key bearer cannot leak over plaintext.
    """
    captured: dict[str, Any] = {}

    class _FakeStub:
        """Minimal stub answering ``OpenSession`` with an OK reply."""

        async def OpenSession(self, request: Any, *, metadata: tuple[Any, ...]) -> Any:
            captured["metadata"] = metadata
            return pb.OpenSessionReply(
                session_id="session-1",
                protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
            )

    # Keep a handle on the real classmethod so the spy can delegate to it.
    real_connect = GatewayClient.connect

    @classmethod
    async def _spy_connect(cls, options: ClientOptions, **kwargs: Any) -> GatewayClient:
        # Record the options the CLI resolved, then connect through the fake
        # stub; any extra keyword arguments from the CLI are discarded.
        captured["options"] = options
        return await real_connect(options, stub=_FakeStub())

    monkeypatch.setattr(GatewayClient, "connect", _spy_connect)

    runner = CliRunner()
    result = runner.invoke(
        main,
        [
            "open-session",
            "--endpoint",
            "localhost:5000",
            "--api-key",
            "mxgw_test_secret",
            "--json",
        ],
    )

    assert result.exit_code == 0, result.output
    assert "options" in captured
    assert captured["options"].plaintext is False, (
        "localhost endpoint without --plaintext must NOT auto-downgrade to plaintext"
    )


# ---------------------------------------------------------------------------
# Client.Python-024 — `batch` must not use CliRunner from production code,
# and a recursive `batch` line must not silently re-enter.
# ---------------------------------------------------------------------------


def _find_clirunner_usages(source: str) -> list[str]:
    """Return descriptions of executable ``click.testing`` / ``CliRunner`` uses.

    The source is parsed with ``ast`` so that mentions inside docstrings and
    comments are ignored; only real import statements and call expressions
    are reported.

    Args:
        source: Python source text to inspect.

    Returns:
        One human-readable entry per offending node; empty when clean.

    Raises:
        SyntaxError: If ``source`` is not valid Python.
    """
    import ast

    usages: list[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom):
            # Covers `from click.testing import X` and `from click import testing`.
            imports_testing = node.module == "click.testing" or (
                node.module == "click"
                and any(alias.name == "testing" for alias in node.names)
            )
            if imports_testing:
                usages.append(f"line {node.lineno}: import from click.testing")
        elif isinstance(node, ast.Import):
            if any(alias.name == "click.testing" for alias in node.names):
                usages.append(f"line {node.lineno}: import click.testing")
        elif isinstance(node, ast.Call):
            callee = node.func
            # `CliRunner(...)` is a Name; `testing.CliRunner(...)` is an Attribute.
            if isinstance(callee, ast.Name):
                callee_name = callee.id
            else:
                callee_name = getattr(callee, "attr", None)
            if callee_name == "CliRunner":
                usages.append(f"line {node.lineno}: CliRunner() instantiation")
    return usages


def test_batch_command_does_not_use_clirunner_in_production() -> None:
    """`commands.py` must not import or instantiate the test-only CliRunner helper.

    Docstring references explaining what the module deliberately avoids are
    permitted; what is forbidden is an actual ``import`` of ``click.testing``
    or an actual ``CliRunner()`` instantiation in executable code. The check
    therefore inspects the parsed syntax tree rather than the raw text.
    """
    source = Path(cli_commands.__file__).read_text(encoding="utf-8")
    usages = _find_clirunner_usages(source)
    assert not usages, (
        "click.testing is a test-only helper and must not be used by production code; "
        f"CliRunner() must not be instantiated in production code: {usages!r}"
    )


def test_batch_recursive_batch_line_is_bounded() -> None:
    """A `batch` line nested inside `batch` stdin must not be silently spawned.

    The pre-fix implementation re-invoked the test runner with empty stdin,
    so `batch` inside `batch` exited cleanly with no error. The fix either
    rejects the nested invocation or surfaces it as an error block so the
    behaviour is auditable.
    """
    runner = CliRunner()
    result = runner.invoke(
        main,
        ["batch"],
        input="batch\nversion --json\n",
    )
    # Outer batch must still exit 0 and process both lines.
    assert result.exit_code == 0
    assert result.output.count(_BATCH_EOR) == 2
    blocks = [block for block in result.output.split(_BATCH_EOR + "\n") if block]
    # The first block — the recursive `batch` line — must surface an error
    # JSON. (Either an explicit rejection, or some non-empty error block —
    # NOT a silently empty block.)
    first_block = blocks[0].strip()
    assert first_block, "recursive batch line must not be silently swallowed"
    # Only the last line of the block is parsed as the JSON payload.
    payload = json.loads(first_block.splitlines()[-1])
    assert "error" in payload, (
        f"recursive batch line should surface an error: got {payload!r}"
    )


# ---------------------------------------------------------------------------
# Client.Python-025 — Behavioural tests for new bulk SDK methods,
# stream_alarms, and the new CLI subcommands.
# ---------------------------------------------------------------------------


class _AlarmFakeStream:
    """Async-iterable fake of an alarm stream that records cancellation."""

    def __init__(self, messages: list[pb.AlarmFeedMessage]) -> None:
        # Copy so that draining the stream never mutates the caller's list.
        self._messages = list(messages)
        self.cancelled = False

    def __aiter__(self) -> "_AlarmFakeStream":
        return self

    async def __anext__(self) -> pb.AlarmFeedMessage:
        """Return the next queued message, or stop once the queue is empty."""
        if not self._messages:
            raise StopAsyncIteration
        return self._messages.pop(0)

    def cancel(self) -> None:
        """Record that the consumer cancelled the stream."""
        self.cancelled = True


class _BulkFakeUnary:
    """Awaitable fake of a unary RPC that replays canned replies in order."""

    def __init__(self, replies: list[Any]) -> None:
        self.replies = replies
        # Every request seen, in call order.
        self.requests: list[Any] = []
        # Metadata of the most recent call; None until the first call.
        self.metadata: tuple[tuple[str, str], ...] | None = None

    async def __call__(self, request: Any, *, metadata: tuple[tuple[str, str], ...]) -> Any:
        """Record the call and return the next canned reply.

        Raises:
            IndexError: If no canned reply is left.
        """
        self.requests.append(request)
        self.metadata = metadata
        return self.replies.pop(0)


class _BulkFakeStub:
    """Fake gateway stub exposing ``OpenSession``, ``Invoke`` and ``StreamAlarms``."""

    def __init__(self) -> None:
        self.open_session = _BulkFakeUnary(
            [
                pb.OpenSessionReply(
                    session_id="session-1",
                    protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
                ),
            ],
        )
        self.invoke = _BulkFakeUnary([])
        # RPC-cased aliases pointing at the same fakes as the snake_case names.
        self.OpenSession = self.open_session
        self.Invoke = self.invoke
        # Both StreamAlarms capture slots start as None so a test that reads
        # them before the RPC was issued fails on its assertion rather than
        # with an AttributeError.
        self.stream_alarms_request: Any = None
        self.stream_alarms_metadata: tuple[tuple[str, str], ...] | None = None
        self._alarm_stream = _AlarmFakeStream([])

    def set_invoke_replies(self, replies: list[Any]) -> None:
        """Replace the canned replies returned by ``Invoke``."""
        self.invoke.replies = replies

    def set_alarm_stream(self, stream: _AlarmFakeStream) -> None:
        """Replace the stream object returned by ``StreamAlarms``."""
        self._alarm_stream = stream

    def StreamAlarms(self, request: Any, *, metadata: tuple[tuple[str, str], ...]) -> Any:
        """Record the request and metadata, then return the configured stream."""
        self.stream_alarms_request = request
        self.stream_alarms_metadata = metadata
        return self._alarm_stream


@pytest.mark.asyncio
async def test_session_read_bulk_sends_expected_request_shape() -> None:
    """``read_bulk`` must send a READ_BULK command carrying the server
    handle, tag addresses and timeout, and return the reply's results.
    """
    stub = _BulkFakeStub()
    stub.set_invoke_replies(
        [
            pb.MxCommandReply(
                session_id="session-1",
                kind=pb.MX_COMMAND_KIND_READ_BULK,
                protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
                read_bulk=pb.BulkReadReply(
                    results=[
                        pb.BulkReadResult(
                            tag_address="Tank01.Level",
                            was_successful=True,
                        ),
                    ],
                ),
            ),
        ],
    )
    client = await GatewayClient.connect(
        ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
        stub=stub,
    )
    session = await client.open_session()

    results = await session.read_bulk(12, ["Tank01.Level"], timeout_ms=1500)

    assert len(results) == 1
    assert results[0].tag_address == "Tank01.Level"
    request = stub.invoke.requests[0]
    assert request.command.kind == pb.MX_COMMAND_KIND_READ_BULK
    assert request.command.read_bulk.server_handle == 12
    assert list(request.command.read_bulk.tag_addresses) == ["Tank01.Level"]
    assert request.command.read_bulk.timeout_ms == 1500


@pytest.mark.asyncio
async def test_session_write_bulk_sends_expected_request_shape() -> None:
    """``write_bulk`` must send a WRITE_BULK command carrying the server
    handle and the entry's item handle and user id.
    """
    stub = _BulkFakeStub()
    stub.set_invoke_replies(
        [
            pb.MxCommandReply(
                session_id="session-1",
                kind=pb.MX_COMMAND_KIND_WRITE_BULK,
                protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
                write_bulk=pb.BulkWriteReply(
                    results=[pb.BulkWriteResult(was_successful=True)],
                ),
            ),
        ],
    )
    client = await GatewayClient.connect(
        ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
        stub=stub,
    )
    session = await client.open_session()

    from zb_mom_ww_mxgateway.values import to_mx_value

    entries = [
        pb.WriteBulkEntry(item_handle=34, user_id=99, value=to_mx_value(123)),
    ]
    results = await session.write_bulk(12, entries)

    assert results[0].was_successful is True
    cmd = stub.invoke.requests[0].command
    assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_BULK
    assert cmd.write_bulk.server_handle == 12
    assert cmd.write_bulk.entries[0].item_handle == 34
    assert cmd.write_bulk.entries[0].user_id == 99


@pytest.mark.asyncio
async def test_session_write2_bulk_sends_expected_request_shape() -> None:
    """``write2_bulk`` must send a WRITE2_BULK command carrying the server
    handle and the entry's item handle.
    """
    stub = _BulkFakeStub()
    stub.set_invoke_replies(
        [
            pb.MxCommandReply(
                session_id="session-1",
                kind=pb.MX_COMMAND_KIND_WRITE2_BULK,
                protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
                write2_bulk=pb.BulkWriteReply(
                    results=[pb.BulkWriteResult(was_successful=True)],
                ),
            ),
        ],
    )
    client = await GatewayClient.connect(
        ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
        stub=stub,
    )
    session = await client.open_session()

    from zb_mom_ww_mxgateway.values import to_mx_value

    entries = [
        pb.Write2BulkEntry(
            item_handle=34,
            user_id=99,
            value=to_mx_value(123),
            timestamp_value=to_mx_value(1.5),
        ),
    ]
    results = await session.write2_bulk(12, entries)

    assert results[0].was_successful is True
    cmd = stub.invoke.requests[0].command
    assert cmd.kind == pb.MX_COMMAND_KIND_WRITE2_BULK
    assert cmd.write2_bulk.server_handle == 12
    assert cmd.write2_bulk.entries[0].item_handle == 34


@pytest.mark.asyncio
async def test_session_write_secured_bulk_sends_expected_request_shape() -> None:
    """``write_secured_bulk`` must send a WRITE_SECURED_BULK command carrying
    the server handle and both the current and verifier user ids.
    """
    stub = _BulkFakeStub()
    stub.set_invoke_replies(
        [
            pb.MxCommandReply(
                session_id="session-1",
                kind=pb.MX_COMMAND_KIND_WRITE_SECURED_BULK,
                protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
                write_secured_bulk=pb.BulkWriteReply(
                    results=[pb.BulkWriteResult(was_successful=True)],
                ),
            ),
        ],
    )
    client = await GatewayClient.connect(
        ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
        stub=stub,
    )
    session = await client.open_session()

    from zb_mom_ww_mxgateway.values import to_mx_value

    entries = [
        pb.WriteSecuredBulkEntry(
            item_handle=34,
            current_user_id=42,
            verifier_user_id=43,
            value=to_mx_value("secret"),
        ),
    ]
    results = await session.write_secured_bulk(12, entries)

    assert results[0].was_successful is True
    cmd = stub.invoke.requests[0].command
    assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_SECURED_BULK
    assert cmd.write_secured_bulk.server_handle == 12
    assert cmd.write_secured_bulk.entries[0].current_user_id == 42
    assert cmd.write_secured_bulk.entries[0].verifier_user_id == 43


@pytest.mark.asyncio
async def test_session_write_secured2_bulk_sends_expected_request_shape() -> None:
    """``write_secured2_bulk`` must send a WRITE_SECURED2_BULK command
    carrying the entry's current user id.
    """
    stub = _BulkFakeStub()
    stub.set_invoke_replies(
        [
            pb.MxCommandReply(
                session_id="session-1",
                kind=pb.MX_COMMAND_KIND_WRITE_SECURED2_BULK,
                protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
                write_secured2_bulk=pb.BulkWriteReply(
                    results=[pb.BulkWriteResult(was_successful=True)],
                ),
            ),
        ],
    )
    client = await GatewayClient.connect(
        ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
        stub=stub,
    )
    session = await client.open_session()

    from zb_mom_ww_mxgateway.values import to_mx_value

    entries = [
        pb.WriteSecured2BulkEntry(
            item_handle=34,
            current_user_id=42,
            verifier_user_id=43,
            value=to_mx_value("secret"),
            timestamp_value=to_mx_value(1.5),
        ),
    ]
    results = await session.write_secured2_bulk(12, entries)

    assert results[0].was_successful is True
    cmd = stub.invoke.requests[0].command
    assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_SECURED2_BULK
    assert cmd.write_secured2_bulk.entries[0].current_user_id == 42


@pytest.mark.asyncio
async def test_stream_alarms_yields_feed_messages_and_cancels_on_close() -> None:
    """``stream_alarms`` must yield the feed message, cancel the underlying
    stream when the iterator is closed, and forward the API key as bearer
    metadata together with the caller's request.
    """
    transitions = [
        pb.AlarmFeedMessage(
            transition=pb.OnAlarmTransitionEvent(
                alarm_full_reference="Tank01.Level.HiHi",
                transition_kind=pb.ALARM_TRANSITION_KIND_RAISE,
            ),
        ),
    ]
    stream = _AlarmFakeStream(transitions)
    stub = _BulkFakeStub()
    stub.set_alarm_stream(stream)
    client = await GatewayClient.connect(
        ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
        stub=stub,
    )

    iterator = client.stream_alarms(pb.StreamAlarmsRequest(alarm_filter_prefix="Tank01."))
    first = await anext(iterator)
    await iterator.aclose()

    assert first.transition.alarm_full_reference == "Tank01.Level.HiHi"
    assert stream.cancelled
    assert stub.stream_alarms_metadata == (("authorization", "Bearer mxgw_test_secret"),)
    assert stub.stream_alarms_request.alarm_filter_prefix == "Tank01."


# ---- CLI happy-path coverage for the new subcommands ----


def _install_fake_connect(monkeypatch, stub: Any) -> dict[str, Any]:
    """Patch `GatewayClient.connect` so the CLI uses the supplied fake stub.

    Returns:
        A dict that receives the resolved ``ClientOptions`` under the
        ``"options"`` key once the patched ``connect`` has been called.
    """
    captured: dict[str, Any] = {}
    # Keep a handle on the real classmethod so the spy can delegate to it.
    real_connect = GatewayClient.connect

    @classmethod
    async def _spy_connect(cls, options: ClientOptions, **kwargs: Any) -> GatewayClient:
        # Extra keyword arguments from the CLI are discarded; only the
        # options are forwarded, with the fake stub injected.
        captured["options"] = options
        return await real_connect(options, stub=stub)

    monkeypatch.setattr(GatewayClient, "connect", _spy_connect)
    return captured


def test_cli_read_bulk_happy_path(monkeypatch) -> None:
    """``read-bulk --json`` must exit 0 and print the result's tag address."""
    stub = _BulkFakeStub()
    stub.set_invoke_replies(
        [
            pb.MxCommandReply(
                session_id="session-1",
                kind=pb.MX_COMMAND_KIND_READ_BULK,
                protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
                read_bulk=pb.BulkReadReply(
                    results=[
                        pb.BulkReadResult(
                            tag_address="Tank01.Level",
                            was_successful=True,
                        ),
                    ],
                ),
            ),
        ],
    )
    _install_fake_connect(monkeypatch, stub)

    runner = CliRunner()
    result = runner.invoke(
        main,
        [
            "read-bulk",
            "--endpoint",
            "localhost:5000",
            "--plaintext",
            "--session-id",
            "session-1",
            "--server-handle",
            "12",
            "--items",
            "Tank01.Level",
            "--timeout-ms",
            "1500",
            "--json",
        ],
    )

    assert result.exit_code == 0, result.output
    payload = json.loads(result.output)
    assert payload["results"][0]["tagAddress"] == "Tank01.Level"


def test_cli_write_bulk_happy_path(monkeypatch) -> None:
    """``write-bulk --json`` must exit 0, report success and send WRITE_BULK."""
    stub = _BulkFakeStub()
    stub.set_invoke_replies(
        [
            pb.MxCommandReply(
                session_id="session-1",
                kind=pb.MX_COMMAND_KIND_WRITE_BULK,
                protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
                write_bulk=pb.BulkWriteReply(
                    results=[pb.BulkWriteResult(was_successful=True)],
                ),
            ),
        ],
    )
    _install_fake_connect(monkeypatch, stub)

    runner = CliRunner()
    result = runner.invoke(
        main,
        [
            "write-bulk",
            "--endpoint",
            "localhost:5000",
            "--plaintext",
            "--session-id",
            "session-1",
            "--server-handle",
            "12",
            "--item-handles",
            "34",
            "--values",
            "123",
            "--type",
            "int32",
            "--json",
        ],
    )

    assert result.exit_code == 0, result.output
    payload = json.loads(result.output)
    assert payload["results"][0]["wasSuccessful"] is True
    cmd = stub.invoke.requests[0].command
    assert cmd.kind == pb.MX_COMMAND_KIND_WRITE_BULK


def test_cli_stream_alarms_happy_path(monkeypatch) -> None:
    """``stream-alarms --json`` must exit 0 and print the streamed transition."""
    transitions = [
        pb.AlarmFeedMessage(
            transition=pb.OnAlarmTransitionEvent(
                alarm_full_reference="Tank01.Level.HiHi",
                transition_kind=pb.ALARM_TRANSITION_KIND_RAISE,
            ),
        ),
    ]
    stream = _AlarmFakeStream(transitions)
    stub = _BulkFakeStub()
    stub.set_alarm_stream(stream)
    _install_fake_connect(monkeypatch, stub)

    runner = CliRunner()
    result = runner.invoke(
        main,
        [
            "stream-alarms",
            "--endpoint",
            "localhost:5000",
            "--plaintext",
            "--max-messages",
            "1",
            "--timeout",
            "5.0",
            "--filter-prefix",
            "Tank01.",
            "--json",
        ],
    )

    assert result.exit_code == 0, result.output
    payload = json.loads(result.output)
    assert payload["messages"][0]["transition"]["alarmFullReference"] == "Tank01.Level.HiHi"


def test_cli_acknowledge_alarm_happy_path(monkeypatch) -> None:
    """``acknowledge-alarm`` must exit 0 and forward the reference, comment
    and operator on the request.
    """
    stub = _BulkFakeStub()
    # _BulkFakeStub has no AcknowledgeAlarm fake of its own; attach one here.
    stub.acknowledge_alarm = _BulkFakeUnary(
        [
            pb.AcknowledgeAlarmReply(
                correlation_id="corr-1",
                protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
                status=pb.MxStatusProxy(success=1, category=pb.MX_STATUS_CATEGORY_OK),
            ),
        ],
    )
    stub.AcknowledgeAlarm = stub.acknowledge_alarm
    _install_fake_connect(monkeypatch, stub)

    runner = CliRunner()
    result = runner.invoke(
        main,
        [
            "acknowledge-alarm",
            "--endpoint",
            "localhost:5000",
            "--plaintext",
            "--reference",
            "Tank01.Level.HiHi",
            "--comment",
            "investigating",
            "--operator",
            "alice",
            "--json",
        ],
    )

    assert result.exit_code == 0, result.output
    captured_request = stub.acknowledge_alarm.requests[0]
    assert captured_request.alarm_full_reference == "Tank01.Level.HiHi"
    assert captured_request.comment == "investigating"
    assert captured_request.operator_user == "alice"


# ---------------------------------------------------------------------------
# Client.Python-026 — `import time` at module scope; tighter cleanup excepts.
# ---------------------------------------------------------------------------


def test_commands_module_imports_time_at_module_scope() -> None:
    """`time` must be imported at module scope, not inside `_bench_read_bulk`.

    `inspect.getsource(_bench_read_bulk)` must not contain a function-local
    ``import time`` statement.
    """
    import inspect

    source = inspect.getsource(cli_commands._bench_read_bulk)
    # The function body must NOT contain a function-local `import time` line.
    for line in source.splitlines():
        stripped = line.strip()
        assert stripped != "import time", (
            f"_bench_read_bulk must not have function-local `import time`: {line!r}"
        )
    # And the module-level `time` attribute must be present.
    assert hasattr(cli_commands, "time"), (
        "`time` must be imported at module scope on commands.py"
    )
    # It must be the standard-library module itself, not a look-alike.
    assert cli_commands.time is _time_module_ref


def test_commands_module_bench_read_bulk_does_not_use_bare_except_pass() -> None:
    """The two `except Exception: pass` cleanup blocks in `_bench_read_bulk`
    must be removed in favour of either logging or a narrower exception class.
    """
    import inspect

    source = inspect.getsource(cli_commands._bench_read_bulk)
    # Reject `except Exception:` whose body is just `pass` in
    # `_bench_read_bulk`. We tolerate `except Exception as <name>:` because
    # the fix logs the exception. `pass` is matched whether it sits on the
    # same line as the `except`, on the next line, or after comment lines —
    # all three are the same silent swallow.
    pattern = re.compile(r"except\s+Exception\s*:\s*(?:#[^\n]*\n\s*)*pass\b")
    assert not pattern.search(source), (
        "_bench_read_bulk cleanup blocks must log or narrow the except clause"
    )