Files
mxaccessgw/scripts/run-rust-testchangingint-subscribers.ps1
2026-04-28 00:13:22 -04:00

178 lines
6.3 KiB
PowerShell

[CmdletBinding()]
param(
[string]$Endpoint = "http://127.0.0.1:5001",
[string]$ApiKeyEnv = "MXGATEWAY_API_KEY",
[string]$ApiKey,
[int]$ClientCount = 5,
[int]$MachineStart = 1,
[int]$MachineEnd = 20,
[string]$Attribute = "TestChangingInt",
[int]$MaxEvents = [int]::MaxValue,
[int]$StreamCallTimeoutSeconds = 86400,
[string]$LogDirectory = (Join-Path (Get-Location) "artifacts\rust-testchangingint-subscribers")
)
Set-StrictMode -Version Latest
$ErrorActionPreference = "Stop"
if ($ClientCount -le 0) {
throw "ClientCount must be greater than zero."
}
if ($MachineStart -lt 1 -or $MachineEnd -lt $MachineStart) {
throw "MachineStart must be at least 1 and MachineEnd must be greater than or equal to MachineStart."
}
if ($StreamCallTimeoutSeconds -le 0) {
throw "StreamCallTimeoutSeconds must be greater than zero."
}
$repoRoot = Split-Path -Parent $PSScriptRoot
$rustRoot = Join-Path $repoRoot "clients\rust"
$mxgwExe = Join-Path $rustRoot "target\debug\mxgw.exe"
$sessionIds = New-Object System.Collections.Generic.List[string]
$streamProcesses = New-Object System.Collections.Generic.List[System.Diagnostics.Process]
function Get-ConnectionArgs {
$args = @("--endpoint", $Endpoint, "--plaintext")
if (-not [string]::IsNullOrWhiteSpace($ApiKey)) {
$args += @("--api-key", $ApiKey)
} else {
$args += @("--api-key-env", $ApiKeyEnv)
}
return $args
}
function Invoke-MxgwJson {
param(
[Parameter(Mandatory = $true)]
[string[]]$Arguments
)
$output = & $mxgwExe @Arguments 2>&1
if ($LASTEXITCODE -ne 0) {
throw "mxgw failed with exit code $LASTEXITCODE for arguments: $($Arguments -join ' ')`n$output"
}
$jsonText = ($output | Where-Object { -not [string]::IsNullOrWhiteSpace($_) }) -join "`n"
if ([string]::IsNullOrWhiteSpace($jsonText)) {
throw "mxgw returned no JSON for arguments: $($Arguments -join ' ')"
}
return $jsonText | ConvertFrom-Json
}
function Close-SessionQuietly {
param([string]$SessionId)
if ([string]::IsNullOrWhiteSpace($SessionId)) {
return
}
try {
$args = @("close-session") + (Get-ConnectionArgs) + @("--session-id", $SessionId, "--json")
[void](Invoke-MxgwJson -Arguments $args)
Write-Host "Closed session $SessionId"
} catch {
Write-Warning "Failed to close session ${SessionId}: $($_.Exception.Message)"
}
}
function Stop-StreamProcessQuietly {
param([System.Diagnostics.Process]$Process)
if ($null -eq $Process -or $Process.HasExited) {
return
}
try {
Stop-Process -Id $Process.Id -Force -ErrorAction Stop
Write-Host "Stopped stream process $($Process.Id)"
} catch {
Write-Warning "Failed to stop stream process $($Process.Id): $($_.Exception.Message)"
}
}
New-Item -ItemType Directory -Force -Path $LogDirectory | Out-Null
Write-Host "Building Rust CLI..."
Push-Location $rustRoot
try {
cargo build -p mxgw-cli
} finally {
Pop-Location
}
if (-not (Test-Path -LiteralPath $mxgwExe)) {
throw "Rust CLI executable was not found at $mxgwExe"
}
$tags = for ($machine = $MachineStart; $machine -le $MachineEnd; $machine++) {
"TestMachine_{0:D3}.{1}" -f $machine, $Attribute
}
try {
for ($clientIndex = 0; $clientIndex -lt $ClientCount; $clientIndex++) {
$clientNumber = $clientIndex + 1
$clientTags = for ($tagIndex = $clientIndex; $tagIndex -lt $tags.Count; $tagIndex += $ClientCount) {
$tags[$tagIndex]
}
if ($clientTags.Count -eq 0) {
continue
}
$clientName = "mxgw-rust-changingint-$clientNumber"
Write-Host "Opening session for client $clientNumber with $($clientTags.Count) tag(s)..."
$openArgs = @("open-session") + (Get-ConnectionArgs) + @("--client-name", $clientName, "--json")
$open = Invoke-MxgwJson -Arguments $openArgs
$sessionId = [string]$open.sessionId
$sessionIds.Add($sessionId)
$registerArgs = @("register") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--client-name", $clientName, "--json")
$register = Invoke-MxgwJson -Arguments $registerArgs
$serverHandle = [int]$register.serverHandle
foreach ($tag in $clientTags) {
$addArgs = @("add-item") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--server-handle", $serverHandle, "--item", $tag, "--json")
$add = Invoke-MxgwJson -Arguments $addArgs
$itemHandle = [int]$add.itemHandle
$adviseArgs = @("advise") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--server-handle", $serverHandle, "--item-handle", $itemHandle, "--json")
[void](Invoke-MxgwJson -Arguments $adviseArgs)
Write-Host "Client $clientNumber subscribed $tag itemHandle=$itemHandle"
}
$stdout = Join-Path $LogDirectory ("client-{0:D2}.stdout.log" -f $clientNumber)
$stderr = Join-Path $LogDirectory ("client-{0:D2}.stderr.log" -f $clientNumber)
$streamArgs = @("stream-events") + (Get-ConnectionArgs) + @(
"--session-id", $sessionId,
"--max-events", $MaxEvents.ToString(),
"--call-timeout-seconds", $StreamCallTimeoutSeconds.ToString(),
"--json")
$process = Start-Process -FilePath $mxgwExe -ArgumentList $streamArgs -WorkingDirectory $rustRoot -WindowStyle Hidden -RedirectStandardOutput $stdout -RedirectStandardError $stderr -PassThru
$streamProcesses.Add($process)
Write-Host "Client $clientNumber streaming session $sessionId in process $($process.Id); logs: $stdout"
}
Write-Host "Started $($streamProcesses.Count) Rust stream client(s). Press Ctrl+C to stop and close sessions."
while ($true) {
Start-Sleep -Seconds 5
foreach ($process in @($streamProcesses)) {
if ($process.HasExited) {
throw "Stream process $($process.Id) exited with code $($process.ExitCode). Check $LogDirectory."
}
}
}
} finally {
Write-Host "Stopping Rust stream clients and closing gateway sessions..."
foreach ($process in @($streamProcesses)) {
Stop-StreamProcessQuietly -Process $process
}
foreach ($sessionId in @($sessionIds)) {
Close-SessionQuietly -SessionId $sessionId
}
}