From 8a0c59d7e8dfb1ab6992934b90081ddf6cee54d4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 24 May 2026 06:46:03 -0400 Subject: [PATCH] Java client: port stream-alarms and acknowledge-alarm MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the session-less alarm CLI subcommands to the Java CLI. stream-alarms attaches to the gateway's central alarm feed (--filter-prefix, --limit, --json — NDJSON, one AlarmFeedMessage per line); acknowledge-alarm is a unary ack (--reference required, --comment, --operator). streamAlarms joins queryActiveAlarms on MxGatewayClient and uses a new MxGatewayAlarmFeedSubscription cancellable handle. Batch dispatch re-enters the picocli command line per stdin line, so registering the two new subcommands suffices. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../zb/mom/ww/mxgateway/cli/MxGatewayCli.java | 166 ++++++++++++++++++ .../ww/mxgateway/cli/MxGatewayCliTests.java | 110 ++++++++++++ .../MxGatewayAlarmFeedSubscription.java | 67 +++++++ .../ww/mxgateway/client/MxGatewayClient.java | 23 +++ 4 files changed, 366 insertions(+) create mode 100644 clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java diff --git a/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java b/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java index b64daf2..310bf38 100644 --- a/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java +++ b/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java @@ -3,6 +3,7 @@ package com.zb.mom.ww.mxgateway.cli; import com.zb.mom.ww.mxgateway.client.DeployEventStream; import com.zb.mom.ww.mxgateway.client.GalaxyRepositoryClient; import com.zb.mom.ww.mxgateway.client.MxEventStream; +import com.zb.mom.ww.mxgateway.client.MxGatewayAlarmFeedSubscription; import com.zb.mom.ww.mxgateway.client.MxGatewayClient; import com.zb.mom.ww.mxgateway.client.MxGatewayClientOptions; import com.zb.mom.ww.mxgateway.client.MxGatewayClientVersion; @@ -14,6 +15,7 @@ import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyAttribute; import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject; import com.google.protobuf.Message; import com.google.protobuf.util.JsonFormat; +import io.grpc.stub.StreamObserver; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; @@ -28,14 +30,22 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest; +import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; +import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage; import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult; import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply; import mxaccess_gateway.v1.MxaccessGateway.MxEvent; import mxaccess_gateway.v1.MxaccessGateway.MxValue; +import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest; +import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest; import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult; import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry; import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry; @@ -127,6 +137,8 @@ public final class MxGatewayCli implements Callable { commandLine.addSubcommand("bench-read-bulk", new BenchReadBulkCommand(clientFactory)); commandLine.addSubcommand("write", new WriteCommand(clientFactory)); commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory)); + commandLine.addSubcommand("stream-alarms", new StreamAlarmsCommand(clientFactory)); + commandLine.addSubcommand("acknowledge-alarm", new AcknowledgeAlarmCommand(clientFactory)); commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory)); commandLine.addSubcommand("galaxy-test", new GalaxyTestConnectionCommand()); commandLine.addSubcommand("galaxy-deploy-time", new GalaxyDeployTimeCommand()); @@ -139,6 +151,9 @@ public final class MxGatewayCli implements Callable { /** Sentinel written to stdout after every command result in batch mode. */ static final String BATCH_EOR = "__MXGW_BATCH_EOR__"; + /** Sentinel queued by {@code stream-alarms} to mark a clean end of the alarm feed. */ + private static final Object ALARM_FEED_END = new Object(); + /** * Reads one CLI invocation per stdin line, executes each via a fresh * {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after @@ -1041,6 +1056,115 @@ public final class MxGatewayCli implements Callable { } } + @Command(name = "stream-alarms", description = "Streams the gateway central alarm feed.") + static final class StreamAlarmsCommand extends GatewayCommand { + @Option(names = "--filter-prefix", description = "Alarm-reference prefix scoping the feed; empty means unscoped.") + String filterPrefix = ""; + + @Option(names = "--limit", defaultValue = "0", description = "Maximum feed messages to print.") + int limit; + + StreamAlarmsCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + // The async alarm feed delivers on a background gRPC thread; buffer + // messages in a bounded queue and drain them on this thread so the + // --limit termination mirrors stream-events. 1024 absorbs the + // gateway's initial active-alarm snapshot burst. + BlockingQueue queue = new ArrayBlockingQueue<>(1024); + StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder() + .setAlarmFilterPrefix(filterPrefix) + .build(); + MxGatewayAlarmFeedSubscription subscription = + client.streamAlarms(request, new StreamObserver<>() { + @Override + public void onNext(AlarmFeedMessage value) { + queue.offer(value); + } + + @Override + public void onError(Throwable error) { + queue.offer(error); + } + + @Override + public void onCompleted() { + queue.offer(ALARM_FEED_END); + } + }); + try { + int count = 0; + while (true) { + Object item = queue.take(); + if (item == ALARM_FEED_END) { + break; + } + if (item instanceof Throwable error) { + throw new IllegalStateException( + "gateway stream alarms failed: " + error.getMessage(), error); + } + AlarmFeedMessage message = (AlarmFeedMessage) item; + if (json) { + client.out().println(protoJson(message)); + } else { + client.out().println(formatAlarmFeedMessage(message)); + } + client.out().flush(); + count++; + if (limit > 0 && count >= limit) { + subscription.cancel(); + break; + } + } + } catch (InterruptedException error) { + Thread.currentThread().interrupt(); + subscription.cancel(); + } finally { + subscription.cancel(); + } + } + return 0; + } + } + + @Command(name = "acknowledge-alarm", description = "Acknowledges an active MXAccess alarm.") + static final class AcknowledgeAlarmCommand extends GatewayCommand { + @Option(names = "--reference", required = true, description = "Full alarm reference to acknowledge.") + String reference; + + @Option(names = "--comment", description = "Operator acknowledge comment.") + String comment = ""; + + @Option(names = "--operator", description = "Operator user performing the acknowledge.") + String operator = ""; + + AcknowledgeAlarmCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + AcknowledgeAlarmReply reply = client.acknowledgeAlarm(AcknowledgeAlarmRequest.newBuilder() + .setAlarmFullReference(reference) + .setComment(comment) + .setOperatorUser(operator) + .build()); + writeOutput( + "acknowledge-alarm", + common, + json, + reply, + () -> Integer.toString(reply.getHresult())); + } + return 0; + } + } + @Command(name = "smoke", description = "Runs a bounded open/register/add/advise flow.") static final class SmokeCommand extends GatewayCommand { @Option(names = "--client-name", defaultValue = "mxgw-java-smoke", description = "MXAccess client name.") @@ -1160,6 +1284,11 @@ public final class MxGatewayCli implements Callable { MxGatewayCliSession session(String sessionId); + AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request); + + MxGatewayAlarmFeedSubscription streamAlarms( + StreamAlarmsRequest request, StreamObserver observer); + @Override void close(); } @@ -1232,6 +1361,17 @@ public final class MxGatewayCli implements Callable { return new GrpcMxGatewayCliSession(MxGatewaySession.forSessionId(client, sessionId)); } + @Override + public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) { + return client.acknowledgeAlarm(request); + } + + @Override + public MxGatewayAlarmFeedSubscription streamAlarms( + StreamAlarmsRequest request, StreamObserver observer) { + return client.streamAlarms(request, observer); + } + @Override public void close() { client.close(); @@ -1407,6 +1547,32 @@ public final class MxGatewayCli implements Callable { return values; } + /** + * Renders one {@link AlarmFeedMessage} in the CLI's plain-text output + * style, distinguishing the active-alarm snapshot, snapshot-complete + * sentinel, and transition cases of the message's {@code payload} oneof. + */ + private static String formatAlarmFeedMessage(AlarmFeedMessage message) { + return switch (message.getPayloadCase()) { + case ACTIVE_ALARM -> { + ActiveAlarmSnapshot alarm = message.getActiveAlarm(); + yield String.format( + "active-alarm %s state=%s severity=%d", + alarm.getAlarmFullReference(), alarm.getCurrentState().name(), alarm.getSeverity()); + } + case SNAPSHOT_COMPLETE -> "snapshot-complete"; + case TRANSITION -> { + OnAlarmTransitionEvent transition = message.getTransition(); + yield String.format( + "transition %s kind=%s severity=%d", + transition.getAlarmFullReference(), + transition.getTransitionKind().name(), + transition.getSeverity()); + } + case PAYLOAD_NOT_SET -> "unknown"; + }; + } + private static MxValue parseValue(String type, String text) { return switch (type) { case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text)); diff --git a/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java b/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java index 01e7b3c..39e4cad 100644 --- a/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java +++ b/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java @@ -4,6 +4,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.zb.mom.ww.mxgateway.client.MxGatewayAlarmFeedSubscription; +import io.grpc.stub.StreamObserver; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.PrintWriter; @@ -12,7 +14,13 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest; +import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; import mxaccess_gateway.v1.MxaccessGateway.AddItemReply; +import mxaccess_gateway.v1.MxaccessGateway.AlarmConditionState; +import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage; +import mxaccess_gateway.v1.MxaccessGateway.AlarmTransitionKind; import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult; import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply; @@ -21,12 +29,14 @@ import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind; import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply; import mxaccess_gateway.v1.MxaccessGateway.MxEvent; import mxaccess_gateway.v1.MxaccessGateway.MxValue; +import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus; import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode; import mxaccess_gateway.v1.MxaccessGateway.RegisterReply; import mxaccess_gateway.v1.MxaccessGateway.SessionState; +import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest; import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult; import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry; import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry; @@ -151,6 +161,70 @@ final class MxGatewayCliTests { assertTrue(run.output().contains("\"wasSuccessful\":true")); } + // ---- stream-alarms / acknowledge-alarm subcommands ---- + + @Test + void streamAlarmsCommandForwardsFilterPrefixAndPrintsFeedMessages() { + FakeClientFactory factory = new FakeClientFactory(); + CliRun run = execute(factory, "stream-alarms", "--filter-prefix", "Tank01"); + + assertEquals(0, run.exitCode()); + assertEquals("Tank01", factory.client.lastStreamAlarmsRequest.getAlarmFilterPrefix()); + String out = run.output(); + assertTrue(out.contains("active-alarm Tank01.Level.HiHi"), out); + assertTrue(out.contains("snapshot-complete"), out); + assertTrue(out.contains("transition Tank01.Level.HiHi"), out); + } + + @Test + void streamAlarmsCommandHonoursLimit() { + FakeClientFactory factory = new FakeClientFactory(); + CliRun run = execute(factory, "stream-alarms", "--limit", "1"); + + assertEquals(0, run.exitCode()); + long lines = run.output().lines().filter(line -> !line.isBlank()).count(); + assertEquals(1, lines, "expected exactly one feed message with --limit 1, got: " + run.output()); + } + + @Test + void streamAlarmsCommandPrintsJson() { + FakeClientFactory factory = new FakeClientFactory(); + CliRun run = execute(factory, "stream-alarms", "--json"); + + assertEquals(0, run.exitCode()); + assertTrue(run.output().contains("\"activeAlarm\""), run.output()); + assertTrue(run.output().contains("\"snapshotComplete\""), run.output()); + } + + @Test + void acknowledgeAlarmCommandForwardsOptionsAndPrintsReply() { + FakeClientFactory factory = new FakeClientFactory(); + CliRun run = execute( + factory, + "acknowledge-alarm", + "--reference", + "Tank01.Level.HiHi", + "--comment", + "checked", + "--operator", + "operator1", + "--json"); + + assertEquals(0, run.exitCode()); + assertEquals("Tank01.Level.HiHi", factory.client.lastAcknowledgeAlarmRequest.getAlarmFullReference()); + assertEquals("checked", factory.client.lastAcknowledgeAlarmRequest.getComment()); + assertEquals("operator1", factory.client.lastAcknowledgeAlarmRequest.getOperatorUser()); + assertTrue(run.output().contains("\"command\":\"acknowledge-alarm\""), run.output()); + } + + @Test + void acknowledgeAlarmCommandRequiresReference() { + CliRun run = execute(new FakeClientFactory(), "acknowledge-alarm", "--comment", "checked"); + + assertFalse(run.exitCode() == 0, "expected non-zero exit without --reference"); + assertTrue(run.errors().contains("--reference"), run.errors()); + } + @Test void batchCommandExecutesVersionAndEmitsEorMarker() { CliRun run = executeBatch(new FakeClientFactory(), "version --json\n"); @@ -220,6 +294,8 @@ final class MxGatewayCliTests { private final PrintWriter out; private final FakeSession session = new FakeSession(); private boolean closeCalled; + private AcknowledgeAlarmRequest lastAcknowledgeAlarmRequest; + private StreamAlarmsRequest lastStreamAlarmsRequest; private FakeClient(PrintWriter out) { this.out = out; @@ -253,6 +329,40 @@ final class MxGatewayCliTests { return session; } + @Override + public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) { + lastAcknowledgeAlarmRequest = request; + return AcknowledgeAlarmReply.newBuilder() + .setCorrelationId(request.getClientCorrelationId()) + .setProtocolStatus(ok()) + .setHresult(0) + .build(); + } + + @Override + public MxGatewayAlarmFeedSubscription streamAlarms( + StreamAlarmsRequest request, StreamObserver observer) { + lastStreamAlarmsRequest = request; + // Replay a deterministic active-alarm snapshot, snapshot-complete + // sentinel, transition, then complete the feed so the CLI command + // drains a bounded stream without contacting a live gateway. + observer.onNext(AlarmFeedMessage.newBuilder() + .setActiveAlarm(ActiveAlarmSnapshot.newBuilder() + .setAlarmFullReference("Tank01.Level.HiHi") + .setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE) + .setSeverity(700)) + .build()); + observer.onNext(AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build()); + observer.onNext(AlarmFeedMessage.newBuilder() + .setTransition(OnAlarmTransitionEvent.newBuilder() + .setAlarmFullReference("Tank01.Level.HiHi") + .setTransitionKind(AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE) + .setSeverity(700)) + .build()); + observer.onCompleted(); + return new MxGatewayAlarmFeedSubscription(); + } + @Override public void close() { } diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java new file mode 100644 index 0000000..5a582ae --- /dev/null +++ b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java @@ -0,0 +1,67 @@ +package com.zb.mom.ww.mxgateway.client; + +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage; +import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest; + +/** + * Cancellable handle returned by {@code streamAlarms}. + * + *

Wraps a caller-supplied {@link StreamObserver} and exposes a + * {@link #cancel()} entry point that aborts the underlying gRPC call. The + * subscription also implements {@link AutoCloseable} so it can participate in + * try-with-resources blocks. + */ +public final class MxGatewayAlarmFeedSubscription implements AutoCloseable { + private final AtomicReference> requestStream = new AtomicReference<>(); + private final AtomicBoolean cancelled = new AtomicBoolean(); + + ClientResponseObserver wrap(StreamObserver observer) { + return new ClientResponseObserver<>() { + @Override + public void beforeStart(ClientCallStreamObserver stream) { + requestStream.set(stream); + if (cancelled.get()) { + stream.cancel("client cancelled alarm feed", null); + } + } + + @Override + public void onNext(AlarmFeedMessage value) { + observer.onNext(value); + } + + @Override + public void onError(Throwable error) { + observer.onError(error); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + }; + } + + /** + * Cancels the underlying gRPC call. Safe to invoke before the call has + * started; cancellation is recorded and applied as soon as the stream + * attaches. + */ + public void cancel() { + cancelled.set(true); + ClientCallStreamObserver stream = requestStream.get(); + if (stream != null) { + stream.cancel("client cancelled alarm feed", null); + } + } + + @Override + public void close() { + cancel(); + } +} diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayClient.java b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayClient.java index 6a256ac..0aa4c35 100644 --- a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayClient.java +++ b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayClient.java @@ -18,6 +18,7 @@ import mxaccess_gateway.v1.MxAccessGatewayGrpc; import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply; import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest; import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; +import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply; @@ -27,6 +28,7 @@ import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode; import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest; +import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest; import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest; /** @@ -320,6 +322,27 @@ public final class MxGatewayClient implements AutoCloseable { return subscription; } + /** + * Attaches to the gateway's central alarm feed. The stream opens with one + * {@code AlarmFeedMessage} per currently-active alarm (the ConditionRefresh + * snapshot), then a single {@code snapshot_complete}, then a + * {@code transition} for every subsequent raise / acknowledge / clear. + * + *

Served by the gateway's always-on alarm monitor — no worker session is + * opened — so any number of clients may attach. + * + * @param request the {@code StreamAlarmsRequest}, optionally scoped by + * alarm-reference prefix + * @param observer caller-supplied observer that receives feed messages and completion + * @return a cancellable subscription handle + */ + public MxGatewayAlarmFeedSubscription streamAlarms( + StreamAlarmsRequest request, StreamObserver observer) { + MxGatewayAlarmFeedSubscription subscription = new MxGatewayAlarmFeedSubscription(); + withStreamDeadline(rawAsyncStub()).streamAlarms(request, subscription.wrap(observer)); + return subscription; + } + @Override public void close() { if (ownedChannel != null) {