"""Regression tests for Client.Python-015 and Client.Python-016. Client.Python-015 — coverage for the ``bench-read-bulk`` CLI body and the ``_percentile`` / ``_percentile_summary`` helpers. The percentile algorithm must remain byte-for-byte equivalent across the five client languages (.NET, Go, Rust, Java, Python) so cross-language stats are directly comparable; the unit tests here lock that contract down with known inputs. Client.Python-016 — coverage for the two remaining untested CLI helpers after Client.Python-013 removed the localhost auto-plaintext branch from ``_use_plaintext``: the ``MAX_AGGREGATE_EVENTS`` guard inside ``_collect_events`` and the ``_api_key_from_env`` env-var helper. """ from __future__ import annotations import json from typing import Any import pytest from click.testing import CliRunner from mxgateway import ClientOptions, GatewayClient from mxgateway.generated import mxaccess_gateway_pb2 as pb from mxgateway_cli import commands from mxgateway_cli.commands import ( MAX_AGGREGATE_EVENTS, _api_key_from_env, _percentile, _percentile_summary, ) # --- Client.Python-015: _percentile / _percentile_summary ------------------ # # The algorithm is "linear interpolation between the two closest ranks", with # the rank computed as ``q * (n - 1)``. This matches the .NET, Go, Rust and # Java drivers; any divergence corrupts cross-language comparisons. def test_percentile_empty_sample_returns_zero() -> None: assert _percentile([], 0.50) == 0.0 assert _percentile([], 0.95) == 0.0 assert _percentile([], 0.99) == 0.0 def test_percentile_single_element_returns_that_element() -> None: assert _percentile([42.0], 0.0) == 42.0 assert _percentile([42.0], 0.50) == 42.0 assert _percentile([42.0], 0.95) == 42.0 assert _percentile([42.0], 1.0) == 42.0 def test_percentile_exact_rank_returns_sample_value() -> None: # n = 5 → rank for p50 = 0.5 * 4 = 2 → exact index 2 (value 30.0). sample = [10.0, 20.0, 30.0, 40.0, 50.0] assert _percentile(sample, 0.50) == 30.0 assert _percentile(sample, 0.0) == 10.0 assert _percentile(sample, 1.0) == 50.0 def test_percentile_interpolates_between_ranks() -> None: # n = 5 → rank for p95 = 0.95 * 4 = 3.8 → between index 3 (40.0) and # index 4 (50.0) with fraction 0.8 → 40.0 + (50.0 - 40.0) * 0.8 = 48.0. sample = [10.0, 20.0, 30.0, 40.0, 50.0] assert _percentile(sample, 0.95) == pytest.approx(48.0) # p99 = 0.99 * 4 = 3.96 → 40.0 + 10.0 * 0.96 = 49.6. assert _percentile(sample, 0.99) == pytest.approx(49.6) def test_percentile_summary_empty_sample_zeros_all_fields() -> None: assert _percentile_summary([]) == { "p50": 0.0, "p95": 0.0, "p99": 0.0, "max": 0.0, "mean": 0.0, } def test_percentile_summary_known_sample_matches_cross_language_contract() -> None: # The same five-element sample as the percentile interpolation test; the # summary must be byte-for-byte the values the .NET / Go / Rust / Java # drivers produce for the same input (linear interpolation, q * (n-1)). sample = [10.0, 20.0, 30.0, 40.0, 50.0] summary = _percentile_summary(sample) assert summary == { "p50": 30.0, "p95": 48.0, "p99": 49.6, "max": 50.0, "mean": 30.0, } def test_percentile_summary_rounds_to_three_decimal_places() -> None: # 1, 2, 3 → p95 = 0.95 * 2 = 1.9 → 2 + (3 - 2) * 0.9 = 2.9; round(2.9, 3) # is 2.9. Use a sample that exercises the round() call non-trivially. sample = [1.0, 2.0, 3.0001, 4.0001] summary = _percentile_summary(sample) # mean = (1 + 2 + 3.0001 + 4.0001) / 4 = 2.50005 → rounded to 2.5 assert summary["mean"] == 2.5 # max round to 3dp = 4.0 assert summary["max"] == 4.0 # --- Client.Python-015: bench-read-bulk CLI smoke test --------------------- class _BenchFakeUnary: """A fake unary RPC that pops a reply per call (cycling on exhaustion).""" def __init__(self, replies_factory: Any) -> None: self._factory = replies_factory self.requests: list[Any] = [] async def __call__( self, request: Any, *, metadata: tuple[tuple[str, str], ...], ) -> Any: self.requests.append(request) return self._factory(request) def _ok_reply(kind: int, **fields: Any) -> pb.MxCommandReply: return pb.MxCommandReply( session_id="session-bench", kind=kind, protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), **fields, ) class _BenchStub: """Fake gateway stub that handles OpenSession + Invoke for bench-read-bulk.""" def __init__(self, tags: list[str]) -> None: self._tags = tags async def open_session( request: Any, *, metadata: tuple[tuple[str, str], ...], ) -> Any: return pb.OpenSessionReply( session_id="session-bench", protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), ) async def close_session( request: Any, *, metadata: tuple[tuple[str, str], ...], ) -> Any: return pb.CloseSessionReply( session_id=request.session_id, final_state=pb.SESSION_STATE_CLOSED, protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), ) def _reply_for(request: Any) -> Any: kind = request.command.kind if kind == pb.MX_COMMAND_KIND_REGISTER: return _ok_reply( kind, register=pb.RegisterReply(server_handle=7), ) if kind == pb.MX_COMMAND_KIND_SUBSCRIBE_BULK: results = [ pb.SubscribeResult( server_handle=7, tag_address=tag, item_handle=100 + i, was_successful=True, ) for i, tag in enumerate(self._tags) ] return _ok_reply( kind, subscribe_bulk=pb.BulkSubscribeReply(results=results), ) if kind == pb.MX_COMMAND_KIND_UNSUBSCRIBE_BULK: results = [ pb.SubscribeResult( server_handle=7, item_handle=100 + i, was_successful=True, ) for i in range(len(self._tags)) ] return _ok_reply( kind, unsubscribe_bulk=pb.BulkSubscribeReply(results=results), ) if kind == pb.MX_COMMAND_KIND_READ_BULK: results = [ pb.BulkReadResult( server_handle=7, tag_address=tag, item_handle=100 + i, was_successful=True, was_cached=True, ) for i, tag in enumerate(self._tags) ] return _ok_reply( kind, read_bulk=pb.BulkReadReply(results=results), ) raise AssertionError(f"unexpected MxCommand kind in bench test: {kind}") self.OpenSession = open_session self.CloseSession = close_session self.Invoke = _BenchFakeUnary(_reply_for) def test_bench_read_bulk_emits_cross_language_schema( monkeypatch: pytest.MonkeyPatch, ) -> None: """Drive bench-read-bulk with duration=0 / warmup=0 and assert the schema. A drift in any of these field names (callsPerSecond, cachedReadResults, latencyMs.p50, …) would break the cross-language scripts/bench-read-bulk.ps1 aggregation silently. """ bulk_size = 3 tags = [f"TestMachine_{i:03d}.TestChangingInt" for i in range(1, 1 + bulk_size)] async def _fake_connect(kwargs: dict[str, Any]) -> GatewayClient: return await GatewayClient.connect( ClientOptions(endpoint=kwargs["endpoint"], plaintext=True), stub=_BenchStub(tags), ) monkeypatch.setattr(commands, "_connect", _fake_connect) runner = CliRunner() result = runner.invoke( commands.main, [ "bench-read-bulk", "--endpoint", "localhost:5000", "--client-name", "pytest-bench", "--duration-seconds", "0", "--warmup-seconds", "0", "--bulk-size", str(bulk_size), "--tag-start", "1", "--json", ], ) assert result.exit_code == 0, result.output payload = json.loads(result.output) # Locked cross-language schema (matches .NET / Go / Rust / Java drivers). expected_top_level = { "language", "command", "endpoint", "clientName", "bulkSize", "durationSeconds", "warmupSeconds", "durationMs", "tags", "totalCalls", "successfulCalls", "failedCalls", "totalReadResults", "cachedReadResults", "callsPerSecond", "latencyMs", } assert set(payload.keys()) == expected_top_level assert payload["language"] == "python" assert payload["command"] == "bench-read-bulk" assert payload["endpoint"] == "localhost:5000" assert payload["clientName"] == "pytest-bench" assert payload["bulkSize"] == bulk_size assert payload["durationSeconds"] == 0 assert payload["warmupSeconds"] == 0 assert payload["tags"] == tags # latencyMs sub-shape is the percentile-summary contract. assert set(payload["latencyMs"].keys()) == {"p50", "p95", "p99", "max", "mean"} for key in ("p50", "p95", "p99", "max", "mean"): assert isinstance(payload["latencyMs"][key], (int, float)) # --- Client.Python-016: MAX_AGGREGATE_EVENTS guard ------------------------- def test_collect_events_rejects_max_events_above_aggregate_cap( monkeypatch: pytest.MonkeyPatch, ) -> None: """``--max-events`` greater than ``MAX_AGGREGATE_EVENTS`` exits non-zero with the documented error message. The guard lives inside ``_collect_events`` (after a session is opened), so the test routes the CLI through stubbed ``_connect`` / ``_session`` fakes and asserts the guard fires before any event is pulled. """ class _EventStreamShouldNotBeUsed: def __aiter__(self) -> "_EventStreamShouldNotBeUsed": return self async def __anext__(self) -> pb.MxEvent: raise AssertionError( "MAX_AGGREGATE_EVENTS guard must trip before any event is pulled", ) class _FakeSession: def __init__(self) -> None: self.session_id = "session-1" def stream_events( self, *, after_worker_sequence: int = 0 ) -> _EventStreamShouldNotBeUsed: return _EventStreamShouldNotBeUsed() class _FakeClient: async def __aenter__(self) -> "_FakeClient": return self async def __aexit__(self, *exc_info: object) -> None: return None async def _fake_connect(kwargs: dict[str, Any]) -> _FakeClient: return _FakeClient() def _fake_session(client: Any, session_id: str) -> _FakeSession: return _FakeSession() monkeypatch.setattr(commands, "_connect", _fake_connect) monkeypatch.setattr(commands, "_session", _fake_session) runner = CliRunner() result = runner.invoke( commands.main, [ "stream-events", "--endpoint", "localhost:5000", "--session-id", "session-1", "--max-events", str(MAX_AGGREGATE_EVENTS + 1), "--plaintext", "--json", ], ) assert result.exit_code != 0 assert f"less than or equal to {MAX_AGGREGATE_EVENTS}" in result.output assert "--max-events" in result.output def test_collect_events_accepts_max_events_at_aggregate_cap_boundary( monkeypatch: pytest.MonkeyPatch, ) -> None: """``--max-events`` equal to ``MAX_AGGREGATE_EVENTS`` must not trip the guard.""" class _EmptyEventStream: def __aiter__(self) -> "_EmptyEventStream": return self async def __anext__(self) -> pb.MxEvent: raise StopAsyncIteration class _FakeSession: def __init__(self) -> None: self.client = None # type: ignore[assignment] self.session_id = "session-1" def stream_events(self, *, after_worker_sequence: int = 0) -> _EmptyEventStream: return _EmptyEventStream() class _FakeClient: async def __aenter__(self) -> "_FakeClient": return self async def __aexit__(self, *exc_info: object) -> None: return None async def _fake_connect(kwargs: dict[str, Any]) -> _FakeClient: return _FakeClient() def _fake_session(client: Any, session_id: str) -> _FakeSession: return _FakeSession() monkeypatch.setattr(commands, "_connect", _fake_connect) monkeypatch.setattr(commands, "_session", _fake_session) runner = CliRunner() result = runner.invoke( commands.main, [ "stream-events", "--endpoint", "localhost:5000", "--session-id", "session-1", "--max-events", str(MAX_AGGREGATE_EVENTS), "--timeout", "0.01", "--plaintext", "--json", ], ) assert result.exit_code == 0, result.output payload = json.loads(result.output) assert payload == {"events": []} # --- Client.Python-016: _api_key_from_env ---------------------------------- def test_api_key_from_env_resolves_value_when_variable_is_set( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.setenv("MXGATEWAY_TEST_API_KEY", "mxgw_envtest_secret") assert _api_key_from_env("MXGATEWAY_TEST_API_KEY") == "mxgw_envtest_secret" def test_api_key_from_env_returns_none_when_variable_is_unset( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.delenv("MXGATEWAY_TEST_API_KEY_NOT_SET", raising=False) assert _api_key_from_env("MXGATEWAY_TEST_API_KEY_NOT_SET") is None def test_api_key_from_env_returns_none_when_name_is_none() -> None: assert _api_key_from_env(None) is None def test_api_key_from_env_returns_none_when_name_is_empty_string() -> None: # The implementation guards on ``if not name`` so empty string is treated # the same as ``None`` — no env lookup is attempted. assert _api_key_from_env("") is None