Files
mxaccessgw/clients/python/tests/test_client_session.py
T
Joseph Doherty 397d3c5c4f rename: apply ZB.MOM.WW prefix to all client SDKs + fix pre-existing alarm-RPC breaks
Rename across every client surface using each language's idiomatic convention:

  * .NET   clients/dotnet/MxGateway.Client[.Cli|.Tests]/
             -> clients/dotnet/ZB.MOM.WW.MxGateway.Client[.Cli|.Tests]/
             namespaces -> ZB.MOM.WW.MxGateway.Client[.Cli|.Tests]
             contracts ProjectReference repointed to ZB.MOM.WW.MxGateway.Contracts
             sln migrated to slnx (dotnet sln migrate)
  * Python src/mxgateway -> src/zb_mom_ww_mxgateway
             src/mxgateway_cli -> src/zb_mom_ww_mxgateway_cli
             distribution: mxaccess-gateway-client -> zb-mom-ww-mxaccess-gateway-client
  * Rust   crate: mxgateway-client -> zb-mom-ww-mxgateway-client
             build.rs proto path repointed
  * Java   subprojects: mxgateway-{client,cli} -> zb-mom-ww-mxgateway-{client,cli}
             packages com.dohertylan.mxgateway -> com.zb.mom.ww.mxgateway
             group   com.dohertylan.mxgateway -> com.zb.mom.ww.mxgateway
             rootProject mxaccessgw-java -> zb-mom-ww-mxaccessgw-java
  * Go     generate-proto.ps1 proto path repointed; module path and
             package mxgateway kept (Go convention).
  * proto-inputs.json: generatedOutputs.python updated to new package path.
  * scripts/run-client-e2e-tests.ps1: Java CLI install path + gradle task
             updated to zb-mom-ww-mxgateway-cli.

CLI binary names (mxgw, mxgw-py, mxgw-go, mxgateway-cli) and wire-level
identifiers (MXGATEWAY_* env vars, the mxgw_<id>_<secret> API key
prefix, protobuf package names like mxaccess_gateway.v1, all MXAccess
references) intentionally NOT renamed.

Fix pre-existing alarms-over-gateway breaks unblocked by the rename:

  * mxaccess_gateway.proto: add missing public message QueryActiveAlarmsRequest
    {session_id, client_correlation_id, alarm_filter_prefix} and missing
    rpc QueryActiveAlarms(QueryActiveAlarmsRequest) returns
    (stream ActiveAlarmSnapshot). All four typed clients referenced
    these but they were absent from the proto.
  * MxAccessGatewayService.QueryActiveAlarms: implement the new RPC on
    the server, streaming from IGatewayAlarmService.CurrentAlarms with
    optional alarm_filter_prefix filter.
  * clients/dotnet/.../DiscoverHierarchyOptions.cs: add the hand-written
    .NET POCO that wraps DiscoverHierarchyRequest (referenced by
    GalaxyRepositoryClient.DiscoverHierarchyAsync but never authored).
  * Drop retired session_id field references from
    AcknowledgeAlarmRequest/AcknowledgeAlarmReply test fixtures across
    .NET, Rust, Go, and Python clients.
  * Rust integration test: add the missing stream_alarms impl on the
    fake MxAccessGateway server (the trait gained the method, but the
    fake never implemented it).
  * Rust CLI test: bump expected gatewayProtocolVersion 2 -> 3.

Regenerated artifacts updated in this commit:
  * src/ZB.MOM.WW.MxGateway.Contracts/Generated/{MxaccessGateway,MxaccessGatewayGrpc}.cs
  * clients/python/src/zb_mom_ww_mxgateway/generated/*_pb2{,_grpc}.py
  * clients/go/internal/generated/*.pb.go
(C# is regenerated by Grpc.Tools on the contracts build; Python and Go
via their generate-proto.ps1 scripts; Rust regenerates from the .proto
via tonic-build at compile time, so it has no checked-in artifact.)

Verification: 472 server tests, 275 worker tests (9 dev-rig skipped),
18 integration tests (live MxAccess + LDAP + Galaxy), 57 .NET client
tests, 32 Rust workspace tests, 39 Python tests, all Go packages, and
gradle build for Java all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 19:09:34 -04:00

261 lines
8.2 KiB
Python

"""Tests for the async client and session wrappers."""
from __future__ import annotations
import asyncio
from typing import Any
import pytest
from zb_mom_ww_mxgateway import ClientOptions, GatewayClient, MxAccessError
from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb
@pytest.mark.asyncio
async def test_session_helpers_send_auth_metadata_and_preserve_raw_replies() -> None:
    """Run register, add_item and advise through a session backed by the fake stub.

    Checks the handles returned by the session helpers, the bearer-token
    metadata recorded by the fake OpenSession call, and the command payloads
    recorded by the fake Invoke call.
    """
    gateway = FakeGatewayStub()
    options = ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True)
    client = await GatewayClient.connect(options, stub=gateway)
    session = await client.open_session(client_session_name="pytest")

    registered_handle = await session.register("pytest-client")
    added_handle = await session.add_item(registered_handle, "Object.Attribute")
    await session.advise(registered_handle, added_handle)

    # The expected values are the ones scripted in FakeGatewayStub's replies.
    assert session.session_id == "session-1"
    assert registered_handle == 12
    assert added_handle == 34

    expected_metadata = (("authorization", "Bearer mxgw_test_secret"),)
    assert gateway.open_session.metadata == expected_metadata

    # One Invoke request is recorded per helper call, in call order.
    sent = gateway.invoke.requests
    assert sent[0].command.register.client_name == "pytest-client"
    assert sent[1].command.add_item.item_definition == "Object.Attribute"
    assert sent[2].command.advise.item_handle == 34
@pytest.mark.asyncio
async def test_mxaccess_error_preserves_raw_reply() -> None:
    """A failed write raises MxAccessError carrying the exact reply object."""
    rejected_status = pb.ProtocolStatus(
        code=pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE,
        message="MXAccess rejected write.",
    )
    rejected_reply = pb.MxCommandReply(
        session_id="session-1",
        kind=pb.MX_COMMAND_KIND_WRITE,
        protocol_status=rejected_status,
        hresult=-1,
    )

    gateway = FakeGatewayStub()
    # Replace the default scripted replies so the first Invoke fails.
    gateway.invoke.replies = [rejected_reply]

    options = ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True)
    client = await GatewayClient.connect(options, stub=gateway)
    session = await client.open_session()

    with pytest.raises(MxAccessError) as excinfo:
        await session.write(12, 34, 123)

    # Identity, not equality: the error must expose the very same reply object.
    assert excinfo.value.raw_reply is rejected_reply
@pytest.mark.asyncio
async def test_subscribe_bulk_sends_one_bulk_command_and_returns_results() -> None:
    """subscribe_bulk issues a single SUBSCRIBE_BULK command and returns its results."""
    tag = "Area001.Pump001.Speed"

    subscribed = pb.SubscribeResult(
        server_handle=12,
        tag_address="Area001.Pump001.Speed",
        item_handle=34,
        was_successful=True,
    )
    scripted_reply = pb.MxCommandReply(
        session_id="session-1",
        kind=pb.MX_COMMAND_KIND_SUBSCRIBE_BULK,
        protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
        subscribe_bulk=pb.BulkSubscribeReply(results=[subscribed]),
    )

    gateway = FakeGatewayStub()
    # Replace the default scripted replies with the single bulk reply.
    gateway.invoke.replies = [scripted_reply]

    options = ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True)
    client = await GatewayClient.connect(options, stub=gateway)
    session = await client.open_session()

    results = await session.subscribe_bulk(12, [tag])

    assert results[0].item_handle == 34

    # Exactly one Invoke call must have been made for the whole batch.
    recorded = gateway.invoke.requests
    assert len(recorded) == 1
    command = recorded[0].command
    assert command.kind == pb.MX_COMMAND_KIND_SUBSCRIBE_BULK
    assert list(command.subscribe_bulk.tag_addresses) == ["Area001.Pump001.Speed"]
@pytest.mark.asyncio
async def test_stream_events_cancels_underlying_call_when_closed() -> None:
    """Closing the event iterator cancels the fake stream it was reading from."""
    only_event = pb.MxEvent(
        session_id="session-1",
        worker_sequence=1,
        family=pb.MX_EVENT_FAMILY_ON_DATA_CHANGE,
    )
    fake_stream = FakeStream([only_event])
    gateway = FakeGatewayStub(stream=fake_stream)

    options = ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True)
    client = await GatewayClient.connect(options, stub=gateway)
    session = await client.open_session()

    event_iter = session.stream_events()
    received = await anext(event_iter)
    # Close after the first event; the assertions below check the side effects.
    await event_iter.aclose()

    assert received.worker_sequence == 1
    assert fake_stream.cancelled
    expected_metadata = (("authorization", "Bearer mxgw_test_secret"),)
    assert gateway.stream_metadata == expected_metadata
@pytest.mark.asyncio
async def test_unary_task_cancellation_reaches_fake_call() -> None:
    """Cancelling a task blocked in open_session cancels the underlying fake call."""
    never_replies = BlockingCancellableUnary()
    gateway = FakeGatewayStub()
    # Swap the scripted OpenSession for one that blocks until cancelled.
    gateway.OpenSession = never_replies

    options = ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True)
    client = await GatewayClient.connect(options, stub=gateway)

    pending = asyncio.create_task(client.open_session())
    # Wait until the fake call is actually being awaited before cancelling.
    await never_replies.started.wait()
    pending.cancel()

    with pytest.raises(asyncio.CancelledError):
        await pending

    issued_call = never_replies.call
    assert issued_call is not None
    assert issued_call.cancelled
class FakeGatewayStub:
    """In-memory replacement for the gateway stub used by these tests.

    Each unary RPC is a ``FakeUnary`` preloaded with replies. It is reachable
    under a snake_case attribute (``open_session``, ``close_session``,
    ``invoke``) and under the matching PascalCase attribute (``OpenSession``,
    ``CloseSession``, ``Invoke``); both names refer to the same object.
    ``StreamEvents`` records its arguments and returns a fixed ``FakeStream``.
    """

    def __init__(self, stream: "FakeStream | None" = None) -> None:
        """Script the default replies and remember the stream to hand out.

        Args:
            stream: Stream returned by ``StreamEvents``. An empty
                ``FakeStream`` is created when none is supplied.
        """

        def ok_status() -> Any:
            # A fresh OK status for every scripted reply.
            return pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK)

        def command_reply(kind: Any, **payload: Any) -> Any:
            # Successful Invoke reply for "session-1" with an optional payload field.
            return pb.MxCommandReply(
                session_id="session-1",
                kind=kind,
                protocol_status=ok_status(),
                **payload,
            )

        self.open_session = self.OpenSession = FakeUnary(
            [pb.OpenSessionReply(session_id="session-1", protocol_status=ok_status())],
        )
        self.close_session = self.CloseSession = FakeUnary(
            [pb.CloseSessionReply(session_id="session-1", protocol_status=ok_status())],
        )
        # Default Invoke script: register -> add_item -> advise, in that order.
        self.invoke = self.Invoke = FakeUnary(
            [
                command_reply(
                    pb.MX_COMMAND_KIND_REGISTER,
                    register=pb.RegisterReply(server_handle=12),
                ),
                command_reply(
                    pb.MX_COMMAND_KIND_ADD_ITEM,
                    add_item=pb.AddItemReply(item_handle=34),
                ),
                command_reply(pb.MX_COMMAND_KIND_ADVISE),
            ],
        )

        self._stream = stream or FakeStream([])
        self.stream_metadata: tuple[tuple[str, str], ...] | None = None

    def StreamEvents(
        self,
        request: pb.StreamEventsRequest,
        *,
        metadata: tuple[tuple[str, str], ...],
    ) -> "FakeStream":
        """Record the request and metadata, then return the configured stream."""
        self.stream_request, self.stream_metadata = request, metadata
        return self._stream
class FakeUnary:
    """Scripted stand-in for an awaitable unary RPC callable.

    Every call records the request and the metadata it was given, then
    returns the next reply from ``replies`` in FIFO order. Tests may replace
    ``replies`` with a new list to script different responses.
    """

    def __init__(self, replies: list[Any]) -> None:
        """Store the scripted replies and start with an empty call record.

        Args:
            replies: Replies to return, one per call, first element first.
        """
        self.replies = replies
        # Every request passed to the fake, in call order.
        self.requests: list[Any] = []
        # Metadata of the most recent call; None until the first call.
        self.metadata: tuple[tuple[str, str], ...] | None = None

    async def __call__(
        self,
        request: Any,
        *,
        metadata: tuple[tuple[str, str], ...],
    ) -> Any:
        """Record the call and return the next scripted reply.

        Args:
            request: Request object passed by the code under test.
            metadata: Call metadata passed by the code under test.

        Returns:
            The oldest reply still left in ``replies``.

        Raises:
            IndexError: If no scripted reply is left for this call.
        """
        # Record first so a failing call is still visible in ``requests``.
        self.requests.append(request)
        self.metadata = metadata
        if not self.replies:
            # Same exception type as list.pop on an empty list, but with a
            # message that points at the fake instead of "pop from empty list".
            raise IndexError(
                f"FakeUnary has no scripted reply left for call #{len(self.requests)}"
            )
        return self.replies.pop(0)
class BlockingCancellableUnary:
    """Fake unary RPC callable whose calls return a ``BlockingCall``.

    The most recently created call is kept on ``call`` so a test can inspect
    it, and ``started`` is the event handed to each call it creates.
    """

    def __init__(self) -> None:
        """Create the shared ``started`` event; no call exists yet."""
        self.call: BlockingCall | None = None
        self.started = asyncio.Event()

    def __call__(self, *_positional: Any, **_keyword: Any) -> "BlockingCall":
        """Ignore all arguments, create a new ``BlockingCall`` and return it."""
        new_call = BlockingCall(self.started)
        self.call = new_call
        return new_call
class BlockingCall:
    """Awaitable fake call that never completes on its own.

    Awaiting it sets ``started`` and then waits on an event that is never
    set. ``cancel()`` only records that it was called; it does not wake the
    waiter.
    """

    def __init__(self, started: asyncio.Event) -> None:
        """Remember the event to set once the call is being awaited.

        Args:
            started: Event set as soon as the call starts waiting.
        """
        self.started = started
        # Flipped to True by cancel(); tests assert on it.
        self.cancelled = False

    def __await__(self):
        """Make the instance awaitable by delegating to ``_wait()``."""
        return self._wait().__await__()

    async def _wait(self) -> Any:
        """Signal ``started`` and then block indefinitely.

        Raises:
            asyncio.CancelledError: Propagated unchanged when the awaiting
                task is cancelled.
        """
        self.started.set()
        # A fresh event that nobody sets: this wait ends only by cancellation,
        # which propagates without an explicit catch-and-re-raise handler.
        await asyncio.Event().wait()

    def cancel(self) -> None:
        """Record that cancellation was requested on this call."""
        self.cancelled = True
class FakeStream:
    """Async-iterable fake of a server-streaming call.

    Yields the queued events in FIFO order. When the queue is empty it idles
    for ``idle_timeout`` seconds and then, if still empty, ends the iteration.
    ``cancel()`` only records that it was called.
    """

    def __init__(
        self,
        events: "list[pb.MxEvent]",
        *,
        idle_timeout: float = 3600,
    ) -> None:
        """Store the events to yield.

        Args:
            events: Events to yield, first element first. The list is
                consumed in place.
            idle_timeout: Seconds to wait when no event is queued before the
                stream ends. The default keeps an empty stream open for an hour.
        """
        self._events = events
        self._idle_timeout = idle_timeout
        # Flipped to True by cancel(); tests assert on it.
        self.cancelled = False

    def __aiter__(self) -> "FakeStream":
        """Return self; the stream is its own async iterator."""
        return self

    async def __anext__(self) -> "pb.MxEvent":
        """Return the next queued event, idling first if none is queued.

        Raises:
            StopAsyncIteration: If no event is queued once the idle period
                has elapsed.
        """
        if not self._events:
            await asyncio.sleep(self._idle_timeout)
        if not self._events:
            # End the iteration per the async-iterator protocol instead of
            # leaking IndexError from pop() on an empty list.
            raise StopAsyncIteration
        return self._events.pop(0)

    def cancel(self) -> None:
        """Record that cancellation was requested on this stream."""
        self.cancelled = True