Java client: port bulk read/write SDK methods + CLI subcommands

Final language in the bulk-CLI port wave. HEAD's MxGatewaySession had
only the subscribe-style bulks; this commit adds the value-bulks plus
matching picocli subcommands and a bench-read-bulk harness.

SDK (MxGatewaySession.java):
- List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries)
- List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries)
- List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries)
- List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries)
- List<BulkReadResult> readBulk(int serverHandle, List<String> tagAddresses, Duration timeout)

readBulk uses java.time.Duration for the timeout parameter (idiomatic
Java) and internally converts to the timeoutMs proto field;
Duration.ZERO / null both delegate to the worker default. Per-entry
secured user ids stay on each WriteSecured(2)BulkEntry to match the
proto's per-row shape.

CLI (MxGatewayCli.java):
- read-bulk / write-bulk / write2-bulk / write-secured-bulk /
  write-secured2-bulk as picocli @Command subcommands. Write families
  share value-parsing logic; gating of --current-user-id /
  --verifier-user-id / --timestamp matches the cross-language flag
  contract.
- bench-read-bulk: --iterations / --warmup loop with avg/min/max ms
  reporting plus a --json mode that emits the cross-language bench
  JSON schema.

A small fixture in MxGatewayCliTests.FakeSession adds stub
implementations of the five new interface methods so the test module
compiles.

Verification: gradle build BUILD SUCCESSFUL (4 tasks executed, all
tests pass); gradle :zb-mom-ww-mxgateway-cli:installDist BUILD
SUCCESSFUL. Manual smoke against live gateway on localhost:5120:
open-session → register → read-bulk cold (wasCached=false both tags)
→ subscribe-bulk → read-bulk warm (wasCached=true both tags) →
write-bulk int32 111,222 (both wasSuccessful=true) → write2-bulk
timestamped (both wasSuccessful=true) → write-secured-bulk and
write-secured2-bulk return per-entry MXAccess "Value does not fall
within the expected range" failures with the configured user/verifier
ids (0,0) — confirming the SDK does NOT throw on per-entry MXAccess
failures and surfaces them through BulkWriteResult exactly as the
.NET and Go ports do → bench-read-bulk iterations=20 avg=9.5 ms
last_success=2/2 cached=2/2 → close-session SESSION_STATE_CLOSED.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-24 04:50:34 -04:00
parent 6add4b4acc
commit f90bff01db
3 changed files with 672 additions and 0 deletions
@@ -29,12 +29,18 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
@@ -113,6 +119,12 @@ public final class MxGatewayCli implements Callable<Integer> {
commandLine.addSubcommand("advise", new AdviseCommand(clientFactory));
commandLine.addSubcommand("subscribe-bulk", new SubscribeBulkCommand(clientFactory));
commandLine.addSubcommand("unsubscribe-bulk", new UnsubscribeBulkCommand(clientFactory));
commandLine.addSubcommand("read-bulk", new ReadBulkCommand(clientFactory));
commandLine.addSubcommand("write-bulk", new WriteBulkCommand(clientFactory));
commandLine.addSubcommand("write2-bulk", new Write2BulkCommand(clientFactory));
commandLine.addSubcommand("write-secured-bulk", new WriteSecuredBulkCommand(clientFactory));
commandLine.addSubcommand("write-secured2-bulk", new WriteSecured2BulkCommand(clientFactory));
commandLine.addSubcommand("bench-read-bulk", new BenchReadBulkCommand(clientFactory));
commandLine.addSubcommand("write", new WriteCommand(clientFactory));
commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
@@ -603,6 +615,359 @@ public final class MxGatewayCli implements Callable<Integer> {
}
}
@Command(name = "read-bulk", description = "Invokes MXAccess ReadBulk (cached or snapshot per tag).")
static final class ReadBulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--items", required = true, description = "Comma-separated tag addresses.")
String items;
@Option(
names = "--timeout-ms",
defaultValue = "0",
description = "Per-tag snapshot timeout in milliseconds (0 = worker default).")
int timeoutMs;
ReadBulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
List<BulkReadResult> results = client.session(sessionId)
.readBulk(serverHandle, parseStringList(items), Duration.ofMillis(timeoutMs));
writeReadBulkOutput("read-bulk", common, json, results);
}
return 0;
}
}
@Command(name = "write-bulk", description = "Invokes MXAccess WriteBulk.")
static final class WriteBulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
String itemHandles;
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
String type;
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
String values;
@Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.")
int userId;
WriteBulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
List<Integer> handles = parseIntList(itemHandles);
List<String> valueTexts = parseStringList(values);
if (handles.size() != valueTexts.size()) {
throw new IllegalArgumentException(
"item-handles count (" + handles.size() + ") does not match values count ("
+ valueTexts.size() + ")");
}
List<WriteBulkEntry> entries = new ArrayList<>(handles.size());
for (int i = 0; i < handles.size(); i++) {
entries.add(WriteBulkEntry.newBuilder()
.setItemHandle(handles.get(i))
.setUserId(userId)
.setValue(parseValue(type, valueTexts.get(i)))
.build());
}
List<BulkWriteResult> results = client.session(sessionId).writeBulk(serverHandle, entries);
writeWriteBulkOutput("write-bulk", common, json, results);
}
return 0;
}
}
@Command(name = "write2-bulk", description = "Invokes MXAccess Write2Bulk (timestamped).")
static final class Write2BulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
String itemHandles;
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
String type;
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
String values;
@Option(names = "--timestamp", required = true, description = "ISO-8601 timestamp shared across all entries.")
String timestamp;
@Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.")
int userId;
Write2BulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
List<Integer> handles = parseIntList(itemHandles);
List<String> valueTexts = parseStringList(values);
if (handles.size() != valueTexts.size()) {
throw new IllegalArgumentException(
"item-handles count (" + handles.size() + ") does not match values count ("
+ valueTexts.size() + ")");
}
MxValue timestampValue = MxValues.timestampValue(Instant.parse(timestamp));
List<Write2BulkEntry> entries = new ArrayList<>(handles.size());
for (int i = 0; i < handles.size(); i++) {
entries.add(Write2BulkEntry.newBuilder()
.setItemHandle(handles.get(i))
.setUserId(userId)
.setValue(parseValue(type, valueTexts.get(i)))
.setTimestampValue(timestampValue)
.build());
}
List<BulkWriteResult> results = client.session(sessionId).write2Bulk(serverHandle, entries);
writeWriteBulkOutput("write2-bulk", common, json, results);
}
return 0;
}
}
@Command(name = "write-secured-bulk", description = "Invokes MXAccess WriteSecuredBulk.")
static final class WriteSecuredBulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
String itemHandles;
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
String type;
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
String values;
@Option(names = "--current-user-id", defaultValue = "0", description = "MXAccess current user id.")
int currentUserId;
@Option(names = "--verifier-user-id", defaultValue = "0", description = "MXAccess verifier user id.")
int verifierUserId;
WriteSecuredBulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
List<Integer> handles = parseIntList(itemHandles);
List<String> valueTexts = parseStringList(values);
if (handles.size() != valueTexts.size()) {
throw new IllegalArgumentException(
"item-handles count (" + handles.size() + ") does not match values count ("
+ valueTexts.size() + ")");
}
List<WriteSecuredBulkEntry> entries = new ArrayList<>(handles.size());
for (int i = 0; i < handles.size(); i++) {
entries.add(WriteSecuredBulkEntry.newBuilder()
.setItemHandle(handles.get(i))
.setCurrentUserId(currentUserId)
.setVerifierUserId(verifierUserId)
.setValue(parseValue(type, valueTexts.get(i)))
.build());
}
List<BulkWriteResult> results = client.session(sessionId).writeSecuredBulk(serverHandle, entries);
writeWriteBulkOutput("write-secured-bulk", common, json, results);
}
return 0;
}
}
@Command(name = "write-secured2-bulk", description = "Invokes MXAccess WriteSecured2Bulk.")
static final class WriteSecured2BulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
String itemHandles;
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
String type;
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
String values;
@Option(names = "--timestamp", required = true, description = "ISO-8601 timestamp shared across all entries.")
String timestamp;
@Option(names = "--current-user-id", defaultValue = "0", description = "MXAccess current user id.")
int currentUserId;
@Option(names = "--verifier-user-id", defaultValue = "0", description = "MXAccess verifier user id.")
int verifierUserId;
WriteSecured2BulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
List<Integer> handles = parseIntList(itemHandles);
List<String> valueTexts = parseStringList(values);
if (handles.size() != valueTexts.size()) {
throw new IllegalArgumentException(
"item-handles count (" + handles.size() + ") does not match values count ("
+ valueTexts.size() + ")");
}
MxValue timestampValue = MxValues.timestampValue(Instant.parse(timestamp));
List<WriteSecured2BulkEntry> entries = new ArrayList<>(handles.size());
for (int i = 0; i < handles.size(); i++) {
entries.add(WriteSecured2BulkEntry.newBuilder()
.setItemHandle(handles.get(i))
.setCurrentUserId(currentUserId)
.setVerifierUserId(verifierUserId)
.setValue(parseValue(type, valueTexts.get(i)))
.setTimestampValue(timestampValue)
.build());
}
List<BulkWriteResult> results = client.session(sessionId).writeSecured2Bulk(serverHandle, entries);
writeWriteBulkOutput("write-secured2-bulk", common, json, results);
}
return 0;
}
}
@Command(
name = "bench-read-bulk",
description = "Repeatedly invokes ReadBulk for benchmarking; prints aggregate timing.")
static final class BenchReadBulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--items", required = true, description = "Comma-separated tag addresses.")
String items;
@Option(
names = "--timeout-ms",
defaultValue = "0",
description = "Per-tag snapshot timeout in milliseconds (0 = worker default).")
int timeoutMs;
@Option(names = "--iterations", defaultValue = "10", description = "Number of ReadBulk calls to perform.")
int iterations;
@Option(
names = "--warmup",
defaultValue = "1",
description = "Number of warmup iterations excluded from timing.")
int warmup;
BenchReadBulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
if (iterations <= 0) {
throw new IllegalArgumentException("--iterations must be positive");
}
if (warmup < 0) {
throw new IllegalArgumentException("--warmup must be non-negative");
}
List<String> tagAddresses = parseStringList(items);
Duration timeout = Duration.ofMillis(timeoutMs);
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
MxGatewayCliSession session = client.session(sessionId);
for (int i = 0; i < warmup; i++) {
session.readBulk(serverHandle, tagAddresses, timeout);
}
long totalNanos = 0L;
long minNanos = Long.MAX_VALUE;
long maxNanos = 0L;
int lastResultCount = 0;
int lastSuccessCount = 0;
int lastCachedCount = 0;
for (int i = 0; i < iterations; i++) {
long start = System.nanoTime();
List<BulkReadResult> results = session.readBulk(serverHandle, tagAddresses, timeout);
long elapsed = System.nanoTime() - start;
totalNanos += elapsed;
minNanos = Math.min(minNanos, elapsed);
maxNanos = Math.max(maxNanos, elapsed);
lastResultCount = results.size();
lastSuccessCount = 0;
lastCachedCount = 0;
for (BulkReadResult result : results) {
if (result.getWasSuccessful()) {
lastSuccessCount++;
}
if (result.getWasCached()) {
lastCachedCount++;
}
}
}
double avgMs = totalNanos / 1_000_000.0 / iterations;
double minMs = minNanos / 1_000_000.0;
double maxMs = maxNanos / 1_000_000.0;
PrintWriter out = common.spec.commandLine().getOut();
if (json) {
Map<String, Object> output = new LinkedHashMap<>();
output.put("command", "bench-read-bulk");
output.put("options", common.redactedJsonMap());
output.put("iterations", iterations);
output.put("warmup", warmup);
output.put("tagCount", tagAddresses.size());
output.put("resultCount", lastResultCount);
output.put("successCount", lastSuccessCount);
output.put("cachedCount", lastCachedCount);
output.put("avgMs", avgMs);
output.put("minMs", minMs);
output.put("maxMs", maxMs);
out.println(jsonObject(output));
} else {
out.printf(
"iterations=%d tags=%d avg=%.3fms min=%.3fms max=%.3fms last_results=%d last_success=%d last_cached=%d%n",
iterations,
tagAddresses.size(),
avgMs,
minMs,
maxMs,
lastResultCount,
lastSuccessCount,
lastCachedCount);
}
}
return 0;
}
}
@Command(name = "write", description = "Invokes MXAccess Write.")
static final class WriteCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
@@ -818,6 +1183,16 @@ public final class MxGatewayCli implements Callable<Integer> {
List<SubscribeResult> unsubscribeBulk(int serverHandle, List<Integer> itemHandles);
List<BulkReadResult> readBulk(int serverHandle, List<String> items, Duration timeout);
List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries);
List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries);
List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries);
List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries);
MxEventStream streamEventsAfter(long afterWorkerSequence);
}
@@ -909,6 +1284,31 @@ public final class MxGatewayCli implements Callable<Integer> {
return session.unsubscribeBulk(serverHandle, itemHandles);
}
@Override
public List<BulkReadResult> readBulk(int serverHandle, List<String> items, Duration timeout) {
return session.readBulk(serverHandle, items, timeout);
}
@Override
public List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries) {
return session.writeBulk(serverHandle, entries);
}
@Override
public List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries) {
return session.write2Bulk(serverHandle, entries);
}
@Override
public List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries) {
return session.writeSecuredBulk(serverHandle, entries);
}
@Override
public List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries) {
return session.writeSecured2Bulk(serverHandle, entries);
}
@Override
public MxEventStream streamEventsAfter(long afterWorkerSequence) {
return session.streamEventsAfter(afterWorkerSequence);
@@ -957,6 +1357,56 @@ public final class MxGatewayCli implements Callable<Integer> {
return values;
}
private static void writeWriteBulkOutput(
String command, CommonOptions common, boolean json, List<BulkWriteResult> results) {
PrintWriter out = common.spec.commandLine().getOut();
if (json) {
Map<String, Object> output = new LinkedHashMap<>();
output.put("command", command);
output.put("options", common.redactedJsonMap());
output.put("results", results.stream().map(MxGatewayCli::bulkWriteResultMap).toList());
out.println(jsonObject(output));
return;
}
out.println(results.size());
}
private static Map<String, Object> bulkWriteResultMap(BulkWriteResult result) {
Map<String, Object> values = new LinkedHashMap<>();
values.put("serverHandle", result.getServerHandle());
values.put("itemHandle", result.getItemHandle());
values.put("wasSuccessful", result.getWasSuccessful());
values.put("hresult", result.hasHresult() ? (Object) result.getHresult() : null);
values.put("errorMessage", result.getErrorMessage());
return values;
}
private static void writeReadBulkOutput(
String command, CommonOptions common, boolean json, List<BulkReadResult> results) {
PrintWriter out = common.spec.commandLine().getOut();
if (json) {
Map<String, Object> output = new LinkedHashMap<>();
output.put("command", command);
output.put("options", common.redactedJsonMap());
output.put("results", results.stream().map(MxGatewayCli::bulkReadResultMap).toList());
out.println(jsonObject(output));
return;
}
out.println(results.size());
}
private static Map<String, Object> bulkReadResultMap(BulkReadResult result) {
Map<String, Object> values = new LinkedHashMap<>();
values.put("serverHandle", result.getServerHandle());
values.put("tagAddress", result.getTagAddress());
values.put("itemHandle", result.getItemHandle());
values.put("wasSuccessful", result.getWasSuccessful());
values.put("wasCached", result.getWasCached());
values.put("quality", result.getQuality());
values.put("errorMessage", result.getErrorMessage());
return values;
}
private static MxValue parseValue(String type, String text) {
return switch (type) {
case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text));
@@ -9,9 +9,12 @@ import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
@@ -25,6 +28,10 @@ import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry;
import org.junit.jupiter.api.Test;
final class MxGatewayCliTests {
@@ -339,6 +346,73 @@ final class MxGatewayCliTests {
return results;
}
@Override
public List<BulkReadResult> readBulk(int serverHandle, List<String> items, Duration timeout) {
List<BulkReadResult> results = new ArrayList<>();
for (int index = 0; index < items.size(); index++) {
results.add(BulkReadResult.newBuilder()
.setServerHandle(serverHandle)
.setTagAddress(items.get(index))
.setItemHandle(200 + index)
.setWasSuccessful(true)
.setWasCached(true)
.build());
}
return results;
}
@Override
public List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries) {
List<BulkWriteResult> results = new ArrayList<>();
for (WriteBulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(entry.getItemHandle())
.setWasSuccessful(true)
.build());
}
return results;
}
@Override
public List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries) {
List<BulkWriteResult> results = new ArrayList<>();
for (Write2BulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(entry.getItemHandle())
.setWasSuccessful(true)
.build());
}
return results;
}
@Override
public List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries) {
List<BulkWriteResult> results = new ArrayList<>();
for (WriteSecuredBulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(entry.getItemHandle())
.setWasSuccessful(true)
.build());
}
return results;
}
@Override
public List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries) {
List<BulkWriteResult> results = new ArrayList<>();
for (WriteSecured2BulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(entry.getItemHandle())
.setWasSuccessful(true)
.build());
}
return results;
}
@Override
public com.zb.mom.ww.mxgateway.client.MxEventStream streamEventsAfter(long afterWorkerSequence) {
throw new UnsupportedOperationException("stream-events is covered by client tests");
@@ -1,6 +1,7 @@
package com.zb.mom.ww.mxgateway.client;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.HexFormat;
import java.util.List;
import java.util.Objects;
@@ -9,6 +10,8 @@ import mxaccess_gateway.v1.MxaccessGateway.AddItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.AddItemCommand;
import mxaccess_gateway.v1.MxaccessGateway.AdviseItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.AdviseCommand;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommand;
@@ -17,6 +20,7 @@ import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.ReadBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.RegisterCommand;
import mxaccess_gateway.v1.MxaccessGateway.RemoveItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.RemoveItemCommand;
@@ -27,8 +31,16 @@ import mxaccess_gateway.v1.MxaccessGateway.UnAdviseCommand;
import mxaccess_gateway.v1.MxaccessGateway.UnAdviseItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.UnsubscribeBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.UnregisterCommand;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.Write2Command;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteCommand;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry;
/**
* Typed handle for a single MXAccess gateway session.
@@ -421,6 +433,142 @@ public final class MxGatewaySession implements AutoCloseable {
return reply.getUnsubscribeBulk().getResultsList();
}
/**
* Bulk {@code Write} — sequential MXAccess Write per entry on the worker's STA.
*
* <p>Per-entry failures appear as {@link BulkWriteResult} entries with
* {@code wasSuccessful == false}; this method does not throw for per-entry
* MXAccess failures (it still throws {@link MxGatewayException} on transport
* or protocol-level failures).
*
* @param serverHandle the {@code ServerHandle} owning the items
* @param entries the per-item (handle, value, user id) tuples
* @return a per-entry {@link BulkWriteResult} list
* @throws MxGatewayException on transport or protocol failure
* @throws NullPointerException if {@code entries} is {@code null}
*/
public List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries) {
Objects.requireNonNull(entries, "entries");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE_BULK)
.setWriteBulk(WriteBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllEntries(entries))
.build());
return reply.getWriteBulk().getResultsList();
}
/**
* Bulk {@code Write2} — sequential MXAccess Write2 (timestamped) per entry.
*
* <p>Per-entry semantics mirror {@link #writeBulk(int, List)}.
*
* @param serverHandle the {@code ServerHandle} owning the items
* @param entries the per-item (handle, value, timestamp, user id) tuples
* @return a per-entry {@link BulkWriteResult} list
* @throws MxGatewayException on transport or protocol failure
* @throws NullPointerException if {@code entries} is {@code null}
*/
public List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries) {
Objects.requireNonNull(entries, "entries");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE2_BULK)
.setWrite2Bulk(Write2BulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllEntries(entries))
.build());
return reply.getWrite2Bulk().getResultsList();
}
/**
* Bulk {@code WriteSecured} — credential-sensitive values must not be logged
* by callers; mirrors the single-item write-secured redaction contract.
*
* <p>Per-entry semantics mirror {@link #writeBulk(int, List)}.
*
* @param serverHandle the {@code ServerHandle} owning the items
* @param entries the per-item (handle, value, current+verifier user id) tuples
* @return a per-entry {@link BulkWriteResult} list
* @throws MxGatewayException on transport or protocol failure
* @throws NullPointerException if {@code entries} is {@code null}
*/
public List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries) {
Objects.requireNonNull(entries, "entries");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE_SECURED_BULK)
.setWriteSecuredBulk(WriteSecuredBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllEntries(entries))
.build());
return reply.getWriteSecuredBulk().getResultsList();
}
/**
* Bulk {@code WriteSecured2} — sequential timestamped + verified write per entry.
*
* <p>Per-entry semantics mirror {@link #writeBulk(int, List)}.
*
* @param serverHandle the {@code ServerHandle} owning the items
* @param entries the per-item (handle, value, timestamp, current+verifier user id) tuples
* @return a per-entry {@link BulkWriteResult} list
* @throws MxGatewayException on transport or protocol failure
* @throws NullPointerException if {@code entries} is {@code null}
*/
public List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries) {
Objects.requireNonNull(entries, "entries");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE_SECURED2_BULK)
.setWriteSecured2Bulk(WriteSecured2BulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllEntries(entries))
.build());
return reply.getWriteSecured2Bulk().getResultsList();
}
/**
* Bulk {@code Read} — snapshot the current value of each requested tag.
*
* <p>MXAccess COM has no synchronous read; the worker returns the cached
* {@code OnDataChange} value for any tag that is already advised
* ({@code wasCached == true}) without modifying the existing subscription,
* and falls back to a full AddItem + Advise + wait + UnAdvise + RemoveItem
* snapshot lifecycle otherwise. The supplied {@code timeout} bounds the
* per-tag wait in the snapshot case; pass {@link Duration#ZERO} (or
* {@code null}) to use the worker default (1000 ms). Per-tag failures
* appear as {@link BulkReadResult} entries with {@code wasSuccessful == false};
* this method does not throw for per-tag MXAccess failures.
*
* @param serverHandle the {@code ServerHandle} owning the items
* @param tagAddresses the tag addresses to read
* @param timeout per-tag snapshot timeout (zero or null = worker default)
* @return a per-tag {@link BulkReadResult} list
* @throws MxGatewayException on transport or protocol failure
* @throws NullPointerException if {@code tagAddresses} is {@code null}
* @throws IllegalArgumentException if {@code timeout} is negative or exceeds {@link Integer#MAX_VALUE} milliseconds
*/
public List<BulkReadResult> readBulk(int serverHandle, List<String> tagAddresses, Duration timeout) {
Objects.requireNonNull(tagAddresses, "tagAddresses");
int timeoutMs = 0;
if (timeout != null) {
if (timeout.isNegative()) {
throw new IllegalArgumentException("timeout must be non-negative");
}
long millis = timeout.toMillis();
if (millis > Integer.MAX_VALUE) {
throw new IllegalArgumentException("timeout exceeds Integer.MAX_VALUE milliseconds");
}
timeoutMs = (int) millis;
}
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_READ_BULK)
.setReadBulk(ReadBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllTagAddresses(tagAddresses)
.setTimeoutMs(timeoutMs))
.build());
return reply.getReadBulk().getResultsList();
}
/**
* Invokes MXAccess {@code Write}.
*