Point the Go client at the StreamAlarms alarm feed
Regenerate the Go protobuf stubs and replace the session-scoped QueryActiveAlarms surface with the session-less StreamAlarms feed: snapshot-then-live AlarmFeedMessage fan-out served by the gateway's central alarm monitor. Drops session_id from the acknowledge surface. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -31,22 +31,24 @@ func (c *Client) AcknowledgeAlarm(ctx context.Context, req *AcknowledgeAlarmRequ
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
// QueryActiveAlarms streams a snapshot of all alarms currently Active or
|
||||
// ActiveAcked — the gateway's ConditionRefresh equivalent. Used after reconnect
|
||||
// to seed local Part 9 state, or to reconcile alarms that may have been missed
|
||||
// during a transport blip.
|
||||
// StreamAlarms attaches to the gateway's central alarm feed. The stream opens
|
||||
// with one AlarmFeedMessage per currently-active alarm (the ConditionRefresh
|
||||
// snapshot), then a single snapshot-complete sentinel, then a transition for
|
||||
// every subsequent raise / acknowledge / clear. It is served by the gateway's
|
||||
// always-on alarm monitor — no worker session is opened — so any number of
|
||||
// clients may attach.
|
||||
//
|
||||
// The returned stream is owned by the caller; cancel ctx to release it.
|
||||
// Optional alarm-reference prefix scoping (req.AlarmFilterPrefix) limits the
|
||||
// stream to a sub-tree.
|
||||
func (c *Client) QueryActiveAlarms(ctx context.Context, req *QueryActiveAlarmsRequest) (QueryActiveAlarmsClient, error) {
|
||||
func (c *Client) StreamAlarms(ctx context.Context, req *StreamAlarmsRequest) (StreamAlarmsClient, error) {
|
||||
if req == nil {
|
||||
return nil, errors.New("mxgateway: query active alarms request is required")
|
||||
return nil, errors.New("mxgateway: stream alarms request is required")
|
||||
}
|
||||
|
||||
stream, err := c.raw.QueryActiveAlarms(ctx, req)
|
||||
stream, err := c.raw.StreamAlarms(ctx, req)
|
||||
if err != nil {
|
||||
return nil, &GatewayError{Op: "query active alarms", Err: err}
|
||||
return nil, &GatewayError{Op: "stream alarms", Err: err}
|
||||
}
|
||||
|
||||
return stream, nil
|
||||
|
||||
@@ -14,13 +14,11 @@ import (
|
||||
"google.golang.org/grpc/test/bufconn"
|
||||
)
|
||||
|
||||
// PR E.4 — pins the Go SDK surface for the new alarm RPCs:
|
||||
// AcknowledgeAlarm + QueryActiveAlarms.
|
||||
// Pins the Go SDK surface for the alarm RPCs: AcknowledgeAlarm + StreamAlarms.
|
||||
|
||||
func TestAcknowledgeAlarmSendsRequestAndReturnsReply(t *testing.T) {
|
||||
fake := &fakeGatewayWithAlarms{
|
||||
acknowledgeReply: &pb.AcknowledgeAlarmReply{
|
||||
SessionId: "session-1",
|
||||
CorrelationId: "corr-1",
|
||||
ProtocolStatus: &pb.ProtocolStatus{
|
||||
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
|
||||
@@ -35,7 +33,6 @@ func TestAcknowledgeAlarmSendsRequestAndReturnsReply(t *testing.T) {
|
||||
defer cleanup()
|
||||
|
||||
reply, err := client.AcknowledgeAlarm(context.Background(), &pb.AcknowledgeAlarmRequest{
|
||||
SessionId: "session-1",
|
||||
ClientCorrelationId: "corr-1",
|
||||
AlarmFullReference: "Tank01.Level.HiHi",
|
||||
Comment: "investigating",
|
||||
@@ -77,7 +74,6 @@ func TestAcknowledgeAlarmMapsUnauthenticated(t *testing.T) {
|
||||
defer cleanup()
|
||||
|
||||
_, err := client.AcknowledgeAlarm(context.Background(), &pb.AcknowledgeAlarmRequest{
|
||||
SessionId: "session-1",
|
||||
AlarmFullReference: "Tank01.Level.HiHi",
|
||||
OperatorUser: "alice",
|
||||
})
|
||||
@@ -93,7 +89,7 @@ func TestAcknowledgeAlarmMapsUnauthenticated(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryActiveAlarmsStreamsSnapshots(t *testing.T) {
|
||||
func TestStreamAlarmsStreamsSnapshotThenSnapshotComplete(t *testing.T) {
|
||||
fake := &fakeGatewayWithAlarms{
|
||||
activeSnapshots: []*pb.ActiveAlarmSnapshot{
|
||||
{
|
||||
@@ -111,46 +107,46 @@ func TestQueryActiveAlarmsStreamsSnapshots(t *testing.T) {
|
||||
client, cleanup := newBufconnClientWithAlarms(t, fake)
|
||||
defer cleanup()
|
||||
|
||||
stream, err := client.QueryActiveAlarms(context.Background(), &pb.QueryActiveAlarmsRequest{
|
||||
SessionId: "session-1",
|
||||
})
|
||||
stream, err := client.StreamAlarms(context.Background(), &pb.StreamAlarmsRequest{})
|
||||
if err != nil {
|
||||
t.Fatalf("QueryActiveAlarms() error = %v", err)
|
||||
t.Fatalf("StreamAlarms() error = %v", err)
|
||||
}
|
||||
|
||||
var received []*pb.ActiveAlarmSnapshot
|
||||
var received []*pb.AlarmFeedMessage
|
||||
for {
|
||||
snap, err := stream.Recv()
|
||||
msg, err := stream.Recv()
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("stream.Recv() error = %v", err)
|
||||
}
|
||||
received = append(received, snap)
|
||||
received = append(received, msg)
|
||||
}
|
||||
if len(received) != 2 {
|
||||
t.Fatalf("snapshot count = %d, want 2", len(received))
|
||||
if len(received) != 3 {
|
||||
t.Fatalf("message count = %d, want 3", len(received))
|
||||
}
|
||||
if received[0].GetAlarmFullReference() != "Tank01.Level.HiHi" {
|
||||
t.Fatalf("snapshot[0] ref = %q", received[0].GetAlarmFullReference())
|
||||
if received[0].GetActiveAlarm().GetAlarmFullReference() != "Tank01.Level.HiHi" {
|
||||
t.Fatalf("message[0] ref = %q", received[0].GetActiveAlarm().GetAlarmFullReference())
|
||||
}
|
||||
if received[1].GetCurrentState() != pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE_ACKED {
|
||||
t.Fatalf("snapshot[1] state = %v", received[1].GetCurrentState())
|
||||
if received[1].GetActiveAlarm().GetCurrentState() != pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE_ACKED {
|
||||
t.Fatalf("message[1] state = %v", received[1].GetActiveAlarm().GetCurrentState())
|
||||
}
|
||||
if !received[2].GetSnapshotComplete() {
|
||||
t.Fatalf("final message is not snapshot_complete")
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryActiveAlarmsPassesFilterPrefix(t *testing.T) {
|
||||
func TestStreamAlarmsPassesFilterPrefix(t *testing.T) {
|
||||
fake := &fakeGatewayWithAlarms{}
|
||||
client, cleanup := newBufconnClientWithAlarms(t, fake)
|
||||
defer cleanup()
|
||||
|
||||
stream, err := client.QueryActiveAlarms(context.Background(), &pb.QueryActiveAlarmsRequest{
|
||||
SessionId: "session-1",
|
||||
stream, err := client.StreamAlarms(context.Background(), &pb.StreamAlarmsRequest{
|
||||
AlarmFilterPrefix: "Tank01.",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("QueryActiveAlarms() error = %v", err)
|
||||
t.Fatalf("StreamAlarms() error = %v", err)
|
||||
}
|
||||
for {
|
||||
_, err := stream.Recv()
|
||||
@@ -162,7 +158,7 @@ func TestQueryActiveAlarmsPassesFilterPrefix(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if got := fake.queryRequest.GetAlarmFilterPrefix(); got != "Tank01." {
|
||||
if got := fake.streamRequest.GetAlarmFilterPrefix(); got != "Tank01." {
|
||||
t.Fatalf("captured filter prefix = %q", got)
|
||||
}
|
||||
}
|
||||
@@ -175,7 +171,7 @@ type fakeGatewayWithAlarms struct {
|
||||
acknowledgeError error
|
||||
acknowledgeAuth string
|
||||
|
||||
queryRequest *pb.QueryActiveAlarmsRequest
|
||||
streamRequest *pb.StreamAlarmsRequest
|
||||
activeSnapshots []*pb.ActiveAlarmSnapshot
|
||||
}
|
||||
|
||||
@@ -189,21 +185,24 @@ func (s *fakeGatewayWithAlarms) AcknowledgeAlarm(ctx context.Context, req *pb.Ac
|
||||
return s.acknowledgeReply, nil
|
||||
}
|
||||
return &pb.AcknowledgeAlarmReply{
|
||||
SessionId: req.GetSessionId(),
|
||||
ProtocolStatus: &pb.ProtocolStatus{
|
||||
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *fakeGatewayWithAlarms) QueryActiveAlarms(req *pb.QueryActiveAlarmsRequest, stream grpc.ServerStreamingServer[pb.ActiveAlarmSnapshot]) error {
|
||||
s.queryRequest = req
|
||||
func (s *fakeGatewayWithAlarms) StreamAlarms(req *pb.StreamAlarmsRequest, stream grpc.ServerStreamingServer[pb.AlarmFeedMessage]) error {
|
||||
s.streamRequest = req
|
||||
for _, snap := range s.activeSnapshots {
|
||||
if err := stream.Send(snap); err != nil {
|
||||
if err := stream.Send(&pb.AlarmFeedMessage{
|
||||
Payload: &pb.AlarmFeedMessage_ActiveAlarm{ActiveAlarm: snap},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return stream.Send(&pb.AlarmFeedMessage{
|
||||
Payload: &pb.AlarmFeedMessage_SnapshotComplete{SnapshotComplete: true},
|
||||
})
|
||||
}
|
||||
|
||||
func newBufconnClientWithAlarms(t *testing.T, fake *fakeGatewayWithAlarms) (*Client, func()) {
|
||||
|
||||
@@ -110,9 +110,12 @@ type (
|
||||
AcknowledgeAlarmRequest = pb.AcknowledgeAlarmRequest
|
||||
// AcknowledgeAlarmReply is the gateway AcknowledgeAlarm reply message.
|
||||
AcknowledgeAlarmReply = pb.AcknowledgeAlarmReply
|
||||
// QueryActiveAlarmsRequest is the gateway QueryActiveAlarms request message.
|
||||
QueryActiveAlarmsRequest = pb.QueryActiveAlarmsRequest
|
||||
// ActiveAlarmSnapshot is one row in a ConditionRefresh stream.
|
||||
// StreamAlarmsRequest is the gateway StreamAlarms request message.
|
||||
StreamAlarmsRequest = pb.StreamAlarmsRequest
|
||||
// AlarmFeedMessage is one message on the StreamAlarms feed — an
|
||||
// active-alarm snapshot row, a snapshot-complete sentinel, or a transition.
|
||||
AlarmFeedMessage = pb.AlarmFeedMessage
|
||||
// ActiveAlarmSnapshot is one currently-active alarm in the feed snapshot.
|
||||
ActiveAlarmSnapshot = pb.ActiveAlarmSnapshot
|
||||
// OnAlarmTransitionEvent is the body carried by alarm-transition MxEvents.
|
||||
OnAlarmTransitionEvent = pb.OnAlarmTransitionEvent
|
||||
@@ -126,9 +129,9 @@ type AlarmTransitionKind = pb.AlarmTransitionKind
|
||||
// ConditionRefresh snapshot.
|
||||
type AlarmConditionState = pb.AlarmConditionState
|
||||
|
||||
// QueryActiveAlarmsClient is the generated server-streaming client for the
|
||||
// QueryActiveAlarms RPC.
|
||||
type QueryActiveAlarmsClient = pb.MxAccessGateway_QueryActiveAlarmsClient
|
||||
// StreamAlarmsClient is the generated server-streaming client for the
|
||||
// StreamAlarms RPC.
|
||||
type StreamAlarmsClient = pb.MxAccessGateway_StreamAlarmsClient
|
||||
|
||||
// Enumerations from the generated contract re-exported for client callers.
|
||||
type (
|
||||
|
||||
Reference in New Issue
Block a user