Go client: port stream-alarms and acknowledge-alarm
Adds the session-less alarm CLI subcommands to mxgw-go. stream-alarms attaches to the gateway's central alarm feed (--filter-prefix, --limit, --json — NDJSON, one AlarmFeedMessage per line); acknowledge-alarm is a unary ack (--reference required, --comment, --operator). StreamAlarms joins QueryActiveAlarms on the public Client and is wired through the existing batch dispatcher via runWithIO. SDK type aliases for StreamAlarmsRequest / AlarmFeedMessage / StreamAlarmsClient land alongside the existing alarm types. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -107,6 +107,10 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err
|
|||||||
return runWrite(ctx, args[1:], stdout, stderr)
|
return runWrite(ctx, args[1:], stdout, stderr)
|
||||||
case "stream-events":
|
case "stream-events":
|
||||||
return runStreamEvents(ctx, args[1:], stdout, stderr)
|
return runStreamEvents(ctx, args[1:], stdout, stderr)
|
||||||
|
case "stream-alarms":
|
||||||
|
return runStreamAlarms(ctx, args[1:], stdout, stderr)
|
||||||
|
case "acknowledge-alarm":
|
||||||
|
return runAcknowledgeAlarm(ctx, args[1:], stdout, stderr)
|
||||||
case "smoke":
|
case "smoke":
|
||||||
return runSmoke(ctx, args[1:], stdout, stderr)
|
return runSmoke(ctx, args[1:], stdout, stderr)
|
||||||
case "galaxy-test-connection":
|
case "galaxy-test-connection":
|
||||||
@@ -796,6 +800,119 @@ func runStreamEvents(ctx context.Context, args []string, stdout, stderr io.Write
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func runStreamAlarms(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
flags := flag.NewFlagSet("stream-alarms", flag.ContinueOnError)
|
||||||
|
flags.SetOutput(stderr)
|
||||||
|
common := bindCommonFlags(flags)
|
||||||
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
||||||
|
filterPrefix := flags.String("filter-prefix", "", "alarm-reference prefix scoping the feed; empty means unscoped")
|
||||||
|
limit := flags.Int("limit", 0, "maximum feed messages to read; 0 means unbounded")
|
||||||
|
|
||||||
|
if err := flags.Parse(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
client, _, err := dialForCommand(ctx, common)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Mirror runStreamEvents so Ctrl+C on a long-running stream-alarms command
|
||||||
|
// cancels the gRPC stream cleanly (the gateway sees codes.Canceled rather
|
||||||
|
// than a torn TCP connection) and the deferred client.Close() actually runs.
|
||||||
|
signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
|
||||||
|
defer stopSignals()
|
||||||
|
|
||||||
|
streamCtx, cancelStream := context.WithCancel(signalCtx)
|
||||||
|
defer cancelStream()
|
||||||
|
stream, err := client.StreamAlarms(streamCtx, &mxgateway.StreamAlarmsRequest{AlarmFilterPrefix: *filterPrefix})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
for {
|
||||||
|
message, err := stream.Recv()
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *jsonOutput {
|
||||||
|
fmt.Fprintln(stdout, string(mustMarshalProto(message)))
|
||||||
|
} else {
|
||||||
|
fmt.Fprintln(stdout, formatAlarmFeedMessage(message))
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
if *limit > 0 && count >= *limit {
|
||||||
|
cancelStream()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// formatAlarmFeedMessage renders one AlarmFeedMessage in the CLI's plain-text
|
||||||
|
// output style, distinguishing the active-alarm snapshot, snapshot-complete
|
||||||
|
// sentinel, and transition cases of the message's payload oneof.
|
||||||
|
func formatAlarmFeedMessage(message *mxgateway.AlarmFeedMessage) string {
|
||||||
|
switch {
|
||||||
|
case message.GetActiveAlarm() != nil:
|
||||||
|
alarm := message.GetActiveAlarm()
|
||||||
|
return fmt.Sprintf("active-alarm %s state=%s severity=%d", alarm.GetAlarmFullReference(), alarm.GetCurrentState(), alarm.GetSeverity())
|
||||||
|
case message.GetSnapshotComplete():
|
||||||
|
return "snapshot-complete"
|
||||||
|
case message.GetTransition() != nil:
|
||||||
|
transition := message.GetTransition()
|
||||||
|
return fmt.Sprintf("transition %s kind=%s severity=%d", transition.GetAlarmFullReference(), transition.GetTransitionKind(), transition.GetSeverity())
|
||||||
|
default:
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func runAcknowledgeAlarm(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
flags := flag.NewFlagSet("acknowledge-alarm", flag.ContinueOnError)
|
||||||
|
flags.SetOutput(stderr)
|
||||||
|
common := bindCommonFlags(flags)
|
||||||
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
||||||
|
reference := flags.String("reference", "", "full alarm reference to acknowledge")
|
||||||
|
comment := flags.String("comment", "", "operator acknowledge comment")
|
||||||
|
operator := flags.String("operator", "", "operator user performing the acknowledge")
|
||||||
|
|
||||||
|
if err := flags.Parse(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *reference == "" {
|
||||||
|
return errors.New("reference is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
client, options, err := dialForCommand(ctx, common)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
reply, err := client.AcknowledgeAlarm(ctx, &mxgateway.AcknowledgeAlarmRequest{
|
||||||
|
AlarmFullReference: *reference,
|
||||||
|
Comment: *comment,
|
||||||
|
OperatorUser: *operator,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *jsonOutput {
|
||||||
|
return writeJSON(stdout, commandReplyOutput{
|
||||||
|
Command: "acknowledge-alarm",
|
||||||
|
Options: options,
|
||||||
|
Reply: mustMarshalProto(reply),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintln(stdout, reply.GetHresult())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
flags := flag.NewFlagSet("smoke", flag.ContinueOnError)
|
flags := flag.NewFlagSet("smoke", flag.ContinueOnError)
|
||||||
flags.SetOutput(stderr)
|
flags.SetOutput(stderr)
|
||||||
@@ -1064,7 +1181,7 @@ type protojsonMessage interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func writeUsage(writer io.Writer) {
|
func writeUsage(writer io.Writer) {
|
||||||
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
|
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|stream-alarms|acknowledge-alarm|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
|
||||||
}
|
}
|
||||||
|
|
||||||
// batchEOR is the end-of-result sentinel emitted to stdout after every command
|
// batchEOR is the end-of-result sentinel emitted to stdout after every command
|
||||||
|
|||||||
@@ -51,3 +51,26 @@ func (c *Client) QueryActiveAlarms(ctx context.Context, req *QueryActiveAlarmsRe
|
|||||||
|
|
||||||
return stream, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StreamAlarms attaches to the gateway's central alarm feed. The stream opens
|
||||||
|
// with one AlarmFeedMessage per currently-active alarm (the ConditionRefresh
|
||||||
|
// snapshot), then a single snapshot-complete sentinel, then a transition for
|
||||||
|
// every subsequent raise / acknowledge / clear. It is served by the gateway's
|
||||||
|
// always-on alarm monitor — no worker session is opened — so any number of
|
||||||
|
// clients may attach.
|
||||||
|
//
|
||||||
|
// The returned stream is owned by the caller; cancel ctx to release it.
|
||||||
|
// Optional alarm-reference prefix scoping (req.AlarmFilterPrefix) limits the
|
||||||
|
// stream to a sub-tree.
|
||||||
|
func (c *Client) StreamAlarms(ctx context.Context, req *StreamAlarmsRequest) (StreamAlarmsClient, error) {
|
||||||
|
if req == nil {
|
||||||
|
return nil, errors.New("mxgateway: stream alarms request is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, err := c.raw.StreamAlarms(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, &GatewayError{Op: "stream alarms", Err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -147,8 +147,8 @@ func TestQueryActiveAlarmsPassesFilterPrefix(t *testing.T) {
|
|||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
stream, err := client.QueryActiveAlarms(context.Background(), &pb.QueryActiveAlarmsRequest{
|
stream, err := client.QueryActiveAlarms(context.Background(), &pb.QueryActiveAlarmsRequest{
|
||||||
SessionId: "session-1",
|
SessionId: "session-1",
|
||||||
AlarmFilterPrefix: "Tank01.",
|
AlarmFilterPrefix: "Tank01.",
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("QueryActiveAlarms() error = %v", err)
|
t.Fatalf("QueryActiveAlarms() error = %v", err)
|
||||||
|
|||||||
@@ -55,8 +55,8 @@ func TestGalaxyGetLastDeployTimeReturnsTimestampWhenPresent(t *testing.T) {
|
|||||||
want := time.Date(2026, 4, 28, 12, 34, 56, 0, time.UTC)
|
want := time.Date(2026, 4, 28, 12, 34, 56, 0, time.UTC)
|
||||||
fake := &fakeGalaxyServer{
|
fake := &fakeGalaxyServer{
|
||||||
deployReply: &pb.GetLastDeployTimeReply{
|
deployReply: &pb.GetLastDeployTimeReply{
|
||||||
Present: true,
|
Present: true,
|
||||||
TimeOfLastDeploy: timestamppb.New(want),
|
TimeOfLastDeploy: timestamppb.New(want),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
client, cleanup := newGalaxyBufconnClient(t, fake)
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
||||||
|
|||||||
@@ -112,6 +112,11 @@ type (
|
|||||||
AcknowledgeAlarmReply = pb.AcknowledgeAlarmReply
|
AcknowledgeAlarmReply = pb.AcknowledgeAlarmReply
|
||||||
// QueryActiveAlarmsRequest is the gateway QueryActiveAlarms request message.
|
// QueryActiveAlarmsRequest is the gateway QueryActiveAlarms request message.
|
||||||
QueryActiveAlarmsRequest = pb.QueryActiveAlarmsRequest
|
QueryActiveAlarmsRequest = pb.QueryActiveAlarmsRequest
|
||||||
|
// StreamAlarmsRequest is the gateway StreamAlarms request message.
|
||||||
|
StreamAlarmsRequest = pb.StreamAlarmsRequest
|
||||||
|
// AlarmFeedMessage is one message on the StreamAlarms feed — an
|
||||||
|
// active-alarm snapshot row, a snapshot-complete sentinel, or a transition.
|
||||||
|
AlarmFeedMessage = pb.AlarmFeedMessage
|
||||||
// ActiveAlarmSnapshot is one row in a ConditionRefresh stream.
|
// ActiveAlarmSnapshot is one row in a ConditionRefresh stream.
|
||||||
ActiveAlarmSnapshot = pb.ActiveAlarmSnapshot
|
ActiveAlarmSnapshot = pb.ActiveAlarmSnapshot
|
||||||
// OnAlarmTransitionEvent is the body carried by alarm-transition MxEvents.
|
// OnAlarmTransitionEvent is the body carried by alarm-transition MxEvents.
|
||||||
@@ -130,6 +135,10 @@ type AlarmConditionState = pb.AlarmConditionState
|
|||||||
// QueryActiveAlarms RPC.
|
// QueryActiveAlarms RPC.
|
||||||
type QueryActiveAlarmsClient = pb.MxAccessGateway_QueryActiveAlarmsClient
|
type QueryActiveAlarmsClient = pb.MxAccessGateway_QueryActiveAlarmsClient
|
||||||
|
|
||||||
|
// StreamAlarmsClient is the generated server-streaming client for the
|
||||||
|
// StreamAlarms RPC.
|
||||||
|
type StreamAlarmsClient = pb.MxAccessGateway_StreamAlarmsClient
|
||||||
|
|
||||||
// Enumerations from the generated contract re-exported for client callers.
|
// Enumerations from the generated contract re-exported for client callers.
|
||||||
type (
|
type (
|
||||||
// MxCommandKind discriminates which MXAccess command an MxCommand carries.
|
// MxCommandKind discriminates which MXAccess command an MxCommand carries.
|
||||||
|
|||||||
Reference in New Issue
Block a user