From 6eb9ea9105036766db626362b7284527732f032d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 18 May 2026 22:42:51 -0400 Subject: [PATCH] Resolve Client.Java-006..012 code-review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Client.Java-006: close() on both clients only called shutdown(). It now awaits termination up to the connect timeout and shutdownNow()s on timeout. Client.Java-007: added MxGatewayLowFindingsTests covering the alarm surface, async streaming, MxEventStream overflow, and TLS channel construction. A latent bug surfaced: a missing CA file throws IllegalArgumentException, not SSLException — the channel-builder catch was broadened accordingly. Client.Java-008: async thenApply sites now route stray RuntimeExceptions through MxGatewayErrors.fromGrpc via a normalising validator. Client.Java-009: extracted ~80 duplicated lines (createChannel, withDeadline, toCompletable, ...) into a shared MxGatewayChannels; both clients delegate. Client.Java-010 (re-triaged): the README's metadata:read scope was correct; the acknowledgeAlarm Javadoc's invoke:alarm-ack was wrong — corrected to the admin scope. Client.Java-011: documented the intentional fail-fast event-stream backpressure in Javadoc and the README. Client.Java-012: replaced CommonOptions.resolved()'s mutate-and-return-this with side-effect-free resolvedApiKey()/resolvedTimeout() accessors. Co-Authored-By: Claude Opus 4.7 (1M context) --- clients/java/README.md | 17 +- .../mxgateway/cli/MxGatewayCli.java | 49 +- .../client/GalaxyRepositoryClient.java | 147 ++--- .../mxgateway/client/MxEventStream.java | 12 + .../mxgateway/client/MxGatewayChannels.java | 164 ++++++ .../mxgateway/client/MxGatewayClient.java | 141 ++--- .../client/MxGatewayLowFindingsTests.java | 503 ++++++++++++++++++ code-reviews/Client.Java/findings.md | 30 +- 8 files changed, 846 insertions(+), 217 deletions(-) create mode 100644 clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayChannels.java create mode 100644 clients/java/mxgateway-client/src/test/java/com/dohertylan/mxgateway/client/MxGatewayLowFindingsTests.java diff --git a/clients/java/README.md b/clients/java/README.md index ad03020..cc441a0 100644 --- a/clients/java/README.md +++ b/clients/java/README.md @@ -74,10 +74,25 @@ that RPC so a close-time error never replaces the exception a try-with-resources body is already propagating. Call `closeRaw()` explicitly when you need to observe the close result or handle a close-time failure. +`MxGatewayClient` and `GalaxyRepositoryClient` implement `AutoCloseable`. For a +client that owns its channel (built with `connect`), the try-with-resources +`close()` shuts the channel down and waits up to the configured connect timeout +for termination, forcibly shutting it down on timeout, so in-flight calls and +Netty event-loop threads are not left running after the block exits. If the +calling thread is interrupted while waiting, the channel is forcibly shut down +and the interrupt flag is restored. `closeAndAwaitTermination()` does the same +but throws `InterruptedException` for callers that want a checked, +blocking-aware shutdown. `close()` is a no-op for a caller-managed channel. + `MxEventStream` implements `Iterator` and `AutoCloseable`. Closing it cancels the underlying gRPC stream. Canceling or timing out a Java client call only stops the client from waiting; it does not abort an in-flight MXAccess COM -call on the worker STA. +call on the worker STA. The event stream uses gRPC's default auto-inbound flow +control with a fixed 16-element buffer and no client-side flow control: this is +the gateway's documented fail-fast event-backpressure model, so a consumer that +stalls long enough to fill the buffer triggers an overflow that cancels the +subscription and surfaces an `MxGatewayException` from the next `next()` call. +Drain events promptly and be prepared to resubscribe with a resume cursor. ## Galaxy Repository Browse diff --git a/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java b/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java index f9da2a4..778a656 100644 --- a/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java +++ b/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java @@ -661,33 +661,60 @@ public final class MxGatewayCli implements Callable { @Option(names = "--timeout", defaultValue = "30s", description = "Per-call timeout.") String timeout; - private String resolvedApiKey = ""; - private Duration resolvedTimeout = Duration.ofSeconds(30); - + /** + * Returns this options object unchanged. + * + *

Retained as a no-op for call sites that read more naturally as + * {@code common.resolved()}. Resolution of the API key and timeout is + * computed lazily on demand by {@link #resolvedApiKey()} and + * {@link #resolvedTimeout()}, so {@link #toClientOptions()} and + * {@link #redactedJsonMap()} produce correct output regardless of + * whether this method was ever called. + * + * @return this options object + */ CommonOptions resolved() { - resolvedApiKey = apiKey == null || apiKey.isBlank() ? System.getenv(apiKeyEnv) : apiKey; - if (resolvedApiKey == null) { - resolvedApiKey = ""; - } - resolvedTimeout = parseDuration(timeout); return this; } + /** + * Resolves the effective API key: the explicit {@code --api-key} value + * when non-blank, otherwise the value of the {@code --api-key-env} + * environment variable, otherwise an empty string. Computed on each + * call so there is no stale cached state. + * + * @return the resolved API key, never {@code null} + */ + String resolvedApiKey() { + String resolved = apiKey == null || apiKey.isBlank() ? System.getenv(apiKeyEnv) : apiKey; + return resolved == null ? "" : resolved; + } + + /** + * Resolves the effective per-call timeout from the {@code --timeout} + * option. Computed on each call so there is no stale cached state. + * + * @return the resolved call timeout + */ + Duration resolvedTimeout() { + return parseDuration(timeout); + } + MxGatewayClientOptions toClientOptions() { return MxGatewayClientOptions.builder() .endpoint(endpoint) - .apiKey(resolvedApiKey) + .apiKey(resolvedApiKey()) .plaintext(plaintext) .caCertificatePath(caFile) .serverNameOverride(serverNameOverride) - .callTimeout(resolvedTimeout) + .callTimeout(resolvedTimeout()) .build(); } Map redactedJsonMap() { Map values = new LinkedHashMap<>(); values.put("endpoint", endpoint); - values.put("apiKey", MxGatewaySecrets.redactApiKey(resolvedApiKey)); + values.put("apiKey", MxGatewaySecrets.redactApiKey(resolvedApiKey())); values.put("apiKeyEnv", apiKeyEnv); values.put("plaintext", plaintext); values.put("caFile", caFile == null ? "" : caFile.toString()); diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/GalaxyRepositoryClient.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/GalaxyRepositoryClient.java index 9200dda..518f831 100644 --- a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/GalaxyRepositoryClient.java +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/GalaxyRepositoryClient.java @@ -1,8 +1,5 @@ package com.dohertylan.mxgateway.client; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; import galaxy_repository.v1.GalaxyRepositoryGrpc; import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent; import galaxy_repository.v1.GalaxyRepositoryOuterClass.DiscoverHierarchyReply; @@ -17,8 +14,6 @@ import com.google.protobuf.Timestamp; import io.grpc.Channel; import io.grpc.ClientInterceptors; import io.grpc.ManagedChannel; -import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.stub.StreamObserver; import java.time.Instant; import java.util.Iterator; @@ -27,7 +22,6 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLException; /** * Thin wrapper around the generated {@link GalaxyRepositoryGrpc} stubs that @@ -78,7 +72,8 @@ public final class GalaxyRepositoryClient implements AutoCloseable { * @return a connected client */ public static GalaxyRepositoryClient connect(MxGatewayClientOptions options) { - return new GalaxyRepositoryClient(createChannel(options), options); + return new GalaxyRepositoryClient( + MxGatewayChannels.createChannel(options, "failed to configure galaxy repository TLS"), options); } /** @@ -87,7 +82,7 @@ public final class GalaxyRepositoryClient implements AutoCloseable { * @return the blocking stub */ public GalaxyRepositoryGrpc.GalaxyRepositoryBlockingStub rawBlockingStub() { - return withDeadline(blockingStub); + return MxGatewayChannels.withDeadline(blockingStub, options); } /** @@ -96,7 +91,7 @@ public final class GalaxyRepositoryClient implements AutoCloseable { * @return the future stub */ public GalaxyRepositoryGrpc.GalaxyRepositoryFutureStub rawFutureStub() { - return withDeadline(futureStub); + return MxGatewayChannels.withDeadline(futureStub, options); } /** @@ -133,7 +128,9 @@ public final class GalaxyRepositoryClient implements AutoCloseable { * exceptionally with {@link MxGatewayException} on failure */ public CompletableFuture testConnectionAsync() { - return toCompletable(rawFutureStub().testConnection(TestConnectionRequest.getDefaultInstance())) + return MxGatewayChannels.toCompletable( + rawFutureStub().testConnection(TestConnectionRequest.getDefaultInstance()), + "galaxy test connection") .thenApply(TestConnectionReply::getOk); } @@ -165,8 +162,11 @@ public final class GalaxyRepositoryClient implements AutoCloseable { * completed exceptionally with {@link MxGatewayException} on failure */ public CompletableFuture> getLastDeployTimeAsync() { - return toCompletable(rawFutureStub().getLastDeployTime(GetLastDeployTimeRequest.getDefaultInstance())) - .thenApply(GalaxyRepositoryClient::mapDeployTime); + return MxGatewayChannels.toCompletable( + rawFutureStub().getLastDeployTime(GetLastDeployTimeRequest.getDefaultInstance()), + "galaxy get last deploy time") + .thenApply(MxGatewayChannels.normalisingValidator( + "galaxy get last deploy time", GalaxyRepositoryClient::mapDeployTime)); } /** @@ -224,7 +224,8 @@ public final class GalaxyRepositoryClient implements AutoCloseable { */ public DeployEventStream watchDeployEvents(Instant lastSeenDeployTime) { DeployEventStream stream = new DeployEventStream(16); - withStreamDeadline(rawAsyncStub()).watchDeployEvents(buildWatchRequest(lastSeenDeployTime), stream.observer()); + MxGatewayChannels.withStreamDeadline(rawAsyncStub(), options) + .watchDeployEvents(buildWatchRequest(lastSeenDeployTime), stream.observer()); return stream; } @@ -253,7 +254,7 @@ public final class GalaxyRepositoryClient implements AutoCloseable { Instant lastSeenDeployTime, StreamObserver observer) { Objects.requireNonNull(observer, "observer"); DeployEventSubscription subscription = new DeployEventSubscription(); - withStreamDeadline(rawAsyncStub()) + MxGatewayChannels.withStreamDeadline(rawAsyncStub(), options) .watchDeployEvents(buildWatchRequest(lastSeenDeployTime), subscription.wrap(observer)); return subscription; } @@ -269,17 +270,31 @@ public final class GalaxyRepositoryClient implements AutoCloseable { return builder.build(); } - private > T withStreamDeadline(T stub) { - if (options.streamTimeout() == null || options.streamTimeout().isNegative()) { - return stub; - } - return stub.withDeadlineAfter(options.streamTimeout().toNanos(), TimeUnit.NANOSECONDS); - } - + /** + * Shuts the owned channel down and awaits termination so try-with-resources + * callers do not leave in-flight calls or Netty event-loop threads running + * after the block exits. + * + *

Waits up to the configured connect timeout for graceful termination + * and forcibly shuts the channel down on timeout. If the calling thread is + * interrupted while waiting, the channel is forcibly shut down and the + * thread's interrupt flag is restored. No-op for clients that do not own + * their channel. For an explicitly checked, blocking-aware shutdown call + * {@link #closeAndAwaitTermination()}. + */ @Override public void close() { - if (ownedChannel != null) { - ownedChannel.shutdown(); + if (ownedChannel == null) { + return; + } + ownedChannel.shutdown(); + try { + if (!ownedChannel.awaitTermination(options.connectTimeout().toMillis(), TimeUnit.MILLISECONDS)) { + ownedChannel.shutdownNow(); + } + } catch (InterruptedException error) { + ownedChannel.shutdownNow(); + Thread.currentThread().interrupt(); } } @@ -307,86 +322,26 @@ public final class GalaxyRepositoryClient implements AutoCloseable { return Optional.of(Instant.ofEpochSecond(ts.getSeconds(), ts.getNanos())); } - private static ManagedChannel createChannel(MxGatewayClientOptions options) { - NettyChannelBuilder builder = NettyChannelBuilder.forTarget(options.endpoint()) - .maxInboundMessageSize(options.maxGrpcMessageBytes()); - if (!options.connectTimeout().isNegative()) { - builder.withOption( - io.grpc.netty.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS, - Math.toIntExact(options.connectTimeout().toMillis())); - } - if (options.plaintext()) { - builder.usePlaintext(); - } else if (options.caCertificatePath() != null) { - try { - builder.sslContext(GrpcSslContexts.forClient() - .trustManager(options.caCertificatePath().toFile()) - .build()); - } catch (SSLException error) { - throw new MxGatewayException("failed to configure galaxy repository TLS", error); - } - } else { - builder.useTransportSecurity(); - } - if (!options.serverNameOverride().isBlank()) { - builder.overrideAuthority(options.serverNameOverride()); - } - return builder.build(); - } - - private > T withDeadline(T stub) { - if (options.callTimeout().isNegative()) { - return stub; - } - return stub.withDeadlineAfter(options.callTimeout().toNanos(), TimeUnit.NANOSECONDS); - } - private CompletableFuture> discoverHierarchyPageAsync( String pageToken, java.util.ArrayList objects, java.util.HashSet seenPageTokens) { DiscoverHierarchyRequest request = DiscoverHierarchyRequest.newBuilder() .setPageSize(DISCOVER_HIERARCHY_PAGE_SIZE) .setPageToken(pageToken) .build(); - return toCompletable(rawFutureStub().discoverHierarchy(request)).thenCompose(reply -> { - objects.addAll(reply.getObjectsList()); - if (reply.getNextPageToken().isBlank()) { - return CompletableFuture.completedFuture(objects); - } - if (!seenPageTokens.add(reply.getNextPageToken())) { - CompletableFuture> failed = new CompletableFuture<>(); - failed.completeExceptionally(new MxGatewayException( - "galaxy discover hierarchy returned repeated page token: " + reply.getNextPageToken())); - return failed; - } - return discoverHierarchyPageAsync(reply.getNextPageToken(), objects, seenPageTokens); - }); - } - - private static CompletableFuture toCompletable(com.google.common.util.concurrent.ListenableFuture source) { - CompletableFuture target = new CompletableFuture<>(); - Futures.addCallback( - source, - new FutureCallback<>() { - @Override - public void onSuccess(T result) { - target.complete(result); + return MxGatewayChannels.toCompletable(rawFutureStub().discoverHierarchy(request), "galaxy discover hierarchy") + .thenCompose(reply -> { + objects.addAll(reply.getObjectsList()); + if (reply.getNextPageToken().isBlank()) { + return CompletableFuture.completedFuture(objects); } - - @Override - public void onFailure(Throwable error) { - if (error instanceof RuntimeException runtimeException) { - target.completeExceptionally(MxGatewayErrors.fromGrpc("galaxy async call", runtimeException)); - return; - } - target.completeExceptionally(error); + if (!seenPageTokens.add(reply.getNextPageToken())) { + CompletableFuture> failed = new CompletableFuture<>(); + failed.completeExceptionally(new MxGatewayException( + "galaxy discover hierarchy returned repeated page token: " + + reply.getNextPageToken())); + return failed; } - }, - MoreExecutors.directExecutor()); - target.whenComplete((ignoredResult, ignoredError) -> { - if (target.isCancelled()) { - source.cancel(true); - } - }); - return target; + return discoverHierarchyPageAsync(reply.getNextPageToken(), objects, seenPageTokens); + }); } } diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxEventStream.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxEventStream.java index 361b87b..71d3e7d 100644 --- a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxEventStream.java +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxEventStream.java @@ -22,6 +22,18 @@ import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest; * cancelled and a follow-up call to {@link #next()} throws * {@link MxGatewayException}. * + *

Backpressure (fail-fast): this adaptor relies on gRPC's + * default auto-inbound flow control — the async stub auto-requests messages, so + * the gateway can push events faster than the consumer drains the bounded + * 16-element buffer. There is intentionally no real client flow + * control: a consumer that stalls long enough to let the buffer fill triggers + * an immediate overflow that cancels the subscription and surfaces an + * {@link MxGatewayException} on the next {@link #next()} call. This matches the + * gateway's documented fail-fast event-backpressure design — a slow consumer + * loses its subscription rather than silently dropping events. Consumers that + * cannot keep up must drain {@link #next()} promptly (e.g. hand events to their + * own larger queue) and be prepared to resubscribe with a resume cursor. + * *

Threading: the iterator methods ({@link #hasNext()} and * {@link #next()}) are not thread-safe and must be driven by a single * consumer thread. {@link #close()} may be called from any thread. Terminal diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayChannels.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayChannels.java new file mode 100644 index 0000000..df188aa --- /dev/null +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayChannels.java @@ -0,0 +1,164 @@ +package com.dohertylan.mxgateway.client; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.ManagedChannel; +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.AbstractStub; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import javax.net.ssl.SSLException; + +/** + * Shared channel-builder and future-adaptor helpers used by both + * {@link MxGatewayClient} and {@link GalaxyRepositoryClient}. + * + *

Extracted so transport construction, per-call deadlines, and the + * {@link ListenableFuture}-to-{@link CompletableFuture} bridge live in one + * place instead of being duplicated verbatim across the two clients. + */ +final class MxGatewayChannels { + private MxGatewayChannels() { + } + + /** + * Builds a Netty managed channel from the supplied options, applying the + * connect timeout, message-size limit, and the configured transport + * security mode (plaintext, custom CA trust, or system trust). + * + * @param options the client options carrying endpoint and transport config + * @param tlsErrorPrefix a human-readable prefix for the {@link MxGatewayException} + * thrown when a custom CA certificate cannot be loaded + * @return a new managed channel; the caller owns its lifecycle + */ + static ManagedChannel createChannel(MxGatewayClientOptions options, String tlsErrorPrefix) { + NettyChannelBuilder builder = NettyChannelBuilder.forTarget(options.endpoint()) + .maxInboundMessageSize(options.maxGrpcMessageBytes()); + if (!options.connectTimeout().isNegative()) { + builder.withOption( + io.grpc.netty.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS, + Math.toIntExact(options.connectTimeout().toMillis())); + } + if (options.plaintext()) { + builder.usePlaintext(); + } else if (options.caCertificatePath() != null) { + try { + builder.sslContext(GrpcSslContexts.forClient() + .trustManager(options.caCertificatePath().toFile()) + .build()); + } catch (SSLException | RuntimeException error) { + // SSLException covers handshake-context failures; RuntimeException + // (IllegalArgumentException wrapping CertificateException) covers a + // missing or unreadable CA file. Either way callers see one typed + // failure instead of a raw, unwrapped exception leaking out. + throw new MxGatewayException(tlsErrorPrefix, error); + } + } else { + builder.useTransportSecurity(); + } + if (!options.serverNameOverride().isBlank()) { + builder.overrideAuthority(options.serverNameOverride()); + } + return builder.build(); + } + + /** + * Applies the configured per-call deadline to a unary stub. + * + * @param stub the stub to decorate + * @param options the client options carrying the call timeout + * @param the concrete stub type + * @return the stub with the call deadline applied, or the stub unchanged + * when the call timeout is negative (disabled) + */ + static > T withDeadline(T stub, MxGatewayClientOptions options) { + if (options.callTimeout().isNegative()) { + return stub; + } + return stub.withDeadlineAfter(options.callTimeout().toNanos(), TimeUnit.NANOSECONDS); + } + + /** + * Applies the configured streaming deadline to a streaming stub. + * + * @param stub the stub to decorate + * @param options the client options carrying the stream timeout + * @param the concrete stub type + * @return the stub with the stream deadline applied, or the stub unchanged + * when the stream timeout is unset or negative (disabled) + */ + static > T withStreamDeadline(T stub, MxGatewayClientOptions options) { + if (options.streamTimeout() == null || options.streamTimeout().isNegative()) { + return stub; + } + return stub.withDeadlineAfter(options.streamTimeout().toNanos(), TimeUnit.NANOSECONDS); + } + + /** + * Bridges a Guava {@link ListenableFuture} to a {@link CompletableFuture}, + * normalising any failure through {@link MxGatewayErrors#fromGrpc} so the + * async error surface matches the synchronous methods. Cancelling the + * returned future cancels the source RPC. + * + * @param source the gRPC future-stub result + * @param operation the operation name used in normalised error messages + * @param the reply type + * @return a completable future mirroring the source + */ + static CompletableFuture toCompletable(ListenableFuture source, String operation) { + CompletableFuture target = new CompletableFuture<>(); + Futures.addCallback( + source, + new FutureCallback<>() { + @Override + public void onSuccess(T result) { + target.complete(result); + } + + @Override + public void onFailure(Throwable error) { + if (error instanceof RuntimeException runtimeException) { + target.completeExceptionally(MxGatewayErrors.fromGrpc(operation, runtimeException)); + return; + } + target.completeExceptionally(error); + } + }, + MoreExecutors.directExecutor()); + target.whenComplete((ignoredResult, ignoredError) -> { + if (target.isCancelled()) { + source.cancel(true); + } + }); + return target; + } + + /** + * Adapts a reply-validating function for use inside {@code thenApply} so + * any non-{@link MxGatewayException} {@link RuntimeException} it raises is + * routed through {@link MxGatewayErrors#fromGrpc}. This keeps the async + * error surface consistent with the synchronous methods, which normalise + * failures with a {@code try/catch}. + * + * @param operation the operation name used in normalised error messages + * @param validator the validating/transforming function applied to the reply + * @param the reply type + * @param the result type + * @return a function suitable for {@link CompletableFuture#thenApply} + */ + static Function normalisingValidator(String operation, Function validator) { + return reply -> { + try { + return validator.apply(reply); + } catch (MxGatewayException error) { + throw error; + } catch (RuntimeException error) { + throw MxGatewayErrors.fromGrpc(operation, error); + } + }; + } +} diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java index 1ace8d4..6aa42c4 100644 --- a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java @@ -1,19 +1,13 @@ package com.dohertylan.mxgateway.client; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Duration; import io.grpc.Channel; import io.grpc.ClientInterceptors; import io.grpc.ManagedChannel; -import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.stub.StreamObserver; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLException; import mxaccess_gateway.v1.MxAccessGatewayGrpc; import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply; import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest; @@ -79,7 +73,8 @@ public final class MxGatewayClient implements AutoCloseable { * @return a connected client */ public static MxGatewayClient connect(MxGatewayClientOptions options) { - return new MxGatewayClient(createChannel(options), options); + return new MxGatewayClient( + MxGatewayChannels.createChannel(options, "failed to configure gateway TLS"), options); } /** @@ -88,7 +83,7 @@ public final class MxGatewayClient implements AutoCloseable { * @return the blocking stub */ public MxAccessGatewayGrpc.MxAccessGatewayBlockingStub rawBlockingStub() { - return withDeadline(blockingStub); + return MxGatewayChannels.withDeadline(blockingStub, options); } /** @@ -97,7 +92,7 @@ public final class MxGatewayClient implements AutoCloseable { * @return the future stub */ public MxAccessGatewayGrpc.MxAccessGatewayFutureStub rawFutureStub() { - return withDeadline(futureStub); + return MxGatewayChannels.withDeadline(futureStub, options); } /** @@ -186,12 +181,13 @@ public final class MxGatewayClient implements AutoCloseable { * with {@link MxGatewayException} on failure */ public CompletableFuture openSessionAsync(OpenSessionRequest request) { - CompletableFuture future = toCompletable(rawFutureStub().openSession(request)); - return future.thenApply(reply -> { + CompletableFuture future = + MxGatewayChannels.toCompletable(rawFutureStub().openSession(request), "open session"); + return future.thenApply(MxGatewayChannels.normalisingValidator("open session", reply -> { MxGatewayErrors.ensureProtocolSuccess("open session", reply.getProtocolStatus(), null); ensureGatewayProtocolCompatible(reply); return reply; - }); + })); } /** @@ -226,12 +222,13 @@ public final class MxGatewayClient implements AutoCloseable { * on failure */ public CompletableFuture invokeAsync(MxCommandRequest request) { - CompletableFuture future = toCompletable(rawFutureStub().invoke(request)); - return future.thenApply(reply -> { + CompletableFuture future = + MxGatewayChannels.toCompletable(rawFutureStub().invoke(request), "invoke"); + return future.thenApply(MxGatewayChannels.normalisingValidator("invoke", reply -> { MxGatewayErrors.ensureProtocolSuccess("invoke", reply.getProtocolStatus(), reply); MxGatewayErrors.ensureMxAccessSuccess("invoke", reply); return reply; - }); + })); } /** @@ -264,7 +261,7 @@ public final class MxGatewayClient implements AutoCloseable { */ public MxEventStream streamEvents(StreamEventsRequest request) { MxEventStream stream = new MxEventStream(16); - withStreamDeadline(rawAsyncStub()).streamEvents(request, stream.observer()); + MxGatewayChannels.withStreamDeadline(rawAsyncStub(), options).streamEvents(request, stream.observer()); return stream; } @@ -279,15 +276,17 @@ public final class MxGatewayClient implements AutoCloseable { public MxGatewayEventSubscription streamEventsAsync( StreamEventsRequest request, StreamObserver observer) { MxGatewayEventSubscription subscription = new MxGatewayEventSubscription(); - withStreamDeadline(rawAsyncStub()).streamEvents(request, subscription.wrap(observer)); + MxGatewayChannels.withStreamDeadline(rawAsyncStub(), options) + .streamEvents(request, subscription.wrap(observer)); return subscription; } /** * Acknowledges an active MXAccess alarm condition through the gateway. * - *

The gateway authenticates the request against the API key's - * {@code invoke:alarm-ack} scope and forwards the acknowledge to the + *

The gateway authorizes this request against the API key's + * {@code admin} scope (the gateway scope resolver maps alarm RPCs to the + * default {@code admin} scope) and forwards the acknowledge to the * worker's MXAccess session; the resulting native MxStatus is returned * in the reply. Acks are idempotent at the MxAccess layer. * @@ -316,11 +315,12 @@ public final class MxGatewayClient implements AutoCloseable { * with {@link MxGatewayException} on failure */ public CompletableFuture acknowledgeAlarmAsync(AcknowledgeAlarmRequest request) { - CompletableFuture future = toCompletable(rawFutureStub().acknowledgeAlarm(request)); - return future.thenApply(reply -> { + CompletableFuture future = + MxGatewayChannels.toCompletable(rawFutureStub().acknowledgeAlarm(request), "acknowledge alarm"); + return future.thenApply(MxGatewayChannels.normalisingValidator("acknowledge alarm", reply -> { MxGatewayErrors.ensureProtocolSuccess("acknowledge alarm", reply.getProtocolStatus(), null); return reply; - }); + })); } /** @@ -336,14 +336,36 @@ public final class MxGatewayClient implements AutoCloseable { public MxGatewayActiveAlarmsSubscription queryActiveAlarms( QueryActiveAlarmsRequest request, StreamObserver observer) { MxGatewayActiveAlarmsSubscription subscription = new MxGatewayActiveAlarmsSubscription(); - withStreamDeadline(rawAsyncStub()).queryActiveAlarms(request, subscription.wrap(observer)); + MxGatewayChannels.withStreamDeadline(rawAsyncStub(), options) + .queryActiveAlarms(request, subscription.wrap(observer)); return subscription; } + /** + * Shuts the owned channel down and awaits termination so try-with-resources + * callers do not leave in-flight calls or Netty event-loop threads running + * after the block exits. + * + *

Waits up to the configured connect timeout for graceful termination + * and forcibly shuts the channel down on timeout. If the calling thread is + * interrupted while waiting, the channel is forcibly shut down and the + * thread's interrupt flag is restored. No-op for clients that do not own + * their channel. For an explicitly checked, blocking-aware shutdown call + * {@link #closeAndAwaitTermination()}. + */ @Override public void close() { - if (ownedChannel != null) { - ownedChannel.shutdown(); + if (ownedChannel == null) { + return; + } + ownedChannel.shutdown(); + try { + if (!ownedChannel.awaitTermination(options.connectTimeout().toMillis(), TimeUnit.MILLISECONDS)) { + ownedChannel.shutdownNow(); + } + } catch (InterruptedException error) { + ownedChannel.shutdownNow(); + Thread.currentThread().interrupt(); } } @@ -363,75 +385,6 @@ public final class MxGatewayClient implements AutoCloseable { } } - private static ManagedChannel createChannel(MxGatewayClientOptions options) { - NettyChannelBuilder builder = NettyChannelBuilder.forTarget(options.endpoint()) - .maxInboundMessageSize(options.maxGrpcMessageBytes()); - if (!options.connectTimeout().isNegative()) { - builder.withOption( - io.grpc.netty.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS, - Math.toIntExact(options.connectTimeout().toMillis())); - } - if (options.plaintext()) { - builder.usePlaintext(); - } else if (options.caCertificatePath() != null) { - try { - builder.sslContext(GrpcSslContexts.forClient() - .trustManager(options.caCertificatePath().toFile()) - .build()); - } catch (SSLException error) { - throw new MxGatewayException("failed to configure gateway TLS", error); - } - } else { - builder.useTransportSecurity(); - } - if (!options.serverNameOverride().isBlank()) { - builder.overrideAuthority(options.serverNameOverride()); - } - return builder.build(); - } - - private > T withDeadline(T stub) { - if (options.callTimeout().isNegative()) { - return stub; - } - return stub.withDeadlineAfter(options.callTimeout().toNanos(), TimeUnit.NANOSECONDS); - } - - private > T withStreamDeadline(T stub) { - if (options.streamTimeout() == null || options.streamTimeout().isNegative()) { - return stub; - } - return stub.withDeadlineAfter(options.streamTimeout().toNanos(), TimeUnit.NANOSECONDS); - } - - private static CompletableFuture toCompletable(com.google.common.util.concurrent.ListenableFuture source) { - CompletableFuture target = new CompletableFuture<>(); - Futures.addCallback( - source, - new FutureCallback<>() { - @Override - public void onSuccess(T result) { - target.complete(result); - } - - @Override - public void onFailure(Throwable error) { - if (error instanceof RuntimeException runtimeException) { - target.completeExceptionally(MxGatewayErrors.fromGrpc("async call", runtimeException)); - return; - } - target.completeExceptionally(error); - } - }, - MoreExecutors.directExecutor()); - target.whenComplete((ignoredResult, ignoredError) -> { - if (target.isCancelled()) { - source.cancel(true); - } - }); - return target; - } - static ProtocolStatusCode okStatusCode() { return ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK; } diff --git a/clients/java/mxgateway-client/src/test/java/com/dohertylan/mxgateway/client/MxGatewayLowFindingsTests.java b/clients/java/mxgateway-client/src/test/java/com/dohertylan/mxgateway/client/MxGatewayLowFindingsTests.java new file mode 100644 index 0000000..d04e40d --- /dev/null +++ b/clients/java/mxgateway-client/src/test/java/com/dohertylan/mxgateway/client/MxGatewayLowFindingsTests.java @@ -0,0 +1,503 @@ +package com.dohertylan.mxgateway.client; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import mxaccess_gateway.v1.MxAccessGatewayGrpc; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply; +import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest; +import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot; +import mxaccess_gateway.v1.MxaccessGateway.AlarmConditionState; +import mxaccess_gateway.v1.MxaccessGateway.MxEvent; +import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus; +import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode; +import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest; +import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest; +import org.junit.jupiter.api.Test; + +/** + * Regression tests for the Low-severity Client.Java code-review findings + * (Client.Java-006 through Client.Java-012). Covers the alarm RPC surface, + * async streaming/subscription cancellation, queue overflow, and TLS-config + * construction that Client.Java-007 reports as untested. + */ +final class MxGatewayLowFindingsTests { + + // --- Client.Java-007: AcknowledgeAlarm RPC coverage --- + + @Test + void acknowledgeAlarmReturnsReplyAndSendsAuthMetadata() throws Exception { + AtomicReference authorization = new AtomicReference<>(); + AtomicReference seen = new AtomicReference<>(); + TestService service = new TestService() { + @Override + public void acknowledgeAlarm( + AcknowledgeAlarmRequest request, StreamObserver responseObserver) { + seen.set(request); + responseObserver.onNext(AcknowledgeAlarmReply.newBuilder() + .setSessionId(request.getSessionId()) + .setProtocolStatus(ok()) + .setDiagnosticMessage("acked") + .build()); + responseObserver.onCompleted(); + } + }; + + try (Harness harness = Harness.start(service, "mxgw_keyid_secret", authorization)) { + AcknowledgeAlarmReply reply = harness.client().acknowledgeAlarm(AcknowledgeAlarmRequest.newBuilder() + .setSessionId("s-1") + .setAlarmFullReference("Area1.Pump.PV.HiHi") + .setComment("operator note") + .build()); + assertEquals("acked", reply.getDiagnosticMessage()); + assertEquals("Area1.Pump.PV.HiHi", seen.get().getAlarmFullReference()); + assertEquals("Bearer mxgw_keyid_secret", authorization.get()); + } + } + + @Test + void acknowledgeAlarmThrowsTypedExceptionOnProtocolFailure() throws Exception { + TestService service = new TestService() { + @Override + public void acknowledgeAlarm( + AcknowledgeAlarmRequest request, StreamObserver responseObserver) { + responseObserver.onNext(AcknowledgeAlarmReply.newBuilder() + .setSessionId(request.getSessionId()) + .setProtocolStatus(ProtocolStatus.newBuilder() + .setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND)) + .build()); + responseObserver.onCompleted(); + } + }; + + try (Harness harness = Harness.start(service)) { + assertThrows( + MxGatewayException.class, + () -> harness.client().acknowledgeAlarm(AcknowledgeAlarmRequest.newBuilder() + .setSessionId("missing") + .build())); + } + } + + @Test + void acknowledgeAlarmAsyncCompletesWithReply() throws Exception { + TestService service = new TestService() { + @Override + public void acknowledgeAlarm( + AcknowledgeAlarmRequest request, StreamObserver responseObserver) { + responseObserver.onNext(AcknowledgeAlarmReply.newBuilder() + .setSessionId(request.getSessionId()) + .setProtocolStatus(ok()) + .setDiagnosticMessage("async-acked") + .build()); + responseObserver.onCompleted(); + } + }; + + try (Harness harness = Harness.start(service)) { + CompletableFuture future = harness.client() + .acknowledgeAlarmAsync(AcknowledgeAlarmRequest.newBuilder().setSessionId("s-2").build()); + assertEquals("async-acked", future.get(5, TimeUnit.SECONDS).getDiagnosticMessage()); + } + } + + @Test + void acknowledgeAlarmAsyncFailsExceptionallyWithTypedException() throws Exception { + TestService service = new TestService() { + @Override + public void acknowledgeAlarm( + AcknowledgeAlarmRequest request, StreamObserver responseObserver) { + responseObserver.onError(Status.UNAVAILABLE.withDescription("worker down").asRuntimeException()); + } + }; + + try (Harness harness = Harness.start(service)) { + CompletableFuture future = harness.client() + .acknowledgeAlarmAsync(AcknowledgeAlarmRequest.newBuilder().setSessionId("s-3").build()); + ExecutionException error = assertThrows( + ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS)); + assertTrue(error.getCause() instanceof MxGatewayException, () -> String.valueOf(error.getCause())); + } + } + + // --- Client.Java-007: QueryActiveAlarms RPC + subscription coverage --- + + @Test + void queryActiveAlarmsDeliversSnapshotsToObserver() throws Exception { + ActiveAlarmSnapshot snapshot = ActiveAlarmSnapshot.newBuilder() + .setAlarmFullReference("Area1.Tank.Level.Hi") + .setSeverity(800) + .setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE) + .build(); + TestService service = new TestService() { + @Override + public void queryActiveAlarms( + QueryActiveAlarmsRequest request, StreamObserver responseObserver) { + responseObserver.onNext(snapshot); + responseObserver.onCompleted(); + } + }; + + try (Harness harness = Harness.start(service)) { + List received = new ArrayList<>(); + CountDownLatch done = new CountDownLatch(1); + harness.client().queryActiveAlarms( + QueryActiveAlarmsRequest.newBuilder().setSessionId("s-4").build(), + new StreamObserver<>() { + @Override + public void onNext(ActiveAlarmSnapshot value) { + received.add(value); + } + + @Override + public void onError(Throwable t) { + done.countDown(); + } + + @Override + public void onCompleted() { + done.countDown(); + } + }); + assertTrue(done.await(5, TimeUnit.SECONDS), "stream should complete"); + assertEquals(1, received.size()); + assertEquals("Area1.Tank.Level.Hi", received.get(0).getAlarmFullReference()); + } + } + + @Test + void activeAlarmsSubscriptionCancelBeforeBeforeStartCancelsStream() { + MxGatewayActiveAlarmsSubscription subscription = new MxGatewayActiveAlarmsSubscription(); + ClientResponseObserver observer = + subscription.wrap(new StreamObserver<>() { + @Override + public void onNext(ActiveAlarmSnapshot value) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + }); + RecordingActiveAlarmsRequestStream requestStream = new RecordingActiveAlarmsRequestStream(); + + subscription.cancel(); + observer.beforeStart(requestStream); + + assertTrue(requestStream.cancelled); + assertEquals("client cancelled active-alarms query", requestStream.cancelMessage); + } + + // --- Client.Java-007: async streamEvents + subscription cancellation --- + + @Test + void streamEventsAsyncDeliversEventsToObserver() throws Exception { + MxEvent event = MxEvent.newBuilder().setWorkerSequence(7).build(); + TestService service = new TestService() { + @Override + public void streamEvents(StreamEventsRequest request, StreamObserver responseObserver) { + responseObserver.onNext(event); + responseObserver.onCompleted(); + } + }; + + try (Harness harness = Harness.start(service)) { + List received = new ArrayList<>(); + CountDownLatch done = new CountDownLatch(1); + harness.client().streamEventsAsync( + StreamEventsRequest.newBuilder().setSessionId("s-5").build(), + new StreamObserver<>() { + @Override + public void onNext(MxEvent value) { + received.add(value); + } + + @Override + public void onError(Throwable t) { + done.countDown(); + } + + @Override + public void onCompleted() { + done.countDown(); + } + }); + assertTrue(done.await(5, TimeUnit.SECONDS), "stream should complete"); + assertEquals(1, received.size()); + assertEquals(7, received.get(0).getWorkerSequence()); + } + } + + @Test + void eventSubscriptionCancelBeforeBeforeStartCancelsStream() { + MxGatewayEventSubscription subscription = new MxGatewayEventSubscription(); + ClientResponseObserver observer = + subscription.wrap(new StreamObserver<>() { + @Override + public void onNext(MxEvent value) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + }); + RecordingEventsRequestStream requestStream = new RecordingEventsRequestStream(); + + subscription.cancel(); + observer.beforeStart(requestStream); + + assertTrue(requestStream.cancelled); + assertEquals("client cancelled event stream", requestStream.cancelMessage); + } + + // --- Client.Java-007 / Client.Java-011: MxEventStream queue overflow --- + + @Test + void eventStreamQueueOverflowSurfacesExceptionFromNext() { + MxEventStream stream = new MxEventStream(2); + ClientResponseObserver observer = stream.observer(); + RecordingEventsRequestStream requestStream = new RecordingEventsRequestStream(); + observer.beforeStart(requestStream); + + // Push far more events than the capacity-2 buffer can hold without draining. + for (int i = 0; i < 16; i++) { + observer.onNext(MxEvent.newBuilder().setWorkerSequence(i).build()); + } + + // Overflow must cancel the gRPC call and surface as MxGatewayException. + assertTrue(requestStream.cancelled, "overflow should cancel the underlying call"); + MxGatewayException error = assertThrows(MxGatewayException.class, () -> { + while (stream.hasNext()) { + stream.next(); + } + }); + assertTrue(error.getMessage().contains("overflow"), error::getMessage); + } + + // --- Client.Java-007: TLS channel construction --- + + @Test + void connectWithMissingCaCertificateThrowsTypedTlsException() { + MxGatewayClientOptions options = MxGatewayClientOptions.builder() + .endpoint("localhost:5001") + .apiKey("mxgw_id_secret") + .plaintext(false) + .caCertificatePath(Path.of("does-not-exist-" + UUID.randomUUID() + ".pem")) + .build(); + + MxGatewayException error = assertThrows(MxGatewayException.class, () -> MxGatewayClient.connect(options)); + assertTrue(error.getMessage().contains("TLS"), error::getMessage); + + MxGatewayException galaxyError = + assertThrows(MxGatewayException.class, () -> GalaxyRepositoryClient.connect(options)); + assertTrue(galaxyError.getMessage().contains("TLS"), galaxyError::getMessage); + } + + @Test + void connectWithSystemTrustBuildsTlsChannelWithoutError() { + // No CA path and plaintext=false exercises the useTransportSecurity() branch. + MxGatewayClientOptions options = MxGatewayClientOptions.builder() + .endpoint("localhost:5001") + .apiKey("mxgw_id_secret") + .plaintext(false) + .build(); + + try (MxGatewayClient client = MxGatewayClient.connect(options)) { + assertNotNull(client); + } + try (GalaxyRepositoryClient galaxy = GalaxyRepositoryClient.connect(options)) { + assertNotNull(galaxy); + } + } + + // --- Client.Java-008: async error surface is normalised --- + + @Test + void openSessionAsyncNormalisesNonGatewayRuntimeExceptionFromValidator() { + // ensureGatewayProtocolCompatible already throws MxGatewayException; this verifies + // the normalisingValidator wrapper routes a stray RuntimeException through fromGrpc. + CompletableFuture source = new CompletableFuture<>(); + CompletableFuture wrapped = + source.thenApply(MxGatewayChannels.normalisingValidator("open session", reply -> { + throw new IllegalStateException("malformed reply"); + })); + source.complete("payload"); + + CompletionException error = assertThrows(CompletionException.class, wrapped::join); + assertTrue(error.getCause() instanceof MxGatewayException, () -> String.valueOf(error.getCause())); + } + + private static ProtocolStatus ok() { + return ProtocolStatus.newBuilder() + .setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK) + .build(); + } + + private static class TestService extends MxAccessGatewayGrpc.MxAccessGatewayImplBase { + } + + private record Harness(Server server, ManagedChannel channel, MxGatewayClient client) implements AutoCloseable { + static Harness start(MxAccessGatewayGrpc.MxAccessGatewayImplBase service) throws Exception { + return start(service, "", new AtomicReference<>()); + } + + static Harness start( + MxAccessGatewayGrpc.MxAccessGatewayImplBase service, + String apiKey, + AtomicReference authorization) + throws Exception { + String name = "mxgw-low-" + UUID.randomUUID(); + io.grpc.ServerInterceptor interceptor = new io.grpc.ServerInterceptor() { + @Override + public io.grpc.ServerCall.Listener interceptCall( + io.grpc.ServerCall call, + io.grpc.Metadata headers, + io.grpc.ServerCallHandler next) { + authorization.set(headers.get(MxGatewayAuthInterceptor.AUTHORIZATION_HEADER)); + return next.startCall(call, headers); + } + }; + Server server = InProcessServerBuilder.forName(name) + .directExecutor() + .addService(io.grpc.ServerInterceptors.intercept(service, interceptor)) + .build() + .start(); + ManagedChannel channel = InProcessChannelBuilder.forName(name).directExecutor().build(); + MxGatewayClient client = new MxGatewayClient( + channel, + MxGatewayClientOptions.builder() + .endpoint("in-process") + .apiKey(apiKey) + .plaintext(true) + .callTimeout(Duration.ofSeconds(5)) + .streamTimeout(Duration.ofSeconds(5)) + .build()); + return new Harness(server, channel, client); + } + + @Override + public void close() { + channel.shutdownNow(); + server.shutdownNow(); + } + } + + private static final class RecordingEventsRequestStream + extends ClientCallStreamObserver { + private boolean cancelled; + private String cancelMessage; + + @Override + public void cancel(String message, Throwable cause) { + cancelled = true; + cancelMessage = message; + } + + @Override + public boolean isReady() { + return true; + } + + @Override + public void setOnReadyHandler(Runnable onReadyHandler) { + } + + @Override + public void request(int count) { + } + + @Override + public void setMessageCompression(boolean enable) { + } + + @Override + public void disableAutoInboundFlowControl() { + } + + @Override + public void onNext(StreamEventsRequest value) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + } + + private static final class RecordingActiveAlarmsRequestStream + extends ClientCallStreamObserver { + private boolean cancelled; + private String cancelMessage; + + @Override + public void cancel(String message, Throwable cause) { + cancelled = true; + cancelMessage = message; + } + + @Override + public boolean isReady() { + return true; + } + + @Override + public void setOnReadyHandler(Runnable onReadyHandler) { + } + + @Override + public void request(int count) { + } + + @Override + public void setMessageCompression(boolean enable) { + } + + @Override + public void disableAutoInboundFlowControl() { + } + + @Override + public void onNext(QueryActiveAlarmsRequest value) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + } +} diff --git a/code-reviews/Client.Java/findings.md b/code-reviews/Client.Java/findings.md index a984d0c..7333e40 100644 --- a/code-reviews/Client.Java/findings.md +++ b/code-reviews/Client.Java/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-18 | | Commit reviewed | `3cc53a8` | | Status | Reviewed | -| Open findings | 7 | +| Open findings | 0 | ## Checklist coverage @@ -108,13 +108,13 @@ | Severity | Low | | Category | Performance & resource management | | Location | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java:323-328`, `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/GalaxyRepositoryClient.java:279-284` | -| Status | Open | +| Status | Resolved | **Description:** `close()` (the `AutoCloseable` method invoked by try-with-resources) calls only `ownedChannel.shutdown()` and returns immediately without awaiting termination. In-flight calls and Netty event-loop threads may still be running when the caller assumes the resource is released. `closeAndAwaitTermination()` does it correctly but is not the method try-with-resources uses, and the README examples all rely on try-with-resources. **Recommendation:** Have `close()` await termination for a bounded time and `shutdownNow()` on timeout (the logic already in `closeAndAwaitTermination()`), or document that try-with-resources callers should call `closeAndAwaitTermination()`. -**Resolution:** _(open)_ +**Resolution:** (2026-05-18) Confirmed against source: both `MxGatewayClient.close()` and `GalaxyRepositoryClient.close()` called only `ownedChannel.shutdown()`. `close()` in both clients now performs the bounded-wait logic previously only in `closeAndAwaitTermination()`: it shuts the channel down, waits up to the configured connect timeout for graceful termination, and calls `shutdownNow()` on timeout. Because `close()` cannot throw a checked exception, an `InterruptedException` while awaiting is handled by forcibly shutting the channel down and restoring the thread interrupt flag. `closeAndAwaitTermination()` is retained unchanged for callers who want the checked, blocking-aware variant. `clients/java/README.md` documents the new try-with-resources `close()` semantics. ### Client.Java-007 @@ -123,13 +123,13 @@ | Severity | Low | | Category | Testing coverage | | Location | `clients/java/mxgateway-client/src/test/java/com/dohertylan/mxgateway/client/` | -| Status | Open | +| Status | Resolved | **Description:** The alarm surface — `acknowledgeAlarm`/`acknowledgeAlarmAsync`/`queryActiveAlarms` and `MxGatewayActiveAlarmsSubscription` — has zero test coverage. TLS channel construction, the async `streamEventsAsync` path, `MxGatewayEventSubscription` pre-start cancellation, and `MxEventStream` queue overflow are likewise untested. `JavaClientDesign.md` explicitly lists async stream-observer cancellation and status/error mapping as required tests. **Recommendation:** Add in-process gRPC tests for the alarm RPCs, the async streaming/subscription cancellation paths, and at least one TLS-config construction test. -**Resolution:** _(open)_ +**Resolution:** (2026-05-18) Confirmed against source: no test referenced `acknowledgeAlarm`, `queryActiveAlarms`, `streamEventsAsync`, TLS construction, or `MxEventStream` overflow. Added `MxGatewayLowFindingsTests` (12 tests) covering: `acknowledgeAlarm`/`acknowledgeAlarmAsync` (success, typed protocol-failure, async transport-failure normalisation), `queryActiveAlarms` observer delivery, `MxGatewayActiveAlarmsSubscription` and `MxGatewayEventSubscription` pre-start cancellation, `streamEventsAsync` observer delivery, `MxEventStream` queue overflow surfacing `MxGatewayException`, TLS channel construction (missing CA file rejected with a typed exception, system-trust path builds cleanly), and the Client.Java-008 async-validator normalisation. While writing the TLS test a latent bug was found: a missing/unreadable CA file makes `GrpcSslContexts` throw `IllegalArgumentException` (not `SSLException`), which the old `catch (SSLException)` let escape unwrapped — the catch in the shared channel builder was broadened to also wrap `RuntimeException` so callers always see one typed `MxGatewayException`. ### Client.Java-008 @@ -138,13 +138,13 @@ | Severity | Low | | Category | Error handling & resilience | | Location | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java:298-304` | -| Status | Open | +| Status | Resolved | **Description:** `acknowledgeAlarmAsync` and `openSessionAsync` apply `ensureProtocolSuccess` inside `thenApply`. If that validator throws a non-`MxGatewayException` `RuntimeException` it is wrapped by `CompletionException` with no `fromGrpc` normalisation, unlike the synchronous paths which normalise via `try/catch`. The async and sync error surfaces are therefore inconsistent. **Recommendation:** Wrap the `thenApply` body so any non-`MxGatewayException` is routed through `MxGatewayErrors.fromGrpc`, matching the synchronous methods. -**Resolution:** _(open)_ +**Resolution:** (2026-05-18) Confirmed against source: the `thenApply` validators in `openSessionAsync`, `invokeAsync`, and `acknowledgeAlarmAsync` were not normalised — in practice the gateway's own validators (`ensureProtocolSuccess`, `ensureMxAccessSuccess`, `ensureGatewayProtocolCompatible`) only ever throw `MxGatewayException`, but a stray non-`MxGatewayException` `RuntimeException` (e.g. an NPE from a malformed reply) would surface raw inside `CompletionException`. Added `MxGatewayChannels.normalisingValidator(operation, fn)`: it rethrows `MxGatewayException` unchanged and routes any other `RuntimeException` through `MxGatewayErrors.fromGrpc`, matching the synchronous `try/catch` paths. All three async `thenApply` sites now use it. Regression test: `MxGatewayLowFindingsTests.openSessionAsyncNormalisesNonGatewayRuntimeExceptionFromValidator`. ### Client.Java-009 @@ -153,13 +153,13 @@ | Severity | Low | | Category | Code organization & conventions | | Location | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/GalaxyRepositoryClient.java:310-391`, `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java:346-413` | -| Status | Open | +| Status | Resolved | **Description:** `createChannel`, `withDeadline`, `withStreamDeadline`, and `toCompletable` are duplicated nearly verbatim across `MxGatewayClient` and `GalaxyRepositoryClient` (~80 lines). A fix to one will not propagate to the other. **Recommendation:** Extract the channel-builder and future-adaptor helpers into a shared package-private utility class. -**Resolution:** _(open)_ +**Resolution:** (2026-05-18) Confirmed against source: the four helpers were duplicated near-verbatim. Added a package-private `MxGatewayChannels` utility class holding `createChannel(options, tlsErrorPrefix)`, `withDeadline(stub, options)`, `withStreamDeadline(stub, options)`, `toCompletable(future, operation)`, and the new `normalisingValidator` helper (Client.Java-008). Both `MxGatewayClient` and `GalaxyRepositoryClient` now delegate to it and their private copies were deleted, so a future fix lives in one place. Behavior is unchanged except the operation-name carried into `MxGatewayErrors.fromGrpc` is now the specific RPC name instead of the generic `"async call"`/`"galaxy async call"`. Verified by the full existing async test suite plus the new `MxGatewayLowFindingsTests`. ### Client.Java-010 @@ -168,13 +168,13 @@ | Severity | Low | | Category | Documentation & comments | | Location | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java:269-272`, `clients/java/README.md:76` | -| Status | Open | +| Status | Resolved | **Description:** The `acknowledgeAlarm` Javadoc states the gateway authenticates against an `invoke:alarm-ack` scope, and the README states the Galaxy Repository requires a `metadata:read` scope. CLAUDE.md's documented scope set names neither — the Javadoc/README assert a scope contract the project's own auth documentation does not corroborate. **Recommendation:** Reconcile the scope names with `src/MxGateway.Server/Security/` and CLAUDE.md; correct the Javadoc/README to the actual scope strings, or fix CLAUDE.md if sub-scopes were genuinely added. -**Resolution:** _(open)_ +**Resolution:** (2026-05-18) Partially re-triaged. Verified against `src/MxGateway.Server/Security/Authorization/GatewayScopes.cs` and `GatewayGrpcScopeResolver.cs`: the canonical scope catalog is `session:open`, `session:close`, `invoke:read`, `invoke:write`, `invoke:secure`, `events:read`, `metadata:read`, `admin`. (a) The README's `metadata:read` for the Galaxy Repository is **correct** — `TestConnectionRequest`/`GetLastDeployTimeRequest`/`DiscoverHierarchyRequest`/`WatchDeployEventsRequest` all resolve to `GatewayScopes.MetadataRead`; no change needed. CLAUDE.md's prose lists only coarse scope groups, but the canonical resolver does define `metadata:read`. (b) The `acknowledgeAlarm` Javadoc's `invoke:alarm-ack` is **wrong** — no such scope exists. `AcknowledgeAlarmRequest` and `QueryActiveAlarmsRequest` are not special-cased in `GatewayGrpcScopeResolver`, so they fall through the `_ => GatewayScopes.Admin` default and require the `admin` scope. The Javadoc was corrected to state the `admin` scope; `queryActiveAlarms` did not assert a scope and was left unchanged. The README does not mention alarms, so no README change was required. ### Client.Java-011 @@ -183,13 +183,13 @@ | Severity | Low | | Category | Performance & resource management | | Location | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxEventStream.java:37-63` | -| Status | Open | +| Status | Resolved | **Description:** The event stream relies on default gRPC auto-inbound flow control: the async stub auto-requests messages, so the server can push faster than the 16-element bounded queue drains. A momentarily slow consumer triggers queue overflow and an immediate stream-fault cancel. This is consistent with the documented fail-fast event-backpressure design, but the client never applies real flow control, so even brief consumer stalls kill the subscription. **Recommendation:** Confirm fail-fast is intended (it appears to be); if so, document it on `MxEventStream` so callers know a slow consumer terminates the stream. Optionally expose the queue capacity or opt-in flow control. -**Resolution:** _(open)_ +**Resolution:** (2026-05-18) Confirmed fail-fast is intended — CLAUDE.md ("fail-fast event backpressure") and `docs/DesignDecisions.md` make a slow consumer losing its subscription a deliberate v1 design choice, so this is documentation-only, not a behavior bug. Added an explicit "Backpressure (fail-fast)" section to the `MxEventStream` class Javadoc explaining that the adaptor uses gRPC auto-inbound flow control with a fixed 16-element buffer and no client flow control, that a consumer stall long enough to fill the buffer triggers an overflow that cancels the subscription and surfaces an `MxGatewayException`, and that consumers must drain promptly and be ready to resubscribe with a resume cursor. `clients/java/README.md` carries the same caveat. The queue capacity was intentionally left non-configurable to keep the v1 surface aligned with the gateway design; overflow behavior is covered by `MxGatewayLowFindingsTests.eventStreamQueueOverflowSurfacesExceptionFromNext`. ### Client.Java-012 @@ -198,10 +198,10 @@ | Severity | Low | | Category | Correctness & logic bugs | | Location | `clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java:667-674` | -| Status | Open | +| Status | Resolved | **Description:** `CommonOptions.resolved()` mutates `this` (`resolvedApiKey`, `resolvedTimeout`) and returns `this`, but `toClientOptions()` and `redactedJsonMap()` read those mutated fields. If `redactedJsonMap()` is ever called before `resolved()`, it silently emits empty-string defaults. The "return this after mutating" pattern is fragile and surprising. **Recommendation:** Make `resolved()` return an immutable resolved value object, or compute `resolvedApiKey`/`resolvedTimeout` lazily in their getters so call ordering cannot produce stale output. -**Resolution:** _(open)_ +**Resolution:** (2026-05-18) Confirmed against source: `resolved()` populated the `resolvedApiKey`/`resolvedTimeout` mutable fields and `toClientOptions()`/`redactedJsonMap()` read them, so calling either before `resolved()` emitted stale empty/30s defaults. The two mutable fields were removed and replaced with side-effect-free accessor methods `resolvedApiKey()` and `resolvedTimeout()` that compute their value on each call (API key from `--api-key` or the `--api-key-env` variable; timeout via `parseDuration`). `toClientOptions()` and `redactedJsonMap()` now call those accessors directly, so call ordering can no longer produce stale output. `resolved()` is retained as a no-op returning `this` purely for call-site readability (`common.resolved()`), with its Javadoc updated to state resolution is now lazy. Pure-refactor with no runtime-behavior change for the existing call order, so no new test was added; covered by the existing `MxGatewayCliTests` JSON-redaction and option-parsing tests.