From f90bff01db44dad5a1e01e81474e77fc21ac5c40 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 24 May 2026 04:50:34 -0400 Subject: [PATCH] Java client: port bulk read/write SDK methods + CLI subcommands MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Final language in the bulk-CLI port wave. HEAD's MxGatewaySession had only the subscribe-style bulks; this commit adds the value-bulks plus matching picocli subcommands and a bench-read-bulk harness. SDK (MxGatewaySession.java): - List writeBulk(int serverHandle, List entries) - List write2Bulk(int serverHandle, List entries) - List writeSecuredBulk(int serverHandle, List entries) - List writeSecured2Bulk(int serverHandle, List entries) - List readBulk(int serverHandle, List tagAddresses, Duration timeout) readBulk uses java.time.Duration for the timeout parameter (idiomatic Java) and internally converts to the timeoutMs proto field; Duration.ZERO / null both delegate to the worker default. Per-entry secured user ids stay on each WriteSecured(2)BulkEntry to match the proto's per-row shape. CLI (MxGatewayCli.java): - read-bulk / write-bulk / write2-bulk / write-secured-bulk / write-secured2-bulk as picocli @Command subcommands. Write families share value-parsing logic; gating of --current-user-id / --verifier-user-id / --timestamp matches the cross-language flag contract. - bench-read-bulk: --iterations / --warmup loop with avg/min/max ms reporting plus a --json mode that emits the cross-language bench JSON schema. A small fixture in MxGatewayCliTests.FakeSession adds stub implementations of the five new interface methods so the test module compiles. Verification: gradle build BUILD SUCCESSFUL (4 tasks executed, all tests pass); gradle :zb-mom-ww-mxgateway-cli:installDist BUILD SUCCESSFUL. Manual smoke against live gateway on localhost:5120: open-session → register → read-bulk cold (wasCached=false both tags) → subscribe-bulk → read-bulk warm (wasCached=true both tags) → write-bulk int32 111,222 (both wasSuccessful=true) → write2-bulk timestamped (both wasSuccessful=true) → write-secured-bulk and write-secured2-bulk return per-entry MXAccess "Value does not fall within the expected range" failures with the configured user/verifier ids (0,0) — confirming the SDK does NOT throw on per-entry MXAccess failures and surfaces them through BulkWriteResult exactly as the .NET and Go ports do → bench-read-bulk iterations=20 avg=9.5 ms last_success=2/2 cached=2/2 → close-session SESSION_STATE_CLOSED. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../zb/mom/ww/mxgateway/cli/MxGatewayCli.java | 450 ++++++++++++++++++ .../ww/mxgateway/cli/MxGatewayCliTests.java | 74 +++ .../ww/mxgateway/client/MxGatewaySession.java | 148 ++++++ 3 files changed, 672 insertions(+) diff --git a/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java b/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java index 802854a..b64daf2 100644 --- a/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java +++ b/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java @@ -29,12 +29,18 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; +import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult; +import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply; import mxaccess_gateway.v1.MxaccessGateway.MxEvent; import mxaccess_gateway.v1.MxaccessGateway.MxValue; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult; +import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry; +import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry; +import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry; +import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry; import picocli.CommandLine; import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; @@ -113,6 +119,12 @@ public final class MxGatewayCli implements Callable { commandLine.addSubcommand("advise", new AdviseCommand(clientFactory)); commandLine.addSubcommand("subscribe-bulk", new SubscribeBulkCommand(clientFactory)); commandLine.addSubcommand("unsubscribe-bulk", new UnsubscribeBulkCommand(clientFactory)); + commandLine.addSubcommand("read-bulk", new ReadBulkCommand(clientFactory)); + commandLine.addSubcommand("write-bulk", new WriteBulkCommand(clientFactory)); + commandLine.addSubcommand("write2-bulk", new Write2BulkCommand(clientFactory)); + commandLine.addSubcommand("write-secured-bulk", new WriteSecuredBulkCommand(clientFactory)); + commandLine.addSubcommand("write-secured2-bulk", new WriteSecured2BulkCommand(clientFactory)); + commandLine.addSubcommand("bench-read-bulk", new BenchReadBulkCommand(clientFactory)); commandLine.addSubcommand("write", new WriteCommand(clientFactory)); commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory)); commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory)); @@ -603,6 +615,359 @@ public final class MxGatewayCli implements Callable { } } + @Command(name = "read-bulk", description = "Invokes MXAccess ReadBulk (cached or snapshot per tag).") + static final class ReadBulkCommand extends GatewayCommand { + @Option(names = "--session-id", required = true, description = "Gateway session id.") + String sessionId; + + @Option(names = "--server-handle", required = true, description = "MXAccess server handle.") + int serverHandle; + + @Option(names = "--items", required = true, description = "Comma-separated tag addresses.") + String items; + + @Option( + names = "--timeout-ms", + defaultValue = "0", + description = "Per-tag snapshot timeout in milliseconds (0 = worker default).") + int timeoutMs; + + ReadBulkCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + List results = client.session(sessionId) + .readBulk(serverHandle, parseStringList(items), Duration.ofMillis(timeoutMs)); + writeReadBulkOutput("read-bulk", common, json, results); + } + return 0; + } + } + + @Command(name = "write-bulk", description = "Invokes MXAccess WriteBulk.") + static final class WriteBulkCommand extends GatewayCommand { + @Option(names = "--session-id", required = true, description = "Gateway session id.") + String sessionId; + + @Option(names = "--server-handle", required = true, description = "MXAccess server handle.") + int serverHandle; + + @Option(names = "--item-handles", required = true, description = "Comma-separated item handles.") + String itemHandles; + + @Option(names = "--type", defaultValue = "string", description = "Value type for all entries.") + String type; + + @Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.") + String values; + + @Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.") + int userId; + + WriteBulkCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + List handles = parseIntList(itemHandles); + List valueTexts = parseStringList(values); + if (handles.size() != valueTexts.size()) { + throw new IllegalArgumentException( + "item-handles count (" + handles.size() + ") does not match values count (" + + valueTexts.size() + ")"); + } + List entries = new ArrayList<>(handles.size()); + for (int i = 0; i < handles.size(); i++) { + entries.add(WriteBulkEntry.newBuilder() + .setItemHandle(handles.get(i)) + .setUserId(userId) + .setValue(parseValue(type, valueTexts.get(i))) + .build()); + } + List results = client.session(sessionId).writeBulk(serverHandle, entries); + writeWriteBulkOutput("write-bulk", common, json, results); + } + return 0; + } + } + + @Command(name = "write2-bulk", description = "Invokes MXAccess Write2Bulk (timestamped).") + static final class Write2BulkCommand extends GatewayCommand { + @Option(names = "--session-id", required = true, description = "Gateway session id.") + String sessionId; + + @Option(names = "--server-handle", required = true, description = "MXAccess server handle.") + int serverHandle; + + @Option(names = "--item-handles", required = true, description = "Comma-separated item handles.") + String itemHandles; + + @Option(names = "--type", defaultValue = "string", description = "Value type for all entries.") + String type; + + @Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.") + String values; + + @Option(names = "--timestamp", required = true, description = "ISO-8601 timestamp shared across all entries.") + String timestamp; + + @Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.") + int userId; + + Write2BulkCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + List handles = parseIntList(itemHandles); + List valueTexts = parseStringList(values); + if (handles.size() != valueTexts.size()) { + throw new IllegalArgumentException( + "item-handles count (" + handles.size() + ") does not match values count (" + + valueTexts.size() + ")"); + } + MxValue timestampValue = MxValues.timestampValue(Instant.parse(timestamp)); + List entries = new ArrayList<>(handles.size()); + for (int i = 0; i < handles.size(); i++) { + entries.add(Write2BulkEntry.newBuilder() + .setItemHandle(handles.get(i)) + .setUserId(userId) + .setValue(parseValue(type, valueTexts.get(i))) + .setTimestampValue(timestampValue) + .build()); + } + List results = client.session(sessionId).write2Bulk(serverHandle, entries); + writeWriteBulkOutput("write2-bulk", common, json, results); + } + return 0; + } + } + + @Command(name = "write-secured-bulk", description = "Invokes MXAccess WriteSecuredBulk.") + static final class WriteSecuredBulkCommand extends GatewayCommand { + @Option(names = "--session-id", required = true, description = "Gateway session id.") + String sessionId; + + @Option(names = "--server-handle", required = true, description = "MXAccess server handle.") + int serverHandle; + + @Option(names = "--item-handles", required = true, description = "Comma-separated item handles.") + String itemHandles; + + @Option(names = "--type", defaultValue = "string", description = "Value type for all entries.") + String type; + + @Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.") + String values; + + @Option(names = "--current-user-id", defaultValue = "0", description = "MXAccess current user id.") + int currentUserId; + + @Option(names = "--verifier-user-id", defaultValue = "0", description = "MXAccess verifier user id.") + int verifierUserId; + + WriteSecuredBulkCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + List handles = parseIntList(itemHandles); + List valueTexts = parseStringList(values); + if (handles.size() != valueTexts.size()) { + throw new IllegalArgumentException( + "item-handles count (" + handles.size() + ") does not match values count (" + + valueTexts.size() + ")"); + } + List entries = new ArrayList<>(handles.size()); + for (int i = 0; i < handles.size(); i++) { + entries.add(WriteSecuredBulkEntry.newBuilder() + .setItemHandle(handles.get(i)) + .setCurrentUserId(currentUserId) + .setVerifierUserId(verifierUserId) + .setValue(parseValue(type, valueTexts.get(i))) + .build()); + } + List results = client.session(sessionId).writeSecuredBulk(serverHandle, entries); + writeWriteBulkOutput("write-secured-bulk", common, json, results); + } + return 0; + } + } + + @Command(name = "write-secured2-bulk", description = "Invokes MXAccess WriteSecured2Bulk.") + static final class WriteSecured2BulkCommand extends GatewayCommand { + @Option(names = "--session-id", required = true, description = "Gateway session id.") + String sessionId; + + @Option(names = "--server-handle", required = true, description = "MXAccess server handle.") + int serverHandle; + + @Option(names = "--item-handles", required = true, description = "Comma-separated item handles.") + String itemHandles; + + @Option(names = "--type", defaultValue = "string", description = "Value type for all entries.") + String type; + + @Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.") + String values; + + @Option(names = "--timestamp", required = true, description = "ISO-8601 timestamp shared across all entries.") + String timestamp; + + @Option(names = "--current-user-id", defaultValue = "0", description = "MXAccess current user id.") + int currentUserId; + + @Option(names = "--verifier-user-id", defaultValue = "0", description = "MXAccess verifier user id.") + int verifierUserId; + + WriteSecured2BulkCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + List handles = parseIntList(itemHandles); + List valueTexts = parseStringList(values); + if (handles.size() != valueTexts.size()) { + throw new IllegalArgumentException( + "item-handles count (" + handles.size() + ") does not match values count (" + + valueTexts.size() + ")"); + } + MxValue timestampValue = MxValues.timestampValue(Instant.parse(timestamp)); + List entries = new ArrayList<>(handles.size()); + for (int i = 0; i < handles.size(); i++) { + entries.add(WriteSecured2BulkEntry.newBuilder() + .setItemHandle(handles.get(i)) + .setCurrentUserId(currentUserId) + .setVerifierUserId(verifierUserId) + .setValue(parseValue(type, valueTexts.get(i))) + .setTimestampValue(timestampValue) + .build()); + } + List results = client.session(sessionId).writeSecured2Bulk(serverHandle, entries); + writeWriteBulkOutput("write-secured2-bulk", common, json, results); + } + return 0; + } + } + + @Command( + name = "bench-read-bulk", + description = "Repeatedly invokes ReadBulk for benchmarking; prints aggregate timing.") + static final class BenchReadBulkCommand extends GatewayCommand { + @Option(names = "--session-id", required = true, description = "Gateway session id.") + String sessionId; + + @Option(names = "--server-handle", required = true, description = "MXAccess server handle.") + int serverHandle; + + @Option(names = "--items", required = true, description = "Comma-separated tag addresses.") + String items; + + @Option( + names = "--timeout-ms", + defaultValue = "0", + description = "Per-tag snapshot timeout in milliseconds (0 = worker default).") + int timeoutMs; + + @Option(names = "--iterations", defaultValue = "10", description = "Number of ReadBulk calls to perform.") + int iterations; + + @Option( + names = "--warmup", + defaultValue = "1", + description = "Number of warmup iterations excluded from timing.") + int warmup; + + BenchReadBulkCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + if (iterations <= 0) { + throw new IllegalArgumentException("--iterations must be positive"); + } + if (warmup < 0) { + throw new IllegalArgumentException("--warmup must be non-negative"); + } + List tagAddresses = parseStringList(items); + Duration timeout = Duration.ofMillis(timeoutMs); + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + MxGatewayCliSession session = client.session(sessionId); + for (int i = 0; i < warmup; i++) { + session.readBulk(serverHandle, tagAddresses, timeout); + } + long totalNanos = 0L; + long minNanos = Long.MAX_VALUE; + long maxNanos = 0L; + int lastResultCount = 0; + int lastSuccessCount = 0; + int lastCachedCount = 0; + for (int i = 0; i < iterations; i++) { + long start = System.nanoTime(); + List results = session.readBulk(serverHandle, tagAddresses, timeout); + long elapsed = System.nanoTime() - start; + totalNanos += elapsed; + minNanos = Math.min(minNanos, elapsed); + maxNanos = Math.max(maxNanos, elapsed); + lastResultCount = results.size(); + lastSuccessCount = 0; + lastCachedCount = 0; + for (BulkReadResult result : results) { + if (result.getWasSuccessful()) { + lastSuccessCount++; + } + if (result.getWasCached()) { + lastCachedCount++; + } + } + } + double avgMs = totalNanos / 1_000_000.0 / iterations; + double minMs = minNanos / 1_000_000.0; + double maxMs = maxNanos / 1_000_000.0; + PrintWriter out = common.spec.commandLine().getOut(); + if (json) { + Map output = new LinkedHashMap<>(); + output.put("command", "bench-read-bulk"); + output.put("options", common.redactedJsonMap()); + output.put("iterations", iterations); + output.put("warmup", warmup); + output.put("tagCount", tagAddresses.size()); + output.put("resultCount", lastResultCount); + output.put("successCount", lastSuccessCount); + output.put("cachedCount", lastCachedCount); + output.put("avgMs", avgMs); + output.put("minMs", minMs); + output.put("maxMs", maxMs); + out.println(jsonObject(output)); + } else { + out.printf( + "iterations=%d tags=%d avg=%.3fms min=%.3fms max=%.3fms last_results=%d last_success=%d last_cached=%d%n", + iterations, + tagAddresses.size(), + avgMs, + minMs, + maxMs, + lastResultCount, + lastSuccessCount, + lastCachedCount); + } + } + return 0; + } + } + @Command(name = "write", description = "Invokes MXAccess Write.") static final class WriteCommand extends GatewayCommand { @Option(names = "--session-id", required = true, description = "Gateway session id.") @@ -818,6 +1183,16 @@ public final class MxGatewayCli implements Callable { List unsubscribeBulk(int serverHandle, List itemHandles); + List readBulk(int serverHandle, List items, Duration timeout); + + List writeBulk(int serverHandle, List entries); + + List write2Bulk(int serverHandle, List entries); + + List writeSecuredBulk(int serverHandle, List entries); + + List writeSecured2Bulk(int serverHandle, List entries); + MxEventStream streamEventsAfter(long afterWorkerSequence); } @@ -909,6 +1284,31 @@ public final class MxGatewayCli implements Callable { return session.unsubscribeBulk(serverHandle, itemHandles); } + @Override + public List readBulk(int serverHandle, List items, Duration timeout) { + return session.readBulk(serverHandle, items, timeout); + } + + @Override + public List writeBulk(int serverHandle, List entries) { + return session.writeBulk(serverHandle, entries); + } + + @Override + public List write2Bulk(int serverHandle, List entries) { + return session.write2Bulk(serverHandle, entries); + } + + @Override + public List writeSecuredBulk(int serverHandle, List entries) { + return session.writeSecuredBulk(serverHandle, entries); + } + + @Override + public List writeSecured2Bulk(int serverHandle, List entries) { + return session.writeSecured2Bulk(serverHandle, entries); + } + @Override public MxEventStream streamEventsAfter(long afterWorkerSequence) { return session.streamEventsAfter(afterWorkerSequence); @@ -957,6 +1357,56 @@ public final class MxGatewayCli implements Callable { return values; } + private static void writeWriteBulkOutput( + String command, CommonOptions common, boolean json, List results) { + PrintWriter out = common.spec.commandLine().getOut(); + if (json) { + Map output = new LinkedHashMap<>(); + output.put("command", command); + output.put("options", common.redactedJsonMap()); + output.put("results", results.stream().map(MxGatewayCli::bulkWriteResultMap).toList()); + out.println(jsonObject(output)); + return; + } + out.println(results.size()); + } + + private static Map bulkWriteResultMap(BulkWriteResult result) { + Map values = new LinkedHashMap<>(); + values.put("serverHandle", result.getServerHandle()); + values.put("itemHandle", result.getItemHandle()); + values.put("wasSuccessful", result.getWasSuccessful()); + values.put("hresult", result.hasHresult() ? (Object) result.getHresult() : null); + values.put("errorMessage", result.getErrorMessage()); + return values; + } + + private static void writeReadBulkOutput( + String command, CommonOptions common, boolean json, List results) { + PrintWriter out = common.spec.commandLine().getOut(); + if (json) { + Map output = new LinkedHashMap<>(); + output.put("command", command); + output.put("options", common.redactedJsonMap()); + output.put("results", results.stream().map(MxGatewayCli::bulkReadResultMap).toList()); + out.println(jsonObject(output)); + return; + } + out.println(results.size()); + } + + private static Map bulkReadResultMap(BulkReadResult result) { + Map values = new LinkedHashMap<>(); + values.put("serverHandle", result.getServerHandle()); + values.put("tagAddress", result.getTagAddress()); + values.put("itemHandle", result.getItemHandle()); + values.put("wasSuccessful", result.getWasSuccessful()); + values.put("wasCached", result.getWasCached()); + values.put("quality", result.getQuality()); + values.put("errorMessage", result.getErrorMessage()); + return values; + } + private static MxValue parseValue(String type, String text) { return switch (type) { case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text)); diff --git a/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java b/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java index 1d11046..01e7b3c 100644 --- a/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java +++ b/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java @@ -9,9 +9,12 @@ import java.io.InputStream; import java.io.PrintWriter; import java.io.StringWriter; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import mxaccess_gateway.v1.MxaccessGateway.AddItemReply; +import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult; +import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind; @@ -25,6 +28,10 @@ import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode; import mxaccess_gateway.v1.MxaccessGateway.RegisterReply; import mxaccess_gateway.v1.MxaccessGateway.SessionState; import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult; +import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry; +import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry; +import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry; +import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry; import org.junit.jupiter.api.Test; final class MxGatewayCliTests { @@ -339,6 +346,73 @@ final class MxGatewayCliTests { return results; } + @Override + public List readBulk(int serverHandle, List items, Duration timeout) { + List results = new ArrayList<>(); + for (int index = 0; index < items.size(); index++) { + results.add(BulkReadResult.newBuilder() + .setServerHandle(serverHandle) + .setTagAddress(items.get(index)) + .setItemHandle(200 + index) + .setWasSuccessful(true) + .setWasCached(true) + .build()); + } + return results; + } + + @Override + public List writeBulk(int serverHandle, List entries) { + List results = new ArrayList<>(); + for (WriteBulkEntry entry : entries) { + results.add(BulkWriteResult.newBuilder() + .setServerHandle(serverHandle) + .setItemHandle(entry.getItemHandle()) + .setWasSuccessful(true) + .build()); + } + return results; + } + + @Override + public List write2Bulk(int serverHandle, List entries) { + List results = new ArrayList<>(); + for (Write2BulkEntry entry : entries) { + results.add(BulkWriteResult.newBuilder() + .setServerHandle(serverHandle) + .setItemHandle(entry.getItemHandle()) + .setWasSuccessful(true) + .build()); + } + return results; + } + + @Override + public List writeSecuredBulk(int serverHandle, List entries) { + List results = new ArrayList<>(); + for (WriteSecuredBulkEntry entry : entries) { + results.add(BulkWriteResult.newBuilder() + .setServerHandle(serverHandle) + .setItemHandle(entry.getItemHandle()) + .setWasSuccessful(true) + .build()); + } + return results; + } + + @Override + public List writeSecured2Bulk(int serverHandle, List entries) { + List results = new ArrayList<>(); + for (WriteSecured2BulkEntry entry : entries) { + results.add(BulkWriteResult.newBuilder() + .setServerHandle(serverHandle) + .setItemHandle(entry.getItemHandle()) + .setWasSuccessful(true) + .build()); + } + return results; + } + @Override public com.zb.mom.ww.mxgateway.client.MxEventStream streamEventsAfter(long afterWorkerSequence) { throw new UnsupportedOperationException("stream-events is covered by client tests"); diff --git a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewaySession.java b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewaySession.java index 0abbd22..73b7d0c 100644 --- a/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewaySession.java +++ b/clients/java/zb-mom-ww-mxgateway-client/src/main/java/com/zb/mom/ww/mxgateway/client/MxGatewaySession.java @@ -1,6 +1,7 @@ package com.zb.mom.ww.mxgateway.client; import java.security.SecureRandom; +import java.time.Duration; import java.util.HexFormat; import java.util.List; import java.util.Objects; @@ -9,6 +10,8 @@ import mxaccess_gateway.v1.MxaccessGateway.AddItemBulkCommand; import mxaccess_gateway.v1.MxaccessGateway.AddItemCommand; import mxaccess_gateway.v1.MxaccessGateway.AdviseItemBulkCommand; import mxaccess_gateway.v1.MxaccessGateway.AdviseCommand; +import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult; +import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.MxCommand; @@ -17,6 +20,7 @@ import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply; import mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest; import mxaccess_gateway.v1.MxaccessGateway.MxValue; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply; +import mxaccess_gateway.v1.MxaccessGateway.ReadBulkCommand; import mxaccess_gateway.v1.MxaccessGateway.RegisterCommand; import mxaccess_gateway.v1.MxaccessGateway.RemoveItemBulkCommand; import mxaccess_gateway.v1.MxaccessGateway.RemoveItemCommand; @@ -27,8 +31,16 @@ import mxaccess_gateway.v1.MxaccessGateway.UnAdviseCommand; import mxaccess_gateway.v1.MxaccessGateway.UnAdviseItemBulkCommand; import mxaccess_gateway.v1.MxaccessGateway.UnsubscribeBulkCommand; import mxaccess_gateway.v1.MxaccessGateway.UnregisterCommand; +import mxaccess_gateway.v1.MxaccessGateway.Write2BulkCommand; +import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry; import mxaccess_gateway.v1.MxaccessGateway.Write2Command; +import mxaccess_gateway.v1.MxaccessGateway.WriteBulkCommand; +import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry; import mxaccess_gateway.v1.MxaccessGateway.WriteCommand; +import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkCommand; +import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry; +import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkCommand; +import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry; /** * Typed handle for a single MXAccess gateway session. @@ -421,6 +433,142 @@ public final class MxGatewaySession implements AutoCloseable { return reply.getUnsubscribeBulk().getResultsList(); } + /** + * Bulk {@code Write} — sequential MXAccess Write per entry on the worker's STA. + * + *

Per-entry failures appear as {@link BulkWriteResult} entries with + * {@code wasSuccessful == false}; this method does not throw for per-entry + * MXAccess failures (it still throws {@link MxGatewayException} on transport + * or protocol-level failures). + * + * @param serverHandle the {@code ServerHandle} owning the items + * @param entries the per-item (handle, value, user id) tuples + * @return a per-entry {@link BulkWriteResult} list + * @throws MxGatewayException on transport or protocol failure + * @throws NullPointerException if {@code entries} is {@code null} + */ + public List writeBulk(int serverHandle, List entries) { + Objects.requireNonNull(entries, "entries"); + MxCommandReply reply = invokeCommand(MxCommand.newBuilder() + .setKind(MxCommandKind.MX_COMMAND_KIND_WRITE_BULK) + .setWriteBulk(WriteBulkCommand.newBuilder() + .setServerHandle(serverHandle) + .addAllEntries(entries)) + .build()); + return reply.getWriteBulk().getResultsList(); + } + + /** + * Bulk {@code Write2} — sequential MXAccess Write2 (timestamped) per entry. + * + *

Per-entry semantics mirror {@link #writeBulk(int, List)}. + * + * @param serverHandle the {@code ServerHandle} owning the items + * @param entries the per-item (handle, value, timestamp, user id) tuples + * @return a per-entry {@link BulkWriteResult} list + * @throws MxGatewayException on transport or protocol failure + * @throws NullPointerException if {@code entries} is {@code null} + */ + public List write2Bulk(int serverHandle, List entries) { + Objects.requireNonNull(entries, "entries"); + MxCommandReply reply = invokeCommand(MxCommand.newBuilder() + .setKind(MxCommandKind.MX_COMMAND_KIND_WRITE2_BULK) + .setWrite2Bulk(Write2BulkCommand.newBuilder() + .setServerHandle(serverHandle) + .addAllEntries(entries)) + .build()); + return reply.getWrite2Bulk().getResultsList(); + } + + /** + * Bulk {@code WriteSecured} — credential-sensitive values must not be logged + * by callers; mirrors the single-item write-secured redaction contract. + * + *

Per-entry semantics mirror {@link #writeBulk(int, List)}. + * + * @param serverHandle the {@code ServerHandle} owning the items + * @param entries the per-item (handle, value, current+verifier user id) tuples + * @return a per-entry {@link BulkWriteResult} list + * @throws MxGatewayException on transport or protocol failure + * @throws NullPointerException if {@code entries} is {@code null} + */ + public List writeSecuredBulk(int serverHandle, List entries) { + Objects.requireNonNull(entries, "entries"); + MxCommandReply reply = invokeCommand(MxCommand.newBuilder() + .setKind(MxCommandKind.MX_COMMAND_KIND_WRITE_SECURED_BULK) + .setWriteSecuredBulk(WriteSecuredBulkCommand.newBuilder() + .setServerHandle(serverHandle) + .addAllEntries(entries)) + .build()); + return reply.getWriteSecuredBulk().getResultsList(); + } + + /** + * Bulk {@code WriteSecured2} — sequential timestamped + verified write per entry. + * + *

Per-entry semantics mirror {@link #writeBulk(int, List)}. + * + * @param serverHandle the {@code ServerHandle} owning the items + * @param entries the per-item (handle, value, timestamp, current+verifier user id) tuples + * @return a per-entry {@link BulkWriteResult} list + * @throws MxGatewayException on transport or protocol failure + * @throws NullPointerException if {@code entries} is {@code null} + */ + public List writeSecured2Bulk(int serverHandle, List entries) { + Objects.requireNonNull(entries, "entries"); + MxCommandReply reply = invokeCommand(MxCommand.newBuilder() + .setKind(MxCommandKind.MX_COMMAND_KIND_WRITE_SECURED2_BULK) + .setWriteSecured2Bulk(WriteSecured2BulkCommand.newBuilder() + .setServerHandle(serverHandle) + .addAllEntries(entries)) + .build()); + return reply.getWriteSecured2Bulk().getResultsList(); + } + + /** + * Bulk {@code Read} — snapshot the current value of each requested tag. + * + *

MXAccess COM has no synchronous read; the worker returns the cached + * {@code OnDataChange} value for any tag that is already advised + * ({@code wasCached == true}) without modifying the existing subscription, + * and falls back to a full AddItem + Advise + wait + UnAdvise + RemoveItem + * snapshot lifecycle otherwise. The supplied {@code timeout} bounds the + * per-tag wait in the snapshot case; pass {@link Duration#ZERO} (or + * {@code null}) to use the worker default (1000 ms). Per-tag failures + * appear as {@link BulkReadResult} entries with {@code wasSuccessful == false}; + * this method does not throw for per-tag MXAccess failures. + * + * @param serverHandle the {@code ServerHandle} owning the items + * @param tagAddresses the tag addresses to read + * @param timeout per-tag snapshot timeout (zero or null = worker default) + * @return a per-tag {@link BulkReadResult} list + * @throws MxGatewayException on transport or protocol failure + * @throws NullPointerException if {@code tagAddresses} is {@code null} + * @throws IllegalArgumentException if {@code timeout} is negative or exceeds {@link Integer#MAX_VALUE} milliseconds + */ + public List readBulk(int serverHandle, List tagAddresses, Duration timeout) { + Objects.requireNonNull(tagAddresses, "tagAddresses"); + int timeoutMs = 0; + if (timeout != null) { + if (timeout.isNegative()) { + throw new IllegalArgumentException("timeout must be non-negative"); + } + long millis = timeout.toMillis(); + if (millis > Integer.MAX_VALUE) { + throw new IllegalArgumentException("timeout exceeds Integer.MAX_VALUE milliseconds"); + } + timeoutMs = (int) millis; + } + MxCommandReply reply = invokeCommand(MxCommand.newBuilder() + .setKind(MxCommandKind.MX_COMMAND_KIND_READ_BULK) + .setReadBulk(ReadBulkCommand.newBuilder() + .setServerHandle(serverHandle) + .addAllTagAddresses(tagAddresses) + .setTimeoutMs(timeoutMs)) + .build()); + return reply.getReadBulk().getResultsList(); + } + /** * Invokes MXAccess {@code Write}. *