Files
mxaccessgw/clients/python/tests/test_coverage_gaps.py
Joseph Doherty e4fbbb541a Resolve Client.Python-003, -005, -009 code-review findings
Client.Python-003: stream_events_raw and query_active_alarms passed `timeout`
to the stub with no TypeError fallback, unlike _unary. Both now route through
a shared _open_stream helper that strips `timeout` on TypeError.

Client.Python-005: discover_hierarchy buffered the entire Galaxy hierarchy in
memory. Added GalaxyRepositoryClient.iter_hierarchy, a lazy async generator
yielding objects page-by-page; discover_hierarchy is now a thin wrapper that
preserves its list contract. README documents iter_hierarchy.

Client.Python-009: added regression coverage for previously untested paths —
write2/add_item2 request shape, the MAX_BULK_ITEMS boundary, the None-argument
TypeError guards, TLS ca_file reading, and the non-auth map_rpc_error fallthrough.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 21:45:16 -04:00

285 lines
8.8 KiB
Python

"""Regression tests for Client.Python-009: untested public paths.
Covers `Session.write2`/`add_item2` request construction, the bulk-size limit
guard, the ``None``-argument ``TypeError`` guards, the TLS ``ca_file`` read
path in `create_channel`, the generic `map_rpc_error` fallthrough, and a
happy-path CLI command body driven by a fake stub.
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any
import grpc
import pytest
from click.testing import CliRunner
from mxgateway import ClientOptions, GatewayClient
from mxgateway.errors import MxGatewayTransportError, map_rpc_error
from mxgateway.generated import mxaccess_gateway_pb2 as pb
from mxgateway.options import create_channel
from mxgateway.session import MAX_BULK_ITEMS, Session
class _FakeUnary:
    """Scripted stand-in for an awaitable unary stub method.

    Every call records the request and the metadata it was invoked with,
    then returns the next queued reply in first-in, first-out order.
    """

    def __init__(self, replies: list[Any]) -> None:
        # Take a copy so later pops never mutate the caller's list.
        self.replies = [*replies]
        self.requests: list[Any] = []
        self.metadata: tuple[tuple[str, str], ...] | None = None

    async def __call__(
        self,
        request: Any,
        *,
        metadata: tuple[tuple[str, str], ...],
    ) -> Any:
        # Record the call before consuming a reply, so the request stays
        # visible to assertions even when the reply queue is exhausted.
        self.metadata = metadata
        self.requests.append(request)
        next_reply = self.replies.pop(0)
        return next_reply
class _FakeGatewayStub:
    """Fake gateway stub exposing ``OpenSession`` and ``Invoke`` callables.

    ``open_session`` is pre-loaded with a single OK reply for ``session-1``.
    ``invoke`` starts with an empty reply queue so each test scripts its own.
    """

    def __init__(self) -> None:
        ok_status = pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK)
        session_opened = pb.OpenSessionReply(
            session_id="session-1",
            protocol_status=ok_status,
        )
        self.open_session = _FakeUnary([session_opened])
        self.invoke = _FakeUnary([])
        # The PascalCase attributes alias the very same fakes, so a call made
        # through either spelling is recorded on one object.
        self.OpenSession = self.open_session
        self.Invoke = self.invoke
def _ok_reply(kind: int, **fields: Any) -> pb.MxCommandReply:
    """Build an ``MxCommandReply`` for ``session-1`` with an OK protocol status.

    Args:
        kind: Command kind to stamp on the reply.
        **fields: Extra reply fields forwarded verbatim to the message
            constructor (for example a command-specific payload).

    Returns:
        The constructed reply message.
    """
    ok_status = pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK)
    return pb.MxCommandReply(
        session_id="session-1",
        kind=kind,
        protocol_status=ok_status,
        **fields,
    )
# --- write2 / add_item2 request construction -------------------------------
@pytest.mark.asyncio
async def test_add_item2_sends_item_context_and_returns_handle() -> None:
    """``add_item2`` sends handle, definition, and context; returns the item handle."""
    stub = _FakeGatewayStub()
    scripted_reply = _ok_reply(
        pb.MX_COMMAND_KIND_ADD_ITEM2,
        add_item2=pb.AddItem2Reply(item_handle=77),
    )
    stub.invoke.replies = [scripted_reply]
    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GatewayClient.connect(options, stub=stub)
    session = await client.open_session()

    item_handle = await session.add_item2(12, "Object.Attribute", "ctx-A")

    # The handle comes straight from the scripted reply.
    assert item_handle == 77
    sent = stub.invoke.requests[0].command
    assert sent.kind == pb.MX_COMMAND_KIND_ADD_ITEM2
    payload = sent.add_item2
    assert payload.server_handle == 12
    assert payload.item_definition == "Object.Attribute"
    assert payload.item_context == "ctx-A"
@pytest.mark.asyncio
async def test_write2_sends_value_and_timestamp_value() -> None:
    """``write2`` puts the handles, user id, value, and timestamp on the request."""
    stub = _FakeGatewayStub()
    stub.invoke.replies = [_ok_reply(pb.MX_COMMAND_KIND_WRITE2)]
    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GatewayClient.connect(options, stub=stub)
    session = await client.open_session()
    when = datetime(2025, 4, 1, 12, 0, 0, tzinfo=timezone.utc)

    await session.write2(12, 34, 123, when, user_id=5)

    sent = stub.invoke.requests[0].command
    assert sent.kind == pb.MX_COMMAND_KIND_WRITE2
    payload = sent.write2
    assert payload.server_handle == 12
    assert payload.item_handle == 34
    assert payload.user_id == 5
    # The integer value is carried as the int32 field of the MxValue oneof.
    assert payload.value.WhichOneof("kind") == "int32_value"
    assert payload.value.int32_value == 123
    # The timestamp value carries the datetime via the timestamp_value oneof.
    stamp = payload.timestamp_value
    assert stamp.WhichOneof("kind") == "timestamp_value"
    round_tripped = stamp.timestamp_value.ToDatetime(tzinfo=timezone.utc)
    assert round_tripped == when
# --- bulk-size limit + None-argument guards --------------------------------
@pytest.mark.asyncio
async def test_subscribe_bulk_rejects_oversized_request() -> None:
    """``subscribe_bulk`` raises ``ValueError`` for one item over the limit."""
    stub = _FakeGatewayStub()
    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GatewayClient.connect(options, stub=stub)
    session = await client.open_session()
    too_many = MAX_BULK_ITEMS + 1
    oversized = [f"Tag_{i}" for i in range(too_many)]

    # The error message is expected to mention the limit itself.
    limit_text = str(MAX_BULK_ITEMS)
    with pytest.raises(ValueError, match=limit_text):
        await session.subscribe_bulk(12, oversized)

    # No RPC should have been issued for a rejected request.
    assert stub.invoke.requests == []
@pytest.mark.asyncio
async def test_advise_item_bulk_rejects_none_argument() -> None:
    """``advise_item_bulk`` raises ``TypeError`` when ``item_handles`` is ``None``."""
    stub = _FakeGatewayStub()
    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GatewayClient.connect(options, stub=stub)
    session = await client.open_session()

    expected_message = "item_handles is required"
    with pytest.raises(TypeError, match=expected_message):
        # Deliberately violates the annotated type to exercise the guard.
        await session.advise_item_bulk(12, None)  # type: ignore[arg-type]
@pytest.mark.asyncio
async def test_add_item_bulk_at_limit_is_allowed() -> None:
    """``add_item_bulk`` accepts exactly ``MAX_BULK_ITEMS`` tags and sends them all."""
    stub = _FakeGatewayStub()
    empty_bulk_reply = _ok_reply(
        pb.MX_COMMAND_KIND_ADD_ITEM_BULK,
        add_item_bulk=pb.BulkSubscribeReply(results=[]),
    )
    stub.invoke.replies = [empty_bulk_reply]
    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GatewayClient.connect(options, stub=stub)
    session = await client.open_session()
    at_limit = [f"Tag_{i}" for i in range(MAX_BULK_ITEMS)]

    results = await session.add_item_bulk(12, at_limit)

    assert results == []
    # Exactly one RPC went out, and it carried every tag address.
    sent_requests = stub.invoke.requests
    assert len(sent_requests) == 1
    sent_tags = sent_requests[0].command.add_item_bulk.tag_addresses
    assert len(sent_tags) == MAX_BULK_ITEMS
# --- TLS ca_file read path -------------------------------------------------
@pytest.mark.asyncio
async def test_create_channel_reads_ca_file(tmp_path: Any) -> None:
    """``create_channel`` returns a channel when ``ca_file`` points at a real file."""
    ca_path = tmp_path / "ca.pem"
    ca_path.write_bytes(b"-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")
    options = ClientOptions(
        endpoint="mxgateway.example.local:5001",
        ca_file=str(ca_path),
        server_name_override="mxgateway.example.local",
    )

    channel = create_channel(options)

    # A secure channel object is returned without raising; the ca_file was read.
    assert channel is not None
    await channel.close()
def test_create_channel_missing_ca_file_raises() -> None:
    """``create_channel`` raises ``FileNotFoundError`` for a nonexistent ``ca_file``."""
    options = ClientOptions(
        endpoint="mxgateway.example.local:5001",
        ca_file="C:/does/not/exist/ca.pem",
    )
    with pytest.raises(FileNotFoundError):
        create_channel(options)
# --- map_rpc_error generic fallthrough -------------------------------------
class _FakeRpcError(grpc.RpcError):
    """``grpc.RpcError`` subclass that reports a fixed status code and details."""

    def __init__(self, code: grpc.StatusCode, details: str) -> None:
        self._code, self._details = code, details

    def code(self) -> grpc.StatusCode:
        """Return the status code given at construction."""
        return self._code

    def details(self) -> str:
        """Return the details string given at construction."""
        return self._details
def test_map_rpc_error_generic_branch_returns_transport_error() -> None:
    """An ``UNAVAILABLE`` error maps to exactly ``MxGatewayTransportError``."""
    rpc_failure = _FakeRpcError(grpc.StatusCode.UNAVAILABLE, "connection refused")

    mapped = map_rpc_error("invoke", rpc_failure)

    # Identity check on the type: a subclass instance would not satisfy it.
    assert type(mapped) is MxGatewayTransportError
    rendered = str(mapped)
    assert "invoke failed: connection refused" in rendered
def test_map_rpc_error_handles_error_without_code() -> None:
    """A bare ``grpc.RpcError()`` still maps to ``MxGatewayTransportError``."""
    bare_error = grpc.RpcError()

    mapped = map_rpc_error("invoke", bare_error)

    assert type(mapped) is MxGatewayTransportError
    rendered = str(mapped)
    assert "invoke failed:" in rendered
# --- happy-path CLI command body -------------------------------------------
def test_cli_register_happy_path_emits_server_handle(monkeypatch: Any) -> None:
    """Run the `register` CLI command against a fake stub and check its JSON output."""
    from mxgateway_cli import commands

    register_reply = _ok_reply(
        pb.MX_COMMAND_KIND_REGISTER,
        register=pb.RegisterReply(server_handle=99),
    )
    invoke = _FakeUnary([register_reply])

    class _Stub:
        """Stub exposing only the ``Invoke`` callable."""

        def __init__(self) -> None:
            self.Invoke = invoke

    async def _fake_connect(kwargs: dict[str, Any]) -> GatewayClient:
        # Stands in for the CLI's own connect helper: same endpoint, fake stub.
        options = ClientOptions(endpoint=kwargs["endpoint"], plaintext=True)
        return await GatewayClient.connect(options, stub=_Stub())

    monkeypatch.setattr(commands, "_connect", _fake_connect)

    cli_args = [
        "register",
        "--endpoint",
        "localhost:5000",
        "--session-id",
        "session-1",
        "--client-name",
        "pytest-client",
        "--json",
    ]
    result = CliRunner().invoke(commands.main, cli_args)

    # Include the captured output in the failure message for easier diagnosis.
    assert result.exit_code == 0, result.output
    emitted = json.loads(result.output)
    assert emitted == {"serverHandle": 99}
    sent_register = invoke.requests[0].command.register
    assert sent_register.client_name == "pytest-client"