Make the e2e write phase work live across all five clients
Running the matrix against a live gateway surfaced several issues: - The write phase is now opt-in (-VerifyWrite, was -SkipWrite). It runs right after register so only a small event backlog precedes the write, and asserts the reliable OnWriteComplete signal (the written value is not echoed back by a provider-driven attribute like TestChangingInt, so the value compare is best-effort). - Java was launched as bare "gradle", which .NET's Process.Start cannot exec (it is gradle.bat) — resolve the launcher and run it via cmd.exe. - The Java client's MxEventStream queue capacity was 16, which overflows on any active session's backlog-replay burst; raised to 1024. - The Rust stream-events CLI now renders the event family as the proto enum name, matching the protobuf-JSON the other four clients emit. Update docs/GatewayTesting.md for the reworked write phase. Verified live: the full five-client matrix passes with -VerifyWrite. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+6
-1
@@ -260,7 +260,12 @@ public final class MxGatewayClient implements AutoCloseable {
|
|||||||
* @return an iterator-style stream of events
|
* @return an iterator-style stream of events
|
||||||
*/
|
*/
|
||||||
public MxEventStream streamEvents(StreamEventsRequest request) {
|
public MxEventStream streamEvents(StreamEventsRequest request) {
|
||||||
MxEventStream stream = new MxEventStream(16);
|
// The buffer must absorb the gateway's session-backlog replay burst,
|
||||||
|
// which arrives far faster than the iterator drains it. A small queue
|
||||||
|
// overflows on any moderately active session; 1024 covers a realistic
|
||||||
|
// backlog while still bounding memory and preserving overflow
|
||||||
|
// detection for a genuinely unbounded stream.
|
||||||
|
MxEventStream stream = new MxEventStream(1024);
|
||||||
MxGatewayChannels.withStreamDeadline(rawAsyncStub(), options).streamEvents(request, stream.observer());
|
MxGatewayChannels.withStreamDeadline(rawAsyncStub(), options).streamEvents(request, stream.observer());
|
||||||
return stream;
|
return stream;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ use clap::{Args, Parser, Subcommand, ValueEnum};
|
|||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
|
use mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
|
||||||
use mxgateway_client::generated::mxaccess_gateway::v1::{
|
use mxgateway_client::generated::mxaccess_gateway::v1::{
|
||||||
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent,
|
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily,
|
||||||
MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest,
|
MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest,
|
||||||
};
|
};
|
||||||
use mxgateway_client::{
|
use mxgateway_client::{
|
||||||
@@ -842,8 +842,13 @@ fn print_deploy_event(event: &DeployEvent, use_json: bool) {
|
|||||||
/// matrix can extract and compare event values uniformly across all five
|
/// matrix can extract and compare event values uniformly across all five
|
||||||
/// client CLIs.
|
/// client CLIs.
|
||||||
fn event_to_json(event: &MxEvent) -> Value {
|
fn event_to_json(event: &MxEvent) -> Value {
|
||||||
|
// Render the family as the proto enum name (e.g. MX_EVENT_FAMILY_ON_WRITE_COMPLETE)
|
||||||
|
// so it matches the protobuf-JSON the .NET/Go/Java/Python CLIs emit.
|
||||||
|
let family = MxEventFamily::try_from(event.family)
|
||||||
|
.map(|family| family.as_str_name().to_owned())
|
||||||
|
.unwrap_or_else(|_| event.family.to_string());
|
||||||
json!({
|
json!({
|
||||||
"family": event.family,
|
"family": family,
|
||||||
"sessionId": event.session_id,
|
"sessionId": event.session_id,
|
||||||
"serverHandle": event.server_handle,
|
"serverHandle": event.server_handle,
|
||||||
"itemHandle": event.item_handle,
|
"itemHandle": event.item_handle,
|
||||||
|
|||||||
+28
-16
@@ -180,19 +180,30 @@ path and writes a JSON report under `artifacts/e2e/`:
|
|||||||
2. **Bulk** — verifies `SubscribeBulk` / `UnsubscribeBulk` on a bounded tag
|
2. **Bulk** — verifies `SubscribeBulk` / `UnsubscribeBulk` on a bounded tag
|
||||||
subset (skip with `-SkipBulk`).
|
subset (skip with `-SkipBulk`).
|
||||||
3. **Add-item / advise** — adds and advises every discovered test tag.
|
3. **Add-item / advise** — adds and advises every discovered test tag.
|
||||||
4. **Write round-trip** — writes a per-client sentinel value to a configurable
|
4. **Stream** — asserts a bounded event stream delivers at least one event
|
||||||
writable attribute (`-WriteAttribute`, default `TestChangingInt`), then
|
|
||||||
asserts the same value is echoed back through the event stream. Skip with
|
|
||||||
`-SkipWrite`. The Rust `stream-events` CLI emits full per-event JSON
|
|
||||||
(`itemHandle` + `value`) so all five clients run an identical value compare.
|
|
||||||
5. **Stream** — asserts a bounded event stream delivers at least one event
|
|
||||||
(skip with `-SkipStream`).
|
(skip with `-SkipStream`).
|
||||||
6. **Parity** — asserts MXAccess error paths are rejected rather than silently
|
5. **Parity** — asserts MXAccess error paths are rejected rather than silently
|
||||||
succeeding: an invalid item handle and an unknown session id (skip with
|
succeeding: an invalid item handle and an unknown session id (skip with
|
||||||
`-SkipParity`).
|
`-SkipParity`).
|
||||||
7. **Auth rejection** — asserts `open-session` is rejected when the API key is
|
6. **Auth rejection** — asserts `open-session` is rejected when the API key is
|
||||||
missing, and (when `-RejectScopeApiKeyEnv` names an insufficient-scope key)
|
missing, and (when `-RejectScopeApiKeyEnv` names an insufficient-scope key)
|
||||||
when the key lacks the required scope. Skip with `-SkipAuth`.
|
when the key lacks the required scope. Skip with `-SkipAuth`.
|
||||||
|
7. **Write round-trip** — *opt-in (`-VerifyWrite`).* Runs right after
|
||||||
|
`register`: adds and advises a configurable writable attribute
|
||||||
|
(`-WriteAttribute`, default `TestChangingInt`), writes a per-client
|
||||||
|
sentinel value, then streams events and asserts an `OnWriteComplete` event
|
||||||
|
for that item is observed — proof the write round-tripped through the
|
||||||
|
gateway, worker, and MXAccess provider. The written value being echoed back
|
||||||
|
in an `OnDataChange` is recorded best-effort (`echoObserved`): a
|
||||||
|
provider-driven attribute such as `TestChangingInt` accepts the write but
|
||||||
|
immediately overwrites it, so no data-change carries the value back. The
|
||||||
|
Rust `stream-events` CLI emits full per-event JSON (`family`, `itemHandle`,
|
||||||
|
`value`) so all five clients apply the same checks.
|
||||||
|
|
||||||
|
It is opt-in because it mutates live tag state. The phase fails fast if the
|
||||||
|
write command is rejected — e.g. against a gateway whose worker predates
|
||||||
|
write support (`MxAccessCommandExecutor` returning `InvalidRequest` for
|
||||||
|
`Write`/`Write2`/`WriteSecured`/`WriteSecured2`).
|
||||||
|
|
||||||
Build the gateway and worker, start the gateway, and provide a valid API key
|
Build the gateway and worker, start the gateway, and provide a valid API key
|
||||||
before running the client e2e script:
|
before running the client e2e script:
|
||||||
@@ -209,9 +220,9 @@ powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -Clien
|
|||||||
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -BulkTagCount 10
|
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -BulkTagCount 10
|
||||||
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipStream
|
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipStream
|
||||||
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipBulk
|
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipBulk
|
||||||
# Write round-trip: point at a writable scalar attribute and its value type.
|
# Write round-trip (opt-in): point at a writable scalar attribute and its
|
||||||
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -WriteAttribute TestChangingInt -WriteType int32
|
# value type.
|
||||||
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipWrite
|
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -VerifyWrite -WriteAttribute TestChangingInt -WriteType int32
|
||||||
# Auth rejection: also assert an insufficient-scope key is denied.
|
# Auth rejection: also assert an insufficient-scope key is denied.
|
||||||
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -RejectScopeApiKeyEnv MXGATEWAY_READONLY_API_KEY
|
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -RejectScopeApiKeyEnv MXGATEWAY_READONLY_API_KEY
|
||||||
# Run all five clients concurrently as isolated child processes.
|
# Run all five clients concurrently as isolated child processes.
|
||||||
@@ -221,11 +232,12 @@ powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -DryRu
|
|||||||
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -Endpoint localhost:5000 -ApiKeyEnv MXGATEWAY_API_KEY
|
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -Endpoint localhost:5000 -ApiKeyEnv MXGATEWAY_API_KEY
|
||||||
```
|
```
|
||||||
|
|
||||||
The write round-trip fails loudly if `-WriteAttribute` does not name a writable
|
When `-VerifyWrite` is enabled, the write round-trip fails loudly if the write
|
||||||
scalar attribute, if the write is rejected, or if the sentinel value is not
|
command is rejected, if `-WriteAttribute` does not name a writable scalar
|
||||||
observed within `-WriteEchoMaxEvents` (default 200) streamed events. Point
|
attribute, or if no `OnWriteComplete` event is observed for the written item
|
||||||
`-WriteAttribute` at a stable writable attribute, raise `-WriteEchoMaxEvents`,
|
within `-WriteEchoMaxEvents` (default 200) streamed events. Raise
|
||||||
or pass `-SkipWrite` if no suitable attribute is deployed.
|
`-WriteEchoMaxEvents` if the gateway's per-session event backlog is large
|
||||||
|
enough to push `OnWriteComplete` past that bound.
|
||||||
|
|
||||||
## Focused Commands
|
## Focused Commands
|
||||||
|
|
||||||
|
|||||||
@@ -33,8 +33,10 @@ param(
|
|||||||
[int]$BulkTagCount = 6,
|
[int]$BulkTagCount = 6,
|
||||||
[switch]$SkipStream,
|
[switch]$SkipStream,
|
||||||
[switch]$SkipBulk,
|
[switch]$SkipBulk,
|
||||||
# Write round-trip + value assertion.
|
# Write round-trip. Opt-in because it mutates live tag state: it writes a
|
||||||
[switch]$SkipWrite,
|
# sentinel value to -WriteAttribute and asserts an OnWriteComplete event
|
||||||
|
# confirms the write reached the MXAccess provider.
|
||||||
|
[switch]$VerifyWrite,
|
||||||
[string]$WriteAttribute = "TestChangingInt",
|
[string]$WriteAttribute = "TestChangingInt",
|
||||||
[string]$WriteType = "int32",
|
[string]$WriteType = "int32",
|
||||||
[int]$WriteValueBase = 424200,
|
[int]$WriteValueBase = 424200,
|
||||||
@@ -335,6 +337,14 @@ function Get-EventItemHandle {
|
|||||||
return [int]$handle
|
return [int]$handle
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Extracts the event family as a string. All five CLIs render it as the
|
||||||
|
# protobuf enum name (e.g. MX_EVENT_FAMILY_ON_WRITE_COMPLETE).
|
||||||
|
function Get-EventFamily {
|
||||||
|
param([object]$Event)
|
||||||
|
|
||||||
|
return [string](Get-PropertyValue -Object $Event -Names @("family"))
|
||||||
|
}
|
||||||
|
|
||||||
# Extracts the scalar payload from a streamed event's MxValue as a string.
|
# Extracts the scalar payload from a streamed event's MxValue as a string.
|
||||||
# The MxValue oneof renders to one protobuf-JSON `*Value` key; all five
|
# The MxValue oneof renders to one protobuf-JSON `*Value` key; all five
|
||||||
# CLIs (after the Rust stream-events extension) emit the same key names.
|
# CLIs (after the Rust stream-events extension) emit the same key names.
|
||||||
@@ -595,7 +605,20 @@ function Get-ClientCommand {
|
|||||||
$cliArgs += @("--session-id", $Values.sessionId)
|
$cliArgs += @("--session-id", $Values.sessionId)
|
||||||
}
|
}
|
||||||
$arguments = @("--quiet", ":mxgateway-cli:run", "--args=$($cliArgs -join ' ')")
|
$arguments = @("--quiet", ":mxgateway-cli:run", "--args=$($cliArgs -join ' ')")
|
||||||
return [pscustomobject]@{ file = "gradle"; args = $arguments; cwd = (Join-Path $repoRoot "clients/java"); env = @{} }
|
# Gradle ships as gradle.bat on Windows; .NET's Process.Start
|
||||||
|
# (UseShellExecute=false) cannot launch a batch file directly, so
|
||||||
|
# resolve the launcher and run it through cmd.exe.
|
||||||
|
$gradleCommand = Get-Command "gradle.bat", "gradle.cmd", "gradle.exe", "gradle" `
|
||||||
|
-ErrorAction SilentlyContinue | Select-Object -First 1
|
||||||
|
if ($null -eq $gradleCommand) {
|
||||||
|
throw "The 'gradle' command was not found on PATH; the Java client e2e flow requires Gradle."
|
||||||
|
}
|
||||||
|
return [pscustomobject]@{
|
||||||
|
file = "cmd.exe"
|
||||||
|
args = @("/c", $gradleCommand.Source) + $arguments
|
||||||
|
cwd = (Join-Path $repoRoot "clients/java")
|
||||||
|
env = @{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -627,23 +650,30 @@ function Get-DryRunReply {
|
|||||||
return [pscustomobject]@{ unsubscribeBulk = [pscustomobject]@{ results = $results }; results = $results }
|
return [pscustomobject]@{ unsubscribeBulk = [pscustomobject]@{ results = $results }; results = $results }
|
||||||
}
|
}
|
||||||
"stream-events" {
|
"stream-events" {
|
||||||
# Echo the requested write value back so the write round-trip
|
# Synthesize an OnDataChange (carrying the written value) and an
|
||||||
# assertion passes under -DryRun. The reply is shaped per client:
|
# OnWriteComplete so the write round-trip assertion passes under
|
||||||
# Go and Java emit one event object per line (Read-JsonObject
|
# -DryRun. The reply is shaped per client: Go and Java emit one
|
||||||
# collapses NDJSON to a bare array), the others aggregate the
|
# event object per line (Read-JsonObject collapses NDJSON to a
|
||||||
# events under an `events` property.
|
# bare array), the others aggregate the events under `events`.
|
||||||
$itemHandle = if ($Values.ContainsKey("echoItemHandle")) { [int]$Values.echoItemHandle } else { 1 }
|
$itemHandle = if ($Values.ContainsKey("echoItemHandle")) { [int]$Values.echoItemHandle } else { 1 }
|
||||||
$echoValue = if ($Values.ContainsKey("echoValue")) { $Values.echoValue } else { 1 }
|
$echoValue = if ($Values.ContainsKey("echoValue")) { $Values.echoValue } else { 1 }
|
||||||
$event = [pscustomobject]@{
|
$dataEvent = [pscustomobject]@{
|
||||||
workerSequence = 1
|
workerSequence = 1
|
||||||
|
family = "MX_EVENT_FAMILY_ON_DATA_CHANGE"
|
||||||
itemHandle = $itemHandle
|
itemHandle = $itemHandle
|
||||||
value = [pscustomobject]@{ int32Value = $echoValue }
|
value = [pscustomobject]@{ int32Value = $echoValue }
|
||||||
}
|
}
|
||||||
|
$writeCompleteEvent = [pscustomobject]@{
|
||||||
|
workerSequence = 2
|
||||||
|
family = "MX_EVENT_FAMILY_ON_WRITE_COMPLETE"
|
||||||
|
itemHandle = $itemHandle
|
||||||
|
}
|
||||||
|
$events = @($dataEvent, $writeCompleteEvent)
|
||||||
switch ($Client) {
|
switch ($Client) {
|
||||||
"go" { return ,@($event) }
|
"go" { return ,$events }
|
||||||
"java" { return ,@($event) }
|
"java" { return ,$events }
|
||||||
"rust" { return [pscustomobject]@{ eventCount = 1; events = @($event) } }
|
"rust" { return [pscustomobject]@{ eventCount = $events.Count; events = $events } }
|
||||||
default { return [pscustomobject]@{ events = @($event) } }
|
default { return [pscustomobject]@{ events = $events } }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default { return [pscustomobject]@{ ok = $true; reply = [pscustomobject]@{} } }
|
default { return [pscustomobject]@{ ok = $true; reply = [pscustomobject]@{} } }
|
||||||
@@ -735,6 +765,83 @@ function Invoke-ClientFlow {
|
|||||||
$serverHandle = Get-ServerHandle -Client $Client -Json $registerJson
|
$serverHandle = Get-ServerHandle -Client $Client -Json $registerJson
|
||||||
$clientResult.serverHandle = $serverHandle
|
$clientResult.serverHandle = $serverHandle
|
||||||
|
|
||||||
|
# --- Write round-trip + value assertion ---------------------------
|
||||||
|
# Runs right after register, before the bulk and add-item phases, so
|
||||||
|
# only a small backlog of events precedes the write. The gateway
|
||||||
|
# replays the per-session event buffer from the start, so the
|
||||||
|
# post-write OnWriteComplete must be reachable within the bounded
|
||||||
|
# -WriteEchoMaxEvents window.
|
||||||
|
if ($VerifyWrite) {
|
||||||
|
$writeTag = @($Tags | Where-Object {
|
||||||
|
$_.attributeName -eq $WriteAttribute
|
||||||
|
}) | Select-Object -First 1
|
||||||
|
|
||||||
|
if ($null -eq $writeTag) {
|
||||||
|
Write-Warning "$Client write phase skipped: no discovered tag has attribute '$WriteAttribute'."
|
||||||
|
} else {
|
||||||
|
$writeAddJson = Invoke-ClientOperation -Client $Client -Operation "add-item" -Values @{
|
||||||
|
sessionId = $sessionId
|
||||||
|
serverHandle = $serverHandle
|
||||||
|
item = $writeTag.fullTagReference
|
||||||
|
}
|
||||||
|
$writeItemHandle = Get-ItemHandle -Client $Client -Json $writeAddJson
|
||||||
|
Invoke-ClientOperation -Client $Client -Operation "advise" -Values @{
|
||||||
|
sessionId = $sessionId
|
||||||
|
serverHandle = $serverHandle
|
||||||
|
itemHandle = $writeItemHandle
|
||||||
|
} | Out-Null
|
||||||
|
|
||||||
|
$sentinelValue = "$($WriteValueBase + $script:clientFlowIndex)"
|
||||||
|
Invoke-ClientOperation -Client $Client -Operation "write" -Values @{
|
||||||
|
sessionId = $sessionId
|
||||||
|
serverHandle = $serverHandle
|
||||||
|
itemHandle = $writeItemHandle
|
||||||
|
valueType = $WriteType
|
||||||
|
value = $sentinelValue
|
||||||
|
} | Out-Null
|
||||||
|
|
||||||
|
$writeStreamJson = Invoke-ClientOperation -Client $Client -Operation "stream-events" -Values @{
|
||||||
|
sessionId = $sessionId
|
||||||
|
maxEvents = $WriteEchoMaxEvents
|
||||||
|
echoItemHandle = $writeItemHandle
|
||||||
|
echoValue = $sentinelValue
|
||||||
|
}
|
||||||
|
$writeEvents = @(Get-StreamEvents -Client $Client -Json $writeStreamJson)
|
||||||
|
$writeItemEvents = @($writeEvents | Where-Object {
|
||||||
|
(Get-EventItemHandle -Event $_) -eq $writeItemHandle
|
||||||
|
})
|
||||||
|
|
||||||
|
# The reliable write round-trip signal: MXAccess fires
|
||||||
|
# OnWriteComplete once the write reaches the provider. The
|
||||||
|
# value echo is best-effort — a provider-driven attribute
|
||||||
|
# (e.g. a simulated counter) accepts the write but does not
|
||||||
|
# hold the value, so no OnDataChange carries it back.
|
||||||
|
$writeCompleteEvent = $writeItemEvents | Where-Object {
|
||||||
|
(Get-EventFamily -Event $_) -match "WRITE_COMPLETE"
|
||||||
|
} | Select-Object -First 1
|
||||||
|
$echoEvent = $writeItemEvents | Where-Object {
|
||||||
|
Test-ValueEquals -Expected $sentinelValue -Observed (Get-EventScalar -Event $_)
|
||||||
|
} | Select-Object -First 1
|
||||||
|
|
||||||
|
if ($null -eq $writeCompleteEvent) {
|
||||||
|
throw ("$Client write round-trip failed: wrote $WriteType=$sentinelValue to " +
|
||||||
|
"'$($writeTag.fullTagReference)' (item handle $writeItemHandle) but no " +
|
||||||
|
"OnWriteComplete event was observed in $($writeEvents.Count) streamed event(s). " +
|
||||||
|
"Increase -WriteEchoMaxEvents, or drop -VerifyWrite.")
|
||||||
|
}
|
||||||
|
|
||||||
|
$clientResult.write = [ordered]@{
|
||||||
|
attributeName = $WriteAttribute
|
||||||
|
fullTagReference = $writeTag.fullTagReference
|
||||||
|
itemHandle = $writeItemHandle
|
||||||
|
valueType = $WriteType
|
||||||
|
value = $sentinelValue
|
||||||
|
writeCompleteObserved = $true
|
||||||
|
echoObserved = ($null -ne $echoEvent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (-not $SkipBulk) {
|
if (-not $SkipBulk) {
|
||||||
$bulkTags = @($Tags | Select-Object -First ([Math]::Min($BulkTagCount, $Tags.Count)))
|
$bulkTags = @($Tags | Select-Object -First ([Math]::Min($BulkTagCount, $Tags.Count)))
|
||||||
$bulkItems = ($bulkTags | ForEach-Object { $_.fullTagReference }) -join ","
|
$bulkItems = ($bulkTags | ForEach-Object { $_.fullTagReference }) -join ","
|
||||||
@@ -788,71 +895,15 @@ function Invoke-ClientFlow {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# --- Write round-trip + value assertion ---------------------------
|
# --- Event streaming ----------------------------------------------
|
||||||
# Write a per-client sentinel value to a configured writable
|
if (-not $SkipStream) {
|
||||||
# attribute, then assert it is echoed back through the event stream.
|
$streamJson = Invoke-ClientOperation -Client $Client -Operation "stream-events" -Values @{
|
||||||
$writeTarget = $null
|
|
||||||
if (-not $SkipWrite) {
|
|
||||||
$writeTarget = @($clientResult.addedItems | Where-Object {
|
|
||||||
$_.attributeName -eq $WriteAttribute
|
|
||||||
}) | Select-Object -First 1
|
|
||||||
}
|
|
||||||
|
|
||||||
$doWrite = $null -ne $writeTarget
|
|
||||||
$sentinelValue = $null
|
|
||||||
if ($doWrite) {
|
|
||||||
$sentinelValue = "$($WriteValueBase + $script:clientFlowIndex)"
|
|
||||||
Invoke-ClientOperation -Client $Client -Operation "write" -Values @{
|
|
||||||
sessionId = $sessionId
|
sessionId = $sessionId
|
||||||
serverHandle = $serverHandle
|
|
||||||
itemHandle = $writeTarget.itemHandle
|
|
||||||
valueType = $WriteType
|
|
||||||
value = $sentinelValue
|
|
||||||
} | Out-Null
|
|
||||||
} elseif (-not $SkipWrite) {
|
|
||||||
Write-Warning "$Client write phase skipped: no discovered tag has attribute '$WriteAttribute'."
|
|
||||||
}
|
|
||||||
|
|
||||||
# --- Event streaming (also serves the write echo assertion) -------
|
|
||||||
$captureEvents = (-not $SkipStream) -or $doWrite
|
|
||||||
if ($captureEvents) {
|
|
||||||
$streamValues = @{ sessionId = $sessionId }
|
|
||||||
if ($doWrite) {
|
|
||||||
$streamValues.maxEvents = $WriteEchoMaxEvents
|
|
||||||
$streamValues.echoItemHandle = $writeTarget.itemHandle
|
|
||||||
$streamValues.echoValue = $sentinelValue
|
|
||||||
}
|
}
|
||||||
$streamJson = Invoke-ClientOperation -Client $Client -Operation "stream-events" -Values $streamValues
|
|
||||||
$events = @(Get-StreamEvents -Client $Client -Json $streamJson)
|
|
||||||
$clientResult.eventCount = Get-StreamEventCount -Client $Client -Json $streamJson
|
$clientResult.eventCount = Get-StreamEventCount -Client $Client -Json $streamJson
|
||||||
|
if ($clientResult.eventCount -lt 1) {
|
||||||
if (-not $SkipStream -and $clientResult.eventCount -lt 1) {
|
|
||||||
throw "The $Client stream-events command returned no events."
|
throw "The $Client stream-events command returned no events."
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($doWrite) {
|
|
||||||
$echoEvent = $events | Where-Object {
|
|
||||||
(Get-EventItemHandle -Event $_) -eq $writeTarget.itemHandle -and
|
|
||||||
(Test-ValueEquals -Expected $sentinelValue -Observed (Get-EventScalar -Event $_))
|
|
||||||
} | Select-Object -First 1
|
|
||||||
|
|
||||||
if ($null -eq $echoEvent) {
|
|
||||||
throw ("$Client write round-trip failed: wrote $WriteType=$sentinelValue to " +
|
|
||||||
"'$($writeTarget.fullTagReference)' (item handle $($writeTarget.itemHandle)) " +
|
|
||||||
"but no matching value was observed in $($events.Count) streamed event(s). " +
|
|
||||||
"Increase -WriteEchoMaxEvents, point -WriteAttribute at a writable attribute, or pass -SkipWrite.")
|
|
||||||
}
|
|
||||||
|
|
||||||
$clientResult.write = [ordered]@{
|
|
||||||
attributeName = $WriteAttribute
|
|
||||||
fullTagReference = $writeTarget.fullTagReference
|
|
||||||
itemHandle = $writeTarget.itemHandle
|
|
||||||
valueType = $WriteType
|
|
||||||
value = $sentinelValue
|
|
||||||
echoObserved = $true
|
|
||||||
echoWorkerSequence = (Get-PropertyValue -Object $echoEvent -Names @("workerSequence", "worker_sequence"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# --- Error-path (parity) checks -----------------------------------
|
# --- Error-path (parity) checks -----------------------------------
|
||||||
@@ -965,7 +1016,7 @@ function Get-ChildArgumentList {
|
|||||||
}
|
}
|
||||||
if ($SkipStream) { $childArgs += "-SkipStream" }
|
if ($SkipStream) { $childArgs += "-SkipStream" }
|
||||||
if ($SkipBulk) { $childArgs += "-SkipBulk" }
|
if ($SkipBulk) { $childArgs += "-SkipBulk" }
|
||||||
if ($SkipWrite) { $childArgs += "-SkipWrite" }
|
if ($VerifyWrite) { $childArgs += "-VerifyWrite" }
|
||||||
if ($SkipParity) { $childArgs += "-SkipParity" }
|
if ($SkipParity) { $childArgs += "-SkipParity" }
|
||||||
if ($SkipAuth) { $childArgs += "-SkipAuth" }
|
if ($SkipAuth) { $childArgs += "-SkipAuth" }
|
||||||
if ($DryRun) { $childArgs += "-DryRun" }
|
if ($DryRun) { $childArgs += "-DryRun" }
|
||||||
@@ -1043,7 +1094,7 @@ if ($Parallel -and $Clients.Count -gt 1) {
|
|||||||
bulkTagCount = $BulkTagCount
|
bulkTagCount = $BulkTagCount
|
||||||
skipStream = [bool]$SkipStream
|
skipStream = [bool]$SkipStream
|
||||||
skipBulk = [bool]$SkipBulk
|
skipBulk = [bool]$SkipBulk
|
||||||
skipWrite = [bool]$SkipWrite
|
verifyWrite = [bool]$VerifyWrite
|
||||||
skipParity = [bool]$SkipParity
|
skipParity = [bool]$SkipParity
|
||||||
skipAuth = [bool]$SkipAuth
|
skipAuth = [bool]$SkipAuth
|
||||||
writeAttribute = $WriteAttribute
|
writeAttribute = $WriteAttribute
|
||||||
@@ -1101,7 +1152,7 @@ $run = [ordered]@{
|
|||||||
bulkTagCount = $BulkTagCount
|
bulkTagCount = $BulkTagCount
|
||||||
skipStream = [bool]$SkipStream
|
skipStream = [bool]$SkipStream
|
||||||
skipBulk = [bool]$SkipBulk
|
skipBulk = [bool]$SkipBulk
|
||||||
skipWrite = [bool]$SkipWrite
|
verifyWrite = [bool]$VerifyWrite
|
||||||
skipParity = [bool]$SkipParity
|
skipParity = [bool]$SkipParity
|
||||||
skipAuth = [bool]$SkipAuth
|
skipAuth = [bool]$SkipAuth
|
||||||
writeAttribute = $WriteAttribute
|
writeAttribute = $WriteAttribute
|
||||||
|
|||||||
Reference in New Issue
Block a user