Add bulk read/write CLI subcommands and e2e matrix coverage
The previous commit added the bulk read/write library surface in every
client; this commit makes that surface reachable from each client's CLI
and exercises it through scripts/run-client-e2e-tests.ps1.
Five new subcommands in every client CLI (.NET / Go / Rust / Python /
Java): read-bulk, write-bulk, write2-bulk, write-secured-bulk, and
write-secured2-bulk. Each follows the existing subscribe-bulk shape:
- read-bulk takes --server-handle, --items <csv tag list>, and
--timeout-ms (0 = worker default). JSON output carries the
BulkReadResult fields, including was_cached so the e2e matrix can
verify the cached-path semantics.
- The four bulk-write families take --server-handle, --item-handles
<csv>, --type, --values <csv>. write2-bulk and write-secured2-bulk
add a single --timestamp applied to every entry; the secured
variants take --current-user-id and --verifier-user-id. All four
output BulkWriteResult JSON.
A new -SkipReadWriteBulk switch on the matrix script (default OFF)
controls two new e2e phases:
- After the existing subscribe-bulk phase leaves tags advised, the
script runs read-bulk against the same tag list and asserts most
results return was_cached = true. This is the only e2e coverage of
the cache-then-snapshot fork — the unit + gateway tests verify the
semantics with a fake worker, but only the live cross-language
matrix proves the cache populates from real OnDataChange events and
survives the round-trip through every client''s JSON parser.
- When -VerifyWrite is set, the write phase now also runs a single-
entry write-bulk against the same writable item handle (using a
distinct sentinel value) and asserts a per-entry success. Confirms
the BulkWriteResult wire format end-to-end without complicating
the OnWriteComplete echo assertion the single-item phase already
verifies.
Dry-run validation passes for all five clients: each emits the correct
read-bulk and write-bulk CLI invocations with the right flags.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -101,6 +101,16 @@ public static class MxGatewayClientCli
|
|||||||
.ConfigureAwait(false),
|
.ConfigureAwait(false),
|
||||||
"unsubscribe-bulk" => await UnsubscribeBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
"unsubscribe-bulk" => await UnsubscribeBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
.ConfigureAwait(false),
|
.ConfigureAwait(false),
|
||||||
|
"read-bulk" => await ReadBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
|
.ConfigureAwait(false),
|
||||||
|
"write-bulk" => await WriteBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
|
.ConfigureAwait(false),
|
||||||
|
"write2-bulk" => await Write2BulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
|
.ConfigureAwait(false),
|
||||||
|
"write-secured-bulk" => await WriteSecuredBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
|
.ConfigureAwait(false),
|
||||||
|
"write-secured2-bulk" => await WriteSecured2BulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
|
.ConfigureAwait(false),
|
||||||
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
|
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
.ConfigureAwait(false),
|
.ConfigureAwait(false),
|
||||||
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
|
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
@@ -386,6 +396,220 @@ public static class MxGatewayClientCli
|
|||||||
cancellationToken);
|
cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Task<int> ReadBulkAsync(
|
||||||
|
CliArguments arguments,
|
||||||
|
IMxGatewayCliClient client,
|
||||||
|
TextWriter output,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
ReadBulkCommand command = new()
|
||||||
|
{
|
||||||
|
ServerHandle = arguments.GetInt32("server-handle"),
|
||||||
|
TimeoutMs = (uint)arguments.GetInt32("timeout-ms", 0),
|
||||||
|
};
|
||||||
|
command.TagAddresses.Add(ParseStringList(arguments.GetRequired("items")));
|
||||||
|
|
||||||
|
return InvokeAndWriteAsync(
|
||||||
|
arguments,
|
||||||
|
client,
|
||||||
|
output,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.ReadBulk,
|
||||||
|
ReadBulk = command,
|
||||||
|
},
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Task<int> WriteBulkAsync(
|
||||||
|
CliArguments arguments,
|
||||||
|
IMxGatewayCliClient client,
|
||||||
|
TextWriter output,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
WriteBulkCommand command = new()
|
||||||
|
{
|
||||||
|
ServerHandle = arguments.GetInt32("server-handle"),
|
||||||
|
};
|
||||||
|
|
||||||
|
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
||||||
|
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
||||||
|
int userId = arguments.GetInt32("user-id", 0);
|
||||||
|
EnsureSameLength(handles.Count, values.Count);
|
||||||
|
|
||||||
|
for (int i = 0; i < handles.Count; i++)
|
||||||
|
{
|
||||||
|
command.Entries.Add(new WriteBulkEntry
|
||||||
|
{
|
||||||
|
ItemHandle = handles[i],
|
||||||
|
Value = values[i],
|
||||||
|
UserId = userId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return InvokeAndWriteAsync(
|
||||||
|
arguments,
|
||||||
|
client,
|
||||||
|
output,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.WriteBulk,
|
||||||
|
WriteBulk = command,
|
||||||
|
},
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Task<int> Write2BulkAsync(
|
||||||
|
CliArguments arguments,
|
||||||
|
IMxGatewayCliClient client,
|
||||||
|
TextWriter output,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
Write2BulkCommand command = new()
|
||||||
|
{
|
||||||
|
ServerHandle = arguments.GetInt32("server-handle"),
|
||||||
|
};
|
||||||
|
|
||||||
|
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
||||||
|
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
||||||
|
MxValue timestampValue = ParseTimestampValue(arguments);
|
||||||
|
int userId = arguments.GetInt32("user-id", 0);
|
||||||
|
EnsureSameLength(handles.Count, values.Count);
|
||||||
|
|
||||||
|
for (int i = 0; i < handles.Count; i++)
|
||||||
|
{
|
||||||
|
command.Entries.Add(new Write2BulkEntry
|
||||||
|
{
|
||||||
|
ItemHandle = handles[i],
|
||||||
|
Value = values[i],
|
||||||
|
TimestampValue = timestampValue,
|
||||||
|
UserId = userId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return InvokeAndWriteAsync(
|
||||||
|
arguments,
|
||||||
|
client,
|
||||||
|
output,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.Write2Bulk,
|
||||||
|
Write2Bulk = command,
|
||||||
|
},
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Task<int> WriteSecuredBulkAsync(
|
||||||
|
CliArguments arguments,
|
||||||
|
IMxGatewayCliClient client,
|
||||||
|
TextWriter output,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
WriteSecuredBulkCommand command = new()
|
||||||
|
{
|
||||||
|
ServerHandle = arguments.GetInt32("server-handle"),
|
||||||
|
};
|
||||||
|
|
||||||
|
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
||||||
|
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
||||||
|
int currentUserId = arguments.GetInt32("current-user-id");
|
||||||
|
int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
|
||||||
|
EnsureSameLength(handles.Count, values.Count);
|
||||||
|
|
||||||
|
for (int i = 0; i < handles.Count; i++)
|
||||||
|
{
|
||||||
|
command.Entries.Add(new WriteSecuredBulkEntry
|
||||||
|
{
|
||||||
|
ItemHandle = handles[i],
|
||||||
|
Value = values[i],
|
||||||
|
CurrentUserId = currentUserId,
|
||||||
|
VerifierUserId = verifierUserId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return InvokeAndWriteAsync(
|
||||||
|
arguments,
|
||||||
|
client,
|
||||||
|
output,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.WriteSecuredBulk,
|
||||||
|
WriteSecuredBulk = command,
|
||||||
|
},
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Task<int> WriteSecured2BulkAsync(
|
||||||
|
CliArguments arguments,
|
||||||
|
IMxGatewayCliClient client,
|
||||||
|
TextWriter output,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
WriteSecured2BulkCommand command = new()
|
||||||
|
{
|
||||||
|
ServerHandle = arguments.GetInt32("server-handle"),
|
||||||
|
};
|
||||||
|
|
||||||
|
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
||||||
|
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
||||||
|
MxValue timestampValue = ParseTimestampValue(arguments);
|
||||||
|
int currentUserId = arguments.GetInt32("current-user-id");
|
||||||
|
int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
|
||||||
|
EnsureSameLength(handles.Count, values.Count);
|
||||||
|
|
||||||
|
for (int i = 0; i < handles.Count; i++)
|
||||||
|
{
|
||||||
|
command.Entries.Add(new WriteSecured2BulkEntry
|
||||||
|
{
|
||||||
|
ItemHandle = handles[i],
|
||||||
|
Value = values[i],
|
||||||
|
TimestampValue = timestampValue,
|
||||||
|
CurrentUserId = currentUserId,
|
||||||
|
VerifierUserId = verifierUserId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return InvokeAndWriteAsync(
|
||||||
|
arguments,
|
||||||
|
client,
|
||||||
|
output,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.WriteSecured2Bulk,
|
||||||
|
WriteSecured2Bulk = command,
|
||||||
|
},
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Parses the bulk-write CLI's <c>--values</c> list. All entries share
|
||||||
|
/// the single <c>--type</c> argument; the comma-separated values are
|
||||||
|
/// each parsed via <see cref="ParseValue"/> on a per-entry basis. This
|
||||||
|
/// keeps the CLI simple for e2e use (one type, N values) — callers
|
||||||
|
/// that need heterogeneous types per entry should drive the library
|
||||||
|
/// directly.
|
||||||
|
/// </summary>
|
||||||
|
private static IReadOnlyList<MxValue> ParseValuesList(CliArguments arguments)
|
||||||
|
{
|
||||||
|
string type = arguments.GetRequired("type");
|
||||||
|
string[] values = ParseStringList(arguments.GetRequired("values")).ToArray();
|
||||||
|
MxValue[] result = new MxValue[values.Length];
|
||||||
|
for (int i = 0; i < values.Length; i++)
|
||||||
|
{
|
||||||
|
result[i] = ParseValue(type, values[i]);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void EnsureSameLength(int handles, int values)
|
||||||
|
{
|
||||||
|
if (handles != values)
|
||||||
|
{
|
||||||
|
throw new ArgumentException(
|
||||||
|
$"Bulk write requires the same number of --item-handles ({handles}) and --values ({values}).");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static Task<int> WriteAsync(
|
private static Task<int> WriteAsync(
|
||||||
CliArguments arguments,
|
CliArguments arguments,
|
||||||
IMxGatewayCliClient client,
|
IMxGatewayCliClient client,
|
||||||
@@ -772,8 +996,12 @@ public static class MxGatewayClientCli
|
|||||||
|
|
||||||
private static MxValue ParseValue(CliArguments arguments)
|
private static MxValue ParseValue(CliArguments arguments)
|
||||||
{
|
{
|
||||||
string type = arguments.GetRequired("type").ToLowerInvariant();
|
return ParseValue(arguments.GetRequired("type"), arguments.GetRequired("value"));
|
||||||
string value = arguments.GetRequired("value");
|
}
|
||||||
|
|
||||||
|
private static MxValue ParseValue(string typeName, string value)
|
||||||
|
{
|
||||||
|
string type = typeName.ToLowerInvariant();
|
||||||
string[] values = value.Split(',', StringSplitOptions.TrimEntries);
|
string[] values = value.Split(',', StringSplitOptions.TrimEntries);
|
||||||
|
|
||||||
return type switch
|
return type switch
|
||||||
|
|||||||
@@ -89,6 +89,16 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err
|
|||||||
return runSubscribeBulk(ctx, args[1:], stdout, stderr)
|
return runSubscribeBulk(ctx, args[1:], stdout, stderr)
|
||||||
case "unsubscribe-bulk":
|
case "unsubscribe-bulk":
|
||||||
return runUnsubscribeBulk(ctx, args[1:], stdout, stderr)
|
return runUnsubscribeBulk(ctx, args[1:], stdout, stderr)
|
||||||
|
case "read-bulk":
|
||||||
|
return runReadBulk(ctx, args[1:], stdout, stderr)
|
||||||
|
case "write-bulk":
|
||||||
|
return runWriteBulk(ctx, args[1:], stdout, stderr)
|
||||||
|
case "write2-bulk":
|
||||||
|
return runWrite2Bulk(ctx, args[1:], stdout, stderr)
|
||||||
|
case "write-secured-bulk":
|
||||||
|
return runWriteSecuredBulk(ctx, args[1:], stdout, stderr)
|
||||||
|
case "write-secured2-bulk":
|
||||||
|
return runWriteSecured2Bulk(ctx, args[1:], stdout, stderr)
|
||||||
case "write":
|
case "write":
|
||||||
return runWrite(ctx, args[1:], stdout, stderr)
|
return runWrite(ctx, args[1:], stdout, stderr)
|
||||||
case "stream-events":
|
case "stream-events":
|
||||||
@@ -347,6 +357,167 @@ func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Wr
|
|||||||
return writeBulkOutput(stdout, *jsonOutput, "unsubscribe-bulk", options, results, err)
|
return writeBulkOutput(stdout, *jsonOutput, "unsubscribe-bulk", options, results, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func runReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
flags := flag.NewFlagSet("read-bulk", flag.ContinueOnError)
|
||||||
|
flags.SetOutput(stderr)
|
||||||
|
common := bindCommonFlags(flags)
|
||||||
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
||||||
|
sessionID := flags.String("session-id", "", "gateway session id")
|
||||||
|
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
|
||||||
|
items := flags.String("items", "", "comma-separated tag addresses")
|
||||||
|
timeoutMs := flags.Int("timeout-ms", 0, "per-tag snapshot timeout in milliseconds (0 = worker default)")
|
||||||
|
|
||||||
|
if err := flags.Parse(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *sessionID == "" || *items == "" {
|
||||||
|
return errors.New("session-id and items are required")
|
||||||
|
}
|
||||||
|
|
||||||
|
client, options, err := dialForCommand(ctx, common)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
||||||
|
results, err := session.ReadBulk(ctx, int32(*serverHandle), parseStringList(*items), time.Duration(*timeoutMs)*time.Millisecond)
|
||||||
|
return writeReadBulkOutput(stdout, *jsonOutput, "read-bulk", options, results, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func runWriteBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-bulk", false, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func runWrite2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write2-bulk", true, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func runWriteSecuredBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured-bulk", false, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func runWriteSecured2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured2-bulk", true, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// runWriteBulkVariant shares the flag-parsing + entry-build skeleton across
|
||||||
|
// the four bulk-write families. withTimestamp adds a --timestamp-value flag;
|
||||||
|
// secured switches from --user-id to --current-user-id / --verifier-user-id.
|
||||||
|
func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool, secured bool) error {
|
||||||
|
flags := flag.NewFlagSet(command, flag.ContinueOnError)
|
||||||
|
flags.SetOutput(stderr)
|
||||||
|
common := bindCommonFlags(flags)
|
||||||
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
||||||
|
sessionID := flags.String("session-id", "", "gateway session id")
|
||||||
|
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
|
||||||
|
itemHandles := flags.String("item-handles", "", "comma-separated item handles")
|
||||||
|
valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string")
|
||||||
|
values := flags.String("values", "", "comma-separated values (one per item handle)")
|
||||||
|
userID := flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)")
|
||||||
|
currentUserID := flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)")
|
||||||
|
verifierUserID := flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)")
|
||||||
|
timestampValue := flags.String("timestamp-value", "", "RFC 3339 timestamp shared across all entries (Write2/WriteSecured2 variants)")
|
||||||
|
|
||||||
|
if err := flags.Parse(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *sessionID == "" || *itemHandles == "" || *values == "" {
|
||||||
|
return errors.New("session-id, item-handles, and values are required")
|
||||||
|
}
|
||||||
|
|
||||||
|
handles, err := parseInt32List(*itemHandles)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
valueTexts := parseStringList(*values)
|
||||||
|
if len(handles) != len(valueTexts) {
|
||||||
|
return fmt.Errorf("item-handles count (%d) does not match values count (%d)", len(handles), len(valueTexts))
|
||||||
|
}
|
||||||
|
|
||||||
|
parsedValues := make([]*mxgateway.MxValue, len(handles))
|
||||||
|
for i, text := range valueTexts {
|
||||||
|
v, err := parseValue(*valueType, text)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("entry %d: %w", i, err)
|
||||||
|
}
|
||||||
|
parsedValues[i] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
var tsValue *mxgateway.MxValue
|
||||||
|
if withTimestamp {
|
||||||
|
if *timestampValue == "" {
|
||||||
|
return errors.New("timestamp-value is required for write2/write-secured2 bulk variants")
|
||||||
|
}
|
||||||
|
parsed, err := parseRfc3339Timestamp(*timestampValue)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tsValue = parsed
|
||||||
|
}
|
||||||
|
|
||||||
|
client, options, err := dialForCommand(ctx, common)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
||||||
|
|
||||||
|
var results []*mxgateway.BulkWriteResult
|
||||||
|
switch command {
|
||||||
|
case "write-bulk":
|
||||||
|
entries := make([]*mxgateway.WriteBulkEntry, len(handles))
|
||||||
|
for i := range handles {
|
||||||
|
entries[i] = &mxgateway.WriteBulkEntry{ItemHandle: handles[i], Value: parsedValues[i], UserId: int32(*userID)}
|
||||||
|
}
|
||||||
|
results, err = session.WriteBulk(ctx, int32(*serverHandle), entries)
|
||||||
|
case "write2-bulk":
|
||||||
|
entries := make([]*mxgateway.Write2BulkEntry, len(handles))
|
||||||
|
for i := range handles {
|
||||||
|
entries[i] = &mxgateway.Write2BulkEntry{ItemHandle: handles[i], Value: parsedValues[i], TimestampValue: tsValue, UserId: int32(*userID)}
|
||||||
|
}
|
||||||
|
results, err = session.Write2Bulk(ctx, int32(*serverHandle), entries)
|
||||||
|
case "write-secured-bulk":
|
||||||
|
entries := make([]*mxgateway.WriteSecuredBulkEntry, len(handles))
|
||||||
|
for i := range handles {
|
||||||
|
entries[i] = &mxgateway.WriteSecuredBulkEntry{
|
||||||
|
ItemHandle: handles[i],
|
||||||
|
Value: parsedValues[i],
|
||||||
|
CurrentUserId: int32(*currentUserID),
|
||||||
|
VerifierUserId: int32(*verifierUserID),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
results, err = session.WriteSecuredBulk(ctx, int32(*serverHandle), entries)
|
||||||
|
case "write-secured2-bulk":
|
||||||
|
entries := make([]*mxgateway.WriteSecured2BulkEntry, len(handles))
|
||||||
|
for i := range handles {
|
||||||
|
entries[i] = &mxgateway.WriteSecured2BulkEntry{
|
||||||
|
ItemHandle: handles[i],
|
||||||
|
Value: parsedValues[i],
|
||||||
|
TimestampValue: tsValue,
|
||||||
|
CurrentUserId: int32(*currentUserID),
|
||||||
|
VerifierUserId: int32(*verifierUserID),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
results, err = session.WriteSecured2Bulk(ctx, int32(*serverHandle), entries)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported bulk write command %q", command)
|
||||||
|
}
|
||||||
|
_ = secured // currently only used for routing above; reserved for future per-variant validation
|
||||||
|
return writeWriteBulkOutput(stdout, *jsonOutput, command, options, results, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseRfc3339Timestamp parses an RFC 3339 timestamp and returns the
|
||||||
|
// MxValue protobuf representation used for the timestamped write families.
|
||||||
|
func parseRfc3339Timestamp(text string) (*mxgateway.MxValue, error) {
|
||||||
|
t, err := time.Parse(time.RFC3339Nano, text)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid RFC 3339 timestamp %q: %w", text, err)
|
||||||
|
}
|
||||||
|
return mxgateway.TimestampValue(t), nil
|
||||||
|
}
|
||||||
|
|
||||||
func runWrite(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
func runWrite(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
flags := flag.NewFlagSet("write", flag.ContinueOnError)
|
flags := flag.NewFlagSet("write", flag.ContinueOnError)
|
||||||
flags.SetOutput(stderr)
|
flags.SetOutput(stderr)
|
||||||
@@ -652,6 +823,36 @@ func writeBulkOutput(stdout io.Writer, jsonOutput bool, command string, options
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func writeWriteBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkWriteResult, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if jsonOutput {
|
||||||
|
return writeJSON(stdout, map[string]any{
|
||||||
|
"command": command,
|
||||||
|
"options": options,
|
||||||
|
"results": results,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fmt.Fprintln(stdout, len(results))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeReadBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkReadResult, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if jsonOutput {
|
||||||
|
return writeJSON(stdout, map[string]any{
|
||||||
|
"command": command,
|
||||||
|
"options": options,
|
||||||
|
"results": results,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fmt.Fprintln(stdout, len(results))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func writeJSON(writer io.Writer, value any) error {
|
func writeJSON(writer io.Writer, value any) error {
|
||||||
encoder := json.NewEncoder(writer)
|
encoder := json.NewEncoder(writer)
|
||||||
encoder.SetIndent("", " ")
|
encoder.SetIndent("", " ")
|
||||||
|
|||||||
+336
@@ -25,12 +25,18 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
|
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
|
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry;
|
||||||
import picocli.CommandLine;
|
import picocli.CommandLine;
|
||||||
import picocli.CommandLine.Command;
|
import picocli.CommandLine.Command;
|
||||||
import picocli.CommandLine.Mixin;
|
import picocli.CommandLine.Mixin;
|
||||||
@@ -109,6 +115,11 @@ public final class MxGatewayCli implements Callable<Integer> {
|
|||||||
commandLine.addSubcommand("advise", new AdviseCommand(clientFactory));
|
commandLine.addSubcommand("advise", new AdviseCommand(clientFactory));
|
||||||
commandLine.addSubcommand("subscribe-bulk", new SubscribeBulkCommand(clientFactory));
|
commandLine.addSubcommand("subscribe-bulk", new SubscribeBulkCommand(clientFactory));
|
||||||
commandLine.addSubcommand("unsubscribe-bulk", new UnsubscribeBulkCommand(clientFactory));
|
commandLine.addSubcommand("unsubscribe-bulk", new UnsubscribeBulkCommand(clientFactory));
|
||||||
|
commandLine.addSubcommand("read-bulk", new ReadBulkCommand(clientFactory));
|
||||||
|
commandLine.addSubcommand("write-bulk", new WriteBulkCommand(clientFactory));
|
||||||
|
commandLine.addSubcommand("write2-bulk", new Write2BulkCommand(clientFactory));
|
||||||
|
commandLine.addSubcommand("write-secured-bulk", new WriteSecuredBulkCommand(clientFactory));
|
||||||
|
commandLine.addSubcommand("write-secured2-bulk", new WriteSecured2BulkCommand(clientFactory));
|
||||||
commandLine.addSubcommand("write", new WriteCommand(clientFactory));
|
commandLine.addSubcommand("write", new WriteCommand(clientFactory));
|
||||||
commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
|
commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
|
||||||
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
|
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
|
||||||
@@ -518,6 +529,246 @@ public final class MxGatewayCli implements Callable<Integer> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Command(name = "read-bulk", description = "Invokes MXAccess ReadBulk (cached or snapshot per tag).")
|
||||||
|
static final class ReadBulkCommand extends GatewayCommand {
|
||||||
|
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||||
|
String sessionId;
|
||||||
|
|
||||||
|
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
|
||||||
|
int serverHandle;
|
||||||
|
|
||||||
|
@Option(names = "--items", required = true, description = "Comma-separated tag addresses.")
|
||||||
|
String items;
|
||||||
|
|
||||||
|
@Option(names = "--timeout-ms", defaultValue = "0",
|
||||||
|
description = "Per-tag snapshot timeout in milliseconds (0 = worker default).")
|
||||||
|
int timeoutMs;
|
||||||
|
|
||||||
|
ReadBulkCommand(MxGatewayCliClientFactory clientFactory) {
|
||||||
|
super(clientFactory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer call() {
|
||||||
|
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||||
|
List<BulkReadResult> results =
|
||||||
|
client.session(sessionId).readBulk(serverHandle, parseStringList(items), timeoutMs);
|
||||||
|
writeReadBulkOutput("read-bulk", common, json, results);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Command(name = "write-bulk", description = "Invokes MXAccess WriteBulk.")
|
||||||
|
static final class WriteBulkCommand extends GatewayCommand {
|
||||||
|
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||||
|
String sessionId;
|
||||||
|
|
||||||
|
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
|
||||||
|
int serverHandle;
|
||||||
|
|
||||||
|
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
|
||||||
|
String itemHandles;
|
||||||
|
|
||||||
|
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
|
||||||
|
String type;
|
||||||
|
|
||||||
|
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
|
||||||
|
String values;
|
||||||
|
|
||||||
|
@Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.")
|
||||||
|
int userId;
|
||||||
|
|
||||||
|
WriteBulkCommand(MxGatewayCliClientFactory clientFactory) {
|
||||||
|
super(clientFactory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer call() {
|
||||||
|
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||||
|
List<Integer> handles = parseIntList(itemHandles);
|
||||||
|
List<String> valueTexts = parseStringList(values);
|
||||||
|
if (handles.size() != valueTexts.size()) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"item-handles count (" + handles.size() + ") does not match values count (" + valueTexts.size() + ")");
|
||||||
|
}
|
||||||
|
List<WriteBulkEntry> entries = new ArrayList<>(handles.size());
|
||||||
|
for (int i = 0; i < handles.size(); i++) {
|
||||||
|
entries.add(WriteBulkEntry.newBuilder()
|
||||||
|
.setItemHandle(handles.get(i))
|
||||||
|
.setUserId(userId)
|
||||||
|
.setValue(parseValue(type, valueTexts.get(i)))
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
List<BulkWriteResult> results = client.session(sessionId).writeBulk(serverHandle, entries);
|
||||||
|
writeWriteBulkOutput("write-bulk", common, json, results);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Command(name = "write2-bulk", description = "Invokes MXAccess Write2Bulk (timestamped).")
|
||||||
|
static final class Write2BulkCommand extends GatewayCommand {
|
||||||
|
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||||
|
String sessionId;
|
||||||
|
|
||||||
|
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
|
||||||
|
int serverHandle;
|
||||||
|
|
||||||
|
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
|
||||||
|
String itemHandles;
|
||||||
|
|
||||||
|
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
|
||||||
|
String type;
|
||||||
|
|
||||||
|
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
|
||||||
|
String values;
|
||||||
|
|
||||||
|
@Option(names = "--timestamp", required = true, description = "ISO-8601 timestamp shared across all entries.")
|
||||||
|
String timestamp;
|
||||||
|
|
||||||
|
@Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.")
|
||||||
|
int userId;
|
||||||
|
|
||||||
|
Write2BulkCommand(MxGatewayCliClientFactory clientFactory) {
|
||||||
|
super(clientFactory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer call() {
|
||||||
|
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||||
|
List<Integer> handles = parseIntList(itemHandles);
|
||||||
|
List<String> valueTexts = parseStringList(values);
|
||||||
|
if (handles.size() != valueTexts.size()) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"item-handles count (" + handles.size() + ") does not match values count (" + valueTexts.size() + ")");
|
||||||
|
}
|
||||||
|
MxValue timestampValue = MxValues.timestampValue(Instant.parse(timestamp));
|
||||||
|
List<Write2BulkEntry> entries = new ArrayList<>(handles.size());
|
||||||
|
for (int i = 0; i < handles.size(); i++) {
|
||||||
|
entries.add(Write2BulkEntry.newBuilder()
|
||||||
|
.setItemHandle(handles.get(i))
|
||||||
|
.setUserId(userId)
|
||||||
|
.setValue(parseValue(type, valueTexts.get(i)))
|
||||||
|
.setTimestampValue(timestampValue)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
List<BulkWriteResult> results = client.session(sessionId).write2Bulk(serverHandle, entries);
|
||||||
|
writeWriteBulkOutput("write2-bulk", common, json, results);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Command(name = "write-secured-bulk", description = "Invokes MXAccess WriteSecuredBulk.")
|
||||||
|
static final class WriteSecuredBulkCommand extends GatewayCommand {
|
||||||
|
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||||
|
String sessionId;
|
||||||
|
|
||||||
|
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
|
||||||
|
int serverHandle;
|
||||||
|
|
||||||
|
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
|
||||||
|
String itemHandles;
|
||||||
|
|
||||||
|
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
|
||||||
|
String type;
|
||||||
|
|
||||||
|
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
|
||||||
|
String values;
|
||||||
|
|
||||||
|
@Option(names = "--current-user-id", defaultValue = "0", description = "MXAccess current user id.")
|
||||||
|
int currentUserId;
|
||||||
|
|
||||||
|
@Option(names = "--verifier-user-id", defaultValue = "0", description = "MXAccess verifier user id.")
|
||||||
|
int verifierUserId;
|
||||||
|
|
||||||
|
WriteSecuredBulkCommand(MxGatewayCliClientFactory clientFactory) {
|
||||||
|
super(clientFactory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer call() {
|
||||||
|
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||||
|
List<Integer> handles = parseIntList(itemHandles);
|
||||||
|
List<String> valueTexts = parseStringList(values);
|
||||||
|
if (handles.size() != valueTexts.size()) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"item-handles count (" + handles.size() + ") does not match values count (" + valueTexts.size() + ")");
|
||||||
|
}
|
||||||
|
List<WriteSecuredBulkEntry> entries = new ArrayList<>(handles.size());
|
||||||
|
for (int i = 0; i < handles.size(); i++) {
|
||||||
|
entries.add(WriteSecuredBulkEntry.newBuilder()
|
||||||
|
.setItemHandle(handles.get(i))
|
||||||
|
.setCurrentUserId(currentUserId)
|
||||||
|
.setVerifierUserId(verifierUserId)
|
||||||
|
.setValue(parseValue(type, valueTexts.get(i)))
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
List<BulkWriteResult> results = client.session(sessionId).writeSecuredBulk(serverHandle, entries);
|
||||||
|
writeWriteBulkOutput("write-secured-bulk", common, json, results);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Command(name = "write-secured2-bulk", description = "Invokes MXAccess WriteSecured2Bulk.")
|
||||||
|
static final class WriteSecured2BulkCommand extends GatewayCommand {
|
||||||
|
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||||
|
String sessionId;
|
||||||
|
|
||||||
|
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
|
||||||
|
int serverHandle;
|
||||||
|
|
||||||
|
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
|
||||||
|
String itemHandles;
|
||||||
|
|
||||||
|
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
|
||||||
|
String type;
|
||||||
|
|
||||||
|
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
|
||||||
|
String values;
|
||||||
|
|
||||||
|
@Option(names = "--timestamp", required = true, description = "ISO-8601 timestamp shared across all entries.")
|
||||||
|
String timestamp;
|
||||||
|
|
||||||
|
@Option(names = "--current-user-id", defaultValue = "0", description = "MXAccess current user id.")
|
||||||
|
int currentUserId;
|
||||||
|
|
||||||
|
@Option(names = "--verifier-user-id", defaultValue = "0", description = "MXAccess verifier user id.")
|
||||||
|
int verifierUserId;
|
||||||
|
|
||||||
|
WriteSecured2BulkCommand(MxGatewayCliClientFactory clientFactory) {
|
||||||
|
super(clientFactory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer call() {
|
||||||
|
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||||
|
List<Integer> handles = parseIntList(itemHandles);
|
||||||
|
List<String> valueTexts = parseStringList(values);
|
||||||
|
if (handles.size() != valueTexts.size()) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"item-handles count (" + handles.size() + ") does not match values count (" + valueTexts.size() + ")");
|
||||||
|
}
|
||||||
|
MxValue timestampValue = MxValues.timestampValue(Instant.parse(timestamp));
|
||||||
|
List<WriteSecured2BulkEntry> entries = new ArrayList<>(handles.size());
|
||||||
|
for (int i = 0; i < handles.size(); i++) {
|
||||||
|
entries.add(WriteSecured2BulkEntry.newBuilder()
|
||||||
|
.setItemHandle(handles.get(i))
|
||||||
|
.setCurrentUserId(currentUserId)
|
||||||
|
.setVerifierUserId(verifierUserId)
|
||||||
|
.setValue(parseValue(type, valueTexts.get(i)))
|
||||||
|
.setTimestampValue(timestampValue)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
List<BulkWriteResult> results = client.session(sessionId).writeSecured2Bulk(serverHandle, entries);
|
||||||
|
writeWriteBulkOutput("write-secured2-bulk", common, json, results);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Command(name = "write", description = "Invokes MXAccess Write.")
|
@Command(name = "write", description = "Invokes MXAccess Write.")
|
||||||
static final class WriteCommand extends GatewayCommand {
|
static final class WriteCommand extends GatewayCommand {
|
||||||
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||||
@@ -760,6 +1011,16 @@ public final class MxGatewayCli implements Callable<Integer> {
|
|||||||
|
|
||||||
List<SubscribeResult> unsubscribeBulk(int serverHandle, List<Integer> itemHandles);
|
List<SubscribeResult> unsubscribeBulk(int serverHandle, List<Integer> itemHandles);
|
||||||
|
|
||||||
|
List<BulkReadResult> readBulk(int serverHandle, List<String> items, int timeoutMs);
|
||||||
|
|
||||||
|
List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries);
|
||||||
|
|
||||||
|
List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries);
|
||||||
|
|
||||||
|
List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries);
|
||||||
|
|
||||||
|
List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries);
|
||||||
|
|
||||||
MxEventStream streamEventsAfter(long afterWorkerSequence);
|
MxEventStream streamEventsAfter(long afterWorkerSequence);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -851,6 +1112,31 @@ public final class MxGatewayCli implements Callable<Integer> {
|
|||||||
return session.unsubscribeBulk(serverHandle, itemHandles);
|
return session.unsubscribeBulk(serverHandle, itemHandles);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<BulkReadResult> readBulk(int serverHandle, List<String> items, int timeoutMs) {
|
||||||
|
return session.readBulk(serverHandle, items, timeoutMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries) {
|
||||||
|
return session.writeBulk(serverHandle, entries);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries) {
|
||||||
|
return session.write2Bulk(serverHandle, entries);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries) {
|
||||||
|
return session.writeSecuredBulk(serverHandle, entries);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries) {
|
||||||
|
return session.writeSecured2Bulk(serverHandle, entries);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MxEventStream streamEventsAfter(long afterWorkerSequence) {
|
public MxEventStream streamEventsAfter(long afterWorkerSequence) {
|
||||||
return session.streamEventsAfter(afterWorkerSequence);
|
return session.streamEventsAfter(afterWorkerSequence);
|
||||||
@@ -899,6 +1185,56 @@ public final class MxGatewayCli implements Callable<Integer> {
|
|||||||
return values;
|
return values;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void writeWriteBulkOutput(
|
||||||
|
String command, CommonOptions common, boolean json, List<BulkWriteResult> results) {
|
||||||
|
PrintWriter out = common.spec.commandLine().getOut();
|
||||||
|
if (json) {
|
||||||
|
Map<String, Object> output = new LinkedHashMap<>();
|
||||||
|
output.put("command", command);
|
||||||
|
output.put("options", common.redactedJsonMap());
|
||||||
|
output.put("results", results.stream().map(MxGatewayCli::bulkWriteResultMap).toList());
|
||||||
|
out.println(jsonObject(output));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
out.println(results.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<String, Object> bulkWriteResultMap(BulkWriteResult result) {
|
||||||
|
Map<String, Object> values = new LinkedHashMap<>();
|
||||||
|
values.put("serverHandle", result.getServerHandle());
|
||||||
|
values.put("itemHandle", result.getItemHandle());
|
||||||
|
values.put("wasSuccessful", result.getWasSuccessful());
|
||||||
|
values.put("hresult", result.hasHresult() ? (Object) result.getHresult() : null);
|
||||||
|
values.put("errorMessage", result.getErrorMessage());
|
||||||
|
return values;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void writeReadBulkOutput(
|
||||||
|
String command, CommonOptions common, boolean json, List<BulkReadResult> results) {
|
||||||
|
PrintWriter out = common.spec.commandLine().getOut();
|
||||||
|
if (json) {
|
||||||
|
Map<String, Object> output = new LinkedHashMap<>();
|
||||||
|
output.put("command", command);
|
||||||
|
output.put("options", common.redactedJsonMap());
|
||||||
|
output.put("results", results.stream().map(MxGatewayCli::bulkReadResultMap).toList());
|
||||||
|
out.println(jsonObject(output));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
out.println(results.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<String, Object> bulkReadResultMap(BulkReadResult result) {
|
||||||
|
Map<String, Object> values = new LinkedHashMap<>();
|
||||||
|
values.put("serverHandle", result.getServerHandle());
|
||||||
|
values.put("tagAddress", result.getTagAddress());
|
||||||
|
values.put("itemHandle", result.getItemHandle());
|
||||||
|
values.put("wasSuccessful", result.getWasSuccessful());
|
||||||
|
values.put("wasCached", result.getWasCached());
|
||||||
|
values.put("quality", result.getQuality());
|
||||||
|
values.put("errorMessage", result.getErrorMessage());
|
||||||
|
return values;
|
||||||
|
}
|
||||||
|
|
||||||
private static MxValue parseValue(String type, String text) {
|
private static MxValue parseValue(String type, String text) {
|
||||||
return switch (type) {
|
return switch (type) {
|
||||||
case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text));
|
case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text));
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ from mxgateway.errors import MxGatewayError
|
|||||||
from mxgateway.generated import mxaccess_gateway_pb2 as pb
|
from mxgateway.generated import mxaccess_gateway_pb2 as pb
|
||||||
from mxgateway.options import ClientOptions
|
from mxgateway.options import ClientOptions
|
||||||
from mxgateway.session import Session
|
from mxgateway.session import Session
|
||||||
|
from mxgateway.values import to_mx_value
|
||||||
from mxgateway.values import MxValueInput
|
from mxgateway.values import MxValueInput
|
||||||
|
|
||||||
MAX_AGGREGATE_EVENTS = 10_000
|
MAX_AGGREGATE_EVENTS = 10_000
|
||||||
@@ -186,6 +187,89 @@ def unsubscribe_bulk(**kwargs: Any) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@main.command("read-bulk")
|
||||||
|
@gateway_options
|
||||||
|
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||||
|
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
|
||||||
|
@click.option("--items", required=True, help="Comma-separated MXAccess tag addresses.")
|
||||||
|
@click.option("--timeout-ms", default=0, type=int, show_default=True,
|
||||||
|
help="Per-tag snapshot timeout in milliseconds. 0 = worker default.")
|
||||||
|
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||||
|
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||||
|
def read_bulk(**kwargs: Any) -> None:
|
||||||
|
"""Invoke MXAccess ReadBulk — cached value when advised, snapshot otherwise."""
|
||||||
|
|
||||||
|
_run(_read_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
|
||||||
|
|
||||||
|
|
||||||
|
@main.command("write-bulk")
|
||||||
|
@gateway_options
|
||||||
|
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||||
|
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
|
||||||
|
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
|
||||||
|
@click.option("--type", "value_type", default="string", show_default=True)
|
||||||
|
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
|
||||||
|
@click.option("--user-id", default=0, type=int, show_default=True)
|
||||||
|
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||||
|
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||||
|
def write_bulk(**kwargs: Any) -> None:
|
||||||
|
"""Invoke MXAccess WriteBulk — sequential Write per entry."""
|
||||||
|
|
||||||
|
_run(_write_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
|
||||||
|
|
||||||
|
|
||||||
|
@main.command("write2-bulk")
|
||||||
|
@gateway_options
|
||||||
|
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||||
|
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
|
||||||
|
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
|
||||||
|
@click.option("--type", "value_type", default="string", show_default=True)
|
||||||
|
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
|
||||||
|
@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.")
|
||||||
|
@click.option("--user-id", default=0, type=int, show_default=True)
|
||||||
|
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||||
|
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||||
|
def write2_bulk(**kwargs: Any) -> None:
|
||||||
|
"""Invoke MXAccess Write2Bulk — timestamped sequential Write2 per entry."""
|
||||||
|
|
||||||
|
_run(_write2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
|
||||||
|
|
||||||
|
|
||||||
|
@main.command("write-secured-bulk")
|
||||||
|
@gateway_options
|
||||||
|
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||||
|
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
|
||||||
|
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
|
||||||
|
@click.option("--type", "value_type", default="string", show_default=True)
|
||||||
|
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
|
||||||
|
@click.option("--current-user-id", default=0, type=int, show_default=True)
|
||||||
|
@click.option("--verifier-user-id", default=0, type=int, show_default=True)
|
||||||
|
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||||
|
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||||
|
def write_secured_bulk(**kwargs: Any) -> None:
|
||||||
|
"""Invoke MXAccess WriteSecuredBulk — credential-sensitive."""
|
||||||
|
|
||||||
|
_run(_write_secured_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
|
||||||
|
|
||||||
|
|
||||||
|
@main.command("write-secured2-bulk")
|
||||||
|
@gateway_options
|
||||||
|
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||||
|
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
|
||||||
|
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
|
||||||
|
@click.option("--type", "value_type", default="string", show_default=True)
|
||||||
|
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
|
||||||
|
@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.")
|
||||||
|
@click.option("--current-user-id", default=0, type=int, show_default=True)
|
||||||
|
@click.option("--verifier-user-id", default=0, type=int, show_default=True)
|
||||||
|
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||||
|
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||||
|
def write_secured2_bulk(**kwargs: Any) -> None:
|
||||||
|
"""Invoke MXAccess WriteSecured2Bulk — timestamped + credential-sensitive."""
|
||||||
|
|
||||||
|
_run(_write_secured2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
|
||||||
|
|
||||||
|
|
||||||
@main.command("stream-events")
|
@main.command("stream-events")
|
||||||
@gateway_options
|
@gateway_options
|
||||||
@click.option("--session-id", required=True, help="Gateway session id.")
|
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||||
@@ -340,6 +424,120 @@ async def _unsubscribe_bulk(**kwargs: Any) -> dict[str, Any]:
|
|||||||
return {"results": [_message_dict(result) for result in results]}
|
return {"results": [_message_dict(result) for result in results]}
|
||||||
|
|
||||||
|
|
||||||
|
async def _read_bulk(**kwargs: Any) -> dict[str, Any]:
|
||||||
|
async with await _connect(kwargs) as client:
|
||||||
|
session = _session(client, kwargs["session_id"])
|
||||||
|
results = await session.read_bulk(
|
||||||
|
kwargs["server_handle"],
|
||||||
|
_parse_string_list(kwargs["items"]),
|
||||||
|
timeout_ms=kwargs["timeout_ms"],
|
||||||
|
correlation_id=kwargs["correlation_id"],
|
||||||
|
)
|
||||||
|
return {"results": [_message_dict(result) for result in results]}
|
||||||
|
|
||||||
|
|
||||||
|
def _build_write_bulk_entries(kwargs: dict[str, Any]):
|
||||||
|
"""Build (item_handle, MxValue) pairs for the bulk-write families.
|
||||||
|
|
||||||
|
The CLI accepts a single ``--type`` plus ``--values`` (comma-separated
|
||||||
|
string-encoded values, one per ``--item-handles`` entry). Returns the
|
||||||
|
parsed item-handle list and the per-entry MxValue protobuf instances —
|
||||||
|
callers wrap these into the appropriate per-entry message type.
|
||||||
|
"""
|
||||||
|
|
||||||
|
handles = _parse_int_list(kwargs["item_handles"])
|
||||||
|
value_texts = _parse_string_list(kwargs["values"])
|
||||||
|
if len(handles) != len(value_texts):
|
||||||
|
raise click.UsageError(
|
||||||
|
f"item-handles count ({len(handles)}) does not match values count ({len(value_texts)})",
|
||||||
|
)
|
||||||
|
parsed = [_parse_value(text, kwargs["value_type"]) for text in value_texts]
|
||||||
|
values = [to_mx_value(v) for v in parsed]
|
||||||
|
return handles, values
|
||||||
|
|
||||||
|
|
||||||
|
async def _write_bulk(**kwargs: Any) -> dict[str, Any]:
|
||||||
|
handles, values = _build_write_bulk_entries(kwargs)
|
||||||
|
entries = [
|
||||||
|
pb.WriteBulkEntry(item_handle=handle, user_id=kwargs["user_id"], value=value)
|
||||||
|
for handle, value in zip(handles, values)
|
||||||
|
]
|
||||||
|
async with await _connect(kwargs) as client:
|
||||||
|
session = _session(client, kwargs["session_id"])
|
||||||
|
results = await session.write_bulk(
|
||||||
|
kwargs["server_handle"],
|
||||||
|
entries,
|
||||||
|
correlation_id=kwargs["correlation_id"],
|
||||||
|
)
|
||||||
|
return {"results": [_message_dict(result) for result in results]}
|
||||||
|
|
||||||
|
|
||||||
|
async def _write2_bulk(**kwargs: Any) -> dict[str, Any]:
|
||||||
|
handles, values = _build_write_bulk_entries(kwargs)
|
||||||
|
timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"]))
|
||||||
|
entries = [
|
||||||
|
pb.Write2BulkEntry(
|
||||||
|
item_handle=handle,
|
||||||
|
user_id=kwargs["user_id"],
|
||||||
|
value=value,
|
||||||
|
timestamp_value=timestamp_value,
|
||||||
|
)
|
||||||
|
for handle, value in zip(handles, values)
|
||||||
|
]
|
||||||
|
async with await _connect(kwargs) as client:
|
||||||
|
session = _session(client, kwargs["session_id"])
|
||||||
|
results = await session.write2_bulk(
|
||||||
|
kwargs["server_handle"],
|
||||||
|
entries,
|
||||||
|
correlation_id=kwargs["correlation_id"],
|
||||||
|
)
|
||||||
|
return {"results": [_message_dict(result) for result in results]}
|
||||||
|
|
||||||
|
|
||||||
|
async def _write_secured_bulk(**kwargs: Any) -> dict[str, Any]:
|
||||||
|
handles, values = _build_write_bulk_entries(kwargs)
|
||||||
|
entries = [
|
||||||
|
pb.WriteSecuredBulkEntry(
|
||||||
|
item_handle=handle,
|
||||||
|
current_user_id=kwargs["current_user_id"],
|
||||||
|
verifier_user_id=kwargs["verifier_user_id"],
|
||||||
|
value=value,
|
||||||
|
)
|
||||||
|
for handle, value in zip(handles, values)
|
||||||
|
]
|
||||||
|
async with await _connect(kwargs) as client:
|
||||||
|
session = _session(client, kwargs["session_id"])
|
||||||
|
results = await session.write_secured_bulk(
|
||||||
|
kwargs["server_handle"],
|
||||||
|
entries,
|
||||||
|
correlation_id=kwargs["correlation_id"],
|
||||||
|
)
|
||||||
|
return {"results": [_message_dict(result) for result in results]}
|
||||||
|
|
||||||
|
|
||||||
|
async def _write_secured2_bulk(**kwargs: Any) -> dict[str, Any]:
|
||||||
|
handles, values = _build_write_bulk_entries(kwargs)
|
||||||
|
timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"]))
|
||||||
|
entries = [
|
||||||
|
pb.WriteSecured2BulkEntry(
|
||||||
|
item_handle=handle,
|
||||||
|
current_user_id=kwargs["current_user_id"],
|
||||||
|
verifier_user_id=kwargs["verifier_user_id"],
|
||||||
|
value=value,
|
||||||
|
timestamp_value=timestamp_value,
|
||||||
|
)
|
||||||
|
for handle, value in zip(handles, values)
|
||||||
|
]
|
||||||
|
async with await _connect(kwargs) as client:
|
||||||
|
session = _session(client, kwargs["session_id"])
|
||||||
|
results = await session.write_secured2_bulk(
|
||||||
|
kwargs["server_handle"],
|
||||||
|
entries,
|
||||||
|
correlation_id=kwargs["correlation_id"],
|
||||||
|
)
|
||||||
|
return {"results": [_message_dict(result) for result in results]}
|
||||||
|
|
||||||
|
|
||||||
async def _stream_events(**kwargs: Any) -> dict[str, Any]:
|
async def _stream_events(**kwargs: Any) -> dict[str, Any]:
|
||||||
async with await _connect(kwargs) as client:
|
async with await _connect(kwargs) as client:
|
||||||
session = _session(client, kwargs["session_id"])
|
session = _session(client, kwargs["session_id"])
|
||||||
|
|||||||
@@ -18,7 +18,8 @@ use futures_util::StreamExt;
|
|||||||
use mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
|
use mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
|
||||||
use mxgateway_client::generated::mxaccess_gateway::v1::{
|
use mxgateway_client::generated::mxaccess_gateway::v1::{
|
||||||
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily,
|
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily,
|
||||||
MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest,
|
MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest, Write2BulkEntry,
|
||||||
|
WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry,
|
||||||
};
|
};
|
||||||
use mxgateway_client::{
|
use mxgateway_client::{
|
||||||
ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, MxValueProjection,
|
ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, MxValueProjection,
|
||||||
@@ -127,6 +128,109 @@ enum Command {
|
|||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
json: bool,
|
json: bool,
|
||||||
},
|
},
|
||||||
|
/// Snapshot the current value for each requested tag. Cached
|
||||||
|
/// OnDataChange values are returned for tags that are already advised
|
||||||
|
/// without touching the existing subscription; otherwise the worker
|
||||||
|
/// takes a one-shot AddItem + Advise + UnAdvise + RemoveItem lifecycle.
|
||||||
|
ReadBulk {
|
||||||
|
#[command(flatten)]
|
||||||
|
connection: ConnectionArgs,
|
||||||
|
#[arg(long)]
|
||||||
|
session_id: String,
|
||||||
|
#[arg(long)]
|
||||||
|
server_handle: i32,
|
||||||
|
#[arg(long, value_delimiter = ',')]
|
||||||
|
items: Vec<String>,
|
||||||
|
/// Per-tag snapshot timeout in milliseconds. `0` uses the worker default (1000 ms).
|
||||||
|
#[arg(long, default_value_t = 0)]
|
||||||
|
timeout_ms: u32,
|
||||||
|
#[arg(long)]
|
||||||
|
json: bool,
|
||||||
|
},
|
||||||
|
/// Bulk Write — one MXAccess Write per (item_handle, value) pair.
|
||||||
|
WriteBulk {
|
||||||
|
#[command(flatten)]
|
||||||
|
connection: ConnectionArgs,
|
||||||
|
#[arg(long)]
|
||||||
|
session_id: String,
|
||||||
|
#[arg(long)]
|
||||||
|
server_handle: i32,
|
||||||
|
#[arg(long, value_delimiter = ',')]
|
||||||
|
item_handles: Vec<i32>,
|
||||||
|
#[arg(long, value_enum)]
|
||||||
|
value_type: CliValueType,
|
||||||
|
#[arg(long, value_delimiter = ',')]
|
||||||
|
values: Vec<String>,
|
||||||
|
#[arg(long, default_value_t = 0)]
|
||||||
|
user_id: i32,
|
||||||
|
#[arg(long)]
|
||||||
|
json: bool,
|
||||||
|
},
|
||||||
|
/// Bulk Write2 — timestamped variant; the timestamp applies to all entries.
|
||||||
|
Write2Bulk {
|
||||||
|
#[command(flatten)]
|
||||||
|
connection: ConnectionArgs,
|
||||||
|
#[arg(long)]
|
||||||
|
session_id: String,
|
||||||
|
#[arg(long)]
|
||||||
|
server_handle: i32,
|
||||||
|
#[arg(long, value_delimiter = ',')]
|
||||||
|
item_handles: Vec<i32>,
|
||||||
|
#[arg(long, value_enum)]
|
||||||
|
value_type: CliValueType,
|
||||||
|
#[arg(long, value_delimiter = ',')]
|
||||||
|
values: Vec<String>,
|
||||||
|
#[arg(long)]
|
||||||
|
timestamp: String,
|
||||||
|
#[arg(long, default_value_t = 0)]
|
||||||
|
user_id: i32,
|
||||||
|
#[arg(long)]
|
||||||
|
json: bool,
|
||||||
|
},
|
||||||
|
/// Bulk WriteSecured.
|
||||||
|
WriteSecuredBulk {
|
||||||
|
#[command(flatten)]
|
||||||
|
connection: ConnectionArgs,
|
||||||
|
#[arg(long)]
|
||||||
|
session_id: String,
|
||||||
|
#[arg(long)]
|
||||||
|
server_handle: i32,
|
||||||
|
#[arg(long, value_delimiter = ',')]
|
||||||
|
item_handles: Vec<i32>,
|
||||||
|
#[arg(long, value_enum)]
|
||||||
|
value_type: CliValueType,
|
||||||
|
#[arg(long, value_delimiter = ',')]
|
||||||
|
values: Vec<String>,
|
||||||
|
#[arg(long, default_value_t = 0)]
|
||||||
|
current_user_id: i32,
|
||||||
|
#[arg(long, default_value_t = 0)]
|
||||||
|
verifier_user_id: i32,
|
||||||
|
#[arg(long)]
|
||||||
|
json: bool,
|
||||||
|
},
|
||||||
|
/// Bulk WriteSecured2 — timestamped + verified.
|
||||||
|
WriteSecured2Bulk {
|
||||||
|
#[command(flatten)]
|
||||||
|
connection: ConnectionArgs,
|
||||||
|
#[arg(long)]
|
||||||
|
session_id: String,
|
||||||
|
#[arg(long)]
|
||||||
|
server_handle: i32,
|
||||||
|
#[arg(long, value_delimiter = ',')]
|
||||||
|
item_handles: Vec<i32>,
|
||||||
|
#[arg(long, value_enum)]
|
||||||
|
value_type: CliValueType,
|
||||||
|
#[arg(long, value_delimiter = ',')]
|
||||||
|
values: Vec<String>,
|
||||||
|
#[arg(long)]
|
||||||
|
timestamp: String,
|
||||||
|
#[arg(long, default_value_t = 0)]
|
||||||
|
current_user_id: i32,
|
||||||
|
#[arg(long, default_value_t = 0)]
|
||||||
|
verifier_user_id: i32,
|
||||||
|
#[arg(long)]
|
||||||
|
json: bool,
|
||||||
|
},
|
||||||
StreamEvents {
|
StreamEvents {
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
connection: ConnectionArgs,
|
connection: ConnectionArgs,
|
||||||
@@ -429,6 +533,136 @@ async fn run(cli: Cli) -> Result<(), Error> {
|
|||||||
.await?;
|
.await?;
|
||||||
print_bulk_results("unsubscribe-bulk", &results, json);
|
print_bulk_results("unsubscribe-bulk", &results, json);
|
||||||
}
|
}
|
||||||
|
Command::ReadBulk {
|
||||||
|
connection,
|
||||||
|
session_id,
|
||||||
|
server_handle,
|
||||||
|
items,
|
||||||
|
timeout_ms,
|
||||||
|
json,
|
||||||
|
} => {
|
||||||
|
let session = session_for(connection, session_id).await?;
|
||||||
|
let results = session.read_bulk(server_handle, items, timeout_ms).await?;
|
||||||
|
print_read_bulk_results("read-bulk", &results, json);
|
||||||
|
}
|
||||||
|
Command::WriteBulk {
|
||||||
|
connection,
|
||||||
|
session_id,
|
||||||
|
server_handle,
|
||||||
|
item_handles,
|
||||||
|
value_type,
|
||||||
|
values,
|
||||||
|
user_id,
|
||||||
|
json,
|
||||||
|
} => {
|
||||||
|
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
|
||||||
|
let session = session_for(connection, session_id).await?;
|
||||||
|
let results = session
|
||||||
|
.write_bulk(
|
||||||
|
server_handle,
|
||||||
|
entries
|
||||||
|
.into_iter()
|
||||||
|
.map(|(item_handle, value)| WriteBulkEntry {
|
||||||
|
item_handle,
|
||||||
|
value: Some(value),
|
||||||
|
user_id,
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
print_write_bulk_results("write-bulk", &results, json);
|
||||||
|
}
|
||||||
|
Command::Write2Bulk {
|
||||||
|
connection,
|
||||||
|
session_id,
|
||||||
|
server_handle,
|
||||||
|
item_handles,
|
||||||
|
value_type,
|
||||||
|
values,
|
||||||
|
timestamp,
|
||||||
|
user_id,
|
||||||
|
json,
|
||||||
|
} => {
|
||||||
|
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
|
||||||
|
let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto();
|
||||||
|
let session = session_for(connection, session_id).await?;
|
||||||
|
let results = session
|
||||||
|
.write2_bulk(
|
||||||
|
server_handle,
|
||||||
|
entries
|
||||||
|
.into_iter()
|
||||||
|
.map(|(item_handle, value)| Write2BulkEntry {
|
||||||
|
item_handle,
|
||||||
|
value: Some(value),
|
||||||
|
timestamp_value: Some(timestamp_value.clone()),
|
||||||
|
user_id,
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
print_write_bulk_results("write2-bulk", &results, json);
|
||||||
|
}
|
||||||
|
Command::WriteSecuredBulk {
|
||||||
|
connection,
|
||||||
|
session_id,
|
||||||
|
server_handle,
|
||||||
|
item_handles,
|
||||||
|
value_type,
|
||||||
|
values,
|
||||||
|
current_user_id,
|
||||||
|
verifier_user_id,
|
||||||
|
json,
|
||||||
|
} => {
|
||||||
|
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
|
||||||
|
let session = session_for(connection, session_id).await?;
|
||||||
|
let results = session
|
||||||
|
.write_secured_bulk(
|
||||||
|
server_handle,
|
||||||
|
entries
|
||||||
|
.into_iter()
|
||||||
|
.map(|(item_handle, value)| WriteSecuredBulkEntry {
|
||||||
|
item_handle,
|
||||||
|
value: Some(value),
|
||||||
|
current_user_id,
|
||||||
|
verifier_user_id,
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
print_write_bulk_results("write-secured-bulk", &results, json);
|
||||||
|
}
|
||||||
|
Command::WriteSecured2Bulk {
|
||||||
|
connection,
|
||||||
|
session_id,
|
||||||
|
server_handle,
|
||||||
|
item_handles,
|
||||||
|
value_type,
|
||||||
|
values,
|
||||||
|
timestamp,
|
||||||
|
current_user_id,
|
||||||
|
verifier_user_id,
|
||||||
|
json,
|
||||||
|
} => {
|
||||||
|
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
|
||||||
|
let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto();
|
||||||
|
let session = session_for(connection, session_id).await?;
|
||||||
|
let results = session
|
||||||
|
.write_secured2_bulk(
|
||||||
|
server_handle,
|
||||||
|
entries
|
||||||
|
.into_iter()
|
||||||
|
.map(|(item_handle, value)| WriteSecured2BulkEntry {
|
||||||
|
item_handle,
|
||||||
|
value: Some(value),
|
||||||
|
timestamp_value: Some(timestamp_value.clone()),
|
||||||
|
current_user_id,
|
||||||
|
verifier_user_id,
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
print_write_bulk_results("write-secured2-bulk", &results, json);
|
||||||
|
}
|
||||||
Command::StreamEvents {
|
Command::StreamEvents {
|
||||||
connection,
|
connection,
|
||||||
session_id,
|
session_id,
|
||||||
@@ -784,6 +1018,89 @@ fn print_bulk_results(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn print_write_bulk_results(
|
||||||
|
operation: &str,
|
||||||
|
results: &[mxgateway_client::generated::mxaccess_gateway::v1::BulkWriteResult],
|
||||||
|
use_json: bool,
|
||||||
|
) {
|
||||||
|
if use_json {
|
||||||
|
let results_json: Vec<_> = results
|
||||||
|
.iter()
|
||||||
|
.map(|result| {
|
||||||
|
json!({
|
||||||
|
"serverHandle": result.server_handle,
|
||||||
|
"itemHandle": result.item_handle,
|
||||||
|
"wasSuccessful": result.was_successful,
|
||||||
|
"hresult": result.hresult,
|
||||||
|
"errorMessage": result.error_message,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
println!(
|
||||||
|
"{}",
|
||||||
|
json!({ "operation": operation, "results": results_json })
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
println!("{}", results.len());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn print_read_bulk_results(
|
||||||
|
operation: &str,
|
||||||
|
results: &[mxgateway_client::generated::mxaccess_gateway::v1::BulkReadResult],
|
||||||
|
use_json: bool,
|
||||||
|
) {
|
||||||
|
if use_json {
|
||||||
|
let results_json: Vec<_> = results
|
||||||
|
.iter()
|
||||||
|
.map(|result| {
|
||||||
|
json!({
|
||||||
|
"serverHandle": result.server_handle,
|
||||||
|
"tagAddress": result.tag_address,
|
||||||
|
"itemHandle": result.item_handle,
|
||||||
|
"wasSuccessful": result.was_successful,
|
||||||
|
"wasCached": result.was_cached,
|
||||||
|
"quality": result.quality,
|
||||||
|
"errorMessage": result.error_message,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
println!(
|
||||||
|
"{}",
|
||||||
|
json!({ "operation": operation, "results": results_json })
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
println!("{}", results.len());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Pairs each parsed item handle with its parsed MxValue (proto form) so a
|
||||||
|
/// single helper can build the four bulk-write families without each branch
|
||||||
|
/// repeating the length check and per-value parsing.
|
||||||
|
fn build_write_bulk_entries(
|
||||||
|
item_handles: &[i32],
|
||||||
|
value_type: CliValueType,
|
||||||
|
values: &[String],
|
||||||
|
) -> Result<Vec<(i32, mxgateway_client::generated::mxaccess_gateway::v1::MxValue)>, Error> {
|
||||||
|
if item_handles.len() != values.len() {
|
||||||
|
return Err(Error::InvalidArgument {
|
||||||
|
name: "values".to_owned(),
|
||||||
|
detail: format!(
|
||||||
|
"item-handles count ({}) does not match values count ({})",
|
||||||
|
item_handles.len(),
|
||||||
|
values.len()
|
||||||
|
),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
item_handles
|
||||||
|
.iter()
|
||||||
|
.zip(values.iter())
|
||||||
|
.map(|(handle, value)| {
|
||||||
|
parse_value(value_type, value).map(|wrapper| (*handle, wrapper.into_proto()))
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
fn parse_value(value_type: CliValueType, value: &str) -> Result<MxValue, Error> {
|
fn parse_value(value_type: CliValueType, value: &str) -> Result<MxValue, Error> {
|
||||||
let parsed = match value_type {
|
let parsed = match value_type {
|
||||||
CliValueType::Bool => MxValue::bool(parse_cli_value(value)?),
|
CliValueType::Bool => MxValue::bool(parse_cli_value(value)?),
|
||||||
|
|||||||
@@ -33,6 +33,12 @@ param(
|
|||||||
[int]$BulkTagCount = 6,
|
[int]$BulkTagCount = 6,
|
||||||
[switch]$SkipStream,
|
[switch]$SkipStream,
|
||||||
[switch]$SkipBulk,
|
[switch]$SkipBulk,
|
||||||
|
# Skip the bulk read+write coverage that runs alongside the existing
|
||||||
|
# subscribe-bulk phase. The read-bulk phase confirms cached-path
|
||||||
|
# semantics against tags left advised by subscribe-bulk (was_cached
|
||||||
|
# = true); the write-bulk phase runs when -VerifyWrite is set and
|
||||||
|
# exercises the BulkWriteResult shape against the writable tag.
|
||||||
|
[switch]$SkipReadWriteBulk,
|
||||||
# Write round-trip. Opt-in because it mutates live tag state: it writes a
|
# Write round-trip. Opt-in because it mutates live tag state: it writes a
|
||||||
# sentinel value to -WriteAttribute and asserts an OnWriteComplete event
|
# sentinel value to -WriteAttribute and asserts an OnWriteComplete event
|
||||||
# confirms the write reached the MXAccess provider.
|
# confirms the write reached the MXAccess provider.
|
||||||
@@ -400,7 +406,18 @@ function Get-BulkResults {
|
|||||||
return @(Get-PropertyValue -Object $Json -Names @("results"))
|
return @(Get-PropertyValue -Object $Json -Names @("results"))
|
||||||
}
|
}
|
||||||
|
|
||||||
$replyName = if ($Operation -eq "subscribe-bulk") { "subscribeBulk" } else { "unsubscribeBulk" }
|
# .NET emits the full MxCommandReply via protobuf JSON, with results
|
||||||
|
# nested under a per-command field name.
|
||||||
|
$replyName = switch ($Operation) {
|
||||||
|
"subscribe-bulk" { "subscribeBulk" }
|
||||||
|
"unsubscribe-bulk" { "unsubscribeBulk" }
|
||||||
|
"read-bulk" { "readBulk" }
|
||||||
|
"write-bulk" { "writeBulk" }
|
||||||
|
"write2-bulk" { "write2Bulk" }
|
||||||
|
"write-secured-bulk" { "writeSecuredBulk" }
|
||||||
|
"write-secured2-bulk" { "writeSecured2Bulk" }
|
||||||
|
default { $Operation }
|
||||||
|
}
|
||||||
$reply = Get-PropertyValue -Object $Json -Names @($replyName)
|
$reply = Get-PropertyValue -Object $Json -Names @($replyName)
|
||||||
return @(Get-PropertyValue -Object $reply -Names @("results"))
|
return @(Get-PropertyValue -Object $reply -Names @("results"))
|
||||||
}
|
}
|
||||||
@@ -478,6 +495,13 @@ function Get-ClientCommand {
|
|||||||
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
||||||
} elseif ($Operation -eq "unsubscribe-bulk") {
|
} elseif ($Operation -eq "unsubscribe-bulk") {
|
||||||
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
|
||||||
|
} elseif ($Operation -eq "read-bulk") {
|
||||||
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
||||||
|
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") }
|
||||||
|
} elseif ($Operation -eq "write-bulk") {
|
||||||
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
|
||||||
|
"--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values)
|
||||||
|
if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") }
|
||||||
} elseif ($Operation -eq "write") {
|
} elseif ($Operation -eq "write") {
|
||||||
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
|
||||||
} elseif ($Operation -eq "stream-events") {
|
} elseif ($Operation -eq "stream-events") {
|
||||||
@@ -507,6 +531,13 @@ function Get-ClientCommand {
|
|||||||
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items)
|
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items)
|
||||||
} elseif ($Operation -eq "unsubscribe-bulk") {
|
} elseif ($Operation -eq "unsubscribe-bulk") {
|
||||||
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handles", $Values.itemHandles)
|
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handles", $Values.itemHandles)
|
||||||
|
} elseif ($Operation -eq "read-bulk") {
|
||||||
|
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items)
|
||||||
|
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("-timeout-ms", "$($Values.timeoutMs)") }
|
||||||
|
} elseif ($Operation -eq "write-bulk") {
|
||||||
|
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)",
|
||||||
|
"-item-handles", $Values.itemHandles, "-type", $Values.valueType, "-values", $Values.values)
|
||||||
|
if ($Values.ContainsKey("userId")) { $arguments += @("-user-id", "$($Values.userId)") }
|
||||||
} elseif ($Operation -eq "write") {
|
} elseif ($Operation -eq "write") {
|
||||||
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handle", "$($Values.itemHandle)", "-type", $Values.valueType, "-value", $Values.value)
|
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handle", "$($Values.itemHandle)", "-type", $Values.valueType, "-value", $Values.value)
|
||||||
} elseif ($Operation -eq "stream-events") {
|
} elseif ($Operation -eq "stream-events") {
|
||||||
@@ -535,6 +566,14 @@ function Get-ClientCommand {
|
|||||||
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
||||||
} elseif ($Operation -eq "unsubscribe-bulk") {
|
} elseif ($Operation -eq "unsubscribe-bulk") {
|
||||||
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
|
||||||
|
} elseif ($Operation -eq "read-bulk") {
|
||||||
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
||||||
|
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") }
|
||||||
|
} elseif ($Operation -eq "write-bulk") {
|
||||||
|
# Rust uses --value-type for the type flag.
|
||||||
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
|
||||||
|
"--item-handles", $Values.itemHandles, "--value-type", $Values.valueType, "--values", $Values.values)
|
||||||
|
if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") }
|
||||||
} elseif ($Operation -eq "write") {
|
} elseif ($Operation -eq "write") {
|
||||||
# Rust names the type flag --value-type, unlike the other CLIs.
|
# Rust names the type flag --value-type, unlike the other CLIs.
|
||||||
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--value-type", $Values.valueType, "--value", $Values.value)
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--value-type", $Values.valueType, "--value", $Values.value)
|
||||||
@@ -565,6 +604,13 @@ function Get-ClientCommand {
|
|||||||
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
||||||
} elseif ($Operation -eq "unsubscribe-bulk") {
|
} elseif ($Operation -eq "unsubscribe-bulk") {
|
||||||
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
|
||||||
|
} elseif ($Operation -eq "read-bulk") {
|
||||||
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
||||||
|
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") }
|
||||||
|
} elseif ($Operation -eq "write-bulk") {
|
||||||
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
|
||||||
|
"--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values)
|
||||||
|
if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") }
|
||||||
} elseif ($Operation -eq "write") {
|
} elseif ($Operation -eq "write") {
|
||||||
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
|
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
|
||||||
} elseif ($Operation -eq "stream-events") {
|
} elseif ($Operation -eq "stream-events") {
|
||||||
@@ -597,6 +643,13 @@ function Get-ClientCommand {
|
|||||||
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
||||||
} elseif ($Operation -eq "unsubscribe-bulk") {
|
} elseif ($Operation -eq "unsubscribe-bulk") {
|
||||||
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
|
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
|
||||||
|
} elseif ($Operation -eq "read-bulk") {
|
||||||
|
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
|
||||||
|
if ($Values.ContainsKey("timeoutMs")) { $cliArgs += @("--timeout-ms", "$($Values.timeoutMs)") }
|
||||||
|
} elseif ($Operation -eq "write-bulk") {
|
||||||
|
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
|
||||||
|
"--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values)
|
||||||
|
if ($Values.ContainsKey("userId")) { $cliArgs += @("--user-id", "$($Values.userId)") }
|
||||||
} elseif ($Operation -eq "write") {
|
} elseif ($Operation -eq "write") {
|
||||||
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
|
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
|
||||||
} elseif ($Operation -eq "stream-events") {
|
} elseif ($Operation -eq "stream-events") {
|
||||||
@@ -649,6 +702,23 @@ function Get-DryRunReply {
|
|||||||
})
|
})
|
||||||
return [pscustomobject]@{ unsubscribeBulk = [pscustomobject]@{ results = $results }; results = $results }
|
return [pscustomobject]@{ unsubscribeBulk = [pscustomobject]@{ results = $results }; results = $results }
|
||||||
}
|
}
|
||||||
|
"read-bulk" {
|
||||||
|
$results = @($Values.items -split "," | ForEach-Object -Begin { $index = 1 } -Process {
|
||||||
|
[pscustomobject]@{
|
||||||
|
itemHandle = $index++
|
||||||
|
tagAddress = $_
|
||||||
|
wasSuccessful = $true
|
||||||
|
wasCached = $true
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return [pscustomobject]@{ readBulk = [pscustomobject]@{ results = $results }; results = $results }
|
||||||
|
}
|
||||||
|
"write-bulk" {
|
||||||
|
$results = @($Values.itemHandles -split "," | ForEach-Object {
|
||||||
|
[pscustomobject]@{ itemHandle = [int]$_; wasSuccessful = $true }
|
||||||
|
})
|
||||||
|
return [pscustomobject]@{ writeBulk = [pscustomobject]@{ results = $results }; results = $results }
|
||||||
|
}
|
||||||
"stream-events" {
|
"stream-events" {
|
||||||
# Synthesize an OnDataChange (carrying the written value) and an
|
# Synthesize an OnDataChange (carrying the written value) and an
|
||||||
# OnWriteComplete so the write round-trip assertion passes under
|
# OnWriteComplete so the write round-trip assertion passes under
|
||||||
@@ -839,6 +909,28 @@ function Invoke-ClientFlow {
|
|||||||
writeCompleteObserved = $true
|
writeCompleteObserved = $true
|
||||||
echoObserved = ($null -ne $echoEvent)
|
echoObserved = ($null -ne $echoEvent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# WriteBulk smoke: single-entry batch against the same writable
|
||||||
|
# tag. Exercises the BulkWriteResult wire format end-to-end
|
||||||
|
# without complicating the OnWriteComplete echo assertion that
|
||||||
|
# the single-item write phase already verified above. Pinned
|
||||||
|
# to a different sentinel value so a subsequent read-bulk
|
||||||
|
# against the same tag would see the bulk write's effect.
|
||||||
|
if (-not $SkipReadWriteBulk) {
|
||||||
|
$bulkSentinel = $sentinelValue + 1
|
||||||
|
$writeBulkJson = Invoke-ClientOperation -Client $Client -Operation "write-bulk" -Values @{
|
||||||
|
sessionId = $sessionId
|
||||||
|
serverHandle = $serverHandle
|
||||||
|
itemHandles = "$writeItemHandle"
|
||||||
|
valueType = $WriteType
|
||||||
|
values = "$bulkSentinel"
|
||||||
|
userId = 0
|
||||||
|
}
|
||||||
|
$writeBulkResults = @(Get-BulkResults -Client $Client -Operation "write-bulk" -Json $writeBulkJson)
|
||||||
|
Assert-BulkResults -Client $Client -Operation "write-bulk" -Results $writeBulkResults -ExpectedCount 1
|
||||||
|
$clientResult.write.writeBulkValue = $bulkSentinel
|
||||||
|
$clientResult.write.writeBulkResultCount = $writeBulkResults.Count
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -857,6 +949,40 @@ function Invoke-ClientFlow {
|
|||||||
throw "$Client subscribe-bulk returned $($bulkItemHandles.Count) usable item handle(s); expected $($bulkTags.Count)."
|
throw "$Client subscribe-bulk returned $($bulkItemHandles.Count) usable item handle(s); expected $($bulkTags.Count)."
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# ReadBulk over the already-advised tags: every result must come
|
||||||
|
# from the per-session value cache (was_cached = true). Confirms
|
||||||
|
# the gateway/worker/cache wiring serves cached values for tags
|
||||||
|
# the caller did not create the subscription for.
|
||||||
|
$readBulkSummary = $null
|
||||||
|
if (-not $SkipReadWriteBulk) {
|
||||||
|
$readBulkJson = Invoke-ClientOperation -Client $Client -Operation "read-bulk" -Values @{
|
||||||
|
sessionId = $sessionId
|
||||||
|
serverHandle = $serverHandle
|
||||||
|
items = $bulkItems
|
||||||
|
timeoutMs = 1500
|
||||||
|
}
|
||||||
|
$readResults = @(Get-BulkResults -Client $Client -Operation "read-bulk" -Json $readBulkJson)
|
||||||
|
Assert-BulkResults -Client $Client -Operation "read-bulk" -Results $readResults -ExpectedCount $bulkTags.Count
|
||||||
|
$cachedCount = @($readResults | Where-Object {
|
||||||
|
[bool](Get-PropertyValue -Object $_ -Names @("wasCached", "was_cached"))
|
||||||
|
}).Count
|
||||||
|
# Allow up to one snapshot fallback per batch: a freshly
|
||||||
|
# advised tag may not have an OnDataChange cached yet if it
|
||||||
|
# hasn't pushed an update in the small window between
|
||||||
|
# subscribe-bulk and read-bulk. Anything beyond that means
|
||||||
|
# the cached-path optimization is broken.
|
||||||
|
$maxSnapshotFallbacks = 1
|
||||||
|
if ($cachedCount -lt ($readResults.Count - $maxSnapshotFallbacks)) {
|
||||||
|
throw ("$Client read-bulk only returned $cachedCount cached result(s) " +
|
||||||
|
"out of $($readResults.Count); the cache-then-snapshot fork must " +
|
||||||
|
"serve cached values for already-advised tags.")
|
||||||
|
}
|
||||||
|
$readBulkSummary = [ordered]@{
|
||||||
|
tagCount = $readResults.Count
|
||||||
|
cachedCount = $cachedCount
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
$unsubscribeBulkJson = Invoke-ClientOperation -Client $Client -Operation "unsubscribe-bulk" -Values @{
|
$unsubscribeBulkJson = Invoke-ClientOperation -Client $Client -Operation "unsubscribe-bulk" -Values @{
|
||||||
sessionId = $sessionId
|
sessionId = $sessionId
|
||||||
serverHandle = $serverHandle
|
serverHandle = $serverHandle
|
||||||
@@ -870,6 +996,7 @@ function Invoke-ClientFlow {
|
|||||||
subscribedCount = $subscribeResults.Count
|
subscribedCount = $subscribeResults.Count
|
||||||
unsubscribedCount = $unsubscribeResults.Count
|
unsubscribedCount = $unsubscribeResults.Count
|
||||||
itemHandles = $bulkItemHandles
|
itemHandles = $bulkItemHandles
|
||||||
|
readBulk = $readBulkSummary
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user