Add bulk read/write CLI subcommands and e2e matrix coverage

The previous commit added the bulk read/write library surface in every
client; this commit makes that surface reachable from each client's CLI
and exercises it through scripts/run-client-e2e-tests.ps1.

Five new subcommands in every client CLI (.NET / Go / Rust / Python /
Java): read-bulk, write-bulk, write2-bulk, write-secured-bulk, and
write-secured2-bulk. Each follows the existing subscribe-bulk shape:

  - read-bulk takes --server-handle, --items <csv tag list>, and
    --timeout-ms (0 = worker default). JSON output carries the
    BulkReadResult fields, including was_cached so the e2e matrix can
    verify the cached-path semantics.
  - The four bulk-write families take --server-handle, --item-handles
    <csv>, --type, --values <csv>. write2-bulk and write-secured2-bulk
    add a single --timestamp applied to every entry; the secured
    variants take --current-user-id and --verifier-user-id. All four
    output BulkWriteResult JSON.

A new -SkipReadWriteBulk switch on the matrix script (default OFF)
controls two new e2e phases:

  - After the existing subscribe-bulk phase leaves tags advised, the
    script runs read-bulk against the same tag list and asserts most
    results return was_cached = true. This is the only e2e coverage of
    the cache-then-snapshot fork — the unit + gateway tests verify the
    semantics with a fake worker, but only the live cross-language
    matrix proves the cache populates from real OnDataChange events and
    survives the round-trip through every client''s JSON parser.
  - When -VerifyWrite is set, the write phase now also runs a single-
    entry write-bulk against the same writable item handle (using a
    distinct sentinel value) and asserts a per-entry success. Confirms
    the BulkWriteResult wire format end-to-end without complicating
    the OnWriteComplete echo assertion the single-item phase already
    verifies.

Dry-run validation passes for all five clients: each emits the correct
read-bulk and write-bulk CLI invocations with the right flags.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-20 04:06:14 -04:00
parent 5e375f6d3d
commit f220908f3f
6 changed files with 1411 additions and 4 deletions
+128 -1
View File
@@ -33,6 +33,12 @@ param(
[int]$BulkTagCount = 6,
[switch]$SkipStream,
[switch]$SkipBulk,
# Skip the bulk read+write coverage that runs alongside the existing
# subscribe-bulk phase. The read-bulk phase confirms cached-path
# semantics against tags left advised by subscribe-bulk (was_cached
# = true); the write-bulk phase runs when -VerifyWrite is set and
# exercises the BulkWriteResult shape against the writable tag.
[switch]$SkipReadWriteBulk,
# Write round-trip. Opt-in because it mutates live tag state: it writes a
# sentinel value to -WriteAttribute and asserts an OnWriteComplete event
# confirms the write reached the MXAccess provider.
@@ -400,7 +406,18 @@ function Get-BulkResults {
return @(Get-PropertyValue -Object $Json -Names @("results"))
}
$replyName = if ($Operation -eq "subscribe-bulk") { "subscribeBulk" } else { "unsubscribeBulk" }
# .NET emits the full MxCommandReply via protobuf JSON, with results
# nested under a per-command field name.
$replyName = switch ($Operation) {
"subscribe-bulk" { "subscribeBulk" }
"unsubscribe-bulk" { "unsubscribeBulk" }
"read-bulk" { "readBulk" }
"write-bulk" { "writeBulk" }
"write2-bulk" { "write2Bulk" }
"write-secured-bulk" { "writeSecuredBulk" }
"write-secured2-bulk" { "writeSecured2Bulk" }
default { $Operation }
}
$reply = Get-PropertyValue -Object $Json -Names @($replyName)
return @(Get-PropertyValue -Object $reply -Names @("results"))
}
@@ -478,6 +495,13 @@ function Get-ClientCommand {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
} elseif ($Operation -eq "unsubscribe-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
} elseif ($Operation -eq "read-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") }
} elseif ($Operation -eq "write-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
"--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values)
if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") }
} elseif ($Operation -eq "write") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
@@ -507,6 +531,13 @@ function Get-ClientCommand {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items)
} elseif ($Operation -eq "unsubscribe-bulk") {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handles", $Values.itemHandles)
} elseif ($Operation -eq "read-bulk") {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items)
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("-timeout-ms", "$($Values.timeoutMs)") }
} elseif ($Operation -eq "write-bulk") {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)",
"-item-handles", $Values.itemHandles, "-type", $Values.valueType, "-values", $Values.values)
if ($Values.ContainsKey("userId")) { $arguments += @("-user-id", "$($Values.userId)") }
} elseif ($Operation -eq "write") {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handle", "$($Values.itemHandle)", "-type", $Values.valueType, "-value", $Values.value)
} elseif ($Operation -eq "stream-events") {
@@ -535,6 +566,14 @@ function Get-ClientCommand {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
} elseif ($Operation -eq "unsubscribe-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
} elseif ($Operation -eq "read-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") }
} elseif ($Operation -eq "write-bulk") {
# Rust uses --value-type for the type flag.
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
"--item-handles", $Values.itemHandles, "--value-type", $Values.valueType, "--values", $Values.values)
if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") }
} elseif ($Operation -eq "write") {
# Rust names the type flag --value-type, unlike the other CLIs.
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--value-type", $Values.valueType, "--value", $Values.value)
@@ -565,6 +604,13 @@ function Get-ClientCommand {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
} elseif ($Operation -eq "unsubscribe-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
} elseif ($Operation -eq "read-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") }
} elseif ($Operation -eq "write-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
"--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values)
if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") }
} elseif ($Operation -eq "write") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
@@ -597,6 +643,13 @@ function Get-ClientCommand {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
} elseif ($Operation -eq "unsubscribe-bulk") {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
} elseif ($Operation -eq "read-bulk") {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
if ($Values.ContainsKey("timeoutMs")) { $cliArgs += @("--timeout-ms", "$($Values.timeoutMs)") }
} elseif ($Operation -eq "write-bulk") {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
"--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values)
if ($Values.ContainsKey("userId")) { $cliArgs += @("--user-id", "$($Values.userId)") }
} elseif ($Operation -eq "write") {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
@@ -649,6 +702,23 @@ function Get-DryRunReply {
})
return [pscustomobject]@{ unsubscribeBulk = [pscustomobject]@{ results = $results }; results = $results }
}
"read-bulk" {
$results = @($Values.items -split "," | ForEach-Object -Begin { $index = 1 } -Process {
[pscustomobject]@{
itemHandle = $index++
tagAddress = $_
wasSuccessful = $true
wasCached = $true
}
})
return [pscustomobject]@{ readBulk = [pscustomobject]@{ results = $results }; results = $results }
}
"write-bulk" {
$results = @($Values.itemHandles -split "," | ForEach-Object {
[pscustomobject]@{ itemHandle = [int]$_; wasSuccessful = $true }
})
return [pscustomobject]@{ writeBulk = [pscustomobject]@{ results = $results }; results = $results }
}
"stream-events" {
# Synthesize an OnDataChange (carrying the written value) and an
# OnWriteComplete so the write round-trip assertion passes under
@@ -839,6 +909,28 @@ function Invoke-ClientFlow {
writeCompleteObserved = $true
echoObserved = ($null -ne $echoEvent)
}
# WriteBulk smoke: single-entry batch against the same writable
# tag. Exercises the BulkWriteResult wire format end-to-end
# without complicating the OnWriteComplete echo assertion that
# the single-item write phase already verified above. Pinned
# to a different sentinel value so a subsequent read-bulk
# against the same tag would see the bulk write's effect.
if (-not $SkipReadWriteBulk) {
$bulkSentinel = $sentinelValue + 1
$writeBulkJson = Invoke-ClientOperation -Client $Client -Operation "write-bulk" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
itemHandles = "$writeItemHandle"
valueType = $WriteType
values = "$bulkSentinel"
userId = 0
}
$writeBulkResults = @(Get-BulkResults -Client $Client -Operation "write-bulk" -Json $writeBulkJson)
Assert-BulkResults -Client $Client -Operation "write-bulk" -Results $writeBulkResults -ExpectedCount 1
$clientResult.write.writeBulkValue = $bulkSentinel
$clientResult.write.writeBulkResultCount = $writeBulkResults.Count
}
}
}
@@ -857,6 +949,40 @@ function Invoke-ClientFlow {
throw "$Client subscribe-bulk returned $($bulkItemHandles.Count) usable item handle(s); expected $($bulkTags.Count)."
}
# ReadBulk over the already-advised tags: every result must come
# from the per-session value cache (was_cached = true). Confirms
# the gateway/worker/cache wiring serves cached values for tags
# the caller did not create the subscription for.
$readBulkSummary = $null
if (-not $SkipReadWriteBulk) {
$readBulkJson = Invoke-ClientOperation -Client $Client -Operation "read-bulk" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
items = $bulkItems
timeoutMs = 1500
}
$readResults = @(Get-BulkResults -Client $Client -Operation "read-bulk" -Json $readBulkJson)
Assert-BulkResults -Client $Client -Operation "read-bulk" -Results $readResults -ExpectedCount $bulkTags.Count
$cachedCount = @($readResults | Where-Object {
[bool](Get-PropertyValue -Object $_ -Names @("wasCached", "was_cached"))
}).Count
# Allow up to one snapshot fallback per batch: a freshly
# advised tag may not have an OnDataChange cached yet if it
# hasn't pushed an update in the small window between
# subscribe-bulk and read-bulk. Anything beyond that means
# the cached-path optimization is broken.
$maxSnapshotFallbacks = 1
if ($cachedCount -lt ($readResults.Count - $maxSnapshotFallbacks)) {
throw ("$Client read-bulk only returned $cachedCount cached result(s) " +
"out of $($readResults.Count); the cache-then-snapshot fork must " +
"serve cached values for already-advised tags.")
}
$readBulkSummary = [ordered]@{
tagCount = $readResults.Count
cachedCount = $cachedCount
}
}
$unsubscribeBulkJson = Invoke-ClientOperation -Client $Client -Operation "unsubscribe-bulk" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
@@ -870,6 +996,7 @@ function Invoke-ClientFlow {
subscribedCount = $subscribeResults.Count
unsubscribedCount = $unsubscribeResults.Count
itemHandles = $bulkItemHandles
readBulk = $readBulkSummary
}
}