58259016b0
stream-alarms attaches to the gateway's central alarm feed (mirrors stream-events: --limit cap, --json); acknowledge-alarm is a unary session-less ack (--reference required, --comment, --operator). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1470 lines
46 KiB
Go
1470 lines
46 KiB
Go
// Command mxgw-go is the reference Go CLI for the MXAccess Gateway.
|
|
//
|
|
// It exposes versioning, session lifecycle, command invocation, event
|
|
// streaming, a smoke-test workflow, and Galaxy Repository browse subcommands
|
|
// that exercise the same gRPC contract used by the mxgateway library.
|
|
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/signal"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/mxgateway"
|
|
"google.golang.org/protobuf/encoding/protojson"
|
|
"google.golang.org/protobuf/reflect/protoreflect"
|
|
)
|
|
|
|
type versionOutput struct {
|
|
ClientVersion string `json:"clientVersion"`
|
|
GatewayProtocolVersion uint32 `json:"gatewayProtocolVersion"`
|
|
WorkerProtocolVersion uint32 `json:"workerProtocolVersion"`
|
|
}
|
|
|
|
type commonOptions struct {
|
|
Endpoint string `json:"endpoint"`
|
|
APIKey string `json:"apiKey"`
|
|
APIKeyEnv string `json:"apiKeyEnv,omitempty"`
|
|
Plaintext bool `json:"plaintext"`
|
|
CACertFile string `json:"caCertFile,omitempty"`
|
|
ServerName string `json:"serverNameOverride,omitempty"`
|
|
CallTimeout string `json:"callTimeout,omitempty"`
|
|
|
|
apiKeyValue string
|
|
timeout time.Duration
|
|
}
|
|
|
|
type openSessionOutput struct {
|
|
Command string `json:"command"`
|
|
Options commonOptions `json:"options"`
|
|
Reply json.RawMessage `json:"reply"`
|
|
}
|
|
|
|
type commandReplyOutput struct {
|
|
Command string `json:"command"`
|
|
Options commonOptions `json:"options"`
|
|
Reply json.RawMessage `json:"reply"`
|
|
}
|
|
|
|
func main() {
|
|
if err := runWithIO(context.Background(), os.Args[1:], os.Stdout, os.Stderr); err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
os.Exit(2)
|
|
}
|
|
}
|
|
|
|
func run(args []string) error {
|
|
return runWithIO(context.Background(), args, os.Stdout, os.Stderr)
|
|
}
|
|
|
|
func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
if len(args) == 0 {
|
|
writeUsage(stderr)
|
|
return errors.New("missing command")
|
|
}
|
|
|
|
switch args[0] {
|
|
case "version":
|
|
return runVersion(args[1:], stdout, stderr)
|
|
case "open-session":
|
|
return runOpenSession(ctx, args[1:], stdout, stderr)
|
|
case "close-session":
|
|
return runCloseSession(ctx, args[1:], stdout, stderr)
|
|
case "register":
|
|
return runRegister(ctx, args[1:], stdout, stderr)
|
|
case "add-item":
|
|
return runAddItem(ctx, args[1:], stdout, stderr)
|
|
case "advise":
|
|
return runAdvise(ctx, args[1:], stdout, stderr)
|
|
case "subscribe-bulk":
|
|
return runSubscribeBulk(ctx, args[1:], stdout, stderr)
|
|
case "unsubscribe-bulk":
|
|
return runUnsubscribeBulk(ctx, args[1:], stdout, stderr)
|
|
case "read-bulk":
|
|
return runReadBulk(ctx, args[1:], stdout, stderr)
|
|
case "write-bulk":
|
|
return runWriteBulk(ctx, args[1:], stdout, stderr)
|
|
case "write2-bulk":
|
|
return runWrite2Bulk(ctx, args[1:], stdout, stderr)
|
|
case "write-secured-bulk":
|
|
return runWriteSecuredBulk(ctx, args[1:], stdout, stderr)
|
|
case "write-secured2-bulk":
|
|
return runWriteSecured2Bulk(ctx, args[1:], stdout, stderr)
|
|
case "bench-read-bulk":
|
|
return runBenchReadBulk(ctx, args[1:], stdout, stderr)
|
|
case "write":
|
|
return runWrite(ctx, args[1:], stdout, stderr)
|
|
case "stream-events":
|
|
return runStreamEvents(ctx, args[1:], stdout, stderr)
|
|
case "stream-alarms":
|
|
return runStreamAlarms(ctx, args[1:], stdout, stderr)
|
|
case "acknowledge-alarm":
|
|
return runAcknowledgeAlarm(ctx, args[1:], stdout, stderr)
|
|
case "smoke":
|
|
return runSmoke(ctx, args[1:], stdout, stderr)
|
|
case "galaxy-test-connection":
|
|
return runGalaxyTestConnection(ctx, args[1:], stdout, stderr)
|
|
case "galaxy-last-deploy":
|
|
return runGalaxyLastDeploy(ctx, args[1:], stdout, stderr)
|
|
case "galaxy-discover":
|
|
return runGalaxyDiscover(ctx, args[1:], stdout, stderr)
|
|
case "galaxy-watch":
|
|
return runGalaxyWatch(ctx, args[1:], stdout, stderr)
|
|
case "batch":
|
|
return runBatch(ctx, os.Stdin, stdout, stderr)
|
|
default:
|
|
writeUsage(stderr)
|
|
return fmt.Errorf("unknown command %q", args[0])
|
|
}
|
|
}
|
|
|
|
func runVersion(args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("version", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
output := versionOutput{
|
|
ClientVersion: mxgateway.ClientVersion,
|
|
GatewayProtocolVersion: mxgateway.GatewayProtocolVersion,
|
|
WorkerProtocolVersion: mxgateway.WorkerProtocolVersion,
|
|
}
|
|
|
|
if *jsonOutput {
|
|
return writeJSON(stdout, output)
|
|
}
|
|
|
|
fmt.Fprintf(stdout, "mxgw-go %s\n", output.ClientVersion)
|
|
fmt.Fprintf(stdout, "gateway protocol %d\n", output.GatewayProtocolVersion)
|
|
fmt.Fprintf(stdout, "worker protocol %d\n", output.WorkerProtocolVersion)
|
|
return nil
|
|
}
|
|
|
|
func runOpenSession(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("open-session", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
clientName := flags.String("client-session-name", "", "client session name")
|
|
backend := flags.String("backend", "", "requested backend")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
reply, err := client.OpenSessionRaw(ctx, (&mxgateway.OpenSessionOptions{
|
|
RequestedBackend: *backend,
|
|
ClientSessionName: *clientName,
|
|
}).Request())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if *jsonOutput {
|
|
return writeJSON(stdout, openSessionOutput{
|
|
Command: "open-session",
|
|
Options: options,
|
|
Reply: mustMarshalProto(reply),
|
|
})
|
|
}
|
|
|
|
fmt.Fprintln(stdout, reply.GetSessionId())
|
|
return nil
|
|
}
|
|
|
|
func runCloseSession(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("close-session", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
sessionID := flags.String("session-id", "", "gateway session id")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *sessionID == "" {
|
|
return errors.New("session-id is required")
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
reply, err := client.CloseSessionRaw(ctx, &mxgateway.CloseSessionRequest{SessionId: *sessionID})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if *jsonOutput {
|
|
return writeJSON(stdout, commandReplyOutput{
|
|
Command: "close-session",
|
|
Options: options,
|
|
Reply: mustMarshalProto(reply),
|
|
})
|
|
}
|
|
|
|
fmt.Fprintln(stdout, reply.GetFinalState())
|
|
return nil
|
|
}
|
|
|
|
func runRegister(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("register", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
sessionID := flags.String("session-id", "", "gateway session id")
|
|
clientName := flags.String("client-name", "", "MXAccess client name")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *sessionID == "" || *clientName == "" {
|
|
return errors.New("session-id and client-name are required")
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
|
reply, err := session.RegisterRaw(ctx, *clientName)
|
|
return writeCommandOutput(stdout, *jsonOutput, "register", options, reply, err)
|
|
}
|
|
|
|
func runAddItem(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("add-item", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
sessionID := flags.String("session-id", "", "gateway session id")
|
|
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
|
|
item := flags.String("item", "", "item definition")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *sessionID == "" || *item == "" {
|
|
return errors.New("session-id and item are required")
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
|
reply, err := session.AddItemRaw(ctx, int32(*serverHandle), *item)
|
|
return writeCommandOutput(stdout, *jsonOutput, "add-item", options, reply, err)
|
|
}
|
|
|
|
func runAdvise(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("advise", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
sessionID := flags.String("session-id", "", "gateway session id")
|
|
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
|
|
itemHandle := flags.Int("item-handle", 0, "MXAccess item handle")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *sessionID == "" {
|
|
return errors.New("session-id is required")
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
|
reply, err := session.AdviseRaw(ctx, int32(*serverHandle), int32(*itemHandle))
|
|
return writeCommandOutput(stdout, *jsonOutput, "advise", options, reply, err)
|
|
}
|
|
|
|
func runSubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("subscribe-bulk", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
sessionID := flags.String("session-id", "", "gateway session id")
|
|
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
|
|
items := flags.String("items", "", "comma-separated item definitions")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *sessionID == "" || *items == "" {
|
|
return errors.New("session-id and items are required")
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
|
results, err := session.SubscribeBulk(ctx, int32(*serverHandle), parseStringList(*items))
|
|
return writeBulkOutput(stdout, *jsonOutput, "subscribe-bulk", options, results, err)
|
|
}
|
|
|
|
func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("unsubscribe-bulk", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
sessionID := flags.String("session-id", "", "gateway session id")
|
|
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
|
|
itemHandles := flags.String("item-handles", "", "comma-separated item handles")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *sessionID == "" || *itemHandles == "" {
|
|
return errors.New("session-id and item-handles are required")
|
|
}
|
|
|
|
handles, err := parseInt32List(*itemHandles)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
|
results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), handles)
|
|
return writeBulkOutput(stdout, *jsonOutput, "unsubscribe-bulk", options, results, err)
|
|
}
|
|
|
|
func runReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("read-bulk", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
sessionID := flags.String("session-id", "", "gateway session id")
|
|
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
|
|
items := flags.String("items", "", "comma-separated tag addresses")
|
|
timeoutMs := flags.Int("timeout-ms", 0, "per-tag snapshot timeout in milliseconds (0 = worker default)")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *sessionID == "" || *items == "" {
|
|
return errors.New("session-id and items are required")
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
|
results, err := session.ReadBulk(ctx, int32(*serverHandle), parseStringList(*items), time.Duration(*timeoutMs)*time.Millisecond)
|
|
return writeReadBulkOutput(stdout, *jsonOutput, "read-bulk", options, results, err)
|
|
}
|
|
|
|
func runWriteBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-bulk", false)
|
|
}
|
|
|
|
func runWrite2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write2-bulk", true)
|
|
}
|
|
|
|
func runWriteSecuredBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured-bulk", false)
|
|
}
|
|
|
|
func runWriteSecured2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured2-bulk", true)
|
|
}
|
|
|
|
// runWriteBulkVariant shares the flag-parsing + entry-build skeleton across
|
|
// the four bulk-write families. command selects which of the four routes
|
|
// runs; withTimestamp adds a --timestamp-value flag for the Write2 / Secured2
|
|
// variants. Secured-only flags (--current-user-id / --verifier-user-id) are
|
|
// only registered for the secured variants and the non-secured -user-id flag
|
|
// is only registered for Write/Write2, so a wrong-variant flag becomes a
|
|
// clean "flag provided but not defined" error instead of silently no-op'ing.
|
|
func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool) error {
|
|
secured := command == "write-secured-bulk" || command == "write-secured2-bulk"
|
|
flags := flag.NewFlagSet(command, flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
sessionID := flags.String("session-id", "", "gateway session id")
|
|
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
|
|
itemHandles := flags.String("item-handles", "", "comma-separated item handles")
|
|
valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string")
|
|
values := flags.String("values", "", "comma-separated values (one per item handle)")
|
|
var userID, currentUserID, verifierUserID *int
|
|
if secured {
|
|
currentUserID = flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)")
|
|
verifierUserID = flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)")
|
|
} else {
|
|
userID = flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)")
|
|
}
|
|
timestampValue := flags.String("timestamp-value", "", "RFC 3339 timestamp shared across all entries (Write2/WriteSecured2 variants)")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *sessionID == "" || *itemHandles == "" || *values == "" {
|
|
return errors.New("session-id, item-handles, and values are required")
|
|
}
|
|
|
|
handles, err := parseInt32List(*itemHandles)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
valueTexts := parseStringList(*values)
|
|
if len(handles) != len(valueTexts) {
|
|
return fmt.Errorf("item-handles count (%d) does not match values count (%d)", len(handles), len(valueTexts))
|
|
}
|
|
|
|
parsedValues := make([]*mxgateway.MxValue, len(handles))
|
|
for i, text := range valueTexts {
|
|
v, err := parseValue(*valueType, text)
|
|
if err != nil {
|
|
return fmt.Errorf("entry %d: %w", i, err)
|
|
}
|
|
parsedValues[i] = v
|
|
}
|
|
|
|
var tsValue *mxgateway.MxValue
|
|
if withTimestamp {
|
|
if *timestampValue == "" {
|
|
return errors.New("timestamp-value is required for write2/write-secured2 bulk variants")
|
|
}
|
|
parsed, err := parseRfc3339Timestamp(*timestampValue)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tsValue = parsed
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
|
|
|
var results []*mxgateway.BulkWriteResult
|
|
switch command {
|
|
case "write-bulk":
|
|
entries := make([]*mxgateway.WriteBulkEntry, len(handles))
|
|
for i := range handles {
|
|
entries[i] = &mxgateway.WriteBulkEntry{ItemHandle: handles[i], Value: parsedValues[i], UserId: int32(*userID)}
|
|
}
|
|
results, err = session.WriteBulk(ctx, int32(*serverHandle), entries)
|
|
case "write2-bulk":
|
|
entries := make([]*mxgateway.Write2BulkEntry, len(handles))
|
|
for i := range handles {
|
|
entries[i] = &mxgateway.Write2BulkEntry{ItemHandle: handles[i], Value: parsedValues[i], TimestampValue: tsValue, UserId: int32(*userID)}
|
|
}
|
|
results, err = session.Write2Bulk(ctx, int32(*serverHandle), entries)
|
|
case "write-secured-bulk":
|
|
entries := make([]*mxgateway.WriteSecuredBulkEntry, len(handles))
|
|
for i := range handles {
|
|
entries[i] = &mxgateway.WriteSecuredBulkEntry{
|
|
ItemHandle: handles[i],
|
|
Value: parsedValues[i],
|
|
CurrentUserId: int32(*currentUserID),
|
|
VerifierUserId: int32(*verifierUserID),
|
|
}
|
|
}
|
|
results, err = session.WriteSecuredBulk(ctx, int32(*serverHandle), entries)
|
|
case "write-secured2-bulk":
|
|
entries := make([]*mxgateway.WriteSecured2BulkEntry, len(handles))
|
|
for i := range handles {
|
|
entries[i] = &mxgateway.WriteSecured2BulkEntry{
|
|
ItemHandle: handles[i],
|
|
Value: parsedValues[i],
|
|
TimestampValue: tsValue,
|
|
CurrentUserId: int32(*currentUserID),
|
|
VerifierUserId: int32(*verifierUserID),
|
|
}
|
|
}
|
|
results, err = session.WriteSecured2Bulk(ctx, int32(*serverHandle), entries)
|
|
default:
|
|
return fmt.Errorf("unsupported bulk write command %q", command)
|
|
}
|
|
return writeWriteBulkOutput(stdout, *jsonOutput, command, options, results, err)
|
|
}
|
|
|
|
// runBenchReadBulk drives the cross-language ReadBulk stress benchmark from Go:
|
|
// opens its own session, subscribes to bulk-size tags so the worker value cache
|
|
// populates from real OnDataChange events, runs ReadBulk in a tight loop for
|
|
// duration-seconds with per-call timing, and emits the shared JSON schema the
|
|
// scripts/bench-read-bulk.ps1 driver collates across all five clients.
|
|
func runBenchReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("bench-read-bulk", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
clientName := flags.String("client-name", "mxgw-go-bench", "session client name")
|
|
durationSeconds := flags.Int("duration-seconds", 30, "steady-state measurement window in seconds")
|
|
warmupSeconds := flags.Int("warmup-seconds", 3, "warm-up window before measurement, in seconds")
|
|
bulkSize := flags.Int("bulk-size", 6, "tags per ReadBulk call")
|
|
tagStart := flags.Int("tag-start", 1, "first machine number")
|
|
tagPrefix := flags.String("tag-prefix", "TestMachine_", "tag prefix (machine number appended as %03d)")
|
|
tagAttribute := flags.String("tag-attribute", "TestChangingInt", "attribute appended to each tag prefix")
|
|
timeoutMs := flags.Int("timeout-ms", 1500, "per-tag snapshot timeout in milliseconds")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *bulkSize < 1 {
|
|
return errors.New("bulk-size must be positive")
|
|
}
|
|
if *durationSeconds < 1 {
|
|
return errors.New("duration-seconds must be positive")
|
|
}
|
|
|
|
tags := make([]string, *bulkSize)
|
|
for i := 0; i < *bulkSize; i++ {
|
|
tags[i] = fmt.Sprintf("%s%03d.%s", *tagPrefix, *tagStart+i, *tagAttribute)
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
session, err := client.OpenSession(ctx, mxgateway.OpenSessionOptions{ClientSessionName: *clientName})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
_, _ = session.Close(context.Background())
|
|
}()
|
|
|
|
serverHandle, err := session.Register(ctx, *clientName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
subscribeResults, err := session.SubscribeBulk(ctx, serverHandle, tags)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
itemHandles := make([]int32, 0, len(subscribeResults))
|
|
for _, result := range subscribeResults {
|
|
if result.GetWasSuccessful() {
|
|
itemHandles = append(itemHandles, result.GetItemHandle())
|
|
}
|
|
}
|
|
defer func() {
|
|
if len(itemHandles) > 0 {
|
|
_, _ = session.UnsubscribeBulk(context.Background(), serverHandle, itemHandles)
|
|
}
|
|
}()
|
|
|
|
// Warm-up: drive identical calls so any first-call JIT / connection-pool
|
|
// setup is amortised before the measurement window opens. Honor ctx so
|
|
// Ctrl+C or a parent-cancel (e.g. the cross-language bench driver killing
|
|
// the child early) exits promptly rather than spinning failing calls until
|
|
// the wall-clock deadline.
|
|
warmupDeadline := time.Now().Add(time.Duration(*warmupSeconds) * time.Second)
|
|
timeout := time.Duration(*timeoutMs) * time.Millisecond
|
|
for time.Now().Before(warmupDeadline) && ctx.Err() == nil {
|
|
_, _ = session.ReadBulk(ctx, serverHandle, tags, timeout)
|
|
}
|
|
|
|
// Steady state: per-call latency captured via time.Now() deltas. Same ctx
|
|
// guard as warm-up; on cancel we stop the loop and report the truncated
|
|
// window faithfully.
|
|
latenciesMs := make([]float64, 0, 65536)
|
|
var totalReadResults int64
|
|
var cachedReadResults int64
|
|
var successfulCalls, failedCalls int
|
|
steadyStart := time.Now()
|
|
steadyDeadline := steadyStart.Add(time.Duration(*durationSeconds) * time.Second)
|
|
|
|
for time.Now().Before(steadyDeadline) && ctx.Err() == nil {
|
|
callStart := time.Now()
|
|
results, err := session.ReadBulk(ctx, serverHandle, tags, timeout)
|
|
elapsed := time.Since(callStart)
|
|
latenciesMs = append(latenciesMs, float64(elapsed.Nanoseconds())/1e6)
|
|
if err != nil {
|
|
failedCalls++
|
|
continue
|
|
}
|
|
successfulCalls++
|
|
for _, r := range results {
|
|
totalReadResults++
|
|
if r.GetWasCached() {
|
|
cachedReadResults++
|
|
}
|
|
}
|
|
}
|
|
steadyElapsed := time.Since(steadyStart)
|
|
totalCalls := successfulCalls + failedCalls
|
|
|
|
callsPerSecond := 0.0
|
|
if steadyElapsed.Seconds() > 0 {
|
|
callsPerSecond = float64(totalCalls) / steadyElapsed.Seconds()
|
|
}
|
|
|
|
stats := map[string]any{
|
|
"language": "go",
|
|
"command": "bench-read-bulk",
|
|
"endpoint": options.Endpoint,
|
|
"clientName": *clientName,
|
|
"bulkSize": *bulkSize,
|
|
"durationSeconds": *durationSeconds,
|
|
"warmupSeconds": *warmupSeconds,
|
|
"durationMs": steadyElapsed.Milliseconds(),
|
|
"tags": tags,
|
|
"totalCalls": totalCalls,
|
|
"successfulCalls": successfulCalls,
|
|
"failedCalls": failedCalls,
|
|
"totalReadResults": totalReadResults,
|
|
"cachedReadResults": cachedReadResults,
|
|
"callsPerSecond": roundTo(callsPerSecond, 2),
|
|
"latencyMs": percentileSummary(latenciesMs),
|
|
}
|
|
if *jsonOutput {
|
|
return writeJSON(stdout, stats)
|
|
}
|
|
fmt.Fprintln(stdout, callsPerSecond)
|
|
return nil
|
|
}
|
|
|
|
// percentileSummary returns the same { p50, p95, p99, max, mean } shape every
|
|
// language bench emits, rounded to 3 decimal places so the PowerShell driver
|
|
// sees one schema across all five clients.
|
|
func percentileSummary(sample []float64) map[string]float64 {
|
|
if len(sample) == 0 {
|
|
return map[string]float64{"p50": 0, "p95": 0, "p99": 0, "max": 0, "mean": 0}
|
|
}
|
|
sorted := append([]float64(nil), sample...)
|
|
sort.Float64s(sorted)
|
|
mean := 0.0
|
|
max := sorted[len(sorted)-1]
|
|
for _, v := range sample {
|
|
mean += v
|
|
}
|
|
mean /= float64(len(sample))
|
|
return map[string]float64{
|
|
"p50": roundTo(percentile(sorted, 0.50), 3),
|
|
"p95": roundTo(percentile(sorted, 0.95), 3),
|
|
"p99": roundTo(percentile(sorted, 0.99), 3),
|
|
"max": roundTo(max, 3),
|
|
"mean": roundTo(mean, 3),
|
|
}
|
|
}
|
|
|
|
// percentile uses nearest-rank with linear interpolation; matches the .NET
|
|
// implementation so cross-language comparisons are apples-to-apples.
|
|
func percentile(sorted []float64, quantile float64) float64 {
|
|
if len(sorted) == 0 {
|
|
return 0
|
|
}
|
|
if len(sorted) == 1 {
|
|
return sorted[0]
|
|
}
|
|
rank := quantile * float64(len(sorted)-1)
|
|
lower := int(rank)
|
|
upper := lower + 1
|
|
if upper >= len(sorted) {
|
|
return sorted[lower]
|
|
}
|
|
fraction := rank - float64(lower)
|
|
return sorted[lower] + (sorted[upper]-sorted[lower])*fraction
|
|
}
|
|
|
|
func roundTo(value float64, digits int) float64 {
|
|
shift := 1.0
|
|
for i := 0; i < digits; i++ {
|
|
shift *= 10
|
|
}
|
|
return float64(int64(value*shift+0.5)) / shift
|
|
}
|
|
|
|
// parseRfc3339Timestamp parses an RFC 3339 timestamp and returns the
|
|
// MxValue protobuf representation used for the timestamped write families.
|
|
func parseRfc3339Timestamp(text string) (*mxgateway.MxValue, error) {
|
|
t, err := time.Parse(time.RFC3339Nano, text)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid RFC 3339 timestamp %q: %w", text, err)
|
|
}
|
|
return mxgateway.TimestampValue(t), nil
|
|
}
|
|
|
|
func runWrite(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("write", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
sessionID := flags.String("session-id", "", "gateway session id")
|
|
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
|
|
itemHandle := flags.Int("item-handle", 0, "MXAccess item handle")
|
|
valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string")
|
|
valueText := flags.String("value", "", "value text")
|
|
userID := flags.Int("user-id", 0, "MXAccess user id")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *sessionID == "" {
|
|
return errors.New("session-id is required")
|
|
}
|
|
|
|
value, err := parseValue(*valueType, *valueText)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
|
reply, err := session.WriteRaw(ctx, int32(*serverHandle), int32(*itemHandle), value, int32(*userID))
|
|
return writeCommandOutput(stdout, *jsonOutput, "write", options, reply, err)
|
|
}
|
|
|
|
func runStreamEvents(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("stream-events", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
sessionID := flags.String("session-id", "", "gateway session id")
|
|
after := flags.Uint64("after-worker-sequence", 0, "first worker sequence to read after")
|
|
limit := flags.Int("limit", 0, "maximum events to read; 0 means unbounded")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *sessionID == "" {
|
|
return errors.New("session-id is required")
|
|
}
|
|
|
|
client, _, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
// Mirror runGalaxyWatch so Ctrl+C on a long-running stream-events command
|
|
// cancels the gRPC stream cleanly (the gateway sees codes.Canceled rather
|
|
// than a torn TCP connection) and the deferred subscription.Close() /
|
|
// client.Close() actually run.
|
|
signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
|
|
defer stopSignals()
|
|
|
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
|
streamCtx, cancelStream := context.WithCancel(signalCtx)
|
|
defer cancelStream()
|
|
subscription, err := session.SubscribeEventsAfter(streamCtx, *after)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer subscription.Close()
|
|
events := subscription.Events()
|
|
|
|
count := 0
|
|
for result := range events {
|
|
if result.Err != nil {
|
|
return result.Err
|
|
}
|
|
if *jsonOutput {
|
|
fmt.Fprintln(stdout, string(mustMarshalProto(result.Event)))
|
|
} else {
|
|
fmt.Fprintf(stdout, "%d %s\n", result.Event.GetWorkerSequence(), result.Event.GetFamily())
|
|
}
|
|
count++
|
|
if *limit > 0 && count >= *limit {
|
|
cancelStream()
|
|
return nil
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func runStreamAlarms(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("stream-alarms", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
filterPrefix := flags.String("filter-prefix", "", "alarm-reference prefix scoping the feed; empty means unscoped")
|
|
limit := flags.Int("limit", 0, "maximum feed messages to read; 0 means unbounded")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
client, _, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
// Mirror runStreamEvents so Ctrl+C on a long-running stream-alarms command
|
|
// cancels the gRPC stream cleanly (the gateway sees codes.Canceled rather
|
|
// than a torn TCP connection) and the deferred client.Close() actually runs.
|
|
signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
|
|
defer stopSignals()
|
|
|
|
streamCtx, cancelStream := context.WithCancel(signalCtx)
|
|
defer cancelStream()
|
|
stream, err := client.StreamAlarms(streamCtx, &mxgateway.StreamAlarmsRequest{AlarmFilterPrefix: *filterPrefix})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
count := 0
|
|
for {
|
|
message, err := stream.Recv()
|
|
if errors.Is(err, io.EOF) {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if *jsonOutput {
|
|
fmt.Fprintln(stdout, string(mustMarshalProto(message)))
|
|
} else {
|
|
fmt.Fprintln(stdout, formatAlarmFeedMessage(message))
|
|
}
|
|
count++
|
|
if *limit > 0 && count >= *limit {
|
|
cancelStream()
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// formatAlarmFeedMessage renders one AlarmFeedMessage in the CLI's plain-text
|
|
// output style, distinguishing the active-alarm snapshot, snapshot-complete
|
|
// sentinel, and transition cases of the message's payload oneof.
|
|
func formatAlarmFeedMessage(message *mxgateway.AlarmFeedMessage) string {
|
|
switch {
|
|
case message.GetActiveAlarm() != nil:
|
|
alarm := message.GetActiveAlarm()
|
|
return fmt.Sprintf("active-alarm %s state=%s severity=%d", alarm.GetAlarmFullReference(), alarm.GetCurrentState(), alarm.GetSeverity())
|
|
case message.GetSnapshotComplete():
|
|
return "snapshot-complete"
|
|
case message.GetTransition() != nil:
|
|
transition := message.GetTransition()
|
|
return fmt.Sprintf("transition %s kind=%s severity=%d", transition.GetAlarmFullReference(), transition.GetTransitionKind(), transition.GetSeverity())
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
func runAcknowledgeAlarm(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("acknowledge-alarm", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
reference := flags.String("reference", "", "full alarm reference to acknowledge")
|
|
comment := flags.String("comment", "", "operator acknowledge comment")
|
|
operator := flags.String("operator", "", "operator user performing the acknowledge")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *reference == "" {
|
|
return errors.New("reference is required")
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
reply, err := client.AcknowledgeAlarm(ctx, &mxgateway.AcknowledgeAlarmRequest{
|
|
AlarmFullReference: *reference,
|
|
Comment: *comment,
|
|
OperatorUser: *operator,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if *jsonOutput {
|
|
return writeJSON(stdout, commandReplyOutput{
|
|
Command: "acknowledge-alarm",
|
|
Options: options,
|
|
Reply: mustMarshalProto(reply),
|
|
})
|
|
}
|
|
|
|
fmt.Fprintln(stdout, reply.GetHresult())
|
|
return nil
|
|
}
|
|
|
|
func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("smoke", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
clientName := flags.String("client-name", "mxgw-go-smoke", "MXAccess client name")
|
|
item := flags.String("item", "", "item definition")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
if *item == "" {
|
|
return errors.New("item is required")
|
|
}
|
|
|
|
client, options, err := dialForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
session, err := client.OpenSession(ctx, mxgateway.OpenSessionOptions{ClientSessionName: *clientName})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
serverHandle, err := session.Register(ctx, *clientName)
|
|
if err != nil {
|
|
return closeSmokeSession(ctx, session, err)
|
|
}
|
|
itemHandle, err := session.AddItem(ctx, serverHandle, *item)
|
|
if err != nil {
|
|
return closeSmokeSession(ctx, session, err)
|
|
}
|
|
if err := session.Advise(ctx, serverHandle, itemHandle); err != nil {
|
|
return closeSmokeSession(ctx, session, err)
|
|
}
|
|
if err := closeSmokeSession(ctx, session, nil); err != nil {
|
|
return err
|
|
}
|
|
|
|
output := map[string]any{
|
|
"command": "smoke",
|
|
"options": options,
|
|
"sessionId": session.ID(),
|
|
"serverHandle": serverHandle,
|
|
"itemHandle": itemHandle,
|
|
}
|
|
if *jsonOutput {
|
|
return writeJSON(stdout, output)
|
|
}
|
|
|
|
fmt.Fprintf(stdout, "session=%s server=%d item=%d\n", session.ID(), serverHandle, itemHandle)
|
|
return nil
|
|
}
|
|
|
|
func closeSmokeSession(ctx context.Context, session *mxgateway.Session, primaryErr error) error {
|
|
closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if deadline, ok := ctx.Deadline(); ok {
|
|
if until := time.Until(deadline); until > 0 && until < 5*time.Second {
|
|
cancel()
|
|
closeCtx, cancel = context.WithTimeout(context.Background(), until)
|
|
defer cancel()
|
|
}
|
|
}
|
|
|
|
_, closeErr := session.Close(closeCtx)
|
|
if primaryErr != nil {
|
|
return primaryErr
|
|
}
|
|
return closeErr
|
|
}
|
|
|
|
func parseStringList(value string) []string {
|
|
parts := strings.Split(value, ",")
|
|
items := make([]string, 0, len(parts))
|
|
for _, part := range parts {
|
|
item := strings.TrimSpace(part)
|
|
if item != "" {
|
|
items = append(items, item)
|
|
}
|
|
}
|
|
return items
|
|
}
|
|
|
|
func parseInt32List(value string) ([]int32, error) {
|
|
parts := strings.Split(value, ",")
|
|
items := make([]int32, 0, len(parts))
|
|
for _, part := range parts {
|
|
item := strings.TrimSpace(part)
|
|
if item == "" {
|
|
continue
|
|
}
|
|
parsed, err := strconv.ParseInt(item, 10, 32)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid item handle %q: %w", item, err)
|
|
}
|
|
items = append(items, int32(parsed))
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func bindCommonFlags(flags *flag.FlagSet) *commonOptions {
|
|
common := &commonOptions{}
|
|
flags.StringVar(&common.Endpoint, "endpoint", "localhost:5000", "gateway endpoint")
|
|
flags.StringVar(&common.APIKey, "api-key", "", "gateway API key")
|
|
flags.StringVar(&common.APIKeyEnv, "api-key-env", "MXGATEWAY_API_KEY", "environment variable containing the API key")
|
|
flags.BoolVar(&common.Plaintext, "plaintext", false, "use plaintext transport")
|
|
flags.StringVar(&common.CACertFile, "ca-cert", "", "CA certificate file")
|
|
flags.StringVar(&common.ServerName, "server-name-override", "", "TLS server name override")
|
|
flags.StringVar(&common.CallTimeout, "call-timeout", "30s", "per-call timeout")
|
|
return common
|
|
}
|
|
|
|
func dialForCommand(ctx context.Context, common *commonOptions) (*mxgateway.Client, commonOptions, error) {
|
|
options, err := common.resolved()
|
|
if err != nil {
|
|
return nil, options, err
|
|
}
|
|
|
|
client, err := mxgateway.Dial(ctx, mxgateway.Options{
|
|
Endpoint: options.Endpoint,
|
|
APIKey: options.apiKeyValue,
|
|
Plaintext: options.Plaintext,
|
|
CACertFile: options.CACertFile,
|
|
ServerNameOverride: options.ServerName,
|
|
CallTimeout: options.timeout,
|
|
})
|
|
return client, options, err
|
|
}
|
|
|
|
func (o *commonOptions) resolved() (commonOptions, error) {
|
|
resolved := *o
|
|
if resolved.APIKey == "" && resolved.APIKeyEnv != "" {
|
|
resolved.apiKeyValue = os.Getenv(resolved.APIKeyEnv)
|
|
} else {
|
|
resolved.apiKeyValue = resolved.APIKey
|
|
}
|
|
resolved.APIKey = mxgateway.RedactAPIKey(resolved.apiKeyValue)
|
|
if resolved.CallTimeout != "" {
|
|
timeout, err := time.ParseDuration(resolved.CallTimeout)
|
|
if err != nil {
|
|
return resolved, err
|
|
}
|
|
resolved.timeout = timeout
|
|
}
|
|
return resolved, nil
|
|
}
|
|
|
|
func parseValue(valueType, valueText string) (*mxgateway.MxValue, error) {
|
|
switch valueType {
|
|
case "bool":
|
|
value, err := strconv.ParseBool(valueText)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid -value for -type %s: %q: %w", valueType, valueText, err)
|
|
}
|
|
return mxgateway.BoolValue(value), nil
|
|
case "int32":
|
|
value, err := strconv.ParseInt(valueText, 10, 32)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid -value for -type %s: %q: %w", valueType, valueText, err)
|
|
}
|
|
return mxgateway.Int32Value(int32(value)), nil
|
|
case "int64":
|
|
value, err := strconv.ParseInt(valueText, 10, 64)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid -value for -type %s: %q: %w", valueType, valueText, err)
|
|
}
|
|
return mxgateway.Int64Value(value), nil
|
|
case "float":
|
|
value, err := strconv.ParseFloat(valueText, 32)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid -value for -type %s: %q: %w", valueType, valueText, err)
|
|
}
|
|
return mxgateway.FloatValue(float32(value)), nil
|
|
case "double":
|
|
value, err := strconv.ParseFloat(valueText, 64)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid -value for -type %s: %q: %w", valueType, valueText, err)
|
|
}
|
|
return mxgateway.DoubleValue(value), nil
|
|
case "string":
|
|
return mxgateway.StringValue(valueText), nil
|
|
default:
|
|
return nil, fmt.Errorf("unsupported value type %q", valueType)
|
|
}
|
|
}
|
|
|
|
func writeCommandOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, reply *mxgateway.MxCommandReply, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if jsonOutput {
|
|
return writeJSON(stdout, commandReplyOutput{
|
|
Command: command,
|
|
Options: options,
|
|
Reply: mustMarshalProto(reply),
|
|
})
|
|
}
|
|
fmt.Fprintln(stdout, reply.GetKind())
|
|
return nil
|
|
}
|
|
|
|
func writeBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.SubscribeResult, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if jsonOutput {
|
|
return writeJSON(stdout, map[string]any{
|
|
"command": command,
|
|
"options": options,
|
|
"results": results,
|
|
})
|
|
}
|
|
fmt.Fprintln(stdout, len(results))
|
|
return nil
|
|
}
|
|
|
|
func writeWriteBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkWriteResult, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if jsonOutput {
|
|
return writeJSON(stdout, map[string]any{
|
|
"command": command,
|
|
"options": options,
|
|
"results": results,
|
|
})
|
|
}
|
|
fmt.Fprintln(stdout, len(results))
|
|
return nil
|
|
}
|
|
|
|
func writeReadBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkReadResult, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if jsonOutput {
|
|
return writeJSON(stdout, map[string]any{
|
|
"command": command,
|
|
"options": options,
|
|
"results": results,
|
|
})
|
|
}
|
|
fmt.Fprintln(stdout, len(results))
|
|
return nil
|
|
}
|
|
|
|
func writeJSON(writer io.Writer, value any) error {
|
|
encoder := json.NewEncoder(writer)
|
|
encoder.SetIndent("", " ")
|
|
return encoder.Encode(value)
|
|
}
|
|
|
|
func mustMarshalProto(message protojsonMessage) json.RawMessage {
|
|
data, err := protojson.MarshalOptions{UseProtoNames: false}.Marshal(message)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return data
|
|
}
|
|
|
|
type protojsonMessage interface {
|
|
ProtoReflect() protoreflect.Message
|
|
}
|
|
|
|
// batchEOR is the end-of-result sentinel emitted to stdout after every command
|
|
// in batch mode, regardless of success or failure.
|
|
const batchEOR = "__MXGW_BATCH_EOR__"
|
|
|
|
// runBatch reads one command line at a time from in, dispatches each via the
|
|
// normal runWithIO routing, and writes a batchEOR sentinel to stdout after
|
|
// every result. Errors are serialised as JSON to stdout (not stderr) so the
|
|
// harness can parse them without interleaving stderr. The loop never terminates
|
|
// on command error; only stdin EOF (or an empty line) ends the session.
|
|
func runBatch(ctx context.Context, in io.Reader, stdout, stderr io.Writer) error {
|
|
bw := bufio.NewWriter(stdout)
|
|
scanner := bufio.NewScanner(in)
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
if line == "" {
|
|
break
|
|
}
|
|
args := strings.Fields(line)
|
|
if len(args) == 0 {
|
|
continue
|
|
}
|
|
if err := runWithIO(ctx, args, bw, stderr); err != nil {
|
|
// Write error as JSON to stdout (bw) so the harness sees it in the
|
|
// same stream as normal output, framed by the EOR sentinel.
|
|
errPayload := map[string]string{
|
|
"error": err.Error(),
|
|
"type": "error",
|
|
}
|
|
_ = writeJSON(bw, errPayload)
|
|
}
|
|
_, _ = fmt.Fprintln(bw, batchEOR)
|
|
_ = bw.Flush()
|
|
}
|
|
return scanner.Err()
|
|
}
|
|
|
|
func writeUsage(writer io.Writer) {
|
|
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|stream-alarms|acknowledge-alarm|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
|
|
}
|
|
|
|
func dialGalaxyForCommand(ctx context.Context, common *commonOptions) (*mxgateway.GalaxyClient, commonOptions, error) {
|
|
options, err := common.resolved()
|
|
if err != nil {
|
|
return nil, options, err
|
|
}
|
|
|
|
client, err := mxgateway.DialGalaxy(ctx, mxgateway.Options{
|
|
Endpoint: options.Endpoint,
|
|
APIKey: options.apiKeyValue,
|
|
Plaintext: options.Plaintext,
|
|
CACertFile: options.CACertFile,
|
|
ServerNameOverride: options.ServerName,
|
|
CallTimeout: options.timeout,
|
|
})
|
|
return client, options, err
|
|
}
|
|
|
|
func runGalaxyTestConnection(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("galaxy-test-connection", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
client, options, err := dialGalaxyForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
ok, err := client.TestConnection(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if *jsonOutput {
|
|
return writeJSON(stdout, map[string]any{
|
|
"command": "galaxy-test-connection",
|
|
"options": options,
|
|
"ok": ok,
|
|
})
|
|
}
|
|
fmt.Fprintln(stdout, ok)
|
|
return nil
|
|
}
|
|
|
|
func runGalaxyLastDeploy(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("galaxy-last-deploy", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
client, options, err := dialGalaxyForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
deployTime, present, err := client.GetLastDeployTime(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if *jsonOutput {
|
|
payload := map[string]any{
|
|
"command": "galaxy-last-deploy",
|
|
"options": options,
|
|
"present": present,
|
|
}
|
|
if present {
|
|
payload["timeOfLastDeploy"] = deployTime.UTC().Format(time.RFC3339Nano)
|
|
}
|
|
return writeJSON(stdout, payload)
|
|
}
|
|
if !present {
|
|
fmt.Fprintln(stdout, "absent")
|
|
return nil
|
|
}
|
|
fmt.Fprintln(stdout, deployTime.UTC().Format(time.RFC3339Nano))
|
|
return nil
|
|
}
|
|
|
|
func runGalaxyDiscover(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("galaxy-discover", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
client, options, err := dialGalaxyForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
objects, err := client.DiscoverHierarchy(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if *jsonOutput {
|
|
marshaled := make([]json.RawMessage, 0, len(objects))
|
|
for _, obj := range objects {
|
|
marshaled = append(marshaled, mustMarshalProto(obj))
|
|
}
|
|
return writeJSON(stdout, map[string]any{
|
|
"command": "galaxy-discover",
|
|
"options": options,
|
|
"objects": marshaled,
|
|
})
|
|
}
|
|
for _, obj := range objects {
|
|
fmt.Fprintf(stdout, "%d\t%s\t%s\t(attrs=%d)\n", obj.GetGobjectId(), obj.GetTagName(), obj.GetContainedName(), len(obj.GetAttributes()))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func runGalaxyWatch(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
|
flags := flag.NewFlagSet("galaxy-watch", flag.ContinueOnError)
|
|
flags.SetOutput(stderr)
|
|
common := bindCommonFlags(flags)
|
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
|
lastSeen := flags.String("last-seen-deploy-time", "", "RFC 3339 timestamp (with optional fractional seconds); when set, suppresses the bootstrap event")
|
|
limit := flags.Int("limit", 0, "maximum events to read; 0 means unbounded (Ctrl+C to stop)")
|
|
|
|
if err := flags.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
var lastSeenPtr *time.Time
|
|
if *lastSeen != "" {
|
|
// Use RFC3339Nano so values copy-pasted from galaxy-watch -json output
|
|
// (which formatDeployEvent emits with fractional seconds) round-trip;
|
|
// RFC3339Nano also accepts whole-second values, so the layout switch is
|
|
// strictly broader than the previous time.RFC3339 parse.
|
|
parsed, err := time.Parse(time.RFC3339Nano, *lastSeen)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid -last-seen-deploy-time: %w", err)
|
|
}
|
|
lastSeenPtr = &parsed
|
|
}
|
|
|
|
client, _, err := dialGalaxyForCommand(ctx, common)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
|
|
defer stopSignals()
|
|
|
|
streamCtx, cancelStream := context.WithCancel(signalCtx)
|
|
defer cancelStream()
|
|
|
|
events, errs, err := client.WatchDeployEvents(streamCtx, lastSeenPtr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
count := 0
|
|
for {
|
|
select {
|
|
case event, ok := <-events:
|
|
if !ok {
|
|
// Drain any terminal error before returning.
|
|
if streamErr, errOk := <-errs; errOk && streamErr != nil {
|
|
return streamErr
|
|
}
|
|
return nil
|
|
}
|
|
if *jsonOutput {
|
|
fmt.Fprintln(stdout, string(mustMarshalProto(event)))
|
|
} else {
|
|
fmt.Fprintln(stdout, formatDeployEvent(event))
|
|
}
|
|
count++
|
|
if *limit > 0 && count >= *limit {
|
|
cancelStream()
|
|
// Allow goroutine to drain.
|
|
for range events {
|
|
}
|
|
return nil
|
|
}
|
|
case streamErr, ok := <-errs:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
if streamErr != nil {
|
|
return streamErr
|
|
}
|
|
case <-signalCtx.Done():
|
|
cancelStream()
|
|
// Allow goroutine to drain.
|
|
for range events {
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func formatDeployEvent(event *mxgateway.DeployEvent) string {
|
|
observed := ""
|
|
if ts := event.GetObservedAt(); ts != nil {
|
|
observed = ts.AsTime().UTC().Format(time.RFC3339Nano)
|
|
}
|
|
deploy := "absent"
|
|
if event.GetTimeOfLastDeployPresent() {
|
|
if ts := event.GetTimeOfLastDeploy(); ts != nil {
|
|
deploy = ts.AsTime().UTC().Format(time.RFC3339Nano)
|
|
}
|
|
}
|
|
return fmt.Sprintf(
|
|
"seq=%d observed=%s deploy=%s objects=%d attributes=%d",
|
|
event.GetSequence(),
|
|
observed,
|
|
deploy,
|
|
event.GetObjectCount(),
|
|
event.GetAttributeCount(),
|
|
)
|
|
}
|