Resolve Client.Java-032..036: shared subscription base, batch tokenizer

Client.Java-032  README CLI examples for stream-alarms and
                 acknowledge-alarm now use the correct picocli flags
                 (--filter-prefix and --reference); two regression
                 tests parse each documented invocation.
Client.Java-033  StreamAlarmsCommand publishes an
                 AtomicReference<MxGatewayAlarmFeedSubscription> and
                 mirrors MxEventStream's overflow branch: a failed
                 queue.offer cancels the subscription, queues an
                 IllegalStateException, then queues the END sentinel
                 — preserving the fail-fast contract.
Client.Java-034  BatchCommand routes through a new
                 MxGatewayCli.tokenizeBatchLine POSIX-style shell
                 tokenizer that respects double-quoted, single-quoted,
                 and backslash-escaped arguments.
Client.Java-035  Added streamAlarmsForwardsRequestAndStreamsAlarmFeedMessages
                 to MxGatewayClientSessionTests; asserts request shape,
                 message ordering, and cancellation propagation.
Client.Java-036  Extracted MxGatewayStreamSubscription<TRequest,TResponse>
                 abstract base; the four subscription classes
                 (MxGatewayEventSubscription, MxGatewayAlarmFeedSubscription,
                 MxGatewayActiveAlarmsSubscription, DeployEventSubscription)
                 collapse to ~10-line subclasses. A new contract test
                 runs identical lifecycle / cancellation assertions
                 across all four subclasses.

All resolved at 2026-05-24; gradle build + gradle test BUILD SUCCESSFUL.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-24 09:29:27 -04:00
parent 186d03e5cc
commit d3cb311aae
11 changed files with 862 additions and 217 deletions
+2 -2
View File
@@ -179,8 +179,8 @@ gradle :zb-mom-ww-mxgateway-cli:run --args="add-item --endpoint localhost:5000 -
gradle :zb-mom-ww-mxgateway-cli:run --args="advise --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --server-handle 1 --item-handle 1 --json"
gradle :zb-mom-ww-mxgateway-cli:run --args="write --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --server-handle 1 --item-handle 1 --type int32 --value 123 --json"
gradle :zb-mom-ww-mxgateway-cli:run --args="stream-events --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --limit 1 --json"
gradle :zb-mom-ww-mxgateway-cli:run --args="stream-alarms --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --limit 1 --json"
gradle :zb-mom-ww-mxgateway-cli:run --args="acknowledge-alarm --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --alarm-reference \"\\Galaxy\Area001.Pump001.PumpFault\" --json"
gradle :zb-mom-ww-mxgateway-cli:run --args="stream-alarms --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --filter-prefix Galaxy --limit 1 --json"
gradle :zb-mom-ww-mxgateway-cli:run --args="acknowledge-alarm --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --reference \"\\Galaxy\Area001.Pump001.PumpFault\" --json"
gradle :zb-mom-ww-mxgateway-cli:run --args="smoke --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --item TestObject.TestInt --json"
```
@@ -33,6 +33,7 @@ import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
@@ -119,7 +120,7 @@ public final class MxGatewayCli implements Callable<Integer> {
return 0;
}
private static CommandLine commandLine(MxGatewayCliClientFactory clientFactory) {
static CommandLine commandLine(MxGatewayCliClientFactory clientFactory) {
CommandLine commandLine = new CommandLine(new MxGatewayCli(clientFactory));
commandLine.addSubcommand("version", new VersionCommand());
commandLine.addSubcommand("open-session", new OpenSessionCommand(clientFactory));
@@ -154,6 +155,120 @@ public final class MxGatewayCli implements Callable<Integer> {
/** Sentinel queued by {@code stream-alarms} to mark a clean end of the alarm feed. */
private static final Object ALARM_FEED_END = new Object();
/**
* Tokenises a single batch-mode stdin line into the argv that the inner
* {@link CommandLine} should execute. Honours single-quoted, double-quoted,
* and backslash-escaped runs so values that contain spaces (e.g.
* {@code --comment "needs verification"}) survive intact — the old
* implementation used {@code split("\\s+")} which shredded any quoted
* argument mid-string (Client.Java-034).
*
* <p>Rules (a small POSIX-like shell tokenizer; no variable expansion,
* command substitution, globbing, or backtick handling):
*
* <ul>
* <li>Outside quotes, runs of whitespace separate tokens.</li>
* <li>{@code "..."} groups a sequence into one token; the surrounding
* quotes are removed. Inside double quotes a backslash escapes
* {@code \\}, {@code "}, and a literal newline; other characters
* are taken literally (so {@code \n} is the two characters
* backslash-n).</li>
* <li>{@code '...'} groups a sequence into one token; the surrounding
* quotes are removed. Inside single quotes nothing is escaped —
* the run is literal until the matching single quote.</li>
* <li>Outside quotes, backslash escapes the next character (including
* whitespace, so {@code needs\ verification} is one token).</li>
* <li>An unterminated quote or a trailing backslash throws
* {@link IllegalArgumentException} so the batch loop surfaces it
* as a JSON error instead of silently emitting wrong args.</li>
* </ul>
*
* <p>Empty input (or input that contains only whitespace) returns an
* empty array so callers can skip the line.
*/
static String[] tokenizeBatchLine(String line) {
List<String> tokens = new ArrayList<>();
StringBuilder current = new StringBuilder();
boolean inToken = false;
// 0 = outside, 1 = inside single quotes, 2 = inside double quotes
int quoteMode = 0;
int length = line.length();
for (int i = 0; i < length; i++) {
char c = line.charAt(i);
if (quoteMode == 1) {
if (c == '\'') {
quoteMode = 0;
} else {
current.append(c);
}
continue;
}
if (quoteMode == 2) {
if (c == '\\') {
if (i + 1 >= length) {
throw new IllegalArgumentException(
"batch tokenizer: trailing backslash inside double-quoted string");
}
char next = line.charAt(i + 1);
if (next == '\\' || next == '"' || next == '\n') {
current.append(next);
i++;
} else {
// POSIX rule: inside double quotes a backslash is
// literal unless it precedes \, ", $, `, or newline.
current.append(c);
}
continue;
}
if (c == '"') {
quoteMode = 0;
continue;
}
current.append(c);
continue;
}
// Outside any quotes.
if (c == '\'') {
quoteMode = 1;
inToken = true;
continue;
}
if (c == '"') {
quoteMode = 2;
inToken = true;
continue;
}
if (c == '\\') {
if (i + 1 >= length) {
throw new IllegalArgumentException(
"batch tokenizer: trailing backslash outside quotes");
}
current.append(line.charAt(i + 1));
i++;
inToken = true;
continue;
}
if (Character.isWhitespace(c)) {
if (inToken) {
tokens.add(current.toString());
current.setLength(0);
inToken = false;
}
continue;
}
current.append(c);
inToken = true;
}
if (quoteMode != 0) {
throw new IllegalArgumentException(
"batch tokenizer: unterminated " + (quoteMode == 1 ? "single" : "double") + " quote");
}
if (inToken) {
tokens.add(current.toString());
}
return tokens.toArray(new String[0]);
}
/**
* Reads one CLI invocation per stdin line, executes each via a fresh
* {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after
@@ -183,8 +298,8 @@ public final class MxGatewayCli implements Callable<Integer> {
if (line.isEmpty()) {
break;
}
String[] args = line.trim().split("\\s+");
if (args.length == 0 || (args.length == 1 && args[0].isEmpty())) {
String[] args = tokenizeBatchLine(line);
if (args.length == 0) {
continue;
}
StringWriter cmdOut = new StringWriter();
@@ -1079,11 +1194,29 @@ public final class MxGatewayCli implements Callable<Integer> {
StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder()
.setAlarmFilterPrefix(filterPrefix)
.build();
// Client.Java-033 — fail-fast on overflow. A bare
// queue.offer(value) silently drops messages past capacity,
// which violates the JavaStyleGuide "do not drop events"
// contract and lets the CLI exit 0 on a truncated feed.
// Mirrors MxEventStream's overflow branch: detect a failed
// offer, cancel the subscription, drain the buffer, then
// queue an explicit overflow exception followed by the END
// sentinel so the drain loop surfaces a non-zero exit.
AtomicReference<MxGatewayAlarmFeedSubscription> subscriptionRef = new AtomicReference<>();
MxGatewayAlarmFeedSubscription subscription =
client.streamAlarms(request, new StreamObserver<>() {
@Override
public void onNext(AlarmFeedMessage value) {
queue.offer(value);
if (!queue.offer(value)) {
MxGatewayAlarmFeedSubscription sub = subscriptionRef.get();
if (sub != null) {
sub.cancel();
}
queue.clear();
queue.offer(new IllegalStateException(
"stream-alarms queue overflowed (capacity 1024); consumer too slow"));
queue.offer(ALARM_FEED_END);
}
}
@Override
@@ -1096,6 +1229,7 @@ public final class MxGatewayCli implements Callable<Integer> {
queue.offer(ALARM_FEED_END);
}
});
subscriptionRef.set(subscription);
try {
int count = 0;
while (true) {
@@ -225,6 +225,89 @@ final class MxGatewayCliTests {
assertTrue(run.errors().contains("--reference"), run.errors());
}
@Test
void readmeDocumentedStreamAlarmsExampleParsesCleanly() {
// Client.Java-032 regression — the README's stream-alarms example
// (clients/java/README.md:182) must round-trip through picocli's
// parser without a parse error. Before the fix, the example used
// a non-existent --session-id option and picocli failed at parse
// time. This test pins the exact tokens documented in the README.
String[] args = {
"stream-alarms",
"--endpoint",
"localhost:5000",
"--api-key-env",
"MXGATEWAY_API_KEY",
"--plaintext",
"--filter-prefix",
"Galaxy",
"--limit",
"1",
"--json"
};
assertReadmeExampleParses(args);
}
@Test
void readmeDocumentedAcknowledgeAlarmExampleParsesCleanly() {
// Client.Java-032 regression — the README's acknowledge-alarm
// example (clients/java/README.md:183) must parse without error.
// Before the fix it used --session-id (no such option) and
// --alarm-reference (the real option is --reference), so picocli
// rejected the invocation immediately.
String[] args = {
"acknowledge-alarm",
"--endpoint",
"localhost:5000",
"--api-key-env",
"MXGATEWAY_API_KEY",
"--plaintext",
"--reference",
"\\Galaxy\\Area001.Pump001.PumpFault",
"--json"
};
assertReadmeExampleParses(args);
}
/**
* Parses the given args through the production picocli {@link CommandLine}
* and asserts no parser error, no unknown option, and no missing required
* option. Does not execute the command body — only the option / subcommand
* parser is exercised, so no network call is made.
*/
private static void assertReadmeExampleParses(String[] args) {
picocli.CommandLine commandLine = MxGatewayCli.commandLine(new FakeClientFactory());
try {
commandLine.parseArgs(args);
} catch (picocli.CommandLine.ParameterException ex) {
throw new AssertionError(
"documented README invocation failed picocli parse: "
+ String.join(" ", args)
+ " -> "
+ ex.getMessage(),
ex);
}
}
@Test
void streamAlarmsCommandFailsFastOnQueueOverflow() {
// Client.Java-033 regression — the CLI's stream-alarms bounded queue
// used queue.offer(value) which silently dropped messages past
// capacity (1024). After the fix the CLI must surface the overflow
// as a non-zero exit (mirroring MxEventStream's fail-fast contract).
//
// The OverflowingFakeClient floods the gRPC observer with 2000
// messages synchronously, which exceeds the bounded 1024-element
// queue. The fix detects the failed offer, cancels the subscription,
// queues an overflow exception, and the drain loop surfaces it.
OverflowingFakeClientFactory factory = new OverflowingFakeClientFactory();
CliRun run = execute(factory, "stream-alarms", "--filter-prefix", "Flood");
assertFalse(run.exitCode() == 0,
"expected non-zero exit when the alarm queue overflows; got exit=" + run.exitCode()
+ " out=\n" + run.output() + "\nerr=\n" + run.errors());
}
@Test
void batchCommandExecutesVersionAndEmitsEorMarker() {
CliRun run = executeBatch(new FakeClientFactory(), "version --json\n");
@@ -235,6 +318,68 @@ final class MxGatewayCliTests {
assertTrue(out.contains(MxGatewayCli.BATCH_EOR), out);
}
@Test
void batchCommandTokenisesDoubleQuotedArgumentWithEmbeddedSpaces() {
// Client.Java-034 regression — a real shell-style tokenizer must not
// shred `"needs verification"` into two arguments. Drives
// acknowledge-alarm through batch and asserts the captured --comment
// is the un-quoted string with the embedded space preserved.
FakeClientFactory factory = new FakeClientFactory();
String line = "acknowledge-alarm --reference Tank01.Level.HiHi --comment \"needs verification\" --operator op1\n";
CliRun run = executeBatch(factory, line);
assertEquals(0, run.exitCode());
assertEquals("needs verification", factory.client.lastAcknowledgeAlarmRequest.getComment());
assertEquals("op1", factory.client.lastAcknowledgeAlarmRequest.getOperatorUser());
assertEquals(
"Tank01.Level.HiHi", factory.client.lastAcknowledgeAlarmRequest.getAlarmFullReference());
}
@Test
void batchCommandTokenisesSingleQuotedArgumentWithEmbeddedSpaces() {
FakeClientFactory factory = new FakeClientFactory();
String line =
"acknowledge-alarm --reference Tank01.Level.HiHi --comment 'needs verification' --operator op1\n";
CliRun run = executeBatch(factory, line);
assertEquals(0, run.exitCode());
assertEquals("needs verification", factory.client.lastAcknowledgeAlarmRequest.getComment());
}
@Test
void batchCommandTokenisesBackslashEscapedSpaceOutsideQuotes() {
FakeClientFactory factory = new FakeClientFactory();
String line =
"acknowledge-alarm --reference Tank01.Level.HiHi --comment needs\\ verification\n";
CliRun run = executeBatch(factory, line);
assertEquals(0, run.exitCode());
assertEquals("needs verification", factory.client.lastAcknowledgeAlarmRequest.getComment());
}
@Test
void batchCommandPreservesEmptyQuotedArgument() {
FakeClientFactory factory = new FakeClientFactory();
String line = "acknowledge-alarm --reference Tank01.Level.HiHi --comment \"\"\n";
CliRun run = executeBatch(factory, line);
assertEquals(0, run.exitCode());
assertEquals("", factory.client.lastAcknowledgeAlarmRequest.getComment());
}
@Test
void batchCommandSupportsBackslashEscapedQuoteInsideDoubleQuotes() {
// `--comment "with \"inner\" quote"` should round-trip the inner
// double-quote into the comment string.
FakeClientFactory factory = new FakeClientFactory();
String line =
"acknowledge-alarm --reference Tank01.Level.HiHi --comment \"with \\\"inner\\\" quote\"\n";
CliRun run = executeBatch(factory, line);
assertEquals(0, run.exitCode());
assertEquals("with \"inner\" quote", factory.client.lastAcknowledgeAlarmRequest.getComment());
}
@Test
void batchCommandEmitsEorAfterFailedCommandAndContinues() {
// An unknown subcommand causes a picocli parse error (non-zero exit).
@@ -290,6 +435,77 @@ final class MxGatewayCliTests {
}
}
/**
* Factory whose fake client floods the {@code streamAlarms} observer with
* 2000 messages synchronously, exceeding the CLI's bounded 1024-element
* queue. Used by the Client.Java-033 fail-fast overflow regression.
*/
private static final class OverflowingFakeClientFactory implements MxGatewayCli.MxGatewayCliClientFactory {
@Override
public MxGatewayCli.MxGatewayCliClient connect(MxGatewayCli.CommonOptions options) {
return new OverflowingFakeClient(options.spec.commandLine().getOut());
}
}
private static final class OverflowingFakeClient implements MxGatewayCli.MxGatewayCliClient {
private final PrintWriter out;
OverflowingFakeClient(PrintWriter out) {
this.out = out;
}
@Override
public PrintWriter out() {
return out;
}
@Override
public OpenSessionReply openSession(OpenSessionRequest request) {
return OpenSessionReply.newBuilder().setSessionId("flood-session").setProtocolStatus(ok()).build();
}
@Override
public CloseSessionReply closeSession(CloseSessionRequest request) {
return CloseSessionReply.newBuilder()
.setSessionId(request.getSessionId())
.setFinalState(SessionState.SESSION_STATE_CLOSED)
.setProtocolStatus(ok())
.build();
}
@Override
public MxGatewayCli.MxGatewayCliSession session(String sessionId) {
throw new UnsupportedOperationException();
}
@Override
public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) {
throw new UnsupportedOperationException();
}
@Override
public MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
// Synchronously push 2000 messages to overflow the CLI's bounded
// 1024-element queue. The CLI must surface the overflow rather
// than silently dropping the trailing ~976 messages.
for (int i = 0; i < 2000; i++) {
observer.onNext(AlarmFeedMessage.newBuilder()
.setActiveAlarm(ActiveAlarmSnapshot.newBuilder()
.setAlarmFullReference("Flood." + i)
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE)
.setSeverity(700))
.build());
}
observer.onCompleted();
return new MxGatewayAlarmFeedSubscription();
}
@Override
public void close() {
}
}
private static final class FakeClient implements MxGatewayCli.MxGatewayCliClient {
private final PrintWriter out;
private final FakeSession session = new FakeSession();
@@ -2,64 +2,19 @@ package com.zb.mom.ww.mxgateway.client;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.WatchDeployEventsRequest;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* Cancellable handle returned by the async {@code watchDeployEvents} variant.
* Mirrors {@link MxGatewayEventSubscription} but for the Galaxy Repository
* deploy-event stream.
*
* <p>All lifecycle / cancellation behaviour is inherited from
* {@link MxGatewayStreamSubscription} (Client.Java-036).
*/
public final class DeployEventSubscription implements AutoCloseable {
private final AtomicReference<ClientCallStreamObserver<WatchDeployEventsRequest>> requestStream =
new AtomicReference<>();
private final AtomicBoolean cancelled = new AtomicBoolean();
ClientResponseObserver<WatchDeployEventsRequest, DeployEvent> wrap(StreamObserver<DeployEvent> observer) {
return new ClientResponseObserver<>() {
@Override
public void beforeStart(ClientCallStreamObserver<WatchDeployEventsRequest> stream) {
requestStream.set(stream);
if (cancelled.get()) {
stream.cancel("client cancelled deploy event stream", null);
}
}
@Override
public void onNext(DeployEvent value) {
observer.onNext(value);
}
@Override
public void onError(Throwable error) {
observer.onError(error);
}
@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
/**
* Cancels the underlying gRPC call. Safe to invoke before the call has
* started; cancellation is recorded and applied as soon as the stream
* attaches.
*/
public void cancel() {
cancelled.set(true);
ClientCallStreamObserver<WatchDeployEventsRequest> stream = requestStream.get();
if (stream != null) {
stream.cancel("client cancelled deploy event stream", null);
}
}
@Override
public void close() {
cancel();
public final class DeployEventSubscription
extends MxGatewayStreamSubscription<WatchDeployEventsRequest, DeployEvent> {
public DeployEventSubscription() {
super("client cancelled deploy event stream");
}
}
@@ -1,10 +1,6 @@
package com.zb.mom.ww.mxgateway.client;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
@@ -15,53 +11,13 @@ import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
* {@link #cancel()} entry point that aborts the underlying gRPC call. The
* subscription also implements {@link AutoCloseable} so it can participate in
* try-with-resources blocks.
*
* <p>All lifecycle / cancellation behaviour is inherited from
* {@link MxGatewayStreamSubscription} (Client.Java-036).
*/
public final class MxGatewayActiveAlarmsSubscription implements AutoCloseable {
private final AtomicReference<ClientCallStreamObserver<QueryActiveAlarmsRequest>> requestStream = new AtomicReference<>();
private final AtomicBoolean cancelled = new AtomicBoolean();
ClientResponseObserver<QueryActiveAlarmsRequest, ActiveAlarmSnapshot> wrap(StreamObserver<ActiveAlarmSnapshot> observer) {
return new ClientResponseObserver<>() {
@Override
public void beforeStart(ClientCallStreamObserver<QueryActiveAlarmsRequest> stream) {
requestStream.set(stream);
if (cancelled.get()) {
stream.cancel("client cancelled active-alarms query", null);
}
}
@Override
public void onNext(ActiveAlarmSnapshot value) {
observer.onNext(value);
}
@Override
public void onError(Throwable error) {
observer.onError(error);
}
@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
/**
* Cancels the underlying gRPC call. Safe to invoke before the call has
* started; cancellation is recorded and applied as soon as the stream
* attaches.
*/
public void cancel() {
cancelled.set(true);
ClientCallStreamObserver<QueryActiveAlarmsRequest> stream = requestStream.get();
if (stream != null) {
stream.cancel("client cancelled active-alarms query", null);
}
}
@Override
public void close() {
cancel();
public final class MxGatewayActiveAlarmsSubscription
extends MxGatewayStreamSubscription<QueryActiveAlarmsRequest, ActiveAlarmSnapshot> {
public MxGatewayActiveAlarmsSubscription() {
super("client cancelled active-alarms query");
}
}
@@ -1,10 +1,6 @@
package com.zb.mom.ww.mxgateway.client;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
@@ -15,53 +11,13 @@ import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
* {@link #cancel()} entry point that aborts the underlying gRPC call. The
* subscription also implements {@link AutoCloseable} so it can participate in
* try-with-resources blocks.
*
* <p>All lifecycle / cancellation behaviour is inherited from
* {@link MxGatewayStreamSubscription} (Client.Java-036).
*/
public final class MxGatewayAlarmFeedSubscription implements AutoCloseable {
private final AtomicReference<ClientCallStreamObserver<StreamAlarmsRequest>> requestStream = new AtomicReference<>();
private final AtomicBoolean cancelled = new AtomicBoolean();
ClientResponseObserver<StreamAlarmsRequest, AlarmFeedMessage> wrap(StreamObserver<AlarmFeedMessage> observer) {
return new ClientResponseObserver<>() {
@Override
public void beforeStart(ClientCallStreamObserver<StreamAlarmsRequest> stream) {
requestStream.set(stream);
if (cancelled.get()) {
stream.cancel("client cancelled alarm feed", null);
}
}
@Override
public void onNext(AlarmFeedMessage value) {
observer.onNext(value);
}
@Override
public void onError(Throwable error) {
observer.onError(error);
}
@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
/**
* Cancels the underlying gRPC call. Safe to invoke before the call has
* started; cancellation is recorded and applied as soon as the stream
* attaches.
*/
public void cancel() {
cancelled.set(true);
ClientCallStreamObserver<StreamAlarmsRequest> stream = requestStream.get();
if (stream != null) {
stream.cancel("client cancelled alarm feed", null);
}
}
@Override
public void close() {
cancel();
public final class MxGatewayAlarmFeedSubscription
extends MxGatewayStreamSubscription<StreamAlarmsRequest, AlarmFeedMessage> {
public MxGatewayAlarmFeedSubscription() {
super("client cancelled alarm feed");
}
}
@@ -1,10 +1,6 @@
package com.zb.mom.ww.mxgateway.client;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
@@ -15,53 +11,13 @@ import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
* {@link #cancel()} entry point that aborts the underlying gRPC call. The
* subscription also implements {@link AutoCloseable} so it can participate in
* try-with-resources blocks.
*
* <p>All lifecycle / cancellation behaviour is inherited from
* {@link MxGatewayStreamSubscription} (Client.Java-036).
*/
public final class MxGatewayEventSubscription implements AutoCloseable {
private final AtomicReference<ClientCallStreamObserver<StreamEventsRequest>> requestStream = new AtomicReference<>();
private final AtomicBoolean cancelled = new AtomicBoolean();
ClientResponseObserver<StreamEventsRequest, MxEvent> wrap(StreamObserver<MxEvent> observer) {
return new ClientResponseObserver<>() {
@Override
public void beforeStart(ClientCallStreamObserver<StreamEventsRequest> stream) {
requestStream.set(stream);
if (cancelled.get()) {
stream.cancel("client cancelled event stream", null);
}
}
@Override
public void onNext(MxEvent value) {
observer.onNext(value);
}
@Override
public void onError(Throwable error) {
observer.onError(error);
}
@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
/**
* Cancels the underlying gRPC call. Safe to invoke before the call has
* started; cancellation is recorded and applied as soon as the stream
* attaches.
*/
public void cancel() {
cancelled.set(true);
ClientCallStreamObserver<StreamEventsRequest> stream = requestStream.get();
if (stream != null) {
stream.cancel("client cancelled event stream", null);
}
}
@Override
public void close() {
cancel();
public final class MxGatewayEventSubscription
extends MxGatewayStreamSubscription<StreamEventsRequest, MxEvent> {
public MxGatewayEventSubscription() {
super("client cancelled event stream");
}
}
@@ -0,0 +1,89 @@
package com.zb.mom.ww.mxgateway.client;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* Shared base for the cancellable subscription handles returned by the
* async-style server-streaming RPCs ({@code streamEvents}, {@code streamAlarms},
* {@code queryActiveAlarms}, {@code watchDeployEvents}).
*
* <p>All four subscription classes share the same lifecycle and cancellation
* contract:
*
* <ul>
* <li>{@link #wrap(StreamObserver)} returns a {@link ClientResponseObserver}
* that captures the underlying {@link ClientCallStreamObserver} in
* {@code beforeStart}. If {@link #cancel()} was called before the gRPC
* call attached, the stream is cancelled eagerly inside
* {@code beforeStart} (the Client.Java-014 close-before-beforeStart
* fix).</li>
* <li>{@link #cancel()} is idempotent. It records the cancellation flag and
* forwards {@code cancel(message, cause)} to the underlying stream when
* one is attached; otherwise the flag is checked in {@code beforeStart}
* once the stream attaches.</li>
* <li>{@link #close()} delegates to {@link #cancel()} so the handle can be
* used with try-with-resources.</li>
* </ul>
*
* <p>Subclasses supply only the cancel-message string used by {@code cancel()}.
* Refactor introduced for Client.Java-036 — the four prior subscription
* classes were structural near-clones (~60 lines each).
*/
abstract class MxGatewayStreamSubscription<TRequest, TResponse> implements AutoCloseable {
private final AtomicReference<ClientCallStreamObserver<TRequest>> requestStream = new AtomicReference<>();
private final AtomicBoolean cancelled = new AtomicBoolean();
private final String cancelMessage;
MxGatewayStreamSubscription(String cancelMessage) {
this.cancelMessage = cancelMessage;
}
final ClientResponseObserver<TRequest, TResponse> wrap(StreamObserver<TResponse> observer) {
return new ClientResponseObserver<>() {
@Override
public void beforeStart(ClientCallStreamObserver<TRequest> stream) {
requestStream.set(stream);
if (cancelled.get()) {
stream.cancel(cancelMessage, null);
}
}
@Override
public void onNext(TResponse value) {
observer.onNext(value);
}
@Override
public void onError(Throwable error) {
observer.onError(error);
}
@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
/**
* Cancels the underlying gRPC call. Safe to invoke before the call has
* started; cancellation is recorded and applied as soon as the stream
* attaches.
*/
public final void cancel() {
cancelled.set(true);
ClientCallStreamObserver<TRequest> stream = requestStream.get();
if (stream != null) {
stream.cancel(cancelMessage, null);
}
}
@Override
public final void close() {
cancel();
}
}
@@ -27,7 +27,10 @@ import mxaccess_gateway.v1.MxAccessGatewayGrpc;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
import mxaccess_gateway.v1.MxaccessGateway.AlarmConditionState;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.AlarmTransitionKind;
import mxaccess_gateway.v1.MxaccessGateway.BulkSubscribeReply;
import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
@@ -41,6 +44,7 @@ import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
import org.junit.jupiter.api.Test;
@@ -268,6 +272,100 @@ final class MxGatewayClientSessionTests {
}
}
@Test
void streamAlarmsForwardsRequestAndStreamsAlarmFeedMessages() throws Exception {
AtomicReference<StreamAlarmsRequest> streamRequest = new AtomicReference<>();
CountDownLatch serverCancelled = new CountDownLatch(1);
TestGatewayService service = new TestGatewayService() {
@Override
public void streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> responseObserver) {
streamRequest.set(request);
ServerCallStreamObserver<AlarmFeedMessage> server =
(ServerCallStreamObserver<AlarmFeedMessage>) responseObserver;
server.setOnCancelHandler(serverCancelled::countDown);
// Active-alarm snapshot, snapshot-complete sentinel, then a
// transition — mirrors the shape of a real alarm feed open.
server.onNext(AlarmFeedMessage.newBuilder()
.setActiveAlarm(ActiveAlarmSnapshot.newBuilder()
.setAlarmFullReference("Tank01.Level.HiHi")
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE)
.setSeverity(700))
.build());
server.onNext(AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build());
server.onNext(AlarmFeedMessage.newBuilder()
.setTransition(OnAlarmTransitionEvent.newBuilder()
.setAlarmFullReference("Tank01.Level.HiHi")
.setTransitionKind(AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE)
.setSeverity(700))
.build());
// Note: we deliberately do NOT call onCompleted() so the call
// remains open for the cancellation assertion below.
}
};
try (InProcessGateway gateway = InProcessGateway.start(service, new AtomicReference<>());
MxGatewayClient client = gateway.client("", Duration.ofSeconds(5))) {
java.util.List<AlarmFeedMessage> received = new java.util.ArrayList<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();
CountDownLatch threeReceived = new CountDownLatch(3);
StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder()
.setAlarmFilterPrefix("Tank01")
.build();
MxGatewayAlarmFeedSubscription subscription = client.streamAlarms(
request,
new StreamObserver<>() {
@Override
public void onNext(AlarmFeedMessage value) {
received.add(value);
threeReceived.countDown();
}
@Override
public void onError(Throwable t) {
errorRef.set(t);
}
@Override
public void onCompleted() {
}
});
assertTrue(threeReceived.await(5, TimeUnit.SECONDS),
"expected three alarm feed messages within 5s");
// The request shape (filter prefix in particular) must reach the
// server — proves MxGatewayClient.streamAlarms calls the production
// subscription.wrap(observer) glue and not a CLI override.
assertNotNull(streamRequest.get());
assertEquals("Tank01", streamRequest.get().getAlarmFilterPrefix());
// Order and payload-case must be preserved (the wrapping observer
// is just a pass-through).
assertEquals(3, received.size());
assertEquals(AlarmFeedMessage.PayloadCase.ACTIVE_ALARM, received.get(0).getPayloadCase());
assertEquals(
"Tank01.Level.HiHi",
received.get(0).getActiveAlarm().getAlarmFullReference());
assertEquals(AlarmFeedMessage.PayloadCase.SNAPSHOT_COMPLETE, received.get(1).getPayloadCase());
assertEquals(AlarmFeedMessage.PayloadCase.TRANSITION, received.get(2).getPayloadCase());
assertEquals(
AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE,
received.get(2).getTransition().getTransitionKind());
// No error expected before cancellation — proves the wrapping
// observer forwarded only data, not a synthetic error.
assertNull(errorRef.get(), "no error expected before cancellation");
// Cancellation must propagate to the underlying gRPC call.
subscription.cancel();
assertTrue(serverCancelled.await(5, TimeUnit.SECONDS),
"server should observe RPC cancellation after subscription.cancel()");
}
}
@Test
void commandFailureKeepsRawReply() throws Exception {
TestGatewayService service = new TestGatewayService() {
@@ -0,0 +1,275 @@
package com.zb.mom.ww.mxgateway.client;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.WatchDeployEventsRequest;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
import org.junit.jupiter.api.Test;
/**
* Lifecycle / cancellation contract tests applied uniformly to each of the
* four subscription classes that extend {@link MxGatewayStreamSubscription}.
*
* <p>Locks in the Client.Java-036 refactor: every subclass must exhibit the
* same behaviour for (a) cancel-before-beforeStart eagerly cancelling the
* stream once it attaches, (b) cancel-after-beforeStart forwarding directly
* to the stream, (c) the cancel message matching the subclass's documented
* value, (d) {@code close()} delegating to {@code cancel()}, and (e) the
* wrapping observer forwarding {@code onNext}/{@code onError}/{@code onCompleted}
* to the caller's observer.
*/
final class MxGatewayStreamSubscriptionContractTests {
@Test
void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_eventSubscription() {
runCancelBeforeBeforeStartTest(new MxGatewayEventSubscription(), "client cancelled event stream");
}
@Test
void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_alarmFeedSubscription() {
runCancelBeforeBeforeStartTest(
new MxGatewayAlarmFeedSubscription(), "client cancelled alarm feed");
}
@Test
void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_activeAlarmsSubscription() {
runCancelBeforeBeforeStartTest(
new MxGatewayActiveAlarmsSubscription(), "client cancelled active-alarms query");
}
@Test
void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_deployEventSubscription() {
runCancelBeforeBeforeStartTest(
new DeployEventSubscription(), "client cancelled deploy event stream");
}
@Test
void cancelAfterBeforeStartForwardsToStream_eventSubscription() {
runCancelAfterBeforeStartTest(new MxGatewayEventSubscription(), "client cancelled event stream");
}
@Test
void cancelAfterBeforeStartForwardsToStream_alarmFeedSubscription() {
runCancelAfterBeforeStartTest(
new MxGatewayAlarmFeedSubscription(), "client cancelled alarm feed");
}
@Test
void cancelAfterBeforeStartForwardsToStream_activeAlarmsSubscription() {
runCancelAfterBeforeStartTest(
new MxGatewayActiveAlarmsSubscription(), "client cancelled active-alarms query");
}
@Test
void cancelAfterBeforeStartForwardsToStream_deployEventSubscription() {
runCancelAfterBeforeStartTest(
new DeployEventSubscription(), "client cancelled deploy event stream");
}
@Test
void closeDelegatesToCancel_eventSubscription() {
runCloseDelegatesToCancelTest(new MxGatewayEventSubscription());
}
@Test
void closeDelegatesToCancel_alarmFeedSubscription() {
runCloseDelegatesToCancelTest(new MxGatewayAlarmFeedSubscription());
}
@Test
void closeDelegatesToCancel_activeAlarmsSubscription() {
runCloseDelegatesToCancelTest(new MxGatewayActiveAlarmsSubscription());
}
@Test
void closeDelegatesToCancel_deployEventSubscription() {
runCloseDelegatesToCancelTest(new DeployEventSubscription());
}
@Test
void wrappedObserverForwardsOnNextOnErrorOnCompleted_eventSubscription() {
MxEvent event = MxEvent.newBuilder().setWorkerSequence(7L).build();
runForwardingTest(new MxGatewayEventSubscription(), event);
}
@Test
void wrappedObserverForwardsOnNextOnErrorOnCompleted_alarmFeedSubscription() {
AlarmFeedMessage msg = AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build();
runForwardingTest(new MxGatewayAlarmFeedSubscription(), msg);
}
@Test
void wrappedObserverForwardsOnNextOnErrorOnCompleted_activeAlarmsSubscription() {
ActiveAlarmSnapshot snap = ActiveAlarmSnapshot.newBuilder()
.setAlarmFullReference("ref")
.setSeverity(500)
.build();
runForwardingTest(new MxGatewayActiveAlarmsSubscription(), snap);
}
@Test
void wrappedObserverForwardsOnNextOnErrorOnCompleted_deployEventSubscription() {
DeployEvent ev = DeployEvent.newBuilder().setSequence(1L).build();
runForwardingTest(new DeployEventSubscription(), ev);
}
private static <Req, Resp> void runCancelBeforeBeforeStartTest(
MxGatewayStreamSubscription<Req, Resp> subscription, String expectedMessage) {
ClientResponseObserver<Req, Resp> wrapped = subscription.wrap(new NoopObserver<>());
RecordingClientCallStreamObserver<Req> stream = new RecordingClientCallStreamObserver<>();
subscription.cancel();
wrapped.beforeStart(stream);
assertTrue(stream.cancelled, "stream should have been cancelled by beforeStart after prior cancel()");
assertEquals(expectedMessage, stream.cancelMessage);
}
private static <Req, Resp> void runCancelAfterBeforeStartTest(
MxGatewayStreamSubscription<Req, Resp> subscription, String expectedMessage) {
ClientResponseObserver<Req, Resp> wrapped = subscription.wrap(new NoopObserver<>());
RecordingClientCallStreamObserver<Req> stream = new RecordingClientCallStreamObserver<>();
wrapped.beforeStart(stream);
assertFalse(stream.cancelled, "stream should not be cancelled before cancel() is called");
subscription.cancel();
assertTrue(stream.cancelled, "stream should have been cancelled by direct cancel()");
assertEquals(expectedMessage, stream.cancelMessage);
}
private static <Req, Resp> void runCloseDelegatesToCancelTest(
MxGatewayStreamSubscription<Req, Resp> subscription) {
ClientResponseObserver<Req, Resp> wrapped = subscription.wrap(new NoopObserver<>());
RecordingClientCallStreamObserver<Req> stream = new RecordingClientCallStreamObserver<>();
wrapped.beforeStart(stream);
subscription.close();
assertTrue(stream.cancelled, "close() should delegate to cancel()");
}
private static <Req, Resp> void runForwardingTest(
MxGatewayStreamSubscription<Req, Resp> subscription, Resp value) {
List<Resp> received = new ArrayList<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();
AtomicReference<Boolean> completed = new AtomicReference<>(false);
StreamObserver<Resp> caller = new StreamObserver<>() {
@Override
public void onNext(Resp v) {
received.add(v);
}
@Override
public void onError(Throwable t) {
errorRef.set(t);
}
@Override
public void onCompleted() {
completed.set(true);
}
};
ClientResponseObserver<Req, Resp> wrapped = subscription.wrap(caller);
RecordingClientCallStreamObserver<Req> stream = new RecordingClientCallStreamObserver<>();
wrapped.beforeStart(stream);
wrapped.onNext(value);
IllegalStateException boom = new IllegalStateException("boom");
wrapped.onError(boom);
wrapped.onCompleted();
assertEquals(1, received.size());
assertEquals(value, received.get(0));
assertNotNull(errorRef.get());
assertEquals(boom, errorRef.get());
assertTrue(completed.get());
}
private static final class NoopObserver<T> implements StreamObserver<T> {
@Override
public void onNext(T value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
}
private static final class RecordingClientCallStreamObserver<T> extends ClientCallStreamObserver<T> {
boolean cancelled;
String cancelMessage;
@Override
public boolean isReady() {
return true;
}
@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
}
@Override
public void disableAutoInboundFlowControl() {
}
@Override
public void request(int count) {
}
@Override
public void setMessageCompression(boolean enable) {
}
@Override
public void cancel(String message, Throwable cause) {
cancelled = true;
cancelMessage = message;
}
@Override
public void onNext(T value) {
}
@Override
public void onError(Throwable error) {
}
@Override
public void onCompleted() {
}
}
// Compile-time guarantee that the parameter types still match the
// generic bounds — catches a regression where a subclass changes its
// request/response types out from under the shared base.
@SuppressWarnings("unused")
private static void typeBoundsCheck() {
MxGatewayStreamSubscription<StreamEventsRequest, MxEvent> a = new MxGatewayEventSubscription();
MxGatewayStreamSubscription<StreamAlarmsRequest, AlarmFeedMessage> b = new MxGatewayAlarmFeedSubscription();
MxGatewayStreamSubscription<QueryActiveAlarmsRequest, ActiveAlarmSnapshot> c =
new MxGatewayActiveAlarmsSubscription();
MxGatewayStreamSubscription<WatchDeployEventsRequest, DeployEvent> d = new DeployEventSubscription();
}
}