Point the Java client at the StreamAlarms alarm feed

Regenerate the Java protobuf stubs and replace queryActiveAlarms with
streamAlarms, returning a MxGatewayAlarmFeedSubscription over
AlarmFeedMessage served by the gateway's central alarm monitor
(snapshot, snapshot_complete, then live transitions). Drops session_id
from the acknowledge surface.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-21 16:55:20 -04:00
parent 1b6ca07bb5
commit 6777d49030
5 changed files with 1677 additions and 900 deletions
@@ -5,33 +5,33 @@ import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
/**
* Cancellable handle returned by {@code queryActiveAlarms}.
* Cancellable handle returned by {@code streamAlarms}.
*
* <p>Wraps a caller-supplied {@link StreamObserver} and exposes a
* {@link #cancel()} entry point that aborts the underlying gRPC call. The
* subscription also implements {@link AutoCloseable} so it can participate in
* try-with-resources blocks.
*/
public final class MxGatewayActiveAlarmsSubscription implements AutoCloseable {
private final AtomicReference<ClientCallStreamObserver<QueryActiveAlarmsRequest>> requestStream = new AtomicReference<>();
public final class MxGatewayAlarmFeedSubscription implements AutoCloseable {
private final AtomicReference<ClientCallStreamObserver<StreamAlarmsRequest>> requestStream = new AtomicReference<>();
private final AtomicBoolean cancelled = new AtomicBoolean();
ClientResponseObserver<QueryActiveAlarmsRequest, ActiveAlarmSnapshot> wrap(StreamObserver<ActiveAlarmSnapshot> observer) {
ClientResponseObserver<StreamAlarmsRequest, AlarmFeedMessage> wrap(StreamObserver<AlarmFeedMessage> observer) {
return new ClientResponseObserver<>() {
@Override
public void beforeStart(ClientCallStreamObserver<QueryActiveAlarmsRequest> stream) {
public void beforeStart(ClientCallStreamObserver<StreamAlarmsRequest> stream) {
requestStream.set(stream);
if (cancelled.get()) {
stream.cancel("client cancelled active-alarms query", null);
stream.cancel("client cancelled alarm feed", null);
}
}
@Override
public void onNext(ActiveAlarmSnapshot value) {
public void onNext(AlarmFeedMessage value) {
observer.onNext(value);
}
@@ -54,9 +54,9 @@ public final class MxGatewayActiveAlarmsSubscription implements AutoCloseable {
*/
public void cancel() {
cancelled.set(true);
ClientCallStreamObserver<QueryActiveAlarmsRequest> stream = requestStream.get();
ClientCallStreamObserver<StreamAlarmsRequest> stream = requestStream.get();
if (stream != null) {
stream.cancel("client cancelled active-alarms query", null);
stream.cancel("client cancelled alarm feed", null);
}
}
@@ -10,7 +10,7 @@ import java.util.concurrent.CompletableFuture;
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
@@ -19,7 +19,7 @@ import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
/**
@@ -328,20 +328,24 @@ public final class MxGatewayClient implements AutoCloseable {
}
/**
* Streams a snapshot of all alarms currently Active or ActiveAcked — the
* gateway's ConditionRefresh equivalent. Used after reconnect to seed
* local Part 9 state.
* Attaches to the gateway's central alarm feed. The stream opens with one
* {@code AlarmFeedMessage} per currently-active alarm (the ConditionRefresh
* snapshot), then a single {@code snapshot_complete}, then a
* {@code transition} for every subsequent raise / acknowledge / clear.
*
* @param request the {@code QueryActiveAlarmsRequest}, optionally scoped by
* <p>Served by the gateway's always-on alarm monitor — no worker session is
* opened — so any number of clients may attach.
*
* @param request the {@code StreamAlarmsRequest}, optionally scoped by
* alarm-reference prefix
* @param observer caller-supplied observer that receives snapshots and completion
* @param observer caller-supplied observer that receives feed messages and completion
* @return a cancellable subscription handle
*/
public MxGatewayActiveAlarmsSubscription queryActiveAlarms(
QueryActiveAlarmsRequest request, StreamObserver<ActiveAlarmSnapshot> observer) {
MxGatewayActiveAlarmsSubscription subscription = new MxGatewayActiveAlarmsSubscription();
public MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
MxGatewayAlarmFeedSubscription subscription = new MxGatewayAlarmFeedSubscription();
MxGatewayChannels.withStreamDeadline(rawAsyncStub(), options)
.queryActiveAlarms(request, subscription.wrap(observer));
.streamAlarms(request, subscription.wrap(observer));
return subscription;
}
@@ -30,10 +30,11 @@ import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.AlarmConditionState;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
import org.junit.jupiter.api.Test;
@@ -57,7 +58,6 @@ final class MxGatewayLowFindingsTests {
AcknowledgeAlarmRequest request, StreamObserver<AcknowledgeAlarmReply> responseObserver) {
seen.set(request);
responseObserver.onNext(AcknowledgeAlarmReply.newBuilder()
.setSessionId(request.getSessionId())
.setProtocolStatus(ok())
.setDiagnosticMessage("acked")
.build());
@@ -67,7 +67,6 @@ final class MxGatewayLowFindingsTests {
try (Harness harness = Harness.start(service, "mxgw_keyid_secret", authorization)) {
AcknowledgeAlarmReply reply = harness.client().acknowledgeAlarm(AcknowledgeAlarmRequest.newBuilder()
.setSessionId("s-1")
.setAlarmFullReference("Area1.Pump.PV.HiHi")
.setComment("operator note")
.build());
@@ -84,7 +83,6 @@ final class MxGatewayLowFindingsTests {
public void acknowledgeAlarm(
AcknowledgeAlarmRequest request, StreamObserver<AcknowledgeAlarmReply> responseObserver) {
responseObserver.onNext(AcknowledgeAlarmReply.newBuilder()
.setSessionId(request.getSessionId())
.setProtocolStatus(ProtocolStatus.newBuilder()
.setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND))
.build());
@@ -96,7 +94,7 @@ final class MxGatewayLowFindingsTests {
assertThrows(
MxGatewayException.class,
() -> harness.client().acknowledgeAlarm(AcknowledgeAlarmRequest.newBuilder()
.setSessionId("missing")
.setAlarmFullReference("Area1.Pump.PV.HiHi")
.build()));
}
}
@@ -108,7 +106,6 @@ final class MxGatewayLowFindingsTests {
public void acknowledgeAlarm(
AcknowledgeAlarmRequest request, StreamObserver<AcknowledgeAlarmReply> responseObserver) {
responseObserver.onNext(AcknowledgeAlarmReply.newBuilder()
.setSessionId(request.getSessionId())
.setProtocolStatus(ok())
.setDiagnosticMessage("async-acked")
.build());
@@ -118,7 +115,9 @@ final class MxGatewayLowFindingsTests {
try (Harness harness = Harness.start(service)) {
CompletableFuture<AcknowledgeAlarmReply> future = harness.client()
.acknowledgeAlarmAsync(AcknowledgeAlarmRequest.newBuilder().setSessionId("s-2").build());
.acknowledgeAlarmAsync(AcknowledgeAlarmRequest.newBuilder()
.setAlarmFullReference("Area1.Pump.PV.HiHi")
.build());
assertEquals("async-acked", future.get(5, TimeUnit.SECONDS).getDiagnosticMessage());
}
}
@@ -135,39 +134,45 @@ final class MxGatewayLowFindingsTests {
try (Harness harness = Harness.start(service)) {
CompletableFuture<AcknowledgeAlarmReply> future = harness.client()
.acknowledgeAlarmAsync(AcknowledgeAlarmRequest.newBuilder().setSessionId("s-3").build());
.acknowledgeAlarmAsync(AcknowledgeAlarmRequest.newBuilder()
.setAlarmFullReference("Area1.Pump.PV.HiHi")
.build());
ExecutionException error = assertThrows(
ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS));
assertTrue(error.getCause() instanceof MxGatewayException, () -> String.valueOf(error.getCause()));
}
}
// --- Client.Java-007: QueryActiveAlarms RPC + subscription coverage ---
// --- Client.Java-007: StreamAlarms RPC + subscription coverage ---
@Test
void queryActiveAlarmsDeliversSnapshotsToObserver() throws Exception {
ActiveAlarmSnapshot snapshot = ActiveAlarmSnapshot.newBuilder()
.setAlarmFullReference("Area1.Tank.Level.Hi")
.setSeverity(800)
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE)
void streamAlarmsDeliversFeedMessagesToObserver() throws Exception {
AlarmFeedMessage active = AlarmFeedMessage.newBuilder()
.setActiveAlarm(ActiveAlarmSnapshot.newBuilder()
.setAlarmFullReference("Area1.Tank.Level.Hi")
.setSeverity(800)
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE))
.build();
AlarmFeedMessage snapshotComplete =
AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build();
TestService service = new TestService() {
@Override
public void queryActiveAlarms(
QueryActiveAlarmsRequest request, StreamObserver<ActiveAlarmSnapshot> responseObserver) {
responseObserver.onNext(snapshot);
public void streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> responseObserver) {
responseObserver.onNext(active);
responseObserver.onNext(snapshotComplete);
responseObserver.onCompleted();
}
};
try (Harness harness = Harness.start(service)) {
List<ActiveAlarmSnapshot> received = new ArrayList<>();
List<AlarmFeedMessage> received = new ArrayList<>();
CountDownLatch done = new CountDownLatch(1);
harness.client().queryActiveAlarms(
QueryActiveAlarmsRequest.newBuilder().setSessionId("s-4").build(),
harness.client().streamAlarms(
StreamAlarmsRequest.newBuilder().build(),
new StreamObserver<>() {
@Override
public void onNext(ActiveAlarmSnapshot value) {
public void onNext(AlarmFeedMessage value) {
received.add(value);
}
@@ -182,18 +187,19 @@ final class MxGatewayLowFindingsTests {
}
});
assertTrue(done.await(5, TimeUnit.SECONDS), "stream should complete");
assertEquals(1, received.size());
assertEquals("Area1.Tank.Level.Hi", received.get(0).getAlarmFullReference());
assertEquals(2, received.size());
assertEquals("Area1.Tank.Level.Hi", received.get(0).getActiveAlarm().getAlarmFullReference());
assertTrue(received.get(1).getSnapshotComplete());
}
}
@Test
void activeAlarmsSubscriptionCancelBeforeBeforeStartCancelsStream() {
MxGatewayActiveAlarmsSubscription subscription = new MxGatewayActiveAlarmsSubscription();
ClientResponseObserver<QueryActiveAlarmsRequest, ActiveAlarmSnapshot> observer =
void alarmFeedSubscriptionCancelBeforeBeforeStartCancelsStream() {
MxGatewayAlarmFeedSubscription subscription = new MxGatewayAlarmFeedSubscription();
ClientResponseObserver<StreamAlarmsRequest, AlarmFeedMessage> observer =
subscription.wrap(new StreamObserver<>() {
@Override
public void onNext(ActiveAlarmSnapshot value) {
public void onNext(AlarmFeedMessage value) {
}
@Override
@@ -204,13 +210,13 @@ final class MxGatewayLowFindingsTests {
public void onCompleted() {
}
});
RecordingActiveAlarmsRequestStream requestStream = new RecordingActiveAlarmsRequestStream();
RecordingAlarmFeedRequestStream requestStream = new RecordingAlarmFeedRequestStream();
subscription.cancel();
observer.beforeStart(requestStream);
assertTrue(requestStream.cancelled);
assertEquals("client cancelled active-alarms query", requestStream.cancelMessage);
assertEquals("client cancelled alarm feed", requestStream.cancelMessage);
}
// --- Client.Java-007: async streamEvents + subscription cancellation ---
@@ -456,8 +462,8 @@ final class MxGatewayLowFindingsTests {
}
}
private static final class RecordingActiveAlarmsRequestStream
extends ClientCallStreamObserver<QueryActiveAlarmsRequest> {
private static final class RecordingAlarmFeedRequestStream
extends ClientCallStreamObserver<StreamAlarmsRequest> {
private boolean cancelled;
private String cancelMessage;
@@ -489,7 +495,7 @@ final class MxGatewayLowFindingsTests {
}
@Override
public void onNext(QueryActiveAlarmsRequest value) {
public void onNext(StreamAlarmsRequest value) {
}
@Override