Compare commits

...

2 Commits

Author SHA1 Message Date
Joseph Doherty 3e22285f09 Exercise the alarm subcommands in the client e2e matrix
Add an opt-in alarm phase (-VerifyAlarms) to run-client-e2e-tests.ps1:
each of the five client CLIs runs stream-alarms (asserting at least one
AlarmFeedMessage) and acknowledge-alarm against the gateway's central
alarm monitor. Both RPCs are session-less. -AlarmReference and
-AlarmStreamMax tune the phase; GatewayTesting.md documents it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:47:20 -04:00
Joseph Doherty 120cd0b1b6 Add stream-alarms and acknowledge-alarm to the Python CLI
Brings the Python mxgateway_cli in line with the other four client
CLIs: stream-alarms reads a bounded slice of the gateway's central
alarm feed (--filter-prefix, --max-messages, --timeout);
acknowledge-alarm is a unary session-less ack (--reference required,
--comment, --operator).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:47:19 -04:00
4 changed files with 264 additions and 1 deletions
@@ -404,6 +404,40 @@ def stream_events(**kwargs: Any) -> None:
)
@main.command("stream-alarms")
@gateway_options
@click.option("--filter-prefix", default="", help="Alarm-reference prefix filter.")
@click.option("--max-messages", default=1, type=int, show_default=True)
@click.option("--timeout", default=5.0, type=float, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def stream_alarms(**kwargs: Any) -> None:
"""Stream a bounded number of messages from the gateway's central alarm feed."""
_run(
_stream_alarms(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command("acknowledge-alarm")
@gateway_options
@click.option("--reference", required=True, help="Alarm full reference to acknowledge.")
@click.option("--comment", default="", help="Acknowledgement comment.")
@click.option("--operator", default="", help="Operator user name.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def acknowledge_alarm(**kwargs: Any) -> None:
"""Acknowledge an active MXAccess alarm condition (session-less)."""
_run(
_acknowledge_alarm(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@@ -779,6 +813,34 @@ async def _stream_events(**kwargs: Any) -> dict[str, Any]:
return {"events": [_message_dict(event) for event in events]}
async def _stream_alarms(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
messages = await _collect_alarm_messages(
client.stream_alarms(
pb.StreamAlarmsRequest(
client_correlation_id=kwargs["correlation_id"],
alarm_filter_prefix=kwargs["filter_prefix"],
),
),
max_messages=kwargs["max_messages"],
timeout=kwargs["timeout"],
)
return {"messages": [_message_dict(message) for message in messages]}
async def _acknowledge_alarm(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
reply = await client.acknowledge_alarm(
pb.AcknowledgeAlarmRequest(
client_correlation_id=kwargs["correlation_id"],
alarm_full_reference=kwargs["reference"],
comment=kwargs["comment"],
operator_user=kwargs["operator"],
),
)
return {"rawReply": _message_dict(reply)}
async def _write(**kwargs: Any) -> dict[str, Any]:
value = _parse_value(kwargs["value"], kwargs["value_type"])
async with await _connect(kwargs) as client:
@@ -936,6 +998,34 @@ async def _collect_events(
return collected
async def _collect_alarm_messages(
messages: Any,
*,
max_messages: int,
timeout: float,
) -> list[pb.AlarmFeedMessage]:
if max_messages > MAX_AGGREGATE_EVENTS:
raise click.BadParameter(
f"must be less than or equal to {MAX_AGGREGATE_EVENTS}",
param_hint="--max-messages",
)
collected: list[pb.AlarmFeedMessage] = []
iterator = messages.__aiter__()
try:
while len(collected) < max_messages:
collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout))
except StopAsyncIteration:
pass
except asyncio.TimeoutError:
pass
finally:
close = getattr(iterator, "aclose", None)
if close is not None:
await close()
return collected
def _parse_value(raw_value: str, value_type: str) -> MxValueInput:
normalized = value_type.lower()
if normalized == "bool":
+22
View File
@@ -52,6 +52,28 @@ def test_write_parser_rejects_unknown_value_type() -> None:
assert "unsupported value type" in result.output
def test_stream_alarms_is_registered() -> None:
runner = CliRunner()
result = runner.invoke(main, ["stream-alarms", "--help"])
assert result.exit_code == 0
assert "--filter-prefix" in result.output
assert "--max-messages" in result.output
def test_acknowledge_alarm_requires_reference() -> None:
runner = CliRunner()
result = runner.invoke(
main,
["acknowledge-alarm", "--api-key", "mxgw_test_secret", "--json"],
)
assert result.exit_code != 0
assert "--reference" in result.output
def test_cli_error_output_redacts_api_key() -> None:
runner = CliRunner()
+14
View File
@@ -293,6 +293,18 @@ path and writes a JSON report under `artifacts/e2e/`:
write command is rejected — e.g. against a gateway whose worker predates
write support (`MxAccessCommandExecutor` returning `InvalidRequest` for
`Write`/`Write2`/`WriteSecured`/`WriteSecured2`).
8. **Alarm feed + acknowledge***opt-in (`-VerifyAlarms`).* Runs after the
stream phase. Exercises the two session-less alarm subcommands against the
gateway's central alarm monitor: `stream-alarms` reads a bounded slice of
the feed (`-AlarmStreamMax`, default 1 — the feed's first message always
arrives immediately, whereas later ones depend on live transitions) and
asserts at least one `AlarmFeedMessage`; `acknowledge-alarm` acknowledges
`-AlarmReference` (default `Galaxy!TestArea.TestMachine_001.TestAlarm001`)
and asserts the RPC round-trips. The native ack outcome is not asserted —
it depends on whether that alarm is currently active.
It is opt-in because it depends on the gateway's central alarm monitor
being enabled (`MxGateway:Alarms:Enabled`) and a live alarm provider.
Each client CLI is driven through one long-lived `batch` process. Every CLI
exposes a `batch` subcommand: a process that reads one command line from stdin,
@@ -329,6 +341,8 @@ powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipB
# Write round-trip (opt-in): point at a writable scalar attribute and its
# value type.
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -VerifyWrite -WriteAttribute TestChangingInt -WriteType int32
# Alarm feed + acknowledge (opt-in): needs MxGateway:Alarms:Enabled on the gateway.
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -VerifyAlarms -AlarmReference "Galaxy!TestArea.TestMachine_001.TestAlarm001"
# Auth rejection: also assert an insufficient-scope key is denied.
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -RejectScopeApiKeyEnv MXGATEWAY_READONLY_API_KEY
# Run all five clients concurrently as isolated child processes.
+138 -1
View File
@@ -7,7 +7,9 @@ Drives the .NET, Go, Rust, Python, and Java client CLIs against a running
gateway + worker. For each language the script exercises session open/close,
register, bulk subscribe/unsubscribe, per-tag add-item/advise, event
streaming, a write round-trip with value assertion, error-path (parity)
checks, and API-key auth rejection.
checks, and API-key auth rejection. With -VerifyAlarms it also exercises the
session-less stream-alarms and acknowledge-alarm subcommands against the
gateway's central alarm monitor.
Each client CLI is driven through one long-lived `batch` process: the harness
writes one command line to its stdin and reads the JSON result back, so the
@@ -60,6 +62,18 @@ param(
[string]$WriteType = "int32",
[int]$WriteValueBase = 424200,
[int]$WriteEchoMaxEvents = 200,
# Alarm feed + acknowledge coverage. Opt-in because it depends on the
# gateway's central alarm monitor being enabled (MxGateway:Alarms:Enabled)
# and a live alarm provider: stream-alarms reads the monitor's snapshot and
# acknowledge-alarm acknowledges -AlarmReference. Both RPCs are session-less
# — they exercise the gateway's always-on monitor, not a client session.
[switch]$VerifyAlarms,
[string]$AlarmReference = "Galaxy!TestArea.TestMachine_001.TestAlarm001",
# Messages to read from the central alarm feed. 1 is enough to confirm the
# subcommand round-trips: the feed's first message (an active-alarm
# snapshot, or snapshot-complete when no alarms are active) always arrives
# immediately, whereas later messages depend on live alarm transitions.
[int]$AlarmStreamMax = 1,
# Error-path (parity) checks.
[switch]$SkipParity,
# API-key auth rejection checks.
@@ -118,6 +132,10 @@ if ($WriteEchoMaxEvents -lt 1) {
throw "WriteEchoMaxEvents must be greater than zero."
}
if ($AlarmStreamMax -lt 1) {
throw "AlarmStreamMax must be greater than zero."
}
foreach ($client in $Clients) {
if ($validClients -notcontains $client) {
throw "Unsupported client '$client'. Supported clients: $($validClients -join ', ')."
@@ -327,6 +345,25 @@ function Get-StreamEvents {
}
}
# Counts the messages in a stream-alarms reply. The CLIs shape the aggregate
# JSON differently: .NET nests them under `alarms`, Rust under `messages` with
# a `messageCount`, Python under `messages`; Go and Java emit one AlarmFeedMessage
# object per line (Read-JsonObject collapses NDJSON into a bare array).
function Get-AlarmMessageCount {
param(
[string]$Client,
[object]$Json
)
switch ($Client) {
"dotnet" { return @($Json.alarms).Count }
"go" { return @($Json).Count }
"rust" { return [int]$Json.messageCount }
"python" { return @($Json.messages).Count }
"java" { return @($Json).Count }
}
}
function Get-PropertyValue {
param(
[object]$Object,
@@ -564,6 +601,13 @@ function Get-ClientCommand {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("--max-events", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("--session-id", $Values.sessionId)
}
@@ -600,6 +644,13 @@ function Get-ClientCommand {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handle", "$($Values.itemHandle)", "-type", $Values.valueType, "-value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("-session-id", $Values.sessionId, "-limit", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("-limit", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("-filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("-reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("-comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("-operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("-session-id", $Values.sessionId)
}
@@ -637,6 +688,13 @@ function Get-ClientCommand {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--value-type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("--max-events", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("--session-id", $Values.sessionId)
}
@@ -673,6 +731,13 @@ function Get-ClientCommand {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents", "--timeout", "$pythonStreamTimeout")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("--max-messages", "$streamMaxEvents", "--timeout", "$pythonStreamTimeout")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("--session-id", $Values.sessionId)
}
@@ -712,6 +777,13 @@ function Get-ClientCommand {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$cliArgs += @("--session-id", $Values.sessionId, "--limit", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$cliArgs += @("--limit", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $cliArgs += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$cliArgs += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $cliArgs += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $cliArgs += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$cliArgs += @("--session-id", $Values.sessionId)
}
@@ -801,6 +873,36 @@ function Get-DryRunReply {
default { return [pscustomobject]@{ events = $events } }
}
}
"stream-alarms" {
# Synthesize an active-alarm snapshot followed by the
# snapshot-complete sentinel. The reply is shaped per client:
# Go and Java emit one message object per line (Read-JsonObject
# collapses NDJSON to a bare array), Rust aggregates under
# `messages` with a `messageCount`, Python under `messages`, and
# .NET under `alarms`.
$activeAlarm = [pscustomobject]@{
activeAlarm = [pscustomobject]@{
alarmFullReference = "Galaxy!TestArea.TestMachine_001.TestAlarm001"
currentState = "ALARM_CONDITION_STATE_ACTIVE"
severity = 500
}
}
$snapshotComplete = [pscustomobject]@{ snapshotComplete = $true }
$messages = @($activeAlarm, $snapshotComplete)
switch ($Client) {
"go" { return ,$messages }
"java" { return ,$messages }
"rust" { return [pscustomobject]@{ messageCount = $messages.Count; messages = $messages } }
"dotnet" { return [pscustomobject]@{ alarms = $messages } }
default { return [pscustomobject]@{ messages = $messages } }
}
}
"acknowledge-alarm" {
return [pscustomobject]@{
rawReply = [pscustomobject]@{ hresult = 0; diagnosticMessage = "dry-run ack" }
reply = [pscustomobject]@{ hresult = 0 }
}
}
default { return [pscustomobject]@{ ok = $true; reply = [pscustomobject]@{} } }
}
}
@@ -1053,6 +1155,7 @@ function Invoke-ClientFlow {
addedItems = @()
eventCount = 0
write = $null
alarms = $null
parity = @()
auth = @()
closed = $false
@@ -1285,6 +1388,35 @@ function Invoke-ClientFlow {
}
}
# --- Alarm feed + acknowledge -------------------------------------
# Session-less RPCs against the gateway's always-on central alarm
# monitor. Opt-in (-VerifyAlarms) because it needs the monitor enabled
# (MxGateway:Alarms:Enabled) and a live alarm provider.
if ($VerifyAlarms) {
$alarmStreamJson = Invoke-ClientOperation -Client $Client -Operation "stream-alarms" -Values @{
maxEvents = $AlarmStreamMax
}
$alarmMessageCount = Get-AlarmMessageCount -Client $Client -Json $alarmStreamJson
if ($alarmMessageCount -lt 1) {
throw "The $Client stream-alarms command returned no alarm-feed messages."
}
# The acknowledge round-trips against the central monitor; the
# native ack outcome depends on whether the referenced alarm is
# currently active, so only the RPC's success is asserted here.
Invoke-ClientOperation -Client $Client -Operation "acknowledge-alarm" -Values @{
alarmReference = $AlarmReference
comment = "e2e-matrix"
operator = "mxgw-e2e"
} | Out-Null
$clientResult.alarms = [ordered]@{
streamMessageCount = $alarmMessageCount
acknowledgeReference = $AlarmReference
acknowledged = $true
}
}
# --- Error-path (parity) checks -----------------------------------
# MXAccess parity: an invalid item handle and an unknown session must
# both be rejected rather than silently succeeding.
@@ -1391,6 +1523,8 @@ function Get-ChildArgumentList {
"-WriteType", $WriteType,
"-WriteValueBase", "$WriteValueBase",
"-WriteEchoMaxEvents", "$WriteEchoMaxEvents",
"-AlarmReference", $AlarmReference,
"-AlarmStreamMax", "$AlarmStreamMax",
"-ReportPath", $ChildReportPath,
"-EmitReport"
)
@@ -1400,6 +1534,7 @@ function Get-ChildArgumentList {
if ($SkipStream) { $childArgs += "-SkipStream" }
if ($SkipBulk) { $childArgs += "-SkipBulk" }
if ($VerifyWrite) { $childArgs += "-VerifyWrite" }
if ($VerifyAlarms) { $childArgs += "-VerifyAlarms" }
if ($SkipParity) { $childArgs += "-SkipParity" }
if ($SkipAuth) { $childArgs += "-SkipAuth" }
if ($DryRun) { $childArgs += "-DryRun" }
@@ -1479,6 +1614,7 @@ if ($Parallel -and $Clients.Count -gt 1) {
skipStream = [bool]$SkipStream
skipBulk = [bool]$SkipBulk
verifyWrite = [bool]$VerifyWrite
verifyAlarms = [bool]$VerifyAlarms
skipParity = [bool]$SkipParity
skipAuth = [bool]$SkipAuth
writeAttribute = $WriteAttribute
@@ -1540,6 +1676,7 @@ $run = [ordered]@{
skipStream = [bool]$SkipStream
skipBulk = [bool]$SkipBulk
verifyWrite = [bool]$VerifyWrite
verifyAlarms = [bool]$VerifyAlarms
skipParity = [bool]$SkipParity
skipAuth = [bool]$SkipAuth
writeAttribute = $WriteAttribute