Files
mxaccessgw/clients/python/tests/test_galaxy.py
T
Joseph Doherty 397d3c5c4f rename: apply ZB.MOM.WW prefix to all client SDKs + fix pre-existing alarm-RPC breaks
Rename across every client surface using each language's idiomatic convention:

  * .NET   clients/dotnet/MxGateway.Client[.Cli|.Tests]/
             -> clients/dotnet/ZB.MOM.WW.MxGateway.Client[.Cli|.Tests]/
             namespaces -> ZB.MOM.WW.MxGateway.Client[.Cli|.Tests]
             contracts ProjectReference repointed to ZB.MOM.WW.MxGateway.Contracts
             sln migrated to slnx (dotnet sln migrate)
  * Python src/mxgateway -> src/zb_mom_ww_mxgateway
             src/mxgateway_cli -> src/zb_mom_ww_mxgateway_cli
             distribution: mxaccess-gateway-client -> zb-mom-ww-mxaccess-gateway-client
  * Rust   crate: mxgateway-client -> zb-mom-ww-mxgateway-client
             build.rs proto path repointed
  * Java   subprojects: mxgateway-{client,cli} -> zb-mom-ww-mxgateway-{client,cli}
             packages com.dohertylan.mxgateway -> com.zb.mom.ww.mxgateway
             group   com.dohertylan.mxgateway -> com.zb.mom.ww.mxgateway
             rootProject mxaccessgw-java -> zb-mom-ww-mxaccessgw-java
  * Go     generate-proto.ps1 proto path repointed; module path and
             package mxgateway kept (Go convention).
  * proto-inputs.json: generatedOutputs.python updated to new package path.
  * scripts/run-client-e2e-tests.ps1: Java CLI install path + gradle task
             updated to zb-mom-ww-mxgateway-cli.

CLI binary names (mxgw, mxgw-py, mxgw-go, mxgateway-cli) and wire-level
identifiers (MXGATEWAY_* env vars, the mxgw_<id>_<secret> API key
prefix, protobuf package names like mxaccess_gateway.v1, all MXAccess
references) intentionally NOT renamed.

Fix pre-existing alarms-over-gateway breaks unblocked by the rename:

  * mxaccess_gateway.proto: add missing public message QueryActiveAlarmsRequest
    {session_id, client_correlation_id, alarm_filter_prefix} and missing
    rpc QueryActiveAlarms(QueryActiveAlarmsRequest) returns
    (stream ActiveAlarmSnapshot). All four typed clients referenced
    these but they were absent from the proto.
  * MxAccessGatewayService.QueryActiveAlarms: implement the new RPC on
    the server, streaming from IGatewayAlarmService.CurrentAlarms with
    optional alarm_filter_prefix filter.
  * clients/dotnet/.../DiscoverHierarchyOptions.cs: add the hand-written
    .NET POCO that wraps DiscoverHierarchyRequest (referenced by
    GalaxyRepositoryClient.DiscoverHierarchyAsync but never authored).
  * Drop retired session_id field references from
    AcknowledgeAlarmRequest/AcknowledgeAlarmReply test fixtures across
    .NET, Rust, Go, and Python clients.
  * Rust integration test: add the missing stream_alarms impl on the
    fake MxAccessGateway server (the trait gained the method, but the
    fake didn't).
  * Rust CLI test: bump expected gatewayProtocolVersion 2 -> 3.

Regenerated artifacts updated in this commit:
  * src/ZB.MOM.WW.MxGateway.Contracts/Generated/{MxaccessGateway,MxaccessGatewayGrpc}.cs
  * clients/python/src/zb_mom_ww_mxgateway/generated/*_pb2{,_grpc}.py
  * clients/go/internal/generated/*.pb.go
(C# regenerated by Grpc.Tools on contracts build; Python and Go via
their generate-proto.ps1 scripts; Rust regenerates from .proto via
tonic-build at compile time, so it has no checked-in artifact.)

Verification: 472 server tests, 275 worker tests (9 dev-rig skipped),
18 integration tests (live MxAccess + LDAP + Galaxy), 57 .NET client
tests, 32 Rust workspace tests, 39 Python tests, all Go packages, and
gradle build for Java all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 19:09:34 -04:00

348 lines
12 KiB
Python

"""Tests for the Galaxy Repository async client wrapper."""
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from typing import Any
import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from zb_mom_ww_mxgateway import ClientOptions, DeployEvent, GalaxyRepositoryClient, WatchDeployEventsRequest
from zb_mom_ww_mxgateway.generated import galaxy_repository_pb2 as galaxy_pb
from zb_mom_ww_mxgateway.generated import galaxy_repository_pb2_grpc as galaxy_pb_grpc
def test_galaxy_messages_import() -> None:
    """Smoke-test the generated Galaxy protobuf and gRPC modules.

    Constructs a ``DiscoverHierarchyRequest`` and a ``GalaxyObject`` that
    carries a single ``GalaxyAttribute``, then checks that the request has a
    descriptor, the nested attribute keeps its name, and the gRPC module
    exposes ``GalaxyRepositoryStub``.
    """
    hierarchy_request = galaxy_pb.DiscoverHierarchyRequest()

    download_path = galaxy_pb.GalaxyAttribute(
        attribute_name="DownloadPath",
        full_tag_reference="DelmiaReceiver_001.DownloadPath",
        mx_data_type=8,
        data_type_name="String",
    )
    receiver = galaxy_pb.GalaxyObject(
        gobject_id=42,
        tag_name="DelmiaReceiver_001",
        contained_name="DelmiaReceiver",
        browse_name="DelmiaReceiver",
        parent_gobject_id=10,
        is_area=False,
        category_id=4,
        hosted_by_gobject_id=10,
        template_chain=["$ApplicationObject", "$DelmiaReceiver"],
        attributes=[download_path],
    )

    assert hierarchy_request.DESCRIPTOR is not None
    assert receiver.attributes[0].attribute_name == "DownloadPath"
    assert hasattr(galaxy_pb_grpc, "GalaxyRepositoryStub")
@pytest.mark.asyncio
async def test_test_connection_returns_bool_and_sends_auth() -> None:
    """``test_connection`` yields ``True`` and forwards the API key as a bearer token."""
    fake = FakeGalaxyStub()
    fake.test_connection.replies = [galaxy_pb.TestConnectionReply(ok=True)]
    options = ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True)
    client = await GalaxyRepositoryClient.connect(options, stub=fake)

    outcome = await client.test_connection()

    assert outcome is True
    # The configured API key must reach the stub as an authorization header.
    expected_metadata = (("authorization", "Bearer mxgw_test_secret"),)
    assert fake.test_connection.metadata == expected_metadata
    sent_request = fake.test_connection.requests[0]
    assert isinstance(sent_request, galaxy_pb.TestConnectionRequest)
@pytest.mark.asyncio
async def test_get_last_deploy_time_returns_datetime_when_present() -> None:
    """A reply flagged ``present=True`` surfaces its timestamp fields to the caller."""
    deployed_at = Timestamp()
    deployed_at.FromDatetime(datetime(2025, 4, 1, 12, 30, 45, tzinfo=timezone.utc))

    fake = FakeGalaxyStub()
    reply = galaxy_pb.GetLastDeployTimeReply(present=True, time_of_last_deploy=deployed_at)
    fake.get_last_deploy_time.replies = [reply]

    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GalaxyRepositoryClient.connect(options, stub=fake)

    result = await client.get_last_deploy_time()

    assert result is not None
    # Compare field by field, exactly as many components as were seeded above.
    observed = (
        result.year,
        result.month,
        result.day,
        result.hour,
        result.minute,
        result.second,
    )
    assert observed == (2025, 4, 1, 12, 30, 45)
@pytest.mark.asyncio
async def test_get_last_deploy_time_returns_none_when_not_present() -> None:
    """A reply flagged ``present=False`` is reported to the caller as ``None``."""
    fake = FakeGalaxyStub()
    fake.get_last_deploy_time.replies = [galaxy_pb.GetLastDeployTimeReply(present=False)]
    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GalaxyRepositoryClient.connect(options, stub=fake)

    last_deploy = await client.get_last_deploy_time()

    assert last_deploy is None
@pytest.mark.asyncio
async def test_discover_hierarchy_returns_proto_objects() -> None:
    """``discover_hierarchy`` walks both pages and returns the combined objects.

    The stub is seeded with two replies; the first carries a
    ``next_page_token`` and the second does not. The test checks the two
    requests that were issued (page size and page tokens) and the objects
    that came back.
    """
    machine = galaxy_pb.GalaxyObject(
        gobject_id=1,
        tag_name="TestMachine_001",
        contained_name="TestMachine",
        browse_name="TestMachine_001",
        is_area=True,
    )
    receiver = galaxy_pb.GalaxyObject(
        gobject_id=2,
        tag_name="DelmiaReceiver_001",
        contained_name="DelmiaReceiver",
        browse_name="DelmiaReceiver",
        parent_gobject_id=1,
        attributes=[
            galaxy_pb.GalaxyAttribute(
                attribute_name="DownloadPath",
                full_tag_reference="DelmiaReceiver_001.DownloadPath",
                mx_data_type=8,
                data_type_name="String",
            ),
        ],
    )
    first_page = galaxy_pb.DiscoverHierarchyReply(
        next_page_token="page-2",
        total_object_count=2,
        objects=[machine],
    )
    last_page = galaxy_pb.DiscoverHierarchyReply(
        total_object_count=2,
        objects=[receiver],
    )

    fake = FakeGalaxyStub()
    fake.discover_hierarchy.replies = [first_page, last_page]
    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GalaxyRepositoryClient.connect(options, stub=fake)

    objects = await client.discover_hierarchy()

    assert isinstance(objects, list)
    assert len(objects) == 2

    # One request per page; the second must echo the token from the first reply.
    sent = fake.discover_hierarchy.requests
    assert len(sent) == 2
    assert sent[0].page_size == 5000
    assert sent[0].page_token == ""
    assert sent[1].page_token == "page-2"

    assert objects[0].tag_name == "TestMachine_001"
    assert objects[1].attributes[0].full_tag_reference == "DelmiaReceiver_001.DownloadPath"
@pytest.mark.asyncio
async def test_discover_hierarchy_rejects_repeated_page_token() -> None:
    """Two consecutive replies with the same ``next_page_token`` must raise."""
    repeated_token = "7:1"
    fake = FakeGalaxyStub()
    fake.discover_hierarchy.replies = [
        galaxy_pb.DiscoverHierarchyReply(next_page_token=repeated_token),
        galaxy_pb.DiscoverHierarchyReply(next_page_token=repeated_token),
    ]
    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GalaxyRepositoryClient.connect(options, stub=fake)

    with pytest.raises(Exception, match="repeated page token"):
        await client.discover_hierarchy()
@pytest.mark.asyncio
async def test_watch_deploy_events_yields_events_in_order() -> None:
    """Streamed deploy events arrive in the order the stub emits them.

    Also checks that the bearer token is sent as metadata and that the request
    leaves ``last_seen_deploy_time`` unset when the caller passes none.
    """
    first_seen = Timestamp()
    first_seen.FromDatetime(datetime(2025, 4, 1, 10, 0, 0, tzinfo=timezone.utc))
    second_seen = Timestamp()
    second_seen.FromDatetime(datetime(2025, 4, 1, 11, 0, 0, tzinfo=timezone.utc))

    first_event = galaxy_pb.DeployEvent(
        sequence=1,
        observed_at=first_seen,
        time_of_last_deploy=first_seen,
        time_of_last_deploy_present=True,
        object_count=10,
        attribute_count=42,
    )
    second_event = galaxy_pb.DeployEvent(
        sequence=2,
        observed_at=second_seen,
        time_of_last_deploy=second_seen,
        time_of_last_deploy_present=True,
        object_count=11,
        attribute_count=45,
    )

    fake = FakeGalaxyStub()
    fake.watch_deploy_events.replies = [first_event, second_event]
    options = ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True)
    client = await GalaxyRepositoryClient.connect(options, stub=fake)

    received: list[DeployEvent] = [event async for event in client.watch_deploy_events()]

    assert len(received) == 2
    assert received[0].sequence == 1
    assert received[1].sequence == 2
    assert received[0].object_count == 10
    assert received[1].attribute_count == 45

    stream = fake.watch_deploy_events
    assert stream.metadata == (("authorization", "Bearer mxgw_test_secret"),)
    sent_request = stream.requests[0]
    assert isinstance(sent_request, galaxy_pb.WatchDeployEventsRequest)
    # No last_seen_deploy_time was passed, so the request should leave it unset.
    assert not sent_request.HasField("last_seen_deploy_time")
@pytest.mark.asyncio
async def test_watch_deploy_events_propagates_last_seen_deploy_time() -> None:
    """A caller-supplied ``last_seen_deploy_time`` is copied onto the request."""
    checkpoint = datetime(2025, 4, 1, 12, 0, 0, tzinfo=timezone.utc)
    fake = FakeGalaxyStub()
    fake.watch_deploy_events.replies = []
    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GalaxyRepositoryClient.connect(options, stub=fake)

    # The stream is empty; iterating it is only needed to trigger the request.
    async for _unused in client.watch_deploy_events(last_seen_deploy_time=checkpoint):
        pass

    sent_request = fake.watch_deploy_events.requests[0]
    assert isinstance(sent_request, WatchDeployEventsRequest)
    assert sent_request.HasField("last_seen_deploy_time")
    round_tripped = sent_request.last_seen_deploy_time.ToDatetime(tzinfo=timezone.utc)
    assert round_tripped == checkpoint
@pytest.mark.asyncio
async def test_watch_deploy_events_cancellation_closes_stream() -> None:
    """Closing the event iterator early cancels the underlying stream call."""
    observed = Timestamp()
    observed.FromDatetime(datetime(2025, 4, 1, 10, 0, 0, tzinfo=timezone.utc))

    fake = FakeGalaxyStub()
    # Use a "blocking" stream that never yields more after the first event.
    only_event = galaxy_pb.DeployEvent(sequence=1, observed_at=observed)
    fake.watch_deploy_events = FakeStream([only_event], block_after_replies=True)

    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GalaxyRepositoryClient.connect(options, stub=fake)

    events = client.watch_deploy_events()
    head = await events.__anext__()
    assert head.sequence == 1

    # Break the iterator by aclose() — this should drive the cancel path.
    await events.aclose()
    assert fake.watch_deploy_events.cancel_called is True
@pytest.mark.asyncio
async def test_close_marks_channel_closed_when_no_real_channel() -> None:
    """``close`` can be awaited twice on a stub-backed client without raising."""
    options = ClientOptions(endpoint="fake", plaintext=True)
    client = await GalaxyRepositoryClient.connect(options, stub=FakeGalaxyStub())

    await client.close()
    # Idempotent: a second close should not raise.
    await client.close()
class FakeGalaxyStub:
    """In-memory stand-in for the Galaxy repository gRPC stub.

    Each RPC is exposed under a snake_case attribute (used by the tests to
    seed replies and inspect calls) and under a PascalCase name.

    The three unary RPCs alias the fake created in ``__init__``.
    ``WatchDeployEvents`` is a property instead, so a test that reassigns
    ``watch_deploy_events`` after construction is still picked up.
    """

    def __init__(self) -> None:
        # Default replies are the "negative"/empty answers; tests override
        # ``.replies`` on the individual fakes as needed.
        self.TestConnection = self.test_connection = FakeUnary(
            [galaxy_pb.TestConnectionReply(ok=False)]
        )
        self.GetLastDeployTime = self.get_last_deploy_time = FakeUnary(
            [galaxy_pb.GetLastDeployTimeReply(present=False)]
        )
        self.DiscoverHierarchy = self.discover_hierarchy = FakeUnary(
            [galaxy_pb.DiscoverHierarchyReply()]
        )
        self.watch_deploy_events = FakeStream([])

    @property
    def WatchDeployEvents(self) -> "FakeStream":  # noqa: N802 — gRPC naming
        """Return whichever stream fake is currently set on ``watch_deploy_events``."""
        return self.watch_deploy_events
class FakeUnary:
def __init__(self, replies: list[Any]) -> None:
self.replies = replies
self.requests: list[Any] = []
self.metadata: tuple[tuple[str, str], ...] | None = None
async def __call__(
self,
request: Any,
*,
metadata: tuple[tuple[str, str], ...],
timeout: float | None = None,
) -> Any:
self.requests.append(request)
self.metadata = metadata
return self.replies.pop(0)
class FakeStream:
"""Sync-callable fake matching the gRPC unary-stream surface.
Calling the stub returns ``self`` (an async iterator). After exhausting the
seeded ``replies``, iteration either ends (default) or blocks indefinitely
(``block_after_replies=True``) so cancellation paths can be exercised.
"""
def __init__(
self,
replies: list[Any],
*,
block_after_replies: bool = False,
) -> None:
self.replies = list(replies)
self.requests: list[Any] = []
self.metadata: tuple[tuple[str, str], ...] | None = None
self.cancel_called = False
self._block_after_replies = block_after_replies
def __call__(
self,
request: Any,
*,
metadata: tuple[tuple[str, str], ...],
timeout: float | None = None,
) -> "FakeStream":
self.requests.append(request)
self.metadata = metadata
return self
def __aiter__(self) -> "FakeStream":
return self
async def __anext__(self) -> Any:
if self.replies:
return self.replies.pop(0)
if self._block_after_replies:
# Sleep forever until the consumer cancels us.
await asyncio.Event().wait()
raise StopAsyncIteration
def cancel(self) -> None:
self.cancel_called = True