[CmdletBinding()] param( [string]$Endpoint = "http://127.0.0.1:5001", [string]$ApiKeyEnv = "MXGATEWAY_API_KEY", [string]$ApiKey, [int]$ClientCount = 5, [int]$MachineStart = 1, [int]$MachineEnd = 20, [string]$Attribute = "TestChangingInt", [int]$MaxEvents = [int]::MaxValue, [int]$StreamCallTimeoutSeconds = 86400, [string]$LogDirectory = (Join-Path (Get-Location) "artifacts\rust-testchangingint-subscribers") ) Set-StrictMode -Version Latest $ErrorActionPreference = "Stop" if ($ClientCount -le 0) { throw "ClientCount must be greater than zero." } if ($MachineStart -lt 1 -or $MachineEnd -lt $MachineStart) { throw "MachineStart must be at least 1 and MachineEnd must be greater than or equal to MachineStart." } if ($StreamCallTimeoutSeconds -le 0) { throw "StreamCallTimeoutSeconds must be greater than zero." } $repoRoot = Split-Path -Parent $PSScriptRoot $rustRoot = Join-Path $repoRoot "clients\rust" $mxgwExe = Join-Path $rustRoot "target\debug\mxgw.exe" $sessionIds = New-Object System.Collections.Generic.List[string] $streamProcesses = New-Object System.Collections.Generic.List[System.Diagnostics.Process] function Get-ConnectionArgs { $args = @("--endpoint", $Endpoint, "--plaintext") if (-not [string]::IsNullOrWhiteSpace($ApiKey)) { $args += @("--api-key", $ApiKey) } else { $args += @("--api-key-env", $ApiKeyEnv) } return $args } function Invoke-MxgwJson { param( [Parameter(Mandatory = $true)] [string[]]$Arguments ) $output = & $mxgwExe @Arguments 2>&1 if ($LASTEXITCODE -ne 0) { throw "mxgw failed with exit code $LASTEXITCODE for arguments: $($Arguments -join ' ')`n$output" } $jsonText = ($output | Where-Object { -not [string]::IsNullOrWhiteSpace($_) }) -join "`n" if ([string]::IsNullOrWhiteSpace($jsonText)) { throw "mxgw returned no JSON for arguments: $($Arguments -join ' ')" } return $jsonText | ConvertFrom-Json } function Close-SessionQuietly { param([string]$SessionId) if ([string]::IsNullOrWhiteSpace($SessionId)) { return } try { $args = @("close-session") + (Get-ConnectionArgs) + @("--session-id", $SessionId, "--json") [void](Invoke-MxgwJson -Arguments $args) Write-Host "Closed session $SessionId" } catch { Write-Warning "Failed to close session ${SessionId}: $($_.Exception.Message)" } } function Stop-StreamProcessQuietly { param([System.Diagnostics.Process]$Process) if ($null -eq $Process -or $Process.HasExited) { return } try { Stop-Process -Id $Process.Id -Force -ErrorAction Stop Write-Host "Stopped stream process $($Process.Id)" } catch { Write-Warning "Failed to stop stream process $($Process.Id): $($_.Exception.Message)" } } New-Item -ItemType Directory -Force -Path $LogDirectory | Out-Null Write-Host "Building Rust CLI..." Push-Location $rustRoot try { cargo build -p mxgw-cli } finally { Pop-Location } if (-not (Test-Path -LiteralPath $mxgwExe)) { throw "Rust CLI executable was not found at $mxgwExe" } $tags = for ($machine = $MachineStart; $machine -le $MachineEnd; $machine++) { "TestMachine_{0:D3}.{1}" -f $machine, $Attribute } try { for ($clientIndex = 0; $clientIndex -lt $ClientCount; $clientIndex++) { $clientNumber = $clientIndex + 1 $clientTags = for ($tagIndex = $clientIndex; $tagIndex -lt $tags.Count; $tagIndex += $ClientCount) { $tags[$tagIndex] } if ($clientTags.Count -eq 0) { continue } $clientName = "mxgw-rust-changingint-$clientNumber" Write-Host "Opening session for client $clientNumber with $($clientTags.Count) tag(s)..." $openArgs = @("open-session") + (Get-ConnectionArgs) + @("--client-name", $clientName, "--json") $open = Invoke-MxgwJson -Arguments $openArgs $sessionId = [string]$open.sessionId $sessionIds.Add($sessionId) $registerArgs = @("register") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--client-name", $clientName, "--json") $register = Invoke-MxgwJson -Arguments $registerArgs $serverHandle = [int]$register.serverHandle foreach ($tag in $clientTags) { $addArgs = @("add-item") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--server-handle", $serverHandle, "--item", $tag, "--json") $add = Invoke-MxgwJson -Arguments $addArgs $itemHandle = [int]$add.itemHandle $adviseArgs = @("advise") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--server-handle", $serverHandle, "--item-handle", $itemHandle, "--json") [void](Invoke-MxgwJson -Arguments $adviseArgs) Write-Host "Client $clientNumber subscribed $tag itemHandle=$itemHandle" } $stdout = Join-Path $LogDirectory ("client-{0:D2}.stdout.log" -f $clientNumber) $stderr = Join-Path $LogDirectory ("client-{0:D2}.stderr.log" -f $clientNumber) $streamArgs = @("stream-events") + (Get-ConnectionArgs) + @( "--session-id", $sessionId, "--max-events", $MaxEvents.ToString(), "--call-timeout-seconds", $StreamCallTimeoutSeconds.ToString(), "--json") $process = Start-Process -FilePath $mxgwExe -ArgumentList $streamArgs -WorkingDirectory $rustRoot -WindowStyle Hidden -RedirectStandardOutput $stdout -RedirectStandardError $stderr -PassThru $streamProcesses.Add($process) Write-Host "Client $clientNumber streaming session $sessionId in process $($process.Id); logs: $stdout" } Write-Host "Started $($streamProcesses.Count) Rust stream client(s). Press Ctrl+C to stop and close sessions." while ($true) { Start-Sleep -Seconds 5 foreach ($process in @($streamProcesses)) { if ($process.HasExited) { throw "Stream process $($process.Id) exited with code $($process.ExitCode). Check $LogDirectory." } } } } finally { Write-Host "Stopping Rust stream clients and closing gateway sessions..." foreach ($process in @($streamProcesses)) { Stop-StreamProcessQuietly -Process $process } foreach ($sessionId in @($sessionIds)) { Close-SessionQuietly -SessionId $sessionId } }