fix(python): UTC-normalize galaxy-last-deploy output, add deploy-event collector, help text, test
This commit is contained in:
@@ -216,9 +216,11 @@ def test_batch_continues_after_error_line() -> None:
|
||||
class _FakeGalaxyClient:
|
||||
"""Minimal async-context-manager fake satisfying the galaxy command bodies."""
|
||||
|
||||
def __init__(self, *, ok: bool = True, objects=None) -> None:
|
||||
def __init__(self, *, ok: bool = True, objects=None, last_deploy=None, events=None) -> None:
|
||||
self._ok = ok
|
||||
self._objects = objects or []
|
||||
self._last_deploy = last_deploy
|
||||
self._events = events or []
|
||||
|
||||
async def __aenter__(self) -> "_FakeGalaxyClient":
|
||||
return self
|
||||
@@ -232,6 +234,19 @@ class _FakeGalaxyClient:
|
||||
async def discover_hierarchy(self):
|
||||
return self._objects
|
||||
|
||||
async def get_last_deploy_time(self):
|
||||
# Mirrors galaxy.py: protobuf ToDatetime() yields a timezone-NAIVE UTC datetime.
|
||||
return self._last_deploy
|
||||
|
||||
def watch_deploy_events(self, _last_seen_deploy_time=None):
|
||||
events = self._events
|
||||
|
||||
async def _iter():
|
||||
for event in events:
|
||||
yield event
|
||||
|
||||
return _iter()
|
||||
|
||||
|
||||
def _patch_galaxy_connect(monkeypatch: pytest.MonkeyPatch, fake: _FakeGalaxyClient) -> None:
|
||||
async def fake_connect(options, **_kwargs):
|
||||
@@ -275,6 +290,42 @@ def test_galaxy_discover_serializes_objects(monkeypatch: pytest.MonkeyPatch) ->
|
||||
assert payload["objects"][1]["gobjectId"] == 8
|
||||
|
||||
|
||||
def test_galaxy_last_deploy_emits_utc_iso(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""The naive-UTC deploy time from the library must be emitted as unambiguous UTC ISO-8601."""
|
||||
from datetime import datetime
|
||||
|
||||
naive_utc = datetime(2025, 6, 15, 12, 0, 0) # noqa: DTZ001 -- mirrors protobuf ToDatetime()
|
||||
_patch_galaxy_connect(monkeypatch, _FakeGalaxyClient(last_deploy=naive_utc))
|
||||
|
||||
result = CliRunner().invoke(
|
||||
main,
|
||||
["galaxy-last-deploy", "--plaintext", "--json"],
|
||||
)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
payload = json.loads(result.output)
|
||||
assert payload["command"] == "galaxy-last-deploy"
|
||||
assert payload["present"] is True
|
||||
assert payload["timeOfLastDeploy"].endswith(("+00:00", "Z"))
|
||||
|
||||
|
||||
def test_galaxy_watch_serializes_deploy_events(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
from zb_mom_ww_mxgateway.generated import galaxy_repository_pb2 as galaxy_pb
|
||||
|
||||
events = [galaxy_pb.DeployEvent(sequence=1)]
|
||||
_patch_galaxy_connect(monkeypatch, _FakeGalaxyClient(events=events))
|
||||
|
||||
result = CliRunner().invoke(
|
||||
main,
|
||||
["galaxy-watch", "--plaintext", "--max-events", "1", "--json"],
|
||||
)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
payload = json.loads(result.output)
|
||||
assert payload["command"] == "galaxy-watch"
|
||||
assert len(payload["events"]) == 1
|
||||
|
||||
|
||||
def test_galaxy_commands_are_registered() -> None:
|
||||
runner = CliRunner()
|
||||
for command in (
|
||||
|
||||
Reference in New Issue
Block a user