rename: apply ZB.MOM.WW prefix to all client SDKs + fix pre-existing alarm-RPC breaks
Rename across every client surface using each language's idiomatic convention:
* .NET clients/dotnet/MxGateway.Client[.Cli|.Tests]/
-> clients/dotnet/ZB.MOM.WW.MxGateway.Client[.Cli|.Tests]/
namespaces -> ZB.MOM.WW.MxGateway.Client[.Cli|.Tests]
contracts ProjectReference repointed to ZB.MOM.WW.MxGateway.Contracts
sln migrated to slnx (dotnet sln migrate)
* Python src/mxgateway -> src/zb_mom_ww_mxgateway
src/mxgateway_cli -> src/zb_mom_ww_mxgateway_cli
distribution: mxaccess-gateway-client -> zb-mom-ww-mxaccess-gateway-client
* Rust crate: mxgateway-client -> zb-mom-ww-mxgateway-client
build.rs proto path repointed
* Java subprojects: mxgateway-{client,cli} -> zb-mom-ww-mxgateway-{client,cli}
packages com.dohertylan.mxgateway -> com.zb.mom.ww.mxgateway
group com.dohertylan.mxgateway -> com.zb.mom.ww.mxgateway
rootProject mxaccessgw-java -> zb-mom-ww-mxaccessgw-java
* Go generate-proto.ps1 proto path repointed; module path and
package mxgateway kept (Go convention).
* proto-inputs.json: generatedOutputs.python updated to new package path.
* scripts/run-client-e2e-tests.ps1: Java CLI install path + gradle task
updated to zb-mom-ww-mxgateway-cli.
CLI binary names (mxgw, mxgw-py, mxgw-go, mxgateway-cli) and wire-level
identifiers (MXGATEWAY_* env vars, the mxgw_<id>_<secret> API key
prefix, protobuf package names like mxaccess_gateway.v1, all MXAccess
references) intentionally NOT renamed.
Fix pre-existing alarms-over-gateway breaks unblocked by the rename:
* mxaccess_gateway.proto: add missing public message QueryActiveAlarmsRequest
{session_id, client_correlation_id, alarm_filter_prefix} and missing
rpc QueryActiveAlarms(QueryActiveAlarmsRequest) returns
(stream ActiveAlarmSnapshot). All four typed clients referenced
these but they were absent from the proto.
* MxAccessGatewayService.QueryActiveAlarms: implement the new RPC on
the server, streaming from IGatewayAlarmService.CurrentAlarms with
optional alarm_filter_prefix filter.
* clients/dotnet/.../DiscoverHierarchyOptions.cs: add the hand-written
.NET POCO that wraps DiscoverHierarchyRequest (referenced by
GalaxyRepositoryClient.DiscoverHierarchyAsync but never authored).
* Drop retired session_id field references from
AcknowledgeAlarmRequest/AcknowledgeAlarmReply test fixtures across
.NET, Rust, Go, and Python clients.
* Rust integration test: add the missing stream_alarms impl on the
fake MxAccessGateway server (the trait gained the method, fake
didn't).
* Rust CLI test: bump expected gatewayProtocolVersion 2 -> 3.
Regenerated artifacts updated in this commit:
* src/ZB.MOM.WW.MxGateway.Contracts/Generated/{MxaccessGateway,MxaccessGatewayGrpc}.cs
* clients/python/src/zb_mom_ww_mxgateway/generated/*_pb2{,_grpc}.py
* clients/go/internal/generated/*.pb.go
(C# regenerated by Grpc.Tools on contracts build; Python and Go via
their generate-proto.ps1 scripts; Rust regenerates from .proto via
tonic-build at compile time so no checked-in artefact.)
Verification: 472 server tests, 275 worker tests (9 dev-rig skipped),
18 integration tests (live MxAccess + LDAP + Galaxy), 57 .NET client
tests, 32 Rust workspace tests, 39 Python tests, all Go packages, and
gradle build for Java all pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,219 @@
|
||||
"""Async Galaxy Repository client wrapper.
|
||||
|
||||
Wraps the read-only ``GalaxyRepository`` gRPC service exposed by the
|
||||
MxAccess Gateway. The service lets callers test connectivity to the AVEVA
|
||||
System Platform Galaxy Repository (ZB SQL database), read the last
|
||||
deployment timestamp, and enumerate the deployed object hierarchy plus the
|
||||
attributes on each object.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import AsyncIterator, Sequence
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import grpc
|
||||
from google.protobuf.timestamp_pb2 import Timestamp
|
||||
|
||||
from .auth import merge_metadata
|
||||
from .errors import MxGatewayError, map_rpc_error
|
||||
from .generated import galaxy_repository_pb2 as galaxy_pb
|
||||
from .generated import galaxy_repository_pb2_grpc as galaxy_pb_grpc
|
||||
from .options import ClientOptions, create_channel
|
||||
|
||||
_DISCOVER_HIERARCHY_PAGE_SIZE = 5000
|
||||
|
||||
|
||||
class GalaxyRepositoryClient:
|
||||
"""Async client for the Galaxy Repository gRPC service."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
options: ClientOptions,
|
||||
stub: Any,
|
||||
channel: grpc.aio.Channel | None = None,
|
||||
) -> None:
|
||||
"""Initialize the client with resolved options and a gRPC stub."""
|
||||
self.options = options
|
||||
self.raw_stub = stub
|
||||
self._channel = channel
|
||||
self._closed = False
|
||||
|
||||
@classmethod
|
||||
async def connect(
|
||||
cls,
|
||||
options: ClientOptions | None = None,
|
||||
*,
|
||||
endpoint: str | None = None,
|
||||
api_key: str | None = None,
|
||||
plaintext: bool = False,
|
||||
ca_file: str | None = None,
|
||||
server_name_override: str | None = None,
|
||||
stub: Any | None = None,
|
||||
) -> "GalaxyRepositoryClient":
|
||||
"""Create a client with either a real async channel or a supplied fake stub."""
|
||||
|
||||
resolved = options or ClientOptions(
|
||||
endpoint=endpoint or "",
|
||||
api_key=api_key,
|
||||
plaintext=plaintext,
|
||||
ca_file=ca_file,
|
||||
server_name_override=server_name_override,
|
||||
)
|
||||
|
||||
if stub is not None:
|
||||
return cls(options=resolved, stub=stub)
|
||||
|
||||
channel = create_channel(resolved)
|
||||
return cls(
|
||||
options=resolved,
|
||||
stub=galaxy_pb_grpc.GalaxyRepositoryStub(channel),
|
||||
channel=channel,
|
||||
)
|
||||
|
||||
async def __aenter__(self) -> "GalaxyRepositoryClient":
|
||||
"""Return self to support ``async with`` usage."""
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *_exc_info: object) -> None:
|
||||
"""Close the client when leaving an ``async with`` block."""
|
||||
await self.close()
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the owned gRPC channel."""
|
||||
|
||||
if self._closed:
|
||||
return
|
||||
|
||||
if self._channel is not None:
|
||||
await self._channel.close()
|
||||
self._closed = True
|
||||
|
||||
async def test_connection(self) -> bool:
|
||||
"""Return ``True`` when the gateway can reach the Galaxy Repository DB."""
|
||||
|
||||
reply = await self._unary(
|
||||
"test connection",
|
||||
self.raw_stub.TestConnection,
|
||||
galaxy_pb.TestConnectionRequest(),
|
||||
)
|
||||
return bool(reply.ok)
|
||||
|
||||
async def get_last_deploy_time(self) -> datetime | None:
|
||||
"""Return the last Galaxy deploy timestamp or ``None`` when unset."""
|
||||
|
||||
reply = await self._unary(
|
||||
"get last deploy time",
|
||||
self.raw_stub.GetLastDeployTime,
|
||||
galaxy_pb.GetLastDeployTimeRequest(),
|
||||
)
|
||||
if not reply.present:
|
||||
return None
|
||||
return reply.time_of_last_deploy.ToDatetime()
|
||||
|
||||
async def discover_hierarchy(self) -> list[galaxy_pb.GalaxyObject]:
|
||||
"""Return the deployed Galaxy object hierarchy as raw proto messages."""
|
||||
|
||||
objects: list[galaxy_pb.GalaxyObject] = []
|
||||
seen_page_tokens: set[str] = set()
|
||||
page_token = ""
|
||||
while True:
|
||||
reply = await self._unary(
|
||||
"discover hierarchy",
|
||||
self.raw_stub.DiscoverHierarchy,
|
||||
galaxy_pb.DiscoverHierarchyRequest(
|
||||
page_size=_DISCOVER_HIERARCHY_PAGE_SIZE,
|
||||
page_token=page_token,
|
||||
),
|
||||
)
|
||||
objects.extend(reply.objects)
|
||||
page_token = reply.next_page_token
|
||||
if not page_token:
|
||||
return objects
|
||||
if page_token in seen_page_tokens:
|
||||
raise MxGatewayError(
|
||||
f"galaxy discover hierarchy returned repeated page token {page_token!r}"
|
||||
)
|
||||
seen_page_tokens.add(page_token)
|
||||
|
||||
def watch_deploy_events(
|
||||
self,
|
||||
last_seen_deploy_time: datetime | None = None,
|
||||
*,
|
||||
metadata: Sequence[tuple[str, str]] | None = None,
|
||||
) -> AsyncIterator[galaxy_pb.DeployEvent]:
|
||||
"""Stream Galaxy deploy events.
|
||||
|
||||
On subscribe the gateway emits the current cached state and then one
|
||||
event per new deploy time. ``sequence`` is monotonic per server start;
|
||||
gaps mean events were dropped from the per-subscriber buffer. When
|
||||
``last_seen_deploy_time`` is supplied and matches the current cached
|
||||
deploy time the bootstrap event is suppressed.
|
||||
"""
|
||||
|
||||
request = galaxy_pb.WatchDeployEventsRequest()
|
||||
if last_seen_deploy_time is not None:
|
||||
timestamp = Timestamp()
|
||||
timestamp.FromDatetime(last_seen_deploy_time)
|
||||
request.last_seen_deploy_time.CopyFrom(timestamp)
|
||||
|
||||
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
|
||||
if self.options.stream_timeout is not None:
|
||||
kwargs["timeout"] = self.options.stream_timeout
|
||||
try:
|
||||
call = self.raw_stub.WatchDeployEvents(request, **kwargs)
|
||||
except TypeError as error:
|
||||
if "timeout" not in kwargs or "unexpected keyword argument 'timeout'" not in str(error):
|
||||
raise
|
||||
kwargs.pop("timeout")
|
||||
call = self.raw_stub.WatchDeployEvents(request, **kwargs)
|
||||
|
||||
return _canceling_iterator(call)
|
||||
|
||||
async def _unary(
|
||||
self,
|
||||
operation: str,
|
||||
method: Any,
|
||||
request: Any,
|
||||
*,
|
||||
metadata: Sequence[tuple[str, str]] | None = None,
|
||||
) -> Any:
|
||||
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
|
||||
if self.options.call_timeout is not None:
|
||||
kwargs["timeout"] = self.options.call_timeout
|
||||
try:
|
||||
call = method(request, **kwargs)
|
||||
except TypeError as error:
|
||||
if "timeout" not in kwargs or "unexpected keyword argument 'timeout'" not in str(error):
|
||||
raise
|
||||
kwargs.pop("timeout")
|
||||
call = method(request, **kwargs)
|
||||
try:
|
||||
return await call
|
||||
except asyncio.CancelledError:
|
||||
cancel = getattr(call, "cancel", None)
|
||||
if cancel is not None:
|
||||
cancel()
|
||||
raise
|
||||
except grpc.RpcError as error:
|
||||
raise map_rpc_error(operation, error) from error
|
||||
|
||||
|
||||
async def _canceling_iterator(call: Any) -> AsyncIterator[galaxy_pb.DeployEvent]:
|
||||
try:
|
||||
async for event in call:
|
||||
yield event
|
||||
except asyncio.CancelledError:
|
||||
cancel = getattr(call, "cancel", None)
|
||||
if cancel is not None:
|
||||
cancel()
|
||||
raise
|
||||
except grpc.RpcError as error:
|
||||
raise map_rpc_error("watch deploy events", error) from error
|
||||
finally:
|
||||
cancel = getattr(call, "cancel", None)
|
||||
if cancel is not None:
|
||||
cancel()
|
||||
Reference in New Issue
Block a user