178 lines
6.3 KiB
PowerShell
178 lines
6.3 KiB
PowerShell
<#
.SYNOPSIS
Starts several Rust mxgw stream clients, each subscribed to a share of the test machine tags.

.DESCRIPTION
Builds the Rust CLI, opens one gateway session per client, subscribes every client to its
tags, launches a background "stream-events" process per client and keeps running until a
stream process exits or the script is interrupted. Stream processes are stopped and the
sessions are closed on the way out.

.PARAMETER Endpoint
Gateway endpoint handed to mxgw via --endpoint.

.PARAMETER ApiKeyEnv
Environment variable name handed to mxgw via --api-key-env when -ApiKey is empty.

.PARAMETER ApiKey
API key handed to mxgw via --api-key. When empty or whitespace, -ApiKeyEnv is used instead.

.PARAMETER ClientCount
Number of clients to start. Tags are dealt out round-robin; a client that receives no tag
is skipped.

.PARAMETER MachineStart
First machine number (inclusive, at least 1) used to build tag names.

.PARAMETER MachineEnd
Last machine number (inclusive) used to build tag names.

.PARAMETER Attribute
Attribute name appended to each machine name to form a tag ("TestMachine_NNN.<Attribute>").

.PARAMETER MaxEvents
Value handed to "mxgw stream-events" via --max-events.

.PARAMETER StreamCallTimeoutSeconds
Value handed to "mxgw stream-events" via --call-timeout-seconds. Must be greater than zero.

.PARAMETER LogDirectory
Directory that receives the per-client stdout/stderr log files. Created when missing.
#>
[CmdletBinding()]
param(
    [string]$Endpoint = "http://127.0.0.1:5001",

    [string]$ApiKeyEnv = "MXGATEWAY_API_KEY",

    [string]$ApiKey,

    [int]$ClientCount = 5,

    [int]$MachineStart = 1,

    [int]$MachineEnd = 20,

    [string]$Attribute = "TestChangingInt",

    [int]$MaxEvents = [int]::MaxValue,

    [int]$StreamCallTimeoutSeconds = 86400,

    [string]$LogDirectory = (Join-Path (Get-Location) "artifacts\rust-testchangingint-subscribers")
)

# Strict mode plus "Stop" so that mistakes and cmdlet errors abort the script.
Set-StrictMode -Version Latest
$ErrorActionPreference = "Stop"

# --- Parameter validation (checked in this order) ---------------------------
if (-not ($ClientCount -gt 0)) {
    throw "ClientCount must be greater than zero."
}

$machineRangeIsValid = ($MachineStart -ge 1) -and ($MachineEnd -ge $MachineStart)
if (-not $machineRangeIsValid) {
    throw "MachineStart must be at least 1 and MachineEnd must be greater than or equal to MachineStart."
}

if (-not ($StreamCallTimeoutSeconds -gt 0)) {
    throw "StreamCallTimeoutSeconds must be greater than zero."
}

# --- Paths -------------------------------------------------------------------
# The repository root is taken to be the parent of the directory holding this script.
$repoRoot = Split-Path -Path $PSScriptRoot -Parent
$rustRoot = Join-Path -Path $repoRoot -ChildPath "clients\rust"
$mxgwExe = Join-Path -Path $rustRoot -ChildPath "target\debug\mxgw.exe"

# --- Cleanup bookkeeping -----------------------------------------------------
# Every opened session id and every started stream process is recorded here so the
# final cleanup can close/stop them.
$sessionIds = New-Object -TypeName 'System.Collections.Generic.List[string]'
$streamProcesses = New-Object -TypeName 'System.Collections.Generic.List[System.Diagnostics.Process]'

function Get-ConnectionArgs {
    <#
    .SYNOPSIS
    Builds the connection arguments shared by every mxgw invocation in this script.

    .DESCRIPTION
    Reads the script-level $Endpoint, $ApiKey and $ApiKeyEnv values. The result always
    starts with "--endpoint <Endpoint> --plaintext"; it is followed by "--api-key <ApiKey>"
    when $ApiKey is non-blank, otherwise by "--api-key-env <ApiKeyEnv>".

    .OUTPUTS
    System.String. The argument tokens, in order.
    #>

    # Deliberately not named $args: that is a PowerShell automatic variable and
    # assigning to it shadows the engine-provided value.
    $connectionArgs = @("--endpoint", $Endpoint, "--plaintext")

    if (-not [string]::IsNullOrWhiteSpace($ApiKey)) {
        $connectionArgs += @("--api-key", $ApiKey)
    } else {
        $connectionArgs += @("--api-key-env", $ApiKeyEnv)
    }

    return $connectionArgs
}

function Invoke-MxgwJson {
    <#
    .SYNOPSIS
    Runs the mxgw CLI ($mxgwExe) with the given arguments and returns its parsed JSON output.

    .PARAMETER Arguments
    Argument tokens passed to the executable unchanged.

    .OUTPUTS
    The object produced by ConvertFrom-Json from the command's standard output.

    .NOTES
    Throws when the process cannot be started, exits with a non-zero code, or prints no
    JSON on standard output. The value following "--api-key" is masked in error messages.
    #>
    param(
        [Parameter(Mandatory = $true)]
        [string[]]$Arguments
    )

    # Build a display copy of the arguments so error messages never echo the API key.
    $displayArguments = New-Object -TypeName 'System.Collections.Generic.List[string]'
    for ($index = 0; $index -lt $Arguments.Count; $index++) {
        if ($index -gt 0 -and $Arguments[$index - 1] -eq "--api-key") {
            $displayArguments.Add("***")
        } else {
            $displayArguments.Add($Arguments[$index])
        }
    }
    $commandLine = $displayArguments -join ' '

    $captured = @()
    $exitCode = $null
    $callerPreference = $ErrorActionPreference
    try {
        # With "Stop" in effect, Windows PowerShell turns redirected native stderr output
        # into a terminating error before the exit code can be inspected, so the
        # preference is relaxed for the duration of the call only.
        $ErrorActionPreference = "Continue"
        $captured = @(& $mxgwExe @Arguments 2>&1)
        $exitCode = $LASTEXITCODE
    } finally {
        $ErrorActionPreference = $callerPreference
    }

    if ($null -eq $exitCode) {
        throw "mxgw could not be started for arguments: $commandLine"
    }

    if ($exitCode -ne 0) {
        # Include stdout and stderr, one line each, in the order they were captured.
        $details = @($captured | ForEach-Object { "$_" }) -join "`n"
        throw "mxgw failed with exit code $exitCode for arguments: $commandLine`n$details"
    }

    # Lines redirected from stderr arrive as ErrorRecord objects; only real stdout
    # lines are candidates for JSON, so diagnostics cannot corrupt the payload.
    $stdoutLines = @(
        $captured |
            Where-Object { $_ -isnot [System.Management.Automation.ErrorRecord] } |
            ForEach-Object { [string]$_ }
    )

    $jsonText = ($stdoutLines | Where-Object { -not [string]::IsNullOrWhiteSpace($_) }) -join "`n"
    if ([string]::IsNullOrWhiteSpace($jsonText)) {
        throw "mxgw returned no JSON for arguments: $commandLine"
    }

    return $jsonText | ConvertFrom-Json
}

function Close-SessionQuietly {
    <#
    .SYNOPSIS
    Closes a gateway session via "mxgw close-session" without ever throwing.

    .PARAMETER SessionId
    Session to close. A null, empty or whitespace value is ignored.

    .NOTES
    Best-effort cleanup helper: a failure is reported with Write-Warning and swallowed.
    #>
    param([string]$SessionId)

    if ([string]::IsNullOrWhiteSpace($SessionId)) {
        return
    }

    try {
        # Deliberately not named $args: that is a PowerShell automatic variable.
        $closeArgs = @("close-session") + (Get-ConnectionArgs) + @("--session-id", $SessionId, "--json")
        [void](Invoke-MxgwJson -Arguments $closeArgs)
        Write-Host "Closed session $SessionId"
    } catch {
        Write-Warning "Failed to close session ${SessionId}: $($_.Exception.Message)"
    }
}

function Stop-StreamProcessQuietly {
    <#
    .SYNOPSIS
    Force-stops a stream process without ever throwing.

    .PARAMETER Process
    Process to stop. Nothing happens when it is null or has already exited.

    .NOTES
    Best-effort cleanup helper: a failure is reported with Write-Warning and swallowed.
    #>
    param([System.Diagnostics.Process]$Process)

    # -or short-circuits, so HasExited is only read on a non-null process.
    $nothingToStop = ($null -eq $Process) -or $Process.HasExited
    if ($nothingToStop) {
        return
    }

    $processId = $Process.Id
    try {
        Stop-Process -Id $processId -Force -ErrorAction Stop
        Write-Host "Stopped stream process $processId"
    } catch {
        $reason = $_.Exception.Message
        Write-Warning "Failed to stop stream process ${processId}: $reason"
    }
}

# Make sure the log directory exists before any stream process redirects into it.
New-Item -ItemType Directory -Force -Path $LogDirectory | Out-Null

Write-Host "Building Rust CLI..."
Push-Location -LiteralPath $rustRoot
try {
    cargo build -p mxgw-cli

    # A failing native command does not throw on its own; without this check a broken
    # build would go unnoticed whenever an older mxgw.exe is still on disk.
    if ($LASTEXITCODE -ne 0) {
        throw "cargo build failed with exit code $LASTEXITCODE."
    }
} finally {
    Pop-Location
}

if (-not (Test-Path -LiteralPath $mxgwExe)) {
    throw "Rust CLI executable was not found at $mxgwExe"
}

# One tag per machine in the inclusive range, e.g. "TestMachine_001.TestChangingInt".
# @() keeps the result an array even for a single machine; a bare string would make
# $tags[$i] return single characters instead of tags.
$tags = @(
    for ($machine = $MachineStart; $machine -le $MachineEnd; $machine++) {
        "TestMachine_{0:D3}.{1}" -f $machine, $Attribute
    }
)

try {
    for ($clientIndex = 0; $clientIndex -lt $ClientCount; $clientIndex++) {
        $clientNumber = $clientIndex + 1

        # Round-robin distribution: client i takes tags i, i + ClientCount, i + 2 * ClientCount, ...
        # @() guarantees an array (possibly empty) so .Count is valid under strict mode.
        $clientTags = @(
            for ($tagIndex = $clientIndex; $tagIndex -lt $tags.Count; $tagIndex += $ClientCount) {
                $tags[$tagIndex]
            }
        )

        # More clients than tags: the surplus clients have nothing to subscribe to.
        if ($clientTags.Count -eq 0) {
            continue
        }

        $clientName = "mxgw-rust-changingint-$clientNumber"
        Write-Host "Opening session for client $clientNumber with $($clientTags.Count) tag(s)..."

        $openArgs = @("open-session") + (Get-ConnectionArgs) + @("--client-name", $clientName, "--json")
        $open = Invoke-MxgwJson -Arguments $openArgs
        $sessionId = [string]$open.sessionId

        # Record the session immediately so the finally block closes it even if a later step fails.
        $sessionIds.Add($sessionId)

        $registerArgs = @("register") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--client-name", $clientName, "--json")
        $register = Invoke-MxgwJson -Arguments $registerArgs
        $serverHandle = [int]$register.serverHandle

        foreach ($tag in $clientTags) {
            $addArgs = @("add-item") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--server-handle", $serverHandle, "--item", $tag, "--json")
            $add = Invoke-MxgwJson -Arguments $addArgs
            $itemHandle = [int]$add.itemHandle

            $adviseArgs = @("advise") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--server-handle", $serverHandle, "--item-handle", $itemHandle, "--json")
            [void](Invoke-MxgwJson -Arguments $adviseArgs)
            Write-Host "Client $clientNumber subscribed $tag itemHandle=$itemHandle"
        }

        # Each client streams in its own background process with stdout/stderr redirected to log files.
        $stdout = Join-Path $LogDirectory ("client-{0:D2}.stdout.log" -f $clientNumber)
        $stderr = Join-Path $LogDirectory ("client-{0:D2}.stderr.log" -f $clientNumber)
        $streamArgs = @("stream-events") + (Get-ConnectionArgs) + @(
            "--session-id", $sessionId,
            "--max-events", $MaxEvents.ToString(),
            "--call-timeout-seconds", $StreamCallTimeoutSeconds.ToString(),
            "--json")
        $process = Start-Process -FilePath $mxgwExe -ArgumentList $streamArgs -WorkingDirectory $rustRoot -WindowStyle Hidden -RedirectStandardOutput $stdout -RedirectStandardError $stderr -PassThru

        # Touch the handle now so ExitCode can still be read after the process exits
        # (without a cached handle, ExitCode of a -PassThru process can come back empty).
        $null = $process.Handle

        $streamProcesses.Add($process)
        Write-Host "Client $clientNumber streaming session $sessionId in process $($process.Id); logs: $stdout"
    }

    Write-Host "Started $($streamProcesses.Count) Rust stream client(s). Press Ctrl+C to stop and close sessions."

    # Supervise until interrupted: any stream process that exits aborts the whole run.
    while ($true) {
        Start-Sleep -Seconds 5
        foreach ($process in @($streamProcesses)) {
            if ($process.HasExited) {
                throw "Stream process $($process.Id) exited with code $($process.ExitCode). Check $LogDirectory."
            }
        }
    }
} finally {
    # Runs on errors and on Ctrl+C alike: stop the streams first, then close their sessions.
    Write-Host "Stopping Rust stream clients and closing gateway sessions..."
    foreach ($process in @($streamProcesses)) {
        Stop-StreamProcessQuietly -Process $process
    }

    foreach ($sessionId in @($sessionIds)) {
        Close-SessionQuietly -SessionId $sessionId
    }
}