diff --git a/clients/java/README.md b/clients/java/README.md index b53bbd2..977beef 100644 --- a/clients/java/README.md +++ b/clients/java/README.md @@ -179,8 +179,8 @@ gradle :zb-mom-ww-mxgateway-cli:run --args="add-item --endpoint localhost:5000 - gradle :zb-mom-ww-mxgateway-cli:run --args="advise --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id --server-handle 1 --item-handle 1 --json" gradle :zb-mom-ww-mxgateway-cli:run --args="write --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id --server-handle 1 --item-handle 1 --type int32 --value 123 --json" gradle :zb-mom-ww-mxgateway-cli:run --args="stream-events --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id --limit 1 --json" -gradle :zb-mom-ww-mxgateway-cli:run --args="stream-alarms --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id --limit 1 --json" -gradle :zb-mom-ww-mxgateway-cli:run --args="acknowledge-alarm --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id --alarm-reference \"\\Galaxy\Area001.Pump001.PumpFault\" --json" +gradle :zb-mom-ww-mxgateway-cli:run --args="stream-alarms --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --filter-prefix Galaxy --limit 1 --json" +gradle :zb-mom-ww-mxgateway-cli:run --args="acknowledge-alarm --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --reference \"\\Galaxy\Area001.Pump001.PumpFault\" --json" gradle :zb-mom-ww-mxgateway-cli:run --args="smoke --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --item TestObject.TestInt --json" ``` diff --git a/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java b/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java index 310bf38..c59cc27 100644 --- a/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java +++ b/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java @@ -33,6 +33,7 @@ import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply; import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest; import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; @@ -119,7 +120,7 @@ public final class MxGatewayCli implements Callable { return 0; } - private static CommandLine commandLine(MxGatewayCliClientFactory clientFactory) { + static CommandLine commandLine(MxGatewayCliClientFactory clientFactory) { CommandLine commandLine = new CommandLine(new MxGatewayCli(clientFactory)); commandLine.addSubcommand("version", new VersionCommand()); commandLine.addSubcommand("open-session", new OpenSessionCommand(clientFactory)); @@ -154,6 +155,120 @@ public final class MxGatewayCli implements Callable { /** Sentinel queued by {@code stream-alarms} to mark a clean end of the alarm feed. */ private static final Object ALARM_FEED_END = new Object(); + /** + * Tokenises a single batch-mode stdin line into the argv that the inner + * {@link CommandLine} should execute. Honours single-quoted, double-quoted, + * and backslash-escaped runs so values that contain spaces (e.g. + * {@code --comment "needs verification"}) survive intact — the old + * implementation used {@code split("\\s+")} which shredded any quoted + * argument mid-string (Client.Java-034). + * + *

Rules (a small POSIX-like shell tokenizer; no variable expansion, + * command substitution, globbing, or backtick handling): + * + *

    + *
  • Outside quotes, runs of whitespace separate tokens.
  • + *
  • {@code "..."} groups a sequence into one token; the surrounding + * quotes are removed. Inside double quotes a backslash escapes + * {@code \\}, {@code "}, and a literal newline; other characters + * are taken literally (so {@code \n} is the two characters + * backslash-n).
  • + *
  • {@code '...'} groups a sequence into one token; the surrounding + * quotes are removed. Inside single quotes nothing is escaped — + * the run is literal until the matching single quote.
  • + *
  • Outside quotes, backslash escapes the next character (including + * whitespace, so {@code needs\ verification} is one token).
  • + *
  • An unterminated quote or a trailing backslash throws + * {@link IllegalArgumentException} so the batch loop surfaces it + * as a JSON error instead of silently emitting wrong args.
  • + *
+ * + *

Empty input (or input that contains only whitespace) returns an + * empty array so callers can skip the line. + */ + static String[] tokenizeBatchLine(String line) { + List tokens = new ArrayList<>(); + StringBuilder current = new StringBuilder(); + boolean inToken = false; + // 0 = outside, 1 = inside single quotes, 2 = inside double quotes + int quoteMode = 0; + int length = line.length(); + for (int i = 0; i < length; i++) { + char c = line.charAt(i); + if (quoteMode == 1) { + if (c == '\'') { + quoteMode = 0; + } else { + current.append(c); + } + continue; + } + if (quoteMode == 2) { + if (c == '\\') { + if (i + 1 >= length) { + throw new IllegalArgumentException( + "batch tokenizer: trailing backslash inside double-quoted string"); + } + char next = line.charAt(i + 1); + if (next == '\\' || next == '"' || next == '\n') { + current.append(next); + i++; + } else { + // POSIX rule: inside double quotes a backslash is + // literal unless it precedes \, ", $, `, or newline. + current.append(c); + } + continue; + } + if (c == '"') { + quoteMode = 0; + continue; + } + current.append(c); + continue; + } + // Outside any quotes. + if (c == '\'') { + quoteMode = 1; + inToken = true; + continue; + } + if (c == '"') { + quoteMode = 2; + inToken = true; + continue; + } + if (c == '\\') { + if (i + 1 >= length) { + throw new IllegalArgumentException( + "batch tokenizer: trailing backslash outside quotes"); + } + current.append(line.charAt(i + 1)); + i++; + inToken = true; + continue; + } + if (Character.isWhitespace(c)) { + if (inToken) { + tokens.add(current.toString()); + current.setLength(0); + inToken = false; + } + continue; + } + current.append(c); + inToken = true; + } + if (quoteMode != 0) { + throw new IllegalArgumentException( + "batch tokenizer: unterminated " + (quoteMode == 1 ? "single" : "double") + " quote"); + } + if (inToken) { + tokens.add(current.toString()); + } + return tokens.toArray(new String[0]); + } + /** * Reads one CLI invocation per stdin line, executes each via a fresh * {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after @@ -183,8 +298,8 @@ public final class MxGatewayCli implements Callable { if (line.isEmpty()) { break; } - String[] args = line.trim().split("\\s+"); - if (args.length == 0 || (args.length == 1 && args[0].isEmpty())) { + String[] args = tokenizeBatchLine(line); + if (args.length == 0) { continue; } StringWriter cmdOut = new StringWriter(); @@ -1079,11 +1194,29 @@ public final class MxGatewayCli implements Callable { StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder() .setAlarmFilterPrefix(filterPrefix) .build(); + // Client.Java-033 — fail-fast on overflow. A bare + // queue.offer(value) silently drops messages past capacity, + // which violates the JavaStyleGuide "do not drop events" + // contract and lets the CLI exit 0 on a truncated feed. + // Mirrors MxEventStream's overflow branch: detect a failed + // offer, cancel the subscription, drain the buffer, then + // queue an explicit overflow exception followed by the END + // sentinel so the drain loop surfaces a non-zero exit. + AtomicReference subscriptionRef = new AtomicReference<>(); MxGatewayAlarmFeedSubscription subscription = client.streamAlarms(request, new StreamObserver<>() { @Override public void onNext(AlarmFeedMessage value) { - queue.offer(value); + if (!queue.offer(value)) { + MxGatewayAlarmFeedSubscription sub = subscriptionRef.get(); + if (sub != null) { + sub.cancel(); + } + queue.clear(); + queue.offer(new IllegalStateException( + "stream-alarms queue overflowed (capacity 1024); consumer too slow")); + queue.offer(ALARM_FEED_END); + } } @Override @@ -1096,6 +1229,7 @@ public final class MxGatewayCli implements Callable { queue.offer(ALARM_FEED_END); } }); + subscriptionRef.set(subscription); try { int count = 0; while (true) { diff --git a/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java b/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java index 39e4cad..f6426ef 100644 --- a/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java +++ b/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java @@ -225,6 +225,89 @@ final class MxGatewayCliTests { assertTrue(run.errors().contains("--reference"), run.errors()); } + @Test + void readmeDocumentedStreamAlarmsExampleParsesCleanly() { + // Client.Java-032 regression — the README's stream-alarms example + // (clients/java/README.md:182) must round-trip through picocli's + // parser without a parse error. Before the fix, the example used + // a non-existent --session-id option and picocli failed at parse + // time. This test pins the exact tokens documented in the README. + String[] args = { + "stream-alarms", + "--endpoint", + "localhost:5000", + "--api-key-env", + "MXGATEWAY_API_KEY", + "--plaintext", + "--filter-prefix", + "Galaxy", + "--limit", + "1", + "--json" + }; + assertReadmeExampleParses(args); + } + + @Test + void readmeDocumentedAcknowledgeAlarmExampleParsesCleanly() { + // Client.Java-032 regression — the README's acknowledge-alarm + // example (clients/java/README.md:183) must parse without error. + // Before the fix it used --session-id (no such option) and + // --alarm-reference (the real option is --reference), so picocli + // rejected the invocation immediately. + String[] args = { + "acknowledge-alarm", + "--endpoint", + "localhost:5000", + "--api-key-env", + "MXGATEWAY_API_KEY", + "--plaintext", + "--reference", + "\\Galaxy\\Area001.Pump001.PumpFault", + "--json" + }; + assertReadmeExampleParses(args); + } + + /** + * Parses the given args through the production picocli {@link CommandLine} + * and asserts no parser error, no unknown option, and no missing required + * option. Does not execute the command body — only the option / subcommand + * parser is exercised, so no network call is made. + */ + private static void assertReadmeExampleParses(String[] args) { + picocli.CommandLine commandLine = MxGatewayCli.commandLine(new FakeClientFactory()); + try { + commandLine.parseArgs(args); + } catch (picocli.CommandLine.ParameterException ex) { + throw new AssertionError( + "documented README invocation failed picocli parse: " + + String.join(" ", args) + + " -> " + + ex.getMessage(), + ex); + } + } + + @Test + void streamAlarmsCommandFailsFastOnQueueOverflow() { + // Client.Java-033 regression — the CLI's stream-alarms bounded queue + // used queue.offer(value) which silently dropped messages past + // capacity (1024). After the fix the CLI must surface the overflow + // as a non-zero exit (mirroring MxEventStream's fail-fast contract). + // + // The OverflowingFakeClient floods the gRPC observer with 2000 + // messages synchronously, which exceeds the bounded 1024-element + // queue. The fix detects the failed offer, cancels the subscription, + // queues an overflow exception, and the drain loop surfaces it. + OverflowingFakeClientFactory factory = new OverflowingFakeClientFactory(); + CliRun run = execute(factory, "stream-alarms", "--filter-prefix", "Flood"); + + assertFalse(run.exitCode() == 0, + "expected non-zero exit when the alarm queue overflows; got exit=" + run.exitCode() + + " out=\n" + run.output() + "\nerr=\n" + run.errors()); + } + @Test void batchCommandExecutesVersionAndEmitsEorMarker() { CliRun run = executeBatch(new FakeClientFactory(), "version --json\n"); @@ -235,6 +318,68 @@ final class MxGatewayCliTests { assertTrue(out.contains(MxGatewayCli.BATCH_EOR), out); } + @Test + void batchCommandTokenisesDoubleQuotedArgumentWithEmbeddedSpaces() { + // Client.Java-034 regression — a real shell-style tokenizer must not + // shred `"needs verification"` into two arguments. Drives + // acknowledge-alarm through batch and asserts the captured --comment + // is the un-quoted string with the embedded space preserved. + FakeClientFactory factory = new FakeClientFactory(); + String line = "acknowledge-alarm --reference Tank01.Level.HiHi --comment \"needs verification\" --operator op1\n"; + CliRun run = executeBatch(factory, line); + + assertEquals(0, run.exitCode()); + assertEquals("needs verification", factory.client.lastAcknowledgeAlarmRequest.getComment()); + assertEquals("op1", factory.client.lastAcknowledgeAlarmRequest.getOperatorUser()); + assertEquals( + "Tank01.Level.HiHi", factory.client.lastAcknowledgeAlarmRequest.getAlarmFullReference()); + } + + @Test + void batchCommandTokenisesSingleQuotedArgumentWithEmbeddedSpaces() { + FakeClientFactory factory = new FakeClientFactory(); + String line = + "acknowledge-alarm --reference Tank01.Level.HiHi --comment 'needs verification' --operator op1\n"; + CliRun run = executeBatch(factory, line); + + assertEquals(0, run.exitCode()); + assertEquals("needs verification", factory.client.lastAcknowledgeAlarmRequest.getComment()); + } + + @Test + void batchCommandTokenisesBackslashEscapedSpaceOutsideQuotes() { + FakeClientFactory factory = new FakeClientFactory(); + String line = + "acknowledge-alarm --reference Tank01.Level.HiHi --comment needs\\ verification\n"; + CliRun run = executeBatch(factory, line); + + assertEquals(0, run.exitCode()); + assertEquals("needs verification", factory.client.lastAcknowledgeAlarmRequest.getComment()); + } + + @Test + void batchCommandPreservesEmptyQuotedArgument() { + FakeClientFactory factory = new FakeClientFactory(); + String line = "acknowledge-alarm --reference Tank01.Level.HiHi --comment \"\"\n"; + CliRun run = executeBatch(factory, line); + + assertEquals(0, run.exitCode()); + assertEquals("", factory.client.lastAcknowledgeAlarmRequest.getComment()); + } + + @Test + void batchCommandSupportsBackslashEscapedQuoteInsideDoubleQuotes() { + // `--comment "with \"inner\" quote"` should round-trip the inner + // double-quote into the comment string. + FakeClientFactory factory = new FakeClientFactory(); + String line = + "acknowledge-alarm --reference Tank01.Level.HiHi --comment \"with \\\"inner\\\" quote\"\n"; + CliRun run = executeBatch(factory, line); + + assertEquals(0, run.exitCode()); + assertEquals("with \"inner\" quote", factory.client.lastAcknowledgeAlarmRequest.getComment()); + } + @Test void batchCommandEmitsEorAfterFailedCommandAndContinues() { // An unknown subcommand causes a picocli parse error (non-zero exit). @@ -290,6 +435,77 @@ final class MxGatewayCliTests { } } + /** + * Factory whose fake client floods the {@code streamAlarms} observer with + * 2000 messages synchronously, exceeding the CLI's bounded 1024-element + * queue. Used by the Client.Java-033 fail-fast overflow regression. + */ + private static final class OverflowingFakeClientFactory implements MxGatewayCli.MxGatewayCliClientFactory { + @Override + public MxGatewayCli.MxGatewayCliClient connect(MxGatewayCli.CommonOptions options) { + return new OverflowingFakeClient(options.spec.commandLine().getOut()); + } + } + + private static final class OverflowingFakeClient implements MxGatewayCli.MxGatewayCliClient { + private final PrintWriter out; + + OverflowingFakeClient(PrintWriter out) { + this.out = out; + } + + @Override + public PrintWriter out() { + return out; + } + + @Override + public OpenSessionReply openSession(OpenSessionRequest request) { + return OpenSessionReply.newBuilder().setSessionId("flood-session").setProtocolStatus(ok()).build(); + } + + @Override + public CloseSessionReply closeSession(CloseSessionRequest request) { + return CloseSessionReply.newBuilder() + .setSessionId(request.getSessionId()) + .setFinalState(SessionState.SESSION_STATE_CLOSED) + .setProtocolStatus(ok()) + .build(); + } + + @Override + public MxGatewayCli.MxGatewayCliSession session(String sessionId) { + throw new UnsupportedOperationException(); + } + + @Override + public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public MxGatewayAlarmFeedSubscription streamAlarms( + StreamAlarmsRequest request, StreamObserver observer) { + // Synchronously push 2000 messages to overflow the CLI's bounded + // 1024-element queue. The CLI must surface the overflow rather + // than silently dropping the trailing ~976 messages. + for (int i = 0; i < 2000; i++) { + observer.onNext(AlarmFeedMessage.newBuilder() + .setActiveAlarm(ActiveAlarmSnapshot.newBuilder() + .setAlarmFullReference("Flood." + i) + .setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE) + .setSeverity(700)) + .build()); + } + observer.onCompleted(); + return new MxGatewayAlarmFeedSubscription(); + } + + @Override + public void close() { + } + } + private static final class FakeClient implements MxGatewayCli.MxGatewayCliClient { private final PrintWriter out; private final FakeSession session = new FakeSession(); diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/DeployEventSubscription.java b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/DeployEventSubscription.java index 5f4df47..14b165d 100644 --- a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/DeployEventSubscription.java +++ b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/DeployEventSubscription.java @@ -2,64 +2,19 @@ package com.zb.mom.ww.mxgateway.client; import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent; import galaxy_repository.v1.GalaxyRepositoryOuterClass.WatchDeployEventsRequest; -import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; /** * Cancellable handle returned by the async {@code watchDeployEvents} variant. * Mirrors {@link MxGatewayEventSubscription} but for the Galaxy Repository * deploy-event stream. + * + *

All lifecycle / cancellation behaviour is inherited from + * {@link MxGatewayStreamSubscription} (Client.Java-036). */ -public final class DeployEventSubscription implements AutoCloseable { - private final AtomicReference> requestStream = - new AtomicReference<>(); - private final AtomicBoolean cancelled = new AtomicBoolean(); - - ClientResponseObserver wrap(StreamObserver observer) { - return new ClientResponseObserver<>() { - @Override - public void beforeStart(ClientCallStreamObserver stream) { - requestStream.set(stream); - if (cancelled.get()) { - stream.cancel("client cancelled deploy event stream", null); - } - } - - @Override - public void onNext(DeployEvent value) { - observer.onNext(value); - } - - @Override - public void onError(Throwable error) { - observer.onError(error); - } - - @Override - public void onCompleted() { - observer.onCompleted(); - } - }; - } - - /** - * Cancels the underlying gRPC call. Safe to invoke before the call has - * started; cancellation is recorded and applied as soon as the stream - * attaches. - */ - public void cancel() { - cancelled.set(true); - ClientCallStreamObserver stream = requestStream.get(); - if (stream != null) { - stream.cancel("client cancelled deploy event stream", null); - } - } - - @Override - public void close() { - cancel(); +public final class DeployEventSubscription + extends MxGatewayStreamSubscription { + public DeployEventSubscription() { + super("client cancelled deploy event stream"); } } diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayActiveAlarmsSubscription.java b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayActiveAlarmsSubscription.java index 258212e..c97f44a 100644 --- a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayActiveAlarmsSubscription.java +++ b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayActiveAlarmsSubscription.java @@ -1,10 +1,6 @@ package com.zb.mom.ww.mxgateway.client; -import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest; @@ -15,53 +11,13 @@ import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest; * {@link #cancel()} entry point that aborts the underlying gRPC call. The * subscription also implements {@link AutoCloseable} so it can participate in * try-with-resources blocks. + * + *

All lifecycle / cancellation behaviour is inherited from + * {@link MxGatewayStreamSubscription} (Client.Java-036). */ -public final class MxGatewayActiveAlarmsSubscription implements AutoCloseable { - private final AtomicReference> requestStream = new AtomicReference<>(); - private final AtomicBoolean cancelled = new AtomicBoolean(); - - ClientResponseObserver wrap(StreamObserver observer) { - return new ClientResponseObserver<>() { - @Override - public void beforeStart(ClientCallStreamObserver stream) { - requestStream.set(stream); - if (cancelled.get()) { - stream.cancel("client cancelled active-alarms query", null); - } - } - - @Override - public void onNext(ActiveAlarmSnapshot value) { - observer.onNext(value); - } - - @Override - public void onError(Throwable error) { - observer.onError(error); - } - - @Override - public void onCompleted() { - observer.onCompleted(); - } - }; - } - - /** - * Cancels the underlying gRPC call. Safe to invoke before the call has - * started; cancellation is recorded and applied as soon as the stream - * attaches. - */ - public void cancel() { - cancelled.set(true); - ClientCallStreamObserver stream = requestStream.get(); - if (stream != null) { - stream.cancel("client cancelled active-alarms query", null); - } - } - - @Override - public void close() { - cancel(); +public final class MxGatewayActiveAlarmsSubscription + extends MxGatewayStreamSubscription { + public MxGatewayActiveAlarmsSubscription() { + super("client cancelled active-alarms query"); } } diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java index 5a582ae..0125b10 100644 --- a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java +++ b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java @@ -1,10 +1,6 @@ package com.zb.mom.ww.mxgateway.client; -import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage; import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest; @@ -15,53 +11,13 @@ import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest; * {@link #cancel()} entry point that aborts the underlying gRPC call. The * subscription also implements {@link AutoCloseable} so it can participate in * try-with-resources blocks. + * + *

All lifecycle / cancellation behaviour is inherited from + * {@link MxGatewayStreamSubscription} (Client.Java-036). */ -public final class MxGatewayAlarmFeedSubscription implements AutoCloseable { - private final AtomicReference> requestStream = new AtomicReference<>(); - private final AtomicBoolean cancelled = new AtomicBoolean(); - - ClientResponseObserver wrap(StreamObserver observer) { - return new ClientResponseObserver<>() { - @Override - public void beforeStart(ClientCallStreamObserver stream) { - requestStream.set(stream); - if (cancelled.get()) { - stream.cancel("client cancelled alarm feed", null); - } - } - - @Override - public void onNext(AlarmFeedMessage value) { - observer.onNext(value); - } - - @Override - public void onError(Throwable error) { - observer.onError(error); - } - - @Override - public void onCompleted() { - observer.onCompleted(); - } - }; - } - - /** - * Cancels the underlying gRPC call. Safe to invoke before the call has - * started; cancellation is recorded and applied as soon as the stream - * attaches. - */ - public void cancel() { - cancelled.set(true); - ClientCallStreamObserver stream = requestStream.get(); - if (stream != null) { - stream.cancel("client cancelled alarm feed", null); - } - } - - @Override - public void close() { - cancel(); +public final class MxGatewayAlarmFeedSubscription + extends MxGatewayStreamSubscription { + public MxGatewayAlarmFeedSubscription() { + super("client cancelled alarm feed"); } } diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayEventSubscription.java b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayEventSubscription.java index f1ab2af..ebc60f0 100644 --- a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayEventSubscription.java +++ b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayEventSubscription.java @@ -1,10 +1,6 @@ package com.zb.mom.ww.mxgateway.client; -import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicBoolean; import mxaccess_gateway.v1.MxaccessGateway.MxEvent; import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest; @@ -15,53 +11,13 @@ import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest; * {@link #cancel()} entry point that aborts the underlying gRPC call. The * subscription also implements {@link AutoCloseable} so it can participate in * try-with-resources blocks. + * + *

All lifecycle / cancellation behaviour is inherited from + * {@link MxGatewayStreamSubscription} (Client.Java-036). */ -public final class MxGatewayEventSubscription implements AutoCloseable { - private final AtomicReference> requestStream = new AtomicReference<>(); - private final AtomicBoolean cancelled = new AtomicBoolean(); - - ClientResponseObserver wrap(StreamObserver observer) { - return new ClientResponseObserver<>() { - @Override - public void beforeStart(ClientCallStreamObserver stream) { - requestStream.set(stream); - if (cancelled.get()) { - stream.cancel("client cancelled event stream", null); - } - } - - @Override - public void onNext(MxEvent value) { - observer.onNext(value); - } - - @Override - public void onError(Throwable error) { - observer.onError(error); - } - - @Override - public void onCompleted() { - observer.onCompleted(); - } - }; - } - - /** - * Cancels the underlying gRPC call. Safe to invoke before the call has - * started; cancellation is recorded and applied as soon as the stream - * attaches. - */ - public void cancel() { - cancelled.set(true); - ClientCallStreamObserver stream = requestStream.get(); - if (stream != null) { - stream.cancel("client cancelled event stream", null); - } - } - - @Override - public void close() { - cancel(); +public final class MxGatewayEventSubscription + extends MxGatewayStreamSubscription { + public MxGatewayEventSubscription() { + super("client cancelled event stream"); } } diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayStreamSubscription.java b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayStreamSubscription.java new file mode 100644 index 0000000..786bb57 --- /dev/null +++ b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayStreamSubscription.java @@ -0,0 +1,89 @@ +package com.zb.mom.ww.mxgateway.client; + +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Shared base for the cancellable subscription handles returned by the + * async-style server-streaming RPCs ({@code streamEvents}, {@code streamAlarms}, + * {@code queryActiveAlarms}, {@code watchDeployEvents}). + * + *

All four subscription classes share the same lifecycle and cancellation + * contract: + * + *

    + *
  • {@link #wrap(StreamObserver)} returns a {@link ClientResponseObserver} + * that captures the underlying {@link ClientCallStreamObserver} in + * {@code beforeStart}. If {@link #cancel()} was called before the gRPC + * call attached, the stream is cancelled eagerly inside + * {@code beforeStart} (the Client.Java-014 close-before-beforeStart + * fix).
  • + *
  • {@link #cancel()} is idempotent. It records the cancellation flag and + * forwards {@code cancel(message, cause)} to the underlying stream when + * one is attached; otherwise the flag is checked in {@code beforeStart} + * once the stream attaches.
  • + *
  • {@link #close()} delegates to {@link #cancel()} so the handle can be + * used with try-with-resources.
  • + *
+ * + *

Subclasses supply only the cancel-message string used by {@code cancel()}. + * Refactor introduced for Client.Java-036 — the four prior subscription + * classes were structural near-clones (~60 lines each). + */ +abstract class MxGatewayStreamSubscription implements AutoCloseable { + private final AtomicReference> requestStream = new AtomicReference<>(); + private final AtomicBoolean cancelled = new AtomicBoolean(); + private final String cancelMessage; + + MxGatewayStreamSubscription(String cancelMessage) { + this.cancelMessage = cancelMessage; + } + + final ClientResponseObserver wrap(StreamObserver observer) { + return new ClientResponseObserver<>() { + @Override + public void beforeStart(ClientCallStreamObserver stream) { + requestStream.set(stream); + if (cancelled.get()) { + stream.cancel(cancelMessage, null); + } + } + + @Override + public void onNext(TResponse value) { + observer.onNext(value); + } + + @Override + public void onError(Throwable error) { + observer.onError(error); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + }; + } + + /** + * Cancels the underlying gRPC call. Safe to invoke before the call has + * started; cancellation is recorded and applied as soon as the stream + * attaches. + */ + public final void cancel() { + cancelled.set(true); + ClientCallStreamObserver stream = requestStream.get(); + if (stream != null) { + stream.cancel(cancelMessage, null); + } + } + + @Override + public final void close() { + cancel(); + } +} diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/MxGatewayClientSessionTests.java b/clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/MxGatewayClientSessionTests.java index 6b4273a..089fd96 100644 --- a/clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/MxGatewayClientSessionTests.java +++ b/clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/MxGatewayClientSessionTests.java @@ -27,7 +27,10 @@ import mxaccess_gateway.v1.MxAccessGatewayGrpc; import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; import mxaccess_gateway.v1.MxaccessGateway.AddItemReply; import mxaccess_gateway.v1.MxaccessGateway.AlarmConditionState; +import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage; +import mxaccess_gateway.v1.MxaccessGateway.AlarmTransitionKind; import mxaccess_gateway.v1.MxaccessGateway.BulkSubscribeReply; +import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind; @@ -41,6 +44,7 @@ import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode; import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest; import mxaccess_gateway.v1.MxaccessGateway.RegisterReply; import mxaccess_gateway.v1.MxaccessGateway.SessionState; +import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest; import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest; import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult; import org.junit.jupiter.api.Test; @@ -268,6 +272,100 @@ final class MxGatewayClientSessionTests { } } + @Test + void streamAlarmsForwardsRequestAndStreamsAlarmFeedMessages() throws Exception { + AtomicReference streamRequest = new AtomicReference<>(); + CountDownLatch serverCancelled = new CountDownLatch(1); + TestGatewayService service = new TestGatewayService() { + @Override + public void streamAlarms( + StreamAlarmsRequest request, StreamObserver responseObserver) { + streamRequest.set(request); + ServerCallStreamObserver server = + (ServerCallStreamObserver) responseObserver; + server.setOnCancelHandler(serverCancelled::countDown); + // Active-alarm snapshot, snapshot-complete sentinel, then a + // transition — mirrors the shape of a real alarm feed open. + server.onNext(AlarmFeedMessage.newBuilder() + .setActiveAlarm(ActiveAlarmSnapshot.newBuilder() + .setAlarmFullReference("Tank01.Level.HiHi") + .setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE) + .setSeverity(700)) + .build()); + server.onNext(AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build()); + server.onNext(AlarmFeedMessage.newBuilder() + .setTransition(OnAlarmTransitionEvent.newBuilder() + .setAlarmFullReference("Tank01.Level.HiHi") + .setTransitionKind(AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE) + .setSeverity(700)) + .build()); + // Note: we deliberately do NOT call onCompleted() so the call + // remains open for the cancellation assertion below. + } + }; + + try (InProcessGateway gateway = InProcessGateway.start(service, new AtomicReference<>()); + MxGatewayClient client = gateway.client("", Duration.ofSeconds(5))) { + java.util.List received = new java.util.ArrayList<>(); + AtomicReference errorRef = new AtomicReference<>(); + CountDownLatch threeReceived = new CountDownLatch(3); + + StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder() + .setAlarmFilterPrefix("Tank01") + .build(); + + MxGatewayAlarmFeedSubscription subscription = client.streamAlarms( + request, + new StreamObserver<>() { + @Override + public void onNext(AlarmFeedMessage value) { + received.add(value); + threeReceived.countDown(); + } + + @Override + public void onError(Throwable t) { + errorRef.set(t); + } + + @Override + public void onCompleted() { + } + }); + + assertTrue(threeReceived.await(5, TimeUnit.SECONDS), + "expected three alarm feed messages within 5s"); + + // The request shape (filter prefix in particular) must reach the + // server — proves MxGatewayClient.streamAlarms calls the production + // subscription.wrap(observer) glue and not a CLI override. + assertNotNull(streamRequest.get()); + assertEquals("Tank01", streamRequest.get().getAlarmFilterPrefix()); + + // Order and payload-case must be preserved (the wrapping observer + // is just a pass-through). + assertEquals(3, received.size()); + assertEquals(AlarmFeedMessage.PayloadCase.ACTIVE_ALARM, received.get(0).getPayloadCase()); + assertEquals( + "Tank01.Level.HiHi", + received.get(0).getActiveAlarm().getAlarmFullReference()); + assertEquals(AlarmFeedMessage.PayloadCase.SNAPSHOT_COMPLETE, received.get(1).getPayloadCase()); + assertEquals(AlarmFeedMessage.PayloadCase.TRANSITION, received.get(2).getPayloadCase()); + assertEquals( + AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE, + received.get(2).getTransition().getTransitionKind()); + + // No error expected before cancellation — proves the wrapping + // observer forwarded only data, not a synthetic error. + assertNull(errorRef.get(), "no error expected before cancellation"); + + // Cancellation must propagate to the underlying gRPC call. + subscription.cancel(); + assertTrue(serverCancelled.await(5, TimeUnit.SECONDS), + "server should observe RPC cancellation after subscription.cancel()"); + } + } + @Test void commandFailureKeepsRawReply() throws Exception { TestGatewayService service = new TestGatewayService() { diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/MxGatewayStreamSubscriptionContractTests.java b/clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/MxGatewayStreamSubscriptionContractTests.java new file mode 100644 index 0000000..f7948e3 --- /dev/null +++ b/clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/MxGatewayStreamSubscriptionContractTests.java @@ -0,0 +1,275 @@ +package com.zb.mom.ww.mxgateway.client; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent; +import galaxy_repository.v1.GalaxyRepositoryOuterClass.WatchDeployEventsRequest; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; +import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage; +import mxaccess_gateway.v1.MxaccessGateway.MxEvent; +import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest; +import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest; +import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest; +import org.junit.jupiter.api.Test; + +/** + * Lifecycle / cancellation contract tests applied uniformly to each of the + * four subscription classes that extend {@link MxGatewayStreamSubscription}. + * + *

Locks in the Client.Java-036 refactor: every subclass must exhibit the + * same behaviour for (a) cancel-before-beforeStart eagerly cancelling the + * stream once it attaches, (b) cancel-after-beforeStart forwarding directly + * to the stream, (c) the cancel message matching the subclass's documented + * value, (d) {@code close()} delegating to {@code cancel()}, and (e) the + * wrapping observer forwarding {@code onNext}/{@code onError}/{@code onCompleted} + * to the caller's observer. + */ +final class MxGatewayStreamSubscriptionContractTests { + + @Test + void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_eventSubscription() { + runCancelBeforeBeforeStartTest(new MxGatewayEventSubscription(), "client cancelled event stream"); + } + + @Test + void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_alarmFeedSubscription() { + runCancelBeforeBeforeStartTest( + new MxGatewayAlarmFeedSubscription(), "client cancelled alarm feed"); + } + + @Test + void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_activeAlarmsSubscription() { + runCancelBeforeBeforeStartTest( + new MxGatewayActiveAlarmsSubscription(), "client cancelled active-alarms query"); + } + + @Test + void cancelBeforeBeforeStartCancelsStreamWhenItAttaches_deployEventSubscription() { + runCancelBeforeBeforeStartTest( + new DeployEventSubscription(), "client cancelled deploy event stream"); + } + + @Test + void cancelAfterBeforeStartForwardsToStream_eventSubscription() { + runCancelAfterBeforeStartTest(new MxGatewayEventSubscription(), "client cancelled event stream"); + } + + @Test + void cancelAfterBeforeStartForwardsToStream_alarmFeedSubscription() { + runCancelAfterBeforeStartTest( + new MxGatewayAlarmFeedSubscription(), "client cancelled alarm feed"); + } + + @Test + void cancelAfterBeforeStartForwardsToStream_activeAlarmsSubscription() { + runCancelAfterBeforeStartTest( + new MxGatewayActiveAlarmsSubscription(), "client cancelled active-alarms query"); + } + + @Test + void cancelAfterBeforeStartForwardsToStream_deployEventSubscription() { + runCancelAfterBeforeStartTest( + new DeployEventSubscription(), "client cancelled deploy event stream"); + } + + @Test + void closeDelegatesToCancel_eventSubscription() { + runCloseDelegatesToCancelTest(new MxGatewayEventSubscription()); + } + + @Test + void closeDelegatesToCancel_alarmFeedSubscription() { + runCloseDelegatesToCancelTest(new MxGatewayAlarmFeedSubscription()); + } + + @Test + void closeDelegatesToCancel_activeAlarmsSubscription() { + runCloseDelegatesToCancelTest(new MxGatewayActiveAlarmsSubscription()); + } + + @Test + void closeDelegatesToCancel_deployEventSubscription() { + runCloseDelegatesToCancelTest(new DeployEventSubscription()); + } + + @Test + void wrappedObserverForwardsOnNextOnErrorOnCompleted_eventSubscription() { + MxEvent event = MxEvent.newBuilder().setWorkerSequence(7L).build(); + runForwardingTest(new MxGatewayEventSubscription(), event); + } + + @Test + void wrappedObserverForwardsOnNextOnErrorOnCompleted_alarmFeedSubscription() { + AlarmFeedMessage msg = AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build(); + runForwardingTest(new MxGatewayAlarmFeedSubscription(), msg); + } + + @Test + void wrappedObserverForwardsOnNextOnErrorOnCompleted_activeAlarmsSubscription() { + ActiveAlarmSnapshot snap = ActiveAlarmSnapshot.newBuilder() + .setAlarmFullReference("ref") + .setSeverity(500) + .build(); + runForwardingTest(new MxGatewayActiveAlarmsSubscription(), snap); + } + + @Test + void wrappedObserverForwardsOnNextOnErrorOnCompleted_deployEventSubscription() { + DeployEvent ev = DeployEvent.newBuilder().setSequence(1L).build(); + runForwardingTest(new DeployEventSubscription(), ev); + } + + private static void runCancelBeforeBeforeStartTest( + MxGatewayStreamSubscription subscription, String expectedMessage) { + ClientResponseObserver wrapped = subscription.wrap(new NoopObserver<>()); + RecordingClientCallStreamObserver stream = new RecordingClientCallStreamObserver<>(); + + subscription.cancel(); + wrapped.beforeStart(stream); + + assertTrue(stream.cancelled, "stream should have been cancelled by beforeStart after prior cancel()"); + assertEquals(expectedMessage, stream.cancelMessage); + } + + private static void runCancelAfterBeforeStartTest( + MxGatewayStreamSubscription subscription, String expectedMessage) { + ClientResponseObserver wrapped = subscription.wrap(new NoopObserver<>()); + RecordingClientCallStreamObserver stream = new RecordingClientCallStreamObserver<>(); + + wrapped.beforeStart(stream); + assertFalse(stream.cancelled, "stream should not be cancelled before cancel() is called"); + subscription.cancel(); + + assertTrue(stream.cancelled, "stream should have been cancelled by direct cancel()"); + assertEquals(expectedMessage, stream.cancelMessage); + } + + private static void runCloseDelegatesToCancelTest( + MxGatewayStreamSubscription subscription) { + ClientResponseObserver wrapped = subscription.wrap(new NoopObserver<>()); + RecordingClientCallStreamObserver stream = new RecordingClientCallStreamObserver<>(); + + wrapped.beforeStart(stream); + subscription.close(); + + assertTrue(stream.cancelled, "close() should delegate to cancel()"); + } + + private static void runForwardingTest( + MxGatewayStreamSubscription subscription, Resp value) { + List received = new ArrayList<>(); + AtomicReference errorRef = new AtomicReference<>(); + AtomicReference completed = new AtomicReference<>(false); + + StreamObserver caller = new StreamObserver<>() { + @Override + public void onNext(Resp v) { + received.add(v); + } + + @Override + public void onError(Throwable t) { + errorRef.set(t); + } + + @Override + public void onCompleted() { + completed.set(true); + } + }; + + ClientResponseObserver wrapped = subscription.wrap(caller); + RecordingClientCallStreamObserver stream = new RecordingClientCallStreamObserver<>(); + wrapped.beforeStart(stream); + + wrapped.onNext(value); + IllegalStateException boom = new IllegalStateException("boom"); + wrapped.onError(boom); + wrapped.onCompleted(); + + assertEquals(1, received.size()); + assertEquals(value, received.get(0)); + assertNotNull(errorRef.get()); + assertEquals(boom, errorRef.get()); + assertTrue(completed.get()); + } + + private static final class NoopObserver implements StreamObserver { + @Override + public void onNext(T value) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + } + + private static final class RecordingClientCallStreamObserver extends ClientCallStreamObserver { + boolean cancelled; + String cancelMessage; + + @Override + public boolean isReady() { + return true; + } + + @Override + public void setOnReadyHandler(Runnable onReadyHandler) { + } + + @Override + public void disableAutoInboundFlowControl() { + } + + @Override + public void request(int count) { + } + + @Override + public void setMessageCompression(boolean enable) { + } + + @Override + public void cancel(String message, Throwable cause) { + cancelled = true; + cancelMessage = message; + } + + @Override + public void onNext(T value) { + } + + @Override + public void onError(Throwable error) { + } + + @Override + public void onCompleted() { + } + } + + // Compile-time guarantee that the parameter types still match the + // generic bounds — catches a regression where a subclass changes its + // request/response types out from under the shared base. + @SuppressWarnings("unused") + private static void typeBoundsCheck() { + MxGatewayStreamSubscription a = new MxGatewayEventSubscription(); + MxGatewayStreamSubscription b = new MxGatewayAlarmFeedSubscription(); + MxGatewayStreamSubscription c = + new MxGatewayActiveAlarmsSubscription(); + MxGatewayStreamSubscription d = new DeployEventSubscription(); + } +} diff --git a/code-reviews/Client.Java/findings.md b/code-reviews/Client.Java/findings.md index 8184304..f63bb8a 100644 --- a/code-reviews/Client.Java/findings.md +++ b/code-reviews/Client.Java/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-24 | | Commit reviewed | `42b0037` | | Status | Re-reviewed | -| Open findings | 5 | +| Open findings | 0 | ## Checklist coverage @@ -551,7 +551,7 @@ Client.Java-001..031 are unchanged. | Severity | High | | Category | Documentation & comments | | Location | `clients/java/README.md:182-183` | -| Status | Open | +| Status | Resolved | **Description:** Commit `8738735` ("clients: document StreamAlarms + AcknowledgeAlarm in each README") added two new gradle invocations to the CLI Usage block: @@ -569,6 +569,8 @@ A user copying either invocation from the README hits a picocli parse error imme **Recommendation:** Drop the `--session-id ` token from both documented invocations, and change `--alarm-reference` to `--reference` in the `acknowledge-alarm` line. Optionally also add `--filter-prefix` to the `stream-alarms` example so readers see the scoping option, and align README option names with the actual CLI by either renaming the CLI option `--reference` → `--alarm-reference` (matches the proto `alarm_full_reference` field semantically) or leaving as is and only fixing the README. Add a small `MxGatewayCliTests` parse-only assertion for both subcommands that exercises every option flag to prevent the same drift the next time the CLI surface or README is touched. +**Resolution:** 2026-05-24 — Confirmed root cause against `clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java:1174-1182,1248-1258`: `StreamAlarmsCommand` exposes only `--filter-prefix` / `--limit` and `AcknowledgeAlarmCommand` exposes `--reference` / `--comment` / `--operator` — neither has a `--session-id` option and `acknowledge-alarm` has no `--alarm-reference` option, so both documented invocations failed picocli parse at the first unknown option. Fixed `clients/java/README.md:182-183` by dropping the `--session-id ` token from both lines, replacing it with `--filter-prefix Galaxy` on the `stream-alarms` example so readers see the actual scoping flag, and changing `--alarm-reference` to `--reference` on the `acknowledge-alarm` example. Added `MxGatewayCli.commandLine(...)` to package-private visibility (was `private`) so the test can drive the production picocli `CommandLine` directly without executing the command body. Regression tests in `MxGatewayCliTests`: `readmeDocumentedStreamAlarmsExampleParsesCleanly` and `readmeDocumentedAcknowledgeAlarmExampleParsesCleanly` pin the exact token list documented in the README and assert `commandLine.parseArgs(...)` returns without throwing a `picocli.CommandLine.ParameterException`. TDD red phase: before the README fix the previously-documented tokens (`--session-id ` + `--alarm-reference ...`) would have thrown `Unknown option: '--session-id'` / `Unknown option: '--alarm-reference'` at parse time; the new tests pass against the corrected README and would fail the next time someone drifts the documented surface from the actual CLI options. + ### Client.Java-033 | Field | Value | @@ -576,7 +578,7 @@ A user copying either invocation from the README hits a picocli parse error imme | Severity | Medium | | Category | Correctness & logic bugs | | Location | `clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java:1078-1098` | -| Status | Open | +| Status | Resolved | **Description:** `StreamAlarmsCommand.call()` allocates a bounded `ArrayBlockingQueue(1024)` and the gRPC observer publishes each `AlarmFeedMessage` via `queue.offer(value)`: @@ -594,6 +596,8 @@ The library-side `MxEventStream` (Client.Java-002 resolution) and `DeployEventSt **Recommendation:** Either (a) wrap the gRPC observer in the existing `MxEventStream`-style adaptor that calls `subscription.cancel()` and queues an exception on `queue.offer` returning `false`, then surface that exception from the drain loop — mirroring `MxEventStream.observer().onNext`'s overflow branch; or (b) reuse the library-side fail-fast plumbing by promoting `MxEventStream` (or extracting its terminal-state base) into a public `MxAlarmFeedStream` and have `MxGatewayClient.streamAlarms` return that instead of a bare subscription handle. Option (b) lines up with Client.Java-036 (deduplicate the subscription class family). Add a CLI regression test that overflows the bounded queue and asserts a non-zero exit / overflow exception, mirroring `MxGatewayMediumFindingsTests.eventStreamOverflowExceptionSurvivesASubsequentClose`. +**Resolution:** 2026-05-24 — Confirmed root cause at `MxGatewayCli.java` `StreamAlarmsCommand.call()`: the observer's `onNext` did `queue.offer(value)` and ignored the boolean return, so a 1024-element queue would silently drop messages past capacity. The same silent-drop affected the `onCompleted` branch (which `offer`s `ALARM_FEED_END`) once the queue was full, deadlocking the consumer since the drain loop never sees END. Took option (a) — minimal change that matches `MxEventStream`'s overflow branch. The fix: detect a failed `offer` inside `onNext`, call `subscription.cancel()` (via an `AtomicReference` published immediately after `client.streamAlarms` returns), `queue.clear()`, then `queue.offer(IllegalStateException("stream-alarms queue overflowed (capacity 1024); consumer too slow"))` followed by `queue.offer(ALARM_FEED_END)`. The existing drain-loop `Throwable`-branch then surfaces the overflow as a thrown `IllegalStateException` from `call()`, which picocli reports as a non-zero CLI exit. Option (b) (promoting `MxEventStream` to a public alarm-feed stream) was considered and rejected for this change — it would change the public SDK surface; Client.Java-036's refactor handles deduplication at the subscription layer instead. Regression test: `MxGatewayCliTests.streamAlarmsCommandFailsFastOnQueueOverflow` — drives an `OverflowingFakeClient` whose `streamAlarms` synchronously pushes 2000 messages to the observer (exceeding the 1024 buffer), then asserts `run.exitCode() != 0`. TDD red phase confirmed deterministically: before the fix the test deadlocked (the buggy `offer` silently dropped both the overflowing alarms AND the `ALARM_FEED_END` sentinel that arrived after the queue filled, so the drain loop's `queue.take()` blocked forever); the background gradle run had to be killed with `TaskStop`. After the fix the same test exits in <1 second with the overflow exception propagating through picocli. + ### Client.Java-034 | Field | Value | @@ -601,7 +605,7 @@ The library-side `MxEventStream` (Client.Java-002 resolution) and `DeployEventSt | Severity | Medium | | Category | Correctness & logic bugs | | Location | `clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java:182-198` | -| Status | Open | +| Status | Resolved | **Description:** `BatchCommand.call()` reads one CLI invocation per stdin line and tokenises with: @@ -622,6 +626,8 @@ The current `MxGatewayCliTests` test set (`batchCommandExecutesVersionAndEmitsEo **Recommendation:** Replace `line.trim().split("\\s+")` with a real shell-style tokeniser that honours single and double quotes and backslash escapes — `picocli.CommandLine.ArgumentParser` doesn't ship one, but Apache Commons Exec's `CommandLine.translateCommandline(String)`, JDK 21's `java.util.spi.ToolProvider` argument parsing, or a small hand-written state machine all work. Cross-check the .NET / Go / Rust / Python `batch` implementations in the same change so all five clients use the same tokenisation; document the contract in the protocol comment in `MxGatewayCli.java` and in `scripts/run-client-e2e-tests.ps1`. Add a CLI test that feeds `acknowledge-alarm --comment "with spaces"` through `batch` and asserts the `--comment` value reaches the gateway as `"with spaces"`. +**Resolution:** 2026-05-24 — Confirmed root cause: `BatchCommand.call()` at the per-line loop used `line.trim().split("\\s+")` which has no quote handling. Replaced with a new package-private `MxGatewayCli.tokenizeBatchLine(String)` static helper — a hand-rolled POSIX-style shell tokenizer (no new dependency added) that honours: (a) double-quoted runs `"..."` with `\\`, `\"`, and `\n` escapes inside; (b) single-quoted runs `'...'` taken literally with no escapes (POSIX rule); (c) backslash escapes for any single character outside quotes (so `needs\ verification` is one token); (d) whitespace runs outside quotes separate tokens; (e) explicit `IllegalArgumentException` on unterminated quote or trailing backslash so the batch loop surfaces it as a JSON error instead of emitting wrong args. The `BatchCommand` per-line tokenisation now calls `tokenizeBatchLine(line)` and treats an empty-array result as a blank line (skip). Behaviour for whitespace-only input is unchanged. The cross-client `batch` audit (.NET / Go / Rust / Python) is out of scope for this Java-focused finding and tracked separately. Regression tests in `MxGatewayCliTests`: (a) `batchCommandTokenisesDoubleQuotedArgumentWithEmbeddedSpaces` — `--comment "needs verification"` round-trips intact; (b) `batchCommandTokenisesSingleQuotedArgumentWithEmbeddedSpaces` — single-quoted variant; (c) `batchCommandTokenisesBackslashEscapedSpaceOutsideQuotes` — `needs\ verification` outside quotes; (d) `batchCommandPreservesEmptyQuotedArgument` — `""` parses to an empty-string argument; (e) `batchCommandSupportsBackslashEscapedQuoteInsideDoubleQuotes` — `\"inner\"` survives the inner quotes. TDD red phase confirmed: all five tests failed against the original `split("\\s+")` implementation; after the fix all five pass. + ### Client.Java-035 | Field | Value | @@ -629,7 +635,7 @@ The current `MxGatewayCliTests` test set (`batchCommandExecutesVersionAndEmitsEo | Severity | Low | | Category | Testing coverage | | Location | `clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/MxGatewayClientSessionTests.java` | -| Status | Open | +| Status | Resolved | **Description:** Commit `8a0c59d` added `MxGatewayClient.streamAlarms(StreamAlarmsRequest, StreamObserver)` and a new public `MxGatewayAlarmFeedSubscription` class. No library-side test exercises either: a grep for `streamAlarms` across `zb-mom-ww-mxgateway-client/src/test/...` returns zero matches. The CLI tests (`MxGatewayCliTests.streamAlarmsCommand*`) exercise the path end-to-end, but they route through a `FakeClient.streamAlarms` override that bypasses the production `subscription.wrap(observer)` glue and the `withStreamDeadline(rawAsyncStub()).streamAlarms(...)` call. A regression to either — forgetting `.wrap(observer)`, dropping the deadline interceptor, misnaming the request — would compile and pass the CLI tests but break against a real gateway. @@ -637,6 +643,8 @@ This is the same coverage gap pattern as Client.Java-030 (no fixture test for `Q **Recommendation:** Add `streamAlarmsForwardsRequestAndStreamsAlarmFeedMessages` to `MxGatewayClientSessionTests` (in-process gRPC via the existing `InProcessGateway` + `TestGatewayService` fixture): override `TestGatewayService.streamAlarms` to capture the inbound `StreamAlarmsRequest` and emit one `active_alarm` snapshot, one `snapshot_complete`, and one `transition`, then complete. Call `MxGatewayClient.streamAlarms`, drain the observer via a `CountDownLatch`, and assert (a) the server observed the `alarm_filter_prefix`, (b) all three messages arrived in order with the expected payload-case, and (c) `MxGatewayAlarmFeedSubscription.cancel()` aborts the call (latch via `ServerCallStreamObserver.setOnCancelHandler`, mirroring the Client.Java-015 cancellation regression). Optionally also cover the cancel-before-beforeStart race that `MxGatewayAlarmFeedSubscription.wrap` handles, mirroring `mxEventStreamCloseBeforeBeforeStartCancelsStream`. +**Resolution:** 2026-05-24 — Confirmed the coverage gap: a grep across `zb-mom-ww-mxgateway-client/src/test/...` for `streamAlarms` returned zero matches; the CLI-only test routed through `FakeClient.streamAlarms` which bypassed both the production `subscription.wrap(observer)` and the `withStreamDeadline(rawAsyncStub()).streamAlarms(...)` gRPC call. Added `streamAlarmsForwardsRequestAndStreamsAlarmFeedMessages` to `MxGatewayClientSessionTests` in the same shape as `queryActiveAlarmsForwardsRequestAndStreamsSnapshots` (Client.Java-030 resolved this way). The test overrides `TestGatewayService.streamAlarms` to capture the inbound `StreamAlarmsRequest`, register a `serverCancelled` latch via `(ServerCallStreamObserver) responseObserver).setOnCancelHandler(...)`, then emit three messages: an `active_alarm` snapshot, a `snapshot_complete` sentinel, and a `transition`. It deliberately does NOT call `onCompleted()` so the call remains open for the cancellation assertion. The test then calls `MxGatewayClient.streamAlarms` against the in-process gateway, drains the wrapped observer via a `threeReceived` `CountDownLatch`, and asserts (a) the server observed `alarm_filter_prefix=Tank01`, (b) all three messages arrived in order with the expected payload-case (`ACTIVE_ALARM`, `SNAPSHOT_COMPLETE`, `TRANSITION`) and payload values (`Tank01.Level.HiHi`, transition kind `ACKNOWLEDGE`), and (c) `subscription.cancel()` causes the server's on-cancel handler to fire within 5 s (proves cancellation propagates through the production `subscription.wrap(observer)` glue, not just the CLI fake). TDD red phase: temporarily replaced the production `MxGatewayClient.streamAlarms` body with `withStreamDeadline(rawAsyncStub()).streamAlarms(request, observer);` (dropping the `subscription.wrap(observer)` indirection); the test failed at the `serverCancelled.await` assertion because cancellation was no longer wired to the underlying gRPC call. Restoring the production glue turned the build green. + ### Client.Java-036 | Field | Value | @@ -644,7 +652,7 @@ This is the same coverage gap pattern as Client.Java-030 (no fixture test for `Q | Severity | Low | | Category | Code organization & conventions | | Location | `clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayAlarmFeedSubscription.java`, `MxGatewayEventSubscription.java`, `MxGatewayActiveAlarmsSubscription.java`, `DeployEventSubscription.java` | -| Status | Open | +| Status | Resolved | **Description:** `MxGatewayAlarmFeedSubscription` is a structural near-copy of `MxGatewayEventSubscription` — same `AtomicReference>` + `AtomicBoolean cancelled` field shape, the same `wrap(observer)` returning a `ClientResponseObserver` that stores `requestStream` in `beforeStart`, the same close-before-beforeStart race handling that Client.Java-014 originally fixed for `MxEventStream`, and the same `cancel()`+`close()` idempotency contract. The four subscription classes (`MxGatewayEventSubscription`, `MxGatewayActiveAlarmsSubscription`, `MxGatewayAlarmFeedSubscription`, `DeployEventSubscription`) are now ~60-line near-clones differing only in the request/response generic parameters and the `cancel` message string. @@ -652,4 +660,6 @@ This is the same maintenance-hazard pattern Client.Java-009 / Client.Java-016 id **Recommendation:** Extract a package-private abstract base, e.g. `MxGatewayStreamSubscription`, holding the `AtomicReference` / `AtomicBoolean` pair, the `cancel()` / `close()` implementation, and a `ClientResponseObserver` factory parameterised by the cancel-message string and the response observer. Have all four subscription classes extend it. Behaviour-only refactor — no public API change, existing tests cover the contract. +**Resolution:** 2026-05-24 — Extracted a package-private abstract base `MxGatewayStreamSubscription implements AutoCloseable` (new file `clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewayStreamSubscription.java`). It holds the shared `AtomicReference>` and `AtomicBoolean cancelled` pair, the `wrap(StreamObserver)` factory that returns a `ClientResponseObserver` with the Client.Java-014 close-before-beforeStart fix baked in, the `cancel()` / `close()` implementation, and an immutable `cancelMessage` injected by the subclass constructor. The four prior 60-line near-clones (`MxGatewayEventSubscription`, `MxGatewayAlarmFeedSubscription`, `MxGatewayActiveAlarmsSubscription`, `DeployEventSubscription`) collapse to ~10-line subclasses that only declare their `` type parameters and supply the cancel-message string to `super(...)`. Public API surface is preserved: each subclass remains a `public final class` with a public no-arg constructor (the constructor was implicit on the original classes; I made it explicit `public` on the subclasses so the existing CLI `FakeClient.streamAlarms` in a different package can still `new MxGatewayAlarmFeedSubscription()`). The `wrap(...)` method is `final` and package-private on the base — same accessibility the four subclasses had before — so production callers in `MxGatewayClient`/`GalaxyRepositoryClient` see no change. New test file `MxGatewayStreamSubscriptionContractTests` exercises the lifecycle/cancellation contract identically across all four subclasses (16 tests, four per scenario): (a) cancel-before-beforeStart eagerly cancels the stream once it attaches with the subclass-specific message, (b) cancel-after-beforeStart forwards directly to the stream, (c) `close()` delegates to `cancel()`, (d) the wrapped observer forwards `onNext`/`onError`/`onCompleted` verbatim, and a compile-time `typeBoundsCheck` helper that asserts each subclass still binds its `` parameters to the right proto types. TDD red phase confirmed: temporarily breaking one subclass's `super(...)` message to `"BROKEN MESSAGE"` made the contract test for that subclass fail with `expected: but was: `; restoring the correct value turned all 16 contract tests green. Future fixes to the shared lifecycle now live in one place — the next Client.Java-014/021-style race fix cannot drift across the four classes. +