diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/LazyBrowseNode.java b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/LazyBrowseNode.java index 34ebd21..0cd32ed 100644 --- a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/LazyBrowseNode.java +++ b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/LazyBrowseNode.java @@ -4,6 +4,9 @@ import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * One node in a lazy-loaded Galaxy browse tree. Holds the underlying @@ -16,7 +19,14 @@ public final class LazyBrowseNode { private final GalaxyObject object; private final boolean hasChildrenHint; private final BrowseChildrenOptions options; - private final Object lock = new Object(); + + // expandLock gates the start of a new expand AND the publish of the in-flight + // future. Readers (getChildren / isExpanded) use a separate read-write lock so + // they never block on the gRPC call. + private final Object expandLock = new Object(); + private CompletableFuture inFlight; + + private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private List children = Collections.emptyList(); private boolean isExpanded; @@ -43,15 +53,21 @@ public final class LazyBrowseNode { /** @return a snapshot of direct children loaded by {@link #expand()}; empty until then. */ public List getChildren() { - synchronized (lock) { + readWriteLock.readLock().lock(); + try { return List.copyOf(children); + } finally { + readWriteLock.readLock().unlock(); } } /** @return {@code true} after the first {@link #expand()} call completes. */ public boolean isExpanded() { - synchronized (lock) { + readWriteLock.readLock().lock(); + try { return isExpanded; + } finally { + readWriteLock.readLock().unlock(); } } @@ -59,17 +75,76 @@ public final class LazyBrowseNode { * Fetches direct children from the gateway and populates {@link #getChildren()}. * Idempotent: subsequent calls are no-ops and do not issue a second RPC. * + *

Concurrent callers coalesce onto a single in-flight RPC: the first caller + * (the "leader") issues the gRPC call, while any other thread that calls + * {@code expand()} during that window blocks on the leader's future and sees + * the same result (or the same exception). On failure the in-flight slot is + * cleared so a subsequent call can retry. + * + *

Readers ({@link #getChildren()} / {@link #isExpanded()}) take a separate + * read lock and are never blocked for the duration of the RPC. + * * @throws MxGatewayException on transport or protocol failure */ public void expand() { - synchronized (lock) { - if (isExpanded) { + if (isExpanded()) { + return; + } + + CompletableFuture future; + boolean iAmTheLeader; + synchronized (expandLock) { + if (isExpanded()) { return; } - List loaded = - client.browseChildrenInner(Integer.valueOf(object.getGobjectId()), options); - this.children = loaded; - this.isExpanded = true; + if (inFlight != null) { + future = inFlight; + iAmTheLeader = false; + } else { + future = new CompletableFuture<>(); + inFlight = future; + iAmTheLeader = true; + } + } + + if (iAmTheLeader) { + try { + List loaded = + client.browseChildrenInner(object.getGobjectId(), options); + readWriteLock.writeLock().lock(); + try { + this.children = loaded; + this.isExpanded = true; + } finally { + readWriteLock.writeLock().unlock(); + } + synchronized (expandLock) { + inFlight = null; + } + future.complete(null); + } catch (RuntimeException ex) { + synchronized (expandLock) { + inFlight = null; + } + future.completeExceptionally(ex); + throw ex; + } + } else { + try { + future.get(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new MxGatewayException("Interrupted waiting for browse-children expand.", ie); + } catch (ExecutionException ee) { + Throwable cause = ee.getCause(); + if (cause instanceof MxGatewayException me) { + throw me; + } + if (cause instanceof RuntimeException re) { + throw re; + } + throw new MxGatewayException("BrowseChildren expand failed.", cause); + } } } } diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/GalaxyRepositoryClientTests.java b/clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/GalaxyRepositoryClientTests.java index 88f6f58..4a50636 100644 --- a/clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/GalaxyRepositoryClientTests.java +++ b/clients/java/zb-mom-ww-mxgateway-client/src/test/java/com/zb/mom/ww/mxgateway/client/GalaxyRepositoryClientTests.java @@ -40,9 +40,14 @@ import java.util.List; import java.util.Optional; import java.util.Queue; import java.util.UUID; +import java.util.ArrayList; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; @@ -466,6 +471,91 @@ final class GalaxyRepositoryClientTests { } } + @Test + void browseExpandConcurrentCallersOnlyFireOneRpc() throws Exception { + // Verifies that concurrent expand() calls coalesce onto a single in-flight + // BrowseChildren RPC and that readers (isExpanded/getChildren) are not + // blocked for the full RPC duration. + BrowseChildrenReply rootsReply = browseReply( + List.of(obj(1, "Plant", true)), + List.of(true), + 7L, + ""); + BrowseChildrenReply childrenReply = browseReply( + List.of(obj(2, "Mixer_001", false)), + List.of(false), + 7L, + ""); + + // Gate the child fetch behind a latch so multiple expanders can pile up. + CountDownLatch release = new CountDownLatch(1); + AtomicInteger childCalls = new AtomicInteger(); + BrowseChildrenService service = new BrowseChildrenService() { + @Override + public void browseChildren( + BrowseChildrenRequest request, StreamObserver responseObserver) { + calls.add(request); + BrowseChildrenReply reply; + if (!request.hasParentGobjectId()) { + reply = rootsReply; + } else { + // Block the leader until the followers have arrived. + try { + assertTrue(release.await(5, TimeUnit.SECONDS), "release latch never tripped"); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + responseObserver.onError(Status.CANCELLED.asRuntimeException()); + return; + } + childCalls.incrementAndGet(); + reply = childrenReply; + } + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + }; + + try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>()); + GalaxyRepositoryClient client = g.client("")) { + List roots = client.browse(); + LazyBrowseNode root = roots.get(0); + + int parallelism = 10; + ExecutorService pool = Executors.newFixedThreadPool(parallelism); + try { + CountDownLatch ready = new CountDownLatch(parallelism); + List> futures = new ArrayList<>(); + for (int i = 0; i < parallelism; i++) { + futures.add(pool.submit(() -> { + ready.countDown(); + root.expand(); + return null; + })); + } + // Wait for all callers to be in flight, then release the leader. + assertTrue(ready.await(5, TimeUnit.SECONDS), "expander threads did not start"); + // Readers must not be blocked by an in-flight expand; this should not deadlock + // and should return the pre-expand state. + assertFalse(root.isExpanded()); + assertEquals(0, root.getChildren().size()); + release.countDown(); + + for (Future f : futures) { + f.get(10, TimeUnit.SECONDS); + } + } finally { + pool.shutdownNow(); + } + + assertTrue(root.isExpanded()); + assertEquals(1, root.getChildren().size()); + // Exactly one expand RPC was issued even though many callers raced. + assertEquals(1, childCalls.get()); + // 1 roots fetch + exactly 1 expand fetch. + assertEquals(2, service.calls.size()); + } + } + @Test void browseWithFilterForwardsToRequest() throws Exception { BrowseChildrenService service = new BrowseChildrenService(); @@ -507,7 +597,7 @@ final class GalaxyRepositoryClientTests { return b.build(); } - private static final class BrowseChildrenService extends TestService { + private static class BrowseChildrenService extends TestService { final List calls = Collections.synchronizedList(new CopyOnWriteArrayList<>()); final Queue replies = new ArrayDeque<>();