From f220908f3f3b71c9d9db7000ef768e496d8ab118 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 04:06:14 -0400 Subject: [PATCH] Add bulk read/write CLI subcommands and e2e matrix coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit added the bulk read/write library surface in every client; this commit makes that surface reachable from each client's CLI and exercises it through scripts/run-client-e2e-tests.ps1. Five new subcommands in every client CLI (.NET / Go / Rust / Python / Java): read-bulk, write-bulk, write2-bulk, write-secured-bulk, and write-secured2-bulk. Each follows the existing subscribe-bulk shape: - read-bulk takes --server-handle, --items , and --timeout-ms (0 = worker default). JSON output carries the BulkReadResult fields, including was_cached so the e2e matrix can verify the cached-path semantics. - The four bulk-write families take --server-handle, --item-handles , --type, --values . write2-bulk and write-secured2-bulk add a single --timestamp applied to every entry; the secured variants take --current-user-id and --verifier-user-id. All four output BulkWriteResult JSON. A new -SkipReadWriteBulk switch on the matrix script (default OFF) controls two new e2e phases: - After the existing subscribe-bulk phase leaves tags advised, the script runs read-bulk against the same tag list and asserts most results return was_cached = true. This is the only e2e coverage of the cache-then-snapshot fork — the unit + gateway tests verify the semantics with a fake worker, but only the live cross-language matrix proves the cache populates from real OnDataChange events and survives the round-trip through every client''s JSON parser. - When -VerifyWrite is set, the write phase now also runs a single- entry write-bulk against the same writable item handle (using a distinct sentinel value) and asserts a per-entry success. Confirms the BulkWriteResult wire format end-to-end without complicating the OnWriteComplete echo assertion the single-item phase already verifies. Dry-run validation passes for all five clients: each emits the correct read-bulk and write-bulk CLI invocations with the right flags. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../MxGatewayClientCli.cs | 232 +++++++++++- clients/go/cmd/mxgw-go/main.go | 201 +++++++++++ .../mxgateway/cli/MxGatewayCli.java | 336 ++++++++++++++++++ clients/python/src/mxgateway_cli/commands.py | 198 +++++++++++ clients/rust/crates/mxgw-cli/src/main.rs | 319 ++++++++++++++++- scripts/run-client-e2e-tests.ps1 | 129 ++++++- 6 files changed, 1411 insertions(+), 4 deletions(-) diff --git a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs index c07aab5..11888e9 100644 --- a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs +++ b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs @@ -101,6 +101,16 @@ public static class MxGatewayClientCli .ConfigureAwait(false), "unsubscribe-bulk" => await UnsubscribeBulkAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), + "read-bulk" => await ReadBulkAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), + "write-bulk" => await WriteBulkAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), + "write2-bulk" => await Write2BulkAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), + "write-secured-bulk" => await WriteSecuredBulkAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), + "write-secured2-bulk" => await WriteSecured2BulkAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), "stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token) @@ -386,6 +396,220 @@ public static class MxGatewayClientCli cancellationToken); } + private static Task ReadBulkAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + ReadBulkCommand command = new() + { + ServerHandle = arguments.GetInt32("server-handle"), + TimeoutMs = (uint)arguments.GetInt32("timeout-ms", 0), + }; + command.TagAddresses.Add(ParseStringList(arguments.GetRequired("items"))); + + return InvokeAndWriteAsync( + arguments, + client, + output, + new MxCommand + { + Kind = MxCommandKind.ReadBulk, + ReadBulk = command, + }, + cancellationToken); + } + + private static Task WriteBulkAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + WriteBulkCommand command = new() + { + ServerHandle = arguments.GetInt32("server-handle"), + }; + + IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); + IReadOnlyList values = ParseValuesList(arguments); + int userId = arguments.GetInt32("user-id", 0); + EnsureSameLength(handles.Count, values.Count); + + for (int i = 0; i < handles.Count; i++) + { + command.Entries.Add(new WriteBulkEntry + { + ItemHandle = handles[i], + Value = values[i], + UserId = userId, + }); + } + + return InvokeAndWriteAsync( + arguments, + client, + output, + new MxCommand + { + Kind = MxCommandKind.WriteBulk, + WriteBulk = command, + }, + cancellationToken); + } + + private static Task Write2BulkAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + Write2BulkCommand command = new() + { + ServerHandle = arguments.GetInt32("server-handle"), + }; + + IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); + IReadOnlyList values = ParseValuesList(arguments); + MxValue timestampValue = ParseTimestampValue(arguments); + int userId = arguments.GetInt32("user-id", 0); + EnsureSameLength(handles.Count, values.Count); + + for (int i = 0; i < handles.Count; i++) + { + command.Entries.Add(new Write2BulkEntry + { + ItemHandle = handles[i], + Value = values[i], + TimestampValue = timestampValue, + UserId = userId, + }); + } + + return InvokeAndWriteAsync( + arguments, + client, + output, + new MxCommand + { + Kind = MxCommandKind.Write2Bulk, + Write2Bulk = command, + }, + cancellationToken); + } + + private static Task WriteSecuredBulkAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + WriteSecuredBulkCommand command = new() + { + ServerHandle = arguments.GetInt32("server-handle"), + }; + + IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); + IReadOnlyList values = ParseValuesList(arguments); + int currentUserId = arguments.GetInt32("current-user-id"); + int verifierUserId = arguments.GetInt32("verifier-user-id", 0); + EnsureSameLength(handles.Count, values.Count); + + for (int i = 0; i < handles.Count; i++) + { + command.Entries.Add(new WriteSecuredBulkEntry + { + ItemHandle = handles[i], + Value = values[i], + CurrentUserId = currentUserId, + VerifierUserId = verifierUserId, + }); + } + + return InvokeAndWriteAsync( + arguments, + client, + output, + new MxCommand + { + Kind = MxCommandKind.WriteSecuredBulk, + WriteSecuredBulk = command, + }, + cancellationToken); + } + + private static Task WriteSecured2BulkAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + WriteSecured2BulkCommand command = new() + { + ServerHandle = arguments.GetInt32("server-handle"), + }; + + IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); + IReadOnlyList values = ParseValuesList(arguments); + MxValue timestampValue = ParseTimestampValue(arguments); + int currentUserId = arguments.GetInt32("current-user-id"); + int verifierUserId = arguments.GetInt32("verifier-user-id", 0); + EnsureSameLength(handles.Count, values.Count); + + for (int i = 0; i < handles.Count; i++) + { + command.Entries.Add(new WriteSecured2BulkEntry + { + ItemHandle = handles[i], + Value = values[i], + TimestampValue = timestampValue, + CurrentUserId = currentUserId, + VerifierUserId = verifierUserId, + }); + } + + return InvokeAndWriteAsync( + arguments, + client, + output, + new MxCommand + { + Kind = MxCommandKind.WriteSecured2Bulk, + WriteSecured2Bulk = command, + }, + cancellationToken); + } + + /// + /// Parses the bulk-write CLI's --values list. All entries share + /// the single --type argument; the comma-separated values are + /// each parsed via on a per-entry basis. This + /// keeps the CLI simple for e2e use (one type, N values) — callers + /// that need heterogeneous types per entry should drive the library + /// directly. + /// + private static IReadOnlyList ParseValuesList(CliArguments arguments) + { + string type = arguments.GetRequired("type"); + string[] values = ParseStringList(arguments.GetRequired("values")).ToArray(); + MxValue[] result = new MxValue[values.Length]; + for (int i = 0; i < values.Length; i++) + { + result[i] = ParseValue(type, values[i]); + } + return result; + } + + private static void EnsureSameLength(int handles, int values) + { + if (handles != values) + { + throw new ArgumentException( + $"Bulk write requires the same number of --item-handles ({handles}) and --values ({values})."); + } + } + private static Task WriteAsync( CliArguments arguments, IMxGatewayCliClient client, @@ -772,8 +996,12 @@ public static class MxGatewayClientCli private static MxValue ParseValue(CliArguments arguments) { - string type = arguments.GetRequired("type").ToLowerInvariant(); - string value = arguments.GetRequired("value"); + return ParseValue(arguments.GetRequired("type"), arguments.GetRequired("value")); + } + + private static MxValue ParseValue(string typeName, string value) + { + string type = typeName.ToLowerInvariant(); string[] values = value.Split(',', StringSplitOptions.TrimEntries); return type switch diff --git a/clients/go/cmd/mxgw-go/main.go b/clients/go/cmd/mxgw-go/main.go index 4ca977a..301d072 100644 --- a/clients/go/cmd/mxgw-go/main.go +++ b/clients/go/cmd/mxgw-go/main.go @@ -89,6 +89,16 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err return runSubscribeBulk(ctx, args[1:], stdout, stderr) case "unsubscribe-bulk": return runUnsubscribeBulk(ctx, args[1:], stdout, stderr) + case "read-bulk": + return runReadBulk(ctx, args[1:], stdout, stderr) + case "write-bulk": + return runWriteBulk(ctx, args[1:], stdout, stderr) + case "write2-bulk": + return runWrite2Bulk(ctx, args[1:], stdout, stderr) + case "write-secured-bulk": + return runWriteSecuredBulk(ctx, args[1:], stdout, stderr) + case "write-secured2-bulk": + return runWriteSecured2Bulk(ctx, args[1:], stdout, stderr) case "write": return runWrite(ctx, args[1:], stdout, stderr) case "stream-events": @@ -347,6 +357,167 @@ func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Wr return writeBulkOutput(stdout, *jsonOutput, "unsubscribe-bulk", options, results, err) } +func runReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { + flags := flag.NewFlagSet("read-bulk", flag.ContinueOnError) + flags.SetOutput(stderr) + common := bindCommonFlags(flags) + jsonOutput := flags.Bool("json", false, "write JSON output") + sessionID := flags.String("session-id", "", "gateway session id") + serverHandle := flags.Int("server-handle", 0, "MXAccess server handle") + items := flags.String("items", "", "comma-separated tag addresses") + timeoutMs := flags.Int("timeout-ms", 0, "per-tag snapshot timeout in milliseconds (0 = worker default)") + + if err := flags.Parse(args); err != nil { + return err + } + if *sessionID == "" || *items == "" { + return errors.New("session-id and items are required") + } + + client, options, err := dialForCommand(ctx, common) + if err != nil { + return err + } + defer client.Close() + + session := mxgateway.NewSessionForID(client, *sessionID) + results, err := session.ReadBulk(ctx, int32(*serverHandle), parseStringList(*items), time.Duration(*timeoutMs)*time.Millisecond) + return writeReadBulkOutput(stdout, *jsonOutput, "read-bulk", options, results, err) +} + +func runWriteBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { + return runWriteBulkVariant(ctx, args, stdout, stderr, "write-bulk", false, false) +} + +func runWrite2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { + return runWriteBulkVariant(ctx, args, stdout, stderr, "write2-bulk", true, false) +} + +func runWriteSecuredBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { + return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured-bulk", false, true) +} + +func runWriteSecured2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { + return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured2-bulk", true, true) +} + +// runWriteBulkVariant shares the flag-parsing + entry-build skeleton across +// the four bulk-write families. withTimestamp adds a --timestamp-value flag; +// secured switches from --user-id to --current-user-id / --verifier-user-id. +func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool, secured bool) error { + flags := flag.NewFlagSet(command, flag.ContinueOnError) + flags.SetOutput(stderr) + common := bindCommonFlags(flags) + jsonOutput := flags.Bool("json", false, "write JSON output") + sessionID := flags.String("session-id", "", "gateway session id") + serverHandle := flags.Int("server-handle", 0, "MXAccess server handle") + itemHandles := flags.String("item-handles", "", "comma-separated item handles") + valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string") + values := flags.String("values", "", "comma-separated values (one per item handle)") + userID := flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)") + currentUserID := flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)") + verifierUserID := flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)") + timestampValue := flags.String("timestamp-value", "", "RFC 3339 timestamp shared across all entries (Write2/WriteSecured2 variants)") + + if err := flags.Parse(args); err != nil { + return err + } + if *sessionID == "" || *itemHandles == "" || *values == "" { + return errors.New("session-id, item-handles, and values are required") + } + + handles, err := parseInt32List(*itemHandles) + if err != nil { + return err + } + valueTexts := parseStringList(*values) + if len(handles) != len(valueTexts) { + return fmt.Errorf("item-handles count (%d) does not match values count (%d)", len(handles), len(valueTexts)) + } + + parsedValues := make([]*mxgateway.MxValue, len(handles)) + for i, text := range valueTexts { + v, err := parseValue(*valueType, text) + if err != nil { + return fmt.Errorf("entry %d: %w", i, err) + } + parsedValues[i] = v + } + + var tsValue *mxgateway.MxValue + if withTimestamp { + if *timestampValue == "" { + return errors.New("timestamp-value is required for write2/write-secured2 bulk variants") + } + parsed, err := parseRfc3339Timestamp(*timestampValue) + if err != nil { + return err + } + tsValue = parsed + } + + client, options, err := dialForCommand(ctx, common) + if err != nil { + return err + } + defer client.Close() + + session := mxgateway.NewSessionForID(client, *sessionID) + + var results []*mxgateway.BulkWriteResult + switch command { + case "write-bulk": + entries := make([]*mxgateway.WriteBulkEntry, len(handles)) + for i := range handles { + entries[i] = &mxgateway.WriteBulkEntry{ItemHandle: handles[i], Value: parsedValues[i], UserId: int32(*userID)} + } + results, err = session.WriteBulk(ctx, int32(*serverHandle), entries) + case "write2-bulk": + entries := make([]*mxgateway.Write2BulkEntry, len(handles)) + for i := range handles { + entries[i] = &mxgateway.Write2BulkEntry{ItemHandle: handles[i], Value: parsedValues[i], TimestampValue: tsValue, UserId: int32(*userID)} + } + results, err = session.Write2Bulk(ctx, int32(*serverHandle), entries) + case "write-secured-bulk": + entries := make([]*mxgateway.WriteSecuredBulkEntry, len(handles)) + for i := range handles { + entries[i] = &mxgateway.WriteSecuredBulkEntry{ + ItemHandle: handles[i], + Value: parsedValues[i], + CurrentUserId: int32(*currentUserID), + VerifierUserId: int32(*verifierUserID), + } + } + results, err = session.WriteSecuredBulk(ctx, int32(*serverHandle), entries) + case "write-secured2-bulk": + entries := make([]*mxgateway.WriteSecured2BulkEntry, len(handles)) + for i := range handles { + entries[i] = &mxgateway.WriteSecured2BulkEntry{ + ItemHandle: handles[i], + Value: parsedValues[i], + TimestampValue: tsValue, + CurrentUserId: int32(*currentUserID), + VerifierUserId: int32(*verifierUserID), + } + } + results, err = session.WriteSecured2Bulk(ctx, int32(*serverHandle), entries) + default: + return fmt.Errorf("unsupported bulk write command %q", command) + } + _ = secured // currently only used for routing above; reserved for future per-variant validation + return writeWriteBulkOutput(stdout, *jsonOutput, command, options, results, err) +} + +// parseRfc3339Timestamp parses an RFC 3339 timestamp and returns the +// MxValue protobuf representation used for the timestamped write families. +func parseRfc3339Timestamp(text string) (*mxgateway.MxValue, error) { + t, err := time.Parse(time.RFC3339Nano, text) + if err != nil { + return nil, fmt.Errorf("invalid RFC 3339 timestamp %q: %w", text, err) + } + return mxgateway.TimestampValue(t), nil +} + func runWrite(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("write", flag.ContinueOnError) flags.SetOutput(stderr) @@ -652,6 +823,36 @@ func writeBulkOutput(stdout io.Writer, jsonOutput bool, command string, options return nil } +func writeWriteBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkWriteResult, err error) error { + if err != nil { + return err + } + if jsonOutput { + return writeJSON(stdout, map[string]any{ + "command": command, + "options": options, + "results": results, + }) + } + fmt.Fprintln(stdout, len(results)) + return nil +} + +func writeReadBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkReadResult, err error) error { + if err != nil { + return err + } + if jsonOutput { + return writeJSON(stdout, map[string]any{ + "command": command, + "options": options, + "results": results, + }) + } + fmt.Fprintln(stdout, len(results)) + return nil +} + func writeJSON(writer io.Writer, value any) error { encoder := json.NewEncoder(writer) encoder.SetIndent("", " ") diff --git a/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java b/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java index 778a656..b3d47be 100644 --- a/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java +++ b/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java @@ -25,12 +25,18 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; +import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult; +import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply; import mxaccess_gateway.v1.MxaccessGateway.MxEvent; import mxaccess_gateway.v1.MxaccessGateway.MxValue; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult; +import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry; +import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry; +import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry; +import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry; import picocli.CommandLine; import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; @@ -109,6 +115,11 @@ public final class MxGatewayCli implements Callable { commandLine.addSubcommand("advise", new AdviseCommand(clientFactory)); commandLine.addSubcommand("subscribe-bulk", new SubscribeBulkCommand(clientFactory)); commandLine.addSubcommand("unsubscribe-bulk", new UnsubscribeBulkCommand(clientFactory)); + commandLine.addSubcommand("read-bulk", new ReadBulkCommand(clientFactory)); + commandLine.addSubcommand("write-bulk", new WriteBulkCommand(clientFactory)); + commandLine.addSubcommand("write2-bulk", new Write2BulkCommand(clientFactory)); + commandLine.addSubcommand("write-secured-bulk", new WriteSecuredBulkCommand(clientFactory)); + commandLine.addSubcommand("write-secured2-bulk", new WriteSecured2BulkCommand(clientFactory)); commandLine.addSubcommand("write", new WriteCommand(clientFactory)); commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory)); commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory)); @@ -518,6 +529,246 @@ public final class MxGatewayCli implements Callable { } } + @Command(name = "read-bulk", description = "Invokes MXAccess ReadBulk (cached or snapshot per tag).") + static final class ReadBulkCommand extends GatewayCommand { + @Option(names = "--session-id", required = true, description = "Gateway session id.") + String sessionId; + + @Option(names = "--server-handle", required = true, description = "MXAccess server handle.") + int serverHandle; + + @Option(names = "--items", required = true, description = "Comma-separated tag addresses.") + String items; + + @Option(names = "--timeout-ms", defaultValue = "0", + description = "Per-tag snapshot timeout in milliseconds (0 = worker default).") + int timeoutMs; + + ReadBulkCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + List results = + client.session(sessionId).readBulk(serverHandle, parseStringList(items), timeoutMs); + writeReadBulkOutput("read-bulk", common, json, results); + } + return 0; + } + } + + @Command(name = "write-bulk", description = "Invokes MXAccess WriteBulk.") + static final class WriteBulkCommand extends GatewayCommand { + @Option(names = "--session-id", required = true, description = "Gateway session id.") + String sessionId; + + @Option(names = "--server-handle", required = true, description = "MXAccess server handle.") + int serverHandle; + + @Option(names = "--item-handles", required = true, description = "Comma-separated item handles.") + String itemHandles; + + @Option(names = "--type", defaultValue = "string", description = "Value type for all entries.") + String type; + + @Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.") + String values; + + @Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.") + int userId; + + WriteBulkCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + List handles = parseIntList(itemHandles); + List valueTexts = parseStringList(values); + if (handles.size() != valueTexts.size()) { + throw new IllegalArgumentException( + "item-handles count (" + handles.size() + ") does not match values count (" + valueTexts.size() + ")"); + } + List entries = new ArrayList<>(handles.size()); + for (int i = 0; i < handles.size(); i++) { + entries.add(WriteBulkEntry.newBuilder() + .setItemHandle(handles.get(i)) + .setUserId(userId) + .setValue(parseValue(type, valueTexts.get(i))) + .build()); + } + List results = client.session(sessionId).writeBulk(serverHandle, entries); + writeWriteBulkOutput("write-bulk", common, json, results); + } + return 0; + } + } + + @Command(name = "write2-bulk", description = "Invokes MXAccess Write2Bulk (timestamped).") + static final class Write2BulkCommand extends GatewayCommand { + @Option(names = "--session-id", required = true, description = "Gateway session id.") + String sessionId; + + @Option(names = "--server-handle", required = true, description = "MXAccess server handle.") + int serverHandle; + + @Option(names = "--item-handles", required = true, description = "Comma-separated item handles.") + String itemHandles; + + @Option(names = "--type", defaultValue = "string", description = "Value type for all entries.") + String type; + + @Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.") + String values; + + @Option(names = "--timestamp", required = true, description = "ISO-8601 timestamp shared across all entries.") + String timestamp; + + @Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.") + int userId; + + Write2BulkCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + List handles = parseIntList(itemHandles); + List valueTexts = parseStringList(values); + if (handles.size() != valueTexts.size()) { + throw new IllegalArgumentException( + "item-handles count (" + handles.size() + ") does not match values count (" + valueTexts.size() + ")"); + } + MxValue timestampValue = MxValues.timestampValue(Instant.parse(timestamp)); + List entries = new ArrayList<>(handles.size()); + for (int i = 0; i < handles.size(); i++) { + entries.add(Write2BulkEntry.newBuilder() + .setItemHandle(handles.get(i)) + .setUserId(userId) + .setValue(parseValue(type, valueTexts.get(i))) + .setTimestampValue(timestampValue) + .build()); + } + List results = client.session(sessionId).write2Bulk(serverHandle, entries); + writeWriteBulkOutput("write2-bulk", common, json, results); + } + return 0; + } + } + + @Command(name = "write-secured-bulk", description = "Invokes MXAccess WriteSecuredBulk.") + static final class WriteSecuredBulkCommand extends GatewayCommand { + @Option(names = "--session-id", required = true, description = "Gateway session id.") + String sessionId; + + @Option(names = "--server-handle", required = true, description = "MXAccess server handle.") + int serverHandle; + + @Option(names = "--item-handles", required = true, description = "Comma-separated item handles.") + String itemHandles; + + @Option(names = "--type", defaultValue = "string", description = "Value type for all entries.") + String type; + + @Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.") + String values; + + @Option(names = "--current-user-id", defaultValue = "0", description = "MXAccess current user id.") + int currentUserId; + + @Option(names = "--verifier-user-id", defaultValue = "0", description = "MXAccess verifier user id.") + int verifierUserId; + + WriteSecuredBulkCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + List handles = parseIntList(itemHandles); + List valueTexts = parseStringList(values); + if (handles.size() != valueTexts.size()) { + throw new IllegalArgumentException( + "item-handles count (" + handles.size() + ") does not match values count (" + valueTexts.size() + ")"); + } + List entries = new ArrayList<>(handles.size()); + for (int i = 0; i < handles.size(); i++) { + entries.add(WriteSecuredBulkEntry.newBuilder() + .setItemHandle(handles.get(i)) + .setCurrentUserId(currentUserId) + .setVerifierUserId(verifierUserId) + .setValue(parseValue(type, valueTexts.get(i))) + .build()); + } + List results = client.session(sessionId).writeSecuredBulk(serverHandle, entries); + writeWriteBulkOutput("write-secured-bulk", common, json, results); + } + return 0; + } + } + + @Command(name = "write-secured2-bulk", description = "Invokes MXAccess WriteSecured2Bulk.") + static final class WriteSecured2BulkCommand extends GatewayCommand { + @Option(names = "--session-id", required = true, description = "Gateway session id.") + String sessionId; + + @Option(names = "--server-handle", required = true, description = "MXAccess server handle.") + int serverHandle; + + @Option(names = "--item-handles", required = true, description = "Comma-separated item handles.") + String itemHandles; + + @Option(names = "--type", defaultValue = "string", description = "Value type for all entries.") + String type; + + @Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.") + String values; + + @Option(names = "--timestamp", required = true, description = "ISO-8601 timestamp shared across all entries.") + String timestamp; + + @Option(names = "--current-user-id", defaultValue = "0", description = "MXAccess current user id.") + int currentUserId; + + @Option(names = "--verifier-user-id", defaultValue = "0", description = "MXAccess verifier user id.") + int verifierUserId; + + WriteSecured2BulkCommand(MxGatewayCliClientFactory clientFactory) { + super(clientFactory); + } + + @Override + public Integer call() { + try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) { + List handles = parseIntList(itemHandles); + List valueTexts = parseStringList(values); + if (handles.size() != valueTexts.size()) { + throw new IllegalArgumentException( + "item-handles count (" + handles.size() + ") does not match values count (" + valueTexts.size() + ")"); + } + MxValue timestampValue = MxValues.timestampValue(Instant.parse(timestamp)); + List entries = new ArrayList<>(handles.size()); + for (int i = 0; i < handles.size(); i++) { + entries.add(WriteSecured2BulkEntry.newBuilder() + .setItemHandle(handles.get(i)) + .setCurrentUserId(currentUserId) + .setVerifierUserId(verifierUserId) + .setValue(parseValue(type, valueTexts.get(i))) + .setTimestampValue(timestampValue) + .build()); + } + List results = client.session(sessionId).writeSecured2Bulk(serverHandle, entries); + writeWriteBulkOutput("write-secured2-bulk", common, json, results); + } + return 0; + } + } + @Command(name = "write", description = "Invokes MXAccess Write.") static final class WriteCommand extends GatewayCommand { @Option(names = "--session-id", required = true, description = "Gateway session id.") @@ -760,6 +1011,16 @@ public final class MxGatewayCli implements Callable { List unsubscribeBulk(int serverHandle, List itemHandles); + List readBulk(int serverHandle, List items, int timeoutMs); + + List writeBulk(int serverHandle, List entries); + + List write2Bulk(int serverHandle, List entries); + + List writeSecuredBulk(int serverHandle, List entries); + + List writeSecured2Bulk(int serverHandle, List entries); + MxEventStream streamEventsAfter(long afterWorkerSequence); } @@ -851,6 +1112,31 @@ public final class MxGatewayCli implements Callable { return session.unsubscribeBulk(serverHandle, itemHandles); } + @Override + public List readBulk(int serverHandle, List items, int timeoutMs) { + return session.readBulk(serverHandle, items, timeoutMs); + } + + @Override + public List writeBulk(int serverHandle, List entries) { + return session.writeBulk(serverHandle, entries); + } + + @Override + public List write2Bulk(int serverHandle, List entries) { + return session.write2Bulk(serverHandle, entries); + } + + @Override + public List writeSecuredBulk(int serverHandle, List entries) { + return session.writeSecuredBulk(serverHandle, entries); + } + + @Override + public List writeSecured2Bulk(int serverHandle, List entries) { + return session.writeSecured2Bulk(serverHandle, entries); + } + @Override public MxEventStream streamEventsAfter(long afterWorkerSequence) { return session.streamEventsAfter(afterWorkerSequence); @@ -899,6 +1185,56 @@ public final class MxGatewayCli implements Callable { return values; } + private static void writeWriteBulkOutput( + String command, CommonOptions common, boolean json, List results) { + PrintWriter out = common.spec.commandLine().getOut(); + if (json) { + Map output = new LinkedHashMap<>(); + output.put("command", command); + output.put("options", common.redactedJsonMap()); + output.put("results", results.stream().map(MxGatewayCli::bulkWriteResultMap).toList()); + out.println(jsonObject(output)); + return; + } + out.println(results.size()); + } + + private static Map bulkWriteResultMap(BulkWriteResult result) { + Map values = new LinkedHashMap<>(); + values.put("serverHandle", result.getServerHandle()); + values.put("itemHandle", result.getItemHandle()); + values.put("wasSuccessful", result.getWasSuccessful()); + values.put("hresult", result.hasHresult() ? (Object) result.getHresult() : null); + values.put("errorMessage", result.getErrorMessage()); + return values; + } + + private static void writeReadBulkOutput( + String command, CommonOptions common, boolean json, List results) { + PrintWriter out = common.spec.commandLine().getOut(); + if (json) { + Map output = new LinkedHashMap<>(); + output.put("command", command); + output.put("options", common.redactedJsonMap()); + output.put("results", results.stream().map(MxGatewayCli::bulkReadResultMap).toList()); + out.println(jsonObject(output)); + return; + } + out.println(results.size()); + } + + private static Map bulkReadResultMap(BulkReadResult result) { + Map values = new LinkedHashMap<>(); + values.put("serverHandle", result.getServerHandle()); + values.put("tagAddress", result.getTagAddress()); + values.put("itemHandle", result.getItemHandle()); + values.put("wasSuccessful", result.getWasSuccessful()); + values.put("wasCached", result.getWasCached()); + values.put("quality", result.getQuality()); + values.put("errorMessage", result.getErrorMessage()); + return values; + } + private static MxValue parseValue(String type, String text) { return switch (type) { case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text)); diff --git a/clients/python/src/mxgateway_cli/commands.py b/clients/python/src/mxgateway_cli/commands.py index 0aa89ee..1c53087 100644 --- a/clients/python/src/mxgateway_cli/commands.py +++ b/clients/python/src/mxgateway_cli/commands.py @@ -19,6 +19,7 @@ from mxgateway.errors import MxGatewayError from mxgateway.generated import mxaccess_gateway_pb2 as pb from mxgateway.options import ClientOptions from mxgateway.session import Session +from mxgateway.values import to_mx_value from mxgateway.values import MxValueInput MAX_AGGREGATE_EVENTS = 10_000 @@ -186,6 +187,89 @@ def unsubscribe_bulk(**kwargs: Any) -> None: ) +@main.command("read-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--items", required=True, help="Comma-separated MXAccess tag addresses.") +@click.option("--timeout-ms", default=0, type=int, show_default=True, + help="Per-tag snapshot timeout in milliseconds. 0 = worker default.") +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def read_bulk(**kwargs: Any) -> None: + """Invoke MXAccess ReadBulk — cached value when advised, snapshot otherwise.""" + + _run(_read_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write_bulk(**kwargs: Any) -> None: + """Invoke MXAccess WriteBulk — sequential Write per entry.""" + + _run(_write_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write2-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.") +@click.option("--user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write2_bulk(**kwargs: Any) -> None: + """Invoke MXAccess Write2Bulk — timestamped sequential Write2 per entry.""" + + _run(_write2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write-secured-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--current-user-id", default=0, type=int, show_default=True) +@click.option("--verifier-user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write_secured_bulk(**kwargs: Any) -> None: + """Invoke MXAccess WriteSecuredBulk — credential-sensitive.""" + + _run(_write_secured_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + +@main.command("write-secured2-bulk") +@gateway_options +@click.option("--session-id", required=True, help="Gateway session id.") +@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.") +@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.") +@click.option("--type", "value_type", default="string", show_default=True) +@click.option("--values", required=True, help="Comma-separated values, one per item handle.") +@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.") +@click.option("--current-user-id", default=0, type=int, show_default=True) +@click.option("--verifier-user-id", default=0, type=int, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def write_secured2_bulk(**kwargs: Any) -> None: + """Invoke MXAccess WriteSecured2Bulk — timestamped + credential-sensitive.""" + + _run(_write_secured2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs)) + + @main.command("stream-events") @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @@ -340,6 +424,120 @@ async def _unsubscribe_bulk(**kwargs: Any) -> dict[str, Any]: return {"results": [_message_dict(result) for result in results]} +async def _read_bulk(**kwargs: Any) -> dict[str, Any]: + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.read_bulk( + kwargs["server_handle"], + _parse_string_list(kwargs["items"]), + timeout_ms=kwargs["timeout_ms"], + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +def _build_write_bulk_entries(kwargs: dict[str, Any]): + """Build (item_handle, MxValue) pairs for the bulk-write families. + + The CLI accepts a single ``--type`` plus ``--values`` (comma-separated + string-encoded values, one per ``--item-handles`` entry). Returns the + parsed item-handle list and the per-entry MxValue protobuf instances — + callers wrap these into the appropriate per-entry message type. + """ + + handles = _parse_int_list(kwargs["item_handles"]) + value_texts = _parse_string_list(kwargs["values"]) + if len(handles) != len(value_texts): + raise click.UsageError( + f"item-handles count ({len(handles)}) does not match values count ({len(value_texts)})", + ) + parsed = [_parse_value(text, kwargs["value_type"]) for text in value_texts] + values = [to_mx_value(v) for v in parsed] + return handles, values + + +async def _write_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + entries = [ + pb.WriteBulkEntry(item_handle=handle, user_id=kwargs["user_id"], value=value) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +async def _write2_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"])) + entries = [ + pb.Write2BulkEntry( + item_handle=handle, + user_id=kwargs["user_id"], + value=value, + timestamp_value=timestamp_value, + ) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write2_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +async def _write_secured_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + entries = [ + pb.WriteSecuredBulkEntry( + item_handle=handle, + current_user_id=kwargs["current_user_id"], + verifier_user_id=kwargs["verifier_user_id"], + value=value, + ) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write_secured_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + +async def _write_secured2_bulk(**kwargs: Any) -> dict[str, Any]: + handles, values = _build_write_bulk_entries(kwargs) + timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"])) + entries = [ + pb.WriteSecured2BulkEntry( + item_handle=handle, + current_user_id=kwargs["current_user_id"], + verifier_user_id=kwargs["verifier_user_id"], + value=value, + timestamp_value=timestamp_value, + ) + for handle, value in zip(handles, values) + ] + async with await _connect(kwargs) as client: + session = _session(client, kwargs["session_id"]) + results = await session.write_secured2_bulk( + kwargs["server_handle"], + entries, + correlation_id=kwargs["correlation_id"], + ) + return {"results": [_message_dict(result) for result in results]} + + async def _stream_events(**kwargs: Any) -> dict[str, Any]: async with await _connect(kwargs) as client: session = _session(client, kwargs["session_id"]) diff --git a/clients/rust/crates/mxgw-cli/src/main.rs b/clients/rust/crates/mxgw-cli/src/main.rs index 6ed4052..acc9934 100644 --- a/clients/rust/crates/mxgw-cli/src/main.rs +++ b/clients/rust/crates/mxgw-cli/src/main.rs @@ -18,7 +18,8 @@ use futures_util::StreamExt; use mxgateway_client::generated::galaxy_repository::v1::DeployEvent; use mxgateway_client::generated::mxaccess_gateway::v1::{ CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily, - MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest, + MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest, Write2BulkEntry, + WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry, }; use mxgateway_client::{ ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, MxValueProjection, @@ -127,6 +128,109 @@ enum Command { #[arg(long)] json: bool, }, + /// Snapshot the current value for each requested tag. Cached + /// OnDataChange values are returned for tags that are already advised + /// without touching the existing subscription; otherwise the worker + /// takes a one-shot AddItem + Advise + UnAdvise + RemoveItem lifecycle. + ReadBulk { + #[command(flatten)] + connection: ConnectionArgs, + #[arg(long)] + session_id: String, + #[arg(long)] + server_handle: i32, + #[arg(long, value_delimiter = ',')] + items: Vec, + /// Per-tag snapshot timeout in milliseconds. `0` uses the worker default (1000 ms). + #[arg(long, default_value_t = 0)] + timeout_ms: u32, + #[arg(long)] + json: bool, + }, + /// Bulk Write — one MXAccess Write per (item_handle, value) pair. + WriteBulk { + #[command(flatten)] + connection: ConnectionArgs, + #[arg(long)] + session_id: String, + #[arg(long)] + server_handle: i32, + #[arg(long, value_delimiter = ',')] + item_handles: Vec, + #[arg(long, value_enum)] + value_type: CliValueType, + #[arg(long, value_delimiter = ',')] + values: Vec, + #[arg(long, default_value_t = 0)] + user_id: i32, + #[arg(long)] + json: bool, + }, + /// Bulk Write2 — timestamped variant; the timestamp applies to all entries. + Write2Bulk { + #[command(flatten)] + connection: ConnectionArgs, + #[arg(long)] + session_id: String, + #[arg(long)] + server_handle: i32, + #[arg(long, value_delimiter = ',')] + item_handles: Vec, + #[arg(long, value_enum)] + value_type: CliValueType, + #[arg(long, value_delimiter = ',')] + values: Vec, + #[arg(long)] + timestamp: String, + #[arg(long, default_value_t = 0)] + user_id: i32, + #[arg(long)] + json: bool, + }, + /// Bulk WriteSecured. + WriteSecuredBulk { + #[command(flatten)] + connection: ConnectionArgs, + #[arg(long)] + session_id: String, + #[arg(long)] + server_handle: i32, + #[arg(long, value_delimiter = ',')] + item_handles: Vec, + #[arg(long, value_enum)] + value_type: CliValueType, + #[arg(long, value_delimiter = ',')] + values: Vec, + #[arg(long, default_value_t = 0)] + current_user_id: i32, + #[arg(long, default_value_t = 0)] + verifier_user_id: i32, + #[arg(long)] + json: bool, + }, + /// Bulk WriteSecured2 — timestamped + verified. + WriteSecured2Bulk { + #[command(flatten)] + connection: ConnectionArgs, + #[arg(long)] + session_id: String, + #[arg(long)] + server_handle: i32, + #[arg(long, value_delimiter = ',')] + item_handles: Vec, + #[arg(long, value_enum)] + value_type: CliValueType, + #[arg(long, value_delimiter = ',')] + values: Vec, + #[arg(long)] + timestamp: String, + #[arg(long, default_value_t = 0)] + current_user_id: i32, + #[arg(long, default_value_t = 0)] + verifier_user_id: i32, + #[arg(long)] + json: bool, + }, StreamEvents { #[command(flatten)] connection: ConnectionArgs, @@ -429,6 +533,136 @@ async fn run(cli: Cli) -> Result<(), Error> { .await?; print_bulk_results("unsubscribe-bulk", &results, json); } + Command::ReadBulk { + connection, + session_id, + server_handle, + items, + timeout_ms, + json, + } => { + let session = session_for(connection, session_id).await?; + let results = session.read_bulk(server_handle, items, timeout_ms).await?; + print_read_bulk_results("read-bulk", &results, json); + } + Command::WriteBulk { + connection, + session_id, + server_handle, + item_handles, + value_type, + values, + user_id, + json, + } => { + let entries = build_write_bulk_entries(&item_handles, value_type, &values)?; + let session = session_for(connection, session_id).await?; + let results = session + .write_bulk( + server_handle, + entries + .into_iter() + .map(|(item_handle, value)| WriteBulkEntry { + item_handle, + value: Some(value), + user_id, + }) + .collect(), + ) + .await?; + print_write_bulk_results("write-bulk", &results, json); + } + Command::Write2Bulk { + connection, + session_id, + server_handle, + item_handles, + value_type, + values, + timestamp, + user_id, + json, + } => { + let entries = build_write_bulk_entries(&item_handles, value_type, &values)?; + let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto(); + let session = session_for(connection, session_id).await?; + let results = session + .write2_bulk( + server_handle, + entries + .into_iter() + .map(|(item_handle, value)| Write2BulkEntry { + item_handle, + value: Some(value), + timestamp_value: Some(timestamp_value.clone()), + user_id, + }) + .collect(), + ) + .await?; + print_write_bulk_results("write2-bulk", &results, json); + } + Command::WriteSecuredBulk { + connection, + session_id, + server_handle, + item_handles, + value_type, + values, + current_user_id, + verifier_user_id, + json, + } => { + let entries = build_write_bulk_entries(&item_handles, value_type, &values)?; + let session = session_for(connection, session_id).await?; + let results = session + .write_secured_bulk( + server_handle, + entries + .into_iter() + .map(|(item_handle, value)| WriteSecuredBulkEntry { + item_handle, + value: Some(value), + current_user_id, + verifier_user_id, + }) + .collect(), + ) + .await?; + print_write_bulk_results("write-secured-bulk", &results, json); + } + Command::WriteSecured2Bulk { + connection, + session_id, + server_handle, + item_handles, + value_type, + values, + timestamp, + current_user_id, + verifier_user_id, + json, + } => { + let entries = build_write_bulk_entries(&item_handles, value_type, &values)?; + let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto(); + let session = session_for(connection, session_id).await?; + let results = session + .write_secured2_bulk( + server_handle, + entries + .into_iter() + .map(|(item_handle, value)| WriteSecured2BulkEntry { + item_handle, + value: Some(value), + timestamp_value: Some(timestamp_value.clone()), + current_user_id, + verifier_user_id, + }) + .collect(), + ) + .await?; + print_write_bulk_results("write-secured2-bulk", &results, json); + } Command::StreamEvents { connection, session_id, @@ -784,6 +1018,89 @@ fn print_bulk_results( } } +fn print_write_bulk_results( + operation: &str, + results: &[mxgateway_client::generated::mxaccess_gateway::v1::BulkWriteResult], + use_json: bool, +) { + if use_json { + let results_json: Vec<_> = results + .iter() + .map(|result| { + json!({ + "serverHandle": result.server_handle, + "itemHandle": result.item_handle, + "wasSuccessful": result.was_successful, + "hresult": result.hresult, + "errorMessage": result.error_message, + }) + }) + .collect(); + println!( + "{}", + json!({ "operation": operation, "results": results_json }) + ); + } else { + println!("{}", results.len()); + } +} + +fn print_read_bulk_results( + operation: &str, + results: &[mxgateway_client::generated::mxaccess_gateway::v1::BulkReadResult], + use_json: bool, +) { + if use_json { + let results_json: Vec<_> = results + .iter() + .map(|result| { + json!({ + "serverHandle": result.server_handle, + "tagAddress": result.tag_address, + "itemHandle": result.item_handle, + "wasSuccessful": result.was_successful, + "wasCached": result.was_cached, + "quality": result.quality, + "errorMessage": result.error_message, + }) + }) + .collect(); + println!( + "{}", + json!({ "operation": operation, "results": results_json }) + ); + } else { + println!("{}", results.len()); + } +} + +/// Pairs each parsed item handle with its parsed MxValue (proto form) so a +/// single helper can build the four bulk-write families without each branch +/// repeating the length check and per-value parsing. +fn build_write_bulk_entries( + item_handles: &[i32], + value_type: CliValueType, + values: &[String], +) -> Result, Error> { + if item_handles.len() != values.len() { + return Err(Error::InvalidArgument { + name: "values".to_owned(), + detail: format!( + "item-handles count ({}) does not match values count ({})", + item_handles.len(), + values.len() + ), + }); + } + item_handles + .iter() + .zip(values.iter()) + .map(|(handle, value)| { + parse_value(value_type, value).map(|wrapper| (*handle, wrapper.into_proto())) + }) + .collect() +} + fn parse_value(value_type: CliValueType, value: &str) -> Result { let parsed = match value_type { CliValueType::Bool => MxValue::bool(parse_cli_value(value)?), diff --git a/scripts/run-client-e2e-tests.ps1 b/scripts/run-client-e2e-tests.ps1 index a260e75..7c93436 100644 --- a/scripts/run-client-e2e-tests.ps1 +++ b/scripts/run-client-e2e-tests.ps1 @@ -33,6 +33,12 @@ param( [int]$BulkTagCount = 6, [switch]$SkipStream, [switch]$SkipBulk, + # Skip the bulk read+write coverage that runs alongside the existing + # subscribe-bulk phase. The read-bulk phase confirms cached-path + # semantics against tags left advised by subscribe-bulk (was_cached + # = true); the write-bulk phase runs when -VerifyWrite is set and + # exercises the BulkWriteResult shape against the writable tag. + [switch]$SkipReadWriteBulk, # Write round-trip. Opt-in because it mutates live tag state: it writes a # sentinel value to -WriteAttribute and asserts an OnWriteComplete event # confirms the write reached the MXAccess provider. @@ -400,7 +406,18 @@ function Get-BulkResults { return @(Get-PropertyValue -Object $Json -Names @("results")) } - $replyName = if ($Operation -eq "subscribe-bulk") { "subscribeBulk" } else { "unsubscribeBulk" } + # .NET emits the full MxCommandReply via protobuf JSON, with results + # nested under a per-command field name. + $replyName = switch ($Operation) { + "subscribe-bulk" { "subscribeBulk" } + "unsubscribe-bulk" { "unsubscribeBulk" } + "read-bulk" { "readBulk" } + "write-bulk" { "writeBulk" } + "write2-bulk" { "write2Bulk" } + "write-secured-bulk" { "writeSecuredBulk" } + "write-secured2-bulk" { "writeSecured2Bulk" } + default { $Operation } + } $reply = Get-PropertyValue -Object $Json -Names @($replyName) return @(Get-PropertyValue -Object $reply -Names @("results")) } @@ -478,6 +495,13 @@ function Get-ClientCommand { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) + } elseif ($Operation -eq "read-bulk") { + $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) + if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") } + } elseif ($Operation -eq "write-bulk") { + $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", + "--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values) + if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") } } elseif ($Operation -eq "write") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value) } elseif ($Operation -eq "stream-events") { @@ -507,6 +531,13 @@ function Get-ClientCommand { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handles", $Values.itemHandles) + } elseif ($Operation -eq "read-bulk") { + $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items) + if ($Values.ContainsKey("timeoutMs")) { $arguments += @("-timeout-ms", "$($Values.timeoutMs)") } + } elseif ($Operation -eq "write-bulk") { + $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", + "-item-handles", $Values.itemHandles, "-type", $Values.valueType, "-values", $Values.values) + if ($Values.ContainsKey("userId")) { $arguments += @("-user-id", "$($Values.userId)") } } elseif ($Operation -eq "write") { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handle", "$($Values.itemHandle)", "-type", $Values.valueType, "-value", $Values.value) } elseif ($Operation -eq "stream-events") { @@ -535,6 +566,14 @@ function Get-ClientCommand { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) + } elseif ($Operation -eq "read-bulk") { + $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) + if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") } + } elseif ($Operation -eq "write-bulk") { + # Rust uses --value-type for the type flag. + $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", + "--item-handles", $Values.itemHandles, "--value-type", $Values.valueType, "--values", $Values.values) + if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") } } elseif ($Operation -eq "write") { # Rust names the type flag --value-type, unlike the other CLIs. $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--value-type", $Values.valueType, "--value", $Values.value) @@ -565,6 +604,13 @@ function Get-ClientCommand { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) + } elseif ($Operation -eq "read-bulk") { + $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) + if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") } + } elseif ($Operation -eq "write-bulk") { + $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", + "--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values) + if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") } } elseif ($Operation -eq "write") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value) } elseif ($Operation -eq "stream-events") { @@ -597,6 +643,13 @@ function Get-ClientCommand { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) + } elseif ($Operation -eq "read-bulk") { + $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) + if ($Values.ContainsKey("timeoutMs")) { $cliArgs += @("--timeout-ms", "$($Values.timeoutMs)") } + } elseif ($Operation -eq "write-bulk") { + $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", + "--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values) + if ($Values.ContainsKey("userId")) { $cliArgs += @("--user-id", "$($Values.userId)") } } elseif ($Operation -eq "write") { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value) } elseif ($Operation -eq "stream-events") { @@ -649,6 +702,23 @@ function Get-DryRunReply { }) return [pscustomobject]@{ unsubscribeBulk = [pscustomobject]@{ results = $results }; results = $results } } + "read-bulk" { + $results = @($Values.items -split "," | ForEach-Object -Begin { $index = 1 } -Process { + [pscustomobject]@{ + itemHandle = $index++ + tagAddress = $_ + wasSuccessful = $true + wasCached = $true + } + }) + return [pscustomobject]@{ readBulk = [pscustomobject]@{ results = $results }; results = $results } + } + "write-bulk" { + $results = @($Values.itemHandles -split "," | ForEach-Object { + [pscustomobject]@{ itemHandle = [int]$_; wasSuccessful = $true } + }) + return [pscustomobject]@{ writeBulk = [pscustomobject]@{ results = $results }; results = $results } + } "stream-events" { # Synthesize an OnDataChange (carrying the written value) and an # OnWriteComplete so the write round-trip assertion passes under @@ -839,6 +909,28 @@ function Invoke-ClientFlow { writeCompleteObserved = $true echoObserved = ($null -ne $echoEvent) } + + # WriteBulk smoke: single-entry batch against the same writable + # tag. Exercises the BulkWriteResult wire format end-to-end + # without complicating the OnWriteComplete echo assertion that + # the single-item write phase already verified above. Pinned + # to a different sentinel value so a subsequent read-bulk + # against the same tag would see the bulk write's effect. + if (-not $SkipReadWriteBulk) { + $bulkSentinel = $sentinelValue + 1 + $writeBulkJson = Invoke-ClientOperation -Client $Client -Operation "write-bulk" -Values @{ + sessionId = $sessionId + serverHandle = $serverHandle + itemHandles = "$writeItemHandle" + valueType = $WriteType + values = "$bulkSentinel" + userId = 0 + } + $writeBulkResults = @(Get-BulkResults -Client $Client -Operation "write-bulk" -Json $writeBulkJson) + Assert-BulkResults -Client $Client -Operation "write-bulk" -Results $writeBulkResults -ExpectedCount 1 + $clientResult.write.writeBulkValue = $bulkSentinel + $clientResult.write.writeBulkResultCount = $writeBulkResults.Count + } } } @@ -857,6 +949,40 @@ function Invoke-ClientFlow { throw "$Client subscribe-bulk returned $($bulkItemHandles.Count) usable item handle(s); expected $($bulkTags.Count)." } + # ReadBulk over the already-advised tags: every result must come + # from the per-session value cache (was_cached = true). Confirms + # the gateway/worker/cache wiring serves cached values for tags + # the caller did not create the subscription for. + $readBulkSummary = $null + if (-not $SkipReadWriteBulk) { + $readBulkJson = Invoke-ClientOperation -Client $Client -Operation "read-bulk" -Values @{ + sessionId = $sessionId + serverHandle = $serverHandle + items = $bulkItems + timeoutMs = 1500 + } + $readResults = @(Get-BulkResults -Client $Client -Operation "read-bulk" -Json $readBulkJson) + Assert-BulkResults -Client $Client -Operation "read-bulk" -Results $readResults -ExpectedCount $bulkTags.Count + $cachedCount = @($readResults | Where-Object { + [bool](Get-PropertyValue -Object $_ -Names @("wasCached", "was_cached")) + }).Count + # Allow up to one snapshot fallback per batch: a freshly + # advised tag may not have an OnDataChange cached yet if it + # hasn't pushed an update in the small window between + # subscribe-bulk and read-bulk. Anything beyond that means + # the cached-path optimization is broken. + $maxSnapshotFallbacks = 1 + if ($cachedCount -lt ($readResults.Count - $maxSnapshotFallbacks)) { + throw ("$Client read-bulk only returned $cachedCount cached result(s) " + + "out of $($readResults.Count); the cache-then-snapshot fork must " + + "serve cached values for already-advised tags.") + } + $readBulkSummary = [ordered]@{ + tagCount = $readResults.Count + cachedCount = $cachedCount + } + } + $unsubscribeBulkJson = Invoke-ClientOperation -Client $Client -Operation "unsubscribe-bulk" -Values @{ sessionId = $sessionId serverHandle = $serverHandle @@ -870,6 +996,7 @@ function Invoke-ClientFlow { subscribedCount = $subscribeResults.Count unsubscribedCount = $unsubscribeResults.Count itemHandles = $bulkItemHandles + readBulk = $readBulkSummary } }