From 730fdc93e06249f053a54ff4116036bf353906a7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 30 Apr 2026 17:01:35 -0400 Subject: [PATCH] clients/java: SDK methods for AcknowledgeAlarm + QueryActiveAlarms (PR E.5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tenth PR of the alarms-over-gateway epic (docs/plans/alarms-over-gateway.md). Mirrors PR E.2's .NET surface on the Java SDK. Depends on PR E.1 (regen, merged). - MxGatewayClient.acknowledgeAlarm — blocking unary call, validates protocol status via the existing MxGatewayErrors helper. Wraps RuntimeException through MxGatewayErrors.fromGrpc for typed failure mapping. - MxGatewayClient.acknowledgeAlarmAsync — CompletableFuture variant using the future stub. - MxGatewayClient.queryActiveAlarms — async server-streaming RPC observed via a new MxGatewayActiveAlarmsSubscription handle (parallel to MxGatewayEventSubscription; the existing subscription class is hard-typed to MxEvent so a parallel type was simpler than retrofitting generics). - MxGatewayClientVersion bumps GATEWAY_PROTOCOL_VERSION 2 → 3 to match the .NET contract; CLI version-string assertions updated to match. Java SDK build green via Gradle 9.4.1 (mxgateway-client + mxgateway-cli). 17 tasks, all tests passing. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../mxgateway/cli/MxGatewayCliTests.java | 4 +- .../MxGatewayActiveAlarmsSubscription.java | 67 +++++++++++++++++++ .../mxgateway/client/MxGatewayClient.java | 61 +++++++++++++++++ .../client/MxGatewayClientVersion.java | 2 +- 4 files changed, 131 insertions(+), 3 deletions(-) create mode 100644 clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayActiveAlarmsSubscription.java diff --git a/clients/java/mxgateway-cli/src/test/java/com/dohertylan/mxgateway/cli/MxGatewayCliTests.java b/clients/java/mxgateway-cli/src/test/java/com/dohertylan/mxgateway/cli/MxGatewayCliTests.java index 6f047fb..481bea7 100644 --- a/clients/java/mxgateway-cli/src/test/java/com/dohertylan/mxgateway/cli/MxGatewayCliTests.java +++ b/clients/java/mxgateway-cli/src/test/java/com/dohertylan/mxgateway/cli/MxGatewayCliTests.java @@ -32,7 +32,7 @@ final class MxGatewayCliTests { assertEquals(0, run.exitCode()); assertEquals("", run.errors()); assertTrue(run.output().contains("mxgateway-java 0.1.0")); - assertTrue(run.output().contains("gatewayProtocolVersion=1")); + assertTrue(run.output().contains("gatewayProtocolVersion=3")); assertTrue(run.output().contains("workerProtocolVersion=1")); } @@ -42,7 +42,7 @@ final class MxGatewayCliTests { assertEquals(0, run.exitCode()); assertTrue(run.output().contains("\"clientVersion\":\"0.1.0\"")); - assertTrue(run.output().contains("\"gatewayProtocolVersion\":1")); + assertTrue(run.output().contains("\"gatewayProtocolVersion\":3")); } @Test diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayActiveAlarmsSubscription.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayActiveAlarmsSubscription.java new file mode 100644 index 0000000..7a2c832 --- /dev/null +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayActiveAlarmsSubscription.java @@ -0,0 +1,67 @@ +package com.dohertylan.mxgateway.client; + +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; +import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest; + +/** + * Cancellable handle returned by {@code queryActiveAlarms}. + * + *

Wraps a caller-supplied {@link StreamObserver} and exposes a + * {@link #cancel()} entry point that aborts the underlying gRPC call. The + * subscription also implements {@link AutoCloseable} so it can participate in + * try-with-resources blocks. + */ +public final class MxGatewayActiveAlarmsSubscription implements AutoCloseable { + private final AtomicReference> requestStream = new AtomicReference<>(); + private final AtomicBoolean cancelled = new AtomicBoolean(); + + ClientResponseObserver wrap(StreamObserver observer) { + return new ClientResponseObserver<>() { + @Override + public void beforeStart(ClientCallStreamObserver stream) { + requestStream.set(stream); + if (cancelled.get()) { + stream.cancel("client cancelled active-alarms query", null); + } + } + + @Override + public void onNext(ActiveAlarmSnapshot value) { + observer.onNext(value); + } + + @Override + public void onError(Throwable error) { + observer.onError(error); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + }; + } + + /** + * Cancels the underlying gRPC call. Safe to invoke before the call has + * started; cancellation is recorded and applied as soon as the stream + * attaches. + */ + public void cancel() { + cancelled.set(true); + ClientCallStreamObserver stream = requestStream.get(); + if (stream != null) { + stream.cancel("client cancelled active-alarms query", null); + } + } + + @Override + public void close() { + cancel(); + } +} diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java index e81e29e..e3599fe 100644 --- a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java @@ -15,6 +15,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLException; import mxaccess_gateway.v1.MxAccessGatewayGrpc; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest; +import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply; @@ -23,6 +26,7 @@ import mxaccess_gateway.v1.MxaccessGateway.MxEvent; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode; +import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest; import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest; /** @@ -259,6 +263,63 @@ public final class MxGatewayClient implements AutoCloseable { return subscription; } + /** + * Acknowledges an active MXAccess alarm condition through the gateway. + * + *

The gateway authenticates the request against the API key's + * {@code invoke:alarm-ack} scope and forwards the acknowledge to the + * worker's MXAccess session; the resulting native MxStatus is returned + * in the reply. Acks are idempotent at the MxAccess layer. + * + * @param request the {@code AcknowledgeAlarmRequest} + * @return the raw acknowledge reply + * @throws MxGatewayException on transport or protocol failure + */ + public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) { + try { + AcknowledgeAlarmReply reply = rawBlockingStub().acknowledgeAlarm(request); + MxGatewayErrors.ensureProtocolSuccess("acknowledge alarm", reply.getProtocolStatus(), null); + return reply; + } catch (RuntimeException error) { + if (error instanceof MxGatewayException) { + throw error; + } + throw MxGatewayErrors.fromGrpc("acknowledge alarm", error); + } + } + + /** + * Acknowledges an active MXAccess alarm condition asynchronously. + * + * @param request the {@code AcknowledgeAlarmRequest} + * @return a future completed with the raw reply, or completed exceptionally + * with {@link MxGatewayException} on failure + */ + public CompletableFuture acknowledgeAlarmAsync(AcknowledgeAlarmRequest request) { + CompletableFuture future = toCompletable(rawFutureStub().acknowledgeAlarm(request)); + return future.thenApply(reply -> { + MxGatewayErrors.ensureProtocolSuccess("acknowledge alarm", reply.getProtocolStatus(), null); + return reply; + }); + } + + /** + * Streams a snapshot of all alarms currently Active or ActiveAcked — the + * gateway's ConditionRefresh equivalent. Used after reconnect to seed + * local Part 9 state. + * + * @param request the {@code QueryActiveAlarmsRequest}, optionally scoped by + * alarm-reference prefix + * @param observer caller-supplied observer that receives snapshots and completion + * @return a cancellable subscription handle + */ + public MxGatewayActiveAlarmsSubscription queryActiveAlarms( + QueryActiveAlarmsRequest request, StreamObserver observer) { + MxGatewayActiveAlarmsSubscription subscription = new MxGatewayActiveAlarmsSubscription(); + withStreamDeadline(rawAsyncStub()).queryActiveAlarms(request, subscription.wrap(observer)); + return subscription; + } + @Override public void close() { if (ownedChannel != null) { diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClientVersion.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClientVersion.java index 2f30be3..01c67dd 100644 --- a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClientVersion.java +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClientVersion.java @@ -7,7 +7,7 @@ package com.dohertylan.mxgateway.client; * worker speak the same protocol version as the client. */ public final class MxGatewayClientVersion { - private static final int GATEWAY_PROTOCOL_VERSION = 2; + private static final int GATEWAY_PROTOCOL_VERSION = 3; private static final int WORKER_PROTOCOL_VERSION = 1; private static final String CLIENT_VERSION = "0.1.0";