package mxgateway

import (
	"context"
	"errors"
	"io"
	"net"
	"testing"

	pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/test/bufconn"
)

// Pins the Go SDK surface for the alarm RPCs: AcknowledgeAlarm + StreamAlarms.

// TestAcknowledgeAlarmSendsRequestAndReturnsReply checks that an acknowledge
// call delivers the request fields and the authorization metadata to the
// server and hands the server's reply back to the caller.
func TestAcknowledgeAlarmSendsRequestAndReturnsReply(t *testing.T) {
	fake := &fakeGatewayWithAlarms{
		acknowledgeReply: &pb.AcknowledgeAlarmReply{
			CorrelationId: "corr-1",
			ProtocolStatus: &pb.ProtocolStatus{
				Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
			},
			Status: &pb.MxStatusProxy{
				Success:  1,
				Category: pb.MxStatusCategory_MX_STATUS_CATEGORY_OK,
			},
		},
	}
	client, cleanup := newBufconnClientWithAlarms(t, fake)
	defer cleanup()

	reply, err := client.AcknowledgeAlarm(context.Background(), &pb.AcknowledgeAlarmRequest{
		ClientCorrelationId: "corr-1",
		AlarmFullReference:  "Tank01.Level.HiHi",
		Comment:             "investigating",
		OperatorUser:        "alice",
	})
	if err != nil {
		t.Fatalf("AcknowledgeAlarm() error = %v", err)
	}
	if reply.GetProtocolStatus().GetCode() != pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK {
		t.Fatalf("protocol status = %v", reply.GetProtocolStatus().GetCode())
	}
	if got := fake.acknowledgeRequest.GetAlarmFullReference(); got != "Tank01.Level.HiHi" {
		t.Fatalf("captured alarm reference = %q", got)
	}
	if got := fake.acknowledgeRequest.GetComment(); got != "investigating" {
		t.Fatalf("captured comment = %q", got)
	}
	// The operator identity is part of the request sent above, so make sure it
	// reaches the server along with the reference and comment.
	if got := fake.acknowledgeRequest.GetOperatorUser(); got != "alice" {
		t.Fatalf("captured operator user = %q", got)
	}
	if got := fake.acknowledgeAuth; got != "Bearer test-api-key" {
		t.Fatalf("authorization metadata = %q", got)
	}
}

// TestAcknowledgeAlarmRejectsNilRequest checks that passing a nil request to
// AcknowledgeAlarm yields an error.
func TestAcknowledgeAlarmRejectsNilRequest(t *testing.T) {
	fake := &fakeGatewayWithAlarms{}
	client, cleanup := newBufconnClientWithAlarms(t, fake)
	defer cleanup()

	_, err := client.AcknowledgeAlarm(context.Background(), nil)
	if err == nil {
		t.Fatalf("AcknowledgeAlarm(nil) returned no error")
	}
}

// TestAcknowledgeAlarmMapsUnauthenticated checks that a server-side
// Unauthenticated status surfaces as a *GatewayError whose Err field still
// carries the Unauthenticated gRPC code.
func TestAcknowledgeAlarmMapsUnauthenticated(t *testing.T) {
	fake := &fakeGatewayWithAlarms{
		acknowledgeError: status.Error(codes.Unauthenticated, "expired key"),
	}
	client, cleanup := newBufconnClientWithAlarms(t, fake)
	defer cleanup()

	_, err := client.AcknowledgeAlarm(context.Background(), &pb.AcknowledgeAlarmRequest{
		AlarmFullReference: "Tank01.Level.HiHi",
		OperatorUser:       "alice",
	})
	if err == nil {
		t.Fatalf("AcknowledgeAlarm() returned no error on Unauthenticated")
	}
	var gwErr *GatewayError
	if !errors.As(err, &gwErr) {
		t.Fatalf("error %T does not unwrap to *GatewayError", err)
	}
	if got, _ := status.FromError(gwErr.Err); got.Code() != codes.Unauthenticated {
		t.Fatalf("inner status code = %v", got.Code())
	}
}

// TestStreamAlarmsStreamsSnapshotThenSnapshotComplete checks that the stream
// yields one message per configured active alarm, in order, followed by a
// final snapshot-complete message before io.EOF.
func TestStreamAlarmsStreamsSnapshotThenSnapshotComplete(t *testing.T) {
	fake := &fakeGatewayWithAlarms{
		activeSnapshots: []*pb.ActiveAlarmSnapshot{
			{
				AlarmFullReference: "Tank01.Level.HiHi",
				CurrentState:       pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE,
				Severity:           750,
			},
			{
				AlarmFullReference: "Tank02.Level.HiHi",
				CurrentState:       pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE_ACKED,
				Severity:           750,
			},
		},
	}
	client, cleanup := newBufconnClientWithAlarms(t, fake)
	defer cleanup()

	stream, err := client.StreamAlarms(context.Background(), &pb.StreamAlarmsRequest{})
	if err != nil {
		t.Fatalf("StreamAlarms() error = %v", err)
	}

	// Drain the stream until the server closes it.
	var received []*pb.AlarmFeedMessage
	for {
		msg, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			t.Fatalf("stream.Recv() error = %v", err)
		}
		received = append(received, msg)
	}

	// Two active-alarm snapshots plus the trailing snapshot-complete marker.
	if len(received) != 3 {
		t.Fatalf("message count = %d, want 3", len(received))
	}
	if received[0].GetActiveAlarm().GetAlarmFullReference() != "Tank01.Level.HiHi" {
		t.Fatalf("message[0] ref = %q", received[0].GetActiveAlarm().GetAlarmFullReference())
	}
	if received[1].GetActiveAlarm().GetCurrentState() != pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE_ACKED {
		t.Fatalf("message[1] state = %v", received[1].GetActiveAlarm().GetCurrentState())
	}
	if !received[2].GetSnapshotComplete() {
		t.Fatalf("final message is not snapshot_complete")
	}
}

// TestStreamAlarmsPassesFilterPrefix checks that the alarm filter prefix set
// on the request is the one the server receives.
func TestStreamAlarmsPassesFilterPrefix(t *testing.T) {
	fake := &fakeGatewayWithAlarms{}
	client, cleanup := newBufconnClientWithAlarms(t, fake)
	defer cleanup()

	stream, err := client.StreamAlarms(context.Background(), &pb.StreamAlarmsRequest{
		AlarmFilterPrefix: "Tank01.",
	})
	if err != nil {
		t.Fatalf("StreamAlarms() error = %v", err)
	}
	// Drain the stream to io.EOF before inspecting what the fake captured.
	for {
		_, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			t.Fatalf("stream.Recv() error = %v", err)
		}
	}
	if got := fake.streamRequest.GetAlarmFilterPrefix(); got != "Tank01." {
		t.Fatalf("captured filter prefix = %q", got)
	}
}

// fakeGatewayWithAlarms is an in-process gateway server that records the
// alarm requests it receives and answers them with canned data.
type fakeGatewayWithAlarms struct {
	pb.UnimplementedMxAccessGatewayServer

	// acknowledgeRequest is the most recent request seen by AcknowledgeAlarm.
	acknowledgeRequest *pb.AcknowledgeAlarmRequest
	// acknowledgeReply, when non-nil, is returned by AcknowledgeAlarm unless
	// acknowledgeError is set.
	acknowledgeReply *pb.AcknowledgeAlarmReply
	// acknowledgeError, when non-nil, is returned by AcknowledgeAlarm instead
	// of any reply.
	acknowledgeError error
	// acknowledgeAuth is what authorizationFromContext returned for the most
	// recent AcknowledgeAlarm call.
	acknowledgeAuth string

	// streamRequest is the most recent request seen by StreamAlarms.
	streamRequest *pb.StreamAlarmsRequest
	// activeSnapshots are sent, in order, at the start of every alarm stream.
	activeSnapshots []*pb.ActiveAlarmSnapshot
}

// AcknowledgeAlarm records the request and its authorization value, then
// returns acknowledgeError if set, else acknowledgeReply if set, else a reply
// whose protocol status is OK.
func (s *fakeGatewayWithAlarms) AcknowledgeAlarm(ctx context.Context, req *pb.AcknowledgeAlarmRequest) (*pb.AcknowledgeAlarmReply, error) {
	s.acknowledgeRequest = req
	s.acknowledgeAuth = authorizationFromContext(ctx)
	if s.acknowledgeError != nil {
		return nil, s.acknowledgeError
	}
	if s.acknowledgeReply != nil {
		return s.acknowledgeReply, nil
	}
	return &pb.AcknowledgeAlarmReply{
		ProtocolStatus: &pb.ProtocolStatus{
			Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
		},
	}, nil
}

// StreamAlarms records the request, sends one active-alarm message per entry
// in activeSnapshots, and finishes with a snapshot-complete message. The
// first send error aborts the stream and is returned.
func (s *fakeGatewayWithAlarms) StreamAlarms(req *pb.StreamAlarmsRequest, stream grpc.ServerStreamingServer[pb.AlarmFeedMessage]) error {
	s.streamRequest = req
	for _, snap := range s.activeSnapshots {
		if err := stream.Send(&pb.AlarmFeedMessage{
			Payload: &pb.AlarmFeedMessage_ActiveAlarm{ActiveAlarm: snap},
		}); err != nil {
			return err
		}
	}
	return stream.Send(&pb.AlarmFeedMessage{
		Payload: &pb.AlarmFeedMessage_SnapshotComplete{SnapshotComplete: true},
	})
}

// newBufconnClientWithAlarms serves fake on an in-memory bufconn listener and
// dials it with the SDK client using the API key "test-api-key". It returns
// the client together with a cleanup function that closes the client, stops
// the server, and closes the listener. If dialing fails, the server and
// listener are torn down before the test is failed.
func newBufconnClientWithAlarms(t *testing.T, fake *fakeGatewayWithAlarms) (*Client, func()) {
	t.Helper()

	listener := bufconn.Listen(bufSize)
	server := grpc.NewServer()
	pb.RegisterMxAccessGatewayServer(server, fake)
	go func() {
		// Serve's return value is deliberately dropped: it returns once the
		// server is stopped during teardown, and there is nothing useful to
		// do with that error from this goroutine.
		_ = server.Serve(listener)
	}()

	dialer := func(ctx context.Context, _ string) (net.Conn, error) {
		return listener.DialContext(ctx)
	}

	// grpc.NewClient defaults to the dns scheme; use passthrough so the
	// bufconn fake target reaches the context dialer unresolved.
	client, err := Dial(context.Background(), Options{
		Endpoint:    "passthrough:///bufnet",
		APIKey:      "test-api-key",
		Plaintext:   true,
		DialOptions: []grpc.DialOption{grpc.WithContextDialer(dialer)},
	})
	if err != nil {
		// t.Fatalf ends the test immediately and the caller never receives a
		// cleanup function, so release the server goroutine and the listener
		// here rather than leaking them.
		server.Stop()
		_ = listener.Close()
		t.Fatalf("Dial() error = %v", err)
	}

	return client, func() {
		client.Close()
		server.Stop()
		// Best-effort close during teardown; the error is not actionable.
		_ = listener.Close()
	}
}