diff --git a/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java b/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java index 7d3107f..b8c469d 100644 --- a/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java +++ b/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java @@ -3,6 +3,7 @@ package com.dohertylan.mxgateway.cli; import com.dohertylan.mxgateway.client.DeployEventStream; import com.dohertylan.mxgateway.client.GalaxyRepositoryClient; import com.dohertylan.mxgateway.client.MxEventStream; +import com.dohertylan.mxgateway.client.MxGatewayAlarmFeedSubscription; import com.dohertylan.mxgateway.client.MxGatewayClient; import com.dohertylan.mxgateway.client.MxGatewayClientOptions; import com.dohertylan.mxgateway.client.MxGatewayClientVersion; @@ -28,14 +29,23 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import io.grpc.stub.StreamObserver; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest; +import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; +import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage; import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult; import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply; import mxaccess_gateway.v1.MxaccessGateway.MxEvent; import mxaccess_gateway.v1.MxaccessGateway.MxValue; +import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest; +import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest; import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult; import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry; import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry; @@ -127,6 +137,8 @@ public final class MxGatewayCli implements Callable { commandLine.addSubcommand("bench-read-bulk", new BenchReadBulkCommand(clientFactory)); commandLine.addSubcommand("write", new WriteCommand(clientFactory)); commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory)); + commandLine.addSubcommand("stream-alarms", new StreamAlarmsCommand(clientFactory)); + commandLine.addSubcommand("acknowledge-alarm", new AcknowledgeAlarmCommand(clientFactory)); commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory)); commandLine.addSubcommand("galaxy-test", new GalaxyTestConnectionCommand()); commandLine.addSubcommand("galaxy-deploy-time", new GalaxyDeployTimeCommand()); @@ -139,6 +151,9 @@ public final class MxGatewayCli implements Callable { /** Sentinel written to stdout after every command result in batch mode. */ static final String BATCH_EOR = "__MXGW_BATCH_EOR__"; + /** Sentinel queued by {@code stream-alarms} to mark a clean end of the alarm feed. */ + private static final Object ALARM_FEED_END = new Object(); + /** * Reads one CLI invocation per stdin line, executes each via a fresh * {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after @@ -1155,6 +1170,115 @@ public final class MxGatewayCli implements Callable { } } + @Command(name = "stream-alarms", description = "Streams the gateway central alarm feed.") + static final class StreamAlarmsCommand extends GatewayCommand { + @Option(names = "--filter-prefix", description = "Alarm-reference prefix scoping the feed; empty means unscoped.") + String filterPrefix = ""; + + @Option(names = "--limit", defaultValue = "0", description = "Maximum feed messages to print.") + int limit; + + StreamAlarmsCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + // The async alarm feed delivers on a background gRPC thread; buffer + // messages in a bounded queue and drain them on this thread so the + // --limit termination mirrors stream-events. 1024 absorbs the + // gateway's initial active-alarm snapshot burst. + BlockingQueue queue = new ArrayBlockingQueue<>(1024); + StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder() + .setAlarmFilterPrefix(filterPrefix) + .build(); + MxGatewayAlarmFeedSubscription subscription = + client.streamAlarms(request, new StreamObserver<>() { + @Override + public void onNext(AlarmFeedMessage value) { + queue.offer(value); + } + + @Override + public void onError(Throwable error) { + queue.offer(error); + } + + @Override + public void onCompleted() { + queue.offer(ALARM_FEED_END); + } + }); + try { + int count = 0; + while (true) { + Object item = queue.take(); + if (item == ALARM_FEED_END) { + break; + } + if (item instanceof Throwable error) { + throw new IllegalStateException( + "gateway stream alarms failed: " + error.getMessage(), error); + } + AlarmFeedMessage message = (AlarmFeedMessage) item; + if (json) { + client.out().println(protoJson(message)); + } else { + client.out().println(formatAlarmFeedMessage(message)); + } + client.out().flush(); + count++; + if (limit > 0 && count >= limit) { + subscription.cancel(); + break; + } + } + } catch (InterruptedException error) { + Thread.currentThread().interrupt(); + subscription.cancel(); + } finally { + subscription.cancel(); + } + } + return 0; + } + } + + @Command(name = "acknowledge-alarm", description = "Acknowledges an active MXAccess alarm.") + static final class AcknowledgeAlarmCommand extends GatewayCommand { + @Option(names = "--reference", required = true, description = "Full alarm reference to acknowledge.") + String reference; + + @Option(names = "--comment", description = "Operator acknowledge comment.") + String comment = ""; + + @Option(names = "--operator", description = "Operator user performing the acknowledge.") + String operator = ""; + + AcknowledgeAlarmCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + AcknowledgeAlarmReply reply = client.acknowledgeAlarm(AcknowledgeAlarmRequest.newBuilder() + .setAlarmFullReference(reference) + .setComment(comment) + .setOperatorUser(operator) + .build()); + writeOutput( + "acknowledge-alarm", + common, + json, + reply, + () -> Integer.toString(reply.getHresult())); + } + return 0; + } + } + @Command(name = "smoke", description = "Runs a bounded open/register/add/advise flow.") static final class SmokeCommand extends GatewayCommand { @Option(names = "--client-name", defaultValue = "mxgw-java-smoke", description = "MXAccess client name.") @@ -1329,6 +1453,11 @@ public final class MxGatewayCli implements Callable { MxGatewayCliSession session(String sessionId); + AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request); + + MxGatewayAlarmFeedSubscription streamAlarms( + StreamAlarmsRequest request, StreamObserver observer); + @Override void close(); } @@ -1401,6 +1530,17 @@ public final class MxGatewayCli implements Callable { return new GrpcMxGatewayCliSession(MxGatewaySession.forSessionId(client, sessionId)); } + @Override + public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) { + return client.acknowledgeAlarm(request); + } + + @Override + public MxGatewayAlarmFeedSubscription streamAlarms( + StreamAlarmsRequest request, StreamObserver observer) { + return client.streamAlarms(request, observer); + } + @Override public void close() { client.close(); @@ -1576,6 +1716,32 @@ public final class MxGatewayCli implements Callable { return values; } + /** + * Renders one {@link AlarmFeedMessage} in the CLI's plain-text output + * style, distinguishing the active-alarm snapshot, snapshot-complete + * sentinel, and transition cases of the message's {@code payload} oneof. + */ + private static String formatAlarmFeedMessage(AlarmFeedMessage message) { + return switch (message.getPayloadCase()) { + case ACTIVE_ALARM -> { + ActiveAlarmSnapshot alarm = message.getActiveAlarm(); + yield String.format( + "active-alarm %s state=%s severity=%d", + alarm.getAlarmFullReference(), alarm.getCurrentState().name(), alarm.getSeverity()); + } + case SNAPSHOT_COMPLETE -> "snapshot-complete"; + case TRANSITION -> { + OnAlarmTransitionEvent transition = message.getTransition(); + yield String.format( + "transition %s kind=%s severity=%d", + transition.getAlarmFullReference(), + transition.getTransitionKind().name(), + transition.getSeverity()); + } + case PAYLOAD_NOT_SET -> "unknown"; + }; + } + private static MxValue parseValue(String type, String text) { return switch (type) { case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text)); diff --git a/clients/java/mxgateway-cli/src/test/java/com/dohertylan/mxgateway/cli/MxGatewayCliTests.java b/clients/java/mxgateway-cli/src/test/java/com/dohertylan/mxgateway/cli/MxGatewayCliTests.java index bc3b49f..a06fbe2 100644 --- a/clients/java/mxgateway-cli/src/test/java/com/dohertylan/mxgateway/cli/MxGatewayCliTests.java +++ b/clients/java/mxgateway-cli/src/test/java/com/dohertylan/mxgateway/cli/MxGatewayCliTests.java @@ -8,10 +8,18 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.PrintWriter; import java.io.StringWriter; +import com.dohertylan.mxgateway.client.MxGatewayAlarmFeedSubscription; +import io.grpc.stub.StreamObserver; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest; +import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; import mxaccess_gateway.v1.MxaccessGateway.AddItemReply; +import mxaccess_gateway.v1.MxaccessGateway.AlarmConditionState; +import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage; +import mxaccess_gateway.v1.MxaccessGateway.AlarmTransitionKind; import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult; import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply; @@ -20,9 +28,11 @@ import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind; import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply; import mxaccess_gateway.v1.MxaccessGateway.MxEvent; import mxaccess_gateway.v1.MxaccessGateway.MxValue; +import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus; +import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest; import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode; import mxaccess_gateway.v1.MxaccessGateway.RegisterReply; import mxaccess_gateway.v1.MxaccessGateway.SessionState; @@ -389,6 +399,70 @@ final class MxGatewayCliTests { assertTrue(output.contains("TestMachine_002.TestChangingInt"), output); } + // ---- stream-alarms / acknowledge-alarm subcommands ---- + + @Test + void streamAlarmsCommandForwardsFilterPrefixAndPrintsFeedMessages() { + FakeClientFactory factory = new FakeClientFactory(); + CliRun run = execute(factory, "stream-alarms", "--filter-prefix", "Tank01"); + + assertEquals(0, run.exitCode()); + assertEquals("Tank01", factory.client.lastStreamAlarmsRequest.getAlarmFilterPrefix()); + String out = run.output(); + assertTrue(out.contains("active-alarm Tank01.Level.HiHi"), out); + assertTrue(out.contains("snapshot-complete"), out); + assertTrue(out.contains("transition Tank01.Level.HiHi"), out); + } + + @Test + void streamAlarmsCommandHonoursLimit() { + FakeClientFactory factory = new FakeClientFactory(); + CliRun run = execute(factory, "stream-alarms", "--limit", "1"); + + assertEquals(0, run.exitCode()); + long lines = run.output().lines().filter(line -> !line.isBlank()).count(); + assertEquals(1, lines, "expected exactly one feed message with --limit 1, got: " + run.output()); + } + + @Test + void streamAlarmsCommandPrintsJson() { + FakeClientFactory factory = new FakeClientFactory(); + CliRun run = execute(factory, "stream-alarms", "--json"); + + assertEquals(0, run.exitCode()); + assertTrue(run.output().contains("\"activeAlarm\""), run.output()); + assertTrue(run.output().contains("\"snapshotComplete\""), run.output()); + } + + @Test + void acknowledgeAlarmCommandForwardsOptionsAndPrintsReply() { + FakeClientFactory factory = new FakeClientFactory(); + CliRun run = execute( + factory, + "acknowledge-alarm", + "--reference", + "Tank01.Level.HiHi", + "--comment", + "checked", + "--operator", + "operator1", + "--json"); + + assertEquals(0, run.exitCode()); + assertEquals("Tank01.Level.HiHi", factory.client.lastAcknowledgeAlarmRequest.getAlarmFullReference()); + assertEquals("checked", factory.client.lastAcknowledgeAlarmRequest.getComment()); + assertEquals("operator1", factory.client.lastAcknowledgeAlarmRequest.getOperatorUser()); + assertTrue(run.output().contains("\"command\":\"acknowledge-alarm\""), run.output()); + } + + @Test + void acknowledgeAlarmCommandRequiresReference() { + CliRun run = execute(new FakeClientFactory(), "acknowledge-alarm", "--comment", "checked"); + + assertFalse(run.exitCode() == 0, "expected non-zero exit without --reference"); + assertTrue(run.errors().contains("--reference"), run.errors()); + } + // ---- Client.Java-027: batch subcommand ---- @Test @@ -501,6 +575,8 @@ final class MxGatewayCliTests { private final PrintWriter out; private final FakeSession session = new FakeSession(); private boolean closeCalled; + private AcknowledgeAlarmRequest lastAcknowledgeAlarmRequest; + private StreamAlarmsRequest lastStreamAlarmsRequest; private FakeClient(PrintWriter out) { this.out = out; @@ -534,6 +610,40 @@ final class MxGatewayCliTests { return session; } + @Override + public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) { + lastAcknowledgeAlarmRequest = request; + return AcknowledgeAlarmReply.newBuilder() + .setCorrelationId(request.getClientCorrelationId()) + .setProtocolStatus(ok()) + .setHresult(0) + .build(); + } + + @Override + public MxGatewayAlarmFeedSubscription streamAlarms( + StreamAlarmsRequest request, StreamObserver observer) { + lastStreamAlarmsRequest = request; + // Replay a deterministic active-alarm snapshot, snapshot-complete + // sentinel, transition, then complete the feed so the CLI command + // drains a bounded stream without contacting a live gateway. + observer.onNext(AlarmFeedMessage.newBuilder() + .setActiveAlarm(ActiveAlarmSnapshot.newBuilder() + .setAlarmFullReference("Tank01.Level.HiHi") + .setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE) + .setSeverity(700)) + .build()); + observer.onNext(AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build()); + observer.onNext(AlarmFeedMessage.newBuilder() + .setTransition(OnAlarmTransitionEvent.newBuilder() + .setAlarmFullReference("Tank01.Level.HiHi") + .setTransitionKind(AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE) + .setSeverity(700)) + .build()); + observer.onCompleted(); + return new MxGatewayAlarmFeedSubscription(); + } + @Override public void close() { }