235 lines
7.2 KiB
Python
235 lines
7.2 KiB
Python
"""MXAccess value conversion helpers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Sequence
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from google.protobuf.timestamp_pb2 import Timestamp
|
|
|
|
from .generated import mxaccess_gateway_pb2 as pb
|
|
|
|
|
|
MxValueInput = bool | int | float | str | datetime | bytes | None | Sequence[Any]
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class MxValueView:
|
|
"""Typed projection of a raw `MxValue` protobuf message."""
|
|
|
|
value: Any
|
|
kind: str
|
|
raw: pb.MxValue
|
|
|
|
|
|
def to_mx_value(value: MxValueInput, *, data_type: str | None = None) -> pb.MxValue:
|
|
"""Convert a Python value into the public protobuf `MxValue` union."""
|
|
|
|
if isinstance(value, pb.MxValue):
|
|
return value
|
|
|
|
if value is None:
|
|
return pb.MxValue(
|
|
data_type=pb.MX_DATA_TYPE_NO_DATA,
|
|
variant_type="VT_EMPTY",
|
|
is_null=True,
|
|
raw_data_type=pb.MX_DATA_TYPE_NO_DATA,
|
|
)
|
|
|
|
if isinstance(value, bool):
|
|
return pb.MxValue(
|
|
data_type=_data_type(data_type, pb.MX_DATA_TYPE_BOOLEAN),
|
|
variant_type="VT_BOOL",
|
|
bool_value=value,
|
|
)
|
|
|
|
if isinstance(value, int):
|
|
if -(2**31) <= value <= (2**31 - 1):
|
|
return pb.MxValue(
|
|
data_type=_data_type(data_type, pb.MX_DATA_TYPE_INTEGER),
|
|
variant_type="VT_I4",
|
|
int32_value=value,
|
|
)
|
|
|
|
return pb.MxValue(
|
|
data_type=_data_type(data_type, pb.MX_DATA_TYPE_INTEGER),
|
|
variant_type="VT_I8",
|
|
int64_value=value,
|
|
)
|
|
|
|
if isinstance(value, float):
|
|
return pb.MxValue(
|
|
data_type=_data_type(data_type, pb.MX_DATA_TYPE_DOUBLE),
|
|
variant_type="VT_R8",
|
|
double_value=value,
|
|
)
|
|
|
|
if isinstance(value, str):
|
|
return pb.MxValue(
|
|
data_type=_data_type(data_type, pb.MX_DATA_TYPE_STRING),
|
|
variant_type="VT_BSTR",
|
|
string_value=value,
|
|
)
|
|
|
|
if isinstance(value, datetime):
|
|
return pb.MxValue(
|
|
data_type=_data_type(data_type, pb.MX_DATA_TYPE_TIME),
|
|
variant_type="VT_DATE",
|
|
timestamp_value=_timestamp_from_datetime(value),
|
|
)
|
|
|
|
if isinstance(value, bytes):
|
|
return pb.MxValue(
|
|
data_type=_data_type(data_type, pb.MX_DATA_TYPE_UNKNOWN),
|
|
variant_type="VT_RECORD",
|
|
raw_value=value,
|
|
)
|
|
|
|
if isinstance(value, Sequence):
|
|
return _sequence_to_mx_value(value, data_type=data_type)
|
|
|
|
raise TypeError(f"unsupported MxValue input type: {type(value).__name__}")
|
|
|
|
|
|
def from_mx_value(value: pb.MxValue) -> MxValueView:
|
|
"""Project a protobuf `MxValue` into an idiomatic Python value."""
|
|
|
|
kind = value.WhichOneof("kind")
|
|
if kind is None:
|
|
return MxValueView(None, "none", value)
|
|
|
|
if kind == "timestamp_value":
|
|
return MxValueView(
|
|
value.timestamp_value.ToDatetime().replace(tzinfo=timezone.utc),
|
|
kind,
|
|
value,
|
|
)
|
|
|
|
if kind == "array_value":
|
|
return MxValueView(from_mx_array(value.array_value), kind, value)
|
|
|
|
return MxValueView(getattr(value, kind), kind, value)
|
|
|
|
|
|
def from_mx_array(array: pb.MxArray) -> list[Any]:
|
|
"""Project a protobuf `MxArray` into a Python list."""
|
|
|
|
kind = array.WhichOneof("values")
|
|
if kind is None:
|
|
return []
|
|
|
|
values = list(getattr(array, kind).values)
|
|
if kind == "timestamp_values":
|
|
return [
|
|
timestamp.ToDatetime().replace(tzinfo=timezone.utc)
|
|
for timestamp in values
|
|
]
|
|
|
|
return values
|
|
|
|
|
|
def _sequence_to_mx_value(
|
|
values: Sequence[Any],
|
|
*,
|
|
data_type: str | None,
|
|
) -> pb.MxValue:
|
|
sequence = list(values)
|
|
if not sequence:
|
|
return pb.MxValue(
|
|
data_type=_data_type(data_type, pb.MX_DATA_TYPE_UNKNOWN),
|
|
array_value=pb.MxArray(
|
|
element_data_type=pb.MX_DATA_TYPE_UNKNOWN,
|
|
dimensions=[0],
|
|
),
|
|
)
|
|
|
|
first = sequence[0]
|
|
dimensions = [len(sequence)]
|
|
|
|
if all(isinstance(item, bool) for item in sequence):
|
|
array = pb.MxArray(
|
|
element_data_type=pb.MX_DATA_TYPE_BOOLEAN,
|
|
variant_type="VT_ARRAY|VT_BOOL",
|
|
dimensions=dimensions,
|
|
bool_values=pb.BoolArray(values=sequence),
|
|
)
|
|
return pb.MxValue(data_type=pb.MX_DATA_TYPE_BOOLEAN, array_value=array)
|
|
|
|
if all(isinstance(item, int) and not isinstance(item, bool) for item in sequence):
|
|
use_int32 = all(-(2**31) <= item <= (2**31 - 1) for item in sequence)
|
|
if use_int32:
|
|
array = pb.MxArray(
|
|
element_data_type=pb.MX_DATA_TYPE_INTEGER,
|
|
variant_type="VT_ARRAY|VT_I4",
|
|
dimensions=dimensions,
|
|
int32_values=pb.Int32Array(values=sequence),
|
|
)
|
|
else:
|
|
array = pb.MxArray(
|
|
element_data_type=pb.MX_DATA_TYPE_INTEGER,
|
|
variant_type="VT_ARRAY|VT_I8",
|
|
dimensions=dimensions,
|
|
int64_values=pb.Int64Array(values=sequence),
|
|
)
|
|
|
|
return pb.MxValue(data_type=pb.MX_DATA_TYPE_INTEGER, array_value=array)
|
|
|
|
if all(isinstance(item, float) for item in sequence):
|
|
array = pb.MxArray(
|
|
element_data_type=pb.MX_DATA_TYPE_DOUBLE,
|
|
variant_type="VT_ARRAY|VT_R8",
|
|
dimensions=dimensions,
|
|
double_values=pb.DoubleArray(values=sequence),
|
|
)
|
|
return pb.MxValue(data_type=pb.MX_DATA_TYPE_DOUBLE, array_value=array)
|
|
|
|
if all(isinstance(item, str) for item in sequence):
|
|
array = pb.MxArray(
|
|
element_data_type=pb.MX_DATA_TYPE_STRING,
|
|
variant_type="VT_ARRAY|VT_BSTR",
|
|
dimensions=dimensions,
|
|
string_values=pb.StringArray(values=sequence),
|
|
)
|
|
return pb.MxValue(data_type=pb.MX_DATA_TYPE_STRING, array_value=array)
|
|
|
|
if all(isinstance(item, datetime) for item in sequence):
|
|
array = pb.MxArray(
|
|
element_data_type=pb.MX_DATA_TYPE_TIME,
|
|
variant_type="VT_ARRAY|VT_DATE",
|
|
dimensions=dimensions,
|
|
timestamp_values=pb.TimestampArray(
|
|
values=[_timestamp_from_datetime(item) for item in sequence],
|
|
),
|
|
)
|
|
return pb.MxValue(data_type=pb.MX_DATA_TYPE_TIME, array_value=array)
|
|
|
|
if all(isinstance(item, bytes) for item in sequence):
|
|
array = pb.MxArray(
|
|
element_data_type=pb.MX_DATA_TYPE_UNKNOWN,
|
|
variant_type="VT_ARRAY|VT_VARIANT",
|
|
dimensions=dimensions,
|
|
raw_values=pb.RawArray(values=sequence),
|
|
)
|
|
return pb.MxValue(data_type=pb.MX_DATA_TYPE_UNKNOWN, array_value=array)
|
|
|
|
raise TypeError(
|
|
"MxValue array inputs must use one supported element type; "
|
|
f"first element was {type(first).__name__}"
|
|
)
|
|
|
|
|
|
def _timestamp_from_datetime(value: datetime) -> Timestamp:
|
|
timestamp = Timestamp()
|
|
if value.tzinfo is None:
|
|
value = value.replace(tzinfo=timezone.utc)
|
|
timestamp.FromDatetime(value.astimezone(timezone.utc))
|
|
return timestamp
|
|
|
|
|
|
def _data_type(name: str | None, default: int) -> int:
|
|
if name is None:
|
|
return default
|
|
return pb.MxDataType.Value(name)
|