"""Regression tests for Client.Python-009: untested public paths.

Covers `Session.write2`/`add_item2` request construction, the bulk-size limit
guard, the ``None``-argument ``TypeError`` guards, the TLS ``ca_file`` read
path in `create_channel`, the generic `map_rpc_error` fallthrough, and a
happy-path CLI command body driven by a fake stub.
"""

from __future__ import annotations

import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import grpc
import pytest
from click.testing import CliRunner

from mxgateway import ClientOptions, GatewayClient
from mxgateway.errors import MxGatewayTransportError, map_rpc_error
from mxgateway.generated import mxaccess_gateway_pb2 as pb
from mxgateway.options import create_channel
from mxgateway.session import MAX_BULK_ITEMS, Session


class _FakeUnary:
    """Awaitable stand-in for a unary-unary RPC callable.

    Records every request and the most recent call metadata, and answers each
    call with the next queued reply. Calling it with no replies left raises
    ``IndexError`` from ``list.pop``.
    """

    def __init__(self, replies: list[Any]) -> None:
        # Copy so that draining the queue never mutates the caller's list.
        self.replies = list(replies)
        self.requests: list[Any] = []
        self.metadata: tuple[tuple[str, str], ...] | None = None

    async def __call__(
        self,
        request: Any,
        *,
        metadata: tuple[tuple[str, str], ...],
    ) -> Any:
        # The request is recorded before the reply is popped, so it is still
        # captured when the reply queue turns out to be empty.
        self.requests.append(request)
        self.metadata = metadata
        return self.replies.pop(0)


class _FakeGatewayStub:
    """Fake gateway stub exposing ``OpenSession`` and ``Invoke``.

    ``OpenSession`` is preloaded with one OK reply for ``session-1``;
    ``Invoke`` starts with an empty queue that each test fills itself.
    """

    def __init__(self) -> None:
        self.open_session = _FakeUnary(
            [
                pb.OpenSessionReply(
                    session_id="session-1",
                    protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
                ),
            ],
        )
        self.invoke = _FakeUnary([])
        # Aliases under the RPC-style attribute names; the snake_case
        # attributes above are what the tests inspect.
        self.OpenSession = self.open_session
        self.Invoke = self.invoke


def _ok_reply(kind: int, **fields: Any) -> pb.MxCommandReply:
    """Build an OK ``MxCommandReply`` for ``session-1`` of the given *kind*.

    Extra keyword arguments are passed straight through to the
    ``MxCommandReply`` constructor (e.g. the per-command reply payload).
    """
    return pb.MxCommandReply(
        session_id="session-1",
        kind=kind,
        protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
        **fields,
    )


async def _open_fake_session(stub: _FakeGatewayStub) -> Session:
    """Connect a client backed by *stub* and return its opened session."""
    client = await GatewayClient.connect(
        ClientOptions(endpoint="fake", plaintext=True),
        stub=stub,
    )
    return await client.open_session()


# --- write2 / add_item2 request construction -------------------------------


@pytest.mark.asyncio
async def test_add_item2_sends_item_context_and_returns_handle() -> None:
    """`add_item2` forwards all three arguments and returns the reply handle."""
    stub = _FakeGatewayStub()
    stub.invoke.replies = [
        _ok_reply(pb.MX_COMMAND_KIND_ADD_ITEM2, add_item2=pb.AddItem2Reply(item_handle=77)),
    ]
    session = await _open_fake_session(stub)

    item_handle = await session.add_item2(12, "Object.Attribute", "ctx-A")

    assert item_handle == 77
    command = stub.invoke.requests[0].command
    assert command.kind == pb.MX_COMMAND_KIND_ADD_ITEM2
    assert command.add_item2.server_handle == 12
    assert command.add_item2.item_definition == "Object.Attribute"
    assert command.add_item2.item_context == "ctx-A"


@pytest.mark.asyncio
async def test_write2_sends_value_and_timestamp_value() -> None:
    """`write2` encodes the value, the timestamp and the user id in the request."""
    stub = _FakeGatewayStub()
    stub.invoke.replies = [_ok_reply(pb.MX_COMMAND_KIND_WRITE2)]
    session = await _open_fake_session(stub)
    when = datetime(2025, 4, 1, 12, 0, 0, tzinfo=timezone.utc)

    await session.write2(12, 34, 123, when, user_id=5)

    command = stub.invoke.requests[0].command
    assert command.kind == pb.MX_COMMAND_KIND_WRITE2
    assert command.write2.server_handle == 12
    assert command.write2.item_handle == 34
    assert command.write2.user_id == 5
    # The integer value is carried as the int32 field of the MxValue oneof.
    assert command.write2.value.WhichOneof("kind") == "int32_value"
    assert command.write2.value.int32_value == 123
    # The timestamp value carries the datetime via the timestamp_value oneof.
    assert command.write2.timestamp_value.WhichOneof("kind") == "timestamp_value"
    assert command.write2.timestamp_value.timestamp_value.ToDatetime(
        tzinfo=timezone.utc,
    ) == when


# --- bulk-size limit + None-argument guards --------------------------------


@pytest.mark.asyncio
async def test_subscribe_bulk_rejects_oversized_request() -> None:
    """One item over `MAX_BULK_ITEMS` raises ``ValueError`` before any RPC."""
    stub = _FakeGatewayStub()
    session = await _open_fake_session(stub)
    oversized = [f"Tag_{i}" for i in range(MAX_BULK_ITEMS + 1)]

    with pytest.raises(ValueError, match=str(MAX_BULK_ITEMS)):
        await session.subscribe_bulk(12, oversized)

    # No RPC should have been issued for a rejected request.
    assert stub.invoke.requests == []


@pytest.mark.asyncio
async def test_advise_item_bulk_rejects_none_argument() -> None:
    """Passing ``None`` for the handle list raises ``TypeError``."""
    stub = _FakeGatewayStub()
    session = await _open_fake_session(stub)

    with pytest.raises(TypeError, match="item_handles is required"):
        await session.advise_item_bulk(12, None)  # type: ignore[arg-type]


@pytest.mark.asyncio
async def test_add_item_bulk_at_limit_is_allowed() -> None:
    """Exactly `MAX_BULK_ITEMS` items are accepted and sent in a single RPC."""
    stub = _FakeGatewayStub()
    stub.invoke.replies = [
        _ok_reply(
            pb.MX_COMMAND_KIND_ADD_ITEM_BULK,
            add_item_bulk=pb.BulkSubscribeReply(results=[]),
        ),
    ]
    session = await _open_fake_session(stub)
    at_limit = [f"Tag_{i}" for i in range(MAX_BULK_ITEMS)]

    results = await session.add_item_bulk(12, at_limit)

    assert results == []
    assert len(stub.invoke.requests) == 1
    sent = stub.invoke.requests[0].command.add_item_bulk
    assert len(sent.tag_addresses) == MAX_BULK_ITEMS


# --- TLS ca_file read path -------------------------------------------------


@pytest.mark.asyncio
async def test_create_channel_reads_ca_file(tmp_path: Path) -> None:
    """`create_channel` returns a channel when `ca_file` points at a real file."""
    ca_path = tmp_path / "ca.pem"
    ca_path.write_bytes(b"-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")

    channel = create_channel(
        ClientOptions(
            endpoint="mxgateway.example.local:5001",
            ca_file=str(ca_path),
            server_name_override="mxgateway.example.local",
        ),
    )

    try:
        # A secure channel object is returned without raising; the ca_file was read.
        assert channel is not None
    finally:
        # Close even when the assertion fails so the channel is never leaked.
        await channel.close()


def test_create_channel_missing_ca_file_raises() -> None:
    """A `ca_file` path that does not exist surfaces ``FileNotFoundError``."""
    with pytest.raises(FileNotFoundError):
        create_channel(
            ClientOptions(
                endpoint="mxgateway.example.local:5001",
                ca_file="C:/does/not/exist/ca.pem",
            ),
        )


# --- map_rpc_error generic fallthrough -------------------------------------


class _FakeRpcError(grpc.RpcError):
    """`grpc.RpcError` carrying a fixed status code and details string."""

    def __init__(self, code: grpc.StatusCode, details: str) -> None:
        self._code = code
        self._details = details

    def code(self) -> grpc.StatusCode:
        return self._code

    def details(self) -> str:
        return self._details


def test_map_rpc_error_generic_branch_returns_transport_error() -> None:
    """An ``UNAVAILABLE`` error maps to the base transport error type."""
    error = _FakeRpcError(grpc.StatusCode.UNAVAILABLE, "connection refused")

    mapped = map_rpc_error("invoke", error)

    # Exact type check on purpose: a subclass would not satisfy `is`.
    assert type(mapped) is MxGatewayTransportError
    assert "invoke failed: connection refused" in str(mapped)


def test_map_rpc_error_handles_error_without_code() -> None:
    """A bare `grpc.RpcError` still maps to the base transport error type."""
    mapped = map_rpc_error("invoke", grpc.RpcError())

    # Exact type check on purpose: a subclass would not satisfy `is`.
    assert type(mapped) is MxGatewayTransportError
    assert "invoke failed:" in str(mapped)


# --- happy-path CLI command body -------------------------------------------


def test_cli_register_happy_path_emits_server_handle(monkeypatch: pytest.MonkeyPatch) -> None:
    """Drive the `register` CLI command end to end against a fake stub."""
    from mxgateway_cli import commands

    invoke = _FakeUnary(
        [
            _ok_reply(
                pb.MX_COMMAND_KIND_REGISTER,
                register=pb.RegisterReply(server_handle=99),
            ),
        ],
    )

    class _Stub:
        """Minimal stub exposing only the ``Invoke`` RPC."""

        def __init__(self) -> None:
            self.Invoke = invoke

    async def _fake_connect(kwargs: dict[str, Any]) -> GatewayClient:
        return await GatewayClient.connect(
            ClientOptions(endpoint=kwargs["endpoint"], plaintext=True),
            stub=_Stub(),
        )

    # Swap the CLI's connection factory so the command talks to the fake stub.
    monkeypatch.setattr(commands, "_connect", _fake_connect)

    runner = CliRunner()
    result = runner.invoke(
        commands.main,
        [
            "register",
            "--endpoint",
            "localhost:5000",
            "--session-id",
            "session-1",
            "--client-name",
            "pytest-client",
            "--json",
        ],
    )

    assert result.exit_code == 0, result.output
    assert json.loads(result.output) == {"serverHandle": 99}
    assert invoke.requests[0].command.register.client_name == "pytest-client"