Resolve Client.Java-001..005 code-review findings
Client.Java-001: redactApiKey echoed the last 4 secret characters. It now keeps only the non-secret mxgw_<key-id>_ prefix plus ***; non-gateway-shaped tokens return <redacted>. Client.Java-002: a close() after a queue-overflow could wipe the enqueued overflow exception. Terminal transitions are now serialized through a single guarded terminate() — first terminal condition wins. Client.Java-003: openSession never read gateway_protocol_version. Both openSession paths now call ensureGatewayProtocolCompatible, rejecting a non-zero mismatch and accepting unset (0) for older gateways. Client.Java-004: register/addItem/addItem2 fell back to a return_value that silently yields 0 when unset. The fallback is now guarded by hasReturnValue() and throws on a protocol violation. Client.Java-005: close() in try-with-resources could mask the body exception when the CloseSession RPC failed. close() now catches and logs the close-time failure; closeRaw() still surfaces it for callers that want it. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -62,6 +62,18 @@ underlying protobuf messages. `MxGatewayCommandException` and
|
||||
`MxAccessException` preserve the raw `MxCommandReply` when the gateway returns a
|
||||
data-bearing MXAccess failure.
|
||||
|
||||
`openSession` verifies the gateway's reported `gateway_protocol_version` against
|
||||
the version this client was generated for and throws `MxGatewayException` on a
|
||||
mismatch, so an incompatible client fails fast with a clear message instead of
|
||||
issuing commands that fail downstream. A gateway that does not populate the
|
||||
field is accepted unchanged.
|
||||
|
||||
`MxGatewaySession` implements `AutoCloseable`. The try-with-resources `close()`
|
||||
performs a `CloseSession` network RPC but swallows (and logs) any failure of
|
||||
that RPC so a close-time error never replaces the exception a try-with-resources
|
||||
body is already propagating. Call `closeRaw()` explicitly when you need to
|
||||
observe the close result or handle a close-time failure.
|
||||
|
||||
`MxEventStream` implements `Iterator<MxEvent>` and `AutoCloseable`. Closing it
|
||||
cancels the underlying gRPC stream. Canceling or timing out a Java client call
|
||||
only stops the client from waiting; it does not abort an in-flight MXAccess COM
|
||||
|
||||
+3
-1
@@ -62,8 +62,10 @@ final class MxGatewayCliTests {
|
||||
assertEquals(0, run.exitCode());
|
||||
assertTrue(run.output().contains("\"command\":\"open-session\""));
|
||||
assertTrue(run.output().contains("\"sessionId\":\"session-cli\""));
|
||||
assertTrue(run.output().contains("mxgw***********cret"));
|
||||
// Only the non-secret mxgw_<key-id>_ prefix survives; the secret is fully masked.
|
||||
assertTrue(run.output().contains("mxgw_visible_***"));
|
||||
assertFalse(run.output().contains("visible_secret"));
|
||||
assertFalse(run.output().contains("cret"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
+44
-8
@@ -21,13 +21,23 @@ import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
||||
* stream cancels the underlying gRPC call. If the queue overflows the call is
|
||||
* cancelled and a follow-up call to {@link #next()} throws
|
||||
* {@link MxGatewayException}.
|
||||
*
|
||||
* <p><strong>Threading:</strong> the iterator methods ({@link #hasNext()} and
|
||||
* {@link #next()}) are <em>not</em> thread-safe and must be driven by a single
|
||||
* consumer thread. {@link #close()} may be called from any thread. Terminal
|
||||
* state transitions (queue overflow, server completion, and {@code close()})
|
||||
* are serialised so that the first terminal condition wins deterministically:
|
||||
* once an overflow exception has been observed it is never silently replaced
|
||||
* by an end-of-stream marker.
|
||||
*/
|
||||
public final class MxEventStream implements Iterator<MxEvent>, AutoCloseable {
|
||||
private static final Object END = new Object();
|
||||
|
||||
private final BlockingQueue<Object> queue;
|
||||
private final Object terminalLock = new Object();
|
||||
private volatile ClientCallStreamObserver<StreamEventsRequest> requestStream;
|
||||
private volatile boolean closed;
|
||||
private boolean terminated;
|
||||
private Object next;
|
||||
|
||||
MxEventStream(int capacity) {
|
||||
@@ -98,7 +108,7 @@ public final class MxEventStream implements Iterator<MxEvent>, AutoCloseable {
|
||||
if (stream != null) {
|
||||
stream.cancel("client cancelled event stream", null);
|
||||
}
|
||||
offer(END);
|
||||
terminate(null);
|
||||
}
|
||||
|
||||
private Object take() {
|
||||
@@ -115,10 +125,7 @@ public final class MxEventStream implements Iterator<MxEvent>, AutoCloseable {
|
||||
private void offer(Object value) {
|
||||
Objects.requireNonNull(value, "value");
|
||||
if (value == END) {
|
||||
if (!queue.offer(value)) {
|
||||
queue.clear();
|
||||
queue.offer(value);
|
||||
}
|
||||
terminate(null);
|
||||
return;
|
||||
}
|
||||
if (!queue.offer(value)) {
|
||||
@@ -126,9 +133,38 @@ public final class MxEventStream implements Iterator<MxEvent>, AutoCloseable {
|
||||
if (stream != null) {
|
||||
stream.cancel("client event stream queue overflowed", null);
|
||||
}
|
||||
queue.clear();
|
||||
queue.offer(new MxGatewayException("gateway stream events queue overflowed"));
|
||||
queue.offer(END);
|
||||
terminate(new MxGatewayException("gateway stream events queue overflowed"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Drives the single terminal transition. The first caller wins: a later
|
||||
* end-of-stream or {@code close()} cannot overwrite or discard an overflow
|
||||
* exception that has already been published to the consumer.
|
||||
*
|
||||
* @param fault the fault to surface to the consumer, or {@code null} for a
|
||||
* clean end-of-stream
|
||||
*/
|
||||
private void terminate(MxGatewayException fault) {
|
||||
synchronized (terminalLock) {
|
||||
if (terminated) {
|
||||
return;
|
||||
}
|
||||
terminated = true;
|
||||
if (fault != null) {
|
||||
// Make room for the fault marker; the consumer only needs the
|
||||
// terminal signal, queued data events are no longer relevant.
|
||||
queue.clear();
|
||||
queue.offer(fault);
|
||||
queue.offer(END);
|
||||
return;
|
||||
}
|
||||
// Clean end-of-stream: ensure the END marker is delivered even when
|
||||
// the queue is currently full of undrained data events.
|
||||
if (!queue.offer(END)) {
|
||||
queue.clear();
|
||||
queue.offer(END);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+20
@@ -150,6 +150,7 @@ public final class MxGatewayClient implements AutoCloseable {
|
||||
try {
|
||||
OpenSessionReply reply = rawBlockingStub().openSession(request);
|
||||
MxGatewayErrors.ensureProtocolSuccess("open session", reply.getProtocolStatus(), null);
|
||||
ensureGatewayProtocolCompatible(reply);
|
||||
return reply;
|
||||
} catch (RuntimeException error) {
|
||||
if (error instanceof MxGatewayException) {
|
||||
@@ -159,6 +160,24 @@ public final class MxGatewayClient implements AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that the gateway speaks the protocol version this client was
|
||||
* generated against. A gateway that leaves {@code gateway_protocol_version}
|
||||
* unset (value {@code 0}, e.g. an older gateway) is accepted unchanged.
|
||||
*
|
||||
* @param reply the {@code OpenSessionReply} returned by the gateway
|
||||
* @throws MxGatewayException if the gateway reports an incompatible protocol version
|
||||
*/
|
||||
private static void ensureGatewayProtocolCompatible(OpenSessionReply reply) {
|
||||
int gatewayVersion = reply.getGatewayProtocolVersion();
|
||||
int clientVersion = MxGatewayClientVersion.gatewayProtocolVersion();
|
||||
if (gatewayVersion != 0 && gatewayVersion != clientVersion) {
|
||||
throw new MxGatewayException("gateway protocol version mismatch: gateway reports "
|
||||
+ gatewayVersion + " but this client was built for " + clientVersion
|
||||
+ "; upgrade the client or gateway so the protocol versions match");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes {@code OpenSession} asynchronously.
|
||||
*
|
||||
@@ -170,6 +189,7 @@ public final class MxGatewayClient implements AutoCloseable {
|
||||
CompletableFuture<OpenSessionReply> future = toCompletable(rawFutureStub().openSession(request));
|
||||
return future.thenApply(reply -> {
|
||||
MxGatewayErrors.ensureProtocolSuccess("open session", reply.getProtocolStatus(), null);
|
||||
ensureGatewayProtocolCompatible(reply);
|
||||
return reply;
|
||||
});
|
||||
}
|
||||
|
||||
+19
-9
@@ -11,25 +11,35 @@ public final class MxGatewaySecrets {
|
||||
}
|
||||
|
||||
/**
|
||||
* Redacts the body of an API key, leaving only short prefix and suffix
|
||||
* windows so it remains comparable in logs.
|
||||
* Redacts the secret portion of an API key, leaving only the non-secret
|
||||
* key identifier visible so the value remains comparable in logs.
|
||||
*
|
||||
* <p>A gateway API key has the form {@code mxgw_<key-id>_<secret>}. Only the
|
||||
* {@code mxgw_<key-id>_} prefix is non-secret; everything after the second
|
||||
* underscore is the secret and is masked entirely — no leading or
|
||||
* trailing characters of the secret are echoed. Tokens that do not match
|
||||
* the gateway shape are masked completely as {@code "<redacted>"}.
|
||||
*
|
||||
* @param apiKey the API key to redact, may be {@code null} or empty
|
||||
* @return an empty string for {@code null}/empty input, {@code "<redacted>"}
|
||||
* for keys eight characters or shorter, or a masked form preserving
|
||||
* the leading and trailing four characters
|
||||
* for non-gateway-shaped tokens, or {@code mxgw_<key-id>_***} with the
|
||||
* secret masked for gateway-shaped keys
|
||||
*/
|
||||
public static String redactApiKey(String apiKey) {
|
||||
if (apiKey == null || apiKey.isEmpty()) {
|
||||
return "";
|
||||
}
|
||||
if (apiKey.length() <= 8) {
|
||||
return "<redacted>";
|
||||
|
||||
// Gateway keys are mxgw_<key-id>_<secret>; keep only the non-secret prefix.
|
||||
if (apiKey.startsWith("mxgw_")) {
|
||||
int secretSeparator = apiKey.indexOf('_', "mxgw_".length());
|
||||
if (secretSeparator >= 0 && secretSeparator < apiKey.length() - 1) {
|
||||
return apiKey.substring(0, secretSeparator + 1) + "***";
|
||||
}
|
||||
}
|
||||
|
||||
return apiKey.substring(0, 4)
|
||||
+ "*".repeat(apiKey.length() - 8)
|
||||
+ apiKey.substring(apiKey.length() - 4);
|
||||
// Anything else is treated as wholly secret — reveal nothing.
|
||||
return "<redacted>";
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
+34
-4
@@ -40,6 +40,7 @@ import mxaccess_gateway.v1.MxaccessGateway.WriteCommand;
|
||||
*/
|
||||
public final class MxGatewaySession implements AutoCloseable {
|
||||
private static final SecureRandom RANDOM = new SecureRandom();
|
||||
private static final System.Logger LOGGER = System.getLogger(MxGatewaySession.class.getName());
|
||||
|
||||
private final MxGatewayClient client;
|
||||
private final OpenSessionReply openReply;
|
||||
@@ -99,9 +100,26 @@ public final class MxGatewaySession implements AutoCloseable {
|
||||
return closeReply;
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the session as part of try-with-resources.
|
||||
*
|
||||
* <p>This performs a {@code CloseSession} network RPC. Unlike
|
||||
* {@link #closeRaw()}, any failure of that RPC is swallowed (and recorded
|
||||
* as a suppressed exception when the JVM permits) rather than thrown: a
|
||||
* close-time transport or protocol failure must not replace the exception
|
||||
* that a try-with-resources body is already propagating. Callers that need
|
||||
* to observe the close result should call {@link #closeRaw()} explicitly.
|
||||
*/
|
||||
@Override
|
||||
public void close() {
|
||||
closeRaw();
|
||||
try {
|
||||
closeRaw();
|
||||
} catch (MxGatewayException error) {
|
||||
LOGGER.log(
|
||||
System.Logger.Level.WARNING,
|
||||
() -> "ignoring close-time failure for session " + sessionId(),
|
||||
error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -116,7 +134,11 @@ public final class MxGatewaySession implements AutoCloseable {
|
||||
if (reply.hasRegister()) {
|
||||
return reply.getRegister().getServerHandle();
|
||||
}
|
||||
return reply.getReturnValue().getInt32Value();
|
||||
if (reply.hasReturnValue()) {
|
||||
return reply.getReturnValue().getInt32Value();
|
||||
}
|
||||
throw new MxGatewayException(
|
||||
"gateway register reply carried neither a register payload nor a return value");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -159,7 +181,11 @@ public final class MxGatewaySession implements AutoCloseable {
|
||||
if (reply.hasAddItem()) {
|
||||
return reply.getAddItem().getItemHandle();
|
||||
}
|
||||
return reply.getReturnValue().getInt32Value();
|
||||
if (reply.hasReturnValue()) {
|
||||
return reply.getReturnValue().getInt32Value();
|
||||
}
|
||||
throw new MxGatewayException(
|
||||
"gateway addItem reply carried neither an add-item payload nor a return value");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -193,7 +219,11 @@ public final class MxGatewaySession implements AutoCloseable {
|
||||
if (reply.hasAddItem2()) {
|
||||
return reply.getAddItem2().getItemHandle();
|
||||
}
|
||||
return reply.getReturnValue().getInt32Value();
|
||||
if (reply.hasReturnValue()) {
|
||||
return reply.getReturnValue().getInt32Value();
|
||||
}
|
||||
throw new MxGatewayException(
|
||||
"gateway addItem2 reply carried neither an add-item payload nor a return value");
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
+394
@@ -0,0 +1,394 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Server;
|
||||
import io.grpc.inprocess.InProcessChannelBuilder;
|
||||
import io.grpc.inprocess.InProcessServerBuilder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.time.Duration;
|
||||
import java.util.UUID;
|
||||
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/**
|
||||
* Regression tests for the Medium-severity Client.Java code-review findings
|
||||
* (Client.Java-001 through Client.Java-005).
|
||||
*/
|
||||
final class MxGatewayMediumFindingsTests {
|
||||
|
||||
// --- Client.Java-001: redactApiKey must not leak trailing secret chars ---
|
||||
|
||||
@Test
|
||||
void redactApiKeyDoesNotLeakAnyCharacterOfTheSecret() {
|
||||
// mxgw_<key-id>_<secret> — the secret is the segment after the second underscore.
|
||||
String apiKey = "mxgw_keyid01_supersecretvalue";
|
||||
String redacted = MxGatewaySecrets.redactApiKey(apiKey);
|
||||
|
||||
// None of the secret characters may appear in the redacted output.
|
||||
assertFalse(redacted.contains("value"), () -> "redacted form leaked secret tail: " + redacted);
|
||||
assertFalse(redacted.endsWith("alue"), () -> "redacted form leaked trailing secret chars: " + redacted);
|
||||
assertFalse(redacted.contains("supersecret"), () -> "redacted form leaked secret: " + redacted);
|
||||
// The non-secret key-id prefix may stay so the value is still comparable in logs.
|
||||
assertTrue(redacted.startsWith("mxgw_keyid01_"), () -> "redacted form lost key-id prefix: " + redacted);
|
||||
}
|
||||
|
||||
@Test
|
||||
void redactApiKeyForNonGatewayShapedKeyRevealsNothing() {
|
||||
String redacted = MxGatewaySecrets.redactApiKey("plain-opaque-token-1234");
|
||||
assertFalse(redacted.contains("1234"), () -> "redacted form leaked trailing chars: " + redacted);
|
||||
assertFalse(redacted.contains("plain-opaque-token"), () -> "redacted form leaked body: " + redacted);
|
||||
}
|
||||
|
||||
@Test
|
||||
void redactApiKeyStillHandlesNullAndShortInput() {
|
||||
assertEquals("", MxGatewaySecrets.redactApiKey(null));
|
||||
assertEquals("", MxGatewaySecrets.redactApiKey(""));
|
||||
assertEquals("<redacted>", MxGatewaySecrets.redactApiKey("short"));
|
||||
}
|
||||
|
||||
// --- Client.Java-002: terminal-state transition must be deterministic ---
|
||||
|
||||
@Test
|
||||
void eventStreamOverflowExceptionSurvivesASubsequentClose() {
|
||||
// Deterministic reproduction of Client.Java-002: an overflow enqueues the
|
||||
// overflow exception, then a later close() must NOT discard it. The first
|
||||
// terminal condition (overflow) must win and stay observable by next().
|
||||
MxEventStream stream = new MxEventStream(2);
|
||||
io.grpc.stub.ClientResponseObserver<
|
||||
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxEvent>
|
||||
observer = stream.observer();
|
||||
observer.beforeStart(new NoopRequestStream());
|
||||
|
||||
// Force a queue overflow on a capacity-2 stream.
|
||||
for (int i = 0; i < 8; i++) {
|
||||
observer.onNext(testEvent(i));
|
||||
}
|
||||
|
||||
// A close() arriving after the overflow must not erase the overflow signal.
|
||||
stream.close();
|
||||
|
||||
MxGatewayException error = assertThrows(MxGatewayException.class, () -> {
|
||||
while (stream.hasNext()) {
|
||||
stream.next();
|
||||
}
|
||||
});
|
||||
assertTrue(error.getMessage().contains("overflow"), error::getMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
void eventStreamConcurrentOverflowAndCloseAlwaysTerminate() throws Exception {
|
||||
// The terminal-state transition must be serialised: whatever the interleaving
|
||||
// of overflow and close, hasNext() always reaches a terminal state.
|
||||
for (int iteration = 0; iteration < 300; iteration++) {
|
||||
MxEventStream stream = new MxEventStream(2);
|
||||
io.grpc.stub.ClientResponseObserver<
|
||||
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxEvent>
|
||||
observer = stream.observer();
|
||||
observer.beforeStart(new NoopRequestStream());
|
||||
|
||||
Thread filler = new Thread(() -> {
|
||||
for (int i = 0; i < 8; i++) {
|
||||
observer.onNext(testEvent(i));
|
||||
}
|
||||
});
|
||||
Thread closer = new Thread(stream::close);
|
||||
filler.start();
|
||||
closer.start();
|
||||
filler.join();
|
||||
closer.join();
|
||||
|
||||
try {
|
||||
while (stream.hasNext()) {
|
||||
stream.next();
|
||||
}
|
||||
} catch (MxGatewayException expected) {
|
||||
assertTrue(expected.getMessage().contains("overflow"), expected::getMessage);
|
||||
}
|
||||
assertFalse(stream.hasNext());
|
||||
}
|
||||
}
|
||||
|
||||
private static final class NoopRequestStream
|
||||
extends io.grpc.stub.ClientCallStreamObserver<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest> {
|
||||
@Override
|
||||
public void cancel(String message, Throwable cause) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReady() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOnReadyHandler(Runnable onReadyHandler) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void request(int count) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMessageCompression(boolean enable) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disableAutoInboundFlowControl() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest value) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
}
|
||||
}
|
||||
|
||||
// --- Client.Java-003: gateway protocol version mismatch must be rejected ---
|
||||
|
||||
@Test
|
||||
void openSessionRejectsIncompatibleGatewayProtocolVersion() throws Exception {
|
||||
TestService service = new TestService() {
|
||||
@Override
|
||||
public void openSession(OpenSessionRequest request, StreamObserver<OpenSessionReply> responseObserver) {
|
||||
responseObserver.onNext(OpenSessionReply.newBuilder()
|
||||
.setSessionId("session-mismatch")
|
||||
.setGatewayProtocolVersion(MxGatewayClientVersion.gatewayProtocolVersion() + 1)
|
||||
.setProtocolStatus(ok())
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
};
|
||||
|
||||
try (Harness harness = Harness.start(service)) {
|
||||
MxGatewayException error = assertThrows(
|
||||
MxGatewayException.class,
|
||||
() -> harness.client().openSession("junit-session"));
|
||||
assertTrue(error.getMessage().contains("protocol version"), error::getMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void openSessionAcceptsMatchingOrUnsetGatewayProtocolVersion() throws Exception {
|
||||
TestService matching = new TestService() {
|
||||
@Override
|
||||
public void openSession(OpenSessionRequest request, StreamObserver<OpenSessionReply> responseObserver) {
|
||||
responseObserver.onNext(OpenSessionReply.newBuilder()
|
||||
.setSessionId("session-ok")
|
||||
.setGatewayProtocolVersion(MxGatewayClientVersion.gatewayProtocolVersion())
|
||||
.setProtocolStatus(ok())
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
};
|
||||
try (Harness harness = Harness.start(matching)) {
|
||||
assertEquals("session-ok", harness.client().openSession("junit-session").sessionId());
|
||||
}
|
||||
|
||||
// A gateway that leaves the field unset (0) must not be rejected — older gateways
|
||||
// simply do not populate it.
|
||||
TestService unset = new TestService();
|
||||
try (Harness harness = Harness.start(unset)) {
|
||||
assertEquals("session-java", harness.client().openSession("junit-session").sessionId());
|
||||
}
|
||||
}
|
||||
|
||||
// --- Client.Java-004: missing typed payload AND missing return_value must throw ---
|
||||
|
||||
@Test
|
||||
void registerThrowsWhenReplyHasNeitherTypedPayloadNorReturnValue() throws Exception {
|
||||
TestService service = new TestService() {
|
||||
@Override
|
||||
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
|
||||
// Reply with neither register payload nor return_value set.
|
||||
responseObserver.onNext(MxCommandReply.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setKind(request.getCommand().getKind())
|
||||
.setProtocolStatus(ok())
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
};
|
||||
|
||||
try (Harness harness = Harness.start(service)) {
|
||||
MxGatewaySession session = MxGatewaySession.forSessionId(harness.client(), "s");
|
||||
MxGatewayException error = assertThrows(
|
||||
MxGatewayException.class, () -> session.register("c"));
|
||||
assertTrue(error.getMessage().contains("register"), error::getMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void addItemThrowsWhenReplyHasNeitherTypedPayloadNorReturnValue() throws Exception {
|
||||
TestService service = new TestService() {
|
||||
@Override
|
||||
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
|
||||
responseObserver.onNext(MxCommandReply.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setKind(request.getCommand().getKind())
|
||||
.setProtocolStatus(ok())
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
};
|
||||
|
||||
try (Harness harness = Harness.start(service)) {
|
||||
MxGatewaySession session = MxGatewaySession.forSessionId(harness.client(), "s");
|
||||
assertThrows(MxGatewayException.class, () -> session.addItem(1, "Tag"));
|
||||
assertThrows(MxGatewayException.class, () -> session.addItem2(1, "Tag", "ctx"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void addItemStillHonoursReturnValueFallback() throws Exception {
|
||||
TestService service = new TestService() {
|
||||
@Override
|
||||
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
|
||||
responseObserver.onNext(MxCommandReply.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setKind(request.getCommand().getKind())
|
||||
.setProtocolStatus(ok())
|
||||
.setReturnValue(mxaccess_gateway.v1.MxaccessGateway.MxValue.newBuilder()
|
||||
.setInt32Value(99))
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
};
|
||||
|
||||
try (Harness harness = Harness.start(service)) {
|
||||
MxGatewaySession session = MxGatewaySession.forSessionId(harness.client(), "s");
|
||||
assertEquals(99, session.addItem(1, "Tag"));
|
||||
}
|
||||
}
|
||||
|
||||
// --- Client.Java-005: close() must not mask the primary try-with-resources error ---
|
||||
|
||||
@Test
|
||||
void closeSuppressesCloseTimeFailureInsteadOfMaskingBodyException() throws Exception {
|
||||
TestService service = new TestService() {
|
||||
@Override
|
||||
public void closeSession(CloseSessionRequest request, StreamObserver<CloseSessionReply> responseObserver) {
|
||||
responseObserver.onError(io.grpc.Status.UNAVAILABLE
|
||||
.withDescription("WORKER_UNAVAILABLE")
|
||||
.asRuntimeException());
|
||||
}
|
||||
};
|
||||
|
||||
try (Harness harness = Harness.start(service)) {
|
||||
IllegalStateException bodyError = assertThrows(IllegalStateException.class, () -> {
|
||||
try (MxGatewaySession session = MxGatewaySession.forSessionId(harness.client(), "s")) {
|
||||
throw new IllegalStateException("body failure");
|
||||
}
|
||||
});
|
||||
// The body exception must propagate; the close-time RPC failure must not replace it.
|
||||
assertEquals("body failure", bodyError.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void closeRawStillSurfacesCloseTimeFailureForCallersWhoWantIt() throws Exception {
|
||||
TestService service = new TestService() {
|
||||
@Override
|
||||
public void closeSession(CloseSessionRequest request, StreamObserver<CloseSessionReply> responseObserver) {
|
||||
responseObserver.onError(io.grpc.Status.UNAVAILABLE
|
||||
.withDescription("WORKER_UNAVAILABLE")
|
||||
.asRuntimeException());
|
||||
}
|
||||
};
|
||||
|
||||
try (Harness harness = Harness.start(service)) {
|
||||
MxGatewaySession session = MxGatewaySession.forSessionId(harness.client(), "s");
|
||||
assertThrows(MxGatewayException.class, session::closeRaw);
|
||||
}
|
||||
}
|
||||
|
||||
private static mxaccess_gateway.v1.MxaccessGateway.MxEvent testEvent(int sequence) {
|
||||
return mxaccess_gateway.v1.MxaccessGateway.MxEvent.newBuilder()
|
||||
.setWorkerSequence(sequence)
|
||||
.build();
|
||||
}
|
||||
|
||||
private static ProtocolStatus ok() {
|
||||
return ProtocolStatus.newBuilder()
|
||||
.setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK)
|
||||
.build();
|
||||
}
|
||||
|
||||
private static class TestService extends MxAccessGatewayGrpc.MxAccessGatewayImplBase {
|
||||
@Override
|
||||
public void openSession(OpenSessionRequest request, StreamObserver<OpenSessionReply> responseObserver) {
|
||||
responseObserver.onNext(OpenSessionReply.newBuilder()
|
||||
.setSessionId("session-java")
|
||||
.setProtocolStatus(ok())
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeSession(CloseSessionRequest request, StreamObserver<CloseSessionReply> responseObserver) {
|
||||
responseObserver.onNext(CloseSessionReply.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setProtocolStatus(ok())
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
|
||||
responseObserver.onNext(MxCommandReply.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_UNSPECIFIED)
|
||||
.setProtocolStatus(ok())
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
private record Harness(Server server, ManagedChannel channel, MxGatewayClient client) implements AutoCloseable {
|
||||
static Harness start(MxAccessGatewayGrpc.MxAccessGatewayImplBase service) throws Exception {
|
||||
String name = "mxgw-medium-" + UUID.randomUUID();
|
||||
Server server = InProcessServerBuilder.forName(name)
|
||||
.directExecutor()
|
||||
.addService(service)
|
||||
.build()
|
||||
.start();
|
||||
ManagedChannel channel = InProcessChannelBuilder.forName(name).directExecutor().build();
|
||||
MxGatewayClient client = new MxGatewayClient(
|
||||
channel,
|
||||
MxGatewayClientOptions.builder()
|
||||
.endpoint("in-process")
|
||||
.apiKey("")
|
||||
.plaintext(true)
|
||||
.callTimeout(Duration.ofSeconds(5))
|
||||
.build());
|
||||
return new Harness(server, channel, client);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
channel.shutdownNow();
|
||||
server.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -7,7 +7,7 @@
|
||||
| Review date | 2026-05-18 |
|
||||
| Commit reviewed | `3cc53a8` |
|
||||
| Status | Reviewed |
|
||||
| Open findings | 12 |
|
||||
| Open findings | 7 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
@@ -33,13 +33,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Security |
|
||||
| Location | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewaySecrets.java:30-32` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `redactApiKey` preserves the leading and trailing four characters of the key. A gateway API key has the form `mxgw_<key-id>_<secret>`; the last four characters belong to the secret portion, so the "redacted" form leaks 4 characters of the actual secret into logs, CLI JSON output (`CommonOptions.redactedJsonMap`), and `MxGatewayClientOptions.toString()`. CLAUDE.md states API keys must never reach logs.
|
||||
|
||||
**Recommendation:** Redact the secret entirely. Show only a stable non-secret prefix (e.g. the `mxgw_<key-id>_` portion) and mask everything after it, or emit a fixed `mxgw_***` form. Do not echo any trailing characters of the secret.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** (2026-05-18) Confirmed against source: the old `substring(0,4) + stars + substring(len-4)` echoed the last four secret characters. `redactApiKey` now masks the secret entirely: for gateway-shaped keys it returns the non-secret `mxgw_<key-id>_` prefix followed by `***` (locating the secret separator as the first `_` after `mxgw_`); any non-gateway-shaped token returns `<redacted>`. No leading/trailing secret characters are ever emitted. The pre-existing `MxGatewayCliTests.openSessionJsonRedactsApiKey` assertion that hardcoded the leaky `mxgw***********cret` form was corrected to assert the masked `mxgw_visible_***` form. Regression tests: `MxGatewayMediumFindingsTests.redactApiKeyDoesNotLeakAnyCharacterOfTheSecret`, `redactApiKeyForNonGatewayShapedKeyRevealsNothing`, `redactApiKeyStillHandlesNullAndShortInput`.
|
||||
|
||||
### Client.Java-002
|
||||
|
||||
@@ -48,13 +48,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Concurrency & thread safety |
|
||||
| Location | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxEventStream.java:31,66-92` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** The `next` field is a plain (non-volatile) instance field, and `MxEventStream` exposes no thread-confinement guarantee. More concretely, a queue-overflow `offer()` and a `close()` `offer(END)` can interleave so the overflow exception is enqueued after `END` and never observed — the contract that "next() throws after overflow" is not guaranteed once `close()` has been called.
|
||||
|
||||
**Recommendation:** Document single-consumer-thread usage explicitly in the Javadoc, and serialise terminal state transitions (overflow vs END vs close) behind a single guarded flag so the first terminal condition wins deterministically.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** (2026-05-18) Confirmed against source: the old `offer()` END-branch did `queue.clear(); queue.offer(END)` when full, so a `close()` arriving after an overflow wiped the already-enqueued overflow exception, leaving the consumer with a clean end-of-stream and the overflow silently lost. Terminal transitions are now serialised through a single `terminate(MxGatewayException)` method guarded by a `terminated` flag and a `terminalLock`; the first terminal condition wins and a later `close()`/`END` cannot overwrite a published overflow fault. The Javadoc now explicitly documents that the iterator methods are single-consumer-only while `close()` is safe from any thread. Regression tests: `MxGatewayMediumFindingsTests.eventStreamOverflowExceptionSurvivesASubsequentClose` (deterministic) and `eventStreamConcurrentOverflowAndCloseAlwaysTerminate` (300-iteration race stress).
|
||||
|
||||
### Client.Java-003
|
||||
|
||||
@@ -63,13 +63,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | mxaccessgw conventions |
|
||||
| Location | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java:119-140` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `OpenSessionReply` carries `gateway_protocol_version` (proto field 8), and `MxGatewayClientVersion.GATEWAY_PROTOCOL_VERSION` exists so the client can reject incompatible generated-code inputs. The client never reads `reply.getGatewayProtocolVersion()` nor compares it against the compiled-in version. A client built against an older/newer contract issues commands blindly and fails with confusing downstream errors instead of a clear version-mismatch failure.
|
||||
|
||||
**Recommendation:** In `openSession`/`openSessionRaw`, compare `reply.getGatewayProtocolVersion()` with `MxGatewayClientVersion.gatewayProtocolVersion()` and throw a typed `MxGatewayException` on mismatch.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** (2026-05-18) Confirmed against source: neither `openSessionRaw` nor `openSessionAsync` read `getGatewayProtocolVersion()`. Added a private `ensureGatewayProtocolCompatible` helper, called from both `openSessionRaw` and `openSessionAsync`, that throws `MxGatewayException` with a clear mismatch message when the gateway reports a non-zero version differing from `MxGatewayClientVersion.gatewayProtocolVersion()`. A gateway that leaves the field unset (value 0, e.g. an older gateway) is accepted unchanged for backward compatibility. `clients/java/README.md` documents the new fail-fast check. Regression tests: `MxGatewayMediumFindingsTests.openSessionRejectsIncompatibleGatewayProtocolVersion` and `openSessionAcceptsMatchingOrUnsetGatewayProtocolVersion`.
|
||||
|
||||
### Client.Java-004
|
||||
|
||||
@@ -78,13 +78,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Correctness & logic bugs |
|
||||
| Location | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewaySession.java:114-120,157-163,191-197` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `register`, `addItem`, and `addItem2` check `reply.hasRegister()`/`hasAddItem()` and otherwise fall back to `reply.getReturnValue().getInt32Value()`. If the gateway returns a reply with neither the typed payload nor a `return_value` set, the method silently returns `0` — indistinguishable from a legitimate handle of 0. This masks a contract violation rather than surfacing it.
|
||||
|
||||
**Recommendation:** If the expected typed payload is absent and no `return_value` is present, throw `MxGatewayException` (protocol violation) instead of returning `0`.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** (2026-05-18) Confirmed against source: all three methods returned `reply.getReturnValue().getInt32Value()` (which yields `0` for an unset message field) when the typed payload was absent. Each method now guards the fallback with `reply.hasReturnValue()` and throws `MxGatewayException` describing the protocol violation when neither the typed payload nor a `return_value` is present. The legitimate `return_value` fallback is preserved. Regression tests: `MxGatewayMediumFindingsTests.registerThrowsWhenReplyHasNeitherTypedPayloadNorReturnValue`, `addItemThrowsWhenReplyHasNeitherTypedPayloadNorReturnValue`, and `addItemStillHonoursReturnValueFallback`.
|
||||
|
||||
### Client.Java-005
|
||||
|
||||
@@ -93,13 +93,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Error handling & resilience |
|
||||
| Location | `clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewaySession.java:92-105` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `close()` delegates to `closeRaw()`, which performs a network RPC. When `MxGatewaySession` is used in try-with-resources and the body throws, a failure inside `closeSession` (e.g. `WORKER_UNAVAILABLE`) throws from `close()` and replaces the original exception as the propagated throwable (the body exception becomes a suppressed exception) — a known try-with-resources footgun for I/O-performing `close()`.
|
||||
|
||||
**Recommendation:** Either make `close()` swallow/log close-time failures (keeping `closeRaw()` for callers who want the result), or document clearly that `close()` performs a network call that can throw.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** (2026-05-18) Confirmed against source: `close()` called `closeRaw()` directly, so a `CloseSession` RPC failure propagated out of try-with-resources and replaced the body exception. `close()` now catches `MxGatewayException` from `closeRaw()` and logs it at WARNING via `System.Logger` instead of rethrowing, so a close-time failure never masks the body exception. `closeRaw()` is unchanged and still throws for callers who want to observe the close result. The behavior change and the recommendation to use `closeRaw()` for explicit close handling are documented in `clients/java/README.md` and the `close()` Javadoc. Regression tests: `MxGatewayMediumFindingsTests.closeSuppressesCloseTimeFailureInsteadOfMaskingBodyException` and `closeRawStillSurfacesCloseTimeFailureForCallersWhoWantIt`.
|
||||
|
||||
### Client.Java-006
|
||||
|
||||
|
||||
Reference in New Issue
Block a user