Add stream-alarms and acknowledge-alarm to the Java CLI

stream-alarms attaches to the gateway's central alarm feed (mirrors
stream-events: --limit cap, --json, --filter-prefix); acknowledge-alarm
is a unary session-less ack (--reference required, --comment,
--operator). Both route through new session-less methods on the CLI
client abstraction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-21 19:01:57 -04:00
parent 1d3c8edb44
commit 7dec9b30f5
2 changed files with 276 additions and 0 deletions
@@ -3,6 +3,7 @@ package com.dohertylan.mxgateway.cli;
import com.dohertylan.mxgateway.client.DeployEventStream;
import com.dohertylan.mxgateway.client.GalaxyRepositoryClient;
import com.dohertylan.mxgateway.client.MxEventStream;
import com.dohertylan.mxgateway.client.MxGatewayAlarmFeedSubscription;
import com.dohertylan.mxgateway.client.MxGatewayClient;
import com.dohertylan.mxgateway.client.MxGatewayClientOptions;
import com.dohertylan.mxgateway.client.MxGatewayClientVersion;
@@ -28,14 +29,23 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import io.grpc.stub.StreamObserver;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
@@ -127,6 +137,8 @@ public final class MxGatewayCli implements Callable<Integer> {
commandLine.addSubcommand("bench-read-bulk", new BenchReadBulkCommand(clientFactory));
commandLine.addSubcommand("write", new WriteCommand(clientFactory));
commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
commandLine.addSubcommand("stream-alarms", new StreamAlarmsCommand(clientFactory));
commandLine.addSubcommand("acknowledge-alarm", new AcknowledgeAlarmCommand(clientFactory));
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
commandLine.addSubcommand("galaxy-test", new GalaxyTestConnectionCommand());
commandLine.addSubcommand("galaxy-deploy-time", new GalaxyDeployTimeCommand());
@@ -139,6 +151,9 @@ public final class MxGatewayCli implements Callable<Integer> {
/** Sentinel written to stdout after every command result in batch mode. */
static final String BATCH_EOR = "__MXGW_BATCH_EOR__";
/** Sentinel queued by {@code stream-alarms} to mark a clean end of the alarm feed. */
private static final Object ALARM_FEED_END = new Object();
/**
* Reads one CLI invocation per stdin line, executes each via a fresh
* {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after
@@ -1155,6 +1170,115 @@ public final class MxGatewayCli implements Callable<Integer> {
}
}
@Command(name = "stream-alarms", description = "Streams the gateway central alarm feed.")
static final class StreamAlarmsCommand extends GatewayCommand {
@Option(names = "--filter-prefix", description = "Alarm-reference prefix scoping the feed; empty means unscoped.")
String filterPrefix = "";
@Option(names = "--limit", defaultValue = "0", description = "Maximum feed messages to print.")
int limit;
StreamAlarmsCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
// The async alarm feed delivers on a background gRPC thread; buffer
// messages in a bounded queue and drain them on this thread so the
// --limit termination mirrors stream-events. 1024 absorbs the
// gateway's initial active-alarm snapshot burst.
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1024);
StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder()
.setAlarmFilterPrefix(filterPrefix)
.build();
MxGatewayAlarmFeedSubscription subscription =
client.streamAlarms(request, new StreamObserver<>() {
@Override
public void onNext(AlarmFeedMessage value) {
queue.offer(value);
}
@Override
public void onError(Throwable error) {
queue.offer(error);
}
@Override
public void onCompleted() {
queue.offer(ALARM_FEED_END);
}
});
try {
int count = 0;
while (true) {
Object item = queue.take();
if (item == ALARM_FEED_END) {
break;
}
if (item instanceof Throwable error) {
throw new IllegalStateException(
"gateway stream alarms failed: " + error.getMessage(), error);
}
AlarmFeedMessage message = (AlarmFeedMessage) item;
if (json) {
client.out().println(protoJson(message));
} else {
client.out().println(formatAlarmFeedMessage(message));
}
client.out().flush();
count++;
if (limit > 0 && count >= limit) {
subscription.cancel();
break;
}
}
} catch (InterruptedException error) {
Thread.currentThread().interrupt();
subscription.cancel();
} finally {
subscription.cancel();
}
}
return 0;
}
}
@Command(name = "acknowledge-alarm", description = "Acknowledges an active MXAccess alarm.")
static final class AcknowledgeAlarmCommand extends GatewayCommand {
@Option(names = "--reference", required = true, description = "Full alarm reference to acknowledge.")
String reference;
@Option(names = "--comment", description = "Operator acknowledge comment.")
String comment = "";
@Option(names = "--operator", description = "Operator user performing the acknowledge.")
String operator = "";
AcknowledgeAlarmCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
AcknowledgeAlarmReply reply = client.acknowledgeAlarm(AcknowledgeAlarmRequest.newBuilder()
.setAlarmFullReference(reference)
.setComment(comment)
.setOperatorUser(operator)
.build());
writeOutput(
"acknowledge-alarm",
common,
json,
reply,
() -> Integer.toString(reply.getHresult()));
}
return 0;
}
}
@Command(name = "smoke", description = "Runs a bounded open/register/add/advise flow.")
static final class SmokeCommand extends GatewayCommand {
@Option(names = "--client-name", defaultValue = "mxgw-java-smoke", description = "MXAccess client name.")
@@ -1329,6 +1453,11 @@ public final class MxGatewayCli implements Callable<Integer> {
MxGatewayCliSession session(String sessionId);
AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request);
MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer);
@Override
void close();
}
@@ -1401,6 +1530,17 @@ public final class MxGatewayCli implements Callable<Integer> {
return new GrpcMxGatewayCliSession(MxGatewaySession.forSessionId(client, sessionId));
}
@Override
public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) {
return client.acknowledgeAlarm(request);
}
@Override
public MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
return client.streamAlarms(request, observer);
}
@Override
public void close() {
client.close();
@@ -1576,6 +1716,32 @@ public final class MxGatewayCli implements Callable<Integer> {
return values;
}
/**
* Renders one {@link AlarmFeedMessage} in the CLI's plain-text output
* style, distinguishing the active-alarm snapshot, snapshot-complete
* sentinel, and transition cases of the message's {@code payload} oneof.
*/
private static String formatAlarmFeedMessage(AlarmFeedMessage message) {
return switch (message.getPayloadCase()) {
case ACTIVE_ALARM -> {
ActiveAlarmSnapshot alarm = message.getActiveAlarm();
yield String.format(
"active-alarm %s state=%s severity=%d",
alarm.getAlarmFullReference(), alarm.getCurrentState().name(), alarm.getSeverity());
}
case SNAPSHOT_COMPLETE -> "snapshot-complete";
case TRANSITION -> {
OnAlarmTransitionEvent transition = message.getTransition();
yield String.format(
"transition %s kind=%s severity=%d",
transition.getAlarmFullReference(),
transition.getTransitionKind().name(),
transition.getSeverity());
}
case PAYLOAD_NOT_SET -> "unknown";
};
}
private static MxValue parseValue(String type, String text) {
return switch (type) {
case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text));
@@ -8,10 +8,18 @@ import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import com.dohertylan.mxgateway.client.MxGatewayAlarmFeedSubscription;
import io.grpc.stub.StreamObserver;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
import mxaccess_gateway.v1.MxaccessGateway.AlarmConditionState;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.AlarmTransitionKind;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
@@ -20,9 +28,11 @@ import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
@@ -389,6 +399,70 @@ final class MxGatewayCliTests {
assertTrue(output.contains("TestMachine_002.TestChangingInt"), output);
}
// ---- stream-alarms / acknowledge-alarm subcommands ----
@Test
void streamAlarmsCommandForwardsFilterPrefixAndPrintsFeedMessages() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(factory, "stream-alarms", "--filter-prefix", "Tank01");
assertEquals(0, run.exitCode());
assertEquals("Tank01", factory.client.lastStreamAlarmsRequest.getAlarmFilterPrefix());
String out = run.output();
assertTrue(out.contains("active-alarm Tank01.Level.HiHi"), out);
assertTrue(out.contains("snapshot-complete"), out);
assertTrue(out.contains("transition Tank01.Level.HiHi"), out);
}
@Test
void streamAlarmsCommandHonoursLimit() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(factory, "stream-alarms", "--limit", "1");
assertEquals(0, run.exitCode());
long lines = run.output().lines().filter(line -> !line.isBlank()).count();
assertEquals(1, lines, "expected exactly one feed message with --limit 1, got: " + run.output());
}
@Test
void streamAlarmsCommandPrintsJson() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(factory, "stream-alarms", "--json");
assertEquals(0, run.exitCode());
assertTrue(run.output().contains("\"activeAlarm\""), run.output());
assertTrue(run.output().contains("\"snapshotComplete\""), run.output());
}
@Test
void acknowledgeAlarmCommandForwardsOptionsAndPrintsReply() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(
factory,
"acknowledge-alarm",
"--reference",
"Tank01.Level.HiHi",
"--comment",
"checked",
"--operator",
"operator1",
"--json");
assertEquals(0, run.exitCode());
assertEquals("Tank01.Level.HiHi", factory.client.lastAcknowledgeAlarmRequest.getAlarmFullReference());
assertEquals("checked", factory.client.lastAcknowledgeAlarmRequest.getComment());
assertEquals("operator1", factory.client.lastAcknowledgeAlarmRequest.getOperatorUser());
assertTrue(run.output().contains("\"command\":\"acknowledge-alarm\""), run.output());
}
@Test
void acknowledgeAlarmCommandRequiresReference() {
CliRun run = execute(new FakeClientFactory(), "acknowledge-alarm", "--comment", "checked");
assertFalse(run.exitCode() == 0, "expected non-zero exit without --reference");
assertTrue(run.errors().contains("--reference"), run.errors());
}
// ---- Client.Java-027: batch subcommand ----
@Test
@@ -501,6 +575,8 @@ final class MxGatewayCliTests {
private final PrintWriter out;
private final FakeSession session = new FakeSession();
private boolean closeCalled;
private AcknowledgeAlarmRequest lastAcknowledgeAlarmRequest;
private StreamAlarmsRequest lastStreamAlarmsRequest;
private FakeClient(PrintWriter out) {
this.out = out;
@@ -534,6 +610,40 @@ final class MxGatewayCliTests {
return session;
}
@Override
public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) {
lastAcknowledgeAlarmRequest = request;
return AcknowledgeAlarmReply.newBuilder()
.setCorrelationId(request.getClientCorrelationId())
.setProtocolStatus(ok())
.setHresult(0)
.build();
}
@Override
public MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
lastStreamAlarmsRequest = request;
// Replay a deterministic active-alarm snapshot, snapshot-complete
// sentinel, transition, then complete the feed so the CLI command
// drains a bounded stream without contacting a live gateway.
observer.onNext(AlarmFeedMessage.newBuilder()
.setActiveAlarm(ActiveAlarmSnapshot.newBuilder()
.setAlarmFullReference("Tank01.Level.HiHi")
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE)
.setSeverity(700))
.build());
observer.onNext(AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build());
observer.onNext(AlarmFeedMessage.newBuilder()
.setTransition(OnAlarmTransitionEvent.newBuilder()
.setAlarmFullReference("Tank01.Level.HiHi")
.setTransitionKind(AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE)
.setSeverity(700))
.build());
observer.onCompleted();
return new MxGatewayAlarmFeedSubscription();
}
@Override
public void close() {
}