Changeset - e1baadec6217
[Not reviewed]
beta
0 2 0
Marcin Kuzminski - 13 years ago 2013-01-16 00:20:45
marcin@python-works.com
fixes issue #702 API methods without arguments fail when "args":null
2 files changed with 32 insertions and 0 deletions:
0 comments (0 inline, 0 general)
rhodecode/controllers/api/__init__.py
Show inline comments
 
@@ -87,176 +87,180 @@ class JSONRPCController(WSGIController):
 
    def _get_ip_addr(self, environ):
 
        return _get_ip_addr(environ)
 

	
 
    def _get_method_args(self):
 
        """
 
        Return `self._rpc_args` to dispatched controller method
 
        chosen by __call__
 
        """
 
        return self._rpc_args
 

	
 
    def __call__(self, environ, start_response):
 
        """
 
        Parse the request body as JSON, look up the method on the
 
        controller and if it exists, dispatch to it.
 
        """
 
        start = time.time()
 
        ip_addr = self.ip_addr = self._get_ip_addr(environ)
 
        self._req_id = None
 
        if 'CONTENT_LENGTH' not in environ:
 
            log.debug("No Content-Length")
 
            return jsonrpc_error(retid=self._req_id,
 
                                 message="No Content-Length in request")
 
        else:
 
            length = environ['CONTENT_LENGTH'] or 0
 
            length = int(environ['CONTENT_LENGTH'])
 
            log.debug('Content-Length: %s' % length)
 

	
 
        if length == 0:
 
            log.debug("Content-Length is 0")
 
            return jsonrpc_error(retid=self._req_id,
 
                                 message="Content-Length is 0")
 

	
 
        raw_body = environ['wsgi.input'].read(length)
 

	
 
        try:
 
            json_body = json.loads(urllib.unquote_plus(raw_body))
 
        except ValueError, e:
 
            # catch JSON errors Here
 
            return jsonrpc_error(retid=self._req_id,
 
                                 message="JSON parse error ERR:%s RAW:%r" \
 
                                 % (e, urllib.unquote_plus(raw_body)))
 

	
 
        # check AUTH based on API KEY
 
        try:
 
            self._req_api_key = json_body['api_key']
 
            self._req_id = json_body['id']
 
            self._req_method = json_body['method']
 
            self._request_params = json_body['args']
 
            if not isinstance(self._request_params, dict):
 
                self._request_params = {}
 

	
 
            log.debug(
 
                'method: %s, params: %s' % (self._req_method,
 
                                            self._request_params)
 
            )
 
        except KeyError, e:
 
            return jsonrpc_error(retid=self._req_id,
 
                                 message='Incorrect JSON query missing %s' % e)
 

	
 
        # check if we can find this session using api_key
 
        try:
 
            u = User.get_by_api_key(self._req_api_key)
 
            if u is None:
 
                return jsonrpc_error(retid=self._req_id,
 
                                     message='Invalid API KEY')
 

	
 
            #check if we are allowed to use this IP
 
            auth_u = AuthUser(u.user_id, self._req_api_key, ip_addr=ip_addr)
 
            if not auth_u.ip_allowed:
 
                return jsonrpc_error(retid=self._req_id,
 
                        message='request from IP:%s not allowed' % (ip_addr))
 
            else:
 
                log.info('Access for IP:%s allowed' % (ip_addr))
 

	
 
        except Exception, e:
 
            return jsonrpc_error(retid=self._req_id,
 
                                 message='Invalid API KEY')
 

	
 
        self._error = None
 
        try:
 
            self._func = self._find_method()
 
        except AttributeError, e:
 
            return jsonrpc_error(retid=self._req_id,
 
                                 message=str(e))
 

	
 
        # now that we have a method, add self._req_params to
 
        # self.kargs and dispatch control to WGIController
 
        argspec = inspect.getargspec(self._func)
 
        arglist = argspec[0][1:]
 
        defaults = map(type, argspec[3] or [])
 
        default_empty = types.NotImplementedType
 

	
 
        # kw arguments required by this method
 
        func_kwargs = dict(izip_longest(reversed(arglist), reversed(defaults),
 
                                        fillvalue=default_empty))
 

	
 
        # this is little trick to inject logged in user for
 
        # perms decorators to work they expect the controller class to have
 
        # rhodecode_user attribute set
 
        self.rhodecode_user = auth_u
 

	
 
        # This attribute will need to be first param of a method that uses
 
        # api_key, which is translated to instance of user at that name
 
        USER_SESSION_ATTR = 'apiuser'
 

	
 
        if USER_SESSION_ATTR not in arglist:
 
            return jsonrpc_error(
 
                retid=self._req_id,
 
                message='This method [%s] does not support '
 
                         'authentication (missing %s param)' % (
 
                                    self._func.__name__, USER_SESSION_ATTR)
 
            )
 

	
 
        # get our arglist and check if we provided them as args
 
        for arg, default in func_kwargs.iteritems():
 
            if arg == USER_SESSION_ATTR:
 
                # USER_SESSION_ATTR is something translated from api key and
 
                # this is checked before so we don't need validate it
 
                continue
 

	
 
            # skip the required param check if it's default value is
 
            # NotImplementedType (default_empty)
 
            if (default == default_empty and arg not in self._request_params):
 
                return jsonrpc_error(
 
                    retid=self._req_id,
 
                    message=(
 
                        'Missing non optional `%s` arg in JSON DATA' % arg
 
                    )
 
                )
 

	
 
        self._rpc_args = {USER_SESSION_ATTR: u}
 

	
 
        self._rpc_args.update(self._request_params)
 

	
 
        self._rpc_args['action'] = self._req_method
 
        self._rpc_args['environ'] = environ
 
        self._rpc_args['start_response'] = start_response
 

	
 
        status = []
 
        headers = []
 
        exc_info = []
 

	
 
        def change_content(new_status, new_headers, new_exc_info=None):
 
            status.append(new_status)
 
            headers.extend(new_headers)
 
            exc_info.append(new_exc_info)
 

	
 
        output = WSGIController.__call__(self, environ, change_content)
 
        output = list(output)
 
        headers.append(('Content-Length', str(len(output[0]))))
 
        replace_header(headers, 'Content-Type', 'application/json')
 
        start_response(status[0], headers, exc_info[0])
 
        log.info('IP: %s Request to %s time: %.3fs' % (
 
            _get_ip_addr(environ),
 
            safe_unicode(_get_access_path(environ)), time.time() - start)
 
        )
 
        return output
 

	
 
    def _dispatch_call(self):
 
        """
 
        Implement dispatch interface specified by WSGIController
 
        """
 
        try:
 
            raw_response = self._inspect_call(self._func)
 
            if isinstance(raw_response, HTTPError):
 
                self._error = str(raw_response)
 
        except JSONRPCError, e:
 
            self._error = str(e)
 
        except Exception, e:
 
            log.error('Encountered unhandled exception: %s' \
 
                      % traceback.format_exc())
 
            json_exc = JSONRPCError('Internal server error')
 
            self._error = str(json_exc)
 

	
 
        if self._error is not None:
 
            raw_response = None
 

	
 
        response = dict(id=self._req_id, result=raw_response,
 
                        error=self._error)
 

	
rhodecode/tests/api/api_base.py
Show inline comments
 
@@ -110,96 +110,124 @@ class BaseTestApi(object):
 

	
 
    def setUp(self):
 
        self.maxDiff = None
 
        make_users_group()
 

	
 
    def tearDown(self):
 
        destroy_users_group()
 

	
 
    def _compare_ok(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': None,
 
            'result': expected
 
        })
 
        given = json.loads(given)
 
        self.assertEqual(expected, given)
 

	
 
    def _compare_error(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': expected,
 
            'result': None
 
        })
 
        given = json.loads(given)
 
        self.assertEqual(expected, given)
 

	
 
#    def test_Optional(self):
 
#        from rhodecode.controllers.api.api import Optional
 
#        option1 = Optional(None)
 
#        self.assertEqual('<Optional:%s>' % None, repr(option1))
 
#
 
#        self.assertEqual(1, Optional.extract(Optional(1)))
 
#        self.assertEqual('trololo', Optional.extract('trololo'))
 

	
 
    def test_api_wrong_key(self):
 
        id_, params = _build_data('trololo', 'get_user')
 
        response = api_call(self, params)
 

	
 
        expected = 'Invalid API KEY'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_null(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_args_is_null(self):
 
        id_, params = _build_data(self.apikey, 'get_users',)
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 
        self.assertEqual(response.status, '200 OK')
 

	
 
    def test_api_args_is_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_users',)
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 
        self.assertEqual(response.status, '200 OK')
 

	
 
    def test_api_get_users(self):
 
        id_, params = _build_data(self.apikey, 'get_users',)
 
        response = api_call(self, params)
 
        ret_all = []
 
        for usr in UserModel().get_all():
 
            ret = usr.get_api_data()
 
            ret_all.append(jsonify(ret))
 
        expected = ret_all
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(usr.user_id).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_that_does_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` does not exist" % 'trololo'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid(self):
 
        id_, params = _build_data(self.apikey, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(usr.user_id).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(usr.user_id).permissions
0 comments (0 inline, 0 general)