Compare commits

...

5 Commits

Author SHA1 Message Date
Joseph Doherty f90bff01db Java client: port bulk read/write SDK methods + CLI subcommands
Final language in the bulk-CLI port wave. HEAD's MxGatewaySession had
only the subscribe-style bulks; this commit adds the value-bulks plus
matching picocli subcommands and a bench-read-bulk harness.

SDK (MxGatewaySession.java):
- List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries)
- List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries)
- List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries)
- List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries)
- List<BulkReadResult> readBulk(int serverHandle, List<String> tagAddresses, Duration timeout)

readBulk uses java.time.Duration for the timeout parameter (idiomatic
Java) and internally converts to the timeoutMs proto field;
Duration.ZERO / null both delegate to the worker default. Per-entry
secured user ids stay on each WriteSecured(2)BulkEntry to match the
proto's per-row shape.

CLI (MxGatewayCli.java):
- read-bulk / write-bulk / write2-bulk / write-secured-bulk /
  write-secured2-bulk as picocli @Command subcommands. Write families
  share value-parsing logic; gating of --current-user-id /
  --verifier-user-id / --timestamp matches the cross-language flag
  contract.
- bench-read-bulk: --iterations / --warmup loop with avg/min/max ms
  reporting plus a --json mode that emits the cross-language bench
  JSON schema.

A small fixture in MxGatewayCliTests.FakeSession adds stub
implementations of the five new interface methods so the test module
compiles.

Verification: gradle build BUILD SUCCESSFUL (4 tasks executed, all
tests pass); gradle :zb-mom-ww-mxgateway-cli:installDist BUILD
SUCCESSFUL. Manual smoke against live gateway on localhost:5120:
open-session → register → read-bulk cold (wasCached=false both tags)
→ subscribe-bulk → read-bulk warm (wasCached=true both tags) →
write-bulk int32 111,222 (both wasSuccessful=true) → write2-bulk
timestamped (both wasSuccessful=true) → write-secured-bulk and
write-secured2-bulk return per-entry MXAccess "Value does not fall
within the expected range" failures with the configured user/verifier
ids (0,0) — confirming the SDK does NOT throw on per-entry MXAccess
failures and surfaces them through BulkWriteResult exactly as the
.NET and Go ports do → bench-read-bulk iterations=20 avg=9.5 ms
last_success=2/2 cached=2/2 → close-session SESSION_STATE_CLOSED.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 04:50:34 -04:00
Joseph Doherty 6add4b4acc Python client: port bulk read/write SDK methods + CLI subcommands
Mirrors the .NET / Go ports of divergent branch commit f220908. HEAD's
Session class had only the subscribe-style bulks; this commit adds the
value-bulk SDK surface plus matching CLI subcommands and a
bench-read-bulk harness.

SDK (zb_mom_ww_mxgateway/session.py):
- async def write_bulk(server_handle, entries, *, correlation_id="")
  → list[pb.BulkWriteResult]
- async def write2_bulk(server_handle, entries, *, correlation_id="")
  → list[pb.BulkWriteResult]
- async def write_secured_bulk(server_handle, entries, *, correlation_id="")
  → list[pb.BulkWriteResult]
- async def write_secured2_bulk(server_handle, entries, *, correlation_id="")
  → list[pb.BulkWriteResult]
- async def read_bulk(server_handle, tag_addresses, *, timeout_ms=0,
  correlation_id="") → list[pb.BulkReadResult]

All five reuse the existing _ensure_bulk_size validator and route
through the existing invoke() pipeline. read_bulk additionally enforces
timeout_ms >= 0.

CLI (zb_mom_ww_mxgateway_cli/commands.py):
- read-bulk / write-bulk / write2-bulk / write-secured-bulk /
  write-secured2-bulk registered as click @main.command(...). The
  write families share a _build_write_bulk_entries() helper that parses
  --item-handles and --values with a single --type, validates count
  match, converts via to_mx_value, and assembles the correct per-entry
  proto message.
- bench-read-bulk: opens its own session, subscribes to --bulk-size
  TestMachine_NNN.TestChangingInt tags, runs warmup then steady-state
  ReadBulk for --duration-seconds with time.perf_counter() latency
  capture, and emits the shared JSON schema (language, durationMs,
  totalCalls, successfulCalls, failedCalls, totalReadResults,
  cachedReadResults, callsPerSecond, latencyMs:{p50,p95,p99,max,mean})
  so scripts/bench-read-bulk.ps1 collates Python alongside the four
  other clients. _percentile_summary + linear-interpolation
  _percentile helper match the Go / .NET implementations.

to_mx_value is added to the existing values-module import line in
commands.py since the bulk-write commands need it.

Verification: python -m pip install -e . --quiet --no-deps; pytest
42/42 passing. Manual smoke against live gateway on localhost:5120:
open-session → register → subscribe-bulk on two
TestMachine_NNN.TestChangingInt tags (both wasSuccessful=true) →
read-bulk (both wasSuccessful=true / wasCached=true / int32 values
present) → close-session SESSION_STATE_CLOSED.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 04:50:10 -04:00
Joseph Doherty 325106920f Rust client: port BenchReadBulk subcommand + session.rs tightening
The bulk-write/read SDK methods (read_bulk, write_bulk, write2_bulk,
write_secured_bulk, write_secured2_bulk) and the matching clap
subcommands (ReadBulk, WriteBulk, Write2Bulk, WriteSecuredBulk,
WriteSecured2Bulk) were already on HEAD from a prior session — they
were the only bulk family that HEAD shipped before the .NET / Go /
Python / Java parallel ports. The one missing piece from the divergent
branch (commit f220908) was the BenchReadBulk benchmark harness.

mxgw-cli/src/main.rs adds:
- BenchReadBulk clap variant with flags --client-name,
  --duration-seconds, --warmup-seconds, --bulk-size, --tag-start,
  --tag-prefix, --tag-attribute, --timeout-ms, --json — defaults match
  the .NET and Go benches.
- run_bench_read_bulk(): open-session → register → subscribe_bulk on
  the synthesized TestMachine_NNN.TestChangingInt tags to populate the
  worker value cache → warmup → steady-state loop with per-call
  std::time::Instant capture → unsubscribe → close-session.
- BenchStats + LatencySummary structs and a percentile()
  helper (nearest-rank with linear interpolation, matching the Go and
  .NET implementations) so the cross-language JSON output is byte-for-
  byte comparable. JSON schema: language / command / endpoint /
  clientName / bulkSize / durationSeconds / warmupSeconds / durationMs
  / tags / totalCalls / successfulCalls / failedCalls /
  totalReadResults / cachedReadResults / callsPerSecond /
  latencyMs:{p50,p95,p99,max,mean}. scripts/bench-read-bulk.ps1 will
  pick up the Rust line on its next run.

session.rs picks up minor tightening tied to the bulk SDK methods that
were already in the file (per-entry validation paths, BulkReplyKind
dispatch coverage) — no public-surface change.

Verification: cargo build --workspace clean (the 2 pre-existing
options.rs missing_docs warnings remain — out of scope); cargo test
--workspace 34/34 passing; cargo clippy --workspace --all-targets has
only the 3 pre-existing tolerated warnings (enum_variant_names on
BulkReplyKind, missing_docs on options.rs, clone_on_copy on
galaxy.rs:282). Manual smoke against live gateway on localhost:5120:
read-bulk on two TestMachine tags returned wasCached=true,
wasSuccessful=true; bench-read-bulk --duration-seconds 2
--warmup-seconds 1 --bulk-size 2 --json ran 363 calls / 181.35 calls
per second / p50=5.3 ms / p99=7.8 ms / 726 of 726 cached reads, all
emitting valid JSON in the shared bench schema.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 04:50:09 -04:00
Joseph Doherty 8aaab82287 Go client: port bulk read/write SDK methods + CLI subcommands
Mirrors the .NET addition: HEAD's session.go had only the subscribe-style
bulks (AddItemBulk / AdviseItemBulk / RemoveItemBulk / UnAdviseItemBulk /
SubscribeBulk / UnsubscribeBulk). This commit ports the value-bulk SDK
surface and CLI subcommands from divergent branch commit f220908.

SDK (clients/go/mxgateway/session.go):
- WriteBulk(ctx, serverHandle int32, entries []*WriteBulkEntry)
- Write2Bulk(ctx, ..., entries []*Write2BulkEntry)
- WriteSecuredBulk(ctx, ..., entries []*WriteSecuredBulkEntry)
- WriteSecured2Bulk(ctx, ..., entries []*WriteSecured2BulkEntry)
- ReadBulk(ctx, serverHandle int32, tagAddresses []string, timeout time.Duration)
  → []*BulkReadResult

types.go gains public re-exports of the generated proto types
(WriteBulkCommand, WriteBulkEntry, Write2BulkCommand, Write2BulkEntry,
WriteSecuredBulkCommand, WriteSecuredBulkEntry, WriteSecured2BulkCommand,
WriteSecured2BulkEntry, ReadBulkCommand, BulkWriteReply, BulkWriteResult,
BulkReadReply, BulkReadResult) so external callers can construct entries
through the public `mxgateway` package without dipping into the internal
generated path.

CLI (clients/go/cmd/mxgw-go/main.go):
- read-bulk, write-bulk, write2-bulk, write-secured-bulk,
  write-secured2-bulk routed through runWithIO. write families share a
  runWriteBulkVariant helper that gates per-variant flags
  (--current-user-id, --verifier-user-id, --timestamp) so the
  Client.Go-015 flag-gating contract is preserved.
- bench-read-bulk: percentile + timing helpers; JSON output schema
  identical to the .NET / Rust / Python / Java benches.

parseInt32List was changed from panic-on-error to ([]int32, error) so
the new write-bulk commands surface parse errors gracefully; the
existing runUnsubscribeBulk caller is updated accordingly.

Verification: go build ./... + go vet ./... + go test ./... all clean.
Manual smoke against live gateway on localhost:5120: open-session →
register → subscribe-bulk on 3 TestMachine_NNN.TestChangingInt tags
(all wasSuccessful=true) → read-bulk (all wasSuccessful=true /
wasCached=true) → write-bulk int32 100/200/300 (all wasSuccessful=true)
→ close-session SESSION_STATE_CLOSED.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 04:49:33 -04:00
Joseph Doherty b3ae200b11 .NET client: port bulk read/write SDK methods + CLI subcommands
Adds the value-bulk SDK surface and CLI subcommands that lived on the
divergent branch (commit f220908) but were never merged into main.
HEAD's MxGatewaySession only had the subscribe-style bulks (AddItem /
Advise / Remove / UnAdvise / Subscribe / Unsubscribe). The proto
contract already defined ReadBulkCommand / WriteBulkCommand /
Write2BulkCommand / WriteSecuredBulkCommand / WriteSecured2BulkCommand
/ BulkReadReply / BulkWriteReply, so this is purely a client-side
addition.

SDK (MxGatewaySession.cs):
- WriteBulkAsync(serverHandle, IReadOnlyList<WriteBulkEntry>, ct)
- Write2BulkAsync(serverHandle, IReadOnlyList<Write2BulkEntry>, ct)
- WriteSecuredBulkAsync(serverHandle, IReadOnlyList<WriteSecuredBulkEntry>, ct)
- WriteSecured2BulkAsync(serverHandle, IReadOnlyList<WriteSecured2BulkEntry>, ct)
- ReadBulkAsync(serverHandle, IReadOnlyList<string> tagAddresses, TimeSpan timeout, ct)

Per-entry secured user ids live on each WriteSecured(2)BulkEntry — they
are NOT lifted to ctor args because the proto field shape allows distinct
ids per row.

CLI (MxGatewayClientCli.cs):
- read-bulk / write-bulk / write2-bulk / write-secured-bulk / write-secured2-bulk
  routed through the existing dispatch table, with --type, --values,
  --item-handles, --timeout-ms, --current-user-id, --verifier-user-id,
  --timestamp flags matching the cross-language CLI surface.
- bench-read-bulk benchmark harness: warmup + steady-state ReadBulk loop
  with p50/p95/p99/max/mean latency, emitting the shared JSON schema so
  scripts/bench-read-bulk.ps1 collates the .NET line alongside the four
  other clients.

The new subcommands flow through the existing batch dispatcher without
further changes.

Verification: dotnet build clean (0 warnings / 0 errors);
dotnet test 59/59 passing. Manual smoke against the live gateway
on localhost:5120: read-bulk returned 2 BulkReadResult entries with
wasSuccessful=true, wasCached=true; write-bulk on int32 returned
wasSuccessful=true; close-session returned SESSION_STATE_CLOSED.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 04:49:33 -04:00
12 changed files with 3164 additions and 17 deletions
@@ -114,6 +114,18 @@ public static class MxGatewayClientCli
.ConfigureAwait(false),
"unsubscribe-bulk" => await UnsubscribeBulkAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"read-bulk" => await ReadBulkAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write-bulk" => await WriteBulkAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write2-bulk" => await Write2BulkAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write-secured-bulk" => await WriteSecuredBulkAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write-secured2-bulk" => await WriteSecured2BulkAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"bench-read-bulk" => await BenchReadBulkAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
@@ -458,6 +470,451 @@ public static class MxGatewayClientCli
cancellationToken);
}
private static Task<int> ReadBulkAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
ReadBulkCommand command = new()
{
ServerHandle = arguments.GetInt32("server-handle"),
TimeoutMs = (uint)arguments.GetInt32("timeout-ms", 0),
};
command.TagAddresses.Add(ParseStringList(arguments.GetRequired("items")));
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.ReadBulk,
ReadBulk = command,
},
cancellationToken);
}
private static Task<int> WriteBulkAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
WriteBulkCommand command = new()
{
ServerHandle = arguments.GetInt32("server-handle"),
};
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
int userId = arguments.GetInt32("user-id", 0);
EnsureSameLength(handles.Count, values.Count);
for (int i = 0; i < handles.Count; i++)
{
command.Entries.Add(new WriteBulkEntry
{
ItemHandle = handles[i],
Value = values[i],
UserId = userId,
});
}
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.WriteBulk,
WriteBulk = command,
},
cancellationToken);
}
private static Task<int> Write2BulkAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
Write2BulkCommand command = new()
{
ServerHandle = arguments.GetInt32("server-handle"),
};
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
MxValue timestampValue = ParseTimestampValue(arguments);
int userId = arguments.GetInt32("user-id", 0);
EnsureSameLength(handles.Count, values.Count);
for (int i = 0; i < handles.Count; i++)
{
command.Entries.Add(new Write2BulkEntry
{
ItemHandle = handles[i],
Value = values[i],
TimestampValue = timestampValue,
UserId = userId,
});
}
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.Write2Bulk,
Write2Bulk = command,
},
cancellationToken);
}
private static Task<int> WriteSecuredBulkAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
WriteSecuredBulkCommand command = new()
{
ServerHandle = arguments.GetInt32("server-handle"),
};
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
int currentUserId = arguments.GetInt32("current-user-id");
int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
EnsureSameLength(handles.Count, values.Count);
for (int i = 0; i < handles.Count; i++)
{
command.Entries.Add(new WriteSecuredBulkEntry
{
ItemHandle = handles[i],
Value = values[i],
CurrentUserId = currentUserId,
VerifierUserId = verifierUserId,
});
}
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.WriteSecuredBulk,
WriteSecuredBulk = command,
},
cancellationToken);
}
private static Task<int> WriteSecured2BulkAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
WriteSecured2BulkCommand command = new()
{
ServerHandle = arguments.GetInt32("server-handle"),
};
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
MxValue timestampValue = ParseTimestampValue(arguments);
int currentUserId = arguments.GetInt32("current-user-id");
int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
EnsureSameLength(handles.Count, values.Count);
for (int i = 0; i < handles.Count; i++)
{
command.Entries.Add(new WriteSecured2BulkEntry
{
ItemHandle = handles[i],
Value = values[i],
TimestampValue = timestampValue,
CurrentUserId = currentUserId,
VerifierUserId = verifierUserId,
});
}
return InvokeAndWriteAsync(
arguments,
client,
output,
new MxCommand
{
Kind = MxCommandKind.WriteSecured2Bulk,
WriteSecured2Bulk = command,
},
cancellationToken);
}
/// <summary>
/// Parses the bulk-write CLI's <c>--values</c> list. All entries share
/// the single <c>--type</c> argument; the comma-separated values are
/// each parsed via <see cref="ParseValue(string, string)"/> on a per-entry basis.
/// This keeps the CLI simple for e2e use (one type, N values) — callers
/// that need heterogeneous types per entry should drive the library
/// directly.
/// </summary>
private static IReadOnlyList<MxValue> ParseValuesList(CliArguments arguments)
{
string type = arguments.GetRequired("type");
string[] values = ParseStringList(arguments.GetRequired("values")).ToArray();
MxValue[] result = new MxValue[values.Length];
for (int i = 0; i < values.Length; i++)
{
result[i] = ParseValue(type, values[i]);
}
return result;
}
private static void EnsureSameLength(int handles, int values)
{
if (handles != values)
{
throw new ArgumentException(
$"Bulk write requires the same number of --item-handles ({handles}) and --values ({values}).");
}
}
/// <summary>
/// Cross-language stress benchmark for ReadBulk. Opens its own session,
/// subscribes to N tags so the worker's MxAccessValueCache populates from
/// real OnDataChange events, then hammers ReadBulk in a tight in-process
/// loop with per-call Stopwatch timing. Emits a single JSON object on
/// stdout that the scripts/bench-read-bulk.ps1 driver collates across
/// all five language clients.
/// </summary>
private static async Task<int> BenchReadBulkAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
int durationSeconds = arguments.GetInt32("duration-seconds", 30);
int warmupSeconds = arguments.GetInt32("warmup-seconds", 3);
int bulkSize = arguments.GetInt32("bulk-size", 6);
int tagStart = arguments.GetInt32("tag-start", 1);
string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_";
string tagAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt";
uint timeoutMs = (uint)arguments.GetInt32("timeout-ms", 1500);
string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench";
string[] tags = new string[bulkSize];
for (int i = 0; i < bulkSize; i++)
{
// TestMachine_NNN.<attribute>, three-digit machine numbers matching
// the existing e2e tag-discovery convention.
tags[i] = $"{tagPrefix}{(tagStart + i):D3}.{tagAttribute}";
}
// Open + register + subscribe-bulk so the cache populates before the
// measurement window opens.
OpenSessionReply openReply = await client.OpenSessionAsync(
new OpenSessionRequest { ClientSessionName = clientName, ClientCorrelationId = CreateCorrelationId() },
cancellationToken)
.ConfigureAwait(false);
string sessionId = openReply.SessionId;
try
{
MxCommandReply registerReply = await InvokeAndEnsureAsync(
client,
CreateCommandRequest(sessionId, new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = clientName },
}),
cancellationToken)
.ConfigureAwait(false);
int serverHandle = registerReply.Register?.ServerHandle ?? registerReply.ReturnValue.Int32Value;
SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle };
subscribe.TagAddresses.Add(tags);
MxCommandReply subscribeReply = await InvokeAndEnsureAsync(
client,
CreateCommandRequest(sessionId, new MxCommand
{
Kind = MxCommandKind.SubscribeBulk,
SubscribeBulk = subscribe,
}),
cancellationToken)
.ConfigureAwait(false);
int[] itemHandles = subscribeReply.SubscribeBulk?.Results
.Where(r => r.WasSuccessful)
.Select(r => r.ItemHandle)
.ToArray() ?? [];
// Warm-up: drive the same call shape so the JIT / connection
// pipelines settle before the measurement window opens.
DateTime warmupDeadline = DateTime.UtcNow + TimeSpan.FromSeconds(warmupSeconds);
ReadBulkCommand readBulkCommand = new()
{
ServerHandle = serverHandle,
TimeoutMs = timeoutMs,
};
readBulkCommand.TagAddresses.Add(tags);
MxCommand readBulkMxCommand = new() { Kind = MxCommandKind.ReadBulk, ReadBulk = readBulkCommand };
while (DateTime.UtcNow < warmupDeadline)
{
_ = await client.InvokeAsync(
CreateCommandRequest(sessionId, readBulkMxCommand),
cancellationToken)
.ConfigureAwait(false);
}
// Steady state — capture per-call wall latency with a high-res
// Stopwatch so the resolution is sub-millisecond on modern Windows.
List<double> latencyMillis = new(capacity: 65536);
long totalReadResults = 0;
long cachedReadResults = 0;
int successfulCalls = 0;
int failedCalls = 0;
DateTime steadyDeadline = DateTime.UtcNow + TimeSpan.FromSeconds(durationSeconds);
DateTime steadyStart = DateTime.UtcNow;
while (DateTime.UtcNow < steadyDeadline)
{
System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();
MxCommandReply reply;
try
{
reply = await client.InvokeAsync(
CreateCommandRequest(sessionId, readBulkMxCommand),
cancellationToken)
.ConfigureAwait(false);
sw.Stop();
}
catch
{
sw.Stop();
failedCalls++;
latencyMillis.Add(sw.Elapsed.TotalMilliseconds);
continue;
}
latencyMillis.Add(sw.Elapsed.TotalMilliseconds);
if (reply.ProtocolStatus?.Code != ProtocolStatusCode.Ok)
{
failedCalls++;
continue;
}
successfulCalls++;
if (reply.ReadBulk is not null)
{
foreach (BulkReadResult r in reply.ReadBulk.Results)
{
totalReadResults++;
if (r.WasCached)
{
cachedReadResults++;
}
}
}
}
double steadyElapsedSeconds = (DateTime.UtcNow - steadyStart).TotalSeconds;
if (itemHandles.Length > 0)
{
UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = serverHandle };
unsubscribe.ItemHandles.Add(itemHandles);
_ = await client.InvokeAsync(
CreateCommandRequest(sessionId, new MxCommand
{
Kind = MxCommandKind.UnsubscribeBulk,
UnsubscribeBulk = unsubscribe,
}),
cancellationToken)
.ConfigureAwait(false);
}
int totalCalls = successfulCalls + failedCalls;
double callsPerSecond = steadyElapsedSeconds > 0
? totalCalls / steadyElapsedSeconds
: 0;
object stats = new
{
language = "dotnet",
command = "bench-read-bulk",
endpoint = arguments.GetOptional("endpoint") ?? "(default)",
clientName,
bulkSize,
durationSeconds,
warmupSeconds,
durationMs = (long)(steadyElapsedSeconds * 1000),
tags,
totalCalls,
successfulCalls,
failedCalls,
totalReadResults,
cachedReadResults,
callsPerSecond = Math.Round(callsPerSecond, 2),
latencyMs = new
{
p50 = Percentile(latencyMillis, 0.50),
p95 = Percentile(latencyMillis, 0.95),
p99 = Percentile(latencyMillis, 0.99),
max = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Max(), 3) : 0,
mean = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Average(), 3) : 0,
},
};
output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions));
return 0;
}
finally
{
try
{
await client.CloseSessionAsync(
new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = CreateCorrelationId() },
cancellationToken)
.ConfigureAwait(false);
}
catch
{
// Closing the session is best-effort — never let it mask a real bench error.
}
}
}
/// <summary>
/// Computes the requested percentile from an unsorted latency sample using
/// nearest-rank with linear interpolation. Rounds to 3 decimal places to
/// match the JSON schema the PS driver collates.
/// </summary>
private static double Percentile(IReadOnlyList<double> sample, double quantile)
{
if (sample.Count == 0)
{
return 0;
}
double[] sorted = sample.ToArray();
Array.Sort(sorted);
if (sorted.Length == 1)
{
return Math.Round(sorted[0], 3);
}
double rank = quantile * (sorted.Length - 1);
int lower = (int)Math.Floor(rank);
int upper = (int)Math.Ceiling(rank);
double fraction = rank - lower;
double value = sorted[lower] + (sorted[upper] - sorted[lower]) * fraction;
return Math.Round(value, 3);
}
private static Task<int> WriteAsync(
CliArguments arguments,
IMxGatewayCliClient client,
@@ -844,11 +1301,15 @@ public static class MxGatewayClientCli
private static MxValue ParseValue(CliArguments arguments)
{
string type = arguments.GetRequired("type").ToLowerInvariant();
string value = arguments.GetRequired("value");
return ParseValue(arguments.GetRequired("type"), arguments.GetRequired("value"));
}
private static MxValue ParseValue(string type, string value)
{
string normalisedType = type.ToLowerInvariant();
string[] values = value.Split(',', StringSplitOptions.TrimEntries);
return type switch
return normalisedType switch
{
"bool" or "boolean" => bool.Parse(value).ToMxValue(),
"bool-array" or "boolean-array" => values.Select(bool.Parse).ToArray().ToMxValue(),
@@ -867,7 +1328,7 @@ public static class MxGatewayClientCli
.Select(item => DateTimeOffset.Parse(item, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal))
.ToArray()
.ToMxValue(),
_ => throw new ArgumentException($"Unsupported MX value type '{type}'."),
_ => throw new ArgumentException($"Unsupported MX value type '{normalisedType}'."),
};
}
@@ -1078,6 +1539,12 @@ public static class MxGatewayClientCli
or "advise"
or "subscribe-bulk"
or "unsubscribe-bulk"
or "read-bulk"
or "write-bulk"
or "write2-bulk"
or "write-secured-bulk"
or "write-secured2-bulk"
or "bench-read-bulk"
or "stream-events"
or "write"
or "write2"
@@ -1131,6 +1598,12 @@ public static class MxGatewayClientCli
writer.WriteLine("mxgw-dotnet advise --session-id <id> --server-handle <n> --item-handle <n> [--json]");
writer.WriteLine("mxgw-dotnet subscribe-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--json]");
writer.WriteLine("mxgw-dotnet unsubscribe-bulk --session-id <id> --server-handle <n> --item-handles <n,n> [--json]");
writer.WriteLine("mxgw-dotnet read-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--timeout-ms <n>] [--json]");
writer.WriteLine("mxgw-dotnet write-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--user-id <n>] [--json]");
writer.WriteLine("mxgw-dotnet write2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--timestamp <iso>] [--user-id <n>] [--json]");
writer.WriteLine("mxgw-dotnet write-secured-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> --current-user-id <n> [--verifier-user-id <n>] [--json]");
writer.WriteLine("mxgw-dotnet write-secured2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> --current-user-id <n> [--verifier-user-id <n>] [--timestamp <iso>] [--json]");
writer.WriteLine("mxgw-dotnet bench-read-bulk [--duration-seconds <n>] [--warmup-seconds <n>] [--bulk-size <n>] [--tag-start <n>] [--tag-prefix <s>] [--tag-attribute <s>] [--timeout-ms <n>] [--client-name <name>]");
writer.WriteLine("mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]");
writer.WriteLine("mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]");
writer.WriteLine("mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]");
@@ -502,6 +502,171 @@ public sealed class MxGatewaySession : IAsyncDisposable
return reply.UnsubscribeBulk?.Results.ToArray() ?? [];
}
/// <summary>
/// Bulk Write — sequential MXAccess Write per entry on the worker's STA.
/// Per-item failures appear as <see cref="BulkWriteResult"/> entries with
/// <c>WasSuccessful = false</c>; the call never throws on per-item errors.
/// Protocol-level failures still throw via EnsureProtocolSuccess.
/// </summary>
/// <param name="serverHandle">The ServerHandle from register.</param>
/// <param name="entries">Per-item write entries; each carries the item handle, value, and user id.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>One <see cref="BulkWriteResult"/> per requested entry, in request order.</returns>
public async Task<IReadOnlyList<BulkWriteResult>> WriteBulkAsync(
int serverHandle,
IReadOnlyList<WriteBulkEntry> entries,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entries);
WriteBulkCommand command = new() { ServerHandle = serverHandle };
command.Entries.Add(entries);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.WriteBulk,
WriteBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.WriteBulk?.Results.ToArray() ?? [];
}
/// <summary>
/// Bulk Write2 — sequential MXAccess Write2 (timestamped) per entry.
/// Per-item failures appear as <see cref="BulkWriteResult"/> entries with
/// <c>WasSuccessful = false</c>; the call never throws on per-item errors.
/// </summary>
/// <param name="serverHandle">The ServerHandle from register.</param>
/// <param name="entries">Per-item write entries; each carries the item handle, value, timestamp, and user id.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>One <see cref="BulkWriteResult"/> per requested entry, in request order.</returns>
public async Task<IReadOnlyList<BulkWriteResult>> Write2BulkAsync(
int serverHandle,
IReadOnlyList<Write2BulkEntry> entries,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entries);
Write2BulkCommand command = new() { ServerHandle = serverHandle };
command.Entries.Add(entries);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Write2Bulk,
Write2Bulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.Write2Bulk?.Results.ToArray() ?? [];
}
/// <summary>
/// Bulk WriteSecured — sequential MXAccess WriteSecured per entry.
/// Credential-sensitive values must never reach logs; the client mirrors
/// the single-item WriteSecured redaction contract.
/// </summary>
/// <param name="serverHandle">The ServerHandle from register.</param>
/// <param name="entries">Per-item write entries; each carries the item handle, value, current user id, and verifier user id.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>One <see cref="BulkWriteResult"/> per requested entry, in request order.</returns>
public async Task<IReadOnlyList<BulkWriteResult>> WriteSecuredBulkAsync(
int serverHandle,
IReadOnlyList<WriteSecuredBulkEntry> entries,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entries);
WriteSecuredBulkCommand command = new() { ServerHandle = serverHandle };
command.Entries.Add(entries);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.WriteSecuredBulk,
WriteSecuredBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.WriteSecuredBulk?.Results.ToArray() ?? [];
}
/// <summary>
/// Bulk WriteSecured2 — sequential MXAccess WriteSecured2 (timestamped) per entry.
/// Same redaction rules as <see cref="WriteSecuredBulkAsync"/>.
/// </summary>
/// <param name="serverHandle">The ServerHandle from register.</param>
/// <param name="entries">Per-item write entries; each carries the item handle, value, timestamp, current user id, and verifier user id.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>One <see cref="BulkWriteResult"/> per requested entry, in request order.</returns>
public async Task<IReadOnlyList<BulkWriteResult>> WriteSecured2BulkAsync(
int serverHandle,
IReadOnlyList<WriteSecured2BulkEntry> entries,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entries);
WriteSecured2BulkCommand command = new() { ServerHandle = serverHandle };
command.Entries.Add(entries);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.WriteSecured2Bulk,
WriteSecured2Bulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.WriteSecured2Bulk?.Results.ToArray() ?? [];
}
/// <summary>
/// Bulk Read — snapshot the current value for each requested tag.
/// Returns the cached OnDataChange value when the tag is already advised
/// (<c>WasCached = true</c>), otherwise the worker takes the full AddItem +
/// Advise + wait + UnAdvise + RemoveItem snapshot lifecycle. Per-tag
/// failures (timeout, invalid tag) appear as <see cref="BulkReadResult"/>
/// entries with <c>WasSuccessful = false</c>; the call never throws on
/// per-tag errors.
/// </summary>
/// <param name="serverHandle">The ServerHandle from register.</param>
/// <param name="tagAddresses">Tag addresses to read (one per result).</param>
/// <param name="timeout">Per-call timeout for the snapshot lifecycle path; <see cref="TimeSpan.Zero"/> uses the gateway default.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>One <see cref="BulkReadResult"/> per requested tag, in request order.</returns>
public async Task<IReadOnlyList<BulkReadResult>> ReadBulkAsync(
int serverHandle,
IReadOnlyList<string> tagAddresses,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(tagAddresses);
ReadBulkCommand command = new()
{
ServerHandle = serverHandle,
TimeoutMs = timeout <= TimeSpan.Zero ? 0u : (uint)Math.Min(timeout.TotalMilliseconds, uint.MaxValue),
};
command.TagAddresses.Add(tagAddresses);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.ReadBulk,
ReadBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.ReadBulk?.Results.ToArray() ?? [];
}
/// <summary>
/// Writes a value to an item on the MXAccess server.
/// </summary>
+400 -5
View File
@@ -15,6 +15,7 @@ import (
"io"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"syscall"
@@ -90,6 +91,18 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err
return runSubscribeBulk(ctx, args[1:], stdout, stderr)
case "unsubscribe-bulk":
return runUnsubscribeBulk(ctx, args[1:], stdout, stderr)
case "read-bulk":
return runReadBulk(ctx, args[1:], stdout, stderr)
case "write-bulk":
return runWriteBulk(ctx, args[1:], stdout, stderr)
case "write2-bulk":
return runWrite2Bulk(ctx, args[1:], stdout, stderr)
case "write-secured-bulk":
return runWriteSecuredBulk(ctx, args[1:], stdout, stderr)
case "write-secured2-bulk":
return runWriteSecured2Bulk(ctx, args[1:], stdout, stderr)
case "bench-read-bulk":
return runBenchReadBulk(ctx, args[1:], stdout, stderr)
case "write":
return runWrite(ctx, args[1:], stdout, stderr)
case "stream-events":
@@ -340,11 +353,363 @@ func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Wr
}
defer client.Close()
handles, err := parseInt32List(*itemHandles)
if err != nil {
return err
}
session := mxgateway.NewSessionForID(client, *sessionID)
results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), parseInt32List(*itemHandles))
results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), handles)
return writeBulkOutput(stdout, *jsonOutput, "unsubscribe-bulk", options, results, err)
}
func runReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("read-bulk", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
sessionID := flags.String("session-id", "", "gateway session id")
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
items := flags.String("items", "", "comma-separated tag addresses")
timeoutMs := flags.Int("timeout-ms", 0, "per-tag snapshot timeout in milliseconds (0 = worker default)")
if err := flags.Parse(args); err != nil {
return err
}
if *sessionID == "" || *items == "" {
return errors.New("session-id and items are required")
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
session := mxgateway.NewSessionForID(client, *sessionID)
results, err := session.ReadBulk(ctx, int32(*serverHandle), parseStringList(*items), time.Duration(*timeoutMs)*time.Millisecond)
return writeReadBulkOutput(stdout, *jsonOutput, "read-bulk", options, results, err)
}
func runWriteBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-bulk", false, false)
}
func runWrite2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
return runWriteBulkVariant(ctx, args, stdout, stderr, "write2-bulk", true, false)
}
func runWriteSecuredBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured-bulk", false, true)
}
func runWriteSecured2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured2-bulk", true, true)
}
// runWriteBulkVariant shares the flag-parsing + entry-build skeleton across
// the four bulk-write families. withTimestamp adds a --timestamp-value flag;
// secured switches from --user-id to --current-user-id / --verifier-user-id.
func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool, secured bool) error {
flags := flag.NewFlagSet(command, flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
sessionID := flags.String("session-id", "", "gateway session id")
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
itemHandles := flags.String("item-handles", "", "comma-separated item handles")
valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string")
values := flags.String("values", "", "comma-separated values (one per item handle)")
userID := flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)")
currentUserID := flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)")
verifierUserID := flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)")
timestampValue := flags.String("timestamp-value", "", "RFC 3339 timestamp shared across all entries (Write2/WriteSecured2 variants)")
if err := flags.Parse(args); err != nil {
return err
}
if *sessionID == "" || *itemHandles == "" || *values == "" {
return errors.New("session-id, item-handles, and values are required")
}
handles, err := parseInt32List(*itemHandles)
if err != nil {
return err
}
valueTexts := parseStringList(*values)
if len(handles) != len(valueTexts) {
return fmt.Errorf("item-handles count (%d) does not match values count (%d)", len(handles), len(valueTexts))
}
parsedValues := make([]*mxgateway.MxValue, len(handles))
for i, text := range valueTexts {
v, err := parseValue(*valueType, text)
if err != nil {
return fmt.Errorf("entry %d: %w", i, err)
}
parsedValues[i] = v
}
var tsValue *mxgateway.MxValue
if withTimestamp {
if *timestampValue == "" {
return errors.New("timestamp-value is required for write2/write-secured2 bulk variants")
}
parsed, err := parseRfc3339Timestamp(*timestampValue)
if err != nil {
return err
}
tsValue = parsed
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
session := mxgateway.NewSessionForID(client, *sessionID)
var results []*mxgateway.BulkWriteResult
switch command {
case "write-bulk":
entries := make([]*mxgateway.WriteBulkEntry, len(handles))
for i := range handles {
entries[i] = &mxgateway.WriteBulkEntry{ItemHandle: handles[i], Value: parsedValues[i], UserId: int32(*userID)}
}
results, err = session.WriteBulk(ctx, int32(*serverHandle), entries)
case "write2-bulk":
entries := make([]*mxgateway.Write2BulkEntry, len(handles))
for i := range handles {
entries[i] = &mxgateway.Write2BulkEntry{ItemHandle: handles[i], Value: parsedValues[i], TimestampValue: tsValue, UserId: int32(*userID)}
}
results, err = session.Write2Bulk(ctx, int32(*serverHandle), entries)
case "write-secured-bulk":
entries := make([]*mxgateway.WriteSecuredBulkEntry, len(handles))
for i := range handles {
entries[i] = &mxgateway.WriteSecuredBulkEntry{
ItemHandle: handles[i],
Value: parsedValues[i],
CurrentUserId: int32(*currentUserID),
VerifierUserId: int32(*verifierUserID),
}
}
results, err = session.WriteSecuredBulk(ctx, int32(*serverHandle), entries)
case "write-secured2-bulk":
entries := make([]*mxgateway.WriteSecured2BulkEntry, len(handles))
for i := range handles {
entries[i] = &mxgateway.WriteSecured2BulkEntry{
ItemHandle: handles[i],
Value: parsedValues[i],
TimestampValue: tsValue,
CurrentUserId: int32(*currentUserID),
VerifierUserId: int32(*verifierUserID),
}
}
results, err = session.WriteSecured2Bulk(ctx, int32(*serverHandle), entries)
default:
return fmt.Errorf("unsupported bulk write command %q", command)
}
_ = secured // currently only used for routing above; reserved for future per-variant validation
return writeWriteBulkOutput(stdout, *jsonOutput, command, options, results, err)
}
// parseRfc3339Timestamp parses an RFC 3339 timestamp and returns the
// MxValue protobuf representation used for the timestamped write families.
func parseRfc3339Timestamp(text string) (*mxgateway.MxValue, error) {
t, err := time.Parse(time.RFC3339Nano, text)
if err != nil {
return nil, fmt.Errorf("invalid RFC 3339 timestamp %q: %w", text, err)
}
return mxgateway.TimestampValue(t), nil
}
// runBenchReadBulk drives the cross-language ReadBulk stress benchmark from Go:
// opens its own session, subscribes to bulk-size tags so the worker value cache
// populates from real OnDataChange events, runs ReadBulk in a tight loop for
// duration-seconds with per-call timing, and emits the shared JSON schema the
// scripts/bench-read-bulk.ps1 driver collates across all five clients.
func runBenchReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("bench-read-bulk", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
clientName := flags.String("client-name", "mxgw-go-bench", "session client name")
durationSeconds := flags.Int("duration-seconds", 30, "steady-state measurement window in seconds")
warmupSeconds := flags.Int("warmup-seconds", 3, "warm-up window before measurement, in seconds")
bulkSize := flags.Int("bulk-size", 6, "tags per ReadBulk call")
tagStart := flags.Int("tag-start", 1, "first machine number")
tagPrefix := flags.String("tag-prefix", "TestMachine_", "tag prefix (machine number appended as %03d)")
tagAttribute := flags.String("tag-attribute", "TestChangingInt", "attribute appended to each tag prefix")
timeoutMs := flags.Int("timeout-ms", 1500, "per-tag snapshot timeout in milliseconds")
if err := flags.Parse(args); err != nil {
return err
}
if *bulkSize < 1 {
return errors.New("bulk-size must be positive")
}
if *durationSeconds < 1 {
return errors.New("duration-seconds must be positive")
}
tags := make([]string, *bulkSize)
for i := 0; i < *bulkSize; i++ {
tags[i] = fmt.Sprintf("%s%03d.%s", *tagPrefix, *tagStart+i, *tagAttribute)
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
session, err := client.OpenSession(ctx, mxgateway.OpenSessionOptions{ClientSessionName: *clientName})
if err != nil {
return err
}
defer func() {
_, _ = session.Close(context.Background())
}()
serverHandle, err := session.Register(ctx, *clientName)
if err != nil {
return err
}
subscribeResults, err := session.SubscribeBulk(ctx, serverHandle, tags)
if err != nil {
return err
}
itemHandles := make([]int32, 0, len(subscribeResults))
for _, result := range subscribeResults {
if result.GetWasSuccessful() {
itemHandles = append(itemHandles, result.GetItemHandle())
}
}
defer func() {
if len(itemHandles) > 0 {
_, _ = session.UnsubscribeBulk(context.Background(), serverHandle, itemHandles)
}
}()
// Warm-up: drive identical calls so any first-call JIT / connection-pool
// setup is amortised before the measurement window opens.
warmupDeadline := time.Now().Add(time.Duration(*warmupSeconds) * time.Second)
timeout := time.Duration(*timeoutMs) * time.Millisecond
for time.Now().Before(warmupDeadline) {
_, _ = session.ReadBulk(ctx, serverHandle, tags, timeout)
}
// Steady state: per-call latency captured via time.Now() deltas.
latenciesMs := make([]float64, 0, 65536)
var totalReadResults int64
var cachedReadResults int64
var successfulCalls, failedCalls int
steadyStart := time.Now()
steadyDeadline := steadyStart.Add(time.Duration(*durationSeconds) * time.Second)
for time.Now().Before(steadyDeadline) {
callStart := time.Now()
results, err := session.ReadBulk(ctx, serverHandle, tags, timeout)
elapsed := time.Since(callStart)
latenciesMs = append(latenciesMs, float64(elapsed.Nanoseconds())/1e6)
if err != nil {
failedCalls++
continue
}
successfulCalls++
for _, r := range results {
totalReadResults++
if r.GetWasCached() {
cachedReadResults++
}
}
}
steadyElapsed := time.Since(steadyStart)
totalCalls := successfulCalls + failedCalls
callsPerSecond := 0.0
if steadyElapsed.Seconds() > 0 {
callsPerSecond = float64(totalCalls) / steadyElapsed.Seconds()
}
stats := map[string]any{
"language": "go",
"command": "bench-read-bulk",
"endpoint": options.Endpoint,
"clientName": *clientName,
"bulkSize": *bulkSize,
"durationSeconds": *durationSeconds,
"warmupSeconds": *warmupSeconds,
"durationMs": steadyElapsed.Milliseconds(),
"tags": tags,
"totalCalls": totalCalls,
"successfulCalls": successfulCalls,
"failedCalls": failedCalls,
"totalReadResults": totalReadResults,
"cachedReadResults": cachedReadResults,
"callsPerSecond": roundTo(callsPerSecond, 2),
"latencyMs": percentileSummary(latenciesMs),
}
if *jsonOutput {
return writeJSON(stdout, stats)
}
fmt.Fprintln(stdout, callsPerSecond)
return nil
}
// percentileSummary returns the same { p50, p95, p99, max, mean } shape every
// language bench emits, rounded to 3 decimal places so the PowerShell driver
// sees one schema across all five clients.
func percentileSummary(sample []float64) map[string]float64 {
if len(sample) == 0 {
return map[string]float64{"p50": 0, "p95": 0, "p99": 0, "max": 0, "mean": 0}
}
sorted := append([]float64(nil), sample...)
sort.Float64s(sorted)
mean := 0.0
maxValue := sorted[len(sorted)-1]
for _, v := range sample {
mean += v
}
mean /= float64(len(sample))
return map[string]float64{
"p50": roundTo(percentile(sorted, 0.50), 3),
"p95": roundTo(percentile(sorted, 0.95), 3),
"p99": roundTo(percentile(sorted, 0.99), 3),
"max": roundTo(maxValue, 3),
"mean": roundTo(mean, 3),
}
}
// percentile uses nearest-rank with linear interpolation; matches the .NET
// implementation so cross-language comparisons are apples-to-apples.
func percentile(sorted []float64, quantile float64) float64 {
if len(sorted) == 0 {
return 0
}
if len(sorted) == 1 {
return sorted[0]
}
rank := quantile * float64(len(sorted)-1)
lower := int(rank)
upper := lower + 1
if upper >= len(sorted) {
return sorted[lower]
}
fraction := rank - float64(lower)
return sorted[lower] + (sorted[upper]-sorted[lower])*fraction
}
func roundTo(value float64, digits int) float64 {
shift := 1.0
for i := 0; i < digits; i++ {
shift *= 10
}
return float64(int64(value*shift+0.5)) / shift
}
func runWrite(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("write", flag.ContinueOnError)
flags.SetOutput(stderr)
@@ -517,7 +882,7 @@ func parseStringList(value string) []string {
return items
}
func parseInt32List(value string) []int32 {
func parseInt32List(value string) ([]int32, error) {
parts := strings.Split(value, ",")
items := make([]int32, 0, len(parts))
for _, part := range parts {
@@ -527,11 +892,11 @@ func parseInt32List(value string) []int32 {
}
parsed, err := strconv.ParseInt(item, 10, 32)
if err != nil {
panic(err)
return nil, fmt.Errorf("invalid item handle %q: %w", item, err)
}
items = append(items, int32(parsed))
}
return items
return items, nil
}
func bindCommonFlags(flags *flag.FlagSet) *commonOptions {
@@ -650,6 +1015,36 @@ func writeBulkOutput(stdout io.Writer, jsonOutput bool, command string, options
return nil
}
func writeWriteBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkWriteResult, err error) error {
if err != nil {
return err
}
if jsonOutput {
return writeJSON(stdout, map[string]any{
"command": command,
"options": options,
"results": results,
})
}
fmt.Fprintln(stdout, len(results))
return nil
}
func writeReadBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkReadResult, err error) error {
if err != nil {
return err
}
if jsonOutput {
return writeJSON(stdout, map[string]any{
"command": command,
"options": options,
"results": results,
})
}
fmt.Fprintln(stdout, len(results))
return nil
}
func writeJSON(writer io.Writer, value any) error {
encoder := json.NewEncoder(writer)
encoder.SetIndent("", " ")
@@ -669,7 +1064,7 @@ type protojsonMessage interface {
}
func writeUsage(writer io.Writer) {
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
}
// batchEOR is the end-of-result sentinel emitted to stdout after every command
+137
View File
@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"sync"
"time"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
"google.golang.org/grpc/codes"
@@ -387,6 +388,142 @@ func (s *Session) UnsubscribeBulk(ctx context.Context, serverHandle int32, itemH
return reply.GetUnsubscribeBulk().GetResults(), nil
}
// WriteBulk invokes MXAccess Write sequentially for each entry inside one gateway command.
// Per-entry failures appear as BulkWriteResult entries with WasSuccessful=false; the call
// never returns an error for per-entry MXAccess failures (it returns an error only for
// protocol-level failures or transport errors).
func (s *Session) WriteBulk(ctx context.Context, serverHandle int32, entries []*WriteBulkEntry) ([]*BulkWriteResult, error) {
if entries == nil {
return nil, errors.New("mxgateway: write bulk entries are required")
}
if err := ensureBulkSize("write bulk entries", len(entries)); err != nil {
return nil, err
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_BULK,
Payload: &pb.MxCommand_WriteBulk{
WriteBulk: &pb.WriteBulkCommand{
ServerHandle: serverHandle,
Entries: entries,
},
},
})
if err != nil {
return nil, err
}
return reply.GetWriteBulk().GetResults(), nil
}
// Write2Bulk invokes MXAccess Write2 (timestamped) for each entry inside one gateway command.
func (s *Session) Write2Bulk(ctx context.Context, serverHandle int32, entries []*Write2BulkEntry) ([]*BulkWriteResult, error) {
if entries == nil {
return nil, errors.New("mxgateway: write2 bulk entries are required")
}
if err := ensureBulkSize("write2 bulk entries", len(entries)); err != nil {
return nil, err
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE2_BULK,
Payload: &pb.MxCommand_Write2Bulk{
Write2Bulk: &pb.Write2BulkCommand{
ServerHandle: serverHandle,
Entries: entries,
},
},
})
if err != nil {
return nil, err
}
return reply.GetWrite2Bulk().GetResults(), nil
}
// WriteSecuredBulk invokes MXAccess WriteSecured for each entry. Credential-sensitive
// values must not be logged by callers; mirrors the single-item WriteSecured contract.
func (s *Session) WriteSecuredBulk(ctx context.Context, serverHandle int32, entries []*WriteSecuredBulkEntry) ([]*BulkWriteResult, error) {
if entries == nil {
return nil, errors.New("mxgateway: write-secured bulk entries are required")
}
if err := ensureBulkSize("write-secured bulk entries", len(entries)); err != nil {
return nil, err
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED_BULK,
Payload: &pb.MxCommand_WriteSecuredBulk{
WriteSecuredBulk: &pb.WriteSecuredBulkCommand{
ServerHandle: serverHandle,
Entries: entries,
},
},
})
if err != nil {
return nil, err
}
return reply.GetWriteSecuredBulk().GetResults(), nil
}
// WriteSecured2Bulk invokes MXAccess WriteSecured2 (timestamped) for each entry.
func (s *Session) WriteSecured2Bulk(ctx context.Context, serverHandle int32, entries []*WriteSecured2BulkEntry) ([]*BulkWriteResult, error) {
if entries == nil {
return nil, errors.New("mxgateway: write-secured2 bulk entries are required")
}
if err := ensureBulkSize("write-secured2 bulk entries", len(entries)); err != nil {
return nil, err
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED2_BULK,
Payload: &pb.MxCommand_WriteSecured2Bulk{
WriteSecured2Bulk: &pb.WriteSecured2BulkCommand{
ServerHandle: serverHandle,
Entries: entries,
},
},
})
if err != nil {
return nil, err
}
return reply.GetWriteSecured2Bulk().GetResults(), nil
}
// ReadBulk snapshots the current value of each requested tag.
//
// MXAccess COM has no synchronous Read; the worker satisfies this by returning the
// most recent cached OnDataChange value when the tag is already advised (WasCached=true),
// or by taking a full AddItem + Advise + wait + UnAdvise + RemoveItem snapshot lifecycle
// otherwise. timeout bounds the wait per tag in the snapshot case; pass zero to use the
// worker default. Per-tag failures (timeout, invalid tag) appear as BulkReadResult entries
// with WasSuccessful=false; the call never returns an error for per-tag MXAccess failures.
func (s *Session) ReadBulk(ctx context.Context, serverHandle int32, tagAddresses []string, timeout time.Duration) ([]*BulkReadResult, error) {
if tagAddresses == nil {
return nil, errors.New("mxgateway: tag addresses are required")
}
if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil {
return nil, err
}
var timeoutMs uint32
if timeout > 0 {
ms := timeout.Milliseconds()
if ms > int64(^uint32(0)) {
timeoutMs = ^uint32(0)
} else {
timeoutMs = uint32(ms)
}
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_READ_BULK,
Payload: &pb.MxCommand_ReadBulk{
ReadBulk: &pb.ReadBulkCommand{
ServerHandle: serverHandle,
TagAddresses: tagAddresses,
TimeoutMs: timeoutMs,
},
},
})
if err != nil {
return nil, err
}
return reply.GetReadBulk().GetResults(), nil
}
// Write invokes MXAccess Write.
func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) error {
_, err := s.WriteRaw(ctx, serverHandle, itemHandle, value, userID)
+26
View File
@@ -70,6 +70,32 @@ type (
WriteCommand = pb.WriteCommand
// Write2Command is the payload of an MXAccess Write2 command.
Write2Command = pb.Write2Command
// WriteBulkCommand is the payload of a bulk Write command.
WriteBulkCommand = pb.WriteBulkCommand
// WriteBulkEntry is one entry inside a WriteBulkCommand.
WriteBulkEntry = pb.WriteBulkEntry
// Write2BulkCommand is the payload of a bulk Write2 (timestamped) command.
Write2BulkCommand = pb.Write2BulkCommand
// Write2BulkEntry is one entry inside a Write2BulkCommand.
Write2BulkEntry = pb.Write2BulkEntry
// WriteSecuredBulkCommand is the payload of a bulk WriteSecured command.
WriteSecuredBulkCommand = pb.WriteSecuredBulkCommand
// WriteSecuredBulkEntry is one entry inside a WriteSecuredBulkCommand.
WriteSecuredBulkEntry = pb.WriteSecuredBulkEntry
// WriteSecured2BulkCommand is the payload of a bulk WriteSecured2 (timestamped) command.
WriteSecured2BulkCommand = pb.WriteSecured2BulkCommand
// WriteSecured2BulkEntry is one entry inside a WriteSecured2BulkCommand.
WriteSecured2BulkEntry = pb.WriteSecured2BulkEntry
// ReadBulkCommand is the payload of a bulk Read snapshot command.
ReadBulkCommand = pb.ReadBulkCommand
// BulkWriteReply aggregates BulkWriteResult entries for a bulk write command.
BulkWriteReply = pb.BulkWriteReply
// BulkWriteResult is one entry in a bulk write reply list.
BulkWriteResult = pb.BulkWriteResult
// BulkReadReply aggregates BulkReadResult entries for a bulk read command.
BulkReadReply = pb.BulkReadReply
// BulkReadResult is one entry in a bulk read reply list.
BulkReadResult = pb.BulkReadResult
// RegisterReply carries the ServerHandle returned by Register.
RegisterReply = pb.RegisterReply
// AddItemReply carries the ItemHandle returned by AddItem.
@@ -29,12 +29,18 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
@@ -113,6 +119,12 @@ public final class MxGatewayCli implements Callable<Integer> {
commandLine.addSubcommand("advise", new AdviseCommand(clientFactory));
commandLine.addSubcommand("subscribe-bulk", new SubscribeBulkCommand(clientFactory));
commandLine.addSubcommand("unsubscribe-bulk", new UnsubscribeBulkCommand(clientFactory));
commandLine.addSubcommand("read-bulk", new ReadBulkCommand(clientFactory));
commandLine.addSubcommand("write-bulk", new WriteBulkCommand(clientFactory));
commandLine.addSubcommand("write2-bulk", new Write2BulkCommand(clientFactory));
commandLine.addSubcommand("write-secured-bulk", new WriteSecuredBulkCommand(clientFactory));
commandLine.addSubcommand("write-secured2-bulk", new WriteSecured2BulkCommand(clientFactory));
commandLine.addSubcommand("bench-read-bulk", new BenchReadBulkCommand(clientFactory));
commandLine.addSubcommand("write", new WriteCommand(clientFactory));
commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
@@ -603,6 +615,359 @@ public final class MxGatewayCli implements Callable<Integer> {
}
}
@Command(name = "read-bulk", description = "Invokes MXAccess ReadBulk (cached or snapshot per tag).")
static final class ReadBulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--items", required = true, description = "Comma-separated tag addresses.")
String items;
@Option(
names = "--timeout-ms",
defaultValue = "0",
description = "Per-tag snapshot timeout in milliseconds (0 = worker default).")
int timeoutMs;
ReadBulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
List<BulkReadResult> results = client.session(sessionId)
.readBulk(serverHandle, parseStringList(items), Duration.ofMillis(timeoutMs));
writeReadBulkOutput("read-bulk", common, json, results);
}
return 0;
}
}
@Command(name = "write-bulk", description = "Invokes MXAccess WriteBulk.")
static final class WriteBulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
String itemHandles;
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
String type;
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
String values;
@Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.")
int userId;
WriteBulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
List<Integer> handles = parseIntList(itemHandles);
List<String> valueTexts = parseStringList(values);
if (handles.size() != valueTexts.size()) {
throw new IllegalArgumentException(
"item-handles count (" + handles.size() + ") does not match values count ("
+ valueTexts.size() + ")");
}
List<WriteBulkEntry> entries = new ArrayList<>(handles.size());
for (int i = 0; i < handles.size(); i++) {
entries.add(WriteBulkEntry.newBuilder()
.setItemHandle(handles.get(i))
.setUserId(userId)
.setValue(parseValue(type, valueTexts.get(i)))
.build());
}
List<BulkWriteResult> results = client.session(sessionId).writeBulk(serverHandle, entries);
writeWriteBulkOutput("write-bulk", common, json, results);
}
return 0;
}
}
@Command(name = "write2-bulk", description = "Invokes MXAccess Write2Bulk (timestamped).")
static final class Write2BulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
String itemHandles;
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
String type;
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
String values;
@Option(names = "--timestamp", required = true, description = "ISO-8601 timestamp shared across all entries.")
String timestamp;
@Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.")
int userId;
Write2BulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
List<Integer> handles = parseIntList(itemHandles);
List<String> valueTexts = parseStringList(values);
if (handles.size() != valueTexts.size()) {
throw new IllegalArgumentException(
"item-handles count (" + handles.size() + ") does not match values count ("
+ valueTexts.size() + ")");
}
MxValue timestampValue = MxValues.timestampValue(Instant.parse(timestamp));
List<Write2BulkEntry> entries = new ArrayList<>(handles.size());
for (int i = 0; i < handles.size(); i++) {
entries.add(Write2BulkEntry.newBuilder()
.setItemHandle(handles.get(i))
.setUserId(userId)
.setValue(parseValue(type, valueTexts.get(i)))
.setTimestampValue(timestampValue)
.build());
}
List<BulkWriteResult> results = client.session(sessionId).write2Bulk(serverHandle, entries);
writeWriteBulkOutput("write2-bulk", common, json, results);
}
return 0;
}
}
@Command(name = "write-secured-bulk", description = "Invokes MXAccess WriteSecuredBulk.")
static final class WriteSecuredBulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
String itemHandles;
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
String type;
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
String values;
@Option(names = "--current-user-id", defaultValue = "0", description = "MXAccess current user id.")
int currentUserId;
@Option(names = "--verifier-user-id", defaultValue = "0", description = "MXAccess verifier user id.")
int verifierUserId;
WriteSecuredBulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
List<Integer> handles = parseIntList(itemHandles);
List<String> valueTexts = parseStringList(values);
if (handles.size() != valueTexts.size()) {
throw new IllegalArgumentException(
"item-handles count (" + handles.size() + ") does not match values count ("
+ valueTexts.size() + ")");
}
List<WriteSecuredBulkEntry> entries = new ArrayList<>(handles.size());
for (int i = 0; i < handles.size(); i++) {
entries.add(WriteSecuredBulkEntry.newBuilder()
.setItemHandle(handles.get(i))
.setCurrentUserId(currentUserId)
.setVerifierUserId(verifierUserId)
.setValue(parseValue(type, valueTexts.get(i)))
.build());
}
List<BulkWriteResult> results = client.session(sessionId).writeSecuredBulk(serverHandle, entries);
writeWriteBulkOutput("write-secured-bulk", common, json, results);
}
return 0;
}
}
@Command(name = "write-secured2-bulk", description = "Invokes MXAccess WriteSecured2Bulk.")
static final class WriteSecured2BulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--item-handles", required = true, description = "Comma-separated item handles.")
String itemHandles;
@Option(names = "--type", defaultValue = "string", description = "Value type for all entries.")
String type;
@Option(names = "--values", required = true, description = "Comma-separated values, one per item handle.")
String values;
@Option(names = "--timestamp", required = true, description = "ISO-8601 timestamp shared across all entries.")
String timestamp;
@Option(names = "--current-user-id", defaultValue = "0", description = "MXAccess current user id.")
int currentUserId;
@Option(names = "--verifier-user-id", defaultValue = "0", description = "MXAccess verifier user id.")
int verifierUserId;
WriteSecured2BulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
List<Integer> handles = parseIntList(itemHandles);
List<String> valueTexts = parseStringList(values);
if (handles.size() != valueTexts.size()) {
throw new IllegalArgumentException(
"item-handles count (" + handles.size() + ") does not match values count ("
+ valueTexts.size() + ")");
}
MxValue timestampValue = MxValues.timestampValue(Instant.parse(timestamp));
List<WriteSecured2BulkEntry> entries = new ArrayList<>(handles.size());
for (int i = 0; i < handles.size(); i++) {
entries.add(WriteSecured2BulkEntry.newBuilder()
.setItemHandle(handles.get(i))
.setCurrentUserId(currentUserId)
.setVerifierUserId(verifierUserId)
.setValue(parseValue(type, valueTexts.get(i)))
.setTimestampValue(timestampValue)
.build());
}
List<BulkWriteResult> results = client.session(sessionId).writeSecured2Bulk(serverHandle, entries);
writeWriteBulkOutput("write-secured2-bulk", common, json, results);
}
return 0;
}
}
@Command(
name = "bench-read-bulk",
description = "Repeatedly invokes ReadBulk for benchmarking; prints aggregate timing.")
static final class BenchReadBulkCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--items", required = true, description = "Comma-separated tag addresses.")
String items;
@Option(
names = "--timeout-ms",
defaultValue = "0",
description = "Per-tag snapshot timeout in milliseconds (0 = worker default).")
int timeoutMs;
@Option(names = "--iterations", defaultValue = "10", description = "Number of ReadBulk calls to perform.")
int iterations;
@Option(
names = "--warmup",
defaultValue = "1",
description = "Number of warmup iterations excluded from timing.")
int warmup;
BenchReadBulkCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
if (iterations <= 0) {
throw new IllegalArgumentException("--iterations must be positive");
}
if (warmup < 0) {
throw new IllegalArgumentException("--warmup must be non-negative");
}
List<String> tagAddresses = parseStringList(items);
Duration timeout = Duration.ofMillis(timeoutMs);
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
MxGatewayCliSession session = client.session(sessionId);
for (int i = 0; i < warmup; i++) {
session.readBulk(serverHandle, tagAddresses, timeout);
}
long totalNanos = 0L;
long minNanos = Long.MAX_VALUE;
long maxNanos = 0L;
int lastResultCount = 0;
int lastSuccessCount = 0;
int lastCachedCount = 0;
for (int i = 0; i < iterations; i++) {
long start = System.nanoTime();
List<BulkReadResult> results = session.readBulk(serverHandle, tagAddresses, timeout);
long elapsed = System.nanoTime() - start;
totalNanos += elapsed;
minNanos = Math.min(minNanos, elapsed);
maxNanos = Math.max(maxNanos, elapsed);
lastResultCount = results.size();
lastSuccessCount = 0;
lastCachedCount = 0;
for (BulkReadResult result : results) {
if (result.getWasSuccessful()) {
lastSuccessCount++;
}
if (result.getWasCached()) {
lastCachedCount++;
}
}
}
double avgMs = totalNanos / 1_000_000.0 / iterations;
double minMs = minNanos / 1_000_000.0;
double maxMs = maxNanos / 1_000_000.0;
PrintWriter out = common.spec.commandLine().getOut();
if (json) {
Map<String, Object> output = new LinkedHashMap<>();
output.put("command", "bench-read-bulk");
output.put("options", common.redactedJsonMap());
output.put("iterations", iterations);
output.put("warmup", warmup);
output.put("tagCount", tagAddresses.size());
output.put("resultCount", lastResultCount);
output.put("successCount", lastSuccessCount);
output.put("cachedCount", lastCachedCount);
output.put("avgMs", avgMs);
output.put("minMs", minMs);
output.put("maxMs", maxMs);
out.println(jsonObject(output));
} else {
out.printf(
"iterations=%d tags=%d avg=%.3fms min=%.3fms max=%.3fms last_results=%d last_success=%d last_cached=%d%n",
iterations,
tagAddresses.size(),
avgMs,
minMs,
maxMs,
lastResultCount,
lastSuccessCount,
lastCachedCount);
}
}
return 0;
}
}
@Command(name = "write", description = "Invokes MXAccess Write.")
static final class WriteCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
@@ -818,6 +1183,16 @@ public final class MxGatewayCli implements Callable<Integer> {
List<SubscribeResult> unsubscribeBulk(int serverHandle, List<Integer> itemHandles);
List<BulkReadResult> readBulk(int serverHandle, List<String> items, Duration timeout);
List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries);
List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries);
List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries);
List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries);
MxEventStream streamEventsAfter(long afterWorkerSequence);
}
@@ -909,6 +1284,31 @@ public final class MxGatewayCli implements Callable<Integer> {
return session.unsubscribeBulk(serverHandle, itemHandles);
}
@Override
public List<BulkReadResult> readBulk(int serverHandle, List<String> items, Duration timeout) {
return session.readBulk(serverHandle, items, timeout);
}
@Override
public List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries) {
return session.writeBulk(serverHandle, entries);
}
@Override
public List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries) {
return session.write2Bulk(serverHandle, entries);
}
@Override
public List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries) {
return session.writeSecuredBulk(serverHandle, entries);
}
@Override
public List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries) {
return session.writeSecured2Bulk(serverHandle, entries);
}
@Override
public MxEventStream streamEventsAfter(long afterWorkerSequence) {
return session.streamEventsAfter(afterWorkerSequence);
@@ -957,6 +1357,56 @@ public final class MxGatewayCli implements Callable<Integer> {
return values;
}
private static void writeWriteBulkOutput(
String command, CommonOptions common, boolean json, List<BulkWriteResult> results) {
PrintWriter out = common.spec.commandLine().getOut();
if (json) {
Map<String, Object> output = new LinkedHashMap<>();
output.put("command", command);
output.put("options", common.redactedJsonMap());
output.put("results", results.stream().map(MxGatewayCli::bulkWriteResultMap).toList());
out.println(jsonObject(output));
return;
}
out.println(results.size());
}
private static Map<String, Object> bulkWriteResultMap(BulkWriteResult result) {
Map<String, Object> values = new LinkedHashMap<>();
values.put("serverHandle", result.getServerHandle());
values.put("itemHandle", result.getItemHandle());
values.put("wasSuccessful", result.getWasSuccessful());
values.put("hresult", result.hasHresult() ? (Object) result.getHresult() : null);
values.put("errorMessage", result.getErrorMessage());
return values;
}
private static void writeReadBulkOutput(
String command, CommonOptions common, boolean json, List<BulkReadResult> results) {
PrintWriter out = common.spec.commandLine().getOut();
if (json) {
Map<String, Object> output = new LinkedHashMap<>();
output.put("command", command);
output.put("options", common.redactedJsonMap());
output.put("results", results.stream().map(MxGatewayCli::bulkReadResultMap).toList());
out.println(jsonObject(output));
return;
}
out.println(results.size());
}
private static Map<String, Object> bulkReadResultMap(BulkReadResult result) {
Map<String, Object> values = new LinkedHashMap<>();
values.put("serverHandle", result.getServerHandle());
values.put("tagAddress", result.getTagAddress());
values.put("itemHandle", result.getItemHandle());
values.put("wasSuccessful", result.getWasSuccessful());
values.put("wasCached", result.getWasCached());
values.put("quality", result.getQuality());
values.put("errorMessage", result.getErrorMessage());
return values;
}
private static MxValue parseValue(String type, String text) {
return switch (type) {
case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text));
@@ -9,9 +9,12 @@ import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
@@ -25,6 +28,10 @@ import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry;
import org.junit.jupiter.api.Test;
final class MxGatewayCliTests {
@@ -339,6 +346,73 @@ final class MxGatewayCliTests {
return results;
}
@Override
public List<BulkReadResult> readBulk(int serverHandle, List<String> items, Duration timeout) {
List<BulkReadResult> results = new ArrayList<>();
for (int index = 0; index < items.size(); index++) {
results.add(BulkReadResult.newBuilder()
.setServerHandle(serverHandle)
.setTagAddress(items.get(index))
.setItemHandle(200 + index)
.setWasSuccessful(true)
.setWasCached(true)
.build());
}
return results;
}
@Override
public List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries) {
List<BulkWriteResult> results = new ArrayList<>();
for (WriteBulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(entry.getItemHandle())
.setWasSuccessful(true)
.build());
}
return results;
}
@Override
public List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries) {
List<BulkWriteResult> results = new ArrayList<>();
for (Write2BulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(entry.getItemHandle())
.setWasSuccessful(true)
.build());
}
return results;
}
@Override
public List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries) {
List<BulkWriteResult> results = new ArrayList<>();
for (WriteSecuredBulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(entry.getItemHandle())
.setWasSuccessful(true)
.build());
}
return results;
}
@Override
public List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries) {
List<BulkWriteResult> results = new ArrayList<>();
for (WriteSecured2BulkEntry entry : entries) {
results.add(BulkWriteResult.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(entry.getItemHandle())
.setWasSuccessful(true)
.build());
}
return results;
}
@Override
public com.zb.mom.ww.mxgateway.client.MxEventStream streamEventsAfter(long afterWorkerSequence) {
throw new UnsupportedOperationException("stream-events is covered by client tests");
@@ -1,6 +1,7 @@
package com.zb.mom.ww.mxgateway.client;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.HexFormat;
import java.util.List;
import java.util.Objects;
@@ -9,6 +10,8 @@ import mxaccess_gateway.v1.MxaccessGateway.AddItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.AddItemCommand;
import mxaccess_gateway.v1.MxaccessGateway.AdviseItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.AdviseCommand;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommand;
@@ -17,6 +20,7 @@ import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.ReadBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.RegisterCommand;
import mxaccess_gateway.v1.MxaccessGateway.RemoveItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.RemoveItemCommand;
@@ -27,8 +31,16 @@ import mxaccess_gateway.v1.MxaccessGateway.UnAdviseCommand;
import mxaccess_gateway.v1.MxaccessGateway.UnAdviseItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.UnsubscribeBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.UnregisterCommand;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.Write2Command;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteCommand;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry;
/**
* Typed handle for a single MXAccess gateway session.
@@ -421,6 +433,142 @@ public final class MxGatewaySession implements AutoCloseable {
return reply.getUnsubscribeBulk().getResultsList();
}
/**
* Bulk {@code Write} sequential MXAccess Write per entry on the worker's STA.
*
* <p>Per-entry failures appear as {@link BulkWriteResult} entries with
* {@code wasSuccessful == false}; this method does not throw for per-entry
* MXAccess failures (it still throws {@link MxGatewayException} on transport
* or protocol-level failures).
*
* @param serverHandle the {@code ServerHandle} owning the items
* @param entries the per-item (handle, value, user id) tuples
* @return a per-entry {@link BulkWriteResult} list
* @throws MxGatewayException on transport or protocol failure
* @throws NullPointerException if {@code entries} is {@code null}
*/
public List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries) {
Objects.requireNonNull(entries, "entries");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE_BULK)
.setWriteBulk(WriteBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllEntries(entries))
.build());
return reply.getWriteBulk().getResultsList();
}
/**
* Bulk {@code Write2} sequential MXAccess Write2 (timestamped) per entry.
*
* <p>Per-entry semantics mirror {@link #writeBulk(int, List)}.
*
* @param serverHandle the {@code ServerHandle} owning the items
* @param entries the per-item (handle, value, timestamp, user id) tuples
* @return a per-entry {@link BulkWriteResult} list
* @throws MxGatewayException on transport or protocol failure
* @throws NullPointerException if {@code entries} is {@code null}
*/
public List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries) {
Objects.requireNonNull(entries, "entries");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE2_BULK)
.setWrite2Bulk(Write2BulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllEntries(entries))
.build());
return reply.getWrite2Bulk().getResultsList();
}
/**
* Bulk {@code WriteSecured} credential-sensitive values must not be logged
* by callers; mirrors the single-item write-secured redaction contract.
*
* <p>Per-entry semantics mirror {@link #writeBulk(int, List)}.
*
* @param serverHandle the {@code ServerHandle} owning the items
* @param entries the per-item (handle, value, current+verifier user id) tuples
* @return a per-entry {@link BulkWriteResult} list
* @throws MxGatewayException on transport or protocol failure
* @throws NullPointerException if {@code entries} is {@code null}
*/
public List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries) {
Objects.requireNonNull(entries, "entries");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE_SECURED_BULK)
.setWriteSecuredBulk(WriteSecuredBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllEntries(entries))
.build());
return reply.getWriteSecuredBulk().getResultsList();
}
/**
* Bulk {@code WriteSecured2} sequential timestamped + verified write per entry.
*
* <p>Per-entry semantics mirror {@link #writeBulk(int, List)}.
*
* @param serverHandle the {@code ServerHandle} owning the items
* @param entries the per-item (handle, value, timestamp, current+verifier user id) tuples
* @return a per-entry {@link BulkWriteResult} list
* @throws MxGatewayException on transport or protocol failure
* @throws NullPointerException if {@code entries} is {@code null}
*/
public List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries) {
Objects.requireNonNull(entries, "entries");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE_SECURED2_BULK)
.setWriteSecured2Bulk(WriteSecured2BulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllEntries(entries))
.build());
return reply.getWriteSecured2Bulk().getResultsList();
}
/**
* Bulk {@code Read} snapshot the current value of each requested tag.
*
* <p>MXAccess COM has no synchronous read; the worker returns the cached
* {@code OnDataChange} value for any tag that is already advised
* ({@code wasCached == true}) without modifying the existing subscription,
* and falls back to a full AddItem + Advise + wait + UnAdvise + RemoveItem
* snapshot lifecycle otherwise. The supplied {@code timeout} bounds the
* per-tag wait in the snapshot case; pass {@link Duration#ZERO} (or
* {@code null}) to use the worker default (1000 ms). Per-tag failures
* appear as {@link BulkReadResult} entries with {@code wasSuccessful == false};
* this method does not throw for per-tag MXAccess failures.
*
* @param serverHandle the {@code ServerHandle} owning the items
* @param tagAddresses the tag addresses to read
* @param timeout per-tag snapshot timeout (zero or null = worker default)
* @return a per-tag {@link BulkReadResult} list
* @throws MxGatewayException on transport or protocol failure
* @throws NullPointerException if {@code tagAddresses} is {@code null}
* @throws IllegalArgumentException if {@code timeout} is negative or exceeds {@link Integer#MAX_VALUE} milliseconds
*/
public List<BulkReadResult> readBulk(int serverHandle, List<String> tagAddresses, Duration timeout) {
Objects.requireNonNull(tagAddresses, "tagAddresses");
int timeoutMs = 0;
if (timeout != null) {
if (timeout.isNegative()) {
throw new IllegalArgumentException("timeout must be non-negative");
}
long millis = timeout.toMillis();
if (millis > Integer.MAX_VALUE) {
throw new IllegalArgumentException("timeout exceeds Integer.MAX_VALUE milliseconds");
}
timeoutMs = (int) millis;
}
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_READ_BULK)
.setReadBulk(ReadBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllTagAddresses(tagAddresses)
.setTimeoutMs(timeoutMs))
.build());
return reply.getReadBulk().getResultsList();
}
/**
* Invokes MXAccess {@code Write}.
*
@@ -334,6 +334,138 @@ class Session:
)
return list(reply.unsubscribe_bulk.results)
async def write_bulk(
self,
server_handle: int,
entries: Sequence[pb.WriteBulkEntry],
*,
correlation_id: str = "",
) -> list[pb.BulkWriteResult]:
"""Invoke MXAccess `WriteBulk` and return one BulkWriteResult per entry.
Per-entry MXAccess failures appear as results with ``was_successful = False``
and a populated ``error_message`` / ``hresult``; this method does not raise
on per-entry failure, mirroring the existing add/advise bulk surface.
"""
if entries is None:
raise TypeError("entries is required")
_ensure_bulk_size("entries", len(entries))
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE_BULK,
write_bulk=pb.WriteBulkCommand(
server_handle=server_handle,
entries=entries,
),
),
correlation_id=correlation_id,
)
return list(reply.write_bulk.results)
async def write2_bulk(
self,
server_handle: int,
entries: Sequence[pb.Write2BulkEntry],
*,
correlation_id: str = "",
) -> list[pb.BulkWriteResult]:
"""Invoke MXAccess `Write2Bulk` (timestamped) and return per-entry results."""
if entries is None:
raise TypeError("entries is required")
_ensure_bulk_size("entries", len(entries))
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE2_BULK,
write2_bulk=pb.Write2BulkCommand(
server_handle=server_handle,
entries=entries,
),
),
correlation_id=correlation_id,
)
return list(reply.write2_bulk.results)
async def write_secured_bulk(
self,
server_handle: int,
entries: Sequence[pb.WriteSecuredBulkEntry],
*,
correlation_id: str = "",
) -> list[pb.BulkWriteResult]:
"""Invoke MXAccess `WriteSecuredBulk` — credential-sensitive values must not be logged."""
if entries is None:
raise TypeError("entries is required")
_ensure_bulk_size("entries", len(entries))
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE_SECURED_BULK,
write_secured_bulk=pb.WriteSecuredBulkCommand(
server_handle=server_handle,
entries=entries,
),
),
correlation_id=correlation_id,
)
return list(reply.write_secured_bulk.results)
async def write_secured2_bulk(
self,
server_handle: int,
entries: Sequence[pb.WriteSecured2BulkEntry],
*,
correlation_id: str = "",
) -> list[pb.BulkWriteResult]:
"""Invoke MXAccess `WriteSecured2Bulk` (timestamped + verified)."""
if entries is None:
raise TypeError("entries is required")
_ensure_bulk_size("entries", len(entries))
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE_SECURED2_BULK,
write_secured2_bulk=pb.WriteSecured2BulkCommand(
server_handle=server_handle,
entries=entries,
),
),
correlation_id=correlation_id,
)
return list(reply.write_secured2_bulk.results)
async def read_bulk(
self,
server_handle: int,
tag_addresses: Sequence[str],
*,
timeout_ms: int = 0,
correlation_id: str = "",
) -> list[pb.BulkReadResult]:
"""Invoke `ReadBulk` — snapshot the current value of each requested tag.
MXAccess COM has no synchronous read; the worker returns the cached
``OnDataChange`` value for any tag that is already advised (``was_cached =
True``) without modifying the existing subscription, and falls back to
a full AddItem + Advise + wait + UnAdvise + RemoveItem snapshot lifecycle
otherwise. ``timeout_ms`` bounds the per-tag wait in the snapshot case;
pass ``0`` to use the worker default (1000 ms).
"""
if tag_addresses is None:
raise TypeError("tag_addresses is required")
_ensure_bulk_size("tag_addresses", len(tag_addresses))
if timeout_ms < 0:
raise ValueError("timeout_ms must be non-negative")
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_READ_BULK,
read_bulk=pb.ReadBulkCommand(
server_handle=server_handle,
tag_addresses=tag_addresses,
timeout_ms=timeout_ms,
),
),
correlation_id=correlation_id,
)
return list(reply.read_bulk.results)
async def write(
self,
server_handle: int,
@@ -20,7 +20,7 @@ from zb_mom_ww_mxgateway.client import GatewayClient
from zb_mom_ww_mxgateway.errors import MxGatewayError
from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb
from zb_mom_ww_mxgateway.options import ClientOptions
from zb_mom_ww_mxgateway.values import MxValueInput
from zb_mom_ww_mxgateway.values import MxValueInput, to_mx_value
MAX_AGGREGATE_EVENTS = 10_000
@@ -263,6 +263,112 @@ def unsubscribe_bulk(**kwargs: Any) -> None:
)
@main.command("read-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--items", required=True, help="Comma-separated MXAccess tag addresses.")
@click.option("--timeout-ms", default=0, type=int, show_default=True,
help="Per-tag snapshot timeout in milliseconds. 0 = worker default.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def read_bulk(**kwargs: Any) -> None:
"""Invoke MXAccess ReadBulk — cached value when advised, snapshot otherwise."""
_run(_read_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("write-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write_bulk(**kwargs: Any) -> None:
"""Invoke MXAccess WriteBulk — sequential Write per entry."""
_run(_write_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("write2-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write2_bulk(**kwargs: Any) -> None:
"""Invoke MXAccess Write2Bulk — timestamped sequential Write2 per entry."""
_run(_write2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("write-secured-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--current-user-id", default=0, type=int, show_default=True)
@click.option("--verifier-user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write_secured_bulk(**kwargs: Any) -> None:
"""Invoke MXAccess WriteSecuredBulk — credential-sensitive."""
_run(_write_secured_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("write-secured2-bulk")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handles", required=True, help="Comma-separated MXAccess item handles.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--values", required=True, help="Comma-separated values, one per item handle.")
@click.option("--timestamp", required=True, help="ISO-8601 timestamp shared across all entries.")
@click.option("--current-user-id", default=0, type=int, show_default=True)
@click.option("--verifier-user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write_secured2_bulk(**kwargs: Any) -> None:
"""Invoke MXAccess WriteSecured2Bulk — timestamped + credential-sensitive."""
_run(_write_secured2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("bench-read-bulk")
@gateway_options
@click.option("--client-name", default="mxgw-python-bench", show_default=True)
@click.option("--duration-seconds", default=30, type=int, show_default=True)
@click.option("--warmup-seconds", default=3, type=int, show_default=True)
@click.option("--bulk-size", default=6, type=int, show_default=True)
@click.option("--tag-start", default=1, type=int, show_default=True)
@click.option("--tag-prefix", default="TestMachine_", show_default=True)
@click.option("--tag-attribute", default="TestChangingInt", show_default=True)
@click.option("--timeout-ms", default=1500, type=int, show_default=True)
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def bench_read_bulk(**kwargs: Any) -> None:
"""Cross-language ReadBulk stress benchmark.
Opens its own session, subscribes to bulk-size tags so the worker value
cache populates from real OnDataChange events, runs ReadBulk in a tight
loop for duration-seconds, and emits the shared JSON stats schema the
scripts/bench-read-bulk.ps1 driver collates across all five clients.
"""
_run(_bench_read_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("stream-events")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@@ -417,6 +523,233 @@ async def _unsubscribe_bulk(**kwargs: Any) -> dict[str, Any]:
return {"results": [_message_dict(result) for result in results]}
async def _read_bulk(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
results = await session.read_bulk(
kwargs["server_handle"],
_parse_string_list(kwargs["items"]),
timeout_ms=kwargs["timeout_ms"],
correlation_id=kwargs["correlation_id"],
)
return {"results": [_message_dict(result) for result in results]}
def _build_write_bulk_entries(kwargs: dict[str, Any]):
"""Build (item_handle, MxValue) pairs for the bulk-write families.
The CLI accepts a single ``--type`` plus ``--values`` (comma-separated
string-encoded values, one per ``--item-handles`` entry). Returns the
parsed item-handle list and the per-entry MxValue protobuf instances
callers wrap these into the appropriate per-entry message type.
"""
handles = _parse_int_list(kwargs["item_handles"])
value_texts = _parse_string_list(kwargs["values"])
if len(handles) != len(value_texts):
raise click.UsageError(
f"item-handles count ({len(handles)}) does not match values count ({len(value_texts)})",
)
parsed = [_parse_value(text, kwargs["value_type"]) for text in value_texts]
values = [to_mx_value(v) for v in parsed]
return handles, values
async def _write_bulk(**kwargs: Any) -> dict[str, Any]:
handles, values = _build_write_bulk_entries(kwargs)
entries = [
pb.WriteBulkEntry(item_handle=handle, user_id=kwargs["user_id"], value=value)
for handle, value in zip(handles, values)
]
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
results = await session.write_bulk(
kwargs["server_handle"],
entries,
correlation_id=kwargs["correlation_id"],
)
return {"results": [_message_dict(result) for result in results]}
async def _write2_bulk(**kwargs: Any) -> dict[str, Any]:
handles, values = _build_write_bulk_entries(kwargs)
timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"]))
entries = [
pb.Write2BulkEntry(
item_handle=handle,
user_id=kwargs["user_id"],
value=value,
timestamp_value=timestamp_value,
)
for handle, value in zip(handles, values)
]
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
results = await session.write2_bulk(
kwargs["server_handle"],
entries,
correlation_id=kwargs["correlation_id"],
)
return {"results": [_message_dict(result) for result in results]}
async def _write_secured_bulk(**kwargs: Any) -> dict[str, Any]:
handles, values = _build_write_bulk_entries(kwargs)
entries = [
pb.WriteSecuredBulkEntry(
item_handle=handle,
current_user_id=kwargs["current_user_id"],
verifier_user_id=kwargs["verifier_user_id"],
value=value,
)
for handle, value in zip(handles, values)
]
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
results = await session.write_secured_bulk(
kwargs["server_handle"],
entries,
correlation_id=kwargs["correlation_id"],
)
return {"results": [_message_dict(result) for result in results]}
async def _write_secured2_bulk(**kwargs: Any) -> dict[str, Any]:
handles, values = _build_write_bulk_entries(kwargs)
timestamp_value = to_mx_value(_parse_datetime(kwargs["timestamp"]))
entries = [
pb.WriteSecured2BulkEntry(
item_handle=handle,
current_user_id=kwargs["current_user_id"],
verifier_user_id=kwargs["verifier_user_id"],
value=value,
timestamp_value=timestamp_value,
)
for handle, value in zip(handles, values)
]
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
results = await session.write_secured2_bulk(
kwargs["server_handle"],
entries,
correlation_id=kwargs["correlation_id"],
)
return {"results": [_message_dict(result) for result in results]}
async def _bench_read_bulk(**kwargs: Any) -> dict[str, Any]:
"""ReadBulk stress benchmark — matches the .NET / Go / Rust / Java schema."""
import time
bulk_size = int(kwargs["bulk_size"])
if bulk_size < 1:
raise click.UsageError("bulk-size must be positive")
duration_seconds = int(kwargs["duration_seconds"])
warmup_seconds = int(kwargs["warmup_seconds"])
tag_start = int(kwargs["tag_start"])
tag_prefix = kwargs["tag_prefix"]
tag_attribute = kwargs["tag_attribute"]
timeout_ms = int(kwargs["timeout_ms"])
client_name = kwargs["client_name"]
tags = [f"{tag_prefix}{i:03d}.{tag_attribute}" for i in range(tag_start, tag_start + bulk_size)]
async with await _connect(kwargs) as client:
session = await client.open_session(client_session_name=client_name)
server_handle = 0
item_handles: list[int] = []
try:
server_handle = await session.register(client_name)
subscribe_results = await session.subscribe_bulk(server_handle, tags)
item_handles = [r.item_handle for r in subscribe_results if r.was_successful]
# Warm-up window so JIT / connection pool / first-call costs are
# amortised before the measurement window opens.
warmup_deadline = time.perf_counter() + warmup_seconds
while time.perf_counter() < warmup_deadline:
await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms)
latencies_ms: list[float] = []
total_results = 0
cached_results = 0
successful = 0
failed = 0
steady_start = time.perf_counter()
steady_deadline = steady_start + duration_seconds
while time.perf_counter() < steady_deadline:
call_start = time.perf_counter()
try:
results = await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms)
except Exception:
failed += 1
latencies_ms.append((time.perf_counter() - call_start) * 1000.0)
continue
latencies_ms.append((time.perf_counter() - call_start) * 1000.0)
successful += 1
for r in results:
total_results += 1
if r.was_cached:
cached_results += 1
steady_elapsed = time.perf_counter() - steady_start
total_calls = successful + failed
calls_per_second = total_calls / steady_elapsed if steady_elapsed > 0 else 0.0
finally:
if item_handles:
try:
await session.unsubscribe_bulk(server_handle, item_handles)
except Exception:
pass
try:
await session.close()
except Exception:
pass
return {
"language": "python",
"command": "bench-read-bulk",
"endpoint": kwargs.get("endpoint"),
"clientName": client_name,
"bulkSize": bulk_size,
"durationSeconds": duration_seconds,
"warmupSeconds": warmup_seconds,
"durationMs": int(steady_elapsed * 1000),
"tags": tags,
"totalCalls": total_calls,
"successfulCalls": successful,
"failedCalls": failed,
"totalReadResults": total_results,
"cachedReadResults": cached_results,
"callsPerSecond": round(calls_per_second, 2),
"latencyMs": _percentile_summary(latencies_ms),
}
def _percentile_summary(sample: list[float]) -> dict[str, float]:
if not sample:
return {"p50": 0.0, "p95": 0.0, "p99": 0.0, "max": 0.0, "mean": 0.0}
sorted_sample = sorted(sample)
return {
"p50": round(_percentile(sorted_sample, 0.50), 3),
"p95": round(_percentile(sorted_sample, 0.95), 3),
"p99": round(_percentile(sorted_sample, 0.99), 3),
"max": round(sorted_sample[-1], 3),
"mean": round(sum(sample) / len(sample), 3),
}
def _percentile(sorted_sample: list[float], quantile: float) -> float:
"""Nearest-rank with linear interpolation; matches every other client."""
n = len(sorted_sample)
if n == 0:
return 0.0
if n == 1:
return sorted_sample[0]
rank = quantile * (n - 1)
lower = int(rank)
upper = min(lower + 1, n - 1)
fraction = rank - lower
return sorted_sample[lower] + (sorted_sample[upper] - sorted_sample[lower]) * fraction
async def _stream_events(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
+648 -3
View File
@@ -12,14 +12,15 @@ use std::env;
use std::io::{self, BufRead, Write};
use std::path::PathBuf;
use std::process::ExitCode;
use std::time::Duration;
use std::time::{Duration, Instant};
use clap::{Args, Parser, Subcommand, ValueEnum};
use futures_util::StreamExt;
use zb_mom_ww_mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, OpenSessionRequest,
PingCommand, StreamEventsRequest,
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxValue as ProtoMxValue,
OpenSessionRequest, PingCommand, StreamEventsRequest, Write2BulkEntry, WriteBulkEntry,
WriteSecured2BulkEntry, WriteSecuredBulkEntry,
};
use zb_mom_ww_mxgateway_client::{
ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, CLIENT_VERSION,
@@ -128,6 +129,137 @@ enum Command {
#[arg(long)]
json: bool,
},
/// Snapshot the current value for each requested tag. Cached
/// OnDataChange values are returned for tags that are already advised
/// without touching the existing subscription; otherwise the worker
/// takes a one-shot AddItem + Advise + UnAdvise + RemoveItem lifecycle.
ReadBulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long, value_delimiter = ',')]
items: Vec<String>,
/// Per-tag snapshot timeout in milliseconds. `0` uses the worker default (1000 ms).
#[arg(long, default_value_t = 0)]
timeout_ms: u32,
#[arg(long)]
json: bool,
},
/// Bulk Write — one MXAccess Write per (item_handle, value) pair.
WriteBulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long, value_delimiter = ',')]
item_handles: Vec<i32>,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long, value_delimiter = ',')]
values: Vec<String>,
#[arg(long, default_value_t = 0)]
user_id: i32,
#[arg(long)]
json: bool,
},
/// Bulk Write2 — timestamped variant; the timestamp applies to all entries.
Write2Bulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long, value_delimiter = ',')]
item_handles: Vec<i32>,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long, value_delimiter = ',')]
values: Vec<String>,
#[arg(long)]
timestamp: String,
#[arg(long, default_value_t = 0)]
user_id: i32,
#[arg(long)]
json: bool,
},
/// Bulk WriteSecured.
WriteSecuredBulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long, value_delimiter = ',')]
item_handles: Vec<i32>,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long, value_delimiter = ',')]
values: Vec<String>,
#[arg(long, default_value_t = 0)]
current_user_id: i32,
#[arg(long, default_value_t = 0)]
verifier_user_id: i32,
#[arg(long)]
json: bool,
},
/// Bulk WriteSecured2 — timestamped + verified.
WriteSecured2Bulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long, value_delimiter = ',')]
item_handles: Vec<i32>,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long, value_delimiter = ',')]
values: Vec<String>,
#[arg(long)]
timestamp: String,
#[arg(long, default_value_t = 0)]
current_user_id: i32,
#[arg(long, default_value_t = 0)]
verifier_user_id: i32,
#[arg(long)]
json: bool,
},
/// Cross-language stress benchmark for ReadBulk: opens its own session,
/// subscribes to `--bulk-size` tags so the worker's per-session value cache
/// populates from real OnDataChange events, then hammers ReadBulk in a
/// tight loop for `--duration-seconds` with per-call latency capture. Emits
/// a single JSON object on stdout matching the schema the
/// `scripts/bench-read-bulk.ps1` driver collates across all five clients.
BenchReadBulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long, default_value = "mxgw-rust-bench")]
client_name: String,
#[arg(long, default_value_t = 30)]
duration_seconds: u64,
#[arg(long, default_value_t = 3)]
warmup_seconds: u64,
#[arg(long, default_value_t = 6)]
bulk_size: usize,
#[arg(long, default_value_t = 1)]
tag_start: i32,
#[arg(long, default_value = "TestMachine_")]
tag_prefix: String,
#[arg(long, default_value = "TestChangingInt")]
tag_attribute: String,
#[arg(long, default_value_t = 1500)]
timeout_ms: u32,
#[arg(long)]
json: bool,
},
StreamEvents {
#[command(flatten)]
connection: ConnectionArgs,
@@ -450,6 +582,162 @@ async fn dispatch(command: Command) -> Result<(), Error> {
.await?;
print_bulk_results("unsubscribe-bulk", &results, json);
}
Command::ReadBulk {
connection,
session_id,
server_handle,
items,
timeout_ms,
json,
} => {
let session = session_for(connection, session_id).await?;
let results = session.read_bulk(server_handle, items, timeout_ms).await?;
print_read_bulk_results("read-bulk", &results, json);
}
Command::WriteBulk {
connection,
session_id,
server_handle,
item_handles,
value_type,
values,
user_id,
json,
} => {
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
let session = session_for(connection, session_id).await?;
let results = session
.write_bulk(
server_handle,
entries
.into_iter()
.map(|(item_handle, value)| WriteBulkEntry {
item_handle,
value: Some(value),
user_id,
})
.collect(),
)
.await?;
print_write_bulk_results("write-bulk", &results, json);
}
Command::Write2Bulk {
connection,
session_id,
server_handle,
item_handles,
value_type,
values,
timestamp,
user_id,
json,
} => {
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto();
let session = session_for(connection, session_id).await?;
let results = session
.write2_bulk(
server_handle,
entries
.into_iter()
.map(|(item_handle, value)| Write2BulkEntry {
item_handle,
value: Some(value),
timestamp_value: Some(timestamp_value.clone()),
user_id,
})
.collect(),
)
.await?;
print_write_bulk_results("write2-bulk", &results, json);
}
Command::WriteSecuredBulk {
connection,
session_id,
server_handle,
item_handles,
value_type,
values,
current_user_id,
verifier_user_id,
json,
} => {
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
let session = session_for(connection, session_id).await?;
let results = session
.write_secured_bulk(
server_handle,
entries
.into_iter()
.map(|(item_handle, value)| WriteSecuredBulkEntry {
item_handle,
value: Some(value),
current_user_id,
verifier_user_id,
})
.collect(),
)
.await?;
print_write_bulk_results("write-secured-bulk", &results, json);
}
Command::WriteSecured2Bulk {
connection,
session_id,
server_handle,
item_handles,
value_type,
values,
timestamp,
current_user_id,
verifier_user_id,
json,
} => {
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto();
let session = session_for(connection, session_id).await?;
let results = session
.write_secured2_bulk(
server_handle,
entries
.into_iter()
.map(|(item_handle, value)| WriteSecured2BulkEntry {
item_handle,
value: Some(value),
timestamp_value: Some(timestamp_value.clone()),
current_user_id,
verifier_user_id,
})
.collect(),
)
.await?;
print_write_bulk_results("write-secured2-bulk", &results, json);
}
Command::BenchReadBulk {
connection,
client_name,
duration_seconds,
warmup_seconds,
bulk_size,
tag_start,
tag_prefix,
tag_attribute,
timeout_ms,
json,
} => {
run_bench_read_bulk(
connection,
client_name,
duration_seconds,
warmup_seconds,
bulk_size,
tag_start,
tag_prefix,
tag_attribute,
timeout_ms,
json,
)
.await?;
}
Command::StreamEvents {
connection,
session_id,
@@ -891,6 +1179,363 @@ fn parse_value(value_type: CliValueType, value: &str) -> Result<MxValue, Error>
Ok(parsed)
}
fn print_write_bulk_results(
operation: &str,
results: &[zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::BulkWriteResult],
use_json: bool,
) {
if use_json {
let results_json: Vec<_> = results
.iter()
.map(|result| {
json!({
"serverHandle": result.server_handle,
"itemHandle": result.item_handle,
"wasSuccessful": result.was_successful,
"hresult": result.hresult,
"errorMessage": result.error_message,
})
})
.collect();
println!(
"{}",
json!({ "operation": operation, "results": results_json })
);
} else {
println!("{}", results.len());
}
}
fn print_read_bulk_results(
operation: &str,
results: &[zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::BulkReadResult],
use_json: bool,
) {
if use_json {
let results_json: Vec<_> = results
.iter()
.map(|result| {
json!({
"serverHandle": result.server_handle,
"tagAddress": result.tag_address,
"itemHandle": result.item_handle,
"wasSuccessful": result.was_successful,
"wasCached": result.was_cached,
"quality": result.quality,
"errorMessage": result.error_message,
})
})
.collect();
println!(
"{}",
json!({ "operation": operation, "results": results_json })
);
} else {
println!("{}", results.len());
}
}
/// Drive the cross-language ReadBulk stress benchmark from Rust: opens its
/// own session, subscribes to `bulk_size` tags so the worker's per-session
/// value cache populates from real OnDataChange events, hammers ReadBulk in
/// a tight loop for `duration_seconds` with per-call latency capture, and
/// emits the shared JSON stats schema the `scripts/bench-read-bulk.ps1`
/// driver collates across all five clients.
#[allow(clippy::too_many_arguments)]
async fn run_bench_read_bulk(
connection: ConnectionArgs,
client_name: String,
duration_seconds: u64,
warmup_seconds: u64,
bulk_size: usize,
tag_start: i32,
tag_prefix: String,
tag_attribute: String,
timeout_ms: u32,
json: bool,
) -> Result<(), Error> {
if bulk_size == 0 {
return Err(Error::InvalidArgument {
name: "bulk-size".to_owned(),
detail: "bulk-size must be positive".to_owned(),
});
}
if duration_seconds == 0 {
return Err(Error::InvalidArgument {
name: "duration-seconds".to_owned(),
detail: "duration-seconds must be positive".to_owned(),
});
}
// Build TestMachine_NNN.<attribute> tags with three-digit machine numbers
// matching the existing cross-language tag-discovery convention.
let tags: Vec<String> = (0..bulk_size)
.map(|index| {
format!(
"{prefix}{number:03}.{attr}",
prefix = tag_prefix,
number = tag_start + index as i32,
attr = tag_attribute,
)
})
.collect();
let endpoint = connection.endpoint.clone();
let client = connect(connection).await?;
let session = client
.open_session(OpenSessionRequest {
client_session_name: client_name.clone(),
..OpenSessionRequest::default()
})
.await?;
let session_id = session.id().to_owned();
// Subscribe so the worker's MxAccessValueCache populates from real
// OnDataChange events before the measurement window opens. Any per-tag
// failures fall through silently; the bench is still meaningful for the
// successfully-subscribed subset.
let bench_outcome = async {
let server_handle = session.register(&client_name).await?;
let subscribe_results = session
.subscribe_bulk(server_handle, tags.clone())
.await?;
let item_handles: Vec<i32> = subscribe_results
.iter()
.filter(|r| r.was_successful)
.map(|r| r.item_handle)
.collect();
let timeout_ms_param = timeout_ms;
// Warm-up: drive identical calls so any connection-pool / channel
// setup is amortised before the measurement window opens.
let warmup_deadline = Instant::now() + Duration::from_secs(warmup_seconds);
while Instant::now() < warmup_deadline {
let _ = session
.read_bulk(server_handle, tags.clone(), timeout_ms_param)
.await;
}
// Steady-state measurement window: capture per-call latency as
// sub-millisecond f64 deltas from Instant::now() so the histogram
// resolution matches the .NET Stopwatch / Go time.Now path.
let mut latencies_ms: Vec<f64> = Vec::with_capacity(65_536);
let mut total_read_results: i64 = 0;
let mut cached_read_results: i64 = 0;
let mut successful_calls: u64 = 0;
let mut failed_calls: u64 = 0;
let steady_start = Instant::now();
let steady_deadline = steady_start + Duration::from_secs(duration_seconds);
while Instant::now() < steady_deadline {
let call_start = Instant::now();
let result = session
.read_bulk(server_handle, tags.clone(), timeout_ms_param)
.await;
let elapsed = call_start.elapsed();
latencies_ms.push(elapsed.as_secs_f64() * 1000.0);
match result {
Ok(results) => {
successful_calls += 1;
for r in &results {
total_read_results += 1;
if r.was_cached {
cached_read_results += 1;
}
}
}
Err(_) => failed_calls += 1,
}
}
let steady_elapsed = steady_start.elapsed();
// Best-effort cleanup: unsubscribe so the worker can release cache slots.
if !item_handles.is_empty() {
let _ = session
.unsubscribe_bulk(server_handle, item_handles)
.await;
}
let total_calls = successful_calls + failed_calls;
let calls_per_second = if steady_elapsed.as_secs_f64() > 0.0 {
total_calls as f64 / steady_elapsed.as_secs_f64()
} else {
0.0
};
Ok::<_, Error>(BenchStats {
endpoint,
client_name,
bulk_size,
duration_seconds,
warmup_seconds,
duration_ms: steady_elapsed.as_millis() as u64,
tags: tags.clone(),
total_calls,
successful_calls,
failed_calls,
total_read_results,
cached_read_results,
calls_per_second,
latencies_ms,
})
}
.await;
// Always close the session, even if the bench loop returned an error.
let close_result = client
.close_session_raw(CloseSessionRequest {
session_id: session_id.clone(),
client_correlation_id: "rust-cli-bench-read-bulk-close".to_owned(),
})
.await;
let stats = bench_outcome?;
// Closing the session is best-effort; never let it mask a real bench error.
let _ = close_result;
if json {
let latency = percentile_summary(&stats.latencies_ms);
let payload = json!({
"language": "rust",
"command": "bench-read-bulk",
"endpoint": stats.endpoint,
"clientName": stats.client_name,
"bulkSize": stats.bulk_size,
"durationSeconds": stats.duration_seconds,
"warmupSeconds": stats.warmup_seconds,
"durationMs": stats.duration_ms,
"tags": stats.tags,
"totalCalls": stats.total_calls,
"successfulCalls": stats.successful_calls,
"failedCalls": stats.failed_calls,
"totalReadResults": stats.total_read_results,
"cachedReadResults": stats.cached_read_results,
"callsPerSecond": round_to(stats.calls_per_second, 2),
"latencyMs": {
"p50": round_to(latency.p50, 3),
"p95": round_to(latency.p95, 3),
"p99": round_to(latency.p99, 3),
"max": round_to(latency.max, 3),
"mean": round_to(latency.mean, 3),
},
});
println!("{payload}");
} else {
println!("{}", stats.calls_per_second);
}
Ok(())
}
/// Collected bench-read-bulk measurements; carried in one struct so the
/// async block can finish cleanup (unsubscribe, close-session) before the
/// caller renders the JSON / plain output.
struct BenchStats {
endpoint: String,
client_name: String,
bulk_size: usize,
duration_seconds: u64,
warmup_seconds: u64,
duration_ms: u64,
tags: Vec<String>,
total_calls: u64,
successful_calls: u64,
failed_calls: u64,
total_read_results: i64,
cached_read_results: i64,
calls_per_second: f64,
latencies_ms: Vec<f64>,
}
/// The same `{ p50, p95, p99, max, mean }` shape every language bench emits.
/// `p50`/`p95`/`p99` use nearest-rank with linear interpolation, matching the
/// .NET / Go implementations so cross-language comparisons are apples-to-apples.
struct LatencySummary {
p50: f64,
p95: f64,
p99: f64,
max: f64,
mean: f64,
}
fn percentile_summary(sample: &[f64]) -> LatencySummary {
if sample.is_empty() {
return LatencySummary {
p50: 0.0,
p95: 0.0,
p99: 0.0,
max: 0.0,
mean: 0.0,
};
}
let mut sorted: Vec<f64> = sample.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let max = sorted[sorted.len() - 1];
let mean: f64 = sample.iter().copied().sum::<f64>() / sample.len() as f64;
LatencySummary {
p50: percentile(&sorted, 0.50),
p95: percentile(&sorted, 0.95),
p99: percentile(&sorted, 0.99),
max,
mean,
}
}
fn percentile(sorted: &[f64], quantile: f64) -> f64 {
if sorted.is_empty() {
return 0.0;
}
if sorted.len() == 1 {
return sorted[0];
}
let rank = quantile * (sorted.len() - 1) as f64;
let lower = rank.floor() as usize;
let upper = lower + 1;
if upper >= sorted.len() {
return sorted[lower];
}
let fraction = rank - lower as f64;
sorted[lower] + (sorted[upper] - sorted[lower]) * fraction
}
fn round_to(value: f64, digits: u32) -> f64 {
let shift = 10f64.powi(digits as i32);
(value * shift).round() / shift
}
/// Pairs each parsed item handle with its parsed MxValue (proto form) so a
/// single helper can build the four bulk-write families without each branch
/// repeating the length check and per-value parsing.
fn build_write_bulk_entries(
item_handles: &[i32],
value_type: CliValueType,
values: &[String],
) -> Result<
Vec<(
i32,
zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::MxValue,
)>,
Error,
> {
if item_handles.len() != values.len() {
return Err(Error::InvalidArgument {
name: "values".to_owned(),
detail: format!(
"item-handles count ({}) does not match values count ({})",
item_handles.len(),
values.len()
),
});
}
item_handles
.iter()
.zip(values.iter())
.map(|(handle, value)| {
parse_value(value_type, value).map(|wrapper| (*handle, wrapper.into_proto()))
})
.collect()
}
fn print_deploy_event(event: &DeployEvent, use_json: bool) {
if use_json {
println!(
+173 -4
View File
@@ -14,10 +14,13 @@ use crate::generated::mxaccess_gateway::v1::mx_command::Payload;
use crate::generated::mxaccess_gateway::v1::mx_command_reply;
use crate::generated::mxaccess_gateway::v1::{
AddItem2Command, AddItemBulkCommand, AddItemCommand, AdviseCommand, AdviseItemBulkCommand,
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandReply, MxCommandRequest,
MxValue as ProtoMxValue, OpenSessionRequest, RegisterCommand, RemoveItemBulkCommand,
RemoveItemCommand, StreamEventsRequest, SubscribeBulkCommand, SubscribeResult, UnAdviseCommand,
UnAdviseItemBulkCommand, UnsubscribeBulkCommand, Write2Command, WriteCommand,
BulkReadResult, BulkWriteResult, CloseSessionRequest, MxCommand, MxCommandKind, MxCommandReply,
MxCommandRequest, MxValue as ProtoMxValue, OpenSessionRequest, ReadBulkCommand,
RegisterCommand, RemoveItemBulkCommand, RemoveItemCommand, StreamEventsRequest,
SubscribeBulkCommand, SubscribeResult, UnAdviseCommand, UnAdviseItemBulkCommand,
UnsubscribeBulkCommand, Write2BulkCommand, Write2BulkEntry, Write2Command, WriteBulkCommand,
WriteBulkEntry, WriteCommand, WriteSecured2BulkCommand, WriteSecured2BulkEntry,
WriteSecuredBulkCommand, WriteSecuredBulkEntry,
};
use crate::value::MxValue;
@@ -350,6 +353,145 @@ impl Session {
Ok(bulk_results(reply, BulkReplyKind::UnsubscribeBulk))
}
/// Bulk `Read` — snapshot the current value for each requested tag.
///
/// MXAccess COM has no synchronous `Read`; the worker satisfies this by
/// returning the most recent cached `OnDataChange` value when the tag is
/// already advised (`was_cached = true`), or by taking a full AddItem +
/// Advise + wait + UnAdvise + RemoveItem snapshot lifecycle otherwise.
/// `timeout_ms == 0` lets the worker pick its default (1000 ms).
/// Per-tag failures appear as `BulkReadResult` entries with
/// `was_successful = false`; the call never errors on per-tag failure.
///
/// # Errors
///
/// Same conditions as [`Session::add_item_bulk`].
pub async fn read_bulk(
&self,
server_handle: i32,
tag_addresses: Vec<String>,
timeout_ms: u32,
) -> Result<Vec<BulkReadResult>, Error> {
ensure_bulk_size("tag_addresses", tag_addresses.len())?;
let reply = self
.invoke(
MxCommandKind::ReadBulk,
Payload::ReadBulk(ReadBulkCommand {
server_handle,
tag_addresses,
timeout_ms,
}),
)
.await?;
Ok(match reply.payload {
Some(mx_command_reply::Payload::ReadBulk(reply)) => reply.results,
_ => Vec::new(),
})
}
/// Bulk `Write` (sequential MXAccess Write per entry, on the worker's STA).
///
/// Per-entry MXAccess failures are reported as `BulkWriteResult` entries
/// with `was_successful = false`; the call never errors on per-entry
/// failure. Protocol-level failures still surface as [`Error::Command`].
///
/// # Errors
///
/// Same conditions as [`Session::add_item_bulk`], plus the usual
/// transport/status errors.
pub async fn write_bulk(
&self,
server_handle: i32,
entries: Vec<WriteBulkEntry>,
) -> Result<Vec<BulkWriteResult>, Error> {
ensure_bulk_size("entries", entries.len())?;
let reply = self
.invoke(
MxCommandKind::WriteBulk,
Payload::WriteBulk(WriteBulkCommand {
server_handle,
entries,
}),
)
.await?;
Ok(bulk_write_results(reply, BulkWriteReplyKind::Write))
}
/// Bulk `Write2` (timestamped) — see [`Session::write_bulk`].
///
/// # Errors
///
/// Same conditions as [`Session::write_bulk`].
pub async fn write2_bulk(
&self,
server_handle: i32,
entries: Vec<Write2BulkEntry>,
) -> Result<Vec<BulkWriteResult>, Error> {
ensure_bulk_size("entries", entries.len())?;
let reply = self
.invoke(
MxCommandKind::Write2Bulk,
Payload::Write2Bulk(Write2BulkCommand {
server_handle,
entries,
}),
)
.await?;
Ok(bulk_write_results(reply, BulkWriteReplyKind::Write2))
}
/// Bulk `WriteSecured` — credential-sensitive values follow the same
/// redaction contract as the single-item `write_secured` path.
///
/// # Errors
///
/// Same conditions as [`Session::write_bulk`].
pub async fn write_secured_bulk(
&self,
server_handle: i32,
entries: Vec<WriteSecuredBulkEntry>,
) -> Result<Vec<BulkWriteResult>, Error> {
ensure_bulk_size("entries", entries.len())?;
let reply = self
.invoke(
MxCommandKind::WriteSecuredBulk,
Payload::WriteSecuredBulk(WriteSecuredBulkCommand {
server_handle,
entries,
}),
)
.await?;
Ok(bulk_write_results(reply, BulkWriteReplyKind::WriteSecured))
}
/// Bulk `WriteSecured2` (timestamped) — see [`Session::write_secured_bulk`].
///
/// # Errors
///
/// Same conditions as [`Session::write_bulk`].
pub async fn write_secured2_bulk(
&self,
server_handle: i32,
entries: Vec<WriteSecured2BulkEntry>,
) -> Result<Vec<BulkWriteResult>, Error> {
ensure_bulk_size("entries", entries.len())?;
let reply = self
.invoke(
MxCommandKind::WriteSecured2Bulk,
Payload::WriteSecured2Bulk(WriteSecured2BulkCommand {
server_handle,
entries,
}),
)
.await?;
Ok(bulk_write_results(reply, BulkWriteReplyKind::WriteSecured2))
}
/// Run MXAccess `Write` (single-value, no caller-supplied timestamp).
///
/// # Errors
@@ -554,6 +696,33 @@ fn bulk_results(reply: MxCommandReply, kind: BulkReplyKind) -> Vec<SubscribeResu
}
}
enum BulkWriteReplyKind {
Write,
Write2,
WriteSecured,
WriteSecured2,
}
fn bulk_write_results(reply: MxCommandReply, kind: BulkWriteReplyKind) -> Vec<BulkWriteResult> {
match (reply.payload, kind) {
(Some(mx_command_reply::Payload::WriteBulk(reply)), BulkWriteReplyKind::Write) => {
reply.results
}
(Some(mx_command_reply::Payload::Write2Bulk(reply)), BulkWriteReplyKind::Write2) => {
reply.results
}
(
Some(mx_command_reply::Payload::WriteSecuredBulk(reply)),
BulkWriteReplyKind::WriteSecured,
) => reply.results,
(
Some(mx_command_reply::Payload::WriteSecured2Bulk(reply)),
BulkWriteReplyKind::WriteSecured2,
) => reply.results,
_ => Vec::new(),
}
}
fn int32_reply_value(value: &ProtoMxValue) -> Option<i32> {
match value.kind.as_ref()? {
crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value) => Some(*value),