.NET client: port bulk read/write SDK methods + CLI subcommands
Adds the value-bulk SDK surface and CLI subcommands that lived on the
divergent branch (commit f220908) but were never merged into main.
HEAD's MxGatewaySession only had the subscribe-style bulks (AddItem /
Advise / Remove / UnAdvise / Subscribe / Unsubscribe). The proto
contract already defined ReadBulkCommand / WriteBulkCommand /
Write2BulkCommand / WriteSecuredBulkCommand / WriteSecured2BulkCommand
/ BulkReadReply / BulkWriteReply, so this is purely a client-side
addition.
SDK (MxGatewaySession.cs):
- WriteBulkAsync(serverHandle, IReadOnlyList<WriteBulkEntry>, ct)
- Write2BulkAsync(serverHandle, IReadOnlyList<Write2BulkEntry>, ct)
- WriteSecuredBulkAsync(serverHandle, IReadOnlyList<WriteSecuredBulkEntry>, ct)
- WriteSecured2BulkAsync(serverHandle, IReadOnlyList<WriteSecured2BulkEntry>, ct)
- ReadBulkAsync(serverHandle, IReadOnlyList<string> tagAddresses, TimeSpan timeout, ct)
Per-entry secured user ids live on each WriteSecured(2)BulkEntry — they
are NOT lifted to ctor args because the proto field shape allows distinct
ids per row.
CLI (MxGatewayClientCli.cs):
- read-bulk / write-bulk / write2-bulk / write-secured-bulk / write-secured2-bulk
routed through the existing dispatch table, with --type, --values,
--item-handles, --timeout-ms, --current-user-id, --verifier-user-id,
--timestamp flags matching the cross-language CLI surface.
- bench-read-bulk benchmark harness: warmup + steady-state ReadBulk loop
with p50/p95/p99/max/mean latency, emitting the shared JSON schema so
scripts/bench-read-bulk.ps1 collates the .NET line alongside the four
other clients.
The new subcommands flow through the existing batch dispatcher without
further changes.
Verification: dotnet build clean (0 warnings / 0 errors);
dotnet test 59/59 passing. Manual smoke against the live gateway
on localhost:5120: read-bulk returned 2 BulkReadResult entries with
wasSuccessful=true, wasCached=true; write-bulk on int32 returned
wasSuccessful=true; close-session returned SESSION_STATE_CLOSED.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -114,6 +114,18 @@ public static class MxGatewayClientCli
|
||||
.ConfigureAwait(false),
|
||||
"unsubscribe-bulk" => await UnsubscribeBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||
.ConfigureAwait(false),
|
||||
"read-bulk" => await ReadBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||
.ConfigureAwait(false),
|
||||
"write-bulk" => await WriteBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||
.ConfigureAwait(false),
|
||||
"write2-bulk" => await Write2BulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||
.ConfigureAwait(false),
|
||||
"write-secured-bulk" => await WriteSecuredBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||
.ConfigureAwait(false),
|
||||
"write-secured2-bulk" => await WriteSecured2BulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||
.ConfigureAwait(false),
|
||||
"bench-read-bulk" => await BenchReadBulkAsync(arguments, client, standardOutput, cancellation.Token)
|
||||
.ConfigureAwait(false),
|
||||
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
|
||||
.ConfigureAwait(false),
|
||||
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
|
||||
@@ -458,6 +470,451 @@ public static class MxGatewayClientCli
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
private static Task<int> ReadBulkAsync(
|
||||
CliArguments arguments,
|
||||
IMxGatewayCliClient client,
|
||||
TextWriter output,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ReadBulkCommand command = new()
|
||||
{
|
||||
ServerHandle = arguments.GetInt32("server-handle"),
|
||||
TimeoutMs = (uint)arguments.GetInt32("timeout-ms", 0),
|
||||
};
|
||||
command.TagAddresses.Add(ParseStringList(arguments.GetRequired("items")));
|
||||
|
||||
return InvokeAndWriteAsync(
|
||||
arguments,
|
||||
client,
|
||||
output,
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.ReadBulk,
|
||||
ReadBulk = command,
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
private static Task<int> WriteBulkAsync(
|
||||
CliArguments arguments,
|
||||
IMxGatewayCliClient client,
|
||||
TextWriter output,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
WriteBulkCommand command = new()
|
||||
{
|
||||
ServerHandle = arguments.GetInt32("server-handle"),
|
||||
};
|
||||
|
||||
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
||||
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
||||
int userId = arguments.GetInt32("user-id", 0);
|
||||
EnsureSameLength(handles.Count, values.Count);
|
||||
|
||||
for (int i = 0; i < handles.Count; i++)
|
||||
{
|
||||
command.Entries.Add(new WriteBulkEntry
|
||||
{
|
||||
ItemHandle = handles[i],
|
||||
Value = values[i],
|
||||
UserId = userId,
|
||||
});
|
||||
}
|
||||
|
||||
return InvokeAndWriteAsync(
|
||||
arguments,
|
||||
client,
|
||||
output,
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.WriteBulk,
|
||||
WriteBulk = command,
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
private static Task<int> Write2BulkAsync(
|
||||
CliArguments arguments,
|
||||
IMxGatewayCliClient client,
|
||||
TextWriter output,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
Write2BulkCommand command = new()
|
||||
{
|
||||
ServerHandle = arguments.GetInt32("server-handle"),
|
||||
};
|
||||
|
||||
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
||||
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
||||
MxValue timestampValue = ParseTimestampValue(arguments);
|
||||
int userId = arguments.GetInt32("user-id", 0);
|
||||
EnsureSameLength(handles.Count, values.Count);
|
||||
|
||||
for (int i = 0; i < handles.Count; i++)
|
||||
{
|
||||
command.Entries.Add(new Write2BulkEntry
|
||||
{
|
||||
ItemHandle = handles[i],
|
||||
Value = values[i],
|
||||
TimestampValue = timestampValue,
|
||||
UserId = userId,
|
||||
});
|
||||
}
|
||||
|
||||
return InvokeAndWriteAsync(
|
||||
arguments,
|
||||
client,
|
||||
output,
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Write2Bulk,
|
||||
Write2Bulk = command,
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
private static Task<int> WriteSecuredBulkAsync(
|
||||
CliArguments arguments,
|
||||
IMxGatewayCliClient client,
|
||||
TextWriter output,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
WriteSecuredBulkCommand command = new()
|
||||
{
|
||||
ServerHandle = arguments.GetInt32("server-handle"),
|
||||
};
|
||||
|
||||
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
||||
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
||||
int currentUserId = arguments.GetInt32("current-user-id");
|
||||
int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
|
||||
EnsureSameLength(handles.Count, values.Count);
|
||||
|
||||
for (int i = 0; i < handles.Count; i++)
|
||||
{
|
||||
command.Entries.Add(new WriteSecuredBulkEntry
|
||||
{
|
||||
ItemHandle = handles[i],
|
||||
Value = values[i],
|
||||
CurrentUserId = currentUserId,
|
||||
VerifierUserId = verifierUserId,
|
||||
});
|
||||
}
|
||||
|
||||
return InvokeAndWriteAsync(
|
||||
arguments,
|
||||
client,
|
||||
output,
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.WriteSecuredBulk,
|
||||
WriteSecuredBulk = command,
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
private static Task<int> WriteSecured2BulkAsync(
|
||||
CliArguments arguments,
|
||||
IMxGatewayCliClient client,
|
||||
TextWriter output,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
WriteSecured2BulkCommand command = new()
|
||||
{
|
||||
ServerHandle = arguments.GetInt32("server-handle"),
|
||||
};
|
||||
|
||||
IReadOnlyList<int> handles = ParseInt32List(arguments.GetRequired("item-handles"));
|
||||
IReadOnlyList<MxValue> values = ParseValuesList(arguments);
|
||||
MxValue timestampValue = ParseTimestampValue(arguments);
|
||||
int currentUserId = arguments.GetInt32("current-user-id");
|
||||
int verifierUserId = arguments.GetInt32("verifier-user-id", 0);
|
||||
EnsureSameLength(handles.Count, values.Count);
|
||||
|
||||
for (int i = 0; i < handles.Count; i++)
|
||||
{
|
||||
command.Entries.Add(new WriteSecured2BulkEntry
|
||||
{
|
||||
ItemHandle = handles[i],
|
||||
Value = values[i],
|
||||
TimestampValue = timestampValue,
|
||||
CurrentUserId = currentUserId,
|
||||
VerifierUserId = verifierUserId,
|
||||
});
|
||||
}
|
||||
|
||||
return InvokeAndWriteAsync(
|
||||
arguments,
|
||||
client,
|
||||
output,
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.WriteSecured2Bulk,
|
||||
WriteSecured2Bulk = command,
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parses the bulk-write CLI's <c>--values</c> list. All entries share
|
||||
/// the single <c>--type</c> argument; the comma-separated values are
|
||||
/// each parsed via <see cref="ParseValue(string, string)"/> on a per-entry basis.
|
||||
/// This keeps the CLI simple for e2e use (one type, N values) — callers
|
||||
/// that need heterogeneous types per entry should drive the library
|
||||
/// directly.
|
||||
/// </summary>
|
||||
private static IReadOnlyList<MxValue> ParseValuesList(CliArguments arguments)
|
||||
{
|
||||
string type = arguments.GetRequired("type");
|
||||
string[] values = ParseStringList(arguments.GetRequired("values")).ToArray();
|
||||
MxValue[] result = new MxValue[values.Length];
|
||||
for (int i = 0; i < values.Length; i++)
|
||||
{
|
||||
result[i] = ParseValue(type, values[i]);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static void EnsureSameLength(int handles, int values)
|
||||
{
|
||||
if (handles != values)
|
||||
{
|
||||
throw new ArgumentException(
|
||||
$"Bulk write requires the same number of --item-handles ({handles}) and --values ({values}).");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Cross-language stress benchmark for ReadBulk. Opens its own session,
|
||||
/// subscribes to N tags so the worker's MxAccessValueCache populates from
|
||||
/// real OnDataChange events, then hammers ReadBulk in a tight in-process
|
||||
/// loop with per-call Stopwatch timing. Emits a single JSON object on
|
||||
/// stdout that the scripts/bench-read-bulk.ps1 driver collates across
|
||||
/// all five language clients.
|
||||
/// </summary>
|
||||
private static async Task<int> BenchReadBulkAsync(
|
||||
CliArguments arguments,
|
||||
IMxGatewayCliClient client,
|
||||
TextWriter output,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
int durationSeconds = arguments.GetInt32("duration-seconds", 30);
|
||||
int warmupSeconds = arguments.GetInt32("warmup-seconds", 3);
|
||||
int bulkSize = arguments.GetInt32("bulk-size", 6);
|
||||
int tagStart = arguments.GetInt32("tag-start", 1);
|
||||
string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_";
|
||||
string tagAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt";
|
||||
uint timeoutMs = (uint)arguments.GetInt32("timeout-ms", 1500);
|
||||
string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench";
|
||||
|
||||
string[] tags = new string[bulkSize];
|
||||
for (int i = 0; i < bulkSize; i++)
|
||||
{
|
||||
// TestMachine_NNN.<attribute>, three-digit machine numbers matching
|
||||
// the existing e2e tag-discovery convention.
|
||||
tags[i] = $"{tagPrefix}{(tagStart + i):D3}.{tagAttribute}";
|
||||
}
|
||||
|
||||
// Open + register + subscribe-bulk so the cache populates before the
|
||||
// measurement window opens.
|
||||
OpenSessionReply openReply = await client.OpenSessionAsync(
|
||||
new OpenSessionRequest { ClientSessionName = clientName, ClientCorrelationId = CreateCorrelationId() },
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
string sessionId = openReply.SessionId;
|
||||
|
||||
try
|
||||
{
|
||||
MxCommandReply registerReply = await InvokeAndEnsureAsync(
|
||||
client,
|
||||
CreateCommandRequest(sessionId, new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Register,
|
||||
Register = new RegisterCommand { ClientName = clientName },
|
||||
}),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
int serverHandle = registerReply.Register?.ServerHandle ?? registerReply.ReturnValue.Int32Value;
|
||||
|
||||
SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle };
|
||||
subscribe.TagAddresses.Add(tags);
|
||||
MxCommandReply subscribeReply = await InvokeAndEnsureAsync(
|
||||
client,
|
||||
CreateCommandRequest(sessionId, new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.SubscribeBulk,
|
||||
SubscribeBulk = subscribe,
|
||||
}),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
int[] itemHandles = subscribeReply.SubscribeBulk?.Results
|
||||
.Where(r => r.WasSuccessful)
|
||||
.Select(r => r.ItemHandle)
|
||||
.ToArray() ?? [];
|
||||
|
||||
// Warm-up: drive the same call shape so the JIT / connection
|
||||
// pipelines settle before the measurement window opens.
|
||||
DateTime warmupDeadline = DateTime.UtcNow + TimeSpan.FromSeconds(warmupSeconds);
|
||||
ReadBulkCommand readBulkCommand = new()
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
TimeoutMs = timeoutMs,
|
||||
};
|
||||
readBulkCommand.TagAddresses.Add(tags);
|
||||
MxCommand readBulkMxCommand = new() { Kind = MxCommandKind.ReadBulk, ReadBulk = readBulkCommand };
|
||||
|
||||
while (DateTime.UtcNow < warmupDeadline)
|
||||
{
|
||||
_ = await client.InvokeAsync(
|
||||
CreateCommandRequest(sessionId, readBulkMxCommand),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
// Steady state — capture per-call wall latency with a high-res
|
||||
// Stopwatch so the resolution is sub-millisecond on modern Windows.
|
||||
List<double> latencyMillis = new(capacity: 65536);
|
||||
long totalReadResults = 0;
|
||||
long cachedReadResults = 0;
|
||||
int successfulCalls = 0;
|
||||
int failedCalls = 0;
|
||||
DateTime steadyDeadline = DateTime.UtcNow + TimeSpan.FromSeconds(durationSeconds);
|
||||
DateTime steadyStart = DateTime.UtcNow;
|
||||
|
||||
while (DateTime.UtcNow < steadyDeadline)
|
||||
{
|
||||
System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();
|
||||
MxCommandReply reply;
|
||||
try
|
||||
{
|
||||
reply = await client.InvokeAsync(
|
||||
CreateCommandRequest(sessionId, readBulkMxCommand),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
sw.Stop();
|
||||
}
|
||||
catch
|
||||
{
|
||||
sw.Stop();
|
||||
failedCalls++;
|
||||
latencyMillis.Add(sw.Elapsed.TotalMilliseconds);
|
||||
continue;
|
||||
}
|
||||
|
||||
latencyMillis.Add(sw.Elapsed.TotalMilliseconds);
|
||||
if (reply.ProtocolStatus?.Code != ProtocolStatusCode.Ok)
|
||||
{
|
||||
failedCalls++;
|
||||
continue;
|
||||
}
|
||||
|
||||
successfulCalls++;
|
||||
if (reply.ReadBulk is not null)
|
||||
{
|
||||
foreach (BulkReadResult r in reply.ReadBulk.Results)
|
||||
{
|
||||
totalReadResults++;
|
||||
if (r.WasCached)
|
||||
{
|
||||
cachedReadResults++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
double steadyElapsedSeconds = (DateTime.UtcNow - steadyStart).TotalSeconds;
|
||||
|
||||
if (itemHandles.Length > 0)
|
||||
{
|
||||
UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = serverHandle };
|
||||
unsubscribe.ItemHandles.Add(itemHandles);
|
||||
_ = await client.InvokeAsync(
|
||||
CreateCommandRequest(sessionId, new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.UnsubscribeBulk,
|
||||
UnsubscribeBulk = unsubscribe,
|
||||
}),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
int totalCalls = successfulCalls + failedCalls;
|
||||
double callsPerSecond = steadyElapsedSeconds > 0
|
||||
? totalCalls / steadyElapsedSeconds
|
||||
: 0;
|
||||
|
||||
object stats = new
|
||||
{
|
||||
language = "dotnet",
|
||||
command = "bench-read-bulk",
|
||||
endpoint = arguments.GetOptional("endpoint") ?? "(default)",
|
||||
clientName,
|
||||
bulkSize,
|
||||
durationSeconds,
|
||||
warmupSeconds,
|
||||
durationMs = (long)(steadyElapsedSeconds * 1000),
|
||||
tags,
|
||||
totalCalls,
|
||||
successfulCalls,
|
||||
failedCalls,
|
||||
totalReadResults,
|
||||
cachedReadResults,
|
||||
callsPerSecond = Math.Round(callsPerSecond, 2),
|
||||
latencyMs = new
|
||||
{
|
||||
p50 = Percentile(latencyMillis, 0.50),
|
||||
p95 = Percentile(latencyMillis, 0.95),
|
||||
p99 = Percentile(latencyMillis, 0.99),
|
||||
max = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Max(), 3) : 0,
|
||||
mean = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Average(), 3) : 0,
|
||||
},
|
||||
};
|
||||
output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions));
|
||||
return 0;
|
||||
}
|
||||
finally
|
||||
{
|
||||
try
|
||||
{
|
||||
await client.CloseSessionAsync(
|
||||
new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = CreateCorrelationId() },
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Closing the session is best-effort — never let it mask a real bench error.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Computes the requested percentile from an unsorted latency sample using
|
||||
/// nearest-rank with linear interpolation. Rounds to 3 decimal places to
|
||||
/// match the JSON schema the PS driver collates.
|
||||
/// </summary>
|
||||
private static double Percentile(IReadOnlyList<double> sample, double quantile)
|
||||
{
|
||||
if (sample.Count == 0)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
double[] sorted = sample.ToArray();
|
||||
Array.Sort(sorted);
|
||||
if (sorted.Length == 1)
|
||||
{
|
||||
return Math.Round(sorted[0], 3);
|
||||
}
|
||||
|
||||
double rank = quantile * (sorted.Length - 1);
|
||||
int lower = (int)Math.Floor(rank);
|
||||
int upper = (int)Math.Ceiling(rank);
|
||||
double fraction = rank - lower;
|
||||
double value = sorted[lower] + (sorted[upper] - sorted[lower]) * fraction;
|
||||
return Math.Round(value, 3);
|
||||
}
|
||||
|
||||
private static Task<int> WriteAsync(
|
||||
CliArguments arguments,
|
||||
IMxGatewayCliClient client,
|
||||
@@ -844,11 +1301,15 @@ public static class MxGatewayClientCli
|
||||
|
||||
private static MxValue ParseValue(CliArguments arguments)
|
||||
{
|
||||
string type = arguments.GetRequired("type").ToLowerInvariant();
|
||||
string value = arguments.GetRequired("value");
|
||||
return ParseValue(arguments.GetRequired("type"), arguments.GetRequired("value"));
|
||||
}
|
||||
|
||||
private static MxValue ParseValue(string type, string value)
|
||||
{
|
||||
string normalisedType = type.ToLowerInvariant();
|
||||
string[] values = value.Split(',', StringSplitOptions.TrimEntries);
|
||||
|
||||
return type switch
|
||||
return normalisedType switch
|
||||
{
|
||||
"bool" or "boolean" => bool.Parse(value).ToMxValue(),
|
||||
"bool-array" or "boolean-array" => values.Select(bool.Parse).ToArray().ToMxValue(),
|
||||
@@ -867,7 +1328,7 @@ public static class MxGatewayClientCli
|
||||
.Select(item => DateTimeOffset.Parse(item, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal))
|
||||
.ToArray()
|
||||
.ToMxValue(),
|
||||
_ => throw new ArgumentException($"Unsupported MX value type '{type}'."),
|
||||
_ => throw new ArgumentException($"Unsupported MX value type '{normalisedType}'."),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1078,6 +1539,12 @@ public static class MxGatewayClientCli
|
||||
or "advise"
|
||||
or "subscribe-bulk"
|
||||
or "unsubscribe-bulk"
|
||||
or "read-bulk"
|
||||
or "write-bulk"
|
||||
or "write2-bulk"
|
||||
or "write-secured-bulk"
|
||||
or "write-secured2-bulk"
|
||||
or "bench-read-bulk"
|
||||
or "stream-events"
|
||||
or "write"
|
||||
or "write2"
|
||||
@@ -1131,6 +1598,12 @@ public static class MxGatewayClientCli
|
||||
writer.WriteLine("mxgw-dotnet advise --session-id <id> --server-handle <n> --item-handle <n> [--json]");
|
||||
writer.WriteLine("mxgw-dotnet subscribe-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--json]");
|
||||
writer.WriteLine("mxgw-dotnet unsubscribe-bulk --session-id <id> --server-handle <n> --item-handles <n,n> [--json]");
|
||||
writer.WriteLine("mxgw-dotnet read-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--timeout-ms <n>] [--json]");
|
||||
writer.WriteLine("mxgw-dotnet write-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--user-id <n>] [--json]");
|
||||
writer.WriteLine("mxgw-dotnet write2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> [--timestamp <iso>] [--user-id <n>] [--json]");
|
||||
writer.WriteLine("mxgw-dotnet write-secured-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> --current-user-id <n> [--verifier-user-id <n>] [--json]");
|
||||
writer.WriteLine("mxgw-dotnet write-secured2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> --current-user-id <n> [--verifier-user-id <n>] [--timestamp <iso>] [--json]");
|
||||
writer.WriteLine("mxgw-dotnet bench-read-bulk [--duration-seconds <n>] [--warmup-seconds <n>] [--bulk-size <n>] [--tag-start <n>] [--tag-prefix <s>] [--tag-attribute <s>] [--timeout-ms <n>] [--client-name <name>]");
|
||||
writer.WriteLine("mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]");
|
||||
writer.WriteLine("mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]");
|
||||
writer.WriteLine("mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]");
|
||||
|
||||
@@ -502,6 +502,171 @@ public sealed class MxGatewaySession : IAsyncDisposable
|
||||
return reply.UnsubscribeBulk?.Results.ToArray() ?? [];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bulk Write — sequential MXAccess Write per entry on the worker's STA.
|
||||
/// Per-item failures appear as <see cref="BulkWriteResult"/> entries with
|
||||
/// <c>WasSuccessful = false</c>; the call never throws on per-item errors.
|
||||
/// Protocol-level failures still throw via EnsureProtocolSuccess.
|
||||
/// </summary>
|
||||
/// <param name="serverHandle">The ServerHandle from register.</param>
|
||||
/// <param name="entries">Per-item write entries; each carries the item handle, value, and user id.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>One <see cref="BulkWriteResult"/> per requested entry, in request order.</returns>
|
||||
public async Task<IReadOnlyList<BulkWriteResult>> WriteBulkAsync(
|
||||
int serverHandle,
|
||||
IReadOnlyList<WriteBulkEntry> entries,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(entries);
|
||||
|
||||
WriteBulkCommand command = new() { ServerHandle = serverHandle };
|
||||
command.Entries.Add(entries);
|
||||
|
||||
MxCommandReply reply = await InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.WriteBulk,
|
||||
WriteBulk = command,
|
||||
},
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
|
||||
return reply.WriteBulk?.Results.ToArray() ?? [];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bulk Write2 — sequential MXAccess Write2 (timestamped) per entry.
|
||||
/// Per-item failures appear as <see cref="BulkWriteResult"/> entries with
|
||||
/// <c>WasSuccessful = false</c>; the call never throws on per-item errors.
|
||||
/// </summary>
|
||||
/// <param name="serverHandle">The ServerHandle from register.</param>
|
||||
/// <param name="entries">Per-item write entries; each carries the item handle, value, timestamp, and user id.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>One <see cref="BulkWriteResult"/> per requested entry, in request order.</returns>
|
||||
public async Task<IReadOnlyList<BulkWriteResult>> Write2BulkAsync(
|
||||
int serverHandle,
|
||||
IReadOnlyList<Write2BulkEntry> entries,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(entries);
|
||||
|
||||
Write2BulkCommand command = new() { ServerHandle = serverHandle };
|
||||
command.Entries.Add(entries);
|
||||
|
||||
MxCommandReply reply = await InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Write2Bulk,
|
||||
Write2Bulk = command,
|
||||
},
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
|
||||
return reply.Write2Bulk?.Results.ToArray() ?? [];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bulk WriteSecured — sequential MXAccess WriteSecured per entry.
|
||||
/// Credential-sensitive values must never reach logs; the client mirrors
|
||||
/// the single-item WriteSecured redaction contract.
|
||||
/// </summary>
|
||||
/// <param name="serverHandle">The ServerHandle from register.</param>
|
||||
/// <param name="entries">Per-item write entries; each carries the item handle, value, current user id, and verifier user id.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>One <see cref="BulkWriteResult"/> per requested entry, in request order.</returns>
|
||||
public async Task<IReadOnlyList<BulkWriteResult>> WriteSecuredBulkAsync(
|
||||
int serverHandle,
|
||||
IReadOnlyList<WriteSecuredBulkEntry> entries,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(entries);
|
||||
|
||||
WriteSecuredBulkCommand command = new() { ServerHandle = serverHandle };
|
||||
command.Entries.Add(entries);
|
||||
|
||||
MxCommandReply reply = await InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.WriteSecuredBulk,
|
||||
WriteSecuredBulk = command,
|
||||
},
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
|
||||
return reply.WriteSecuredBulk?.Results.ToArray() ?? [];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bulk WriteSecured2 — sequential MXAccess WriteSecured2 (timestamped) per entry.
|
||||
/// Same redaction rules as <see cref="WriteSecuredBulkAsync"/>.
|
||||
/// </summary>
|
||||
/// <param name="serverHandle">The ServerHandle from register.</param>
|
||||
/// <param name="entries">Per-item write entries; each carries the item handle, value, timestamp, current user id, and verifier user id.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>One <see cref="BulkWriteResult"/> per requested entry, in request order.</returns>
|
||||
public async Task<IReadOnlyList<BulkWriteResult>> WriteSecured2BulkAsync(
|
||||
int serverHandle,
|
||||
IReadOnlyList<WriteSecured2BulkEntry> entries,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(entries);
|
||||
|
||||
WriteSecured2BulkCommand command = new() { ServerHandle = serverHandle };
|
||||
command.Entries.Add(entries);
|
||||
|
||||
MxCommandReply reply = await InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.WriteSecured2Bulk,
|
||||
WriteSecured2Bulk = command,
|
||||
},
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
|
||||
return reply.WriteSecured2Bulk?.Results.ToArray() ?? [];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bulk Read — snapshot the current value for each requested tag.
|
||||
/// Returns the cached OnDataChange value when the tag is already advised
|
||||
/// (<c>WasCached = true</c>), otherwise the worker takes the full AddItem +
|
||||
/// Advise + wait + UnAdvise + RemoveItem snapshot lifecycle. Per-tag
|
||||
/// failures (timeout, invalid tag) appear as <see cref="BulkReadResult"/>
|
||||
/// entries with <c>WasSuccessful = false</c>; the call never throws on
|
||||
/// per-tag errors.
|
||||
/// </summary>
|
||||
/// <param name="serverHandle">The ServerHandle from register.</param>
|
||||
/// <param name="tagAddresses">Tag addresses to read (one per result).</param>
|
||||
/// <param name="timeout">Per-call timeout for the snapshot lifecycle path; <see cref="TimeSpan.Zero"/> uses the gateway default.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>One <see cref="BulkReadResult"/> per requested tag, in request order.</returns>
|
||||
public async Task<IReadOnlyList<BulkReadResult>> ReadBulkAsync(
|
||||
int serverHandle,
|
||||
IReadOnlyList<string> tagAddresses,
|
||||
TimeSpan timeout,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(tagAddresses);
|
||||
|
||||
ReadBulkCommand command = new()
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
TimeoutMs = timeout <= TimeSpan.Zero ? 0u : (uint)Math.Min(timeout.TotalMilliseconds, uint.MaxValue),
|
||||
};
|
||||
command.TagAddresses.Add(tagAddresses);
|
||||
|
||||
MxCommandReply reply = await InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.ReadBulk,
|
||||
ReadBulk = command,
|
||||
},
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
|
||||
return reply.ReadBulk?.Results.ToArray() ?? [];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Writes a value to an item on the MXAccess server.
|
||||
/// </summary>
|
||||
|
||||
Reference in New Issue
Block a user