client/java: avoid holding monitor across BrowseChildren RPC in expand
This commit is contained in:
+84
-9
@@ -4,6 +4,9 @@ import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject;
|
|||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* One node in a lazy-loaded Galaxy browse tree. Holds the underlying
|
* One node in a lazy-loaded Galaxy browse tree. Holds the underlying
|
||||||
@@ -16,7 +19,14 @@ public final class LazyBrowseNode {
|
|||||||
private final GalaxyObject object;
|
private final GalaxyObject object;
|
||||||
private final boolean hasChildrenHint;
|
private final boolean hasChildrenHint;
|
||||||
private final BrowseChildrenOptions options;
|
private final BrowseChildrenOptions options;
|
||||||
private final Object lock = new Object();
|
|
||||||
|
// expandLock gates the start of a new expand AND the publish of the in-flight
|
||||||
|
// future. Readers (getChildren / isExpanded) use a separate read-write lock so
|
||||||
|
// they never block on the gRPC call.
|
||||||
|
private final Object expandLock = new Object();
|
||||||
|
private CompletableFuture<Void> inFlight;
|
||||||
|
|
||||||
|
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||||
private List<LazyBrowseNode> children = Collections.emptyList();
|
private List<LazyBrowseNode> children = Collections.emptyList();
|
||||||
private boolean isExpanded;
|
private boolean isExpanded;
|
||||||
|
|
||||||
@@ -43,15 +53,21 @@ public final class LazyBrowseNode {
|
|||||||
|
|
||||||
/** @return a snapshot of direct children loaded by {@link #expand()}; empty until then. */
|
/** @return a snapshot of direct children loaded by {@link #expand()}; empty until then. */
|
||||||
public List<LazyBrowseNode> getChildren() {
|
public List<LazyBrowseNode> getChildren() {
|
||||||
synchronized (lock) {
|
readWriteLock.readLock().lock();
|
||||||
|
try {
|
||||||
return List.copyOf(children);
|
return List.copyOf(children);
|
||||||
|
} finally {
|
||||||
|
readWriteLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return {@code true} after the first {@link #expand()} call completes. */
|
/** @return {@code true} after the first {@link #expand()} call completes. */
|
||||||
public boolean isExpanded() {
|
public boolean isExpanded() {
|
||||||
synchronized (lock) {
|
readWriteLock.readLock().lock();
|
||||||
|
try {
|
||||||
return isExpanded;
|
return isExpanded;
|
||||||
|
} finally {
|
||||||
|
readWriteLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -59,17 +75,76 @@ public final class LazyBrowseNode {
|
|||||||
* Fetches direct children from the gateway and populates {@link #getChildren()}.
|
* Fetches direct children from the gateway and populates {@link #getChildren()}.
|
||||||
* Idempotent: subsequent calls are no-ops and do not issue a second RPC.
|
* Idempotent: subsequent calls are no-ops and do not issue a second RPC.
|
||||||
*
|
*
|
||||||
|
* <p>Concurrent callers coalesce onto a single in-flight RPC: the first caller
|
||||||
|
* (the "leader") issues the gRPC call, while any other thread that calls
|
||||||
|
* {@code expand()} during that window blocks on the leader's future and sees
|
||||||
|
* the same result (or the same exception). On failure the in-flight slot is
|
||||||
|
* cleared so a subsequent call can retry.
|
||||||
|
*
|
||||||
|
* <p>Readers ({@link #getChildren()} / {@link #isExpanded()}) take a separate
|
||||||
|
* read lock and are never blocked for the duration of the RPC.
|
||||||
|
*
|
||||||
* @throws MxGatewayException on transport or protocol failure
|
* @throws MxGatewayException on transport or protocol failure
|
||||||
*/
|
*/
|
||||||
public void expand() {
|
public void expand() {
|
||||||
synchronized (lock) {
|
if (isExpanded()) {
|
||||||
if (isExpanded) {
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletableFuture<Void> future;
|
||||||
|
boolean iAmTheLeader;
|
||||||
|
synchronized (expandLock) {
|
||||||
|
if (isExpanded()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
List<LazyBrowseNode> loaded =
|
if (inFlight != null) {
|
||||||
client.browseChildrenInner(Integer.valueOf(object.getGobjectId()), options);
|
future = inFlight;
|
||||||
this.children = loaded;
|
iAmTheLeader = false;
|
||||||
this.isExpanded = true;
|
} else {
|
||||||
|
future = new CompletableFuture<>();
|
||||||
|
inFlight = future;
|
||||||
|
iAmTheLeader = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iAmTheLeader) {
|
||||||
|
try {
|
||||||
|
List<LazyBrowseNode> loaded =
|
||||||
|
client.browseChildrenInner(object.getGobjectId(), options);
|
||||||
|
readWriteLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
this.children = loaded;
|
||||||
|
this.isExpanded = true;
|
||||||
|
} finally {
|
||||||
|
readWriteLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
synchronized (expandLock) {
|
||||||
|
inFlight = null;
|
||||||
|
}
|
||||||
|
future.complete(null);
|
||||||
|
} catch (RuntimeException ex) {
|
||||||
|
synchronized (expandLock) {
|
||||||
|
inFlight = null;
|
||||||
|
}
|
||||||
|
future.completeExceptionally(ex);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
future.get();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new MxGatewayException("Interrupted waiting for browse-children expand.", ie);
|
||||||
|
} catch (ExecutionException ee) {
|
||||||
|
Throwable cause = ee.getCause();
|
||||||
|
if (cause instanceof MxGatewayException me) {
|
||||||
|
throw me;
|
||||||
|
}
|
||||||
|
if (cause instanceof RuntimeException re) {
|
||||||
|
throw re;
|
||||||
|
}
|
||||||
|
throw new MxGatewayException("BrowseChildren expand failed.", cause);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+91
-1
@@ -40,9 +40,14 @@ import java.util.List;
|
|||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
@@ -466,6 +471,91 @@ final class GalaxyRepositoryClientTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void browseExpandConcurrentCallersOnlyFireOneRpc() throws Exception {
|
||||||
|
// Verifies that concurrent expand() calls coalesce onto a single in-flight
|
||||||
|
// BrowseChildren RPC and that readers (isExpanded/getChildren) are not
|
||||||
|
// blocked for the full RPC duration.
|
||||||
|
BrowseChildrenReply rootsReply = browseReply(
|
||||||
|
List.of(obj(1, "Plant", true)),
|
||||||
|
List.of(true),
|
||||||
|
7L,
|
||||||
|
"");
|
||||||
|
BrowseChildrenReply childrenReply = browseReply(
|
||||||
|
List.of(obj(2, "Mixer_001", false)),
|
||||||
|
List.of(false),
|
||||||
|
7L,
|
||||||
|
"");
|
||||||
|
|
||||||
|
// Gate the child fetch behind a latch so multiple expanders can pile up.
|
||||||
|
CountDownLatch release = new CountDownLatch(1);
|
||||||
|
AtomicInteger childCalls = new AtomicInteger();
|
||||||
|
BrowseChildrenService service = new BrowseChildrenService() {
|
||||||
|
@Override
|
||||||
|
public void browseChildren(
|
||||||
|
BrowseChildrenRequest request, StreamObserver<BrowseChildrenReply> responseObserver) {
|
||||||
|
calls.add(request);
|
||||||
|
BrowseChildrenReply reply;
|
||||||
|
if (!request.hasParentGobjectId()) {
|
||||||
|
reply = rootsReply;
|
||||||
|
} else {
|
||||||
|
// Block the leader until the followers have arrived.
|
||||||
|
try {
|
||||||
|
assertTrue(release.await(5, TimeUnit.SECONDS), "release latch never tripped");
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
responseObserver.onError(Status.CANCELLED.asRuntimeException());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
childCalls.incrementAndGet();
|
||||||
|
reply = childrenReply;
|
||||||
|
}
|
||||||
|
responseObserver.onNext(reply);
|
||||||
|
responseObserver.onCompleted();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
|
||||||
|
GalaxyRepositoryClient client = g.client("")) {
|
||||||
|
List<LazyBrowseNode> roots = client.browse();
|
||||||
|
LazyBrowseNode root = roots.get(0);
|
||||||
|
|
||||||
|
int parallelism = 10;
|
||||||
|
ExecutorService pool = Executors.newFixedThreadPool(parallelism);
|
||||||
|
try {
|
||||||
|
CountDownLatch ready = new CountDownLatch(parallelism);
|
||||||
|
List<Future<Void>> futures = new ArrayList<>();
|
||||||
|
for (int i = 0; i < parallelism; i++) {
|
||||||
|
futures.add(pool.submit(() -> {
|
||||||
|
ready.countDown();
|
||||||
|
root.expand();
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
// Wait for all callers to be in flight, then release the leader.
|
||||||
|
assertTrue(ready.await(5, TimeUnit.SECONDS), "expander threads did not start");
|
||||||
|
// Readers must not be blocked by an in-flight expand; this should not deadlock
|
||||||
|
// and should return the pre-expand state.
|
||||||
|
assertFalse(root.isExpanded());
|
||||||
|
assertEquals(0, root.getChildren().size());
|
||||||
|
release.countDown();
|
||||||
|
|
||||||
|
for (Future<Void> f : futures) {
|
||||||
|
f.get(10, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
pool.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(root.isExpanded());
|
||||||
|
assertEquals(1, root.getChildren().size());
|
||||||
|
// Exactly one expand RPC was issued even though many callers raced.
|
||||||
|
assertEquals(1, childCalls.get());
|
||||||
|
// 1 roots fetch + exactly 1 expand fetch.
|
||||||
|
assertEquals(2, service.calls.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void browseWithFilterForwardsToRequest() throws Exception {
|
void browseWithFilterForwardsToRequest() throws Exception {
|
||||||
BrowseChildrenService service = new BrowseChildrenService();
|
BrowseChildrenService service = new BrowseChildrenService();
|
||||||
@@ -507,7 +597,7 @@ final class GalaxyRepositoryClientTests {
|
|||||||
return b.build();
|
return b.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class BrowseChildrenService extends TestService {
|
private static class BrowseChildrenService extends TestService {
|
||||||
final List<BrowseChildrenRequest> calls =
|
final List<BrowseChildrenRequest> calls =
|
||||||
Collections.synchronizedList(new CopyOnWriteArrayList<>());
|
Collections.synchronizedList(new CopyOnWriteArrayList<>());
|
||||||
final Queue<BrowseChildrenReply> replies = new ArrayDeque<>();
|
final Queue<BrowseChildrenReply> replies = new ArrayDeque<>();
|
||||||
|
|||||||
Reference in New Issue
Block a user