diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs index a9d794f..44ec41b 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs @@ -114,6 +114,18 @@ public static class MxGatewayClientCli .ConfigureAwait(false), "unsubscribe-bulk" => await UnsubscribeBulkAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), + "read-bulk" => await ReadBulkAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), + "write-bulk" => await WriteBulkAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), + "write2-bulk" => await Write2BulkAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), + "write-secured-bulk" => await WriteSecuredBulkAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), + "write-secured2-bulk" => await WriteSecured2BulkAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), + "bench-read-bulk" => await BenchReadBulkAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), "stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token) @@ -458,6 +470,451 @@ public static class MxGatewayClientCli cancellationToken); } + private static Task ReadBulkAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + ReadBulkCommand command = new() + { + ServerHandle = arguments.GetInt32("server-handle"), + TimeoutMs = (uint)arguments.GetInt32("timeout-ms", 0), + }; + command.TagAddresses.Add(ParseStringList(arguments.GetRequired("items"))); + + return InvokeAndWriteAsync( + arguments, + client, + output, + new MxCommand + { + Kind = MxCommandKind.ReadBulk, + ReadBulk = command, + }, + cancellationToken); + } + + private static Task WriteBulkAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + WriteBulkCommand command = new() + { + ServerHandle = arguments.GetInt32("server-handle"), + }; + + IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); + IReadOnlyList values = ParseValuesList(arguments); + int userId = arguments.GetInt32("user-id", 0); + EnsureSameLength(handles.Count, values.Count); + + for (int i = 0; i < handles.Count; i++) + { + command.Entries.Add(new WriteBulkEntry + { + ItemHandle = handles[i], + Value = values[i], + UserId = userId, + }); + } + + return InvokeAndWriteAsync( + arguments, + client, + output, + new MxCommand + { + Kind = MxCommandKind.WriteBulk, + WriteBulk = command, + }, + cancellationToken); + } + + private static Task Write2BulkAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + Write2BulkCommand command = new() + { + ServerHandle = arguments.GetInt32("server-handle"), + }; + + IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); + IReadOnlyList values = ParseValuesList(arguments); + MxValue timestampValue = ParseTimestampValue(arguments); + int userId = arguments.GetInt32("user-id", 0); + EnsureSameLength(handles.Count, values.Count); + + for (int i = 0; i < handles.Count; i++) + { + command.Entries.Add(new Write2BulkEntry + { + ItemHandle = handles[i], + Value = values[i], + TimestampValue = timestampValue, + UserId = userId, + }); + } + + return InvokeAndWriteAsync( + arguments, + client, + output, + new MxCommand + { + Kind = MxCommandKind.Write2Bulk, + Write2Bulk = command, + }, + cancellationToken); + } + + private static Task WriteSecuredBulkAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + WriteSecuredBulkCommand command = new() + { + ServerHandle = arguments.GetInt32("server-handle"), + }; + + IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); + IReadOnlyList values = ParseValuesList(arguments); + int currentUserId = arguments.GetInt32("current-user-id"); + int verifierUserId = arguments.GetInt32("verifier-user-id", 0); + EnsureSameLength(handles.Count, values.Count); + + for (int i = 0; i < handles.Count; i++) + { + command.Entries.Add(new WriteSecuredBulkEntry + { + ItemHandle = handles[i], + Value = values[i], + CurrentUserId = currentUserId, + VerifierUserId = verifierUserId, + }); + } + + return InvokeAndWriteAsync( + arguments, + client, + output, + new MxCommand + { + Kind = MxCommandKind.WriteSecuredBulk, + WriteSecuredBulk = command, + }, + cancellationToken); + } + + private static Task WriteSecured2BulkAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + WriteSecured2BulkCommand command = new() + { + ServerHandle = arguments.GetInt32("server-handle"), + }; + + IReadOnlyList handles = ParseInt32List(arguments.GetRequired("item-handles")); + IReadOnlyList values = ParseValuesList(arguments); + MxValue timestampValue = ParseTimestampValue(arguments); + int currentUserId = arguments.GetInt32("current-user-id"); + int verifierUserId = arguments.GetInt32("verifier-user-id", 0); + EnsureSameLength(handles.Count, values.Count); + + for (int i = 0; i < handles.Count; i++) + { + command.Entries.Add(new WriteSecured2BulkEntry + { + ItemHandle = handles[i], + Value = values[i], + TimestampValue = timestampValue, + CurrentUserId = currentUserId, + VerifierUserId = verifierUserId, + }); + } + + return InvokeAndWriteAsync( + arguments, + client, + output, + new MxCommand + { + Kind = MxCommandKind.WriteSecured2Bulk, + WriteSecured2Bulk = command, + }, + cancellationToken); + } + + /// + /// Parses the bulk-write CLI's --values list. All entries share + /// the single --type argument; the comma-separated values are + /// each parsed via on a per-entry basis. + /// This keeps the CLI simple for e2e use (one type, N values) — callers + /// that need heterogeneous types per entry should drive the library + /// directly. + /// + private static IReadOnlyList ParseValuesList(CliArguments arguments) + { + string type = arguments.GetRequired("type"); + string[] values = ParseStringList(arguments.GetRequired("values")).ToArray(); + MxValue[] result = new MxValue[values.Length]; + for (int i = 0; i < values.Length; i++) + { + result[i] = ParseValue(type, values[i]); + } + return result; + } + + private static void EnsureSameLength(int handles, int values) + { + if (handles != values) + { + throw new ArgumentException( + $"Bulk write requires the same number of --item-handles ({handles}) and --values ({values})."); + } + } + + /// + /// Cross-language stress benchmark for ReadBulk. Opens its own session, + /// subscribes to N tags so the worker's MxAccessValueCache populates from + /// real OnDataChange events, then hammers ReadBulk in a tight in-process + /// loop with per-call Stopwatch timing. Emits a single JSON object on + /// stdout that the scripts/bench-read-bulk.ps1 driver collates across + /// all five language clients. + /// + private static async Task BenchReadBulkAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + int durationSeconds = arguments.GetInt32("duration-seconds", 30); + int warmupSeconds = arguments.GetInt32("warmup-seconds", 3); + int bulkSize = arguments.GetInt32("bulk-size", 6); + int tagStart = arguments.GetInt32("tag-start", 1); + string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_"; + string tagAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt"; + uint timeoutMs = (uint)arguments.GetInt32("timeout-ms", 1500); + string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench"; + + string[] tags = new string[bulkSize]; + for (int i = 0; i < bulkSize; i++) + { + // TestMachine_NNN., three-digit machine numbers matching + // the existing e2e tag-discovery convention. + tags[i] = $"{tagPrefix}{(tagStart + i):D3}.{tagAttribute}"; + } + + // Open + register + subscribe-bulk so the cache populates before the + // measurement window opens. + OpenSessionReply openReply = await client.OpenSessionAsync( + new OpenSessionRequest { ClientSessionName = clientName, ClientCorrelationId = CreateCorrelationId() }, + cancellationToken) + .ConfigureAwait(false); + string sessionId = openReply.SessionId; + + try + { + MxCommandReply registerReply = await InvokeAndEnsureAsync( + client, + CreateCommandRequest(sessionId, new MxCommand + { + Kind = MxCommandKind.Register, + Register = new RegisterCommand { ClientName = clientName }, + }), + cancellationToken) + .ConfigureAwait(false); + int serverHandle = registerReply.Register?.ServerHandle ?? registerReply.ReturnValue.Int32Value; + + SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle }; + subscribe.TagAddresses.Add(tags); + MxCommandReply subscribeReply = await InvokeAndEnsureAsync( + client, + CreateCommandRequest(sessionId, new MxCommand + { + Kind = MxCommandKind.SubscribeBulk, + SubscribeBulk = subscribe, + }), + cancellationToken) + .ConfigureAwait(false); + int[] itemHandles = subscribeReply.SubscribeBulk?.Results + .Where(r => r.WasSuccessful) + .Select(r => r.ItemHandle) + .ToArray() ?? []; + + // Warm-up: drive the same call shape so the JIT / connection + // pipelines settle before the measurement window opens. + DateTime warmupDeadline = DateTime.UtcNow + TimeSpan.FromSeconds(warmupSeconds); + ReadBulkCommand readBulkCommand = new() + { + ServerHandle = serverHandle, + TimeoutMs = timeoutMs, + }; + readBulkCommand.TagAddresses.Add(tags); + MxCommand readBulkMxCommand = new() { Kind = MxCommandKind.ReadBulk, ReadBulk = readBulkCommand }; + + while (DateTime.UtcNow < warmupDeadline) + { + _ = await client.InvokeAsync( + CreateCommandRequest(sessionId, readBulkMxCommand), + cancellationToken) + .ConfigureAwait(false); + } + + // Steady state — capture per-call wall latency with a high-res + // Stopwatch so the resolution is sub-millisecond on modern Windows. + List latencyMillis = new(capacity: 65536); + long totalReadResults = 0; + long cachedReadResults = 0; + int successfulCalls = 0; + int failedCalls = 0; + DateTime steadyDeadline = DateTime.UtcNow + TimeSpan.FromSeconds(durationSeconds); + DateTime steadyStart = DateTime.UtcNow; + + while (DateTime.UtcNow < steadyDeadline) + { + System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew(); + MxCommandReply reply; + try + { + reply = await client.InvokeAsync( + CreateCommandRequest(sessionId, readBulkMxCommand), + cancellationToken) + .ConfigureAwait(false); + sw.Stop(); + } + catch + { + sw.Stop(); + failedCalls++; + latencyMillis.Add(sw.Elapsed.TotalMilliseconds); + continue; + } + + latencyMillis.Add(sw.Elapsed.TotalMilliseconds); + if (reply.ProtocolStatus?.Code != ProtocolStatusCode.Ok) + { + failedCalls++; + continue; + } + + successfulCalls++; + if (reply.ReadBulk is not null) + { + foreach (BulkReadResult r in reply.ReadBulk.Results) + { + totalReadResults++; + if (r.WasCached) + { + cachedReadResults++; + } + } + } + } + + double steadyElapsedSeconds = (DateTime.UtcNow - steadyStart).TotalSeconds; + + if (itemHandles.Length > 0) + { + UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = serverHandle }; + unsubscribe.ItemHandles.Add(itemHandles); + _ = await client.InvokeAsync( + CreateCommandRequest(sessionId, new MxCommand + { + Kind = MxCommandKind.UnsubscribeBulk, + UnsubscribeBulk = unsubscribe, + }), + cancellationToken) + .ConfigureAwait(false); + } + + int totalCalls = successfulCalls + failedCalls; + double callsPerSecond = steadyElapsedSeconds > 0 + ? totalCalls / steadyElapsedSeconds + : 0; + + object stats = new + { + language = "dotnet", + command = "bench-read-bulk", + endpoint = arguments.GetOptional("endpoint") ?? "(default)", + clientName, + bulkSize, + durationSeconds, + warmupSeconds, + durationMs = (long)(steadyElapsedSeconds * 1000), + tags, + totalCalls, + successfulCalls, + failedCalls, + totalReadResults, + cachedReadResults, + callsPerSecond = Math.Round(callsPerSecond, 2), + latencyMs = new + { + p50 = Percentile(latencyMillis, 0.50), + p95 = Percentile(latencyMillis, 0.95), + p99 = Percentile(latencyMillis, 0.99), + max = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Max(), 3) : 0, + mean = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Average(), 3) : 0, + }, + }; + output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions)); + return 0; + } + finally + { + try + { + await client.CloseSessionAsync( + new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = CreateCorrelationId() }, + cancellationToken) + .ConfigureAwait(false); + } + catch + { + // Closing the session is best-effort — never let it mask a real bench error. + } + } + } + + /// + /// Computes the requested percentile from an unsorted latency sample using + /// nearest-rank with linear interpolation. Rounds to 3 decimal places to + /// match the JSON schema the PS driver collates. + /// + private static double Percentile(IReadOnlyList sample, double quantile) + { + if (sample.Count == 0) + { + return 0; + } + + double[] sorted = sample.ToArray(); + Array.Sort(sorted); + if (sorted.Length == 1) + { + return Math.Round(sorted[0], 3); + } + + double rank = quantile * (sorted.Length - 1); + int lower = (int)Math.Floor(rank); + int upper = (int)Math.Ceiling(rank); + double fraction = rank - lower; + double value = sorted[lower] + (sorted[upper] - sorted[lower]) * fraction; + return Math.Round(value, 3); + } + private static Task WriteAsync( CliArguments arguments, IMxGatewayCliClient client, @@ -844,11 +1301,15 @@ public static class MxGatewayClientCli private static MxValue ParseValue(CliArguments arguments) { - string type = arguments.GetRequired("type").ToLowerInvariant(); - string value = arguments.GetRequired("value"); + return ParseValue(arguments.GetRequired("type"), arguments.GetRequired("value")); + } + + private static MxValue ParseValue(string type, string value) + { + string normalisedType = type.ToLowerInvariant(); string[] values = value.Split(',', StringSplitOptions.TrimEntries); - return type switch + return normalisedType switch { "bool" or "boolean" => bool.Parse(value).ToMxValue(), "bool-array" or "boolean-array" => values.Select(bool.Parse).ToArray().ToMxValue(), @@ -867,7 +1328,7 @@ public static class MxGatewayClientCli .Select(item => DateTimeOffset.Parse(item, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal)) .ToArray() .ToMxValue(), - _ => throw new ArgumentException($"Unsupported MX value type '{type}'."), + _ => throw new ArgumentException($"Unsupported MX value type '{normalisedType}'."), }; } @@ -1078,6 +1539,12 @@ public static class MxGatewayClientCli or "advise" or "subscribe-bulk" or "unsubscribe-bulk" + or "read-bulk" + or "write-bulk" + or "write2-bulk" + or "write-secured-bulk" + or "write-secured2-bulk" + or "bench-read-bulk" or "stream-events" or "write" or "write2" @@ -1131,6 +1598,12 @@ public static class MxGatewayClientCli writer.WriteLine("mxgw-dotnet advise --session-id --server-handle --item-handle [--json]"); writer.WriteLine("mxgw-dotnet subscribe-bulk --session-id --server-handle --items [--json]"); writer.WriteLine("mxgw-dotnet unsubscribe-bulk --session-id --server-handle --item-handles [--json]"); + writer.WriteLine("mxgw-dotnet read-bulk --session-id --server-handle --items [--timeout-ms ] [--json]"); + writer.WriteLine("mxgw-dotnet write-bulk --session-id --server-handle --item-handles --type --values [--user-id ] [--json]"); + writer.WriteLine("mxgw-dotnet write2-bulk --session-id --server-handle --item-handles --type --values [--timestamp ] [--user-id ] [--json]"); + writer.WriteLine("mxgw-dotnet write-secured-bulk --session-id --server-handle --item-handles --type --values --current-user-id [--verifier-user-id ] [--json]"); + writer.WriteLine("mxgw-dotnet write-secured2-bulk --session-id --server-handle --item-handles --type --values --current-user-id [--verifier-user-id ] [--timestamp ] [--json]"); + writer.WriteLine("mxgw-dotnet bench-read-bulk [--duration-seconds ] [--warmup-seconds ] [--bulk-size ] [--tag-start ] [--tag-prefix ] [--tag-attribute ] [--timeout-ms ] [--client-name ]"); writer.WriteLine("mxgw-dotnet stream-events --session-id [--max-events ] [--json]"); writer.WriteLine("mxgw-dotnet write --session-id --server-handle --item-handle --type --value [--json]"); writer.WriteLine("mxgw-dotnet write2 --session-id --server-handle --item-handle --type --value [--timestamp ] [--json]"); diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client/MxGatewaySession.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client/MxGatewaySession.cs index 92c9f4b..158589c 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client/MxGatewaySession.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client/MxGatewaySession.cs @@ -502,6 +502,171 @@ public sealed class MxGatewaySession : IAsyncDisposable return reply.UnsubscribeBulk?.Results.ToArray() ?? []; } + /// + /// Bulk Write — sequential MXAccess Write per entry on the worker's STA. + /// Per-item failures appear as entries with + /// WasSuccessful = false; the call never throws on per-item errors. + /// Protocol-level failures still throw via EnsureProtocolSuccess. + /// + /// The ServerHandle from register. + /// Per-item write entries; each carries the item handle, value, and user id. + /// Cancellation token. + /// One per requested entry, in request order. + public async Task> WriteBulkAsync( + int serverHandle, + IReadOnlyList entries, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(entries); + + WriteBulkCommand command = new() { ServerHandle = serverHandle }; + command.Entries.Add(entries); + + MxCommandReply reply = await InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.WriteBulk, + WriteBulk = command, + }, + cancellationToken) + .ConfigureAwait(false); + reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); + return reply.WriteBulk?.Results.ToArray() ?? []; + } + + /// + /// Bulk Write2 — sequential MXAccess Write2 (timestamped) per entry. + /// Per-item failures appear as entries with + /// WasSuccessful = false; the call never throws on per-item errors. + /// + /// The ServerHandle from register. + /// Per-item write entries; each carries the item handle, value, timestamp, and user id. + /// Cancellation token. + /// One per requested entry, in request order. + public async Task> Write2BulkAsync( + int serverHandle, + IReadOnlyList entries, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(entries); + + Write2BulkCommand command = new() { ServerHandle = serverHandle }; + command.Entries.Add(entries); + + MxCommandReply reply = await InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.Write2Bulk, + Write2Bulk = command, + }, + cancellationToken) + .ConfigureAwait(false); + reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); + return reply.Write2Bulk?.Results.ToArray() ?? []; + } + + /// + /// Bulk WriteSecured — sequential MXAccess WriteSecured per entry. + /// Credential-sensitive values must never reach logs; the client mirrors + /// the single-item WriteSecured redaction contract. + /// + /// The ServerHandle from register. + /// Per-item write entries; each carries the item handle, value, current user id, and verifier user id. + /// Cancellation token. + /// One per requested entry, in request order. + public async Task> WriteSecuredBulkAsync( + int serverHandle, + IReadOnlyList entries, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(entries); + + WriteSecuredBulkCommand command = new() { ServerHandle = serverHandle }; + command.Entries.Add(entries); + + MxCommandReply reply = await InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.WriteSecuredBulk, + WriteSecuredBulk = command, + }, + cancellationToken) + .ConfigureAwait(false); + reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); + return reply.WriteSecuredBulk?.Results.ToArray() ?? []; + } + + /// + /// Bulk WriteSecured2 — sequential MXAccess WriteSecured2 (timestamped) per entry. + /// Same redaction rules as . + /// + /// The ServerHandle from register. + /// Per-item write entries; each carries the item handle, value, timestamp, current user id, and verifier user id. + /// Cancellation token. + /// One per requested entry, in request order. + public async Task> WriteSecured2BulkAsync( + int serverHandle, + IReadOnlyList entries, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(entries); + + WriteSecured2BulkCommand command = new() { ServerHandle = serverHandle }; + command.Entries.Add(entries); + + MxCommandReply reply = await InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.WriteSecured2Bulk, + WriteSecured2Bulk = command, + }, + cancellationToken) + .ConfigureAwait(false); + reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); + return reply.WriteSecured2Bulk?.Results.ToArray() ?? []; + } + + /// + /// Bulk Read — snapshot the current value for each requested tag. + /// Returns the cached OnDataChange value when the tag is already advised + /// (WasCached = true), otherwise the worker takes the full AddItem + + /// Advise + wait + UnAdvise + RemoveItem snapshot lifecycle. Per-tag + /// failures (timeout, invalid tag) appear as + /// entries with WasSuccessful = false; the call never throws on + /// per-tag errors. + /// + /// The ServerHandle from register. + /// Tag addresses to read (one per result). + /// Per-call timeout for the snapshot lifecycle path; uses the gateway default. + /// Cancellation token. + /// One per requested tag, in request order. + public async Task> ReadBulkAsync( + int serverHandle, + IReadOnlyList tagAddresses, + TimeSpan timeout, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(tagAddresses); + + ReadBulkCommand command = new() + { + ServerHandle = serverHandle, + TimeoutMs = timeout <= TimeSpan.Zero ? 0u : (uint)Math.Min(timeout.TotalMilliseconds, uint.MaxValue), + }; + command.TagAddresses.Add(tagAddresses); + + MxCommandReply reply = await InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.ReadBulk, + ReadBulk = command, + }, + cancellationToken) + .ConfigureAwait(false); + reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); + return reply.ReadBulk?.Results.ToArray() ?? []; + } + /// /// Writes a value to an item on the MXAccess server. ///