Files
mxaccessgw/clients/go/cmd/mxgw-go/main.go
Joseph Doherty 58259016b0 Add stream-alarms and acknowledge-alarm to the Go CLI
stream-alarms attaches to the gateway's central alarm feed (mirrors
stream-events: --limit cap, --json); acknowledge-alarm is a unary
session-less ack (--reference required, --comment, --operator).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:01:48 -04:00

1470 lines
46 KiB
Go

// Command mxgw-go is the reference Go CLI for the MXAccess Gateway.
//
// It exposes versioning, session lifecycle, command invocation (including the
// bulk subscribe, read, and write families), event and alarm streaming, alarm
// acknowledgement, a ReadBulk benchmark, a smoke-test workflow, a batch
// subcommand, and Galaxy Repository browse subcommands that exercise the same
// gRPC contract used by the mxgateway library.
package main
import (
"bufio"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"syscall"
"time"
"gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/mxgateway"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"
)
// versionOutput is the JSON document written by the version subcommand when
// --json is set. Its fields are copied from the version constants exported by
// the mxgateway package.
type versionOutput struct {
// ClientVersion is the client version string (mxgateway.ClientVersion).
ClientVersion string `json:"clientVersion"`
// GatewayProtocolVersion is mxgateway.GatewayProtocolVersion.
GatewayProtocolVersion uint32 `json:"gatewayProtocolVersion"`
// WorkerProtocolVersion is mxgateway.WorkerProtocolVersion.
WorkerProtocolVersion uint32 `json:"workerProtocolVersion"`
}
// commonOptions holds the connection flags shared by every subcommand that
// dials the gateway. The exported fields are bound to command-line flags by
// bindCommonFlags and are echoed back in JSON output; the unexported fields
// carry no JSON tag and are skipped by encoding/json.
type commonOptions struct {
// Endpoint is the gateway address (--endpoint).
Endpoint string `json:"endpoint"`
// APIKey is the key given directly on the command line (--api-key).
// NOTE(review): this field has a JSON tag, so it is emitted in --json
// output unless it is cleared or redacted before serialization — confirm.
APIKey string `json:"apiKey"`
// APIKeyEnv names the environment variable holding the key (--api-key-env).
APIKeyEnv string `json:"apiKeyEnv,omitempty"`
// Plaintext selects plaintext transport (--plaintext).
Plaintext bool `json:"plaintext"`
// CACertFile is the CA certificate file path (--ca-cert).
CACertFile string `json:"caCertFile,omitempty"`
// ServerName is the TLS server name override (--server-name-override).
ServerName string `json:"serverNameOverride,omitempty"`
// CallTimeout is the per-call timeout as flag text (--call-timeout).
CallTimeout string `json:"callTimeout,omitempty"`
// apiKeyValue and timeout are not set by bindCommonFlags.
// NOTE(review): presumably filled in when the options are resolved; the
// code that does so is not visible in this part of the file.
apiKeyValue string
timeout time.Duration
}
// openSessionOutput is the JSON envelope written by open-session when --json
// is set.
type openSessionOutput struct {
// Command is the subcommand name ("open-session").
Command string `json:"command"`
// Options echoes the connection options used for the call.
Options commonOptions `json:"options"`
// Reply is the gateway reply, already marshalled to JSON.
Reply json.RawMessage `json:"reply"`
}
// commandReplyOutput is the JSON envelope used by unary subcommands in this
// file (for example close-session and acknowledge-alarm) when --json is set.
type commandReplyOutput struct {
// Command is the subcommand name that produced the reply.
Command string `json:"command"`
// Options echoes the connection options used for the call.
Options commonOptions `json:"options"`
// Reply is the gateway reply, already marshalled to JSON.
Reply json.RawMessage `json:"reply"`
}
// main runs the CLI against the process arguments and standard streams. Any
// error is printed to stderr and the process exits with status 2.
func main() {
	err := runWithIO(context.Background(), os.Args[1:], os.Stdout, os.Stderr)
	if err == nil {
		return
	}
	fmt.Fprintln(os.Stderr, err)
	os.Exit(2)
}
// run executes the CLI with the given arguments, a background context, and
// the process's standard output and error streams.
func run(args []string) error {
	ctx := context.Background()
	return runWithIO(ctx, args, os.Stdout, os.Stderr)
}
// runWithIO dispatches on the first argument and hands the remaining
// arguments to the matching subcommand, writing through the supplied stdout
// and stderr. With no arguments, or an unrecognized command, it prints usage
// to stderr and returns an error. The batch subcommand takes no arguments and
// reads from os.Stdin.
func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	if len(args) == 0 {
		writeUsage(stderr)
		return errors.New("missing command")
	}

	command, rest := args[0], args[1:]
	switch command {
	case "version":
		return runVersion(rest, stdout, stderr)
	case "open-session":
		return runOpenSession(ctx, rest, stdout, stderr)
	case "close-session":
		return runCloseSession(ctx, rest, stdout, stderr)
	case "register":
		return runRegister(ctx, rest, stdout, stderr)
	case "add-item":
		return runAddItem(ctx, rest, stdout, stderr)
	case "advise":
		return runAdvise(ctx, rest, stdout, stderr)
	case "subscribe-bulk":
		return runSubscribeBulk(ctx, rest, stdout, stderr)
	case "unsubscribe-bulk":
		return runUnsubscribeBulk(ctx, rest, stdout, stderr)
	case "read-bulk":
		return runReadBulk(ctx, rest, stdout, stderr)
	case "write-bulk":
		return runWriteBulk(ctx, rest, stdout, stderr)
	case "write2-bulk":
		return runWrite2Bulk(ctx, rest, stdout, stderr)
	case "write-secured-bulk":
		return runWriteSecuredBulk(ctx, rest, stdout, stderr)
	case "write-secured2-bulk":
		return runWriteSecured2Bulk(ctx, rest, stdout, stderr)
	case "bench-read-bulk":
		return runBenchReadBulk(ctx, rest, stdout, stderr)
	case "write":
		return runWrite(ctx, rest, stdout, stderr)
	case "stream-events":
		return runStreamEvents(ctx, rest, stdout, stderr)
	case "stream-alarms":
		return runStreamAlarms(ctx, rest, stdout, stderr)
	case "acknowledge-alarm":
		return runAcknowledgeAlarm(ctx, rest, stdout, stderr)
	case "smoke":
		return runSmoke(ctx, rest, stdout, stderr)
	case "galaxy-test-connection":
		return runGalaxyTestConnection(ctx, rest, stdout, stderr)
	case "galaxy-last-deploy":
		return runGalaxyLastDeploy(ctx, rest, stdout, stderr)
	case "galaxy-discover":
		return runGalaxyDiscover(ctx, rest, stdout, stderr)
	case "galaxy-watch":
		return runGalaxyWatch(ctx, rest, stdout, stderr)
	case "batch":
		return runBatch(ctx, os.Stdin, stdout, stderr)
	}

	writeUsage(stderr)
	return fmt.Errorf("unknown command %q", command)
}
// runVersion implements the version subcommand: it reports the client version
// and the gateway/worker protocol versions, either as three text lines or, with
// --json, as a versionOutput document.
func runVersion(args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("version", flag.ContinueOnError)
	fs.SetOutput(stderr)
	asJSON := fs.Bool("json", false, "write JSON output")
	if err := fs.Parse(args); err != nil {
		return err
	}

	info := versionOutput{
		ClientVersion:          mxgateway.ClientVersion,
		GatewayProtocolVersion: mxgateway.GatewayProtocolVersion,
		WorkerProtocolVersion:  mxgateway.WorkerProtocolVersion,
	}
	if !*asJSON {
		fmt.Fprintf(stdout, "mxgw-go %s\n", info.ClientVersion)
		fmt.Fprintf(stdout, "gateway protocol %d\n", info.GatewayProtocolVersion)
		fmt.Fprintf(stdout, "worker protocol %d\n", info.WorkerProtocolVersion)
		return nil
	}
	return writeJSON(stdout, info)
}
// runOpenSession implements the open-session subcommand. It dials the gateway,
// opens a session with the optional backend and client session name, and
// prints the new session id (or the full reply envelope with --json).
func runOpenSession(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("open-session", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON      = fs.Bool("json", false, "write JSON output")
		sessionName = fs.String("client-session-name", "", "client session name")
		backendName = fs.String("backend", "", "requested backend")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}

	client, resolved, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	request := (&mxgateway.OpenSessionOptions{
		RequestedBackend:  *backendName,
		ClientSessionName: *sessionName,
	}).Request()
	reply, err := client.OpenSessionRaw(ctx, request)
	if err != nil {
		return err
	}

	if !*asJSON {
		fmt.Fprintln(stdout, reply.GetSessionId())
		return nil
	}
	return writeJSON(stdout, openSessionOutput{
		Command: "open-session",
		Options: resolved,
		Reply:   mustMarshalProto(reply),
	})
}
// runCloseSession implements the close-session subcommand. --session-id is
// mandatory; on success it prints the session's final state (or the full
// reply envelope with --json).
func runCloseSession(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("close-session", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON = fs.Bool("json", false, "write JSON output")
		id     = fs.String("session-id", "", "gateway session id")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}
	if len(*id) == 0 {
		return errors.New("session-id is required")
	}

	client, resolved, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	request := &mxgateway.CloseSessionRequest{SessionId: *id}
	reply, err := client.CloseSessionRaw(ctx, request)
	if err != nil {
		return err
	}

	if !*asJSON {
		fmt.Fprintln(stdout, reply.GetFinalState())
		return nil
	}
	return writeJSON(stdout, commandReplyOutput{
		Command: "close-session",
		Options: resolved,
		Reply:   mustMarshalProto(reply),
	})
}
// runRegister implements the register subcommand: it issues a Register call
// on an existing session. Both --session-id and --client-name are mandatory.
// Reply and error are handed to writeCommandOutput for rendering.
func runRegister(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("register", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON = fs.Bool("json", false, "write JSON output")
		id     = fs.String("session-id", "", "gateway session id")
		name   = fs.String("client-name", "", "MXAccess client name")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}
	if *id == "" || *name == "" {
		return errors.New("session-id and client-name are required")
	}

	client, resolved, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	reply, callErr := mxgateway.NewSessionForID(client, *id).RegisterRaw(ctx, *name)
	return writeCommandOutput(stdout, *asJSON, "register", resolved, reply, callErr)
}
// runAddItem implements the add-item subcommand: it issues an AddItem call on
// an existing session. --session-id and --item are mandatory. The
// --server-handle value is converted to int32 before being sent.
func runAddItem(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("add-item", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON  = fs.Bool("json", false, "write JSON output")
		id      = fs.String("session-id", "", "gateway session id")
		server  = fs.Int("server-handle", 0, "MXAccess server handle")
		itemDef = fs.String("item", "", "item definition")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}
	if *id == "" || *itemDef == "" {
		return errors.New("session-id and item are required")
	}

	client, resolved, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	reply, callErr := mxgateway.NewSessionForID(client, *id).AddItemRaw(ctx, int32(*server), *itemDef)
	return writeCommandOutput(stdout, *asJSON, "add-item", resolved, reply, callErr)
}
// runAdvise implements the advise subcommand: it issues an Advise call for a
// server/item handle pair on an existing session. --session-id is mandatory;
// both handles default to 0 and are converted to int32 before being sent.
func runAdvise(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("advise", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON = fs.Bool("json", false, "write JSON output")
		id     = fs.String("session-id", "", "gateway session id")
		server = fs.Int("server-handle", 0, "MXAccess server handle")
		item   = fs.Int("item-handle", 0, "MXAccess item handle")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}
	if len(*id) == 0 {
		return errors.New("session-id is required")
	}

	client, resolved, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	reply, callErr := mxgateway.NewSessionForID(client, *id).AdviseRaw(ctx, int32(*server), int32(*item))
	return writeCommandOutput(stdout, *asJSON, "advise", resolved, reply, callErr)
}
// runSubscribeBulk implements the subscribe-bulk subcommand: it subscribes to
// the comma-separated item definitions given with --items on an existing
// session. --session-id and --items are mandatory.
func runSubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("subscribe-bulk", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON   = fs.Bool("json", false, "write JSON output")
		id       = fs.String("session-id", "", "gateway session id")
		server   = fs.Int("server-handle", 0, "MXAccess server handle")
		itemList = fs.String("items", "", "comma-separated item definitions")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}
	if *id == "" || *itemList == "" {
		return errors.New("session-id and items are required")
	}

	client, resolved, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	session := mxgateway.NewSessionForID(client, *id)
	outcome, callErr := session.SubscribeBulk(ctx, int32(*server), parseStringList(*itemList))
	return writeBulkOutput(stdout, *asJSON, "subscribe-bulk", resolved, outcome, callErr)
}
// runUnsubscribeBulk implements the unsubscribe-bulk subcommand: it
// unsubscribes the comma-separated item handles given with --item-handles.
// The handle list is parsed before dialing, so a malformed handle fails
// without contacting the gateway.
func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("unsubscribe-bulk", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON     = fs.Bool("json", false, "write JSON output")
		id         = fs.String("session-id", "", "gateway session id")
		server     = fs.Int("server-handle", 0, "MXAccess server handle")
		handleList = fs.String("item-handles", "", "comma-separated item handles")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}
	if *id == "" || *handleList == "" {
		return errors.New("session-id and item-handles are required")
	}

	itemHandles, err := parseInt32List(*handleList)
	if err != nil {
		return err
	}

	client, resolved, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	session := mxgateway.NewSessionForID(client, *id)
	outcome, callErr := session.UnsubscribeBulk(ctx, int32(*server), itemHandles)
	return writeBulkOutput(stdout, *asJSON, "unsubscribe-bulk", resolved, outcome, callErr)
}
// runReadBulk implements the read-bulk subcommand: it reads the
// comma-separated tag addresses given with --items on an existing session.
// --timeout-ms is converted to a time.Duration and passed as the per-tag
// snapshot timeout.
func runReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("read-bulk", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON   = fs.Bool("json", false, "write JSON output")
		id       = fs.String("session-id", "", "gateway session id")
		server   = fs.Int("server-handle", 0, "MXAccess server handle")
		itemList = fs.String("items", "", "comma-separated tag addresses")
		waitMs   = fs.Int("timeout-ms", 0, "per-tag snapshot timeout in milliseconds (0 = worker default)")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}
	if *id == "" || *itemList == "" {
		return errors.New("session-id and items are required")
	}

	client, resolved, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	session := mxgateway.NewSessionForID(client, *id)
	perTagTimeout := time.Duration(*waitMs) * time.Millisecond
	outcome, callErr := session.ReadBulk(ctx, int32(*server), parseStringList(*itemList), perTagTimeout)
	return writeReadBulkOutput(stdout, *asJSON, "read-bulk", resolved, outcome, callErr)
}
// runWriteBulk implements write-bulk via runWriteBulkVariant (no timestamp).
func runWriteBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	const command, withTimestamp = "write-bulk", false
	return runWriteBulkVariant(ctx, args, stdout, stderr, command, withTimestamp)
}

// runWrite2Bulk implements write2-bulk via runWriteBulkVariant (timestamped).
func runWrite2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	const command, withTimestamp = "write2-bulk", true
	return runWriteBulkVariant(ctx, args, stdout, stderr, command, withTimestamp)
}

// runWriteSecuredBulk implements write-secured-bulk via runWriteBulkVariant
// (no timestamp).
func runWriteSecuredBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	const command, withTimestamp = "write-secured-bulk", false
	return runWriteBulkVariant(ctx, args, stdout, stderr, command, withTimestamp)
}

// runWriteSecured2Bulk implements write-secured2-bulk via runWriteBulkVariant
// (timestamped).
func runWriteSecured2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	const command, withTimestamp = "write-secured2-bulk", true
	return runWriteBulkVariant(ctx, args, stdout, stderr, command, withTimestamp)
}
// runWriteBulkVariant shares the flag-parsing + entry-build skeleton across
// the four bulk-write families. command selects which of the four routes
// runs; withTimestamp registers (and requires) the --timestamp-value flag for
// the Write2 / Secured2 variants. Variant-specific flags are registered only
// for the variants that consume them: --current-user-id / --verifier-user-id
// for the secured variants, --user-id for Write/Write2, and --timestamp-value
// for the timestamped variants. A wrong-variant flag therefore becomes a
// clean "flag provided but not defined" error instead of silently no-op'ing.
//
// All local validation (required flags, handle/value count match, value and
// timestamp parsing) happens before the gateway is dialed. The result slice
// and call error are handed to writeWriteBulkOutput for rendering.
func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool) error {
	secured := command == "write-secured-bulk" || command == "write-secured2-bulk"
	flags := flag.NewFlagSet(command, flag.ContinueOnError)
	flags.SetOutput(stderr)
	common := bindCommonFlags(flags)
	jsonOutput := flags.Bool("json", false, "write JSON output")
	sessionID := flags.String("session-id", "", "gateway session id")
	serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
	itemHandles := flags.String("item-handles", "", "comma-separated item handles")
	valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string")
	values := flags.String("values", "", "comma-separated values (one per item handle)")

	// Each pointer below is non-nil only for the variants that registered the
	// flag; it is dereferenced only under the same condition further down.
	var userID, currentUserID, verifierUserID *int
	if secured {
		currentUserID = flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)")
		verifierUserID = flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)")
	} else {
		userID = flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)")
	}
	var timestampValue *string
	if withTimestamp {
		timestampValue = flags.String("timestamp-value", "", "RFC 3339 timestamp shared across all entries (Write2/WriteSecured2 variants)")
	}

	if err := flags.Parse(args); err != nil {
		return err
	}
	if *sessionID == "" || *itemHandles == "" || *values == "" {
		return errors.New("session-id, item-handles, and values are required")
	}

	handles, err := parseInt32List(*itemHandles)
	if err != nil {
		return err
	}
	valueTexts := parseStringList(*values)
	if len(handles) != len(valueTexts) {
		return fmt.Errorf("item-handles count (%d) does not match values count (%d)", len(handles), len(valueTexts))
	}
	parsedValues := make([]*mxgateway.MxValue, len(handles))
	for i, text := range valueTexts {
		v, err := parseValue(*valueType, text)
		if err != nil {
			return fmt.Errorf("entry %d: %w", i, err)
		}
		parsedValues[i] = v
	}

	// One timestamp is parsed once and shared by every entry.
	var tsValue *mxgateway.MxValue
	if withTimestamp {
		if *timestampValue == "" {
			return errors.New("timestamp-value is required for write2/write-secured2 bulk variants")
		}
		parsed, err := parseRfc3339Timestamp(*timestampValue)
		if err != nil {
			return err
		}
		tsValue = parsed
	}

	client, options, err := dialForCommand(ctx, common)
	if err != nil {
		return err
	}
	defer client.Close()

	session := mxgateway.NewSessionForID(client, *sessionID)
	var results []*mxgateway.BulkWriteResult
	switch command {
	case "write-bulk":
		entries := make([]*mxgateway.WriteBulkEntry, len(handles))
		for i := range handles {
			entries[i] = &mxgateway.WriteBulkEntry{ItemHandle: handles[i], Value: parsedValues[i], UserId: int32(*userID)}
		}
		results, err = session.WriteBulk(ctx, int32(*serverHandle), entries)
	case "write2-bulk":
		entries := make([]*mxgateway.Write2BulkEntry, len(handles))
		for i := range handles {
			entries[i] = &mxgateway.Write2BulkEntry{ItemHandle: handles[i], Value: parsedValues[i], TimestampValue: tsValue, UserId: int32(*userID)}
		}
		results, err = session.Write2Bulk(ctx, int32(*serverHandle), entries)
	case "write-secured-bulk":
		entries := make([]*mxgateway.WriteSecuredBulkEntry, len(handles))
		for i := range handles {
			entries[i] = &mxgateway.WriteSecuredBulkEntry{
				ItemHandle:     handles[i],
				Value:          parsedValues[i],
				CurrentUserId:  int32(*currentUserID),
				VerifierUserId: int32(*verifierUserID),
			}
		}
		results, err = session.WriteSecuredBulk(ctx, int32(*serverHandle), entries)
	case "write-secured2-bulk":
		entries := make([]*mxgateway.WriteSecured2BulkEntry, len(handles))
		for i := range handles {
			entries[i] = &mxgateway.WriteSecured2BulkEntry{
				ItemHandle:     handles[i],
				Value:          parsedValues[i],
				TimestampValue: tsValue,
				CurrentUserId:  int32(*currentUserID),
				VerifierUserId: int32(*verifierUserID),
			}
		}
		results, err = session.WriteSecured2Bulk(ctx, int32(*serverHandle), entries)
	default:
		return fmt.Errorf("unsupported bulk write command %q", command)
	}
	return writeWriteBulkOutput(stdout, *jsonOutput, command, options, results, err)
}
// runBenchReadBulk drives the cross-language ReadBulk stress benchmark from Go:
// opens its own session, subscribes to bulk-size tags so the worker value cache
// populates from real OnDataChange events, runs ReadBulk in a tight loop for
// duration-seconds with per-call timing, and emits the shared JSON schema the
// scripts/bench-read-bulk.ps1 driver collates across all five clients.
//
// bulk-size and duration-seconds must be at least 1; the other numeric flags
// are not range-checked. With --json the full stats map is written; otherwise
// only the calls-per-second figure is printed.
func runBenchReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("bench-read-bulk", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
clientName := flags.String("client-name", "mxgw-go-bench", "session client name")
durationSeconds := flags.Int("duration-seconds", 30, "steady-state measurement window in seconds")
warmupSeconds := flags.Int("warmup-seconds", 3, "warm-up window before measurement, in seconds")
bulkSize := flags.Int("bulk-size", 6, "tags per ReadBulk call")
tagStart := flags.Int("tag-start", 1, "first machine number")
tagPrefix := flags.String("tag-prefix", "TestMachine_", "tag prefix (machine number appended as %03d)")
tagAttribute := flags.String("tag-attribute", "TestChangingInt", "attribute appended to each tag prefix")
timeoutMs := flags.Int("timeout-ms", 1500, "per-tag snapshot timeout in milliseconds")
if err := flags.Parse(args); err != nil {
return err
}
if *bulkSize < 1 {
return errors.New("bulk-size must be positive")
}
if *durationSeconds < 1 {
return errors.New("duration-seconds must be positive")
}
// Build the tag list as <prefix><zero-padded machine number>.<attribute>,
// using consecutive machine numbers starting at tag-start.
tags := make([]string, *bulkSize)
for i := 0; i < *bulkSize; i++ {
tags[i] = fmt.Sprintf("%s%03d.%s", *tagPrefix, *tagStart+i, *tagAttribute)
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
session, err := client.OpenSession(ctx, mxgateway.OpenSessionOptions{ClientSessionName: *clientName})
if err != nil {
return err
}
// Cleanup uses a fresh background context so it is still attempted after
// ctx has been cancelled; the close result is deliberately discarded.
// NOTE(review): no timeout is applied here, unlike closeSmokeSession —
// confirm the client bounds the call on its own.
defer func() {
_, _ = session.Close(context.Background())
}()
serverHandle, err := session.Register(ctx, *clientName)
if err != nil {
return err
}
subscribeResults, err := session.SubscribeBulk(ctx, serverHandle, tags)
if err != nil {
return err
}
// Remember only the handles that subscribed successfully so the deferred
// unsubscribe targets exactly those; per-tag subscribe failures are not
// reported and do not stop the benchmark.
itemHandles := make([]int32, 0, len(subscribeResults))
for _, result := range subscribeResults {
if result.GetWasSuccessful() {
itemHandles = append(itemHandles, result.GetItemHandle())
}
}
defer func() {
if len(itemHandles) > 0 {
_, _ = session.UnsubscribeBulk(context.Background(), serverHandle, itemHandles)
}
}()
// Warm-up: drive identical calls so any first-call JIT / connection-pool
// setup is amortised before the measurement window opens. Honor ctx so
// Ctrl+C or a parent-cancel (e.g. the cross-language bench driver killing
// the child early) exits promptly rather than spinning failing calls until
// the wall-clock deadline.
warmupDeadline := time.Now().Add(time.Duration(*warmupSeconds) * time.Second)
timeout := time.Duration(*timeoutMs) * time.Millisecond
for time.Now().Before(warmupDeadline) && ctx.Err() == nil {
_, _ = session.ReadBulk(ctx, serverHandle, tags, timeout)
}
// Steady state: per-call latency captured via time.Now() deltas. Same ctx
// guard as warm-up; on cancel we stop the loop and report the truncated
// window faithfully.
latenciesMs := make([]float64, 0, 65536)
var totalReadResults int64
var cachedReadResults int64
var successfulCalls, failedCalls int
steadyStart := time.Now()
steadyDeadline := steadyStart.Add(time.Duration(*durationSeconds) * time.Second)
for time.Now().Before(steadyDeadline) && ctx.Err() == nil {
callStart := time.Now()
results, err := session.ReadBulk(ctx, serverHandle, tags, timeout)
elapsed := time.Since(callStart)
// The latency sample is recorded before the error check, so failed
// calls contribute to the percentile summary as well.
latenciesMs = append(latenciesMs, float64(elapsed.Nanoseconds())/1e6)
if err != nil {
failedCalls++
continue
}
successfulCalls++
for _, r := range results {
totalReadResults++
if r.GetWasCached() {
cachedReadResults++
}
}
}
// Throughput is computed over the measured wall-clock window and counts
// failed calls as well as successful ones.
steadyElapsed := time.Since(steadyStart)
totalCalls := successfulCalls + failedCalls
callsPerSecond := 0.0
if steadyElapsed.Seconds() > 0 {
callsPerSecond = float64(totalCalls) / steadyElapsed.Seconds()
}
stats := map[string]any{
"language": "go",
"command": "bench-read-bulk",
"endpoint": options.Endpoint,
"clientName": *clientName,
"bulkSize": *bulkSize,
"durationSeconds": *durationSeconds,
"warmupSeconds": *warmupSeconds,
"durationMs": steadyElapsed.Milliseconds(),
"tags": tags,
"totalCalls": totalCalls,
"successfulCalls": successfulCalls,
"failedCalls": failedCalls,
"totalReadResults": totalReadResults,
"cachedReadResults": cachedReadResults,
"callsPerSecond": roundTo(callsPerSecond, 2),
"latencyMs": percentileSummary(latenciesMs),
}
if *jsonOutput {
return writeJSON(stdout, stats)
}
// Plain-text mode prints only the unrounded calls-per-second value.
fmt.Fprintln(stdout, callsPerSecond)
return nil
}
// percentileSummary condenses a latency sample into the { p50, p95, p99, max,
// mean } map shared by every language bench, with each value rounded to three
// decimal places. An empty sample yields all zeros. The input slice is not
// modified: sorting happens on a private copy.
func percentileSummary(sample []float64) map[string]float64 {
	count := len(sample)
	if count == 0 {
		return map[string]float64{"p50": 0, "p95": 0, "p99": 0, "max": 0, "mean": 0}
	}

	ordered := make([]float64, count)
	copy(ordered, sample)
	sort.Float64s(ordered)

	// Sum in the caller's original order so the mean is bit-for-bit stable.
	var total float64
	for i := range sample {
		total += sample[i]
	}
	average := total / float64(count)
	largest := ordered[count-1]

	summary := make(map[string]float64, 5)
	summary["p50"] = roundTo(percentile(ordered, 0.50), 3)
	summary["p95"] = roundTo(percentile(ordered, 0.95), 3)
	summary["p99"] = roundTo(percentile(ordered, 0.99), 3)
	summary["max"] = roundTo(largest, 3)
	summary["mean"] = roundTo(average, 3)
	return summary
}
// percentile returns the value at the given quantile of an ascending-sorted
// sample, linearly interpolating between the two closest ranks (rank =
// quantile * (n-1)); the source comments state this matches the .NET
// implementation so cross-language comparisons are apples-to-apples.
//
// An empty sample yields 0 and a single-element sample yields that element.
// quantile is clamped to [0, 1]: values at or below 0 (and NaN) return the
// smallest element, values at or above 1 return the largest. Without the
// clamp an out-of-range quantile would index outside the slice and panic.
func percentile(sorted []float64, quantile float64) float64 {
	n := len(sorted)
	if n == 0 {
		return 0
	}
	if n == 1 {
		return sorted[0]
	}
	// Written as !(q > 0) rather than q <= 0 so that NaN is caught here too.
	if !(quantile > 0) {
		return sorted[0]
	}
	if quantile >= 1 {
		return sorted[n-1]
	}
	rank := quantile * float64(n-1)
	lower := int(rank)
	upper := lower + 1
	// Floating-point rounding can still push rank up to n-1 for quantiles
	// just below 1, so keep the upper-bound guard.
	if upper >= n {
		return sorted[lower]
	}
	fraction := rank - float64(lower)
	return sorted[lower] + (sorted[upper]-sorted[lower])*fraction
}
// roundTo rounds value to the given number of decimal digits, with halves
// rounded away from zero. A digits value of zero or less rounds to a whole
// number.
//
// The conversion to int64 truncates toward zero, so adding 0.5 only rounds
// correctly for non-negative inputs; negative inputs are therefore rounded by
// magnitude and negated afterwards. Values whose scaled magnitude does not
// fit in an int64 are outside the supported range.
func roundTo(value float64, digits int) float64 {
	shift := 1.0
	for i := 0; i < digits; i++ {
		shift *= 10
	}
	scaled := value * shift
	var rounded int64
	if scaled < 0 {
		// Negating the integer (not the float) avoids producing -0.
		rounded = -int64(-scaled + 0.5)
	} else {
		rounded = int64(scaled + 0.5)
	}
	return float64(rounded) / shift
}
// parseRfc3339Timestamp converts RFC 3339 text (parsed with the
// time.RFC3339Nano layout) into the MxValue produced by
// mxgateway.TimestampValue, for use by the timestamped write families.
// Unparseable text yields a wrapped error that quotes the offending input.
func parseRfc3339Timestamp(text string) (*mxgateway.MxValue, error) {
	instant, parseErr := time.Parse(time.RFC3339Nano, text)
	if parseErr == nil {
		return mxgateway.TimestampValue(instant), nil
	}
	return nil, fmt.Errorf("invalid RFC 3339 timestamp %q: %w", text, parseErr)
}
// runWrite implements the write subcommand: it writes a single value to one
// item handle on an existing session. --session-id is mandatory. The value
// text is parsed according to --type before dialing, so an invalid value
// fails without contacting the gateway.
func runWrite(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("write", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON = fs.Bool("json", false, "write JSON output")
		id     = fs.String("session-id", "", "gateway session id")
		server = fs.Int("server-handle", 0, "MXAccess server handle")
		item   = fs.Int("item-handle", 0, "MXAccess item handle")
		kind   = fs.String("type", "string", "value type: bool, int32, int64, float, double, string")
		raw    = fs.String("value", "", "value text")
		user   = fs.Int("user-id", 0, "MXAccess user id")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}
	if len(*id) == 0 {
		return errors.New("session-id is required")
	}

	payload, err := parseValue(*kind, *raw)
	if err != nil {
		return err
	}

	client, resolved, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	session := mxgateway.NewSessionForID(client, *id)
	reply, callErr := session.WriteRaw(ctx, int32(*server), int32(*item), payload, int32(*user))
	return writeCommandOutput(stdout, *asJSON, "write", resolved, reply, callErr)
}
// runStreamEvents implements the stream-events subcommand: it subscribes to a
// session's events after the given worker sequence and prints one line per
// event (JSON with --json, otherwise "<sequence> <family>"). It returns when
// the stream ends, when an event result carries an error, or after --limit
// events when the limit is positive.
func runStreamEvents(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("stream-events", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON    = fs.Bool("json", false, "write JSON output")
		id        = fs.String("session-id", "", "gateway session id")
		afterSeq  = fs.Uint64("after-worker-sequence", 0, "first worker sequence to read after")
		maxEvents = fs.Int("limit", 0, "maximum events to read; 0 means unbounded")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}
	if len(*id) == 0 {
		return errors.New("session-id is required")
	}

	client, _, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	// As in runGalaxyWatch, route SIGINT/SIGTERM into context cancellation so
	// Ctrl+C cancels the gRPC stream (the gateway sees codes.Canceled rather
	// than a torn TCP connection) and the deferred Close calls still run.
	signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
	defer stopSignals()
	streamCtx, cancelStream := context.WithCancel(signalCtx)
	defer cancelStream()

	session := mxgateway.NewSessionForID(client, *id)
	subscription, err := session.SubscribeEventsAfter(streamCtx, *afterSeq)
	if err != nil {
		return err
	}
	defer subscription.Close()

	received := 0
	for item := range subscription.Events() {
		if item.Err != nil {
			return item.Err
		}
		switch {
		case *asJSON:
			fmt.Fprintln(stdout, string(mustMarshalProto(item.Event)))
		default:
			fmt.Fprintf(stdout, "%d %s\n", item.Event.GetWorkerSequence(), item.Event.GetFamily())
		}
		received++
		if *maxEvents > 0 && received >= *maxEvents {
			cancelStream()
			return nil
		}
	}
	return nil
}
// runStreamAlarms implements the stream-alarms subcommand: it opens the
// gateway's alarm feed, optionally scoped by --filter-prefix, and prints one
// line per feed message (JSON with --json, otherwise the plain-text form from
// formatAlarmFeedMessage). It returns nil on end of stream or once --limit
// messages have been printed, and returns any other receive error as-is.
func runStreamAlarms(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("stream-alarms", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON      = fs.Bool("json", false, "write JSON output")
		prefix      = fs.String("filter-prefix", "", "alarm-reference prefix scoping the feed; empty means unscoped")
		maxMessages = fs.Int("limit", 0, "maximum feed messages to read; 0 means unbounded")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}

	client, _, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	// As in runStreamEvents, route SIGINT/SIGTERM into context cancellation so
	// Ctrl+C cancels the gRPC stream (the gateway sees codes.Canceled rather
	// than a torn TCP connection) and the deferred client.Close() still runs.
	signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
	defer stopSignals()
	streamCtx, cancelStream := context.WithCancel(signalCtx)
	defer cancelStream()

	request := &mxgateway.StreamAlarmsRequest{AlarmFilterPrefix: *prefix}
	stream, err := client.StreamAlarms(streamCtx, request)
	if err != nil {
		return err
	}

	for received := 0; ; {
		message, recvErr := stream.Recv()
		switch {
		case errors.Is(recvErr, io.EOF):
			return nil
		case recvErr != nil:
			return recvErr
		}

		line := ""
		if *asJSON {
			line = string(mustMarshalProto(message))
		} else {
			line = formatAlarmFeedMessage(message)
		}
		fmt.Fprintln(stdout, line)

		received++
		if *maxMessages > 0 && received >= *maxMessages {
			cancelStream()
			return nil
		}
	}
}
// formatAlarmFeedMessage turns one AlarmFeedMessage into a single plain-text
// line. The payload is checked in a fixed order — active alarm, then the
// snapshot-complete sentinel, then transition — and a message matching none of
// them renders as "unknown".
func formatAlarmFeedMessage(message *mxgateway.AlarmFeedMessage) string {
	if alarm := message.GetActiveAlarm(); alarm != nil {
		return fmt.Sprintf(
			"active-alarm %s state=%s severity=%d",
			alarm.GetAlarmFullReference(),
			alarm.GetCurrentState(),
			alarm.GetSeverity(),
		)
	}
	if message.GetSnapshotComplete() {
		return "snapshot-complete"
	}
	if transition := message.GetTransition(); transition != nil {
		return fmt.Sprintf(
			"transition %s kind=%s severity=%d",
			transition.GetAlarmFullReference(),
			transition.GetTransitionKind(),
			transition.GetSeverity(),
		)
	}
	return "unknown"
}
// runAcknowledgeAlarm implements the acknowledge-alarm subcommand: a single
// unary call on the client (no session) that acknowledges the alarm named by
// --reference, with an optional comment and operator. --reference is
// mandatory. On success it prints the reply's HRESULT (or the full reply
// envelope with --json).
func runAcknowledgeAlarm(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("acknowledge-alarm", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON   = fs.Bool("json", false, "write JSON output")
		alarmRef = fs.String("reference", "", "full alarm reference to acknowledge")
		note     = fs.String("comment", "", "operator acknowledge comment")
		who      = fs.String("operator", "", "operator user performing the acknowledge")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}
	if len(*alarmRef) == 0 {
		return errors.New("reference is required")
	}

	client, resolved, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	request := &mxgateway.AcknowledgeAlarmRequest{
		AlarmFullReference: *alarmRef,
		Comment:            *note,
		OperatorUser:       *who,
	}
	reply, err := client.AcknowledgeAlarm(ctx, request)
	if err != nil {
		return err
	}

	if !*asJSON {
		fmt.Fprintln(stdout, reply.GetHresult())
		return nil
	}
	return writeJSON(stdout, commandReplyOutput{
		Command: "acknowledge-alarm",
		Options: resolved,
		Reply:   mustMarshalProto(reply),
	})
}
// runSmoke implements the smoke subcommand: an end-to-end check that opens a
// session, registers, adds the item given with --item, advises it, and closes
// the session again. Once the session is open, every exit path goes through
// closeSmokeSession; a failing step's error takes precedence over any close
// error. Output is written only after the session has closed successfully.
func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("smoke", flag.ContinueOnError)
	fs.SetOutput(stderr)
	conn := bindCommonFlags(fs)
	var (
		asJSON  = fs.Bool("json", false, "write JSON output")
		name    = fs.String("client-name", "mxgw-go-smoke", "MXAccess client name")
		itemDef = fs.String("item", "", "item definition")
	)
	if err := fs.Parse(args); err != nil {
		return err
	}
	if len(*itemDef) == 0 {
		return errors.New("item is required")
	}

	client, resolved, err := dialForCommand(ctx, conn)
	if err != nil {
		return err
	}
	defer client.Close()

	session, err := client.OpenSession(ctx, mxgateway.OpenSessionOptions{ClientSessionName: *name})
	if err != nil {
		return err
	}
	// finish closes the session and reports cause if set, else the close error.
	finish := func(cause error) error {
		return closeSmokeSession(ctx, session, cause)
	}

	serverHandle, err := session.Register(ctx, *name)
	if err != nil {
		return finish(err)
	}
	itemHandle, err := session.AddItem(ctx, serverHandle, *itemDef)
	if err != nil {
		return finish(err)
	}
	if adviseErr := session.Advise(ctx, serverHandle, itemHandle); adviseErr != nil {
		return finish(adviseErr)
	}
	if closeErr := finish(nil); closeErr != nil {
		return closeErr
	}

	if !*asJSON {
		fmt.Fprintf(stdout, "session=%s server=%d item=%d\n", session.ID(), serverHandle, itemHandle)
		return nil
	}
	return writeJSON(stdout, map[string]any{
		"command":      "smoke",
		"options":      resolved,
		"sessionId":    session.ID(),
		"serverHandle": serverHandle,
		"itemHandle":   itemHandle,
	})
}
// closeSmokeSession closes session using a context derived from
// context.Background() rather than ctx, so cancellation of ctx does not
// propagate to the close call. The close is bounded by five seconds, or by the
// time left until ctx's deadline when that is positive and shorter.
//
// primaryErr, when non-nil, is returned as-is and any close error is dropped;
// otherwise the close error (possibly nil) is returned.
func closeSmokeSession(ctx context.Context, session *mxgateway.Session, primaryErr error) error {
	const maxCloseWait = 5 * time.Second

	wait := maxCloseWait
	if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
		if remaining := time.Until(deadline); remaining > 0 && remaining < maxCloseWait {
			wait = remaining
		}
	}

	closeCtx, cancel := context.WithTimeout(context.Background(), wait)
	defer cancel()

	_, closeErr := session.Close(closeCtx)
	if primaryErr == nil {
		return closeErr
	}
	return primaryErr
}
// parseStringList splits a comma-separated value into its entries, trimming
// surrounding whitespace from each entry and dropping entries that are empty
// after trimming. The returned slice is never nil, even when it has no entries.
func parseStringList(value string) []string {
	fields := strings.Split(value, ",")
	out := make([]string, 0, len(fields))
	for i := range fields {
		trimmed := strings.TrimSpace(fields[i])
		if trimmed == "" {
			continue
		}
		out = append(out, trimmed)
	}
	return out
}
// parseInt32List parses a comma-separated list of base-10 integers that fit in
// 32 bits. Whitespace around each entry is trimmed and empty entries are
// skipped. The first entry that fails to parse aborts the call with a nil
// slice and an "invalid item handle" error wrapping the strconv error. On
// success the returned slice is never nil.
func parseInt32List(value string) ([]int32, error) {
	fields := strings.Split(value, ",")
	handles := make([]int32, 0, len(fields))
	for _, field := range fields {
		text := strings.TrimSpace(field)
		if text != "" {
			n, err := strconv.ParseInt(text, 10, 32)
			if err != nil {
				return nil, fmt.Errorf("invalid item handle %q: %w", text, err)
			}
			handles = append(handles, int32(n))
		}
	}
	return handles, nil
}
// bindCommonFlags registers the connection flags shared by the subcommands on
// flags and returns the struct their values are stored in. The struct holds
// the flag defaults until flags.Parse has run.
func bindCommonFlags(flags *flag.FlagSet) *commonOptions {
	var opts commonOptions

	// Target and per-call timeout.
	flags.StringVar(&opts.Endpoint, "endpoint", "localhost:5000", "gateway endpoint")
	flags.StringVar(&opts.CallTimeout, "call-timeout", "30s", "per-call timeout")

	// Authentication.
	flags.StringVar(&opts.APIKey, "api-key", "", "gateway API key")
	flags.StringVar(&opts.APIKeyEnv, "api-key-env", "MXGATEWAY_API_KEY", "environment variable containing the API key")

	// Transport security.
	flags.BoolVar(&opts.Plaintext, "plaintext", false, "use plaintext transport")
	flags.StringVar(&opts.CACertFile, "ca-cert", "", "CA certificate file")
	flags.StringVar(&opts.ServerName, "server-name-override", "", "TLS server name override")

	return &opts
}
// dialForCommand resolves the common options and dials the gateway with them.
// It returns the client together with the resolved options so callers can echo
// them in their output. When resolution fails the client is nil; otherwise the
// client and error are exactly what mxgateway.Dial returned.
func dialForCommand(ctx context.Context, common *commonOptions) (*mxgateway.Client, commonOptions, error) {
	options, resolveErr := common.resolved()
	if resolveErr != nil {
		return nil, options, resolveErr
	}

	dialOptions := mxgateway.Options{
		Endpoint:           options.Endpoint,
		APIKey:             options.apiKeyValue, // the real key, not the redacted display value
		Plaintext:          options.Plaintext,
		CACertFile:         options.CACertFile,
		ServerNameOverride: options.ServerName,
		CallTimeout:        options.timeout,
	}
	client, dialErr := mxgateway.Dial(ctx, dialOptions)
	return client, options, dialErr
}
// resolved returns a copy of o prepared for dialing and for echoing in output;
// the receiver itself is not modified.
//
// The effective API key is the APIKey value, or the contents of the
// environment variable named by APIKeyEnv when APIKey is empty and APIKeyEnv
// is set. It is stored in the unexported apiKeyValue field, and the exported
// APIKey field is overwritten with the result of mxgateway.RedactAPIKey so the
// raw key is not the value that gets serialized.
//
// A non-empty CallTimeout is parsed with time.ParseDuration into timeout. A
// parse failure is returned as "invalid -call-timeout: ..." wrapping the
// underlying error, so the user can tell which flag was malformed.
func (o *commonOptions) resolved() (commonOptions, error) {
	resolved := *o
	if resolved.APIKey == "" && resolved.APIKeyEnv != "" {
		resolved.apiKeyValue = os.Getenv(resolved.APIKeyEnv)
	} else {
		resolved.apiKeyValue = resolved.APIKey
	}
	resolved.APIKey = mxgateway.RedactAPIKey(resolved.apiKeyValue)
	if resolved.CallTimeout != "" {
		timeout, err := time.ParseDuration(resolved.CallTimeout)
		if err != nil {
			// Name the flag: ParseDuration's own message only quotes the value.
			return resolved, fmt.Errorf("invalid -call-timeout: %w", err)
		}
		resolved.timeout = timeout
	}
	return resolved, nil
}
// parseValue converts valueText into an MxValue of the kind named by
// valueType. Supported kinds are "bool", "int32", "int64", "float", "double",
// and "string"; any other kind yields an "unsupported value type" error.
// Text that does not parse as the requested kind yields an error naming the
// -type and -value given and wrapping the strconv error.
func parseValue(valueType, valueText string) (*mxgateway.MxValue, error) {
	// invalid decorates a strconv failure with the type and text being parsed.
	invalid := func(cause error) error {
		return fmt.Errorf("invalid -value for -type %s: %q: %w", valueType, valueText, cause)
	}

	switch valueType {
	case "string":
		return mxgateway.StringValue(valueText), nil
	case "bool":
		parsed, err := strconv.ParseBool(valueText)
		if err != nil {
			return nil, invalid(err)
		}
		return mxgateway.BoolValue(parsed), nil
	case "int32":
		parsed, err := strconv.ParseInt(valueText, 10, 32)
		if err != nil {
			return nil, invalid(err)
		}
		return mxgateway.Int32Value(int32(parsed)), nil
	case "int64":
		parsed, err := strconv.ParseInt(valueText, 10, 64)
		if err != nil {
			return nil, invalid(err)
		}
		return mxgateway.Int64Value(parsed), nil
	case "float":
		parsed, err := strconv.ParseFloat(valueText, 32)
		if err != nil {
			return nil, invalid(err)
		}
		return mxgateway.FloatValue(float32(parsed)), nil
	case "double":
		parsed, err := strconv.ParseFloat(valueText, 64)
		if err != nil {
			return nil, invalid(err)
		}
		return mxgateway.DoubleValue(parsed), nil
	}
	return nil, fmt.Errorf("unsupported value type %q", valueType)
}
// writeCommandOutput reports the outcome of a single command invocation.
// A non-nil err is returned unchanged and nothing is written. Otherwise the
// reply is written to stdout: as a JSON envelope (command, options, reply)
// when jsonOutput is set, or as just the reply kind on one line.
func writeCommandOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, reply *mxgateway.MxCommandReply, err error) error {
	switch {
	case err != nil:
		return err
	case jsonOutput:
		envelope := commandReplyOutput{
			Command: command,
			Options: options,
			Reply:   mustMarshalProto(reply),
		}
		return writeJSON(stdout, envelope)
	default:
		fmt.Fprintln(stdout, reply.GetKind())
		return nil
	}
}
// writeBulkOutput reports the outcome of a bulk subscribe-style command.
// A non-nil err is returned unchanged and nothing is written. Otherwise it
// writes either a JSON object (command, options, results) when jsonOutput is
// set, or the number of results on one line.
//
// NOTE(review): results are handed to encoding/json directly rather than going
// through mustMarshalProto; confirm that is the intended JSON shape.
func writeBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.SubscribeResult, err error) error {
	switch {
	case err != nil:
		return err
	case jsonOutput:
		payload := map[string]any{
			"command": command,
			"options": options,
			"results": results,
		}
		return writeJSON(stdout, payload)
	default:
		fmt.Fprintln(stdout, len(results))
		return nil
	}
}
// writeWriteBulkOutput reports the outcome of a bulk write command.
// A non-nil err is returned unchanged and nothing is written. Otherwise it
// writes either a JSON object (command, options, results) when jsonOutput is
// set, or the number of results on one line.
//
// NOTE(review): results are handed to encoding/json directly rather than going
// through mustMarshalProto; confirm that is the intended JSON shape.
func writeWriteBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkWriteResult, err error) error {
	switch {
	case err != nil:
		return err
	case jsonOutput:
		payload := map[string]any{
			"command": command,
			"options": options,
			"results": results,
		}
		return writeJSON(stdout, payload)
	default:
		fmt.Fprintln(stdout, len(results))
		return nil
	}
}
// writeReadBulkOutput reports the outcome of a bulk read command.
// A non-nil err is returned unchanged and nothing is written. Otherwise it
// writes either a JSON object (command, options, results) when jsonOutput is
// set, or the number of results on one line.
//
// NOTE(review): results are handed to encoding/json directly rather than going
// through mustMarshalProto; confirm that is the intended JSON shape.
func writeReadBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkReadResult, err error) error {
	switch {
	case err != nil:
		return err
	case jsonOutput:
		payload := map[string]any{
			"command": command,
			"options": options,
			"results": results,
		}
		return writeJSON(stdout, payload)
	default:
		fmt.Fprintln(stdout, len(results))
		return nil
	}
}
// writeJSON encodes value as indented JSON to writer, followed by a newline,
// and returns any encoding or write error.
func writeJSON(writer io.Writer, value any) error {
	enc := json.NewEncoder(writer)
	enc.SetIndent("", " ")
	if err := enc.Encode(value); err != nil {
		return err
	}
	return nil
}
// mustMarshalProto renders message as JSON using protojson with UseProtoNames
// disabled and returns the raw bytes for embedding in a larger JSON document.
// It panics if marshaling fails, so callers have no error path to handle.
func mustMarshalProto(message protojsonMessage) json.RawMessage {
	marshaler := protojson.MarshalOptions{UseProtoNames: false}
	encoded, err := marshaler.Marshal(message)
	if err == nil {
		return encoded
	}
	panic(err)
}
// protojsonMessage is the interface mustMarshalProto requires of its argument:
// any value that exposes a protobuf reflection view through ProtoReflect.
type protojsonMessage interface {
	ProtoReflect() protoreflect.Message
}
// batchEOR is the end-of-result sentinel line that runBatch writes to stdout
// after every dispatched command, whether the command succeeded or failed, so
// a reader of the stream can tell where one result ends.
const batchEOR = "__MXGW_BATCH_EOR__"
// runBatch reads one command line at a time from in, dispatches each via the
// normal runWithIO routing, and writes a batchEOR sentinel to stdout after
// every result. Errors are serialised as JSON to stdout (not stderr) so the
// harness can parse them without interleaving stderr.
//
// Each line is split on whitespace with strings.Fields, so an argument cannot
// contain spaces. A whitespace-only line is skipped without emitting a
// sentinel. The loop never terminates on command error; it ends on stdin EOF,
// on an empty line, or when stdout can no longer be written, since no further
// result could reach the harness after that.
//
// It returns the scanner's error (nil on clean EOF), or the wrapped stdout
// write error that ended the session.
func runBatch(ctx context.Context, in io.Reader, stdout, stderr io.Writer) error {
	bw := bufio.NewWriter(stdout)
	scanner := bufio.NewScanner(in)
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			break
		}
		args := strings.Fields(line)
		if len(args) == 0 {
			continue
		}
		if err := runWithIO(ctx, args, bw, stderr); err != nil {
			// Write error as JSON to stdout (bw) so the harness sees it in the
			// same stream as normal output, framed by the EOR sentinel.
			errPayload := map[string]string{
				"error": err.Error(),
				"type":  "error",
			}
			// A write failure here is sticky on bw and surfaces from Flush below.
			_ = writeJSON(bw, errPayload)
		}
		// Same as above: any write failure is reported by the Flush that follows.
		_, _ = fmt.Fprintln(bw, batchEOR)
		if err := bw.Flush(); err != nil {
			// bufio.Writer keeps returning the first write error, so every later
			// result would be lost; stop instead of running commands blind.
			return fmt.Errorf("writing batch output: %w", err)
		}
	}
	return scanner.Err()
}
// writeUsage prints the one-line usage summary, listing the subcommand names
// separated by "|", to writer.
func writeUsage(writer io.Writer) {
	subcommands := []string{
		"version",
		"open-session",
		"close-session",
		"register",
		"add-item",
		"advise",
		"subscribe-bulk",
		"unsubscribe-bulk",
		"read-bulk",
		"write-bulk",
		"write2-bulk",
		"write-secured-bulk",
		"write-secured2-bulk",
		"bench-read-bulk",
		"write",
		"stream-events",
		"stream-alarms",
		"acknowledge-alarm",
		"smoke",
		"galaxy-test-connection",
		"galaxy-last-deploy",
		"galaxy-discover",
		"galaxy-watch",
		"batch",
	}
	fmt.Fprintln(writer, "usage: mxgw-go <"+strings.Join(subcommands, "|")+">")
}
// dialGalaxyForCommand resolves the common options and dials the Galaxy
// service with them. It returns the client together with the resolved options
// so callers can echo them in their output. When resolution fails the client
// is nil; otherwise the client and error are exactly what mxgateway.DialGalaxy
// returned.
func dialGalaxyForCommand(ctx context.Context, common *commonOptions) (*mxgateway.GalaxyClient, commonOptions, error) {
	options, resolveErr := common.resolved()
	if resolveErr != nil {
		return nil, options, resolveErr
	}

	dialOptions := mxgateway.Options{
		Endpoint:           options.Endpoint,
		APIKey:             options.apiKeyValue, // the real key, not the redacted display value
		Plaintext:          options.Plaintext,
		CACertFile:         options.CACertFile,
		ServerNameOverride: options.ServerName,
		CallTimeout:        options.timeout,
	}
	client, dialErr := mxgateway.DialGalaxy(ctx, dialOptions)
	return client, options, dialErr
}
// runGalaxyTestConnection implements the "galaxy-test-connection" subcommand.
// It dials the Galaxy service, calls TestConnection, and writes the boolean
// result to stdout: on its own line by default, or inside a JSON object
// (command, options, ok) when -json is set. Flag, dial, and call errors are
// returned unchanged.
func runGalaxyTestConnection(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("galaxy-test-connection", flag.ContinueOnError)
	fs.SetOutput(stderr)
	common := bindCommonFlags(fs)
	asJSON := fs.Bool("json", false, "write JSON output")
	if err := fs.Parse(args); err != nil {
		return err
	}

	client, options, err := dialGalaxyForCommand(ctx, common)
	if err != nil {
		return err
	}
	defer client.Close()

	ok, err := client.TestConnection(ctx)
	if err != nil {
		return err
	}

	if !*asJSON {
		fmt.Fprintln(stdout, ok)
		return nil
	}
	return writeJSON(stdout, map[string]any{
		"command": "galaxy-test-connection",
		"options": options,
		"ok":      ok,
	})
}
// runGalaxyLastDeploy implements the "galaxy-last-deploy" subcommand. It dials
// the Galaxy service and calls GetLastDeployTime.
//
// Text output is the deploy time in UTC formatted as RFC 3339 with nanoseconds,
// or the word "absent" when no deploy time is present. JSON output (-json) is
// an object with command, options, and present, plus timeOfLastDeploy only
// when a deploy time is present.
func runGalaxyLastDeploy(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("galaxy-last-deploy", flag.ContinueOnError)
	fs.SetOutput(stderr)
	common := bindCommonFlags(fs)
	asJSON := fs.Bool("json", false, "write JSON output")
	if err := fs.Parse(args); err != nil {
		return err
	}

	client, options, err := dialGalaxyForCommand(ctx, common)
	if err != nil {
		return err
	}
	defer client.Close()

	deployTime, present, err := client.GetLastDeployTime(ctx)
	if err != nil {
		return err
	}

	if !*asJSON {
		line := "absent"
		if present {
			line = deployTime.UTC().Format(time.RFC3339Nano)
		}
		fmt.Fprintln(stdout, line)
		return nil
	}

	payload := map[string]any{
		"command": "galaxy-last-deploy",
		"options": options,
		"present": present,
	}
	if present {
		payload["timeOfLastDeploy"] = deployTime.UTC().Format(time.RFC3339Nano)
	}
	return writeJSON(stdout, payload)
}
// runGalaxyDiscover implements the "galaxy-discover" subcommand. It dials the
// Galaxy service, calls DiscoverHierarchy, and writes the returned objects to
// stdout.
//
// Text output is one tab-separated line per object: gobject ID, tag name,
// contained name, and the attribute count. JSON output (-json) is a single
// object with command, options, and an objects array holding each object's
// protobuf JSON.
func runGalaxyDiscover(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	fs := flag.NewFlagSet("galaxy-discover", flag.ContinueOnError)
	fs.SetOutput(stderr)
	common := bindCommonFlags(fs)
	asJSON := fs.Bool("json", false, "write JSON output")
	if err := fs.Parse(args); err != nil {
		return err
	}

	client, options, err := dialGalaxyForCommand(ctx, common)
	if err != nil {
		return err
	}
	defer client.Close()

	objects, err := client.DiscoverHierarchy(ctx)
	if err != nil {
		return err
	}

	if !*asJSON {
		for _, obj := range objects {
			fmt.Fprintf(stdout, "%d\t%s\t%s\t(attrs=%d)\n", obj.GetGobjectId(), obj.GetTagName(), obj.GetContainedName(), len(obj.GetAttributes()))
		}
		return nil
	}

	// Pre-encode each object so the envelope embeds protobuf JSON verbatim.
	encoded := make([]json.RawMessage, len(objects))
	for i, obj := range objects {
		encoded[i] = mustMarshalProto(obj)
	}
	return writeJSON(stdout, map[string]any{
		"command": "galaxy-discover",
		"options": options,
		"objects": encoded,
	})
}
// runGalaxyWatch implements the "galaxy-watch" subcommand. It opens a deploy
// event stream with client.WatchDeployEvents and prints one line per event to
// stdout: the event's protobuf JSON when -json is set, otherwise the
// formatDeployEvent summary.
//
// Flags beyond the common connection flags:
//   - -last-seen-deploy-time: parsed with time.RFC3339Nano and passed to
//     WatchDeployEvents; nil is passed when the flag is empty.
//   - -limit: stop after this many events; 0 (the default) never stops on count.
//
// It returns nil when the limit is reached, when the stream's channels close
// without a pending error, or when an interrupt/SIGTERM arrives; otherwise it
// returns the flag-parse, dial, or stream error.
func runGalaxyWatch(ctx context.Context, args []string, stdout, stderr io.Writer) error {
	flags := flag.NewFlagSet("galaxy-watch", flag.ContinueOnError)
	flags.SetOutput(stderr)
	common := bindCommonFlags(flags)
	jsonOutput := flags.Bool("json", false, "write JSON output")
	lastSeen := flags.String("last-seen-deploy-time", "", "RFC 3339 timestamp (with optional fractional seconds); when set, suppresses the bootstrap event")
	limit := flags.Int("limit", 0, "maximum events to read; 0 means unbounded (Ctrl+C to stop)")
	if err := flags.Parse(args); err != nil {
		return err
	}
	// lastSeenPtr stays nil unless the flag was supplied.
	var lastSeenPtr *time.Time
	if *lastSeen != "" {
		// Use RFC3339Nano so values copy-pasted from galaxy-watch -json output
		// (which formatDeployEvent emits with fractional seconds) round-trip;
		// RFC3339Nano also accepts whole-second values, so the layout switch is
		// strictly broader than the previous time.RFC3339 parse.
		parsed, err := time.Parse(time.RFC3339Nano, *lastSeen)
		if err != nil {
			return fmt.Errorf("invalid -last-seen-deploy-time: %w", err)
		}
		lastSeenPtr = &parsed
	}
	// The resolved options are not echoed by this command, hence the blank.
	client, _, err := dialGalaxyForCommand(ctx, common)
	if err != nil {
		return err
	}
	defer client.Close()
	// signalCtx is cancelled by Ctrl+C / SIGTERM; streamCtx is a child of it so
	// this function can also end the stream itself once -limit is reached.
	signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
	defer stopSignals()
	streamCtx, cancelStream := context.WithCancel(signalCtx)
	defer cancelStream()
	events, errs, err := client.WatchDeployEvents(streamCtx, lastSeenPtr)
	if err != nil {
		return err
	}
	// count is the number of events printed so far, compared against -limit.
	count := 0
	for {
		select {
		case event, ok := <-events:
			if !ok {
				// Drain any terminal error before returning.
				if streamErr, errOk := <-errs; errOk && streamErr != nil {
					return streamErr
				}
				return nil
			}
			if *jsonOutput {
				fmt.Fprintln(stdout, string(mustMarshalProto(event)))
			} else {
				fmt.Fprintln(stdout, formatDeployEvent(event))
			}
			count++
			if *limit > 0 && count >= *limit {
				cancelStream()
				// Allow goroutine to drain.
				// NOTE(review): this loop only ends once events is closed, so it
				// relies on the producer closing the channel after cancellation.
				for range events {
				}
				return nil
			}
		case streamErr, ok := <-errs:
			// A closed errs channel is treated as a clean end of stream.
			if !ok {
				return nil
			}
			if streamErr != nil {
				return streamErr
			}
		case <-signalCtx.Done():
			// Interrupted (or the parent ctx ended): stop the stream and exit
			// without reporting an error.
			cancelStream()
			// Allow goroutine to drain.
			for range events {
			}
			return nil
		}
	}
}
// formatDeployEvent renders a deploy event as the single-line text summary
// used by galaxy-watch when -json is not set. Timestamps are printed in UTC as
// RFC 3339 with nanoseconds. The observed field is empty when the event has no
// observed-at timestamp, and the deploy field reads "absent" unless the event
// marks the last-deploy time as present and carries a non-nil timestamp.
func formatDeployEvent(event *mxgateway.DeployEvent) string {
	const layout = time.RFC3339Nano

	var observedText string
	if observedAt := event.GetObservedAt(); observedAt != nil {
		observedText = observedAt.AsTime().UTC().Format(layout)
	}

	deployText := "absent"
	if event.GetTimeOfLastDeployPresent() {
		if lastDeploy := event.GetTimeOfLastDeploy(); lastDeploy != nil {
			deployText = lastDeploy.AsTime().UTC().Format(layout)
		}
	}

	sequence := event.GetSequence()
	objectCount := event.GetObjectCount()
	attributeCount := event.GetAttributeCount()
	return fmt.Sprintf(
		"seq=%d observed=%s deploy=%s objects=%d attributes=%d",
		sequence, observedText, deployText, objectCount, attributeCount,
	)
}