diff --git a/clients/java/zb-mom-ww-mxgateway-cli/build.gradle b/clients/java/zb-mom-ww-mxgateway-cli/build.gradle index b1730f3..424b73f 100644 --- a/clients/java/zb-mom-ww-mxgateway-cli/build.gradle +++ b/clients/java/zb-mom-ww-mxgateway-cli/build.gradle @@ -6,6 +6,9 @@ dependencies { implementation project(':zb-mom-ww-mxgateway-client') implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}" implementation "info.picocli:picocli:${picocliVersion}" + + testImplementation "io.grpc:grpc-inprocess:${grpcVersion}" + testImplementation "io.grpc:grpc-testing:${grpcVersion}" } application { diff --git a/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/InProcessGatewayHarness.java b/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/InProcessGatewayHarness.java new file mode 100644 index 0000000..726fbb5 --- /dev/null +++ b/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/InProcessGatewayHarness.java @@ -0,0 +1,216 @@ +package com.zb.mom.ww.mxgateway.cli; + +import com.zb.mom.ww.mxgateway.client.GalaxyRepositoryClient; +import com.zb.mom.ww.mxgateway.client.MxGatewayClient; +import com.zb.mom.ww.mxgateway.client.MxGatewayClientOptions; +import galaxy_repository.v1.GalaxyRepositoryGrpc; +import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent; +import galaxy_repository.v1.GalaxyRepositoryOuterClass.DiscoverHierarchyReply; +import galaxy_repository.v1.GalaxyRepositoryOuterClass.DiscoverHierarchyRequest; +import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject; +import galaxy_repository.v1.GalaxyRepositoryOuterClass.WatchDeployEventsRequest; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import mxaccess_gateway.v1.MxAccessGatewayGrpc; +import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply; +import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; +import mxaccess_gateway.v1.MxaccessGateway.MxEvent; +import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus; +import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode; +import mxaccess_gateway.v1.MxaccessGateway.SessionState; + +/** + * Test fixture that stands up an in-process gRPC server hosting scripted fake + * {@code MxAccessGateway} and {@code GalaxyRepository} service implementations, + * so the real Java client types ({@link MxGatewayClient} / + * {@link GalaxyRepositoryClient}) can be driven over a real channel. + * + *

The real streaming wrappers ({@code MxEventStream} / + * {@code DeployEventStream}) have package-private constructors and + * {@link GalaxyRepositoryClient} is {@code final}, so the streaming and galaxy + * CLI commands cannot be exercised through the lightweight {@code FakeSession} + * seam. Driving the real client over an in-process channel against scripted + * services is the clean alternative; Tasks 5 and 6 add the CLI assertions on + * top of this fixture. + * + *

Scripted payloads are settable via constructor args or setters. Each + * instance uses a unique server name so harnesses do not collide. The + * {@code directExecutor()} wiring keeps all dispatch on the calling thread, so + * no background threads are leaked. + */ +final class InProcessGatewayHarness implements AutoCloseable { + private final String serverName; + private final Server server; + private final ManagedChannel channel; + private final FakeGatewayService fakeGateway; + private final FakeGalaxyService fakeGalaxy; + + /** Starts a harness with empty scripted payloads; populate via setters. */ + InProcessGatewayHarness() { + this(List.of(), List.of(), List.of()); + } + + /** + * Starts a harness with the supplied scripted payloads. + * + * @param scriptedEvents events {@code streamEvents} pushes before completing + * @param scriptedObjects objects {@code discoverHierarchy} returns (single page) + * @param scriptedDeployEvents events {@code watchDeployEvents} streams before completing + */ + InProcessGatewayHarness( + List scriptedEvents, + List scriptedObjects, + List scriptedDeployEvents) { + this.serverName = "mxgw-cli-harness-" + UUID.randomUUID(); + this.fakeGateway = new FakeGatewayService(scriptedEvents); + this.fakeGalaxy = new FakeGalaxyService(scriptedObjects, scriptedDeployEvents); + try { + this.server = InProcessServerBuilder.forName(serverName) + .directExecutor() + .addService(fakeGateway) + .addService(fakeGalaxy) + .build() + .start(); + } catch (IOException error) { + throw new IllegalStateException("failed to start in-process gateway harness", error); + } + this.channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + } + + /** Replaces the scripted {@code streamEvents} payload. */ + void setScriptedEvents(List events) { + fakeGateway.scriptedEvents.clear(); + fakeGateway.scriptedEvents.addAll(events); + } + + /** Replaces the scripted {@code discoverHierarchy} payload. */ + void setScriptedObjects(List objects) { + fakeGalaxy.scriptedObjects.clear(); + fakeGalaxy.scriptedObjects.addAll(objects); + } + + /** Replaces the scripted {@code watchDeployEvents} payload. */ + void setScriptedDeployEvents(List deployEvents) { + fakeGalaxy.scriptedDeployEvents.clear(); + fakeGalaxy.scriptedDeployEvents.addAll(deployEvents); + } + + /** + * Returns the in-process channel into the scripted services. + * + * @return the managed channel; lifecycle owned by the harness + */ + ManagedChannel channel() { + return channel; + } + + /** + * Builds a real {@link MxGatewayClient} over the in-process channel. + * + * @return a client borrowing the harness channel + */ + MxGatewayClient gatewayClient() { + return new MxGatewayClient(channel, testOptions()); + } + + /** + * Builds a real {@link GalaxyRepositoryClient} over the in-process channel. + * + * @return a client borrowing the harness channel + */ + GalaxyRepositoryClient galaxyClient() { + return new GalaxyRepositoryClient(channel, testOptions()); + } + + private static MxGatewayClientOptions testOptions() { + return MxGatewayClientOptions.builder() + .endpoint("in-process") + .apiKey("mxgw_test_secret") + .plaintext(true) + .callTimeout(Duration.ofSeconds(5)) + .build(); + } + + @Override + public void close() { + channel.shutdownNow(); + server.shutdownNow(); + } + + private static ProtocolStatus ok() { + return ProtocolStatus.newBuilder() + .setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK) + .build(); + } + + /** Scripted fake of the {@code MxAccessGateway} service. */ + private static final class FakeGatewayService extends MxAccessGatewayGrpc.MxAccessGatewayImplBase { + private final List scriptedEvents = new CopyOnWriteArrayList<>(); + + FakeGatewayService(List scriptedEvents) { + this.scriptedEvents.addAll(scriptedEvents); + } + + @Override + public void streamEvents( + mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request, + StreamObserver responseObserver) { + for (MxEvent event : scriptedEvents) { + responseObserver.onNext(event); + } + responseObserver.onCompleted(); + } + + @Override + public void closeSession( + CloseSessionRequest request, StreamObserver responseObserver) { + responseObserver.onNext(CloseSessionReply.newBuilder() + .setSessionId(request.getSessionId()) + .setFinalState(SessionState.SESSION_STATE_CLOSED) + .setProtocolStatus(ok()) + .build()); + responseObserver.onCompleted(); + } + } + + /** Scripted fake of the {@code GalaxyRepository} service. */ + private static final class FakeGalaxyService extends GalaxyRepositoryGrpc.GalaxyRepositoryImplBase { + private final List scriptedObjects = new CopyOnWriteArrayList<>(); + private final List scriptedDeployEvents = new CopyOnWriteArrayList<>(); + + FakeGalaxyService(List scriptedObjects, List scriptedDeployEvents) { + this.scriptedObjects.addAll(scriptedObjects); + this.scriptedDeployEvents.addAll(scriptedDeployEvents); + } + + @Override + public void discoverHierarchy( + DiscoverHierarchyRequest request, StreamObserver responseObserver) { + List snapshot = new ArrayList<>(scriptedObjects); + responseObserver.onNext(DiscoverHierarchyReply.newBuilder() + .setTotalObjectCount(snapshot.size()) + .addAllObjects(snapshot) + .setNextPageToken("") + .build()); + responseObserver.onCompleted(); + } + + @Override + public void watchDeployEvents( + WatchDeployEventsRequest request, StreamObserver responseObserver) { + for (DeployEvent event : scriptedDeployEvents) { + responseObserver.onNext(event); + } + responseObserver.onCompleted(); + } + } +}