diff --git a/clients/go/cmd/mxgw-go/main.go b/clients/go/cmd/mxgw-go/main.go index a3836b7..0af142b 100644 --- a/clients/go/cmd/mxgw-go/main.go +++ b/clients/go/cmd/mxgw-go/main.go @@ -107,6 +107,10 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err return runWrite(ctx, args[1:], stdout, stderr) case "stream-events": return runStreamEvents(ctx, args[1:], stdout, stderr) + case "stream-alarms": + return runStreamAlarms(ctx, args[1:], stdout, stderr) + case "acknowledge-alarm": + return runAcknowledgeAlarm(ctx, args[1:], stdout, stderr) case "smoke": return runSmoke(ctx, args[1:], stdout, stderr) case "galaxy-test-connection": @@ -816,6 +820,119 @@ func runStreamEvents(ctx context.Context, args []string, stdout, stderr io.Write return nil } +func runStreamAlarms(ctx context.Context, args []string, stdout, stderr io.Writer) error { + flags := flag.NewFlagSet("stream-alarms", flag.ContinueOnError) + flags.SetOutput(stderr) + common := bindCommonFlags(flags) + jsonOutput := flags.Bool("json", false, "write JSON output") + filterPrefix := flags.String("filter-prefix", "", "alarm-reference prefix scoping the feed; empty means unscoped") + limit := flags.Int("limit", 0, "maximum feed messages to read; 0 means unbounded") + + if err := flags.Parse(args); err != nil { + return err + } + + client, _, err := dialForCommand(ctx, common) + if err != nil { + return err + } + defer client.Close() + + // Mirror runStreamEvents so Ctrl+C on a long-running stream-alarms command + // cancels the gRPC stream cleanly (the gateway sees codes.Canceled rather + // than a torn TCP connection) and the deferred client.Close() actually runs. + signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) + defer stopSignals() + + streamCtx, cancelStream := context.WithCancel(signalCtx) + defer cancelStream() + stream, err := client.StreamAlarms(streamCtx, &mxgateway.StreamAlarmsRequest{AlarmFilterPrefix: *filterPrefix}) + if err != nil { + return err + } + + count := 0 + for { + message, err := stream.Recv() + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return err + } + if *jsonOutput { + fmt.Fprintln(stdout, string(mustMarshalProto(message))) + } else { + fmt.Fprintln(stdout, formatAlarmFeedMessage(message)) + } + count++ + if *limit > 0 && count >= *limit { + cancelStream() + return nil + } + } +} + +// formatAlarmFeedMessage renders one AlarmFeedMessage in the CLI's plain-text +// output style, distinguishing the active-alarm snapshot, snapshot-complete +// sentinel, and transition cases of the message's payload oneof. +func formatAlarmFeedMessage(message *mxgateway.AlarmFeedMessage) string { + switch { + case message.GetActiveAlarm() != nil: + alarm := message.GetActiveAlarm() + return fmt.Sprintf("active-alarm %s state=%s severity=%d", alarm.GetAlarmFullReference(), alarm.GetCurrentState(), alarm.GetSeverity()) + case message.GetSnapshotComplete(): + return "snapshot-complete" + case message.GetTransition() != nil: + transition := message.GetTransition() + return fmt.Sprintf("transition %s kind=%s severity=%d", transition.GetAlarmFullReference(), transition.GetTransitionKind(), transition.GetSeverity()) + default: + return "unknown" + } +} + +func runAcknowledgeAlarm(ctx context.Context, args []string, stdout, stderr io.Writer) error { + flags := flag.NewFlagSet("acknowledge-alarm", flag.ContinueOnError) + flags.SetOutput(stderr) + common := bindCommonFlags(flags) + jsonOutput := flags.Bool("json", false, "write JSON output") + reference := flags.String("reference", "", "full alarm reference to acknowledge") + comment := flags.String("comment", "", "operator acknowledge comment") + operator := flags.String("operator", "", "operator user performing the acknowledge") + + if err := flags.Parse(args); err != nil { + return err + } + if *reference == "" { + return errors.New("reference is required") + } + + client, options, err := dialForCommand(ctx, common) + if err != nil { + return err + } + defer client.Close() + + reply, err := client.AcknowledgeAlarm(ctx, &mxgateway.AcknowledgeAlarmRequest{ + AlarmFullReference: *reference, + Comment: *comment, + OperatorUser: *operator, + }) + if err != nil { + return err + } + if *jsonOutput { + return writeJSON(stdout, commandReplyOutput{ + Command: "acknowledge-alarm", + Options: options, + Reply: mustMarshalProto(reply), + }) + } + + fmt.Fprintln(stdout, reply.GetHresult()) + return nil +} + func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("smoke", flag.ContinueOnError) flags.SetOutput(stderr) @@ -1120,7 +1237,7 @@ func runBatch(ctx context.Context, in io.Reader, stdout, stderr io.Writer) error } func writeUsage(writer io.Writer) { - fmt.Fprintln(writer, "usage: mxgw-go ") + fmt.Fprintln(writer, "usage: mxgw-go ") } func dialGalaxyForCommand(ctx context.Context, common *commonOptions) (*mxgateway.GalaxyClient, commonOptions, error) {