diff --git a/clients/go/cmd/mxgw-go/main.go b/clients/go/cmd/mxgw-go/main.go index 4f92a90..b736cd5 100644 --- a/clients/go/cmd/mxgw-go/main.go +++ b/clients/go/cmd/mxgw-go/main.go @@ -107,6 +107,10 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err return runWrite(ctx, args[1:], stdout, stderr) case "stream-events": return runStreamEvents(ctx, args[1:], stdout, stderr) + case "stream-alarms": + return runStreamAlarms(ctx, args[1:], stdout, stderr) + case "acknowledge-alarm": + return runAcknowledgeAlarm(ctx, args[1:], stdout, stderr) case "smoke": return runSmoke(ctx, args[1:], stdout, stderr) case "galaxy-test-connection": @@ -796,6 +800,119 @@ func runStreamEvents(ctx context.Context, args []string, stdout, stderr io.Write return nil } +func runStreamAlarms(ctx context.Context, args []string, stdout, stderr io.Writer) error { + flags := flag.NewFlagSet("stream-alarms", flag.ContinueOnError) + flags.SetOutput(stderr) + common := bindCommonFlags(flags) + jsonOutput := flags.Bool("json", false, "write JSON output") + filterPrefix := flags.String("filter-prefix", "", "alarm-reference prefix scoping the feed; empty means unscoped") + limit := flags.Int("limit", 0, "maximum feed messages to read; 0 means unbounded") + + if err := flags.Parse(args); err != nil { + return err + } + + client, _, err := dialForCommand(ctx, common) + if err != nil { + return err + } + defer client.Close() + + // Mirror runStreamEvents so Ctrl+C on a long-running stream-alarms command + // cancels the gRPC stream cleanly (the gateway sees codes.Canceled rather + // than a torn TCP connection) and the deferred client.Close() actually runs. + signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) + defer stopSignals() + + streamCtx, cancelStream := context.WithCancel(signalCtx) + defer cancelStream() + stream, err := client.StreamAlarms(streamCtx, &mxgateway.StreamAlarmsRequest{AlarmFilterPrefix: *filterPrefix}) + if err != nil { + return err + } + + count := 0 + for { + message, err := stream.Recv() + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return err + } + if *jsonOutput { + fmt.Fprintln(stdout, string(mustMarshalProto(message))) + } else { + fmt.Fprintln(stdout, formatAlarmFeedMessage(message)) + } + count++ + if *limit > 0 && count >= *limit { + cancelStream() + return nil + } + } +} + +// formatAlarmFeedMessage renders one AlarmFeedMessage in the CLI's plain-text +// output style, distinguishing the active-alarm snapshot, snapshot-complete +// sentinel, and transition cases of the message's payload oneof. +func formatAlarmFeedMessage(message *mxgateway.AlarmFeedMessage) string { + switch { + case message.GetActiveAlarm() != nil: + alarm := message.GetActiveAlarm() + return fmt.Sprintf("active-alarm %s state=%s severity=%d", alarm.GetAlarmFullReference(), alarm.GetCurrentState(), alarm.GetSeverity()) + case message.GetSnapshotComplete(): + return "snapshot-complete" + case message.GetTransition() != nil: + transition := message.GetTransition() + return fmt.Sprintf("transition %s kind=%s severity=%d", transition.GetAlarmFullReference(), transition.GetTransitionKind(), transition.GetSeverity()) + default: + return "unknown" + } +} + +func runAcknowledgeAlarm(ctx context.Context, args []string, stdout, stderr io.Writer) error { + flags := flag.NewFlagSet("acknowledge-alarm", flag.ContinueOnError) + flags.SetOutput(stderr) + common := bindCommonFlags(flags) + jsonOutput := flags.Bool("json", false, "write JSON output") + reference := flags.String("reference", "", "full alarm reference to acknowledge") + comment := flags.String("comment", "", "operator acknowledge comment") + operator := flags.String("operator", "", "operator user performing the acknowledge") + + if err := flags.Parse(args); err != nil { + return err + } + if *reference == "" { + return errors.New("reference is required") + } + + client, options, err := dialForCommand(ctx, common) + if err != nil { + return err + } + defer client.Close() + + reply, err := client.AcknowledgeAlarm(ctx, &mxgateway.AcknowledgeAlarmRequest{ + AlarmFullReference: *reference, + Comment: *comment, + OperatorUser: *operator, + }) + if err != nil { + return err + } + if *jsonOutput { + return writeJSON(stdout, commandReplyOutput{ + Command: "acknowledge-alarm", + Options: options, + Reply: mustMarshalProto(reply), + }) + } + + fmt.Fprintln(stdout, reply.GetHresult()) + return nil +} + func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("smoke", flag.ContinueOnError) flags.SetOutput(stderr) @@ -1064,7 +1181,7 @@ type protojsonMessage interface { } func writeUsage(writer io.Writer) { - fmt.Fprintln(writer, "usage: mxgw-go ") + fmt.Fprintln(writer, "usage: mxgw-go ") } // batchEOR is the end-of-result sentinel emitted to stdout after every command diff --git a/clients/go/mxgateway/alarms.go b/clients/go/mxgateway/alarms.go index 9fbd911..bc7993b 100644 --- a/clients/go/mxgateway/alarms.go +++ b/clients/go/mxgateway/alarms.go @@ -51,3 +51,26 @@ func (c *Client) QueryActiveAlarms(ctx context.Context, req *QueryActiveAlarmsRe return stream, nil } + +// StreamAlarms attaches to the gateway's central alarm feed. The stream opens +// with one AlarmFeedMessage per currently-active alarm (the ConditionRefresh +// snapshot), then a single snapshot-complete sentinel, then a transition for +// every subsequent raise / acknowledge / clear. It is served by the gateway's +// always-on alarm monitor — no worker session is opened — so any number of +// clients may attach. +// +// The returned stream is owned by the caller; cancel ctx to release it. +// Optional alarm-reference prefix scoping (req.AlarmFilterPrefix) limits the +// stream to a sub-tree. +func (c *Client) StreamAlarms(ctx context.Context, req *StreamAlarmsRequest) (StreamAlarmsClient, error) { + if req == nil { + return nil, errors.New("mxgateway: stream alarms request is required") + } + + stream, err := c.raw.StreamAlarms(ctx, req) + if err != nil { + return nil, &GatewayError{Op: "stream alarms", Err: err} + } + + return stream, nil +} diff --git a/clients/go/mxgateway/alarms_test.go b/clients/go/mxgateway/alarms_test.go index 95d61ef..3a63902 100644 --- a/clients/go/mxgateway/alarms_test.go +++ b/clients/go/mxgateway/alarms_test.go @@ -147,8 +147,8 @@ func TestQueryActiveAlarmsPassesFilterPrefix(t *testing.T) { defer cleanup() stream, err := client.QueryActiveAlarms(context.Background(), &pb.QueryActiveAlarmsRequest{ - SessionId: "session-1", - AlarmFilterPrefix: "Tank01.", + SessionId: "session-1", + AlarmFilterPrefix: "Tank01.", }) if err != nil { t.Fatalf("QueryActiveAlarms() error = %v", err) diff --git a/clients/go/mxgateway/galaxy_test.go b/clients/go/mxgateway/galaxy_test.go index bffe014..842a353 100644 --- a/clients/go/mxgateway/galaxy_test.go +++ b/clients/go/mxgateway/galaxy_test.go @@ -55,8 +55,8 @@ func TestGalaxyGetLastDeployTimeReturnsTimestampWhenPresent(t *testing.T) { want := time.Date(2026, 4, 28, 12, 34, 56, 0, time.UTC) fake := &fakeGalaxyServer{ deployReply: &pb.GetLastDeployTimeReply{ - Present: true, - TimeOfLastDeploy: timestamppb.New(want), + Present: true, + TimeOfLastDeploy: timestamppb.New(want), }, } client, cleanup := newGalaxyBufconnClient(t, fake) diff --git a/clients/go/mxgateway/types.go b/clients/go/mxgateway/types.go index 1682907..a856259 100644 --- a/clients/go/mxgateway/types.go +++ b/clients/go/mxgateway/types.go @@ -112,6 +112,11 @@ type ( AcknowledgeAlarmReply = pb.AcknowledgeAlarmReply // QueryActiveAlarmsRequest is the gateway QueryActiveAlarms request message. QueryActiveAlarmsRequest = pb.QueryActiveAlarmsRequest + // StreamAlarmsRequest is the gateway StreamAlarms request message. + StreamAlarmsRequest = pb.StreamAlarmsRequest + // AlarmFeedMessage is one message on the StreamAlarms feed — an + // active-alarm snapshot row, a snapshot-complete sentinel, or a transition. + AlarmFeedMessage = pb.AlarmFeedMessage // ActiveAlarmSnapshot is one row in a ConditionRefresh stream. ActiveAlarmSnapshot = pb.ActiveAlarmSnapshot // OnAlarmTransitionEvent is the body carried by alarm-transition MxEvents. @@ -130,6 +135,10 @@ type AlarmConditionState = pb.AlarmConditionState // QueryActiveAlarms RPC. type QueryActiveAlarmsClient = pb.MxAccessGateway_QueryActiveAlarmsClient +// StreamAlarmsClient is the generated server-streaming client for the +// StreamAlarms RPC. +type StreamAlarmsClient = pb.MxAccessGateway_StreamAlarmsClient + // Enumerations from the generated contract re-exported for client callers. type ( // MxCommandKind discriminates which MXAccess command an MxCommand carries.