feat(python): add galaxy-* CLI commands (§4.2)

This commit is contained in:
Joseph Doherty
2026-06-15 09:40:55 -04:00
parent 849f1d2f6d
commit a211faefed
3 changed files with 211 additions and 3 deletions
+75
View File
@@ -211,3 +211,78 @@ def test_batch_continues_after_error_line() -> None:
# Second block: successful version JSON.
version_payload = json.loads(blocks[1].strip())
assert version_payload["version"] == __version__
class _FakeGalaxyClient:
"""Minimal async-context-manager fake satisfying the galaxy command bodies."""
def __init__(self, *, ok: bool = True, objects=None) -> None:
self._ok = ok
self._objects = objects or []
async def __aenter__(self) -> "_FakeGalaxyClient":
return self
async def __aexit__(self, *_exc: object) -> None:
return None
async def test_connection(self) -> bool:
return self._ok
async def discover_hierarchy(self):
return self._objects
def _patch_galaxy_connect(monkeypatch: pytest.MonkeyPatch, fake: _FakeGalaxyClient) -> None:
async def fake_connect(options, **_kwargs):
return fake
monkeypatch.setattr(commands_module.GalaxyRepositoryClient, "connect", fake_connect)
def test_galaxy_test_connection_emits_ok(monkeypatch: pytest.MonkeyPatch) -> None:
_patch_galaxy_connect(monkeypatch, _FakeGalaxyClient(ok=True))
result = CliRunner().invoke(
main,
["galaxy-test-connection", "--plaintext", "--json"],
)
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload == {"command": "galaxy-test-connection", "ok": True}
def test_galaxy_discover_serializes_objects(monkeypatch: pytest.MonkeyPatch) -> None:
from zb_mom_ww_mxgateway.generated import galaxy_repository_pb2 as galaxy_pb
objects = [
galaxy_pb.GalaxyObject(gobject_id=7, tag_name="Area001", contained_name="Area001"),
galaxy_pb.GalaxyObject(gobject_id=8, tag_name="Pump001", contained_name="Pump001"),
]
_patch_galaxy_connect(monkeypatch, _FakeGalaxyClient(objects=objects))
result = CliRunner().invoke(
main,
["galaxy-discover", "--plaintext", "--json"],
)
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload["command"] == "galaxy-discover"
assert len(payload["objects"]) == 2
assert payload["objects"][0]["tagName"] == "Area001"
assert payload["objects"][1]["gobjectId"] == 8
def test_galaxy_commands_are_registered() -> None:
runner = CliRunner()
for command in (
"galaxy-test-connection",
"galaxy-last-deploy",
"galaxy-discover",
"galaxy-watch",
):
result = runner.invoke(main, [command, "--help"])
assert result.exit_code == 0, result.output
assert "--endpoint" in result.output