Changeset - 591effa1fc4d
[Not reviewed]
default
0 2 0
Mads Kiilerich - 9 years ago 2016-09-12 17:41:20
madski@unity3d.com
api: drop the old Api auth methods and use the normal methods for access control
2 files changed with 46 insertions and 124 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/api/api.py
Show inline comments
 
@@ -33,8 +33,8 @@ from sqlalchemy import or_
 
from kallithea.controllers.api import JSONRPCController, JSONRPCError
 
from kallithea.lib.auth import (
 
    PasswordGenerator, AuthUser, HasPermissionAnyDecorator,
 
    HasPermissionAnyDecorator, HasPermissionAnyApi, HasRepoPermissionAnyApi,
 
    HasRepoGroupPermissionAnyApi, HasUserGroupPermissionAny)
 
    HasPermissionAnyDecorator, HasPermissionAny, HasRepoPermissionAny,
 
    HasRepoGroupPermissionAny, HasUserGroupPermissionAny)
 
from kallithea.lib.utils import map_groups, repo2db_mapper
 
from kallithea.lib.utils2 import (
 
    str2bool, time_to_datetime, safe_int, Optional, OAttr)
 
@@ -274,10 +274,10 @@ class ApiController(JSONRPCController):
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo !
 
            if not HasRepoPermissionAnyApi('repository.admin',
 
                                           'repository.write')(
 
            if not HasRepoPermissionAny('repository.admin',
 
                                        'repository.write')(
 
                    repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
@@ -338,10 +338,10 @@ class ApiController(JSONRPCController):
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if HasPermissionAnyApi('hg.admin')():
 
        if HasPermissionAny('hg.admin')():
 
            pass
 
        elif HasRepoPermissionAnyApi('repository.admin',
 
                                     'repository.write')(repo_name=repo.repo_name):
 
        elif HasRepoPermissionAny('repository.admin',
 
                                  'repository.write')(repo_name=repo.repo_name):
 
            # make sure normal user does not pass someone else userid,
 
            # he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != self.authuser.user_id:
 
@@ -428,7 +428,7 @@ class ApiController(JSONRPCController):
 
          error :  null
 
        """
 

	
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # make sure normal user does not pass someone else userid,
 
            # he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != self.authuser.user_id:
 
@@ -556,7 +556,7 @@ class ApiController(JSONRPCController):
 
            error:  null
 

	
 
        """
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # make sure normal user does not pass someone else userid,
 
            # he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != self.authuser.user_id:
 
@@ -821,7 +821,7 @@ class ApiController(JSONRPCController):
 

	
 
        """
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have at least read permission for this user group !
 
            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
@@ -950,7 +950,7 @@ class ApiController(JSONRPCController):
 

	
 
        """
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this user group !
 
            _perms = ('usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
@@ -1007,7 +1007,7 @@ class ApiController(JSONRPCController):
 

	
 
        """
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this user group !
 
            _perms = ('usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
@@ -1066,7 +1066,7 @@ class ApiController(JSONRPCController):
 
        """
 
        user = get_user_or_error(userid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this user group !
 
            _perms = ('usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
@@ -1118,7 +1118,7 @@ class ApiController(JSONRPCController):
 
        """
 
        user = get_user_or_error(userid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this user group !
 
            _perms = ('usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
@@ -1201,10 +1201,10 @@ class ApiController(JSONRPCController):
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo !
 
            perms = ('repository.admin', 'repository.write', 'repository.read')
 
            if not HasRepoPermissionAnyApi(*perms)(repo_name=repo.repo_name):
 
            if not HasRepoPermissionAny(*perms)(repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        members = []
 
@@ -1269,7 +1269,7 @@ class ApiController(JSONRPCController):
 
            error:  null
 
        """
 
        result = []
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            repos = RepoModel().get_all_user_repos(user=self.authuser.user_id)
 
        else:
 
            repos = Repository.get_all()
 
@@ -1311,10 +1311,10 @@ class ApiController(JSONRPCController):
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo !
 
            perms = ('repository.admin', 'repository.write', 'repository.read')
 
            if not HasRepoPermissionAnyApi(*perms)(repo_name=repo.repo_name):
 
            if not HasRepoPermissionAny(*perms)(repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        ret_type = Optional.extract(ret_type)
 
@@ -1397,7 +1397,7 @@ class ApiController(JSONRPCController):
 
          }
 

	
 
        """
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            if not isinstance(owner, Optional):
 
                # forbid setting owner for non-admins
 
                raise JSONRPCError(
 
@@ -1489,13 +1489,13 @@ class ApiController(JSONRPCController):
 
        :param enable_downloads:
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo !
 
            if not HasRepoPermissionAnyApi('repository.admin')(repo_name=repo.repo_name):
 
            if not HasRepoPermissionAny('repository.admin')(repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            if (name != repo.repo_name and
 
                not HasPermissionAnyApi('hg.create.repository')()
 
                not HasPermissionAny('hg.create.repository')()
 
                ):
 
                raise JSONRPCError('no permission to create (or move) repositories')
 

	
 
@@ -1586,18 +1586,18 @@ class ApiController(JSONRPCController):
 
            type_ = 'fork' if _repo.fork else 'repo'
 
            raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
 

	
 
        if HasPermissionAnyApi('hg.admin')():
 
        if HasPermissionAny('hg.admin')():
 
            pass
 
        elif HasRepoPermissionAnyApi('repository.admin',
 
                                     'repository.write',
 
                                     'repository.read')(repo_name=repo.repo_name):
 
        elif HasRepoPermissionAny('repository.admin',
 
                                  'repository.write',
 
                                  'repository.read')(repo_name=repo.repo_name):
 
            if not isinstance(owner, Optional):
 
                # forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 

	
 
            if not HasPermissionAnyApi('hg.create.repository')():
 
            if not HasPermissionAny('hg.create.repository')():
 
                raise JSONRPCError('no permission to create repositories')
 
        else:
 
            raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 
@@ -1666,9 +1666,9 @@ class ApiController(JSONRPCController):
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo !
 
            if not HasRepoPermissionAnyApi('repository.admin')(repo_name=repo.repo_name):
 
            if not HasRepoPermissionAny('repository.admin')(repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        try:
 
@@ -1818,10 +1818,10 @@ class ApiController(JSONRPCController):
 
        repo = get_repo_or_error(repoid)
 
        perm = get_perm_or_error(perm)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo !
 
            _perms = ('repository.admin',)
 
            if not HasRepoPermissionAnyApi(*_perms)(
 
            if not HasRepoPermissionAny(*_perms)(
 
                    repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
@@ -1874,10 +1874,10 @@ class ApiController(JSONRPCController):
 
        """
 
        repo = get_repo_or_error(repoid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo !
 
            _perms = ('repository.admin',)
 
            if not HasRepoPermissionAnyApi(*_perms)(
 
            if not HasRepoPermissionAny(*_perms)(
 
                    repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
@@ -2126,9 +2126,9 @@ class ApiController(JSONRPCController):
 

	
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            if not HasRepoGroupPermissionAnyApi('group.admin')(group_name=repo_group.group_name):
 
            if not HasRepoGroupPermissionAny('group.admin')(group_name=repo_group.group_name):
 
                raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 

	
 
        user = get_user_or_error(userid)
 
@@ -2190,9 +2190,9 @@ class ApiController(JSONRPCController):
 

	
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            if not HasRepoGroupPermissionAnyApi('group.admin')(group_name=repo_group.group_name):
 
            if not HasRepoGroupPermissionAny('group.admin')(group_name=repo_group.group_name):
 
                raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 

	
 
        user = get_user_or_error(userid)
 
@@ -2258,10 +2258,10 @@ class ApiController(JSONRPCController):
 
        repo_group = get_repo_group_or_error(repogroupid)
 
        perm = get_perm_or_error(perm, prefix='group.')
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            _perms = ('group.admin',)
 
            if not HasRepoGroupPermissionAnyApi(*_perms)(
 
            if not HasRepoGroupPermissionAny(*_perms)(
 
                    group_name=repo_group.group_name):
 
                raise JSONRPCError(
 
                    'repository group `%s` does not exist' % (repogroupid,))
 
@@ -2334,10 +2334,10 @@ class ApiController(JSONRPCController):
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            _perms = ('group.admin',)
 
            if not HasRepoGroupPermissionAnyApi(*_perms)(
 
            if not HasRepoGroupPermissionAny(*_perms)(
 
                    group_name=repo_group.group_name):
 
                raise JSONRPCError(
 
                    'repository group `%s` does not exist' % (repogroupid,))
 
@@ -2379,7 +2379,7 @@ class ApiController(JSONRPCController):
 
        :type gistid: str
 
        """
 
        gist = get_gist_or_error(gistid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            if gist.gist_owner != self.authuser.user_id:
 
                raise JSONRPCError('gist `%s` does not exist' % (gistid,))
 
        return gist.get_api_data()
 
@@ -2392,7 +2392,7 @@ class ApiController(JSONRPCController):
 
        :param userid: user to get gists for
 
        :type userid: Optional(str or int)
 
        """
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            # make sure normal user does not pass someone else userid,
 
            # he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != self.authuser.user_id:
 
@@ -2508,7 +2508,7 @@ class ApiController(JSONRPCController):
 

	
 
        """
 
        gist = get_gist_or_error(gistid)
 
        if not HasPermissionAnyApi('hg.admin')():
 
        if not HasPermissionAny('hg.admin')():
 
            if gist.gist_owner != self.authuser.user_id:
 
                raise JSONRPCError('gist `%s` does not exist' % (gistid,))
 

	
kallithea/lib/auth.py
Show inline comments
 
@@ -1065,84 +1065,6 @@ class HasPermissionAnyMiddleware(object)
 
        return False
 

	
 

	
 
#==============================================================================
 
# SPECIAL VERSION TO HANDLE API AUTH
 
#==============================================================================
 
class _BaseApiPerm(object):
 
    def __init__(self, *perms):
 
        self.required_perms = set(perms)
 

	
 
    def __call__(self, check_location=None, repo_name=None, group_name=None):
 
        user = request.user
 
        assert user
 
        assert isinstance(user, AuthUser), user
 

	
 
        cls_name = self.__class__.__name__
 
        check_scope = 'user:%s' % (user)
 
        if repo_name:
 
            check_scope += ', repo:%s' % (repo_name)
 

	
 
        if group_name:
 
            check_scope += ', repo group:%s' % (group_name)
 

	
 
        log.debug('checking cls:%s %s %s @ %s',
 
                  cls_name, self.required_perms, check_scope, check_location)
 

	
 
        ## process user
 
        if not check_location:
 
            check_location = 'unspecified'
 
        if self.check_permissions(user.permissions, repo_name, group_name):
 
            log.debug('Permission to %s granted for user: %s @ %s',
 
                      check_scope, user, check_location)
 
            return True
 

	
 
        else:
 
            log.debug('Permission to %s denied for user: %s @ %s',
 
                      check_scope, user, check_location)
 
            return False
 

	
 
    def check_permissions(self, perm_defs, repo_name=None, group_name=None):
 
        """
 
        implement in child class should return True if permissions are ok,
 
        False otherwise
 

	
 
        :param perm_defs: dict with permission definitions
 
        :param repo_name: repo name
 
        """
 
        raise NotImplementedError()
 

	
 

	
 
class HasPermissionAnyApi(_BaseApiPerm):
 
    def check_permissions(self, perm_defs, repo_name=None, group_name=None):
 
        if self.required_perms.intersection(perm_defs.get('global')):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoPermissionAnyApi(_BaseApiPerm):
 
    def check_permissions(self, perm_defs, repo_name=None, group_name=None):
 
        try:
 
            _user_perms = set([perm_defs['repositories'][repo_name]])
 
        except KeyError:
 
            log.warning(traceback.format_exc())
 
            return False
 
        if self.required_perms.intersection(_user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoGroupPermissionAnyApi(_BaseApiPerm):
 
    def check_permissions(self, perm_defs, repo_name=None, group_name=None):
 
        try:
 
            _user_perms = set([perm_defs['repositories_groups'][group_name]])
 
        except KeyError:
 
            log.warning(traceback.format_exc())
 
            return False
 
        if self.required_perms.intersection(_user_perms):
 
            return True
 
        return False
 

	
 

	
 
def check_ip_access(source_ip, allowed_ips=None):
 
    """
 
    Checks if source_ip is a subnet of any of allowed_ips.
0 comments (0 inline, 0 general)