95 lines
3.5 KiB
Python
95 lines
3.5 KiB
Python
"""Client connection options for the async Python wrapper."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Sequence
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
import grpc
|
|
|
|
from .auth import REDACTED, ApiKey
|
|
|
|
|
|
@dataclass(frozen=True)
class ClientOptions:
    """Connection settings for `GatewayClient.connect`.

    Instances are frozen and are validated once, at construction time, by
    `__post_init__`.

    Attributes:
        endpoint: Target handed to the gRPC channel constructor by
            `create_channel`; must be non-empty.
        api_key: Optional credential, either a raw string or an `ApiKey`.
            Its value is never included in `repr()` output.
        plaintext: When true, `create_channel` opens an insecure (non-TLS)
            channel. Cannot be combined with `ca_file`.
        ca_file: Path of a file whose bytes `create_channel` uses as the TLS
            root certificates.
        server_name_override: When set, `create_channel` adds it as the
            ``grpc.ssl_target_name_override`` channel option.
        call_timeout: ``None`` or a positive number. NOTE(review): presumably
            a per-call deadline in seconds applied by the client -- confirm.
        stream_timeout: ``None`` or a positive number. NOTE(review):
            presumably the deadline for streaming calls -- confirm.
        max_grpc_message_bytes: Positive limit that `create_channel` applies
            to both the send and the receive message size (default 16 MiB).
    """

    endpoint: str
    api_key: str | ApiKey | None = None
    plaintext: bool = False
    ca_file: str | None = None
    server_name_override: str | None = None
    call_timeout: float | None = 30.0
    stream_timeout: float | None = None
    max_grpc_message_bytes: int = 16 * 1024 * 1024

    def __post_init__(self) -> None:
        """Validate options; raise `ValueError` for invalid combinations.

        Raises:
            ValueError: If `endpoint` is empty, `ca_file` is combined with
                `plaintext`, a timeout is set but is not a positive number
                (NaN included), or `max_grpc_message_bytes` is not positive.
        """
        if not self.endpoint:
            raise ValueError("endpoint must not be empty")

        if self.plaintext and self.ca_file:
            raise ValueError("ca_file cannot be used with plaintext connections")

        for name in ("call_timeout", "stream_timeout"):
            timeout = getattr(self, name)
            # Deliberately `not (timeout > 0)` rather than `timeout <= 0`:
            # every ordering comparison with NaN is false, so the `<=` form
            # would silently accept a NaN timeout.
            if timeout is not None and not (timeout > 0):
                raise ValueError(f"{name} must be greater than zero")

        if self.max_grpc_message_bytes <= 0:
            raise ValueError("max_grpc_message_bytes must be greater than zero")

    def __repr__(self) -> str:
        """Return a repr that redacts the API key value."""
        # Only the presence of a key is revealed: a truthy key is replaced by
        # the REDACTED marker, anything falsy is shown as None.
        api_key = REDACTED if self.api_key else None
        return (
            f"{type(self).__name__}(endpoint={self.endpoint!r}, "
            f"api_key={api_key!r}, plaintext={self.plaintext!r}, "
            f"ca_file={self.ca_file!r}, "
            f"server_name_override={self.server_name_override!r}, "
            f"call_timeout={self.call_timeout!r}, "
            f"stream_timeout={self.stream_timeout!r}, "
            f"max_grpc_message_bytes={self.max_grpc_message_bytes!r})"
        )
|
|
|
|
|
|
@dataclass(frozen=True)
class BrowseChildrenOptions:
    """Filter and shape options accepted by ``GalaxyRepositoryClient.browse``.

    The fields mirror the AND-combined filters of ``BrowseChildrenRequest``.
    Because the filter set is part of the page-token contract, one instance
    is meant to be shared across a whole lazy browse session; the dataclass
    is frozen so its fields cannot be reassigned in the meantime.

    Every field has a default, so ``BrowseChildrenOptions()`` is valid. The
    per-field notes below are inferred from the field names only -- confirm
    them against ``BrowseChildrenRequest``.
    """

    # Sequence filters; each defaults to an empty tuple.
    # NOTE(review): presumably category ids / template names to match.
    category_ids: Sequence[int] = field(default_factory=tuple)
    template_chain_contains: Sequence[str] = field(default_factory=tuple)

    # Optional filters; None is the default for both.
    # NOTE(review): presumably None means "not specified" -- confirm.
    tag_name_glob: str | None = None
    include_attributes: bool | None = None

    # Boolean switches, off by default.
    alarm_bearing_only: bool = False
    historized_only: bool = False
|
|
|
|
|
|
def create_channel(options: ClientOptions) -> grpc.aio.Channel:
    """Build the `grpc.aio` channel described by ``options``.

    Both the send and the receive message-size limits are set to
    ``options.max_grpc_message_bytes``. When ``options.server_name_override``
    is set it is appended as the ``grpc.ssl_target_name_override`` channel
    option.

    Args:
        options: Client settings; ``endpoint`` is used as the channel target.

    Returns:
        An insecure channel when ``options.plaintext`` is true, otherwise a
        TLS channel.

    Raises:
        OSError: If ``options.ca_file`` is set for a TLS channel but the
            file cannot be read.
    """
    size_limit = options.max_grpc_message_bytes
    grpc_args: list[tuple[str, str | int]] = [
        ("grpc.max_receive_message_length", size_limit),
        ("grpc.max_send_message_length", size_limit),
    ]
    override = options.server_name_override
    if override:
        grpc_args.append(("grpc.ssl_target_name_override", override))

    # Plaintext short-circuits before any certificate material is touched.
    if options.plaintext:
        return grpc.aio.insecure_channel(options.endpoint, options=grpc_args)

    # Without a CA file, None is passed through and gRPC picks its default
    # root certificates.
    trust_roots = Path(options.ca_file).read_bytes() if options.ca_file else None
    tls_credentials = grpc.ssl_channel_credentials(root_certificates=trust_roots)
    return grpc.aio.secure_channel(
        options.endpoint,
        tls_credentials,
        options=grpc_args,
    )
|