a0203503a7
Re-reviewed every module/client against the 10-category checklist
(REVIEW-PROCESS.md) at commit 1cd51bb, filed 72 new findings, and
fixed them in three priority waves (3 High, 17 Medium, 52 Low).

Highs
- Server-017: enumerate AcknowledgeAlarm / QueryActiveAlarms in
GatewayGrpcScopeResolver so non-admin keys can use them; document
the mapping in docs/Authorization.md; add interceptor tests.
- Client.Java-013: add the five missing bulk-method stubs to the
CLI FakeSession so the test module compiles on a clean tree.
- Client.Rust-013: fix the clippy::doc_lazy_continuation regression
in generated tonic code by reformatting the ReadBulkCommand proto
comment and scoping a #![allow(...)] to the generated submodules.

Mediums (highlights)
- Server: unify GatewaySession state-lock discipline (-015) and
make DisposeAsync race-safe against in-flight CloseAsync (-016);
add constraint-enforcement test coverage for the bulk-plan path
(-021).
- Worker: introduce StaRuntimeShutdownException so RunAlarmPollLoop
can distinguish graceful shutdown from a real STA-affinity
violation (-016); have the watchdog skip StaHung while
CurrentCommandCorrelationId is non-empty so a legitimate slow
ReadBulk no longer self-faults (-017).
- Tests: add per-method round-trip + cancellation coverage for the
11 GatewaySession bulk methods (-013); replace the real TCP probe
in GalaxyHierarchyCacheTests with an IGalaxyRepository fake
(-016).
- IntegrationTests: drive the StreamEvents writer in the live Write
test and assert OnWriteComplete (-012); add live tests for
Unadvise/RemoveItem/Unregister ordering, WriteSecured, and
abnormal worker exit (-014).
- Worker.Tests: replace MxAccessSession reflection with an internal
CreateForTesting factory (-016); cover WorkerCancel and
unexpected-body envelope branches (-017).
- Client.Java: cancel MxEventStream when close() races
beforeStart() (-014); return a CancellingCompletableFuture that
actually forwards cancellation through .thenApply chains (-015).
- Client.Python: drop the silent localhost-plaintext downgrade in
the CLI; require explicit --plaintext (-013).
- Client.Rust: stop bench-read-bulk from polluting success-latency
histograms with failed-call durations (-015); add coverage for
the five MalformedReply paths, the bulk-write helpers, the
Error::Unavailable mapping, and the unary-fault path (-016).
- Contracts: extend docs/Contracts.md with the bulk read/write
command family (-009).

Lows (highlights)
- Server: cap GalaxyGlobMatcher.RegexCache; align
WorkerAlarmRpcDispatcher missing-session handling; drop the
duplicate dashboard @page routes; refresh IAlarmRpcDispatcher
XML doc.
- Worker: surface SetXmlAlarmQuery COM failures; remove dead
subscriptionExpression / ExecutingCommand arms; preserve
factory-supplied runtime sessions; split MxAlarmSnapshot.cs into
three files.
- Tests: dispose the WebApplication in seven test classes; rebuild
FakeWorkerProcess.WaitForExitAsync against a real TaskCompletion
source; switch the heartbeat-expires test to ManualTimeProvider;
add InvariantCulture to the remaining DateTimeOffset.Parse sites;
document GalaxyFilterInputSafetyTests in GatewayTesting.md.
- IntegrationTests: comment fixes, RecordingServerStreamWriter
IDisposable, class-level [Trait], single-source ZB default
connection string.
- Worker.Tests: replace silent-return gating with LiveMxAccessFact
so absent env vars SKIP not pass; PascalCase rename of probe
[Fact]s; deterministic deadline test; new frame-protocol error
tests; ComputeTransitions diff-coverage; relocate dev-rig probes
to Probes/.
- Contracts: add round-trip coverage and per-field redaction /
Galaxy-identifier comments to the protos.
- Client.Dotnet: introduce clients/dotnet/Directory.Build.props so
TreatWarningsAsErrors / analysers apply; document
DiscoverHierarchyOptions and IMxGatewayCliClient; require typed
bulk-read handles in CLI; surface AcknowledgeAlarm transport
faults through Translate().
- Client.Go: kill dead code in alarms_test / fakeGalaxyServer /
runWriteBulkVariant; document the six new subcommands in
writeUsage; drain galaxy-watch events on limit; switch io.EOF
comparisons to errors.Is.
- Client.Java: shared shutdown helpers + new shutdownTimeout
option; regex-based credential redaction; Long.toUnsignedString
for uint64 sequence; doc fixes.
- Client.Python: combine duplicate imports; add coverage for
_percentile / bench-read-bulk / MAX_AGGREGATE_EVENTS /
_api_key_from_env; populate pyproject metadata and ship py.typed.
- Client.Rust: expose next_correlation_id() so CLI ping/close
stop hard-coding correlation IDs; resync RustClientDesign.md
with the current Session / Error surface and CLI subcommand set.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
455 lines
15 KiB
Python
"""Regression tests for Client.Python-015 and Client.Python-016.

Client.Python-015: coverage for the ``bench-read-bulk`` CLI body and the
``_percentile`` / ``_percentile_summary`` helpers. The percentile algorithm
must remain byte-for-byte equivalent across the five client languages
(.NET, Go, Rust, Java, Python) so cross-language stats are directly
comparable; the unit tests here lock that contract down with known inputs.

Client.Python-016: coverage for the two remaining untested CLI helpers
after Client.Python-013 removed the localhost auto-plaintext branch from
``_use_plaintext``: the ``MAX_AGGREGATE_EVENTS`` guard inside
``_collect_events`` and the ``_api_key_from_env`` env-var helper.
"""

from __future__ import annotations

import json
from typing import Any

import pytest
from click.testing import CliRunner

from mxgateway import ClientOptions, GatewayClient
from mxgateway.generated import mxaccess_gateway_pb2 as pb
from mxgateway_cli import commands
from mxgateway_cli.commands import (
    MAX_AGGREGATE_EVENTS,
    _api_key_from_env,
    _percentile,
    _percentile_summary,
)


# --- Client.Python-015: _percentile / _percentile_summary ------------------
#
# The algorithm is "linear interpolation between the two closest ranks", with
# the rank computed as ``q * (n - 1)``. This matches the .NET, Go, Rust and
# Java drivers; any divergence corrupts cross-language comparisons.


def test_percentile_empty_sample_returns_zero() -> None:
    assert _percentile([], 0.50) == 0.0
    assert _percentile([], 0.95) == 0.0
    assert _percentile([], 0.99) == 0.0


def test_percentile_single_element_returns_that_element() -> None:
    assert _percentile([42.0], 0.0) == 42.0
    assert _percentile([42.0], 0.50) == 42.0
    assert _percentile([42.0], 0.95) == 42.0
    assert _percentile([42.0], 1.0) == 42.0


def test_percentile_exact_rank_returns_sample_value() -> None:
    # n = 5 → rank for p50 = 0.5 * 4 = 2 → exact index 2 (value 30.0).
    sample = [10.0, 20.0, 30.0, 40.0, 50.0]
    assert _percentile(sample, 0.50) == 30.0
    assert _percentile(sample, 0.0) == 10.0
    assert _percentile(sample, 1.0) == 50.0


def test_percentile_interpolates_between_ranks() -> None:
    # n = 5 → rank for p95 = 0.95 * 4 = 3.8 → between index 3 (40.0) and
    # index 4 (50.0) with fraction 0.8 → 40.0 + (50.0 - 40.0) * 0.8 = 48.0.
    sample = [10.0, 20.0, 30.0, 40.0, 50.0]
    assert _percentile(sample, 0.95) == pytest.approx(48.0)
    # p99 = 0.99 * 4 = 3.96 → 40.0 + 10.0 * 0.96 = 49.6.
    assert _percentile(sample, 0.99) == pytest.approx(49.6)


def test_percentile_summary_empty_sample_zeros_all_fields() -> None:
    assert _percentile_summary([]) == {
        "p50": 0.0,
        "p95": 0.0,
        "p99": 0.0,
        "max": 0.0,
        "mean": 0.0,
    }


def test_percentile_summary_known_sample_matches_cross_language_contract() -> None:
    # The same five-element sample as the percentile interpolation test; the
    # summary must be byte-for-byte the values the .NET / Go / Rust / Java
    # drivers produce for the same input (linear interpolation, q * (n-1)).
    sample = [10.0, 20.0, 30.0, 40.0, 50.0]
    summary = _percentile_summary(sample)

    assert summary == {
        "p50": 30.0,
        "p95": 48.0,
        "p99": 49.6,
        "max": 50.0,
        "mean": 30.0,
    }


def test_percentile_summary_rounds_to_three_decimal_places() -> None:
    # The sample is chosen so that the raw mean (2.50005) and raw max (4.0001)
    # only equal the expected values after round(value, 3) is applied, which
    # exercises the round() call non-trivially.
    sample = [1.0, 2.0, 3.0001, 4.0001]
    summary = _percentile_summary(sample)
    # mean = (1 + 2 + 3.0001 + 4.0001) / 4 = 2.50005 → rounded to 2.5
    assert summary["mean"] == 2.5
    # max = 4.0001 → rounded to 3 dp = 4.0
    assert summary["max"] == 4.0


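# The interpolation rule the tests above pin down can be sketched as follows.
# This is a minimal illustration, not the actual ``_percentile`` /
# ``_percentile_summary`` source: the names ``percentile_sketch`` and
# ``percentile_summary_sketch`` are hypothetical, and sorting the sample
# inside the helper is an assumption (the real helper may expect sorted input).

```python
from __future__ import annotations


def percentile_sketch(sample: list[float], q: float) -> float:
    """Linear interpolation between the two closest ranks, rank = q * (n - 1)."""
    if not sample:
        return 0.0  # the tests expect 0.0 for an empty sample
    ordered = sorted(sample)  # assumption: the helper sorts its own input
    rank = q * (len(ordered) - 1)
    lower = int(rank)  # floor, since rank is never negative
    upper = min(lower + 1, len(ordered) - 1)
    fraction = rank - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction


def percentile_summary_sketch(sample: list[float]) -> dict[str, float]:
    """Build the five-field summary, rounding each value to three decimals."""
    if not sample:
        return {"p50": 0.0, "p95": 0.0, "p99": 0.0, "max": 0.0, "mean": 0.0}
    return {
        "p50": round(percentile_sketch(sample, 0.50), 3),
        "p95": round(percentile_sketch(sample, 0.95), 3),
        "p99": round(percentile_sketch(sample, 0.99), 3),
        "max": round(max(sample), 3),
        "mean": round(sum(sample) / len(sample), 3),
    }
```

# For the five-element sample used above, p95 has rank 0.95 * 4 = 3.8, so the
# sketch interpolates 80% of the way from index 3 to index 4.
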
# --- Client.Python-015: bench-read-bulk CLI smoke test ---------------------


class _BenchFakeUnary:
    """A fake unary RPC that records each request and builds its reply via a factory."""

    def __init__(self, replies_factory: Any) -> None:
        self._factory = replies_factory
        self.requests: list[Any] = []

    async def __call__(
        self,
        request: Any,
        *,
        metadata: tuple[tuple[str, str], ...],
    ) -> Any:
        self.requests.append(request)
        return self._factory(request)


def _ok_reply(kind: int, **fields: Any) -> pb.MxCommandReply:
    return pb.MxCommandReply(
        session_id="session-bench",
        kind=kind,
        protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
        **fields,
    )


class _BenchStub:
    """Fake gateway stub that handles OpenSession, CloseSession and Invoke for bench-read-bulk."""

    def __init__(self, tags: list[str]) -> None:
        self._tags = tags

        async def open_session(
            request: Any,
            *,
            metadata: tuple[tuple[str, str], ...],
        ) -> Any:
            return pb.OpenSessionReply(
                session_id="session-bench",
                protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
            )

        async def close_session(
            request: Any,
            *,
            metadata: tuple[tuple[str, str], ...],
        ) -> Any:
            return pb.CloseSessionReply(
                session_id=request.session_id,
                final_state=pb.SESSION_STATE_CLOSED,
                protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
            )

        def _reply_for(request: Any) -> Any:
            kind = request.command.kind
            if kind == pb.MX_COMMAND_KIND_REGISTER:
                return _ok_reply(
                    kind,
                    register=pb.RegisterReply(server_handle=7),
                )
            if kind == pb.MX_COMMAND_KIND_SUBSCRIBE_BULK:
                results = [
                    pb.SubscribeResult(
                        server_handle=7,
                        tag_address=tag,
                        item_handle=100 + i,
                        was_successful=True,
                    )
                    for i, tag in enumerate(self._tags)
                ]
                return _ok_reply(
                    kind,
                    subscribe_bulk=pb.BulkSubscribeReply(results=results),
                )
            if kind == pb.MX_COMMAND_KIND_UNSUBSCRIBE_BULK:
                results = [
                    pb.SubscribeResult(
                        server_handle=7,
                        item_handle=100 + i,
                        was_successful=True,
                    )
                    for i in range(len(self._tags))
                ]
                return _ok_reply(
                    kind,
                    unsubscribe_bulk=pb.BulkSubscribeReply(results=results),
                )
            if kind == pb.MX_COMMAND_KIND_READ_BULK:
                results = [
                    pb.BulkReadResult(
                        server_handle=7,
                        tag_address=tag,
                        item_handle=100 + i,
                        was_successful=True,
                        was_cached=True,
                    )
                    for i, tag in enumerate(self._tags)
                ]
                return _ok_reply(
                    kind,
                    read_bulk=pb.BulkReadReply(results=results),
                )
            raise AssertionError(f"unexpected MxCommand kind in bench test: {kind}")

        self.OpenSession = open_session
        self.CloseSession = close_session
        self.Invoke = _BenchFakeUnary(_reply_for)


def test_bench_read_bulk_emits_cross_language_schema(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Drive bench-read-bulk with duration=0 / warmup=0 and assert the schema.

    A drift in any of these field names (callsPerSecond, cachedReadResults,
    latencyMs.p50, …) would break the cross-language
    scripts/bench-read-bulk.ps1 aggregation silently.
    """

    bulk_size = 3
    tags = [f"TestMachine_{i:03d}.TestChangingInt" for i in range(1, 1 + bulk_size)]

    async def _fake_connect(kwargs: dict[str, Any]) -> GatewayClient:
        return await GatewayClient.connect(
            ClientOptions(endpoint=kwargs["endpoint"], plaintext=True),
            stub=_BenchStub(tags),
        )

    monkeypatch.setattr(commands, "_connect", _fake_connect)

    runner = CliRunner()
    result = runner.invoke(
        commands.main,
        [
            "bench-read-bulk",
            "--endpoint",
            "localhost:5000",
            "--client-name",
            "pytest-bench",
            "--duration-seconds",
            "0",
            "--warmup-seconds",
            "0",
            "--bulk-size",
            str(bulk_size),
            "--tag-start",
            "1",
            "--json",
        ],
    )

    assert result.exit_code == 0, result.output
    payload = json.loads(result.output)

    # Locked cross-language schema (matches .NET / Go / Rust / Java drivers).
    expected_top_level = {
        "language",
        "command",
        "endpoint",
        "clientName",
        "bulkSize",
        "durationSeconds",
        "warmupSeconds",
        "durationMs",
        "tags",
        "totalCalls",
        "successfulCalls",
        "failedCalls",
        "totalReadResults",
        "cachedReadResults",
        "callsPerSecond",
        "latencyMs",
    }
    assert set(payload.keys()) == expected_top_level
    assert payload["language"] == "python"
    assert payload["command"] == "bench-read-bulk"
    assert payload["endpoint"] == "localhost:5000"
    assert payload["clientName"] == "pytest-bench"
    assert payload["bulkSize"] == bulk_size
    assert payload["durationSeconds"] == 0
    assert payload["warmupSeconds"] == 0
    assert payload["tags"] == tags

    # latencyMs sub-shape is the percentile-summary contract.
    assert set(payload["latencyMs"].keys()) == {"p50", "p95", "p99", "max", "mean"}
    for key in ("p50", "p95", "p99", "max", "mean"):
        assert isinstance(payload["latencyMs"][key], (int, float))


# --- Client.Python-016: MAX_AGGREGATE_EVENTS guard -------------------------


def test_collect_events_rejects_max_events_above_aggregate_cap(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``--max-events`` greater than ``MAX_AGGREGATE_EVENTS`` exits non-zero
    with the documented error message.

    The guard lives inside ``_collect_events`` (after a session is opened),
    so the test routes the CLI through stubbed ``_connect`` / ``_session``
    fakes and asserts the guard fires before any event is pulled.
    """

    class _EventStreamShouldNotBeUsed:
        def __aiter__(self) -> "_EventStreamShouldNotBeUsed":
            return self

        async def __anext__(self) -> pb.MxEvent:
            raise AssertionError(
                "MAX_AGGREGATE_EVENTS guard must trip before any event is pulled",
            )

    class _FakeSession:
        def __init__(self) -> None:
            self.session_id = "session-1"

        def stream_events(
            self, *, after_worker_sequence: int = 0
        ) -> _EventStreamShouldNotBeUsed:
            return _EventStreamShouldNotBeUsed()

    class _FakeClient:
        async def __aenter__(self) -> "_FakeClient":
            return self

        async def __aexit__(self, *exc_info: object) -> None:
            return None

    async def _fake_connect(kwargs: dict[str, Any]) -> _FakeClient:
        return _FakeClient()

    def _fake_session(client: Any, session_id: str) -> _FakeSession:
        return _FakeSession()

    monkeypatch.setattr(commands, "_connect", _fake_connect)
    monkeypatch.setattr(commands, "_session", _fake_session)

    runner = CliRunner()
    result = runner.invoke(
        commands.main,
        [
            "stream-events",
            "--endpoint",
            "localhost:5000",
            "--session-id",
            "session-1",
            "--max-events",
            str(MAX_AGGREGATE_EVENTS + 1),
            "--plaintext",
            "--json",
        ],
    )

    assert result.exit_code != 0
    assert f"less than or equal to {MAX_AGGREGATE_EVENTS}" in result.output
    assert "--max-events" in result.output


def test_collect_events_accepts_max_events_at_aggregate_cap_boundary(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``--max-events`` equal to ``MAX_AGGREGATE_EVENTS`` must not trip the guard."""

    class _EmptyEventStream:
        def __aiter__(self) -> "_EmptyEventStream":
            return self

        async def __anext__(self) -> pb.MxEvent:
            raise StopAsyncIteration

    class _FakeSession:
        def __init__(self) -> None:
            self.client = None  # type: ignore[assignment]
            self.session_id = "session-1"

        def stream_events(self, *, after_worker_sequence: int = 0) -> _EmptyEventStream:
            return _EmptyEventStream()

    class _FakeClient:
        async def __aenter__(self) -> "_FakeClient":
            return self

        async def __aexit__(self, *exc_info: object) -> None:
            return None

    async def _fake_connect(kwargs: dict[str, Any]) -> _FakeClient:
        return _FakeClient()

    def _fake_session(client: Any, session_id: str) -> _FakeSession:
        return _FakeSession()

    monkeypatch.setattr(commands, "_connect", _fake_connect)
    monkeypatch.setattr(commands, "_session", _fake_session)

    runner = CliRunner()
    result = runner.invoke(
        commands.main,
        [
            "stream-events",
            "--endpoint",
            "localhost:5000",
            "--session-id",
            "session-1",
            "--max-events",
            str(MAX_AGGREGATE_EVENTS),
            "--timeout",
            "0.01",
            "--plaintext",
            "--json",
        ],
    )

    assert result.exit_code == 0, result.output
    payload = json.loads(result.output)
    assert payload == {"events": []}


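# The guard behaviour exercised above (reject before the first pull, accept at
# the boundary) can be sketched with the standard library alone. This is a
# hedged illustration, not the real ``_collect_events``: the cap value, the
# ``MaxEventsError`` type and both function names are hypothetical, and the
# sketch assumes ``max_events`` is at least 1.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from typing import Any

CAP_SKETCH = 10_000  # hypothetical stand-in for MAX_AGGREGATE_EVENTS


class MaxEventsError(Exception):
    """Hypothetical error type; the real CLI reports a usage error instead."""


def check_max_events(max_events: int) -> int:
    """Reject values above the cap; a value equal to the cap is allowed."""
    if max_events > CAP_SKETCH:
        raise MaxEventsError(
            f"--max-events must be less than or equal to {CAP_SKETCH}"
        )
    return max_events


async def collect_events_sketch(
    stream: AsyncIterator[Any], max_events: int
) -> list[Any]:
    """Validate the limit first, then pull at most ``max_events`` events."""
    check_max_events(max_events)  # runs before the stream is touched
    events: list[Any] = []
    async for event in stream:
        events.append(event)
        if len(events) >= max_events:
            break
    return events
```

# Running the check before the ``async for`` loop is what lets a test use a
# stream that fails on its first pull to prove the guard fired early.
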
# --- Client.Python-016: _api_key_from_env ----------------------------------


def test_api_key_from_env_resolves_value_when_variable_is_set(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    monkeypatch.setenv("MXGATEWAY_TEST_API_KEY", "mxgw_envtest_secret")

    assert _api_key_from_env("MXGATEWAY_TEST_API_KEY") == "mxgw_envtest_secret"


def test_api_key_from_env_returns_none_when_variable_is_unset(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    monkeypatch.delenv("MXGATEWAY_TEST_API_KEY_NOT_SET", raising=False)

    assert _api_key_from_env("MXGATEWAY_TEST_API_KEY_NOT_SET") is None


def test_api_key_from_env_returns_none_when_name_is_none() -> None:
    assert _api_key_from_env(None) is None


def test_api_key_from_env_returns_none_when_name_is_empty_string() -> None:
    # The implementation guards on ``if not name`` so empty string is treated
    # the same as ``None``: no env lookup is attempted.
    assert _api_key_from_env("") is None
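
# A helper satisfying the four cases above could look like the sketch below.
# It is an assumption about the shape of ``_api_key_from_env``, based only on
# the ``if not name`` guard mentioned in the last test; the function name
# ``api_key_from_env_sketch`` is hypothetical.

```python
from __future__ import annotations

import os


def api_key_from_env_sketch(name: str | None) -> str | None:
    """Return the value of the named environment variable, or None."""
    if not name:
        # None and "" are both falsy, so no environment lookup is attempted.
        return None
    return os.environ.get(name)
```
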