// Command mxgw-go is the reference Go CLI for the MXAccess Gateway. // // It exposes versioning, session lifecycle, command invocation, event // streaming, a smoke-test workflow, and Galaxy Repository browse subcommands // that exercise the same gRPC contract used by the mxgateway library. package main import ( "bufio" "context" "encoding/json" "errors" "flag" "fmt" "io" "os" "os/signal" "sort" "strconv" "strings" "syscall" "time" "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/mxgateway" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/reflect/protoreflect" ) type versionOutput struct { ClientVersion string `json:"clientVersion"` GatewayProtocolVersion uint32 `json:"gatewayProtocolVersion"` WorkerProtocolVersion uint32 `json:"workerProtocolVersion"` } type commonOptions struct { Endpoint string `json:"endpoint"` APIKey string `json:"apiKey"` APIKeyEnv string `json:"apiKeyEnv,omitempty"` Plaintext bool `json:"plaintext"` CACertFile string `json:"caCertFile,omitempty"` ServerName string `json:"serverNameOverride,omitempty"` CallTimeout string `json:"callTimeout,omitempty"` apiKeyValue string timeout time.Duration } type openSessionOutput struct { Command string `json:"command"` Options commonOptions `json:"options"` Reply json.RawMessage `json:"reply"` } type commandReplyOutput struct { Command string `json:"command"` Options commonOptions `json:"options"` Reply json.RawMessage `json:"reply"` } func main() { if err := runWithIO(context.Background(), os.Args[1:], os.Stdout, os.Stderr); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(2) } } func run(args []string) error { return runWithIO(context.Background(), args, os.Stdout, os.Stderr) } func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) error { if len(args) == 0 { writeUsage(stderr) return errors.New("missing command") } switch args[0] { case "version": return runVersion(args[1:], stdout, stderr) case "open-session": return runOpenSession(ctx, args[1:], stdout, stderr) case "close-session": return runCloseSession(ctx, args[1:], stdout, stderr) case "register": return runRegister(ctx, args[1:], stdout, stderr) case "add-item": return runAddItem(ctx, args[1:], stdout, stderr) case "advise": return runAdvise(ctx, args[1:], stdout, stderr) case "subscribe-bulk": return runSubscribeBulk(ctx, args[1:], stdout, stderr) case "unsubscribe-bulk": return runUnsubscribeBulk(ctx, args[1:], stdout, stderr) case "read-bulk": return runReadBulk(ctx, args[1:], stdout, stderr) case "write-bulk": return runWriteBulk(ctx, args[1:], stdout, stderr) case "write2-bulk": return runWrite2Bulk(ctx, args[1:], stdout, stderr) case "write-secured-bulk": return runWriteSecuredBulk(ctx, args[1:], stdout, stderr) case "write-secured2-bulk": return runWriteSecured2Bulk(ctx, args[1:], stdout, stderr) case "bench-read-bulk": return runBenchReadBulk(ctx, args[1:], stdout, stderr) case "write": return runWrite(ctx, args[1:], stdout, stderr) case "stream-events": return runStreamEvents(ctx, args[1:], stdout, stderr) case "stream-alarms": return runStreamAlarms(ctx, args[1:], stdout, stderr) case "acknowledge-alarm": return runAcknowledgeAlarm(ctx, args[1:], stdout, stderr) case "smoke": return runSmoke(ctx, args[1:], stdout, stderr) case "galaxy-test-connection": return runGalaxyTestConnection(ctx, args[1:], stdout, stderr) case "galaxy-last-deploy": return runGalaxyLastDeploy(ctx, args[1:], stdout, stderr) case "galaxy-discover": return runGalaxyDiscover(ctx, args[1:], stdout, stderr) case "galaxy-watch": return runGalaxyWatch(ctx, args[1:], stdout, stderr) case "batch": return runBatch(ctx, os.Stdin, stdout, stderr) default: writeUsage(stderr) return fmt.Errorf("unknown command %q", args[0]) } } func runVersion(args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("version", flag.ContinueOnError) flags.SetOutput(stderr) jsonOutput := flags.Bool("json", false, "write JSON output") if err := flags.Parse(args); err != nil { return err } output := versionOutput{ ClientVersion: mxgateway.ClientVersion, GatewayProtocolVersion: mxgateway.GatewayProtocolVersion, WorkerProtocolVersion: mxgateway.WorkerProtocolVersion, } if *jsonOutput { return writeJSON(stdout, output) } fmt.Fprintf(stdout, "mxgw-go %s\n", output.ClientVersion) fmt.Fprintf(stdout, "gateway protocol %d\n", output.GatewayProtocolVersion) fmt.Fprintf(stdout, "worker protocol %d\n", output.WorkerProtocolVersion) return nil } func runOpenSession(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("open-session", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") clientName := flags.String("client-session-name", "", "client session name") backend := flags.String("backend", "", "requested backend") if err := flags.Parse(args); err != nil { return err } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() reply, err := client.OpenSessionRaw(ctx, (&mxgateway.OpenSessionOptions{ RequestedBackend: *backend, ClientSessionName: *clientName, }).Request()) if err != nil { return err } if *jsonOutput { return writeJSON(stdout, openSessionOutput{ Command: "open-session", Options: options, Reply: mustMarshalProto(reply), }) } fmt.Fprintln(stdout, reply.GetSessionId()) return nil } func runCloseSession(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("close-session", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") sessionID := flags.String("session-id", "", "gateway session id") if err := flags.Parse(args); err != nil { return err } if *sessionID == "" { return errors.New("session-id is required") } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() reply, err := client.CloseSessionRaw(ctx, &mxgateway.CloseSessionRequest{SessionId: *sessionID}) if err != nil { return err } if *jsonOutput { return writeJSON(stdout, commandReplyOutput{ Command: "close-session", Options: options, Reply: mustMarshalProto(reply), }) } fmt.Fprintln(stdout, reply.GetFinalState()) return nil } func runRegister(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("register", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") sessionID := flags.String("session-id", "", "gateway session id") clientName := flags.String("client-name", "", "MXAccess client name") if err := flags.Parse(args); err != nil { return err } if *sessionID == "" || *clientName == "" { return errors.New("session-id and client-name are required") } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() session := mxgateway.NewSessionForID(client, *sessionID) reply, err := session.RegisterRaw(ctx, *clientName) return writeCommandOutput(stdout, *jsonOutput, "register", options, reply, err) } func runAddItem(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("add-item", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") sessionID := flags.String("session-id", "", "gateway session id") serverHandle := flags.Int("server-handle", 0, "MXAccess server handle") item := flags.String("item", "", "item definition") if err := flags.Parse(args); err != nil { return err } if *sessionID == "" || *item == "" { return errors.New("session-id and item are required") } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() session := mxgateway.NewSessionForID(client, *sessionID) reply, err := session.AddItemRaw(ctx, int32(*serverHandle), *item) return writeCommandOutput(stdout, *jsonOutput, "add-item", options, reply, err) } func runAdvise(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("advise", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") sessionID := flags.String("session-id", "", "gateway session id") serverHandle := flags.Int("server-handle", 0, "MXAccess server handle") itemHandle := flags.Int("item-handle", 0, "MXAccess item handle") if err := flags.Parse(args); err != nil { return err } if *sessionID == "" { return errors.New("session-id is required") } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() session := mxgateway.NewSessionForID(client, *sessionID) reply, err := session.AdviseRaw(ctx, int32(*serverHandle), int32(*itemHandle)) return writeCommandOutput(stdout, *jsonOutput, "advise", options, reply, err) } func runSubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("subscribe-bulk", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") sessionID := flags.String("session-id", "", "gateway session id") serverHandle := flags.Int("server-handle", 0, "MXAccess server handle") items := flags.String("items", "", "comma-separated item definitions") if err := flags.Parse(args); err != nil { return err } if *sessionID == "" || *items == "" { return errors.New("session-id and items are required") } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() session := mxgateway.NewSessionForID(client, *sessionID) results, err := session.SubscribeBulk(ctx, int32(*serverHandle), parseStringList(*items)) return writeBulkOutput(stdout, *jsonOutput, "subscribe-bulk", options, results, err) } func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("unsubscribe-bulk", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") sessionID := flags.String("session-id", "", "gateway session id") serverHandle := flags.Int("server-handle", 0, "MXAccess server handle") itemHandles := flags.String("item-handles", "", "comma-separated item handles") if err := flags.Parse(args); err != nil { return err } if *sessionID == "" || *itemHandles == "" { return errors.New("session-id and item-handles are required") } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() handles, err := parseInt32List(*itemHandles) if err != nil { return err } session := mxgateway.NewSessionForID(client, *sessionID) results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), handles) return writeBulkOutput(stdout, *jsonOutput, "unsubscribe-bulk", options, results, err) } func runReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("read-bulk", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") sessionID := flags.String("session-id", "", "gateway session id") serverHandle := flags.Int("server-handle", 0, "MXAccess server handle") items := flags.String("items", "", "comma-separated tag addresses") timeoutMs := flags.Int("timeout-ms", 0, "per-tag snapshot timeout in milliseconds (0 = worker default)") if err := flags.Parse(args); err != nil { return err } if *sessionID == "" || *items == "" { return errors.New("session-id and items are required") } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() session := mxgateway.NewSessionForID(client, *sessionID) results, err := session.ReadBulk(ctx, int32(*serverHandle), parseStringList(*items), time.Duration(*timeoutMs)*time.Millisecond) return writeReadBulkOutput(stdout, *jsonOutput, "read-bulk", options, results, err) } func runWriteBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { return runWriteBulkVariant(ctx, args, stdout, stderr, "write-bulk", false) } func runWrite2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { return runWriteBulkVariant(ctx, args, stdout, stderr, "write2-bulk", true) } func runWriteSecuredBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured-bulk", false) } func runWriteSecured2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured2-bulk", true) } // runWriteBulkVariant shares the flag-parsing + entry-build skeleton across // the four bulk-write families. The variant is derived from command alone; // withTimestamp adds a --timestamp-value flag. To keep wrong-variant flags // from silently no-op'ing, secured-only flags (-current-user-id / // -verifier-user-id) are only registered for the secured variants, and // -user-id only for the non-secured Write/Write2 variants — a wrong-variant // flag then surfaces as a clean "flag provided but not defined" error. func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool) error { secured := command == "write-secured-bulk" || command == "write-secured2-bulk" flags := flag.NewFlagSet(command, flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") sessionID := flags.String("session-id", "", "gateway session id") serverHandle := flags.Int("server-handle", 0, "MXAccess server handle") itemHandles := flags.String("item-handles", "", "comma-separated item handles") valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string") values := flags.String("values", "", "comma-separated values (one per item handle)") var ( userID *int currentUserID *int verifierUserID *int ) if secured { currentUserID = flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)") verifierUserID = flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)") } else { userID = flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)") } timestampValue := flags.String("timestamp-value", "", "RFC 3339 timestamp shared across all entries (Write2/WriteSecured2 variants)") if err := flags.Parse(args); err != nil { return err } if *sessionID == "" || *itemHandles == "" || *values == "" { return errors.New("session-id, item-handles, and values are required") } handles, err := parseInt32List(*itemHandles) if err != nil { return err } valueTexts := parseStringList(*values) if len(handles) != len(valueTexts) { return fmt.Errorf("item-handles count (%d) does not match values count (%d)", len(handles), len(valueTexts)) } parsedValues := make([]*mxgateway.MxValue, len(handles)) for i, text := range valueTexts { v, err := parseValue(*valueType, text) if err != nil { return fmt.Errorf("entry %d: %w", i, err) } parsedValues[i] = v } var tsValue *mxgateway.MxValue if withTimestamp { if *timestampValue == "" { return errors.New("timestamp-value is required for write2/write-secured2 bulk variants") } parsed, err := parseRfc3339Timestamp(*timestampValue) if err != nil { return err } tsValue = parsed } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() session := mxgateway.NewSessionForID(client, *sessionID) var results []*mxgateway.BulkWriteResult switch command { case "write-bulk": entries := make([]*mxgateway.WriteBulkEntry, len(handles)) for i := range handles { entries[i] = &mxgateway.WriteBulkEntry{ItemHandle: handles[i], Value: parsedValues[i], UserId: int32(*userID)} } results, err = session.WriteBulk(ctx, int32(*serverHandle), entries) case "write2-bulk": entries := make([]*mxgateway.Write2BulkEntry, len(handles)) for i := range handles { entries[i] = &mxgateway.Write2BulkEntry{ItemHandle: handles[i], Value: parsedValues[i], TimestampValue: tsValue, UserId: int32(*userID)} } results, err = session.Write2Bulk(ctx, int32(*serverHandle), entries) case "write-secured-bulk": entries := make([]*mxgateway.WriteSecuredBulkEntry, len(handles)) for i := range handles { entries[i] = &mxgateway.WriteSecuredBulkEntry{ ItemHandle: handles[i], Value: parsedValues[i], CurrentUserId: int32(*currentUserID), VerifierUserId: int32(*verifierUserID), } } results, err = session.WriteSecuredBulk(ctx, int32(*serverHandle), entries) case "write-secured2-bulk": entries := make([]*mxgateway.WriteSecured2BulkEntry, len(handles)) for i := range handles { entries[i] = &mxgateway.WriteSecured2BulkEntry{ ItemHandle: handles[i], Value: parsedValues[i], TimestampValue: tsValue, CurrentUserId: int32(*currentUserID), VerifierUserId: int32(*verifierUserID), } } results, err = session.WriteSecured2Bulk(ctx, int32(*serverHandle), entries) default: return fmt.Errorf("unsupported bulk write command %q", command) } return writeWriteBulkOutput(stdout, *jsonOutput, command, options, results, err) } // parseRfc3339Timestamp parses an RFC 3339 timestamp and returns the // MxValue protobuf representation used for the timestamped write families. func parseRfc3339Timestamp(text string) (*mxgateway.MxValue, error) { t, err := time.Parse(time.RFC3339Nano, text) if err != nil { return nil, fmt.Errorf("invalid RFC 3339 timestamp %q: %w", text, err) } return mxgateway.TimestampValue(t), nil } // runBenchReadBulk drives the cross-language ReadBulk stress benchmark from Go: // opens its own session, subscribes to bulk-size tags so the worker value cache // populates from real OnDataChange events, runs ReadBulk in a tight loop for // duration-seconds with per-call timing, and emits the shared JSON schema the // scripts/bench-read-bulk.ps1 driver collates across all five clients. func runBenchReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("bench-read-bulk", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") clientName := flags.String("client-name", "mxgw-go-bench", "session client name") durationSeconds := flags.Int("duration-seconds", 30, "steady-state measurement window in seconds") warmupSeconds := flags.Int("warmup-seconds", 3, "warm-up window before measurement, in seconds") bulkSize := flags.Int("bulk-size", 6, "tags per ReadBulk call") tagStart := flags.Int("tag-start", 1, "first machine number") tagPrefix := flags.String("tag-prefix", "TestMachine_", "tag prefix (machine number appended as %03d)") tagAttribute := flags.String("tag-attribute", "TestChangingInt", "attribute appended to each tag prefix") timeoutMs := flags.Int("timeout-ms", 1500, "per-tag snapshot timeout in milliseconds") if err := flags.Parse(args); err != nil { return err } if *bulkSize < 1 { return errors.New("bulk-size must be positive") } if *durationSeconds < 1 { return errors.New("duration-seconds must be positive") } tags := make([]string, *bulkSize) for i := 0; i < *bulkSize; i++ { tags[i] = fmt.Sprintf("%s%03d.%s", *tagPrefix, *tagStart+i, *tagAttribute) } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() session, err := client.OpenSession(ctx, mxgateway.OpenSessionOptions{ClientSessionName: *clientName}) if err != nil { return err } defer func() { _, _ = session.Close(context.Background()) }() serverHandle, err := session.Register(ctx, *clientName) if err != nil { return err } subscribeResults, err := session.SubscribeBulk(ctx, serverHandle, tags) if err != nil { return err } itemHandles := make([]int32, 0, len(subscribeResults)) for _, result := range subscribeResults { if result.GetWasSuccessful() { itemHandles = append(itemHandles, result.GetItemHandle()) } } defer func() { if len(itemHandles) > 0 { _, _ = session.UnsubscribeBulk(context.Background(), serverHandle, itemHandles) } }() // Warm-up: drive identical calls so any first-call JIT / connection-pool // setup is amortised before the measurement window opens. The ctx.Err() // guard short-circuits on Ctrl+C / parent-cancel instead of spinning // failing ReadBulk calls until the wall-clock deadline elapses. warmupDeadline := time.Now().Add(time.Duration(*warmupSeconds) * time.Second) timeout := time.Duration(*timeoutMs) * time.Millisecond for time.Now().Before(warmupDeadline) && ctx.Err() == nil { _, _ = session.ReadBulk(ctx, serverHandle, tags, timeout) } // Steady state: per-call latency captured via time.Now() deltas. latenciesMs := make([]float64, 0, 65536) var totalReadResults int64 var cachedReadResults int64 var successfulCalls, failedCalls int steadyStart := time.Now() steadyDeadline := steadyStart.Add(time.Duration(*durationSeconds) * time.Second) for time.Now().Before(steadyDeadline) && ctx.Err() == nil { callStart := time.Now() results, err := session.ReadBulk(ctx, serverHandle, tags, timeout) elapsed := time.Since(callStart) latenciesMs = append(latenciesMs, float64(elapsed.Nanoseconds())/1e6) if err != nil { failedCalls++ continue } successfulCalls++ for _, r := range results { totalReadResults++ if r.GetWasCached() { cachedReadResults++ } } } steadyElapsed := time.Since(steadyStart) totalCalls := successfulCalls + failedCalls callsPerSecond := 0.0 if steadyElapsed.Seconds() > 0 { callsPerSecond = float64(totalCalls) / steadyElapsed.Seconds() } stats := map[string]any{ "language": "go", "command": "bench-read-bulk", "endpoint": options.Endpoint, "clientName": *clientName, "bulkSize": *bulkSize, "durationSeconds": *durationSeconds, "warmupSeconds": *warmupSeconds, "durationMs": steadyElapsed.Milliseconds(), "tags": tags, "totalCalls": totalCalls, "successfulCalls": successfulCalls, "failedCalls": failedCalls, "totalReadResults": totalReadResults, "cachedReadResults": cachedReadResults, "callsPerSecond": roundTo(callsPerSecond, 2), "latencyMs": percentileSummary(latenciesMs), } if *jsonOutput { return writeJSON(stdout, stats) } fmt.Fprintln(stdout, callsPerSecond) return nil } // percentileSummary returns the same { p50, p95, p99, max, mean } shape every // language bench emits, rounded to 3 decimal places so the PowerShell driver // sees one schema across all five clients. func percentileSummary(sample []float64) map[string]float64 { if len(sample) == 0 { return map[string]float64{"p50": 0, "p95": 0, "p99": 0, "max": 0, "mean": 0} } sorted := append([]float64(nil), sample...) sort.Float64s(sorted) mean := 0.0 maxValue := sorted[len(sorted)-1] for _, v := range sample { mean += v } mean /= float64(len(sample)) return map[string]float64{ "p50": roundTo(percentile(sorted, 0.50), 3), "p95": roundTo(percentile(sorted, 0.95), 3), "p99": roundTo(percentile(sorted, 0.99), 3), "max": roundTo(maxValue, 3), "mean": roundTo(mean, 3), } } // percentile uses nearest-rank with linear interpolation; matches the .NET // implementation so cross-language comparisons are apples-to-apples. func percentile(sorted []float64, quantile float64) float64 { if len(sorted) == 0 { return 0 } if len(sorted) == 1 { return sorted[0] } rank := quantile * float64(len(sorted)-1) lower := int(rank) upper := lower + 1 if upper >= len(sorted) { return sorted[lower] } fraction := rank - float64(lower) return sorted[lower] + (sorted[upper]-sorted[lower])*fraction } func roundTo(value float64, digits int) float64 { shift := 1.0 for i := 0; i < digits; i++ { shift *= 10 } return float64(int64(value*shift+0.5)) / shift } func runWrite(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("write", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") sessionID := flags.String("session-id", "", "gateway session id") serverHandle := flags.Int("server-handle", 0, "MXAccess server handle") itemHandle := flags.Int("item-handle", 0, "MXAccess item handle") valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string") valueText := flags.String("value", "", "value text") userID := flags.Int("user-id", 0, "MXAccess user id") if err := flags.Parse(args); err != nil { return err } if *sessionID == "" { return errors.New("session-id is required") } value, err := parseValue(*valueType, *valueText) if err != nil { return err } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() session := mxgateway.NewSessionForID(client, *sessionID) reply, err := session.WriteRaw(ctx, int32(*serverHandle), int32(*itemHandle), value, int32(*userID)) return writeCommandOutput(stdout, *jsonOutput, "write", options, reply, err) } func runStreamEvents(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("stream-events", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") sessionID := flags.String("session-id", "", "gateway session id") after := flags.Uint64("after-worker-sequence", 0, "first worker sequence to read after") limit := flags.Int("limit", 0, "maximum events to read; 0 means unbounded") if err := flags.Parse(args); err != nil { return err } if *sessionID == "" { return errors.New("session-id is required") } client, _, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() session := mxgateway.NewSessionForID(client, *sessionID) streamCtx, cancelStream := context.WithCancel(ctx) defer cancelStream() subscription, err := session.SubscribeEventsAfter(streamCtx, *after) if err != nil { return err } defer subscription.Close() events := subscription.Events() count := 0 for result := range events { if result.Err != nil { return result.Err } if *jsonOutput { fmt.Fprintln(stdout, string(mustMarshalProto(result.Event))) } else { fmt.Fprintf(stdout, "%d %s\n", result.Event.GetWorkerSequence(), result.Event.GetFamily()) } count++ if *limit > 0 && count >= *limit { cancelStream() return nil } } return nil } func runStreamAlarms(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("stream-alarms", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") filterPrefix := flags.String("filter-prefix", "", "alarm-reference prefix scoping the feed; empty means unscoped") limit := flags.Int("limit", 0, "maximum feed messages to read; 0 means unbounded") if err := flags.Parse(args); err != nil { return err } client, _, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() // Mirror runStreamEvents so Ctrl+C on a long-running stream-alarms command // cancels the gRPC stream cleanly (the gateway sees codes.Canceled rather // than a torn TCP connection) and the deferred client.Close() actually runs. signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) defer stopSignals() streamCtx, cancelStream := context.WithCancel(signalCtx) defer cancelStream() stream, err := client.StreamAlarms(streamCtx, &mxgateway.StreamAlarmsRequest{AlarmFilterPrefix: *filterPrefix}) if err != nil { return err } count := 0 for { message, err := stream.Recv() if errors.Is(err, io.EOF) { return nil } if err != nil { return err } if *jsonOutput { fmt.Fprintln(stdout, string(mustMarshalProto(message))) } else { fmt.Fprintln(stdout, formatAlarmFeedMessage(message)) } count++ if *limit > 0 && count >= *limit { cancelStream() return nil } } } // formatAlarmFeedMessage renders one AlarmFeedMessage in the CLI's plain-text // output style, distinguishing the active-alarm snapshot, snapshot-complete // sentinel, and transition cases of the message's payload oneof. func formatAlarmFeedMessage(message *mxgateway.AlarmFeedMessage) string { switch { case message.GetActiveAlarm() != nil: alarm := message.GetActiveAlarm() return fmt.Sprintf("active-alarm %s state=%s severity=%d", alarm.GetAlarmFullReference(), alarm.GetCurrentState(), alarm.GetSeverity()) case message.GetSnapshotComplete(): return "snapshot-complete" case message.GetTransition() != nil: transition := message.GetTransition() return fmt.Sprintf("transition %s kind=%s severity=%d", transition.GetAlarmFullReference(), transition.GetTransitionKind(), transition.GetSeverity()) default: return "unknown" } } func runAcknowledgeAlarm(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("acknowledge-alarm", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") reference := flags.String("reference", "", "full alarm reference to acknowledge") comment := flags.String("comment", "", "operator acknowledge comment") operator := flags.String("operator", "", "operator user performing the acknowledge") if err := flags.Parse(args); err != nil { return err } if *reference == "" { return errors.New("reference is required") } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() reply, err := client.AcknowledgeAlarm(ctx, &mxgateway.AcknowledgeAlarmRequest{ AlarmFullReference: *reference, Comment: *comment, OperatorUser: *operator, }) if err != nil { return err } if *jsonOutput { return writeJSON(stdout, commandReplyOutput{ Command: "acknowledge-alarm", Options: options, Reply: mustMarshalProto(reply), }) } fmt.Fprintln(stdout, reply.GetHresult()) return nil } func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("smoke", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") clientName := flags.String("client-name", "mxgw-go-smoke", "MXAccess client name") item := flags.String("item", "", "item definition") if err := flags.Parse(args); err != nil { return err } if *item == "" { return errors.New("item is required") } client, options, err := dialForCommand(ctx, common) if err != nil { return err } defer client.Close() session, err := client.OpenSession(ctx, mxgateway.OpenSessionOptions{ClientSessionName: *clientName}) if err != nil { return err } serverHandle, err := session.Register(ctx, *clientName) if err != nil { return closeSmokeSession(ctx, session, err) } itemHandle, err := session.AddItem(ctx, serverHandle, *item) if err != nil { return closeSmokeSession(ctx, session, err) } if err := session.Advise(ctx, serverHandle, itemHandle); err != nil { return closeSmokeSession(ctx, session, err) } if err := closeSmokeSession(ctx, session, nil); err != nil { return err } output := map[string]any{ "command": "smoke", "options": options, "sessionId": session.ID(), "serverHandle": serverHandle, "itemHandle": itemHandle, } if *jsonOutput { return writeJSON(stdout, output) } fmt.Fprintf(stdout, "session=%s server=%d item=%d\n", session.ID(), serverHandle, itemHandle) return nil } func closeSmokeSession(ctx context.Context, session *mxgateway.Session, primaryErr error) error { closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if deadline, ok := ctx.Deadline(); ok { if until := time.Until(deadline); until > 0 && until < 5*time.Second { cancel() closeCtx, cancel = context.WithTimeout(context.Background(), until) defer cancel() } } _, closeErr := session.Close(closeCtx) if primaryErr != nil { return primaryErr } return closeErr } func parseStringList(value string) []string { parts := strings.Split(value, ",") items := make([]string, 0, len(parts)) for _, part := range parts { item := strings.TrimSpace(part) if item != "" { items = append(items, item) } } return items } func parseInt32List(value string) ([]int32, error) { parts := strings.Split(value, ",") items := make([]int32, 0, len(parts)) for _, part := range parts { item := strings.TrimSpace(part) if item == "" { continue } parsed, err := strconv.ParseInt(item, 10, 32) if err != nil { return nil, fmt.Errorf("invalid item handle %q: %w", item, err) } items = append(items, int32(parsed)) } return items, nil } func bindCommonFlags(flags *flag.FlagSet) *commonOptions { common := &commonOptions{} flags.StringVar(&common.Endpoint, "endpoint", "localhost:5000", "gateway endpoint") flags.StringVar(&common.APIKey, "api-key", "", "gateway API key") flags.StringVar(&common.APIKeyEnv, "api-key-env", "MXGATEWAY_API_KEY", "environment variable containing the API key") flags.BoolVar(&common.Plaintext, "plaintext", false, "use plaintext transport") flags.StringVar(&common.CACertFile, "ca-cert", "", "CA certificate file") flags.StringVar(&common.ServerName, "server-name-override", "", "TLS server name override") flags.StringVar(&common.CallTimeout, "call-timeout", "30s", "per-call timeout") return common } func dialForCommand(ctx context.Context, common *commonOptions) (*mxgateway.Client, commonOptions, error) { options, err := common.resolved() if err != nil { return nil, options, err } client, err := mxgateway.Dial(ctx, mxgateway.Options{ Endpoint: options.Endpoint, APIKey: options.apiKeyValue, Plaintext: options.Plaintext, CACertFile: options.CACertFile, ServerNameOverride: options.ServerName, CallTimeout: options.timeout, }) return client, options, err } func (o *commonOptions) resolved() (commonOptions, error) { resolved := *o if resolved.APIKey == "" && resolved.APIKeyEnv != "" { resolved.apiKeyValue = os.Getenv(resolved.APIKeyEnv) } else { resolved.apiKeyValue = resolved.APIKey } resolved.APIKey = mxgateway.RedactAPIKey(resolved.apiKeyValue) if resolved.CallTimeout != "" { timeout, err := time.ParseDuration(resolved.CallTimeout) if err != nil { return resolved, err } resolved.timeout = timeout } return resolved, nil } func parseValue(valueType, valueText string) (*mxgateway.MxValue, error) { switch valueType { case "bool": value, err := strconv.ParseBool(valueText) if err != nil { return nil, err } return mxgateway.BoolValue(value), nil case "int32": value, err := strconv.ParseInt(valueText, 10, 32) if err != nil { return nil, err } return mxgateway.Int32Value(int32(value)), nil case "int64": value, err := strconv.ParseInt(valueText, 10, 64) if err != nil { return nil, err } return mxgateway.Int64Value(value), nil case "float": value, err := strconv.ParseFloat(valueText, 32) if err != nil { return nil, err } return mxgateway.FloatValue(float32(value)), nil case "double": value, err := strconv.ParseFloat(valueText, 64) if err != nil { return nil, err } return mxgateway.DoubleValue(value), nil case "string": return mxgateway.StringValue(valueText), nil default: return nil, fmt.Errorf("unsupported value type %q", valueType) } } func writeCommandOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, reply *mxgateway.MxCommandReply, err error) error { if err != nil { return err } if jsonOutput { return writeJSON(stdout, commandReplyOutput{ Command: command, Options: options, Reply: mustMarshalProto(reply), }) } fmt.Fprintln(stdout, reply.GetKind()) return nil } func writeBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.SubscribeResult, err error) error { if err != nil { return err } if jsonOutput { return writeJSON(stdout, map[string]any{ "command": command, "options": options, "results": results, }) } fmt.Fprintln(stdout, len(results)) return nil } func writeWriteBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkWriteResult, err error) error { if err != nil { return err } if jsonOutput { return writeJSON(stdout, map[string]any{ "command": command, "options": options, "results": results, }) } fmt.Fprintln(stdout, len(results)) return nil } func writeReadBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkReadResult, err error) error { if err != nil { return err } if jsonOutput { return writeJSON(stdout, map[string]any{ "command": command, "options": options, "results": results, }) } fmt.Fprintln(stdout, len(results)) return nil } func writeJSON(writer io.Writer, value any) error { encoder := json.NewEncoder(writer) encoder.SetIndent("", " ") return encoder.Encode(value) } func mustMarshalProto(message protojsonMessage) json.RawMessage { data, err := protojson.MarshalOptions{UseProtoNames: false}.Marshal(message) if err != nil { panic(err) } return data } type protojsonMessage interface { ProtoReflect() protoreflect.Message } func writeUsage(writer io.Writer) { fmt.Fprintln(writer, "usage: mxgw-go ") } // batchEOR is the end-of-result sentinel emitted to stdout after every command // in batch mode, regardless of success or failure. const batchEOR = "__MXGW_BATCH_EOR__" // runBatch reads one command line at a time from in, dispatches each via the // normal runWithIO routing, and writes a batchEOR sentinel to stdout after // every result. Errors are serialised as JSON to stdout (not stderr) so the // harness can parse them without interleaving stderr. Blank lines are // skipped; only stdin EOF ends the session. // // The scanner buffer is widened to 16 MiB so a single long command line // (e.g. a bulk-write with several thousand handles) does not trip the // default 64 KiB bufio.Scanner token-too-long error and abort the session. // If a line still exceeds the cap, the error is surfaced as a per-command // error-with-sentinel and the session continues. func runBatch(ctx context.Context, in io.Reader, stdout, stderr io.Writer) error { bw := bufio.NewWriter(stdout) scanner := bufio.NewScanner(in) scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024) for { if !scanner.Scan() { break } line := scanner.Text() args := strings.Fields(line) if len(args) == 0 { // Skip blank / whitespace-only lines; do NOT terminate. The // session ends only on stdin EOF so a stray blank line in a // PowerShell here-string does not silently drop later commands. continue } if err := runWithIO(ctx, args, bw, stderr); err != nil { // Write error as JSON to stdout (bw) so the harness sees it in the // same stream as normal output, framed by the EOR sentinel. errPayload := map[string]string{ "error": err.Error(), "type": "error", } _ = writeJSON(bw, errPayload) } _, _ = fmt.Fprintln(bw, batchEOR) _ = bw.Flush() } if err := scanner.Err(); err != nil { // Emit the scanner failure as a final error-with-sentinel so the // harness sees the failure framed, then return the error so the // process exit reflects it. This handles bufio.ErrTooLong for any // pathological line above the 16 MiB cap. errPayload := map[string]string{ "error": err.Error(), "type": "error", } _ = writeJSON(bw, errPayload) _, _ = fmt.Fprintln(bw, batchEOR) _ = bw.Flush() return err } return nil } func dialGalaxyForCommand(ctx context.Context, common *commonOptions) (*mxgateway.GalaxyClient, commonOptions, error) { options, err := common.resolved() if err != nil { return nil, options, err } client, err := mxgateway.DialGalaxy(ctx, mxgateway.Options{ Endpoint: options.Endpoint, APIKey: options.apiKeyValue, Plaintext: options.Plaintext, CACertFile: options.CACertFile, ServerNameOverride: options.ServerName, CallTimeout: options.timeout, }) return client, options, err } func runGalaxyTestConnection(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("galaxy-test-connection", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") if err := flags.Parse(args); err != nil { return err } client, options, err := dialGalaxyForCommand(ctx, common) if err != nil { return err } defer client.Close() ok, err := client.TestConnection(ctx) if err != nil { return err } if *jsonOutput { return writeJSON(stdout, map[string]any{ "command": "galaxy-test-connection", "options": options, "ok": ok, }) } fmt.Fprintln(stdout, ok) return nil } func runGalaxyLastDeploy(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("galaxy-last-deploy", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") if err := flags.Parse(args); err != nil { return err } client, options, err := dialGalaxyForCommand(ctx, common) if err != nil { return err } defer client.Close() deployTime, present, err := client.GetLastDeployTime(ctx) if err != nil { return err } if *jsonOutput { payload := map[string]any{ "command": "galaxy-last-deploy", "options": options, "present": present, } if present { payload["timeOfLastDeploy"] = deployTime.UTC().Format(time.RFC3339Nano) } return writeJSON(stdout, payload) } if !present { fmt.Fprintln(stdout, "absent") return nil } fmt.Fprintln(stdout, deployTime.UTC().Format(time.RFC3339Nano)) return nil } func runGalaxyDiscover(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("galaxy-discover", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") if err := flags.Parse(args); err != nil { return err } client, options, err := dialGalaxyForCommand(ctx, common) if err != nil { return err } defer client.Close() objects, err := client.DiscoverHierarchy(ctx) if err != nil { return err } if *jsonOutput { marshaled := make([]json.RawMessage, 0, len(objects)) for _, obj := range objects { marshaled = append(marshaled, mustMarshalProto(obj)) } return writeJSON(stdout, map[string]any{ "command": "galaxy-discover", "options": options, "objects": marshaled, }) } for _, obj := range objects { fmt.Fprintf(stdout, "%d\t%s\t%s\t(attrs=%d)\n", obj.GetGobjectId(), obj.GetTagName(), obj.GetContainedName(), len(obj.GetAttributes())) } return nil } func runGalaxyWatch(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("galaxy-watch", flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) jsonOutput := flags.Bool("json", false, "write JSON output") lastSeen := flags.String("last-seen-deploy-time", "", "RFC3339 timestamp; when set, suppresses the bootstrap event") limit := flags.Int("limit", 0, "maximum events to read; 0 means unbounded (Ctrl+C to stop)") if err := flags.Parse(args); err != nil { return err } var lastSeenPtr *time.Time if *lastSeen != "" { parsed, err := time.Parse(time.RFC3339, *lastSeen) if err != nil { return fmt.Errorf("invalid -last-seen-deploy-time: %w", err) } lastSeenPtr = &parsed } client, _, err := dialGalaxyForCommand(ctx, common) if err != nil { return err } defer client.Close() signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) defer stopSignals() streamCtx, cancelStream := context.WithCancel(signalCtx) defer cancelStream() events, errs, err := client.WatchDeployEvents(streamCtx, lastSeenPtr) if err != nil { return err } count := 0 for { select { case event, ok := <-events: if !ok { // Drain any terminal error before returning. if streamErr, errOk := <-errs; errOk && streamErr != nil { return streamErr } return nil } if *jsonOutput { fmt.Fprintln(stdout, string(mustMarshalProto(event))) } else { fmt.Fprintln(stdout, formatDeployEvent(event)) } count++ if *limit > 0 && count >= *limit { cancelStream() return nil } case streamErr, ok := <-errs: if !ok { return nil } if streamErr != nil { return streamErr } case <-signalCtx.Done(): cancelStream() // Allow goroutine to drain. for range events { } return nil } } } func formatDeployEvent(event *mxgateway.DeployEvent) string { observed := "" if ts := event.GetObservedAt(); ts != nil { observed = ts.AsTime().UTC().Format(time.RFC3339Nano) } deploy := "absent" if event.GetTimeOfLastDeployPresent() { if ts := event.GetTimeOfLastDeploy(); ts != nil { deploy = ts.AsTime().UTC().Format(time.RFC3339Nano) } } return fmt.Sprintf( "seq=%d observed=%s deploy=%s objects=%d attributes=%d", event.GetSequence(), observed, deploy, event.GetObjectCount(), event.GetAttributeCount(), ) }