fix: resolve Client.Java + Worker.Tests findings (pending windev verification)

Client.Java-040..048, Worker.Tests-034/035/036. Edits applied on the Mac,
which has no JRE and cannot build the x86+MXAccess worker tests; findings are
marked In Progress pending gradle + x86 build verification on windev. Do not
mark Resolved until verified there.
This commit is contained in:
Joseph Doherty
2026-06-17 05:23:14 -04:00
parent 6b5fe6aa82
commit 8df0479b99
9 changed files with 252 additions and 116 deletions
@@ -39,7 +39,10 @@ import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
@@ -105,8 +108,14 @@ public final class MxGatewayCli implements Callable<Integer> {
}
/**
* Test-friendly entry point that runs the CLI against the supplied
* {@link PrintWriter} pair instead of the system streams.
* Entry point that runs the CLI against the supplied {@link PrintWriter}
* pair instead of the system streams. This overload wires the production
* {@link GrpcMxGatewayCliClientFactory} (a real gRPC channel), so it is
* suitable for embedding the CLI but not for unit tests that need to stub
* the gateway. Tests should use the package-private
* {@link #execute(MxGatewayCliClientFactory, PrintWriter, PrintWriter, String...)}
* / {@link #commandLine(MxGatewayCliClientFactory)} overloads, which accept
* an injectable client factory.
*
* @param out writer that receives standard output
* @param err writer that receives standard error
@@ -1536,50 +1545,74 @@ public final class MxGatewayCli implements Callable<Integer> {
StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder()
.setAlarmFilterPrefix(filterPrefix)
.build();
// Client.Java-033 — fail-fast on overflow. A bare
// queue.offer(value) silently drops messages past capacity,
// which violates the JavaStyleGuide "do not drop events"
// contract and lets the CLI exit 0 on a truncated feed.
// Mirrors MxEventStream's overflow branch: detect a failed
// offer, cancel the subscription, drain the buffer, then
// queue an explicit overflow exception followed by the END
// sentinel so the drain loop surfaces a non-zero exit.
// Client.Java-033/040/042 — fail-fast on overflow and on
// transport errors. A bare queue.offer(value) silently drops
// messages past capacity (violating the JavaStyleGuide "do not
// drop events" contract and letting the CLI exit 0 on a
// truncated feed), and a bare queue.offer(error) on a full
// queue would drop the terminal item and deadlock the drain on
// queue.take().
//
// Terminal transitions (overflow, transport error, clean
// completion) are now serialised through a single AtomicBoolean
// guard plus a dedicated `terminal` slot rather than
// re-clearing the shared queue. The first terminal condition
// wins; a concurrent onNext on the gRPC I/O thread can no
// longer displace it (Client.Java-040). The drain reads the
// terminal slot independently of the bounded queue, so a full
// queue can never strand the terminal item (Client.Java-042).
AtomicReference<MxGatewayAlarmFeedSubscription> subscriptionRef = new AtomicReference<>();
AtomicBoolean terminated = new AtomicBoolean();
AtomicReference<Object> terminal = new AtomicReference<>();
Consumer<Object> terminate = item -> {
if (terminated.compareAndSet(false, true)) {
terminal.set(item);
MxGatewayAlarmFeedSubscription sub = subscriptionRef.get();
if (sub != null) {
sub.cancel();
}
}
};
MxGatewayAlarmFeedSubscription subscription =
client.streamAlarms(request, new StreamObserver<>() {
@Override
public void onNext(AlarmFeedMessage value) {
if (terminated.get()) {
return;
}
if (!queue.offer(value)) {
MxGatewayAlarmFeedSubscription sub = subscriptionRef.get();
if (sub != null) {
sub.cancel();
}
queue.clear();
queue.offer(new IllegalStateException(
terminate.accept(new IllegalStateException(
"stream-alarms queue overflowed (capacity 1024); consumer too slow"));
queue.offer(ALARM_FEED_END);
}
}
@Override
public void onError(Throwable error) {
queue.offer(error);
terminate.accept(error);
}
@Override
public void onCompleted() {
queue.offer(ALARM_FEED_END);
terminate.accept(ALARM_FEED_END);
}
});
subscriptionRef.set(subscription);
try {
int count = 0;
while (true) {
Object item = queue.take();
if (item == ALARM_FEED_END) {
break;
}
if (item instanceof Throwable error) {
// Poll with a short timeout so the dedicated terminal
// slot is observed even when the bounded queue is full
// of normal messages the consumer has not yet drained.
Object item = queue.poll(50, TimeUnit.MILLISECONDS);
if (item == null) {
Object end = terminal.get();
if (end == null) {
continue;
}
if (end == ALARM_FEED_END) {
break;
}
Throwable error = (Throwable) end;
throw new IllegalStateException(
"gateway stream alarms failed: " + error.getMessage(), error);
}
@@ -2184,13 +2217,36 @@ public final class MxGatewayCli implements Callable<Integer> {
return jsonString(value.toString());
}
private static String jsonString(String value) {
return '"'
+ value.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\r", "\\r")
.replace("\n", "\\n")
+ '"';
// Package-private for the Client.Java-041 escaping regression test.
static String jsonString(String value) {
// RFC 8259 requires the two-character escapes for the named control
// characters and \u00XX escapes for the remaining U+0000U+001F (and
// U+007F) range. The old implementation escaped only \\ \" \r \n, so a
// value containing a tab, backspace, form-feed, or any other control
// character produced malformed JSON (Client.Java-041).
StringBuilder builder = new StringBuilder(value.length() + 2);
builder.append('"');
for (int i = 0; i < value.length(); i++) {
char c = value.charAt(i);
switch (c) {
case '\\' -> builder.append("\\\\");
case '"' -> builder.append("\\\"");
case '\r' -> builder.append("\\r");
case '\n' -> builder.append("\\n");
case '\t' -> builder.append("\\t");
case '\b' -> builder.append("\\b");
case '\f' -> builder.append("\\f");
default -> {
if (c < 0x20 || c == 0x7f) {
builder.append(String.format("\\u%04x", (int) c));
} else {
builder.append(c);
}
}
}
}
builder.append('"');
return builder.toString();
}
private record RawJson(String value) {
@@ -46,6 +46,22 @@ import mxaccess_gateway.v1.MxaccessGateway.SessionState;
* instance uses a unique server name so harnesses do not collide. The
* {@code directExecutor()} wiring keeps all dispatch on the calling thread, so
* no background threads are leaked.
*
* <p><strong>Implemented RPCs.</strong> The scripted services override only the
* RPCs the CLI tests currently exercise:
*
* <ul>
* <li>{@code MxAccessGateway}: {@code streamEvents}, {@code closeSession}.</li>
* <li>{@code GalaxyRepository}: {@code discoverHierarchy},
* {@code watchDeployEvents}.</li>
* </ul>
*
* Every other RPC (e.g. {@code openSession}, {@code invoke}, {@code register},
* {@code streamAlarms}, {@code queryActiveAlarms}, {@code browseChildren}) is
* left at the generated {@code *ImplBase} default and therefore returns gRPC
* {@code UNIMPLEMENTED} by design. A future test that needs one of those paths
* must add the corresponding scripted override here first — otherwise the call
* fails with {@code UNIMPLEMENTED} rather than the behaviour under test.
*/
final class InProcessGatewayHarness implements AutoCloseable {
private final String serverName;
@@ -56,17 +56,37 @@ final class MxGatewayCliTests {
assertEquals(0, run.exitCode());
assertEquals("", run.errors());
assertTrue(run.output().contains("mxgateway-java 0.1.0"));
assertTrue(run.output().contains("mxgateway-java 0.1.1"));
assertTrue(run.output().contains("gatewayProtocolVersion=3"));
assertTrue(run.output().contains("workerProtocolVersion=1"));
}
@Test
void jsonStringEscapesControlCharacters() {
// Client.Java-041 — the hand-rolled jsonString escaped only \\ \" \r \n,
// so a tab/backspace/form-feed or any other control char produced
// malformed JSON (RFC 8259). After the fix the named control chars use
// their two-character escapes and the rest use \u00XX.
assertEquals("\"a\\tb\"", MxGatewayCli.jsonString("a\tb"));
assertEquals("\"a\\bb\"", MxGatewayCli.jsonString("a\bb"));
assertEquals("\"a\\fb\"", MxGatewayCli.jsonString("a\fb"));
assertEquals("\"a\\rb\"", MxGatewayCli.jsonString("a\rb"));
assertEquals("\"a\\nb\"", MxGatewayCli.jsonString("a\nb"));
// A non-named control character (U+0001) must become .
assertEquals("\"a\\u0001b\"", MxGatewayCli.jsonString("ab"));
// DEL (U+007F) is also escaped.
assertEquals("\"a\\u007fb\"", MxGatewayCli.jsonString("ab"));
// Quote and backslash still escape; ordinary printable text is verbatim.
assertEquals("\"a\\\"\\\\b\"", MxGatewayCli.jsonString("a\"\\b"));
assertEquals("\"plain\"", MxGatewayCli.jsonString("plain"));
}
@Test
void versionCommandPrintsJson() {
CliRun run = execute(new FakeClientFactory(), "version", "--json");
assertEquals(0, run.exitCode());
assertTrue(run.output().contains("\"clientVersion\":\"0.1.0\""));
assertTrue(run.output().contains("\"clientVersion\":\"0.1.1\""));
assertTrue(run.output().contains("\"gatewayProtocolVersion\":3"));
}
@@ -241,27 +261,20 @@ final class MxGatewayCliTests {
void galaxyBrowseParentZeroEmitsWarningToStderr() {
// --parent 0 is the server sentinel for roots; passing it explicitly is
// almost certainly a mistake. The CLI must print a warning to stderr
// (matching Go/Rust client behaviour) but must still attempt the call
// (exit behaviour depends on gateway reachability, not tested here;
// we only assert the warning path is triggered by checking the error
// writer before any gRPC connection is attempted).
// (matching Go/Rust client behaviour) but must still attempt the call.
//
// GalaxyBrowseCommand connects to a real GalaxyRepositoryClient, so the
// call() body will throw after printing the warning when no gateway is
// reachable. We only assert the warning appears on stderr.
StringWriter output = new StringWriter();
StringWriter errors = new StringWriter();
// Non-zero exit is expected (no live gateway), but the warning must
// appear on stderr regardless of what happens next.
MxGatewayCli.execute(
new FakeClientFactory(),
new PrintWriter(output, true),
new PrintWriter(errors, true),
// GalaxyBrowseCommand prints the warning, then calls connect() on the
// GalaxyClientFactory. We inject a stub factory whose connect() throws,
// so only the warning path runs — no live Netty channel to localhost is
// constructed (Client.Java-043). The warning is emitted before
// connect() is reached, so it appears on stderr regardless.
CliRun run = executeGalaxy(
new ThrowingGalaxyClientFactory(),
"galaxy-browse", "--parent", "0", "--depth", "1");
assertTrue(
errors.toString().contains("--parent 0"),
"expected '--parent 0' warning on stderr; got: " + errors);
run.errors().contains("--parent 0"),
"expected '--parent 0' warning on stderr; got: " + run.errors());
}
// ---- galaxy command-name aliases (D9-java) ----
@@ -678,21 +691,28 @@ final class MxGatewayCliTests {
@Test
void streamAlarmsCommandFailsFastOnQueueOverflow() {
// Client.Java-033 regression — the CLI's stream-alarms bounded queue
// used queue.offer(value) which silently dropped messages past
// capacity (1024). After the fix the CLI must surface the overflow
// as a non-zero exit (mirroring MxEventStream's fail-fast contract).
// Client.Java-033/040/046 regression — the CLI's stream-alarms bounded
// queue used queue.offer(value) which silently dropped messages past
// capacity (1024). After the fix the CLI must surface the overflow as a
// non-zero exit (mirroring MxEventStream's fail-fast contract).
//
// The OverflowingFakeClient floods the gRPC observer with 2000
// messages synchronously, which exceeds the bounded 1024-element
// queue. The fix detects the failed offer, cancels the subscription,
// queues an overflow exception, and the drain loop surfaces it.
// The OverflowingFakeClient floods the gRPC observer on a BACKGROUND
// thread so the subscription is already published when the overflow
// fires — exercising the terminate() cancel path with a non-null
// subscription (Client.Java-046), not just the synchronous-flood path
// where subscriptionRef is still null. The fix records the overflow in
// a dedicated terminal slot (no queue.clear, Client.Java-040) and the
// drain loop surfaces it with the overflow message text.
OverflowingFakeClientFactory factory = new OverflowingFakeClientFactory();
CliRun run = execute(factory, "stream-alarms", "--filter-prefix", "Flood");
assertFalse(run.exitCode() == 0,
"expected non-zero exit when the alarm queue overflows; got exit=" + run.exitCode()
+ " out=\n" + run.output() + "\nerr=\n" + run.errors());
assertTrue(
run.errors().contains("queue overflowed") || run.output().contains("queue overflowed"),
"expected the overflow message text to surface; out=\n" + run.output()
+ "\nerr=\n" + run.errors());
}
@Test
@@ -1050,6 +1070,18 @@ final class MxGatewayCliTests {
}
}
/**
* Galaxy client factory whose {@code connect} throws, so a test can exercise
* a command's pre-connect path (e.g. the {@code --parent 0} warning) without
* constructing a live Netty channel to localhost (Client.Java-043).
*/
private static final class ThrowingGalaxyClientFactory implements MxGatewayCli.GalaxyClientFactory {
@Override
public com.zb.mom.ww.mxgateway.client.GalaxyRepositoryClient connect(MxGatewayClientOptions options) {
throw new IllegalStateException("galaxy connect not available in this test");
}
}
private static final class OverflowingFakeClient implements MxGatewayCli.MxGatewayCliClient {
private final PrintWriter out;
@@ -1089,19 +1121,31 @@ final class MxGatewayCliTests {
@Override
public MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
// Synchronously push 2000 messages to overflow the CLI's bounded
// 1024-element queue. The CLI must surface the overflow rather
// than silently dropping the trailing ~976 messages.
for (int i = 0; i < 2000; i++) {
observer.onNext(AlarmFeedMessage.newBuilder()
.setActiveAlarm(ActiveAlarmSnapshot.newBuilder()
.setAlarmFullReference("Flood." + i)
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE)
.setSeverity(700))
.build());
}
observer.onCompleted();
return new MxGatewayAlarmFeedSubscription();
// Push messages on a BACKGROUND thread (mirroring real gRPC, which
// delivers onNext on a netty I/O thread) so the CLI's
// subscriptionRef is already published when the overflow fires —
// this exercises the terminate() cancel path with a non-null
// subscription (Client.Java-046), unlike a synchronous flood that
// overflows before streamAlarms even returns. Keeps pushing until
// it observes the CLI cancelling the subscription on overflow, so
// no fixed message count is needed and the thread always exits.
MxGatewayAlarmFeedSubscription subscription = new MxGatewayAlarmFeedSubscription();
Thread flood = new Thread(() -> {
int i = 0;
while (!Thread.currentThread().isInterrupted() && i < 100_000) {
observer.onNext(AlarmFeedMessage.newBuilder()
.setActiveAlarm(ActiveAlarmSnapshot.newBuilder()
.setAlarmFullReference("Flood." + i)
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE)
.setSeverity(700))
.build());
i++;
}
observer.onCompleted();
}, "overflowing-fake-alarm-feed");
flood.setDaemon(true);
flood.start();
return subscription;
}
@Override
@@ -9,7 +9,7 @@ package com.zb.mom.ww.mxgateway.client;
public final class MxGatewayClientVersion {
private static final int GATEWAY_PROTOCOL_VERSION = 3;
private static final int WORKER_PROTOCOL_VERSION = 1;
private static final String CLIENT_VERSION = "0.1.0";
private static final String CLIENT_VERSION = "0.1.1";
private MxGatewayClientVersion() {
}